
The best Java 22 feature: concurrent gathering

Java 22 is going to be released tomorrow. There are many super nice things in this release. I suspect Project Panama’s native/foreign calls will make many people’s lives easier, as will the ongoing changes to concurrency, string templates, and finally being able to call something before super. However, there’s one thing I guess is going to be my (almost) hidden hero of Java 22. Folks, I introduce to you Gatherers.mapConcurrent!

Streams are the (un)parallel hero of Java 8

Java 8 was a big milestone in Java’s revolution at its time. I remember the vibes: after the relatively small release of Java 7, version 8 was a BIG ONE. The introduction of lambdas is what made the Stream API possible in Java 8 (without our eyes bleeding, that is). IMHO it pushed Java towards a declarative and functional way of thinking about data processing: instead of loops with intermediate calls and wrapping up results in a single object, we could finally express the intent in a more, well, functional way.

The Stream API had IMHO at least two bigger flaws when it was introduced, effectively preventing its use in many critical-path data processing scenarios.

Extend only once

In a very simplified way, a stream’s lifecycle could be summarised like this:

  • creation of the stream
  • a number of intermediate operations, like filter, map, or flatMap (or peek, but this was only for debugging, right? RIGHT???)
  • a single call to collect the results.

Let’s start from the end: collectors. If you weren’t happy with simple joining, reduce or such, you could go for even simpler toList (since Java 16 if I remember correctly), or teeing (since Java 12), if you needed to pass the streamed objects to two or more collectors. Or, if you had really fancy needs, you could unveil all dark powers you obtained in your SQL 101 class and compose collectors with mapping, filtering, groupingBy, partitioningBy, collectingAndThen, etc., etc.
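To make the "SQL 101" remark a bit more tangible, here is a minimal sketch of composed collectors. The class and method names (CollectorComposition, wordsByLength, averageLength) are made up for illustration; only the Collectors calls come from the JDK.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class CollectorComposition {

    // Roughly "SELECT length, join(upper(word)) ... GROUP BY length":
    // groupingBy decides the bucket, mapping transforms each word, joining glues them together.
    static Map<Integer, String> wordsByLength(List<String> words) {
        return words.stream()                                   // creation
                .filter(w -> !w.isBlank())                      // intermediate operation
                .collect(Collectors.groupingBy(                 // the single collecting call
                        String::length,
                        Collectors.mapping(w -> w.toUpperCase(), Collectors.joining(","))));
    }

    // teeing feeds every element to two collectors and merges both results at the end.
    static double averageLength(List<String> words) {
        return words.stream().collect(Collectors.teeing(
                Collectors.counting(),
                Collectors.summingInt(String::length),
                (count, total) -> count == 0 ? 0.0 : (double) total / count));
    }

    public static void main(String[] args) {
        List<String> words = List.of("ene", "due", "rabe");
        System.out.println(wordsByLength(words));
        System.out.println(averageLength(words));
    }
}
```

Note that all the interesting work sits inside collect(...), which is exactly the point of the next paragraph.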

If that still wasn’t enough, you could write your own collector. And that’s the first limitation: it was AFAICT the only extension point in the Stream API. If you needed something special, you either had to introduce weird intermediate data structures to handle that in intermediate operations (look for “nice” examples in JEP 461), or you had to de facto defer that until the collecting phase. Which was kind of odd, because that defeated the whole purpose of expressing the data processing steps using intermediate operations before the collection step. Or you could do that in a loop, in good ol' imperative style.
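As an illustration of "deferring until the collecting phase", here is a sketch of a hand-written collector that pairs up neighbouring elements. The names (CustomCollectorSketch, slidingPairs) are hypothetical; Collector.of is the standard JDK factory. The sketch buffers everything first and builds the pairs only in the finisher, so the "intermediate" logic ends up in the terminal step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

class CustomCollectorSketch {

    // Collects elements into pairs of neighbours: [a, b, c] becomes [[a, b], [b, c]].
    // Elements must be non-null, because List.of rejects nulls.
    static <T> Collector<T, List<T>, List<List<T>>> slidingPairs() {
        return Collector.of(
                ArrayList::new,                                   // supplier: mutable buffer
                List::add,                                        // accumulator: remember each element
                (left, right) -> {                                // combiner: needed for parallel streams
                    left.addAll(right);
                    return left;
                },
                elements -> {                                     // finisher: the real work happens here
                    List<List<T>> pairs = new ArrayList<>();
                    for (int i = 0; i + 1 < elements.size(); i++) {
                        pairs.add(List.of(elements.get(i), elements.get(i + 1)));
                    }
                    return pairs;
                });
    }

    public static void main(String[] args) {
        System.out.println(Stream.of("ene", "due", "rabe").collect(slidingPairs()));
    }
}
```

It works, but nothing after it can be a stream operation any more, unless you open a second stream on the result.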

Where are my cores?

So you have 8 cores in your machine. And, let’s say, 64 items in your stream. You figured out that making your stream .parallel() might make the whole pipeline like 8 times faster in some cases. You even verified that in jshell, and yep, 8 times faster, so a quick copy-n-paste in many places, and we can git push --force ;-)

And then we all discover that (almost) all the threads in parallel streams come from the same pool, the common ForkJoinPool. And worse, that they are blocking on IO. (And here we skip the whole history of the reactive stack and the holy wars between reactive and virtual threads. I’m not in the mood for a holy war today, sorry.)
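If you want to see the shared pool with your own eyes, a small sketch like the one below should do. The class and method names are invented for this example; the exact set of thread names you get will differ from run to run and from machine to machine.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelStreamThreads {

    // Runs a parallel pipeline and records the name of every thread that executed the mapping step.
    static Set<String> threadNamesUsed(int items) {
        return IntStream.range(0, items).boxed()
                .parallel()
                .map(i -> Thread.currentThread().getName())
                .collect(Collectors.toCollection(TreeSet::new));
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        // Typically prints the calling thread plus some ForkJoinPool.commonPool-worker-N threads.
        threadNamesUsed(64).forEach(System.out::println);
    }
}
```

Every parallel stream in the same JVM competes for those same workers by default, so one pipeline stuck on IO slows down all the others.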

For these reasons (and more) we used to switch to a reactive stack in the past to get non-blocking parallel processing. Or, when we could deal with the parallelism in the collecting phase, we used parallel-collectors by Grzegorz Piwowarek & Co. Or really devoted vanilla Java folks used CompletableFutures.
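For the record, the vanilla CompletableFuture route could look more or less like the sketch below. Everything named here (CompletableFutureEnrichment, slowLookup, mapWithPool) is hypothetical, and the 50 ms sleep only pretends to be a network call; the point is the dedicated executor, which keeps blocking calls away from the common pool.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

class CompletableFutureEnrichment {

    // Hypothetical stand-in for a blocking remote call; the short sleep simulates network latency.
    static String slowLookup(Integer id) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "item-" + id;
    }

    // Submits every call to a dedicated pool first, then waits for all results in input order.
    static <T, R> List<R> mapWithPool(List<T> inputs, Function<T, R> blockingCall, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<CompletableFuture<R>> futures = inputs.stream()
                    .map(input -> CompletableFuture.supplyAsync(() -> blockingCall.apply(input), pool))
                    .collect(Collectors.toList());   // collect first, so all tasks are already running
            return futures.stream()
                    .map(CompletableFuture::join)    // then block for each result
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(mapWithPool(List.of(1, 2, 3, 4), CompletableFutureEnrichment::slowLookup, 4));
    }
}
```

Doable, but that is two streams, an executor lifecycle and a try/finally for what is conceptually a single map step.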

Hunting with Gatherers

Java 22 introduces, in JEP 461 (as a preview feature), another extension point to the Stream API: Stream Gatherers. In short: if you’re not happy with the intermediate stream operations that are already there, you can employ gatherer(s).

Apart from being just an extension, they can also allow slicing, scanning, and so on. For example, in this code:

Stream.of("ene", "due", "rabe")
    .gather(Gatherers.windowSliding(2))
    .filter(l -> Math.abs(l.getFirst().length() - l.getLast().length()) > 1)
    .forEach(System.out::println);

we gather words like "ene", "due", "rabe" into windows (which are lists, hence getFirst() and getLast()): ["ene", "due"] and ["due", "rabe"]. And then we can check if subsequent words differ in length by more than one character. (If they did, we’d see them printed).

Very nice and very needed extension point, and very nice predefined gatherers. One of them starts really shining when you extract it from the JEP.

Gatherers.mapConcurrent gets from me “the method of Java 22” award

I very much like how Tomek Nurkiewicz opened his post about Map.merge(). For me, this entry (after long intro) is about Gatherers.mapConcurrent().

In JEP 461 it’s mentioned only once, here it goes:

mapConcurrent is a stateful one-to-one gatherer which invokes a supplied function for each input element concurrently, up to a supplied limit

And that’s it! That’s really it!

What it doesn’t mention in plain English is that it’s using virtual threads under the hood. So what’s the big deal, you may ask. Well, let me show you what I think is missing in this JEP ;-)

Enriching data

Let’s say you receive some events. In this example we’re going to receive 100 integers, just to Keep Things Simple™. And for each event we need to call an external service, which will return an enriched event to us. Over the network, mind you! And to Keep Things Simple™ again, let’s assume enriching each event takes one second.

Question: how long will this code execute?

var events = IntStream.range(0, 100).boxed()
    .parallel().map(ExternalService::enrichEvent)
    .toArray();

You’re smart, so you’ll respond: it depends.
If you’re super smart, you’ll respond: it depends on many things, like: how many idle CPU cores you have (with default config), whether the network is running smoothly, and (most importantly perhaps) how many other parallel streams are running at the same time.

Assuming that we have 16 cores which have absolutely nothing else to do, it will take 100 tasks × 1 second / 16 worker threads = 6.25 seconds, which rounds up to 7 seconds: in the first second we enrich 16 events, then the next 16 events, and so on, and in the seventh second we process the remaining 4.
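That back-of-the-envelope estimate can be written down as a tiny helper. This is an idealised model with made-up names (BatchEstimate, rounds, estimatedSeconds): it assumes every task takes exactly the same time and ignores scheduling overhead and network hiccups.

```java
class BatchEstimate {

    // Number of sequential rounds needed when at most `workers` tasks run at the same time.
    static long rounds(long tasks, long workers) {
        return (tasks + workers - 1) / workers;   // integer ceiling of tasks / workers
    }

    // Wall-clock estimate: every round takes as long as a single task.
    static long estimatedSeconds(long tasks, long workers, long secondsPerTask) {
        return rounds(tasks, workers) * secondsPerTask;
    }

    public static void main(String[] args) {
        // 100 events, 16 workers, 1 second each: 6 full rounds of 16 plus one round with the last 4.
        System.out.println(estimatedSeconds(100, 16, 1)); // prints 7
    }
}
```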

But we’re using very little CPU, we’re just waiting for the data to come back over the wire. If only there was a simple way to do this intermediate mapping using virtual threads, which aren’t blocked by IO if used properly…

And there is! It’s Gatherers.mapConcurrent()!

In the first parameter it (more or less) asks us how many virtual threads we can run. And since this is a very CPU-lightweight operation, let’s say 100. In the second parameter it takes… a mapping function ;-)

So we have:

var events = IntStream.range(0, 100).boxed()
    .gather(Gatherers.mapConcurrent(100, ExternalService::enrichEvent))
    .toArray();

Let’s repeat the question: how long will it take this time? Yes, you guessed it right: it depends on the network. But this time we have 100 concurrent virtual threads, and if things go smoothly, we have the enriched events right after 1 second. Yes, one second.
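If you’d like to try this at home without a real service, here is a self-contained sketch. The enrichEvent below is a hypothetical stand-in that just sleeps for 200 ms (not the one second from the story, so the demo stays quick), and MapConcurrentDemo and enrichAll are invented names. It assumes a JDK where gatherers are available: final since JDK 24, or JDK 22/23 run with --enable-preview.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.IntStream;

class MapConcurrentDemo {

    // Hypothetical stand-in for ExternalService.enrichEvent:
    // blocks for a while (simulated network latency), then returns an "enriched" value.
    static String enrichEvent(Integer event) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "enriched-" + event;
    }

    // Enriches `count` events with at most `maxConcurrency` calls in flight at the same time.
    static List<String> enrichAll(int count, int maxConcurrency) {
        return IntStream.range(0, count).boxed()
                .gather(Gatherers.mapConcurrent(maxConcurrency, MapConcurrentDemo::enrichEvent))
                .toList();
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> events = enrichAll(100, 100);
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(events.size() + " events enriched in about " + elapsedMillis + " ms");
    }
}
```

With the limit set to 100 the elapsed time should land close to a single call’s latency; lower the limit to, say, 16 and it should grow accordingly. The results keep the order of the input stream.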

Gatherers not only for Gardeners

As a self-proclaimed Software Gardener, I suspect Gatherers might be something for me. And who knows, maybe for you too?

Finally, I see something, a simple method call, that (combined with underlying technology of virtual threads) is addressing the shortcomings of processing data with just StreamAPI. Super nice and simple paradigm, vanilla Java, nothing more. From tomorrow Java Streams are really parallel.
