W naszej podróży po Sparku musiała przyjść pora na ten przystanek. “Cache vs persist” to absolutnie podstawowe pytanie na każdej rozmowie rekrutacyjnej ze Sparka. Co ważniejsze – to naprawdę przydatne narzędzie dla każdego inżyniera big data, który posługuje się tym frameworkiem! Dzięki niemu przyspieszysz swojego joba, zmniejszysz ryzyko nieoczekiwanego fuckupu. No i najważniejsze – podczas code review pokażesz, że znasz się na rzeczy i nie jesteś tu z przypadku. Żartuję oczywiście.
Nie traćmy więc czasu i sprawdźmy co to za dobrodziejstwo, owe osławione cache i persist. Całą serię “zrozumieć sparka” poznasz tutaj.
Krótka powtórka – akcje i transformacje
Zanim przejdziemy do sedna sprawy – najpierw krótkie przypomnienie jak działa Spark – w pigułce. Aby to zrozumieć, fundamentalne są trzy rzeczy:
- Spark działa na swoich własnych strukturach, “kolekcjach”. To przede wszystkim RDD, które przypominają “listy” lub “tablice” (nie w konkrecie, natomiast grunt że nie jest to kolekcja typu mapa), ale także Datasety i Dataframy (które są aliasem dla Dataset[Row]). Napisałem, że “przede wszystkim”, ponieważ pod spodem DSów i DFów leżą właśnie RDD. To na tych strukturach musimy pracować, żeby całość ładnie działała.
- Na tych strukturach możemy wykonywać dwa typy operacji:
- transformacje – czyli operacje, które wykonują pewne obliczenia, natomiast nie są wykonywane w momencie ich wywołania w kodzie.
- akcje – operacje, które zwykle są jakimś rodzajem operacji “wyjścia”. I co istotne – w tym momencie wykonywane są wszystkie transformacje po kolei.
- Takie podejście lazy evaluation zastosowane jest, aby Spark mógł odpowiednio zoptymalizować te operacje. Spisuje je w drzewku DAG (Directed Acyclic Graph – graf skierowany, acykliczny).
Tutaj pojawia się właśnie podstawowy problem – może się okazać, że akcji będziemy mieli kilka. I za każdym razem Spark będzie musiał wykonywać te same transformacje. To oczywiście powoduje, że job staje się niewydajny, a moc obliczeniowa jest niepotrzebnie przepalana.
Cache i persist w Sparku
I właśnie tutaj wkracza cachowanie. Możemy na danym zbiorze danych wywołać .cache() lub .persist() aby zachować go w pamięci. Gdyby potem pojawiło się kilka akcji, Spark będzie pobierał dane z konkretnego punktu, a nie procesował je wszystkimi transformacjami od początku.
cache vs persist
Aby to zrobić, mamy do dyspozycji dwie funkcje: cache() oraz persist(). Cóż – prawdę mówiąc jest jeszcze trzecia (.checkpoint()) ale na ten moment ją zostawimy;-). Tak więc powtórzmy – cache i persist pozwalają “zatrzymać” dataset w kodzie. Od momentu wywołania na naszych danych (RDD, Dataset lub Dataframe) cache lub persist – wywołanie akcji nie będzie skutkowało ponownym przeliczaniem transformacji, które były napisane przed tym punktem.
Pora jednak wyjaśnić sobie różnicę między cache a persist. Otóż – mówiąc prosto – persist pozwala nam ustawić poziom na jakim chcemy przechowywać dane (storage level), natomiast cache ma domyślne wywołanie. Tak naprawdę pod spodem cache wywołuje… właśnie persist!
Jaki storage level ma cache? To zależy od tego czy wywoujemy go na RDD czy Dataset/Dataframe. Jeśli RDD – domyślnie mamy MEMORY_ONLY, natomiast w przypadku Datasetów – MEMORY_AND_DISK.
Kod cache i persist dla Dataset i Dataframe
Jak wygląda wywołanie w kodzie? Przede wszystkim, jeśli wywołujemy cache() na Dataframe, to jest to najzwyklejszy w świecie alias dla wywołania funkcji persist(). I nie zmieniło się to od dawna. Poniżej kod z “bebechów” Sparka 2.4.3.
/** * Persist this Dataset with the default storage level (`MEMORY_AND_DISK`). * * @group basic * @since 1.6.0 */ def cache(): this.type = persist()
Jak wygląda natomiast persist()? W tym przypadku mamy do czynienia z dwoma funkcjami: pierwsza to funkcja, która przyjmuje argument typu StorageLevel, natomiast druga jest funkcją bez argumentów. Wywołuje ona funkcję cacheQuery() znajdującą się w CacheManager, która dokonuje całej “magii”. Jeśli nie podamy jej Storage Level, zostanie przypisany standardowy – czyli “MEMORY_AND_DISK”.
Poniżej implementacje obu funkcji persist().
def persist(): this.type = { sparkSession.sharedState.cacheManager.cacheQuery(this) this } def persist(newLevel: StorageLevel): this.type = { sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel) this }
Kod cache i presist dla RDD
W kontekście RDD cache() także jest aliasem dla persist(). Znów mamy także do czynienia z dwiema funkcjami persist() – jedną bezargumentową, drugą przyjmującą argument typu StorageLevel. Tym razem jednak różnica jest taka, że standardowym, domyślnym poziomem jest “MEMORY_ONLY” (nie jak w przypadku datasetów “MEMORY_AND_DISK”).
Można nawet dostrzec “niezwykle elegancki” komentarz, który pokusił się zostawić w samym środku funkcji jeden z inżynierów tworzących Sparka;-).
def persist(newLevel: StorageLevel): this.type = { if (isLocallyCheckpointed) { // This means the user previously called localCheckpoint(), which should have already // marked this RDD for persisting. Here we should override the old storage level with // one that is explicitly requested by the user (after adapting it to use disk). persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true) } else { persist(newLevel, allowOverride = false) } }
Storage levels
Z implementacji funkcji persist() wiemy, że kluczową sprawą jest Storage Level – czyli poziom na jakim chcemy przechowywać nasze dane. Oto jakie poziomy możemy wybrać:
- MEMORY_ONLY – przechowuje RDD lub Datasety jako deserializowane obiekty w pamięci JVM. Jeśli nie ma wystarczająco dużo dostępnej pamięci, niektóre partycje nie zostaną zapisane i zostaną potem ponownie obliczone w razie potrzeby.
- MEMORY_ONLY_2 – To samo co MEMORY_ONLY, natomiast każda partycja jest replikowana na dwa nody.
- MEMORY_ONLY_SER – to samo co MEMORY_ONLY, natomiast przechowuje dane jako serializowane obiekty w pamięci JVM. Potrzebuje mniej pamięci niż [1].
- MEMORY_ONLY_SER_2 – analogicznie.
- MEMORY_AND_DISK – w tym przypadku dane są przechowywane jako deserializowane obiekty w pamięci JVM. W tym przypadku jednak, w przeciwieństwie do [1], jeśli nie wystarczy miejsca, nadmiarowe partycje są przechowywane na dysku. To prowadzi do mniejszej wydajności (wolniejszy proces), ponieważ angażujemy tu operacje I/O.
- Analogicznie do 1-4 mamy także MEMORY_AND_DISK_SER, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2.
- DISK_ONLY – w tym przypadku dane są przechowywane jedynie na dysku. Oczywiście najwolniejszy wariant.
- DISK_ONLY_2 – analogicznie, tworzone są replikacje.
Zostań prawdziwym sparkowym wojownikiem
Jeśli dotarłeś do tego punktu, to prawdopodobnie jesteś naprawdę zainteresowany rozwojem w Sparku. Trzeba uczciwie powiedzieć, że to złożona, duża technologia. Jeśli chcesz zgłębić ją bardziej – przekonaj swojego szefa, żeby zapisał Ciebie i Twoich kolegów/koleżanki na szkolenie ze Sparka. Omawiamy całą budowę od podstaw, pracujemy dużo i intensywnie na ciekawych danych, a wszystko robimy w miłej, sympatycznej atmosferze;-) – Zajrzyj tutaj!
A jeśli chcesz pozostać z nami w kontakcie – zapisz się na newsletter lub obserwuj RDF na LinkedIn. Koniecznie, zrób to i razem twórzmy polską społeczność Big Data!