Red Hat
Jul 3, 2017
by William Burns
In Infinispan 8.0 we were very excited to announce Distributed Streams as we moved to Java 8. This feature allows applying any of the various java.util.stream.Stream operations on the datagrid, which are performed in a distributed nature, providing the highest possible performance as data is processed on the node where it lives, only requiring the terminal operation intermediate results to be returned to the invoker.

One problem with distributed streams though is that data is processed without acquiring locks: great for performance, but there is no guarantee that some other process isn't concurrently modifying the cache entry you're working on. Consider the following example which iterates through the entire contents of a cache, modifying each entry based on its existing value:

This works great until you have another cache put() running concurrently that changes a value. In this case the only way to be sure that an update is applied properly is to perform an optional update in the forEach. In a transactional cache you could also lock the entry manually (pessimistic) or retry on a WriteSkewException (optimistic). For example this is how the optional update could be performed.

As you can see the code isn't as pretty as it was before, but is still pretty concise.

Infinispan 9.1 introduces locked streams, which allow you to run your operation knowing that another update cannot be performed while running the Consumer. Note this only works in non transactional and pessimistic transactional caches (optimistic transactional caches are not supported).

If you notice the code looks very similar to the first example. You just have to invoke the lockedStream method on the AdvancedCache and then you can use the stream knowing that data for the given key won't change while performing your update.

This locked stream has a slightly limited API compared to the normal java.util.stream API. Only the filter method is allowed in addition to forEach. The CacheStream API is also supported, with a few exceptions. For more details on the API and what methods are supported you should check out the Javadoc.

The lock is only acquired for the given key while invoking the Consumer, allowing other updates on other keys to be performed concurrently, just like a normal put operation would behave. It is not suggested to perform operations on other keys in the Consumer, as this could cause possible deadlocks.

Now go forth and perform operations using the data stream knowing that the data underneath has not changed!

Original Post