Warto jeździć na konferencje i słuchać mądrzejszych do siebie. Warto, ponieważ często nie dowiemy się nic nowego, ale sposób przekazania wiedzy pozwoli na uporządkowanie kilku rzeczy.

Jose na tegorocznych LambdaDays opowiadał o GenStage i Flow. Jednak nie to jest najważniejsze. Jakby przy okazji omówił trzy różne podejścia do pracy z danymi.

Podejście zachłanne

Jest najprostsze. Ładujemy wszystko do pamięci i jazda. W takim przypadku program nie zawiera żadnej dodatkowej magii:

Listing 1. Zachłanne przetwarzanie danych.

def eager() do
  File.read!("plwiki-20170301-all-titles")
    |> String.split("\n")
       |> Enum.flat_map(&String.split/1)
       |> Enum.reduce(%{}, fn word, map ->
                                Map.update(map, word, 1, & &1 + 1)
                           end
                     )
end

W powyższym programie wczytujemy plik tekstowy i chcemy policzyć wystąpienia poszczególnych słów. Nie ma tu nic skomplikowanego. Funkcja read! wczytuje cały plik. Następnie dzielimy go na linie, a z linii budujemy listy (enumerable) słów za pomocą flat\_map. Na koniec wszystko redukujemy. Takie podejście ma jednak pewną wadę. Musimy zapewnić odpowiednio dużo pamięci dla naszego programu. Oczywistym rozwiązaniem jest…

Podejście leniwe

W tym przypadku ustalamy, że będziemy czytać plik linia po linii i przetwarzać na raz całą linię.

Listing 2. Leniwe przetwarzanie danych.

def lazy() do
  File.stream!("plwiki-20170301-all-titles", [], :line)
    |> Stream.flat_map(&String.split/1)
    |> Enum.reduce(%{}, fn word, map ->
                            Map.update(map, word, 1, & &1 + 1)
                        end
                  )
end

Funkcja stream! jako trzeci parametr przyjmuje :line. Dzięki czemu odczyta na raz tylko jedną linię z pliku. Udało nam się wyeliminować dużą konsumpcję pamięci, ale pytanie, co jak chcemy przyspieszyć?

Podejście współbieżne

Jeżeli dane są wzajemnie niezależne, to możemy spróbować usprawnić proces ich przetwarzania. Wystarczy, by przetwarzanie odbywało się na wielu procesorach.

Listing 3. Równoległe przetwarzanie danych.

def parallel() do
  File.stream!("plwiki-20170301-all-titles")
    |> Flow.from_enumerable()
    |> Flow.flat_map(&String.split/1)
    |> Flow.partition()
    |> Flow.reduce(fn -> %{} end, fn word, map ->
                                     Map.update(map, word, 1, & &1 + 1)
                                  end
                    )
    |> Enum.into(%{})
end

Narzędziem, które nam to ułatwia, jest Flow. Dzięki niemu możemy zaimplementować zarówno mechanizm równoległego przetwarzania danych, z wykorzystaniem GenServer, jak i wykorzystać tzw. Back-pressure, by dostroić obciążenie komponentu odczytującego dane do możliwości systemu.

Jednak w tym przypadku spotkamy jeszcze jeden problem. Jeżeli redukcja ma być nieblokująca, to musimy jakoś pogodzić pisanie do mapy przez wiele wątków. Chcemy uniknąć sytuacji, gdy dwa wątki modyfikują wartość pod tym samym kluczem. W tym celu Flow udostępnia funkcję partition, która gwarantuje, że dany klucz będzie „przywiązany” do wątku. Na koniec wystarczy zsumować mapy. Jest to bardzo proste, ponieważ wiemy, że zawierają unikalne klucze.

Podsumowanie

Ok, ale co w tym dziwnego? Zauważcie, że do naszego problemu podeszliśmy tak, by na początek zaaplikować rozwiązanie najprostsze. Następnie komplikowaliśmy rozwiązanie, jednocześnie uzyskując nowe możliwości i lepsze rezultaty. Warto tak działać, ponieważ w ten sposób minimalizujemy szansę na popełnienie błędu. Niestety zauważyłem, że coraz więcej osób chce od razu mieć rozwiązanie, które jest skomplikowane. Jednak produkt końcowy zawiera dużo dziwnych błędów.

Na koniec małe wizualne porównanie działania poszczególnych rozwiązań. Na dole zrzut z htopa. Erlang śmiga na rdzenie od 1 do 4. Na rdzeniach 7 i 8 śmiga nagrywanie pulpitu. Obserwujcie, jak dużo pamięci jest zajęte. Całość odpalać w wysokich detalach na pełnym ekranie.

Pliki w tym tekście to dumpy wikipedii, zanim jednak pobierzesz jakiś plik weź pod uwagę możliwości swojego komputera 😉

Kod znajdziecie tu.