Od pewnego czasu eksperymentuję z wątkami wirtualnymi. Chcę znaleźć sensowne ich zastosowanie, które będzie czyś więcej niż podręcznikowym „użyj przy I/O”. Zresztą takie podręcznikowe podejście nie mówi wiele o tym JAK budować aplikacje. Takie zabawy powodują, że piszę całą górę PoCów i teścików, które są prostymi modelami bardziej złożonych systemów. W czasie jednego z takich ćwiczeń chciałem sprawdzić, ile czasu zajmuje wykonanie wątku wirtualnego w porównaniu do tego samego zadania, ale na „normalnym” wątku. Wyniki są ciekawe, ale zanim do nich dojdziemy, czas na mały tour de teoria.

Wątki w Javie

Takie tradycyjne to tak naprawdę wątki systemowe ze wszystkimi ich zaletami i wadami. W dużym uproszczeniu uruchamiając wątek przez Thread.start() wywołujemy natywną metodę start0, która pod spodem finalnie woła ten kod. I fajnie, bo JVM nie musi kombinować z zarządzaniem wątkami. Metody takie jak yield, sleep czy setPrioritymają swoje „zerowe”, natywne odpowiedniki, których implementacja mapuje nasze javowe oczekiwania na rzeczywistość systemu operacyjnego.

Ma to oczywiście pewne wady. JVM, w teorii, powinien być jak endoskop i umożliwiać pracę z różnymi otworami. Rzecz w tym, że otwory te mają różne specyfikacje i nie wszystko zachowuje się tak samo. Cóż… Windows to nie system operacyjny, a jedynie platforma do grania. Mówiąc prościej jeżeli na * -UXie i na windowsie uruchomisz program, który tworzy wiele wątków, to istnieje pewna szansa, że wersja windowsowa wywali się dużo szybciej niż ta na *-UXa. Win ma/miał limit około 2000 wątków per proces. W linuxach możesz sprawdzić ten limit, wołając:

Listing 1. Maksymalna liczba wątków na moim desktopowym linuxue (Ubuntu)

$ sudo sysctl -a | grep threads-max
[sudo] hasło użytkownika koziolek:
kernel.threads-max = 1028799

W mniejszych maszynach będzie to mniej, np na moim laptopie:

Listing 2. Maksymalna liczba wątków na moim laptopie (Vanilla)

$ sudo sysctl -a | grep threads-max
[sudo] hasło użytkownika koziolek:
kernel.threads-max = 511098

Przy czym są to wątki dla całego systemu. Wiadomo, że JVM nie będzie mógł zająć całej puli. Możemy ograniczyć liczbę wątków, które będzie mógł uruchomić dany proces odpowiednio konfigurując cgroups i uruchamiając proces w ramach danej grupy. To jest jednak osobny temat. Do pewnego stopnia dotykamy go, konfigurując dokera (który wykorzystuje cgroups). Jeżeli osiągniemy limit, to zaczynają latać smoki…

Wątkami możemy zarządzać ręcznie wykorzystując wspomniane wcześniej metody takie jak yield czy Object.notify oraz mechanizm synchronizacji z użyciem synchronized. Nie jest to jednak ani proste ani bezpieczne. Dlatego w Javie 5 pojawił się mechanizm ExecutorService.

Egzekutory i co z nimi jest nie teges

Pomysł był prosty. Skoro głównym celem powołania wątku do życia jest wykonanie jakiegoś zadania, to odseparujmy to zadanie od zarządzania wątkami. Będzie sobie taki infrastrukturalny byt jak Executor, który będzie miał jedną metodę execute, która będzie przyjmować instancję Runnable. Implementacje będą ogarniać zarządzanie wątkami. Może to będzie kolejka, a może pula wątków. Do tego dodamy ExecutorService, który będzie potrafił zarządzać swoim stanem oraz będzie umiał zamienić zadanie w Future. Dodamy też kilka podstawowych implementacji kręcących się wokół ThreadPoolExecutor (jeden wątek, pula, cache), a w Javie 7 dorzucimy jeszcze ForkJoinPool. Programiści będą klepać „biznes”, a framework ogarnie infrastrukturę. Tylko nie do końca…

W tym momencie mamy możliwość wysłania zadania do egzekutora, ale nie mamy żadnego mechanizmu „obronnego” przeciwko długotrwałemu zablokowaniu. Nadal możemy więc w łatwy sposób zapchać egzekutor, po prostu wysyłając zadania IO-bound, które będą blokować wątki i w efekcie całość stanie. Problemem mogą być też nieprawidłowo użyte mechanizmy synchronizacji, ale na to nie mamy wpływu na poziomie egzekutora.

Problem, po części, rozwiązano w Javie 8, wprowadzając mechanizmy asynchronicznego wykonania zadań. W tym momencie możemy już wydelegować zadania blokujące do mechanizmu asynchronicznego. Jeżeli zadanie zakończy się, to zostaniemy poinformowani i będzie możliwe dalsze przetwarzanie. Nie jest to idealne rozwiązanie, ale w zamian możemy użyć bibliotek reaktywnych, które mają zgrabniejsze API.

Podsumowując, wątek egzekutorów. Uprościły one pisanie kodu, ale nie chroniły przed blokowaniem wątków w czasie oczekiwania na I/O. Dopiero wprowadzenie mechanizmów asynchronicznych pozwolło na zmianę podejścia (paradygmatu) i poprawę jakości pracy.

Reaktywność też jest nie teges

I to nie tylko dlatego, że debuggowanie jest popieprzone :D Przede wszystkim paradygmat reaktywny wymaga zupełnie innego podejścia do problemu. Ma on swoje zalety, ale wdrożenie go będzie trudne, jeżeli problem nie pasuje. Tak samo późniejsze utrzymanie wymaga dużo uwagi, żeby nie zatkać całego systemu przez nieuważną konfigurację. Dodatkowo w Javie nie ma jako takiej standardowej biblioteki reaktywnej. Mamy Reactor, mamy RxJava, mamy AkkaStreams, ale nikt nie daje nam gwarancji, że projekty te będą trwały tak długo, jak język. I jak już przy języku jesteśmy, to paradygmat reaktywny wprowadza zupełnie oderwany od języka model obsługi wyjątków. Nagle nasze javowe mechanizmy przestają działać lub trzeba kombinować.

Z drugiej strony biblioteki reaktywne pokazały pewną rzecz – da się wykonać kod blokujący w sposób asynchroniczny z wykorzystaniem zwykłych wątków. Dlaczego zatem nie pozbyć się reaktywnej otoczki?

Wątki wirtualne

Wchodzą całe na biało… a w zasadzie są wwożone, bo ich mechanizm działania jest arcyciekawy.

Najprościej mówiąc, wątek wirtualny reprezentuje zadanie, które ma zostać wykonane. Na tym poziomie możemy o nim myśleć, jak o Runnable. Szczególnie że wątek wirtualny tworzymy jak tradycyjny z pomocą Runnable, a sama klasa VirtualThread rozszerza Thread. W przeciwieństwie do tradycyjnych wątków nie mamy możliwość zawołania konstruktora. Całość dzieje się za pośrednictwem jednej z trzech metod fabrykujących:

Listing 3. Jak można wystartować wątek wirtualny

var runningThreadA = Thread.startVirtualThread(task);
var runningThreadB = Thread.ofVirtual().started(task);
var notRunnigThread = Thread.ofVirtual().unstarted(task);
// dodatkowo zwykłe wątki można tworzyć za pomocą:
var notRunnigPlatformThread = Thread.ofPlatform().unstarted(task);

Załóżmy, że użyjemy pierwszej metody i uruchomimy sobie taki prosty program:

Listing 4. Prosty program z użyciem wątków wirtualnych

public class Main {

	public static void main(String[] args) throws InterruptedException {
		Thread.startVirtualThread(()-> System.out.println("Hello World")).join();
	}

}

Oczywiście otrzymamy Hello World na wyjściu, ale pod spodem dzieje się magia. Wątek wirtualny jest uruchomiony, ale nie oznacza to, że będzie od razu działał. Trafi on do kolejki i będzie oczekiwał na „wątek nosiciela”.

Wątek nosiciel

Czyli wątek platformowy, taki prawdziwy, którym zarządza system operacyjny i który wykonuje zadanie. Cały mechanizm jest podobny do znanego nam egzekutora i nawet mamy dwa parametry, które służą do jego konfiguracji:

  • jdk.virtualThreadScheduler.parallelism – odpowiada za liczbę wątków platformy, które posłużą nam do obsługi wątków wirtualnych (domyślnie równy liczbie rdzeni).
  • jdk.virtualThreadScheduler.maxPoolSize – maksymalna liczba wątków nosicieli. Domyślnie 256, ale niech ja dorwę kolejnego Threadrippera…

Podsumowując wątek wirtualny zostaje uruchomiony i trafia do kolejki, gdzie oczekuje na wolny slot w postaci wątku nosiciela, który zacznie go wykonywać. Pytanie gdzie tu przewaga w stosunku do zwykłych egzekutorów?

Operacje blokujące (IO-bound)

W przypadku egzekutorów, gdy pojawiała się operacja blokująca, to blokowała wątek i koniec. Pewnym lekarstwem był ThreadPerTaskExecutor, ale to rozwiązanie szybko zjada pamięć i zabija JVMkę. W przypadku wątków wirtualnych JVM instrumentuje wykonywany kod w poszukiwaniu:

  • operacji I/O (sieć, dysk, klawiatura itp.)
  • synchronizacji z użyciem locków i synchronized
  • Operacji w rodzaju Thread.sleep i Object.wait

W momencie wykrycia takiej operacji parkuje wątek wirtualny, wycofując go z wątku nosiciela. Nosiciel może podjąć kolejny wątek wirtualny. Jak wątek zakończy operację blokującą, to jest wrzucany w kolejkę i nosiciel może go podjąć, opcjonalnie JVM tworzy nowy watek nosiciela jeżeli ma na to zasoby.

Operacje obliczeniowe (CPU-bound)

Tego typu operacje są domeną wątków platformowych. Jeżeli będziemy realizować je za pomocą wątków wirtualnych, to raz, że niewiele ugramy (poza organizacją kodu wokół jednego modelu wykonania), a dwa możemy sobie zrobić krzywdę, wyczerpując pulę wątków nosicielu. Dlatego nie należy używać wątków wirtualnych do obliczeń.

Porównanie operacji, czyli przykład

Poniżej kawałeczek kodu, który demonstruje na czym polega problem:

Listing 5. Blokowanie wątków wirtualnych

public class Main2 {

	private static final int NBR_OF_TASK = 100;

	public static void main(String[] args) throws InterruptedException {
		virtualIO();
		virtualCPU();
	}

	private static void virtualIO() throws InterruptedException {
		final ExecutorService executorService = Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("virtual-io-thread-", 0).factory());
		for (int i = 0; i < NBR_OF_TASK; i++) {
			executorService.submit(Main2.ioTask());
		}
		executorService.shutdown();
		executorService.awaitTermination(20, TimeUnit.SECONDS);
	}

	private static void virtualCPU() throws InterruptedException {
		final ExecutorService executorService = Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("virtual-cpu-thread-", 0).factory());
		for (int i = 0; i < NBR_OF_TASK; i++) {
			executorService.submit(Main2.cpuTask());
		}
		executorService.shutdown();
		executorService.awaitTermination(200, TimeUnit.SECONDS);
	}

	private static Runnable ioTask() {
		return () -> {
			final String name = Thread.currentThread().getName();
			System.out.println(name + " started at " + time());
			new IOBoundTask(1, ChronoUnit.SECONDS).run();
			System.out.println(name + " stopped at " + time());
		};
	}

	private static Runnable cpuTask() {
		return () -> {
			final String name = Thread.currentThread().getName();
			System.out.println(name + " started at " + time());
			new CpuBoundTask(0, 3_000_000).call();
			System.out.println(name + " stopped at " + time());
		};
	}

	private static String time(){
		LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(System.currentTimeMillis()), ZoneId.systemDefault());
		DateTimeFormatter formatter = DateTimeFormatter.ofPattern("hh:mm:ss.SSS");
		return dateTime.format(formatter);
	}

}

Kod jest prosty. Wysyłamy do egzekutora NBR_OF_TASK zadań. Zadania IOBound to zwykły Thread.sleep, a zadania CpuBound to szukanie ilości liczb pierwszych w podanym zakresie. Procek się trochę spoci, ale o to chodzi. Warto też zaznaczyć, że używam tu pewnego „haka”. Linijka Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("virtual-cpu-thread-", 0).factory()); pozwala na tworzenie nazwanych wątków wirtualnych, bo domyślnie wątek wirtualny zapytany o nazwę przez Thread.currentThread().getName() zwróci pusty String. Obserwujemy wyjście i zaczynają się tam dziać cuda. W przypadku zadań IOBound wszystkie zadania wystartują mniej więcej w tym samym momencie, poczekają sekundę i się zakończą. W logu zobaczmy:

Listing 6. Ten ciekawy fragment logu dla IO

virtual-io-thread-228 started at 04:36:04.468
virtual-io-thread-41 started at 04:36:04.397
virtual-io-thread-292 started at 04:36:04.468
virtual-io-thread-104 stopped at 04:36:05.468
virtual-io-thread-15 stopped at 04:36:05.469
virtual-io-thread-39 stopped at 04:36:05.469

Nawet jeżeli uruchomimy nasz program ze wspomnianymi wcześniej ograniczeniami ustawiając -Djdk.virtualThreadScheduler.parallelism=1 -Djdk.virtualThreadScheduler.maxPoolSize=1, to struktura pozostanie bez zmian. Najpierw wszystkie wątki wystartują. Nastąpi wejście w stan blokujący i wyemitowany zostanie komunikat. JVM zaparkuje wątek. Nastąpi wyjście ze stanu blokującego i tym samym zakończenie działania. Przez cały ten czas wątek nosiciela nie będzie zablokowany i będzie mógł podejmować kolejne działania. W przypadku zadań CpuBound sprawy maja się trochę inaczej.

Listing 7. Ten ciekawy fragment logu dla CPU

virtual-cpu-thread-35 started at 04:46:43.318
virtual-cpu-thread-7 started at 04:46:43.315
virtual-cpu-thread-35 stopped at 04:46:45.232
virtual-cpu-thread-68 started at 04:46:43.390
virtual-cpu-thread-26 stopped at 04:46:46.217
virtual-cpu-thread-67 started at 04:46:43.390
virtual-cpu-thread-11 stopped at 04:46:46.263

W pewnym momencie w logu zaczyna się przeplatanka zadań startujących i kończących się. Niezależnie czy ustawiamy limity, czy też nie wątki zaczynają mieć „opóźnienia startu”. Dlaczego? Skoro i tak będą kolejkowane, to przecież możemy wystartować i wrzucić do kolejki… no nie do końca. Żeby wątek mógł zostać umieszczony w kolejce, musi nastąpić jedna z wymienionych wcześniej sytuacji. Tutaj wątek wykorzystuje procesor, więc nie są emitowane zdarzenia pozwalające na zaparkowanie wątku. Pierwsze wątki, które zajmą rdzenie, czyli wątki nosicieli, będą dzielnie się liczyć, aż zakończą swoje działanie i zwolnią rdzeń, a wraz z nim wątek nosiciel będzie miał możliwość podjęcia kolejnego zadania z kolejki. Podejmie więc kolejne czekające zadanie, które znowuż go zajmie na dłuższy czas. Co ważne nie możemy w łatwy sposób odkryć tego problemu, bo formalnie nie jest to problem. Trochę inaczej ma się sprawa z wątkami przybitymi (pinned), ale to temat na osoby wpis… tak w okolicach Wielkanocy :D

Podsumowanie

To jest mocno skrócona wersja teorii stojącej za wątkami wirtualnymi. Musimy zapamiętać, że wątki wirtualne definiują zadania, ale do ich wykonania potrzebują tradycyjnego wątku, zwanego wątkiem nosicielem. JVM potrafi wykryć operacje blokujące i po ich wykryciu zaparkować wątek wirtualny zwalniając wątek nosiciela dla kolejnego zadania. Jednocześnie nie ma możliwości wykrycia sytuacji w której wątek wirtualny wykorzystuje CPU i przez dłuższy czas blokuje wątek nosiciela.