Zrozumieć Sparka: cache vs persist

Zrozumieć Sparka: cache vs persist

W naszej podróży po Sparku musiała przyjść pora na ten przystanek. “Cache vs persist” to absolutnie podstawowe pytanie na każdej rozmowie rekrutacyjnej ze Sparka. Co ważniejsze – to naprawdę przydatne narzędzie dla każdego inżyniera big data, który posługuje się tym frameworkiem! Dzięki niemu przyspieszysz swojego joba, zmniejszysz ryzyko nieoczekiwanego fuckupu. No i najważniejsze – podczas code review pokażesz, że znasz się na rzeczy i nie jesteś tu z przypadku. Żartuję oczywiście.

Nie traćmy więc czasu i sprawdźmy co to za dobrodziejstwo, owe osławione cache i persist. Całą serię “zrozumieć sparka” poznasz tutaj.

Krótka powtórka – akcje i transformacje

Zanim przejdziemy do sedna sprawy – najpierw krótkie przypomnienie jak działa Spark – w pigułce. Aby to zrozumieć, fundamentalne są trzy rzeczy:

  1. Spark działa na swoich własnych strukturach, “kolekcjach”. To przede wszystkim RDD, które przypominają “listy” lub “tablice” (nie w konkrecie, natomiast grunt że nie jest to kolekcja typu mapa), ale także Datasety i Dataframy (które są aliasem dla Dataset[Row]). Napisałem, że “przede wszystkim”, ponieważ pod spodem DSów i DFów leżą właśnie RDD. To na tych strukturach musimy pracować, żeby całość ładnie działała.
  2. Na tych strukturach możemy wykonywać dwa typy operacji:
    • transformacje – czyli operacje, które wykonują pewne obliczenia, natomiast nie są wykonywane w momencie ich wywołania w kodzie.
    • akcje – operacje, które zwykle są jakimś rodzajem operacji wyjścia”. I co istotne – w tym momencie wykonywane są wszystkie transformacje po kolei.
  3. Takie podejście lazy evaluation zastosowane jest, aby Spark mógł odpowiednio zoptymalizować te operacje. Spisuje je w drzewku DAG (Directed Acyclic Graph – graf skierowany, acykliczny).

Tutaj pojawia się właśnie podstawowy problem – może się okazać, że akcji będziemy mieli kilka. I za każdym razem Spark będzie musiał wykonywać te same transformacje. To oczywiście powoduje, że job staje się niewydajny, a moc obliczeniowa jest niepotrzebnie przepalana.

Cache i persist w Sparku

I właśnie tutaj wkracza cachowanie. Możemy na danym zbiorze danych wywołać .cache() lub .persist() aby zachować go w pamięci. Gdyby potem pojawiło się kilka akcji, Spark będzie pobierał dane z konkretnego punktu, a nie procesował je wszystkimi transformacjami od początku.

cache vs persist

Aby to zrobić, mamy do dyspozycji dwie funkcje: cache() oraz persist(). Cóż – prawdę mówiąc jest jeszcze trzecia (.checkpoint()) ale na ten moment ją zostawimy;-). Tak więc powtórzmy – cache i persist pozwalają “zatrzymać” dataset w kodzie. Od momentu wywołania na naszych danych (RDD, Dataset lub Dataframe) cache lub persist – wywołanie akcji nie będzie skutkowało ponownym przeliczaniem transformacji, które były napisane przed tym punktem.

Pora jednak wyjaśnić sobie różnicę między cache a persist. Otóż – mówiąc prosto – persist pozwala nam ustawić poziom na jakim chcemy przechowywać dane (storage level), natomiast cache ma domyślne wywołanie. Tak naprawdę pod spodem cache wywołuje… właśnie persist!

Jaki storage level ma cache? To zależy od tego czy wywoujemy go na RDD czy Dataset/Dataframe. Jeśli RDD – domyślnie mamy MEMORY_ONLY, natomiast w przypadku Datasetów – MEMORY_AND_DISK.

Kod cache i persist dla Dataset i Dataframe

Jak wygląda wywołanie w kodzie? Przede wszystkim, jeśli wywołujemy cache() na Dataframe, to jest to najzwyklejszy w świecie alias dla wywołania funkcji persist(). I nie zmieniło się to od dawna. Poniżej kod z “bebechów” Sparka 2.4.3.

/**
   * Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).
   *
   * @group basic
   * @since 1.6.0
   */
  def cache(): this.type = persist()

Jak wygląda natomiast persist()? W tym przypadku mamy do czynienia z dwoma funkcjami: pierwsza to funkcja, która przyjmuje argument typu StorageLevel, natomiast druga jest funkcją bez argumentów. Wywołuje ona funkcję cacheQuery() znajdującą się w CacheManager, która dokonuje całej “magii”. Jeśli nie podamy jej Storage Level, zostanie przypisany standardowy – czyli “MEMORY_AND_DISK”.

Poniżej implementacje obu funkcji persist().

def persist(): this.type = {
    sparkSession.sharedState.cacheManager.cacheQuery(this)
    this
  }

def persist(newLevel: StorageLevel): this.type = {
    sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
    this
  }

Kod cache i presist dla RDD

W kontekście RDD cache() także jest aliasem dla persist(). Znów mamy także do czynienia z dwiema funkcjami persist() – jedną bezargumentową, drugą przyjmującą argument typu StorageLevel. Tym razem jednak różnica jest taka, że standardowym, domyślnym poziomem jest “MEMORY_ONLY” (nie jak w przypadku datasetów “MEMORY_AND_DISK”).

Można nawet dostrzec “niezwykle elegancki” komentarz, który pokusił się zostawić w samym środku funkcji jeden z inżynierów tworzących Sparka;-).

def persist(newLevel: StorageLevel): this.type = {
    if (isLocallyCheckpointed) {
      // This means the user previously called localCheckpoint(), which should have already
      // marked this RDD for persisting. Here we should override the old storage level with
      // one that is explicitly requested by the user (after adapting it to use disk).
      persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
    } else {
      persist(newLevel, allowOverride = false)
    }
  }

Storage levels

Z implementacji funkcji persist() wiemy, że kluczową sprawą jest Storage Level – czyli poziom na jakim chcemy przechowywać nasze dane. Oto jakie poziomy możemy wybrać:

  1. MEMORY_ONLY – przechowuje RDD lub Datasety jako deserializowane obiekty w pamięci JVM. Jeśli nie ma wystarczająco dużo dostępnej pamięci, niektóre partycje nie zostaną zapisane i zostaną potem ponownie obliczone w razie potrzeby.
  2. MEMORY_ONLY_2 – To samo co MEMORY_ONLY, natomiast każda partycja jest replikowana na dwa nody.
  3. MEMORY_ONLY_SER – to samo co MEMORY_ONLY, natomiast przechowuje dane jako serializowane obiekty w pamięci JVM. Potrzebuje mniej pamięci niż [1].
  4. MEMORY_ONLY_SER_2 – analogicznie.
  5. MEMORY_AND_DISK – w tym przypadku dane są przechowywane jako deserializowane obiekty w pamięci JVM. W tym przypadku jednak, w przeciwieństwie do [1], jeśli nie wystarczy miejsca, nadmiarowe partycje są przechowywane na dysku. To prowadzi do mniejszej wydajności (wolniejszy proces), ponieważ angażujemy tu operacje I/O.
  6. Analogicznie do 1-4 mamy także MEMORY_AND_DISK_SER, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2.
  7. DISK_ONLY – w tym przypadku dane są przechowywane jedynie na dysku. Oczywiście najwolniejszy wariant.
  8. DISK_ONLY_2 – analogicznie, tworzone są replikacje.

Zostań prawdziwym sparkowym wojownikiem

Jeśli dotarłeś do tego punktu, to prawdopodobnie jesteś naprawdę zainteresowany rozwojem w Sparku. Trzeba uczciwie powiedzieć, że to złożona, duża technologia. Jeśli chcesz zgłębić ją bardziej – przekonaj swojego szefa, żeby zapisał Ciebie i Twoich kolegów/koleżanki na szkolenie ze Sparka. Omawiamy całą budowę od podstaw, pracujemy dużo i intensywnie na ciekawych danych, a wszystko robimy w miłej, sympatycznej atmosferze;-) – Zajrzyj tutaj!

A jeśli chcesz pozostać z nami w kontakcie – zapisz się na newsletter lub obserwuj RDF na LinkedIn. Koniecznie, zrób to i razem twórzmy polską społeczność Big Data!

 

Loading
Zrozumieć Sparka: joiny

Zrozumieć Sparka: joiny

Spark to rozległa i złożona technologia. Zrozumienie go wymaga zarówno silnych podstaw teoretycznych, jak i konkretnej, mozolnej praktyki. W serii “Zrozumieć Sparka” postaram się dogłębnie przybliżyć różne aspekty tego zacnego frameworku do przetwarzania danych.

Zaczynamy “z grubej rury” od czegoś, co wycisnęło niejedną łzę u Inżynierów Big Data. Chodzi oczywiście o joiny – mechanizm znany także z tradycyjnego SQL. Z pewnością pojawią się na ten temat także artykuły zaawansowane, dlatego dzisiaj zaczniemy dość lekko – od krótkiego omówienia typów joinów. Wiem, wiem – niby temat znany. W praktyce jednak czasem niedopowiedziany. Niech więc zostanie rozwiązany od podstaw;-).

Czym są joiny w Sparku?

Joiny to mechanizm znany dobrze ludziom pracującym z relacyjnymi bazami danych. Polega na łączeniu tabel. Wyobraź sobie, że masz tabelę, w której są zebrane dane o pracownikach – pesele, imiona, nazwiska, wykształcenie… Zaraz obok możesz zobaczyć tabelkę, w której widać dane dotyczące stanowisk – nazwy stanowisk, pensje, nazwa departamentów w firmie, pesele itd.

Jeśli chcemy pracować na danych dotyczących spraw stanowiskowych w kontekście ludzi – musimy te dwie tabele połączyć. I właśnie temu służą joiny;-). Problematyka w teorii dość prosta, w praktyce ma kilka zagwozdek o których warto wspomnieć. Przede wszystkim właśnie ta podstawowa – czyli różnice w joinach.

Typy joinów w Sparku

Chcąc połączyć tabele, mamy do dyspozycji kilka typów takiego mechanizmu. Podstawowe to inner join, left (outer) join, right (outer) join, full outer join oraz cross join. Dodatkowo możemy w sparku skorzystać z jeszcze dwóch: semi joina oraz anti joina. Omówienie każdego z nich będzie prostsze, jeśli spojrzymy najpierw na ten popularny w internecie diagram.

Wyjaśnijmy czym są owe kółeczka oznaczone jako A oraz B. Jak już napisałem, w tradycyjnych bazach danych będą to tabele. W Sparku będą to Dataframy lub Datasety. Skupimy się tutaj na Dataframach (alias dla Dataset[Row]), gdyż jest to struktura tabelaryczna, stricte przygotowana właśnie pod pracę na tabelach.

Na powyższym obrazku mamy zawsze dwie strony – lewą (A) i prawą (B). Dla wyjaśnienia: to który dataframe jest po lewej a który po prawej to rzecz czysto umowna, w zależności od tego po której stronie jaki dataframe umieścimy w kodzie. Podstawowy schemat tworzenia joina wygląda bowiem następująco:

firstDF.join(secondDF, columns, "joinType")

tak więc w takiej sytuacji “firstDF” będzie “lewym” dataframe, a “secondDF” będzie “prawy”.

Zanim przejdziemy do omówienia poszczególnych typów, zachęcam do przyjrzenia się powyższemu obrazkowi. Jeśli natomiast zapamiętać będzie ciężko, być może poniższe dzieło odrobinę pomoże:

 

W przykładach będę posługiwał się następującymi strukturami:

  1. Dataframe peopleDF zawiera id, firstName, lastName i age.
  2. Dataframe jobsDF zawiera id oraz job – czyli nazwę zawodu (np. “teacher”).

Wyglądają mniej więcej tak:

 

 

Inner Join

Zaczynamy od najprostszego typu, będącego jednocześnie domyślnym podczas łączenia tabel w Sparku. Innej join doprowadza do połączenia tych rekordów, które są wspólne – czyli wtedy, gdy warunek zostanie spełniony.

Ponieważ w Sparku inner join jest domyślny, nie musimy go określać podczas łączenia tabel. Obie poniższe implementacje są poprawne:

val innerJoin: Dataset[Row] = peopleDF.join(jobsDF, "id")
val innerJoin2: Dataset[Row] = peopleDF.join(jobsDF, Seq("id"), "inner")

Generują one następujący efekt:

Left (outer) join

Left join, czy też posługując się pełną nazwą – left outer join, to mechanizm, który łączy obie tabele po konkretnych kolumnach, natomiast zapewnia, że wszystko z lewej strony będzie miało miejsce w finalnej tabeli. Jeśli więc nie uda się powiązać (“zmatchować”) rekordów, kolumny z “prawej strony” będą wypełnione nullami.

Implementacja wygląda następująco:

val leftJoin: Dataset[Row] = peopleDF.join(jobsDF, Seq("id"), "left_outer")

I daje efekt w postaci takiej tabelki:

Zwróć uwagę na to że rekordy w peopleDF kończą się na id 18, natomiast tabela job posiada id od 1 do 13, a potem 19 i 20. Na tym przykładzie widać, że część nie powiązana została wypełniona nullami.

Right (outer) join

W przypadku right joina sprawa jest prosta – to po prostu “lustrzane odbicie” left joina. Łączy on więc obie tabele po konkretnych kolumnach, natomiast zapewnia, że wszystko z prawej strony znajdzie swoje miejsce w finalnej tabeli. Jeśli więc któreś rekordy z prawej tabeli nie będą miały swojego odpowiednika (match nie zostanie zrealizowany/“nie zmatchuje się”), to w tabeli finalnej kolumny z lewej w tych rekordach będą nullami.

Implementacja:

val rightJoin: Dataset[Row] = peopleDF.join(jobsDF, Seq("id"), "right_outer")

Nawet jeśli powyższy opis brzmi jak skomplikowany, poniższy efekt powinien wszystko rozjaśnić:

Full outer  join

Full outer join jest przeciwieństwem inner joina – bierze wszystko. Tak więc można powiedzieć, że jest sklejeniem left i right joinów wraz z usunięciem duplikatów.

Implementacja:

val fullJoin: Dataset[Row] = peopleDF.join(jobsDF, Seq("id"), "full")

Efekt:

Cross join (cartesian/iloczyn kartezjański)

Cross join nazywany jest także iloczynem kartezjańskim. Pochodzi to od matematycznego mechanizmu operacji na macierzach – właśnie iloczynu kartezjańskiego. Dla ciekawskich, z matematyczną definicją oraz kilkoma ćwiczeniami można zapoznać się tutaj.

Dla nas jednak chyba łatwiejsza będzie “definicja zbanalizowana by Maro” – czyli proste “łączymy każdy rekord z każdym”W związku z tym, że każdy z każdym, nie ma tu potrzeby (ani miejsca) by wpisać kolumnę, po której łączymy.

Implementacja:

val crossJoin: Dataset[Row] = peopleDF.crossJoin(jobsDF)

Jak widać, efekt jest znacznie bardziej okazały, niż przy pozostałych typach łączeń. Przedstawiam tylko część, natomiast w tym przypadku rekordów jest aż 270:

Semi join

Pozostały jeszcze dwa typy joinów, zwykle pomijane. Chociaż są znacznie rzadziej spotykane, warto moim zdaniem znać oba, gdyż oba mogą okazać się naprawdę przydatne. Jeśli taka potrzeba zajdzie, po prostu zastosujemy przygotowaną już konstrukcję, zamiast robić “skok przez plecy” z selectem, równaniami, wherami i filtrami.

Pierwszym z nich jest Semi Join. To nic innego jak inner join, który… pomija kolumny z lewej strony. Tak więc po zmatchowaniu rekordów, brane są pod uwagę tylko kolumny z lewej strony.

Implementacja:

val semiJoin: Dataset[Row] = peopleDF.join(jobsDF, Seq("id"), "left_semi")

Efekt:

Anti Join

Ostatnim, najciekawszym moim zdaniem, joinem – jest anti join. Jest on pewnego rodzaju “dopełnieniem” semi-joina. Albo jego przeciwieństwem – zależy jak patrzeć. Chodzi w każdym razie  to, że chociaż kolumny są takie same – a więc jedynie z lewej strony – to… w finalnej tabeli znajdują się tylko rekordy, które nie zostały zmatchowane, nie da się ich powiązać między tabelami.

Implementacja:

val antiJoin: Dataset[Row] = peopleDF.join(jobsDF, Seq("id"), "left_anti")

Efekt:

Implementacja inaczej

Warto zauważyć jedną rzecz: wszędzie powyżej poszedłem na łatwiznę i łączyłem po takiej samej kolumnie. Może się jednak okazać, że nie będzie takich samych kolumn! Cóż wtedy zrobić? Opcji jest jak zawsze kilka. Przede wszystkim dwie (poniższe konstrukcje nie odpowiadają już schematom których trzymaliśmy się przez większość artykułu):

Opcja nr 1: zróbmy tak, żeby były takie same kolumny. Czyli po prostu zmieńmy nazwy kolumn, które mają być matchowane.

val leftJoin: Dataset[Row] = peopleDF.join(jobsDF.withColumnRenamed("personId", "id"),
      Seq("id"), "left_outer")

Opcja nr 2: powiedzmy, które kolumny mają być matchowane. Tak, to zdecydowanie brzmi rozsądniej:

val leftJoin: Dataset[Row] = peopleDF.join(jobsDF, peopleDF("id") === jobsDF("personId"), "left_outer")

To samo można także zrobić za pomocą następującej składni:

val leftJoin: Dataset[Row] = peopleDF.join(jobsDF, peopleDF("id").equalTo(jobsDF("personId")), "left_outer")

Kolejną rzeczą jaką warto rozważyć jest łączenie po kilku tabelach. Możemy to zrobić bardzo prosto:

val leftJoin: Dataset[Row] = peopleDF.join(jobsDF, Seq("id", "pesel"), "left_outer")

Można także budować bardziej skomplikowane warunki joinów:

val leftJoin: Dataset[Row] = peopleDF.join(jobsDF, peopleDF("id").equalTo(jobsDF("personId")).and(peopleDF("age").gt(jobsDF("minimumAge"))), "left_outer")

Niebezpieczeństwa

Skoro joiny są takie proste i przyjemne, to czy można ich używać bez ograniczeń i bez głębszej refleksji? Otóż… jest dokładnie odwrotnie. Napisałem na początku, że mechanizm ten wycisnął niejedną łzę u inżynierów korzystających ze sparka. Oto dlaczego:

Nie będziemy się tutaj rozwodzić nad tym co i jak w detalu. Sprawa jest prosta: join to uruchomienie shufflingu – czyli przerzucania danych między partycjami oraz nodami. To z kolei bardzo zasobożerna sprawa i warto jej unikać. Na ile oczywiście się da, bo najczęściej się po prostu nie da.

W kolejnych artykułach zagłębimy się w kwestie wydajnościowe właśnie. Na ten moment, z racji że są to podstawy, polecam przejrzeć kod pod kątem akcji, które wykonujemy po joinie. Być może warto po połączeniu tabel zrobić bowiem cache(), persist() lub checkpoint(). Gdybyśmy wywoływali dwukrotnie akcje, dwukrotnie będzie zrobiony także join – a po co, jak nie trzeba?;-).

Ciąg dalszy…

Mam szczerą nadzieję, że wiesz już teraz wszystko czego trzeba, aby zacząć przygodę z joinami w Sparku. Oczywiście w kolejnych artykułach zagłębimy się nieco w tym zagadnieniu – zostań z nami przez newsletter!

Jeśli czujesz, że Twoje wiedza na temat Sparka jest nieuporządkowana i frustruje Cię to, to mam dobrą wiadomość. Organizujemy szkolenia dla firm z zakresu… właśnie Apache Spark. Jeśli masz dobrego szefa, podeślij mu kontakt do nas – możemy pojawić się u Ciebie w firmie i pokazać wszystko od 0 do Spark Inżyniera;-). Strona ze szkoleniem dostępna jest tutaj.

Tymczasem zapraszam do naszego newslettera. Zostańmy w kontakcie i… razem budujmy naszą wspólną, polską społeczność Big Data!

 

Loading

 

Zamiana Dataframe w Dataset w Sparku. Taka, która działa.

Zamiana Dataframe w Dataset w Sparku. Taka, która działa.

Największy problem Dataframe w Sparku? Oczywiście – brak jasności, jaka schema jest w aktualnym DFie. W związku z tym często wykorzystuje się Dataset[T], gdzie T to konkretna Case Class. Dzięki temu, jeśli wywołujemy metodę, która zwraca Dataset[T], możemy być przekonani, że wiemy jakim typem operujemy, jaką nasz obiekt ma schemę.

Niestety, tak dobrze jest tylko w teorii. W praktyce Spark pozwala wyjść poza ramy schematu i dodać lub odjąć kolumny, które nie istnieją w case class T. Jest to bardzo mylące i osoby które o tym niewiedzą mogą przeżyć niezłą eksplozję mózgu. Pytanie – czy da się coś z tym zrobić? Jak najbardziej. Chociaż nie oficjalną ścieżką, bo to właśnie ona wprowadza w błąd.

Standardowe castowanie Dataframe do Dataset[T]

Na początku zobaczmy jak to powinno wyglądać według planu “standardowego”. W przykładzie będziemy się posługiwać case classą “Person”, która ma id, name i age. To do niej będziemy “równać” wszystkie Dataframy. Wykorzystamy do tego trzy DFy:

  1. perfectDF – będzie miał dokładnie takie kolumny jak powinniśmy mieć wykorzystując Person.
  2. bigDF – będzie zawierał jedną kolumnę więcej – “lastName”
  3. smallDF – będzie zawierał jedynie kolumny “id,name”. Będzie więc brakowało kolumny “age”

Dodam jeszcze, że cały kod pisany jest w Scali.

Poniżej przedstawiam uproszczone utworzenie pierwszego Dataframe’a. Reszta jest na podobnej zasadzie.

val perfectDF: Dataset[Row] = Seq(
      (1, "John", "26"),
      (2, "Anna", "28")
    ).toDF("id", "name", "age")

Czas na najważniejszą rzecz: oficjalnym i najbardziej popularnym sposobem na zamianę DF w DS[T] jest wywołanie df.as[T]. Aby to zrobić, musimy wcześniej zaimportować implicit._. Można ewentualnie zrobić także df.as[T](Encoders.product[T]).

Po takim “myku” spodziewamy się, że otrzymamy zmienną z typem Dataset[T]. W naszym przypadku, po zastosowaniu perfectDF.as[Person] faktycznie tak się dzieje. Wywołujemy więc “show()” aby sprawdzić jak wyglądają nasze dane i… wszystko gra. Mamy więc Dataset o jasnej schemie, gdzie możemy odwoływać się do konkretnych pól.

Problem zaczyna pojawiać się w sytuacji, gdy nasz Dataframe nie jest idealnie wpasowany do schematu klasy T. Przykładowo – zmienna bigDF to Dataframe, który zwiera kolumny “id”, “name”, “lastName”, “age”. Po wywołaniu metody bigDF.as[Person] możemy się spodziewać, że kolumna zostanie obcięta. Wszak w wyniku operacji dostajemy Dataset[Person], a klasa Person nie zawiera pola “lasName”. Niestety, jest zupełnie inaczej.

val bigDF: Dataset[Row] = Seq(
      (1, "John", "Smith", "27"),
      (3, "Anna", "Smith", "1")
    ).toDF("id", "name", "lastname", "age")

val bigDFAsPerson: Dataset[Person] = bigDF.as[Person]
bigDFAsPerson.show()
bigDFAsPerson.printSchema()

Efekt – w postaci schemy oraz wyglądu tabelki – można obserwować poniżej:

+---+----+--------+---+
| id|name|lastname|age|
+---+----+--------+---+
|  1|John|   Smith| 27|
|  3|Anna|   Smith|  1|
+---+----+--------+---+

root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- age: string (nullable = true)

Sprawa ma się inaczej, gdy spróbujemy dokonać konwersji do klasy z Dataframe, który ma zbyt mało kolumn:

val smallDF: Dataset[Row] = Seq(
  (1, "maro"),
  (3, "ignacy")
).toDF("id", "name")

val smallDFAsPerson: Dataset[Person] = smallDF.as[Person]

smallDFAsPerson.show()
smallDF.printSchema()

Efekt: ponieważ nie ma jednej z kolumn, Spark rzuci wyjątek. A konkretniej AnalysisException.

21/08/30 21:25:11 INFO CodeGenerator: Code generated in 25.8197 ms
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`age`' given input columns: [id, name];
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:110)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:107)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:278)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:278)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)  
.
.
.
    at App$.main(App.scala:44)
    at App.main(App.scala)

 

Sprawdzony sposób na zamianę Dataframe do Dataset[T]

Na szczęście – jest możliwość, aby to naprawić. Aby nie przedłużać powiem tylko, że należy po prostu napisać swój własny kod, dzięki któremu rozszerzymy nieco Dataframe. Najpierw jednak rozpiszmy sobie jakie chcemy spełnić warunki.

Po konwersji z Dataframe, nowo utworzony Dataset:

  1. Nie będzie zawierał niepotrzebnych kolumn.
  2. Będzie miał nowo utworzone kolumny. Będą one wypełnione nullami.

Aby to zrobić, utwórz object SparkExtensions, a następnie dodaj kod:

import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, Row}

object SparkExtensions {
  implicit class ExtendedDataFrame(df: DataFrame) {
    def to[T <: Product: TypeTag]: Dataset[T] = {
      import df.sparkSession.implicits._
      import org.apache.spark.sql.functions.col
      val columnsUniqueInT = Encoders.product[T].schema.diff(df.schema)
      val withAdditionalColumns: Dataset[Row] = columnsUniqueInT .foldLeft(df)((previousDF, column) =>
        previousDF.withColumn(column.name,
          lit(null).cast(column.dataType)))
      withAdditionalColumns.select(Encoders.product[T]
        .schema
        .map(f => col(f.name)): _*)
        .as[T]
    }
  }
}

Co tu się dzieje? Przede wszystkim trzy rzeczy:

  1.  Znajdujemy unikalne kolumny – robimy to poprzez różnicę tego co jest w naszej case class oraz w schemacie dataframe.
  2. Tworzymy Dataframe z dodatkowymi kolumnami, które wypełniamy nullami i castujemy do odpowiednich typów
  3. Tworzymy Dataframe (na bazie [2]), w którym wybieramy jedynie te kolumny, które są przewidziane w klasie T.

Jak tego użyć? Musimy zrobić dwie rzeczy: zaimportować nasz object SparkExtensions (import functions.SparkExtensions._), a następnie przy Dataframe zamiast “.as[Person]” zrobić “.to[Person]”.

val bigDFToPerson: Dataset[Person] = bigDF.to[Person]
bigDFToPerson.show()
bigDFToPerson.printSchema()

Efekt:

+---+----+---+
| id|name|age|
+---+----+---+
|  1|John| 27|
|  3|Anna|  1|
+---+----+---+

root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)

 

UWAGA! Taka opcja zakłada, że wszystko musi zgadzać się w 100%.

Jak wiadomo, pole w Datasetach sparkowych składa się z 3 rzeczy:

  1. Nazwy
  2. Typu
  3. Określenia, czy jest nullable czy nie – czyli czy może przyjmować null jako wartość.

Dla tych, którzy wolą nieco bardziej liberalną wersję, przygotowałem też metodę, która pozwala okrajać nie uwzględniając trzeciej rzeczy.

def to[T <: Product: TypeTag]: Dataset[T] = {
      import df.sparkSession.implicits._
      import org.apache.spark.sql.functions.col
      val columnsFromT = Encoders.product[T].schema.map(c=> (c.name,c.dataType))
      val columnsFromDF = df.schema.map(c=> (c.name, c.dataType))
      val columnsUniqueInT = columnsFromT.diff(columnsFromDF)
      val withAdditionalColumns: Dataset[Row] = columnsUniqueInT.foldLeft(df)((previousDF, column) =>
        previousDF.withColumn(column._1,
          lit(null).cast(column._2)))
      withAdditionalColumns.select(Encoders.product[T]
        .schema
        .map(f => col(f.name)): _*)
        .as[T]
    }

Zdaję sobie sprawę, że robi się już nieco mało estetycznie, ale zapewniam – działa;-). Stosuje się dokładnie tak samo jak w przypadku poprzedniej wersji.

Liczę głęboko, że taka metoda się przyda – w projekcie w którym pracowałem, problem typów był naprawdę mocno odczuwalny. Cóż – po więcej, choć zwykle nieco bardziej “wysokopoziomowych” artykułów – zapraszam do newslettera;-). Powodzenia!

 

Loading