Wybór języka

Java 22 i najlepsza zmiana: współbieżne zbieracze w strumieniach

Java 22 zostanie wydana jutro. W tej wersji jest wiele super rzeczy. Podejrzewam, że korzystanie z wywołań rodzimych/zewnętrznych w projekcie Panama sprawi, że życie wielu osób stanie się łatwiejsze, jak również trwające zmiany we współbieżności, szablonach napisów i wreszcie możliwość wywołania czegoś przed super. Super! Jednak jest jedna rzecz, która podejrzewam stanie się moim (prawie) ukrytym bohaterem Javy 22. Ludziska, przedstawiam wam Gatherers.mapConcurrent!

Strumienie są bohaterem Javy 8 (ale nie w świecie równoległym)

Java 8 była dużym kamieniem milowym w rewolucji Javy w swoim czasie. Pamiętam atmosferę po stosunkowo niewielkim wydaniu Javy 7, wersja 8 była NAPRAWDĘ WIELKA. Wprowadzenie lambd pozwoliło Stream API na istnienie w Javie 8 (bez krwawiących oczu, oczywiście). Moim zdaniem skierowało to Javę w stronę deklaratywnego i funkcyjnego sposobu myślenia o przetwarzaniu danych: zamiast pętli z pośrednimi wywołaniami i pakowania wyników w pojedynczy obiekt, mogliśmy wreszcie wyrazić intencję w bardziej deklaratywny sposób.

API strumieni miało moim zdaniem co najmniej dwie większe wady w momencie wprowadzenia, skutecznie uniemożliwiając jego użycie w wielu krytycznych scenariuszach przetwarzania danych.

Rozszerzaj, ale tylko raz

W bardzo uproszczony sposób cykl życia strumienia można było streścić w ten sposób:

  • utworzenie strumienia
  • szereg operacji pośrednich, takich jak filter, map lub flatMap (lub peek, ale to było tylko do debugowania, prawda? PRAWDA???)
  • pojedyncze wywołanie, aby zebrać wyniki.

Zacznijmy od końca: kolektory (ang. collectors). Jeśli nie byliśmy zadowoleni z prostego joining, reduce czy podobnych, można było sięgnąć po jeszcze prostsze toList (od Javy 16, jeśli dobrze pamiętam), lub teeing (od Javy 12), jeśli trzeba było przekazać obiekty do dwóch lub więcej kolektorów. Lub jeśli istniały naprawdę wymyślne potrzeby, można było ujawnić wszystkie mroczne moce, które zdobyliśmy na “Wprowadzeniu do SQLa” i skomponować kolektory za pomocą mapping, filtering, groupingBy, partitioningBy, collectingAndThen, itp., itp.

Jeśli to ciągle nie wystarczało, mogliśmy napisać swój własny kolektor. I to pierwsze ograniczenie: to było, o ile mi wiadomo, jedyne miejsce rozszerzeń w StreamAPI. Jeśli potrzebowaliśmy czegoś specjalnego, musieliśmy albo wprowadzić dziwne struktury danych pośrednich do obsługi tego w operacjach pośrednich (szukaj “ciekawych” przykładów w JEP 461), albo de facto odkładać to do fazy zbierania. Co było dość dziwne, ponieważ niweczyło to cały cel wyrażania kroków przetwarzania danych za pomocą operacji pośrednich przed krokiem kolekcji. Lub mogliśmy to zrobić w pętli, w dobrym, starym stylu imperatywnym.

Gdzie są moje rdzenie?

Masz więc 8 rdzeni w swoim komputerze. I, powiedzmy, 64 elementy w swoim strumieniu. Zatem dochodzisz do wniosku, że ustawienie strumienia jako .parallel() może sprawić, że cała operacja na strumieniu będzie w niektórych przypadkach około 8 razy szybsza. Nawet zweryfikowałeś/aś to w jshell, i tak, 8 razy szybsza, więc szybko kopiuj-wklej w wielu miejscach, i możemy git push --force ;-)

A potem wszyscy odkryliśmy, że (prawie) wszystkie wątki w strumieniach równoległych pochodzą z tej samego puli. A co gorsza, że są blokowane przez IO. (A tutaj pomijamy całą historię stosu reaktywnego i świętych wojen między reakcjonistami a wyznawcami wątków wirtualnych. Dziś nie mam nastroju na świętą wojnę, przepraszam.)

Z tych i innych powodów w przeszłości zwykliśmy przełączać się na stos reaktywny, aby mieć nieblokujące przetwarzanie równoległe. Lub gdy mogliśmy radzić sobie z równoległością w fazie zbierania, używaliśmy parallel-collectors autorstwa Grzegorza Piwowarka. A naprawdę oddani entuzjaści czystej Javy używali CompletableFuture.

Polowanie z Gatherers

W Javie 22 w JEP 461 został wprowadzony kolejny punkt rozszerzenia do StreamAPI: Stream Gatherers. W skrócie: jeśli nie jesteś zadowolony z istniejących operacji pośrednich strumienia, możesz użyć zbieraczy (ang. gatherers).

Poza możliwością rozszerzania na zasadzie wtyczki, mogą one również umożliwiać krojenie, przeszukiwanie itp. Na przykład w tym kodzie:

Stream.of("ene", "due", "rabe")
    .gather(Gatherers.windowSliding(2))
    .filter(l -> Math.abs(l.getFirst().length() - l.getLast().length()) > 1)
    .forEach(System.out::println);
data hunter

gromadzimy słowa takie jak "ene", "due", "rabe" w okna (które są listami, stąd getFirst() i getLast()): ["ene", "due"] i ["due", "rabe"]. A potem możemy sprawdzić, czy kolejne słowa różnią się długością o więcej niż jeden znak. (Gdyby tak było, zobaczylibyśmy je wydrukowane).

Bardzo przydatny i potrzebny punkt rozszerzenia, a także bardzo przyjemne predefiniowane zbieracze. Jeden z nich zaczyna naprawdę błyszczeć, gdy go wyłuskać z JEPa.

Gatherers.mapConcurrent zdobywa ode mnie nagrodę “metody Javy 22”

Bardzo podoba mi się, jak Tomek Nurkiewicz rozpoczął swój wpis na temat Map.merge(). Dla mnie ten wpis (po długim wstępie) dotyczy Gatherers.mapConcurrent().

W JEPie 461 jest o nim mowa tylko raz, oto co tam piszą:

mapConcurrent is a stateful one-to-one gatherer which invokes a supplied function for each input element concurrently, up to a supplied limit

I to wszystko! To naprawdę wszystko!

To, o czym tam nie jest napisane w języku zwykłych ludzi, to że używa pod spodem wątków wirtualnych. Zatem zapytasz pewnie, o co chodzi i czy to w ogóle warto poruszać ten temat. Cóż, pozwól, że pokażę ci, czego moim zdaniem brakuje w tym JEPie ;-)

Wzbogacanie danych

Powiedzmy, że otrzymujesz pewne zdarzenia. W tym przykładzie otrzymujemy 100 liczb całkowitych, tylko po to, że Rzeczy Powinny Być Proste™. I dla każdego zdarzenia musimy wywołać usługę zewnętrzną, która zwróci nam wzbogacone zdarzenie. Przez sieć, zwracam uwagę! Pamiętajmy ponownie, że Rzeczy Powinny Być Proste™, więc załóżmy, że wzbogacenie każdego zdarzenia zajmuje sekundę.

Pytanie: ile czasu będzie wykonywany ten kod?

var events = IntStream.range(0, 100).boxed()
    .parallel().map(ExternalService::enrichEvent)
    .toArray();

Ludzie mądrzy odpowiedzą: to zależy.
Ludzie mądrzy i doświadczeni odpowiedzą: to zależy od wielu rzeczy, takich jak: ile jest wolnych rdzeni procesora (w konfiguracji domyślnej), czy sieć działa płynnie, i (co najważniejsze być może) ile innych strumieni równoległych jest uruchomionych w tym samym czasie.

Załóżmy, że mamy 16 rdzeni, które absolutnie nic innego nie robią, potrwa to 100 zadań / 1 sekunda / 16 wątków roboczych = ceiling(6.25 sekundy), co daje jakieś 7 sekund: przez pierwszą sekundę wzbogacamy 16 zdarzeń, następnie kolejne 16 zdarzeń, i tak dalej, a w ostatniej sekundzie przetwarzamy pozostałe 4.

Ale, ale: zużywamy bardzo mało CPU, tylko czekamy na dane, które wrócą przez kabel. Gdyby tylko był prosty sposób na wykonanie tych pośrednich mapowań przy użyciu wątków wirtualnych, które nie są blokowane przez IO [jeśli używać ich prawidłowo]/pl/posts/new-java/loom/misusing-virtual-threads/…

I jest! To Gatherers.mapConcurrent()!

W pierwszym parametrze (mniej więcej) pyta nas, ile wątków wirtualnych możemy uruchomić. A ponieważ ta operacja jest bardzo lekka dla CPU, powiedzmy 100. W drugim parametrze przyjmuje… funkcję mapującą ;-)

Więc mamy:

var evens = IntStream.range(0, 100).boxed()
    .gather(Gatherers.mapConcurrent(100, ExternalService::enrichEvent))
    .toArray();

Zapytajmy jeszcze raz: ile to zajmie czasu? Tak, prawidłowo: to zależy od sieci. Jednak tym razem mamy 100 równoległych wątków wirtualnych i, jeśli wszystko pójdzie gładko, mamy wzbogacone zdarzenia zaraz po 1 sekundzie. Tak, jednej sekundzie.

Zbieracze nie tylko dla Ogrodników

Jako samozwańczy Software Gardener podejrzewam, że Gatherers mogą być czymś dla mnie. A kto wie, może także dla ciebie?

Wreszcie widzę coś, proste wywołanie jednej metody, które (w połączeniu z technologią wątków wirtualnych) rozwiązuje ograniczenia przetwarzania danych tylko przy użyciu StreamAPI. Bardzo ładny i prosty paradygmat, czysta Java, nic więcej. Od jutra strumienie Javy są naprawdę równoległe.

Wybór języka