Spark: czemu jedna akcja tworzy wiele jobów?

Spark: czemu jedna akcja tworzy wiele jobów?

Zgłębiając kwestie wydajnościowe zauważyłem, że dzieje się coś dziwnego: jedna akcja generuje wiele jobów. Postanowiłem to sprawdzić i opisać tutaj:-). Śmiało, częstuj się. A jeśli artykuł okaże się przydatny – podziel się nim na LinkedIn… czy gdziekolwiek chcesz.

Kawka w dłoń i ruszamy!

Podstawy – jak działa aplikacja sparkowa?

Bardzo często mówiąc o tym, że piszemy w sparku, mówimy “muszę napisać joba sparkowego”, “mój job szedł 30 godzin!” i inne.

Cóż… według nomenklatury sparkowej, nie jest to poprawne. Co gorsza – może być nieco mylące. Okazuje się bowiem, że to co nazywamy “jobem sparkowym” (czyli cały kod który ma coś zrobić i jest uruchamiany przy użyciu sparka) to tak naprawdę “aplikacja” (application). Aplikacja natomiast składa się z… jobów.

Nie mówię, że masz dokonywać rewolucji w swoim słowniku. Sam zresztą też chwalę się ile to nie trwał “mój job”;-). Pamiętaj jednak, że prawdziwe joby siedzą pod spodem aplikacji.

No dobrze – ale czym dokładnie są te joby? I jak jest zbudowana aplikacja sparkowa? Oczywiście to nie miejsce na szkolenie (na listę oczekujących kursu online “Fundament Apache Spark” możesz zapisać się tutaj). Spróbujmy jednak bardzo pokrótce zrozumieć jak zbudowana jest taka aplikacja.

Zróbmy to w kilku krokach:

  1. Podczas pisania kodu posługujemy się dwoma rodzajami: akcjami i transformacjami:
    • transformacje pozwalają nam przetworzyć RDD/Dataset na inny RDD/Dataset. Nie są jednak wykonywane od razu (lazy evaluation)
    • akcje z kolei wykonują wspomniane wcześniej transformacje i tym samym kończą pewną robotę do zrobienia – czyli joba;-).
  2. Job, czyli praca do wykonania. No właśnie – mamy kilka transformacji, które składają się w jeden ciąg operacji dokonywanych na danych. Na końcu są np. zapisane na HDFS albo wyświetlone na ekranie. I to jest właśnie 1 job. Tak więc powiedzmy to sobie wreszcie – 1 akcja = 1 job, yeeeaahh!
  3. Czyli w aplikacji może być kilka jobów. To teraz kolejne zagłębienie. Job składa się ze… stage’y. Czyli z etapów. Jak to się dzieje? Wróćmy do transformacji – bo na tym etapie mamy tylko z transformacjami do czynienia (w końcu akcja kończy joba).
    • Transformacje też możemy podzielić!
    • Narrow Transformations – gdy transformacje z jednej partycji wyprowadzają dokładnie jedną partycję. Narrow Transformations (np. filtry) są dokonywane w pamięci.
      • przykłady: filter, map, union, sample, intersection
    • Wide Transformations – gd transformacje wyprowadzają z jednej partycji wejściowej kilka partycji wyjściowych. Tutaj, ponieważ wide transformations powodują shuffling, dane muszą zostać zapisane na dysk.
      • przykłady: groupBy, join, repartition (te trzy, szczególnie dwa pierwsze, to klasyki – postrachy inżynierów sparkowych)
    • No i właśnie dlatego, że wide transformations powodują shuffling (przemieszanie danych między partycjami/executorami/nodami), musi zakończyć się jakiś etap joba. Czyli stage;-).

I można by kilka rzeczy dodać, ale tyle wystarczy, a nawet może zbyt wiele.

Nie mogłem jednak się powstrzymać. Uff… liczę, że jeszcze ze mną jesteś!

Ale dlaczego jedna akcja tworzy wiele jobów?!

Wspomniałem wyżej, że jednej akcji odpowiada jeden job w aplikacji. Jakie było moje zdziwienie, gdy zobaczyłem co następuje.

Oto mój kod. Przykładowy, ćwiczeniowy, prosty do zrozumienia (zmienna spark to instancja SparkSession):

val behaviorDF: Dataset[Row] = spark.read
  .option("header", "true")
  .csv(pathToBehaviorFile)

behaviorDF.show()
val brandCountDF: Dataset[Row] = behaviorDF.groupBy("brand")
  .count()
  .as("brandCount")

val userIdCount: Dataset[Row] = behaviorDF.groupBy("user_id")
  .count()
  .as("userCount")

val behaviorsWithCounts: Dataset[Row] = behaviorDF.join(brandCountDF, "brand")
  .join(userIdCount, "user_id")

behaviorsWithCounts.show(20)

 

Jak widać mamy dwie akcje:

  1. behaviorDF.show() – linijka 23 (w rzeczywistości)
  2. behaviorsWithCounts.show(20) – linijka 35 (w rzeczywistości).

Czyli z grubsza powinny być 2, może 3 joby (jeśli liczyć także wczytywanie danych).

Co zastałem w Spark UI?

WHAAAT…

Jak to się stało?

Czemu mam… 5 różnych jobów do akcji z linijki 35?

Otóż – mogą być za to odpowiedzialne dwie rzeczy:

  1. DataFrame to abstrakcja zbudowana na RDD. 1 akcja odpowiada 1 jobowi, ale… na RDD. Dataframe czasami “pod spodem” może wykonywać jeszcze inne akcje. JEDNAK TO NIE TO BYŁO MOJE ROZWIĄZANIE. Dotarłem do takiego wyjaśnienia więc się nim dzielę. U mnie natomiast DUŻO WAŻNIEJSZY okazał się pkt 2.
  2. Włączone Adaptive Query Execution – czyli mechanizm optymalizacyjny Apache Spark. Może być włączony albo wyłączony. Od Sparka 3.2.0 AQE włączone jest domyślnie!

Po ustawieniu prostej konfiguracji “spark.sql.adaptive.enabled” na “false”, jak poniżej…

val spark: SparkSession = SparkSession.builder()
  .appName("spark3-rdf-tests")
  .config("spark.sql.adaptive.enabled", false)
  //      .master("local[*]")
  .getOrCreate()

 

… i uruchomieniu aplikacji raz jeszcze, efekt w 100% pokrył się z moją wiedzą teoretyczną.

OMG CO ZA ULGA

UWAGA! Warto pamiętać, że AQE jest z zasady dobrym pomysłem. Nie wyłączaj tego, jeśli nie wiesz dokładnie po co to chcesz robić.

Ja na przykład wyłączam w celach edukacyjnych;-)


Szkolenie z Apache Spark – może tego właśnie potrzebujesz?

Jeśli reprezentujesz firmę i potrzebujecie solidnie przeczołgać się ze Sparka… jesteśmy tu dla Was!

Mamy solidnie sprawdzoną formułę.

I własny klaster, na którym poeksperymentujecie;-)

Więcej informacji na tej stronie.

Możesz też po prostu napisać na: kontakt@riotechdatafactory.com !


Co to jest Adaptive Query Execution?

No to teraz pokrótce: co to jest Adaptive Query Execution?

Przeczytasz o tym w dokumentacji Sparka, o tutaj.

Mówiąc jednak prosto i zwięźle: Adaptive Query Execution to mechanizm zawarty w Spark SQL, który pozwala zoptymalizować pracę aplikacji. AQE dokonuje optymalizacji bazując na “runtime statistics”. Temat samych statystyk będę poszerzał w przyszłości tutaj lub na kanale YouTube. Zapisz się na newsletter, żeby nie przegapić;-). I przy okazji zgarnij jedynego polskiego ebooka wprowadzającego w branżę Big Data (i to kompleksowo!).

AQE ma 3 podstawowe funkcjonalności:

  1. Łączenie partycji po shufflingu – dzięki temu mechanizmowi bardziej wydajnie dobierane są partycje. Widać to m.in. na powyższym przykładzie – gdy porównasz liczby partycji w obu przypadkach.
  2. Dzielenie partycji ze “skośnościami” po shufflingu (data skewness) – spark będzie optymalizował partycje, które podlegają “skośności” (są zbyt duże, co wychodzi dopiero po shufflingu).
  3. Zamiana “sort-merge join” na “broadcast join” – zamienia jeśli statystyki którakolwiek strona joina jest mniejsza niż poziom pozwalający na taką operację.

W praktyce AQE daje zauważalne rezultaty. Widać to dość symbolicznie na mojej małej aplikacji (ładuję tam jedynie 5 gb z hakiem), gdzie wynik z ~5.4 min zszedł do ~5 min.

Minusy? Przede wszystkim mniejsza czytelność podczas monitoringu joba. Co z jednej strony może wydać się śmieszne, ale w rzeczywistości, gdy musimy zoptymalizować jakąś bardzo złożoną aplikację – może zrobić się uciążliwe.

Podsumowanie

Podsumowując:

  1. Od Sparka 3.2.0 domyślnie włączony jest Adaptive Query Execution.
  2. To mechanizm, który pozwala na bardzo konkretną optymalizację. Powoduje niestety pewne “zaszumienie” monitoringu aplikacji.
  3. W efekcie zamiast zasady 1 akcja = 1 job, nasza aplikacja będzie bardziej porozbijana.
  4. Można to wyłączyć (aby nie zachęcać do pójścia “na łatwiznę” – jak to zrobić zostało zawarte w tekście;-)). Nie rób jednak tego bez solidnej argumentacji.
  5. Zapisz się na newsletter i przeczytaj ebooka, który pokaże Ci Big Data z kilku różnych storn. A jak Ci się spodoba, napisz o tym w sieci żeby i inni wiedzieli:-).

Jeśli natomiast szukasz czegoś, co pokaże Ci podstawy Sparka od A do Z… 

Może sprawdzisz kurs “Fundament Apache Spark”?

“Podobno” niektórzy od tego kursu… zaczęli całą przygodę z branżą;-).

Zrozumieć Sparka: jak budować UDF? Instrukcja

Zrozumieć Sparka: jak budować UDF? Instrukcja

Dokonując transformacji w Sparku, bardzo często korzystamy z gotowych, wbudowanych rozwiązań. Łączenie tabel, explodowanie tablic na osobne wiersze czy wstawianie stałej wartości – te i wiele innych operacji zawarte są jako domyślne funkcje. Może się jednak okazać, że to nie wystarczy. Wtedy z pomocą w Sparku przychodzi mechanizm UDF (User Defined Function).

Dzisiaj o tym jak krok po kroku stworzyć UDFa, który może być wyorzystany w wygodny sposób wszędzie w projekcie. Do dzieła! Całą serię “zrozumieć sparka” poznasz tutaj.

Co to jest UDF w Sparku?

Wczuj się w sytuację. Tworzysz joba sparkowego, który obsługuje dane firmowe dotyczące pracowników. Chcesz przyznawać premie tym najlepszym, najwierniejszym i najbardziej pracowitym i zyskownym. Po zebraniu potrzebnych informacji w jednym DataFrame, będziemy chcieli utworzyć kolumnę “bonus” która zawiera prostą informację: kwotę premii na koniec roku.

Aby to wyliczyć, został utworzony wcześniej wzór. Wykorzystując informacje dotyczące stanowiska, zyskowności projektu, oceny współpracowników, przepracowanych godzin i kilku innych rzeczy. Oczywiście nie ma możliwości, żeby wyliczyć to przy pomocy zwykłych funkcji. Z drugiej jednak strony, jeśli mielibyśmy jednostkowo wszystkie potrzebne dane – nie ma problemu, aby taki wzór zakodować.

Temu właśnie służą sparkowe UDFs, czyli User Defined Functions. To funkcje, których działanie sami możemy napisać i które pozwolą nam na modyfikację Datasetów w sposób znacznie bardziej customowy. Można je utworzyć na kilka różnych sposobów, ale ja dzisiaj chciałbym przedstawić Ci swój ulubiony.

A ulubiony dlatego, ponieważ:

  • Jest elegancko zorganizowany
  • Daje możliwość wielokrotnego wykorzystywania UDFa w całym projekcie, przy jednokrotnej inicjalizacji go.

Jak zbudować UDF w Apache Spark? Instrukcja krok po kroku.

Instrukcja tworzenia UDFa jest dość prosta i można ją streścić do 3 kroków:

  1. Stwórz klasę UDFa (rozszerzającą UDFX – np. UDF1, jeśli mamy do podania jedną kolumnę).
  2. Zarejestruj UDFa.
  3. Wywołaj UDFa podczas dodawania kolumny.

Scenariusz

Zobrazujmy to pewnym przykładem. Mamy do dyspozycji dataframe z danymi o ludziach. Chcemy sprawdzić zagrożenie chorobami na podstawie informacji o nich. Dla zobrazowania – poniżej wygenerowany przeze mnie Dataframe. Taki sobie prosty zestaw:-).

Efekt który chcemy osiągnąć? te same dane, ale z kolumną oznaczającą zagrożenie: 1- niskie, 2-wysokie, 3-bardzo wysokie. Oczywiście bardzo tu banalizujemy, w rzeczywistości to nie będzie takie proste!

Załóżmy jednak, że mamy zakodować następujący mechanizm: zbieramy punkty zagrożenia.

  1. Bycie palaczem daje +20 do zagrożenia,
  2. Wiek ma przedziały: do 30 lat (+0); do 60 lat (+10); do 80 lat (+20); powyżej (+40)
  3. Aktywności fizyczne: jeśli są, to każda z nich daje -10 (czyli zabiera 10 pkt).

Tak, wiem – to nawet nie banalne, a prostackie. Rozumiem, zebrałem już baty od siebie samego na etapie wymyślania tego wiekopomnego dzieła. Idźmy więc dalej! Grunt, żeby był tutaj jakiś dość skomplikowany mechanizm (w każdym razie bardziej skomplikowany od takiego który łatwo możemy “ograć” funkcjami sparkowymi).

Krok 1 – Stwórz klasę UDFa

Disclaimer: zakładam, że piszemy w Scali (w Javie robi się to bardzo podobnie).

Oczywiście można też zrobić samą metodę. Ba! Można to “opękać” lambdą. Jednak, jak już napisałem, ten sposób rodzi największy porządek i jest moim ulubionym;-). Utwórz najpierw pakiet który nazwiesz “transformations”“udfs” czy jakkolwiek będzie dla Ciebie wygodnie. Grunt żeby trzymać wszystkie te klasy w jednym miejscu;-).

Wewnątrz pakietu utwórz klasę (scalową) o nazwie HealtFhormulaUDF. Ponieważ będziemy przyjmowali 3 wartości wejściowe (będące wartościami kolumn smoker, age activities), rozszerzymy interfejs UDF3<T1, T2, T3, R>. Oznacza to, że musimy podczas definicji klasy podać 3 typy wartości wejściowych oraz jeden typ tego co będzie zwracane.

Następnie tworzymy metodę call(T1 t1, T2 t2, T3 t3), która będzie wykonywać realną robotę. To w niej zaimplementujemy nasz mechanizm. Musi ona zwracać ten sam typ, który podaliśmy na końcu deklaracji klasy oraz przyjmować argumenty, które odpowiadają typami temu, co podaliśmy na początku deklaracji. Gdy już to mamy, wewnątrz należy zaimplementować mechanizm, który na podstawie wartości wejściowych wyliczy nam nasze ryzyko zachorowania. Wiem, brzmi to wszystko odrobinę skomplikowanie, ale już pokazuję o co chodzi. Spójrz na skończony przykład poniżej.

package udfs

import org.apache.spark.sql.api.java.UDF3

class HealthFormulaUDF extends UDF3[String, Int, String, Int]{
  override def call(smoker: String, age: Int, activities: String): Int = {
    val activitiesInArray: Array[String] = activities.split(",")
    val agePoints: Int = ageCalculator(age)
    val smokePoints: Int = if(smoker.toLowerCase.equals("t")) 20 else 0
    val activitiesPoints = activitiesInArray.size * 10
    agePoints + smokePoints - activitiesPoints
  }

  def ageCalculator(age: Int): Int ={
   age match {
    case x if(x < 30) => 0
    case x if(x >= 30 && x < 60) => 10
    case x if(x >= 60 && x < 80) => 20
    case _ => 40
   }
  }
}

Dodałem sobie jeszcze pomocniczą funkcję “ageCalculator()”, żeby nie upychać wszystkiego w metodzie call().

Zarejestruj UDF

Drugi krok to rejestracja UDF. Robimy to, aby potem w każdym miejscu projektu móc wykorzystać utworzony przez nas mechanizm. Właśnie z tego powodu polecam dokonać rejestracji zaraz za inicjalizacją Spark Session, a nie gdzieś w środku programu. Pozwoli to nabrać pewności, że ktokolwiek nie będzie w przyszłości wykorzystywał tego konkretnego UDFa, zrobi to po rejestracji, a nie przed. Poza tym utrzymamy porządek – będzie jedno miejsce na rejestrowanie UDFów, nie zaś przypadkowo tam gdzie komuś akurat się zachciało.

Aby zarejestrować musimy najpierw zainicjalizować obiekt UDFa. Robimy to w najprostszy możliwy sposób. Następnie dokonujemy rejestracji poprzez funkcję sparkSession.udf.register(). Musimy tam przekazać 3 argumenty:

  • Nazwę UDFa, do której będziemy się odnosić potem, przy wywoływaniu
  • Obiekt UDFa
  • Typ danych, jaki zwraca konkretny UDF (w naszym przypadku Integer). UWAGA! Typy te nie są prostymi typami Scalowymi. To typy sparkowe, które pochodzą z klasy DataTypes.

Poniżej zamieszczam całość, razem z inicjalizacją sparkSession aby było wiadomo w którym momencie t uczynić;-).

val sparkSession = SparkSession.builder()
  .appName("spark3-test")
  .master("local")
  .getOrCreate()

val healthFormulaUDF: HealthFormulaUDF = new HealthFormulaUDF()
sparkSession.udf.register("healthFormulaUDF", healthFormulaUDF, DataTypes.IntegerType)

W tym momencie UDF jest już zarejestrowany i można go wywoływać gdziekolwiek w całym projekcie.

Wywołaj UDF

Ostatni krok to wywołanie UDFa. To będzie bardzo proste, ale musimy zaimportować callUDF z pakietu org.apache.spark.sql.functions (można też zaimportować wszystkie funkcje;-)).

Ponieważ chcemy utworzyć nową kolumnę z liczbą punktów, skorzystamy z funkcji withColumn(). Całość poniżej.

val peopleWithDiseasePoints: Dataset[Row] = peopleDF.withColumn("diseasePoints",
        callUDF("healthFormulaUDF", col("smoker"), col("age"), col("activities")))

Efekt jest jak poniżej. Im mniej punktów w “diseasePoints” tym lepiej. Cóż, chyba nie mam się czym przejmować, mam -20 pkt!

Podsumowanie

W tym artykule dowiedzieliśmy się czym w Apache Spark jest UDF. Zasadniczo całość można sprowadzić do 3 prostych kroków:

  1. Stwórz klasę UDFa (rozszerzającą UDFX – np. UDF1, jeśli mamy do podania jedną kolumnę).
  2. Zarejestruj UDFa.
  3. Wywołaj UDFa podczas dodawania kolumny.

To był materiał z serii “Zrozumieć Sparka”. Nie pierwszy i definitywnie nie ostatni! Jeśli jesteś wyjątkowo głodny/a Sparka – daj znać szefowi. Przekonaj go, żeby zapisał Ciebie i Twoich kolegów/koleżanki na szkolenie ze Sparka. Omawiamy całą budowę od podstaw, pracujemy dużo i intensywnie na ciekawych danych, a wszystko robimy w miłej, sympatycznej atmosferze;-) – Zajrzyj tutaj!

A jeśli chcesz pozostać z nami w kontakcie – zapisz się na newsletter lub obserwuj RDF na LinkedIn. Koniecznie, zrób to i razem twórzmy polską społeczność Big Data!

 

Loading
Zrozumieć sparka: uruchamianie joba na serwerze (spark submit)

Zrozumieć sparka: uruchamianie joba na serwerze (spark submit)

Przygotowywałem się ostatnio do kolejnego szkolenia ze Sparka. Chciałem dla swoich kursantów przygotować solidną rozpiskę tego co i jak robić w spark submit. Niemałe było moje zdziwienie, gdy dokumentacja ani stackoverflow nie odpowiedziały poważnie na moje prośby i błagania. Cóż… czas zatem samodzielnie połatać i przedstawić od podstaw kwestię spark submit – czyli tego jak uruchamiać nasze aplikacje (joby) sparkowe na serwerze. Zapraszam!

Spark Submit – podstawy

Zacznijmy od podstaw. Kiedy piszemy naszą aplikację Sparkową, robimy to w swoim IDE. Przychodzi jednak moment, w którym musimy wysłać ją na serwer. Pytanie – jak uruchamiać poprawnie joba sparkowego na klastrze, który zawiera wiele nodów?

Służy temu polecenie spark submit. Uruchamiając je musimy mieć jedynie dostęp do naszego pliku jar lub pliku pythonowego. Wszystko poniżej będę robił na plikach jar. Musimy to polecenie wykonywać oczywiście na naszym klastrze, gdzie zainstalowane są biblioteki sparkowe. Dobrze, żebyśmy mieli także dostęp do jakiegoś cluster managera – np. YARNa.

W ramach spark submita musimy określić kilka podstawowych rzeczy:

  1. master – czyli gdzie znajduje się master. W rzeczywistości wybieramy tutaj także nasz cluster manager (np. YARN).
  2. deploy-mode – client lub cluster. W jakim trybie chcemy uruchomić nasz job.
  3. class – od jakiej klasy chcemy wystartować
  4. plik wykonywalny – czyli np. jar.

To są podstawy które muszą być. To jednak dopiero początek prawdziwej zabawy z ustawieniami joba w spark submit. W samym środku możemy wskazać znacznie więcej ustawien, dzięki którym nadamy odpowiedni kształt naszej aplikacji (jak choćby kwestia pamięcie w driverze czy w executorach). Mały hint: chociaż można za każdym razem wpisywać wszystko od początku, polecam zapisać spark submit w jednym pliku, (np. run.sh) a następnie uruchamiać ten konkretny plik. To pozwoli nie tylko archiwizować, ale przede wszystkim wygodnie korzystać z bardzo długiego polecenia.

Poniżej pokazuję przykład spark submit do zastosowania.

spark-submit \
--class App \
--master yarn \
--deploy-mode "client" \
--driver-memory 5G \
--executor-memory 15G \
rdfsuperapp.jar

Jak dokładnie rozpisać ustawienia joba?

Tak jak napisałem, opcji w spark submit jest naprawdę, naprawdę wiele. Każda z nich znaczy co innego (choć występuje dużo podobieństw) i poniżej zbieram je wszystkie (lub większość) “do kupy”. Robię to m.in. dlatego, że chcąc znaleźć znaczenie wielu z nich, musiałem nurkować w różnych źródłach. Zatem nie dziękuj – korzystaj;-).

Poniżej najczęstsze ustawinia dla spark submit.

–executor-cores Liczba corów przydzielonych do każdego executora. UWAGA! Opcja dostępna tylko dla Spark Standalone oraz YARN
–driver-cores Liczba corów dla drivera. Dostępne tylko w trybie cluster i tylko dla Spark Standalone oraz YARN
–files Pliki które dołączamy do sparka z lokalnego systemu. Potem pliki te są przesłane każdemu worker nodowi, dzieki czemu możemy je wykorzystać w kodzie.
–verbose Pokazuje ustawienia przy starcie joba.
–conf ustawienia konfiguracyjne (więcej o tym w rozdziale poniżej)
–supervise Jeśli podane, restartuje drivera w przypadku awarii. UWAGA! Dostępne tylko dla Spark Standalone oraz Mesos.
–queue Określenie YARNowej kolejki na której ma być startowany. UWAGA! Dostępne tylko dla YARN.
–packages Lista oddzielonych przecinkami adresów mavena dla plików jar, które chcemy przyłączyć do drivera lub executorów. Najpierw będzie przeszukiwał lokalne repo mavena, potem centralne oraz zdalne repozytoria podane w –repositories.Format: groupId:artifactId:version.
–exclude-pachages Lista dependencji, które mają być excludowane z listy –packages. Kolejne elementy oddzielone są przecinkiem i mają nastepujący format: groupId:artifactId.
–repositories Lista oddzielonych przecinkami repozytoriów do których uderzamy przy okazji ustawienia –packages.

Poza ustawieniami służącymi do odpowiedniego zasubmitowania joba, można przekazać także kilka kwestii konfiguracyjnych. Dodajemy je za pomocą ustawienia –conf w spark-submit. Następnie podajemy klucz-wartość. Może być podana bezpośrednio lub przy użyciu cudzysłowów, np:

--conf spark.eventLog.enabled=false
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps"

Jak uruchamiać Sparka za pomocą YARNa i bez?

Spark Submit służy nam do tego, żeby uruchamiać naszego joba na klastrze. Możemy do tego wykorzystać jakiś zewnętrzny cluster manager, lub zrobić to przy pomocy Spark Standalone. Spark Standalone to taki tryb Sparka, w którym jest on postawiony na klastrze jako realnie działający serwis (z procesami itd). Cluster managerem może być YARN ale może być to także Mesos, Kubernetes i inne.

Za każdym razem gdy chcemy użyć sparka, musimy wskazać mastera (poprzez wlaśnie ustrawienie master w spark submit). Jeśli korzystamy z YARNa, sprawa jest banalnie prosta – do ustawienia master przekazujemy po prostu yarn. Reszta jest zrobiona.

Rzecz wygląda ciut trudniej, gdy chcemy wskazać za pomocą innych cluster managerów.

  1. Spark Standalone: spark://<ip>:<port> np –master spark//207.184.161.138:7077
  2. Kubernetes: –master k8s://https://<k8s-apiserver-host>:<port>
  3. Mesos: –master mesos://<host>:<port> (port: 5050)
  4. YARN: –master yarn

W niektórych dystrybucjach należy dodać także parę rzeczy, jednak są to już dodatkowe operacje. Jeśli będzie chęć – z przyjemnościa pochylę się nad bardziej szczeółowym porównaniem wyżej wymienionych cluster managerów;-).

Głodny Sparka?

Mam nadzieję, że rozwiązałem Twój problem ze zrozumieniem Spark Submit oraz sensu poszczególnych właściwości. Jeśli jesteś wyjątkowo głodny/a Sparka – daj znać szefowi. Przekonaj go, żeby zapisał Ciebie i Twoich kolegów/koleżanki na szkolenie ze Sparka. Omawiamy całą budowę od podstaw, pracujemy dużo i intensywnie na ciekawych danych, a wszystko robimy w miłej, sympatycznej atmosferze;-) – Zajrzyj tutaj!

A jeśli chcesz pozostać z nami w kontakcie – zapisz się na newsletter lub obserwuj RDF na LinkedIn. Koniecznie, zrób to i razem twórzmy polską społeczność Big Data!

 

Loading
Zrozumieć Sparka: cache vs persist

Zrozumieć Sparka: cache vs persist

W naszej podróży po Sparku musiała przyjść pora na ten przystanek. “Cache vs persist” to absolutnie podstawowe pytanie na każdej rozmowie rekrutacyjnej ze Sparka. Co ważniejsze – to naprawdę przydatne narzędzie dla każdego inżyniera big data, który posługuje się tym frameworkiem! Dzięki niemu przyspieszysz swojego joba, zmniejszysz ryzyko nieoczekiwanego fuckupu. No i najważniejsze – podczas code review pokażesz, że znasz się na rzeczy i nie jesteś tu z przypadku. Żartuję oczywiście.

Nie traćmy więc czasu i sprawdźmy co to za dobrodziejstwo, owe osławione cache i persist. Całą serię “zrozumieć sparka” poznasz tutaj.

Krótka powtórka – akcje i transformacje

Zanim przejdziemy do sedna sprawy – najpierw krótkie przypomnienie jak działa Spark – w pigułce. Aby to zrozumieć, fundamentalne są trzy rzeczy:

  1. Spark działa na swoich własnych strukturach, “kolekcjach”. To przede wszystkim RDD, które przypominają “listy” lub “tablice” (nie w konkrecie, natomiast grunt że nie jest to kolekcja typu mapa), ale także Datasety i Dataframy (które są aliasem dla Dataset[Row]). Napisałem, że “przede wszystkim”, ponieważ pod spodem DSów i DFów leżą właśnie RDD. To na tych strukturach musimy pracować, żeby całość ładnie działała.
  2. Na tych strukturach możemy wykonywać dwa typy operacji:
    • transformacje – czyli operacje, które wykonują pewne obliczenia, natomiast nie są wykonywane w momencie ich wywołania w kodzie.
    • akcje – operacje, które zwykle są jakimś rodzajem operacji wyjścia”. I co istotne – w tym momencie wykonywane są wszystkie transformacje po kolei.
  3. Takie podejście lazy evaluation zastosowane jest, aby Spark mógł odpowiednio zoptymalizować te operacje. Spisuje je w drzewku DAG (Directed Acyclic Graph – graf skierowany, acykliczny).

Tutaj pojawia się właśnie podstawowy problem – może się okazać, że akcji będziemy mieli kilka. I za każdym razem Spark będzie musiał wykonywać te same transformacje. To oczywiście powoduje, że job staje się niewydajny, a moc obliczeniowa jest niepotrzebnie przepalana.

Cache i persist w Sparku

I właśnie tutaj wkracza cachowanie. Możemy na danym zbiorze danych wywołać .cache() lub .persist() aby zachować go w pamięci. Gdyby potem pojawiło się kilka akcji, Spark będzie pobierał dane z konkretnego punktu, a nie procesował je wszystkimi transformacjami od początku.

cache vs persist

Aby to zrobić, mamy do dyspozycji dwie funkcje: cache() oraz persist(). Cóż – prawdę mówiąc jest jeszcze trzecia (.checkpoint()) ale na ten moment ją zostawimy;-). Tak więc powtórzmy – cache i persist pozwalają “zatrzymać” dataset w kodzie. Od momentu wywołania na naszych danych (RDD, Dataset lub Dataframe) cache lub persist – wywołanie akcji nie będzie skutkowało ponownym przeliczaniem transformacji, które były napisane przed tym punktem.

Pora jednak wyjaśnić sobie różnicę między cache a persist. Otóż – mówiąc prosto – persist pozwala nam ustawić poziom na jakim chcemy przechowywać dane (storage level), natomiast cache ma domyślne wywołanie. Tak naprawdę pod spodem cache wywołuje… właśnie persist!

Jaki storage level ma cache? To zależy od tego czy wywoujemy go na RDD czy Dataset/Dataframe. Jeśli RDD – domyślnie mamy MEMORY_ONLY, natomiast w przypadku Datasetów – MEMORY_AND_DISK.

Kod cache i persist dla Dataset i Dataframe

Jak wygląda wywołanie w kodzie? Przede wszystkim, jeśli wywołujemy cache() na Dataframe, to jest to najzwyklejszy w świecie alias dla wywołania funkcji persist(). I nie zmieniło się to od dawna. Poniżej kod z “bebechów” Sparka 2.4.3.

/**
   * Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).
   *
   * @group basic
   * @since 1.6.0
   */
  def cache(): this.type = persist()

Jak wygląda natomiast persist()? W tym przypadku mamy do czynienia z dwoma funkcjami: pierwsza to funkcja, która przyjmuje argument typu StorageLevel, natomiast druga jest funkcją bez argumentów. Wywołuje ona funkcję cacheQuery() znajdującą się w CacheManager, która dokonuje całej “magii”. Jeśli nie podamy jej Storage Level, zostanie przypisany standardowy – czyli “MEMORY_AND_DISK”.

Poniżej implementacje obu funkcji persist().

def persist(): this.type = {
    sparkSession.sharedState.cacheManager.cacheQuery(this)
    this
  }

def persist(newLevel: StorageLevel): this.type = {
    sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
    this
  }

Kod cache i presist dla RDD

W kontekście RDD cache() także jest aliasem dla persist(). Znów mamy także do czynienia z dwiema funkcjami persist() – jedną bezargumentową, drugą przyjmującą argument typu StorageLevel. Tym razem jednak różnica jest taka, że standardowym, domyślnym poziomem jest “MEMORY_ONLY” (nie jak w przypadku datasetów “MEMORY_AND_DISK”).

Można nawet dostrzec “niezwykle elegancki” komentarz, który pokusił się zostawić w samym środku funkcji jeden z inżynierów tworzących Sparka;-).

def persist(newLevel: StorageLevel): this.type = {
    if (isLocallyCheckpointed) {
      // This means the user previously called localCheckpoint(), which should have already
      // marked this RDD for persisting. Here we should override the old storage level with
      // one that is explicitly requested by the user (after adapting it to use disk).
      persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
    } else {
      persist(newLevel, allowOverride = false)
    }
  }

Storage levels

Z implementacji funkcji persist() wiemy, że kluczową sprawą jest Storage Level – czyli poziom na jakim chcemy przechowywać nasze dane. Oto jakie poziomy możemy wybrać:

  1. MEMORY_ONLY – przechowuje RDD lub Datasety jako deserializowane obiekty w pamięci JVM. Jeśli nie ma wystarczająco dużo dostępnej pamięci, niektóre partycje nie zostaną zapisane i zostaną potem ponownie obliczone w razie potrzeby.
  2. MEMORY_ONLY_2 – To samo co MEMORY_ONLY, natomiast każda partycja jest replikowana na dwa nody.
  3. MEMORY_ONLY_SER – to samo co MEMORY_ONLY, natomiast przechowuje dane jako serializowane obiekty w pamięci JVM. Potrzebuje mniej pamięci niż [1].
  4. MEMORY_ONLY_SER_2 – analogicznie.
  5. MEMORY_AND_DISK – w tym przypadku dane są przechowywane jako deserializowane obiekty w pamięci JVM. W tym przypadku jednak, w przeciwieństwie do [1], jeśli nie wystarczy miejsca, nadmiarowe partycje są przechowywane na dysku. To prowadzi do mniejszej wydajności (wolniejszy proces), ponieważ angażujemy tu operacje I/O.
  6. Analogicznie do 1-4 mamy także MEMORY_AND_DISK_SER, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2.
  7. DISK_ONLY – w tym przypadku dane są przechowywane jedynie na dysku. Oczywiście najwolniejszy wariant.
  8. DISK_ONLY_2 – analogicznie, tworzone są replikacje.

Zostań prawdziwym sparkowym wojownikiem

Jeśli dotarłeś do tego punktu, to prawdopodobnie jesteś naprawdę zainteresowany rozwojem w Sparku. Trzeba uczciwie powiedzieć, że to złożona, duża technologia. Jeśli chcesz zgłębić ją bardziej – przekonaj swojego szefa, żeby zapisał Ciebie i Twoich kolegów/koleżanki na szkolenie ze Sparka. Omawiamy całą budowę od podstaw, pracujemy dużo i intensywnie na ciekawych danych, a wszystko robimy w miłej, sympatycznej atmosferze;-) – Zajrzyj tutaj!

A jeśli chcesz pozostać z nami w kontakcie – zapisz się na newsletter lub obserwuj RDF na LinkedIn. Koniecznie, zrób to i razem twórzmy polską społeczność Big Data!

 

Loading
Zrozumieć Sparka: czym różnią się podobne mechanizmy? Distinct vs dropDuplicates i inne.

Zrozumieć Sparka: czym różnią się podobne mechanizmy? Distinct vs dropDuplicates i inne.

Pracując ze sparkiem bardzo często spotykamy mechanizmy, które budzą naszą konsternację z powodu bardzo dużego podobieństwa. Czy każde z tych mechanizmów ma inne działanie? Jeśli tak, to jakie są praktyczne różnice? Postanowiłem zanurkować nieco w kod i sprawdzić najbardziej popularne funkcje. Zapraszam!

Aha… to oczywiście artykuł w serii “Zrozumieć Sparka”. Poprzednio omawiałem joiny – zainteresowanych zapraszam tutaj.

Porządkujemy – czyli orderBy vs sort

Pierwszy mechanizm, który bierzemy na tapet to sortowanie. Spark daje nam przynajmniej dwie funkcje, które spełniają to zadanie: orderBy() oraz sort(). Czym się różnią?

Tutaj sprawa jest banalnie prosta – orderBy() to po prostu alias dla sort(). Mówi nam o tym poniższy fragment kodu:

/**
   * Returns a new Dataset sorted by the given expressions.
   * This is an alias of the `sort` function.
   *
   * @group typedrel
   * @since 2.0.0
   */
  @scala.annotation.varargs
  def orderBy(sortExprs: Column*): Dataset[T] = sort(sortExprs : _*)

Mamy dwie funkcje orderBy() – obie wywołują sort().

Zmieniamy nazwy kolumn: withColumnRenamed vs alias

Załóżmy, że mamy dataframe i nie pasuje nam nazwa jednej z kolumn. W takiej sytuacji mamy do dyspozycji dwie – cztery metody: withColumnRenamed(), alias(), as() oraz name(). Zajmijmy się najpierw różnicami między withColumnRenamed oraz alias.

  1. Zwracany typ:
    • alias() to funkcja zwracająca typ Column,
    • withColumnRenamed() zwraca Dataset[Row] (czyli Dataframe).
  2. Co dzieje się w środku:
    • alias() jest jedynie… aliasem dla funkcji name(). Nie robi nic innego. O tym co robi name() rozdział niżej.
    • withColumnRenamed() ma logikę, która zmienia schemat. Warto dodać, że sama zmiana nazwy kolumny dzieje się poprzez wykorzystanie funkcji as(). To – zdradzę przedwcześnie – jest alias dla aliasu. A alias jest aliasem dla name(). WOW! No nic… poniżej podrzucam jak zbudowana jest funkcja withColumnRenamed():

      def withColumnRenamed(existingName: String, newName: String): DataFrame = {
          val resolver = sparkSession.sessionState.analyzer.resolver
          val output = queryExecution.analyzed.output
          val shouldRename = output.exists(f => resolver(f.name, existingName))
          if (shouldRename) {
            val columns = output.map { col =>
              if (resolver(col.name, existingName)) {
                Column(col).as(newName)
              } else {
                Column(col)
              }
            }
            select(columns : _*)
          } else {
            toDF()
          }
        }
  3. Kiedy stosować:
    • alias() stosujemy wtedy, gdy możemy odwołać się do konkretnej kolumny – klasycznym przykładem jest zastosowanie funkcji select(). Tak jak poniżej:
      peopleDF.select(col("name").alias("firstName"))
    • withColumnRenamed() wywołujemy na Dataframe. W efekcie dostajemy nowy Dataframe ze zmienioną nazwą kolumny. Wygląda to tak jak poniżej:
      peopleDF.withColumnRenamed("name", "firstName")

 

Zmienianie nazw ciąg dalszy – as vs alias vs name

Skoro już znamy różnice między withColumnRenamed oraz alias, warto przejść do kolejnych meandrów sparkowego labiryntu. Tym razem poszukajmy czym różnią się od siebie trzy funkcje: as(), alias() oraz name().

Otóż, tym razem sprawa jest prostsza. Albo i bardziej zaskakująca, sam(/a) już musisz to rozstrzygnąć. Otóż – nie ma różnic. A konkretniej as() jest aliasem dla name(), tak samo jak alias() jest aliasem dla name().

Funkcja name() natomiast jest dość prostym mechanizmem. Jeśli obecna kolumna ma jakieś metadane powiązane z nią, zostaną propagowane na nową kolumnę. W  implementacji użyty jest Alias – warto jednak pamiętać, że nie chodzi tu o funkcję alias(), a o case classę Alias. Implementacja funkcji name() poniżej:

def name(alias: String): Column = withExpr {
    expr match {
      case ne: NamedExpression => Alias(expr, alias)(explicitMetadata = Some(ne.metadata))
      case other => Alias(other, alias)()
    }
  }

Usuwamy duplikaty w Sparku – distinct vs dropDuplicates

Prędzej czy później przychodzi taki moment, że w efekcie różnych transformacji dostajemy identyczne obiekty w naszych dataframach. Jest to szczególnie uciążliwe, gdy musimy pracować na unikatowych danych. Poza tym jednak najzwyczajniej w świecie niepotrzebnie zajmują one miejsce. Aby tego uniknąć, usuwamy duplikaty. Tylko jak to zrobić w Sparku? Istnieją dwa sposoby – distinct() oraz dropDuplicates().

Distinct jest najprostszą formą usuwania duplikatów. Tutaj sprawa jest prosta – na dataframe wywołujemy funkcję distinct(), po czym Spark wyszukuje rekordy, które w całości się pokrywają i zostawia tylko jeden z nich. Tak więc w tym przypadku wszystkie kolumny muszą się pokrywać, aby rekord został uznany za duplikat.

Implementacja:

sampleDF.distinct()

Sprawa ma się nieco inaczej, jeśli weźmiemy na warsztat funkcję dropDuplicates(). W tym przypadku możemy wybrać które kolumny bierzemy pod uwagę. W takiej sytuacji “duplikatem” może być np. osoba o tym samym imieniu i nazwisku, ale z innym PESELem.

Implementacja z przykładowymi kolumnami:

sampleDF.dropDuplicates("name", "age")

Co ważne – po takim wywołaniu, zwrócony dataframe oczywiście będzie zawierał wszystkie kolumny, nie tylko wyszczególnione w “dropDuplicates()”.

Tak więc, podsumowując – różnica polega na tym, że distinct() bierze wszystkie kolumny, zaś dropDuplicates() pozwala nam wybrać o które kolumny chodzi.

Explode vs ExplodeOuter

Niekiedy posługując się dataframe, stykamy się z kolumną, która zawiera listę – np. listę imion, liczb itd. Czasami w takiej sytuacji potrzebujemy, żeby te dane znalazły się w osobnych wierszach. Aby to zrobić, używamy funkcji explode – a właściwie jednej z dwóch “eksplodujących” funkcji.

Różnica jest bardzo prosta: explode() pominie nulle, natomiast explode_outer() rozbuduje naszą strukturę także o nulle.

Tak więc, jeśli chcemy rozbić nasze dane pomijając pry tym wartości typu null – wybierzemy explode().

Podsumowanie

Mam nadzieję, że zestawienie powyższych metod rozwieje różne wątpliwości, które mogą niekiedy przychodzić. Spark to złożona, duża technologia. Jeśli chcesz zgłębić ją bardziej – przekonaj swojego szefa, żeby zapisał Ciebie i Twoich kolegów/koleżanki na szkolenie ze Sparka. Omawiamy całą budowę od podstaw, pracujemy na ciekawych danych, a wszystko robimy w miłej, sympatycznej atmosferze;-) – Zajrzyj tutaj!

A jeśli chcesz pozostać z nami w kontakcie – zapisz się na newsletter. Koniecznie, zrób to!

 

Loading
Zrozumieć Sparka: joiny

Zrozumieć Sparka: joiny

Spark to rozległa i złożona technologia. Zrozumienie go wymaga zarówno silnych podstaw teoretycznych, jak i konkretnej, mozolnej praktyki. W serii “Zrozumieć Sparka” postaram się dogłębnie przybliżyć różne aspekty tego zacnego frameworku do przetwarzania danych.

Zaczynamy “z grubej rury” od czegoś, co wycisnęło niejedną łzę u Inżynierów Big Data. Chodzi oczywiście o joiny – mechanizm znany także z tradycyjnego SQL. Z pewnością pojawią się na ten temat także artykuły zaawansowane, dlatego dzisiaj zaczniemy dość lekko – od krótkiego omówienia typów joinów. Wiem, wiem – niby temat znany. W praktyce jednak czasem niedopowiedziany. Niech więc zostanie rozwiązany od podstaw;-).

Czym są joiny w Sparku?

Joiny to mechanizm znany dobrze ludziom pracującym z relacyjnymi bazami danych. Polega na łączeniu tabel. Wyobraź sobie, że masz tabelę, w której są zebrane dane o pracownikach – pesele, imiona, nazwiska, wykształcenie… Zaraz obok możesz zobaczyć tabelkę, w której widać dane dotyczące stanowisk – nazwy stanowisk, pensje, nazwa departamentów w firmie, pesele itd.

Jeśli chcemy pracować na danych dotyczących spraw stanowiskowych w kontekście ludzi – musimy te dwie tabele połączyć. I właśnie temu służą joiny;-). Problematyka w teorii dość prosta, w praktyce ma kilka zagwozdek o których warto wspomnieć. Przede wszystkim właśnie ta podstawowa – czyli różnice w joinach.

Typy joinów w Sparku

Chcąc połączyć tabele, mamy do dyspozycji kilka typów takiego mechanizmu. Podstawowe to inner join, left (outer) join, right (outer) join, full outer join oraz cross join. Dodatkowo możemy w sparku skorzystać z jeszcze dwóch: semi joina oraz anti joina. Omówienie każdego z nich będzie prostsze, jeśli spojrzymy najpierw na ten popularny w internecie diagram.

Wyjaśnijmy czym są owe kółeczka oznaczone jako A oraz B. Jak już napisałem, w tradycyjnych bazach danych będą to tabele. W Sparku będą to Dataframy lub Datasety. Skupimy się tutaj na Dataframach (alias dla Dataset[Row]), gdyż jest to struktura tabelaryczna, stricte przygotowana właśnie pod pracę na tabelach.

Na powyższym obrazku mamy zawsze dwie strony – lewą (A) i prawą (B). Dla wyjaśnienia: to który dataframe jest po lewej a który po prawej to rzecz czysto umowna, w zależności od tego po której stronie jaki dataframe umieścimy w kodzie. Podstawowy schemat tworzenia joina wygląda bowiem następująco:

firstDF.join(secondDF, columns, "joinType")

tak więc w takiej sytuacji “firstDF” będzie “lewym” dataframe, a “secondDF” będzie “prawy”.

Zanim przejdziemy do omówienia poszczególnych typów, zachęcam do przyjrzenia się powyższemu obrazkowi. Jeśli natomiast zapamiętać będzie ciężko, być może poniższe dzieło odrobinę pomoże:

 

W przykładach będę posługiwał się następującymi strukturami:

  1. Dataframe peopleDF zawiera id, firstName, lastName i age.
  2. Dataframe jobsDF zawiera id oraz job – czyli nazwę zawodu (np. “teacher”).

Wyglądają mniej więcej tak:

 

 

Inner Join

Zaczynamy od najprostszego typu, będącego jednocześnie domyślnym podczas łączenia tabel w Sparku. Innej join doprowadza do połączenia tych rekordów, które są wspólne – czyli wtedy, gdy warunek zostanie spełniony.

Ponieważ w Sparku inner join jest domyślny, nie musimy go określać podczas łączenia tabel. Obie poniższe implementacje są poprawne:

val innerJoin: Dataset[Row] = peopleDF.join(jobsDF, "id")
val innerJoin2: Dataset[Row] = peopleDF.join(jobsDF, Seq("id"), "inner")

Generują one następujący efekt:

Left (outer) join

Left join, czy też posługując się pełną nazwą – left outer join, to mechanizm, który łączy obie tabele po konkretnych kolumnach, natomiast zapewnia, że wszystko z lewej strony będzie miało miejsce w finalnej tabeli. Jeśli więc nie uda się powiązać (“zmatchować”) rekordów, kolumny z “prawej strony” będą wypełnione nullami.

Implementacja wygląda następująco:

val leftJoin: Dataset[Row] = peopleDF.join(jobsDF, Seq("id"), "left_outer")

I daje efekt w postaci takiej tabelki:

Zwróć uwagę na to że rekordy w peopleDF kończą się na id 18, natomiast tabela job posiada id od 1 do 13, a potem 19 i 20. Na tym przykładzie widać, że część nie powiązana została wypełniona nullami.

Right (outer) join

W przypadku right joina sprawa jest prosta – to po prostu “lustrzane odbicie” left joina. Łączy on więc obie tabele po konkretnych kolumnach, natomiast zapewnia, że wszystko z prawej strony znajdzie swoje miejsce w finalnej tabeli. Jeśli więc któreś rekordy z prawej tabeli nie będą miały swojego odpowiednika (match nie zostanie zrealizowany/“nie zmatchuje się”), to w tabeli finalnej kolumny z lewej w tych rekordach będą nullami.

Implementacja:

val rightJoin: Dataset[Row] = peopleDF.join(jobsDF, Seq("id"), "right_outer")

Nawet jeśli powyższy opis brzmi jak skomplikowany, poniższy efekt powinien wszystko rozjaśnić:

Full outer  join

Full outer join jest przeciwieństwem inner joina – bierze wszystko. Tak więc można powiedzieć, że jest sklejeniem left i right joinów wraz z usunięciem duplikatów.

Implementacja:

val fullJoin: Dataset[Row] = peopleDF.join(jobsDF, Seq("id"), "full")

Efekt:

Cross join (cartesian/iloczyn kartezjański)

Cross join nazywany jest także iloczynem kartezjańskim. Pochodzi to od matematycznego mechanizmu operacji na macierzach – właśnie iloczynu kartezjańskiego. Dla ciekawskich, z matematyczną definicją oraz kilkoma ćwiczeniami można zapoznać się tutaj.

Dla nas jednak chyba łatwiejsza będzie “definicja zbanalizowana by Maro” – czyli proste “łączymy każdy rekord z każdym”W związku z tym, że każdy z każdym, nie ma tu potrzeby (ani miejsca) by wpisać kolumnę, po której łączymy.

Implementacja:

val crossJoin: Dataset[Row] = peopleDF.crossJoin(jobsDF)

Jak widać, efekt jest znacznie bardziej okazały, niż przy pozostałych typach łączeń. Przedstawiam tylko część, natomiast w tym przypadku rekordów jest aż 270:

Semi join

Pozostały jeszcze dwa typy joinów, zwykle pomijane. Chociaż są znacznie rzadziej spotykane, warto moim zdaniem znać oba, gdyż oba mogą okazać się naprawdę przydatne. Jeśli taka potrzeba zajdzie, po prostu zastosujemy przygotowaną już konstrukcję, zamiast robić “skok przez plecy” z selectem, równaniami, wherami i filtrami.

Pierwszym z nich jest Semi Join. To nic innego jak inner join, który… pomija kolumny z lewej strony. Tak więc po zmatchowaniu rekordów, brane są pod uwagę tylko kolumny z lewej strony.

Implementacja:

val semiJoin: Dataset[Row] = peopleDF.join(jobsDF, Seq("id"), "left_semi")

Efekt:

Anti Join

Ostatnim, najciekawszym moim zdaniem, joinem – jest anti join. Jest on pewnego rodzaju “dopełnieniem” semi-joina. Albo jego przeciwieństwem – zależy jak patrzeć. Chodzi w każdym razie  to, że chociaż kolumny są takie same – a więc jedynie z lewej strony – to… w finalnej tabeli znajdują się tylko rekordy, które nie zostały zmatchowane, nie da się ich powiązać między tabelami.

Implementacja:

val antiJoin: Dataset[Row] = peopleDF.join(jobsDF, Seq("id"), "left_anti")

Efekt:

Implementacja inaczej

Warto zauważyć jedną rzecz: wszędzie powyżej poszedłem na łatwiznę i łączyłem po takiej samej kolumnie. Może się jednak okazać, że nie będzie takich samych kolumn! Cóż wtedy zrobić? Opcji jest jak zawsze kilka. Przede wszystkim dwie (poniższe konstrukcje nie odpowiadają już schematom których trzymaliśmy się przez większość artykułu):

Opcja nr 1: zróbmy tak, żeby były takie same kolumny. Czyli po prostu zmieńmy nazwy kolumn, które mają być matchowane.

val leftJoin: Dataset[Row] = peopleDF.join(jobsDF.withColumnRenamed("personId", "id"),
      Seq("id"), "left_outer")

Opcja nr 2: powiedzmy, które kolumny mają być matchowane. Tak, to zdecydowanie brzmi rozsądniej:

val leftJoin: Dataset[Row] = peopleDF.join(jobsDF, peopleDF("id") === jobsDF("personId"), "left_outer")

To samo można także zrobić za pomocą następującej składni:

val leftJoin: Dataset[Row] = peopleDF.join(jobsDF, peopleDF("id").equalTo(jobsDF("personId")), "left_outer")

Kolejną rzeczą jaką warto rozważyć jest łączenie po kilku tabelach. Możemy to zrobić bardzo prosto:

val leftJoin: Dataset[Row] = peopleDF.join(jobsDF, Seq("id", "pesel"), "left_outer")

Można także budować bardziej skomplikowane warunki joinów:

val leftJoin: Dataset[Row] = peopleDF.join(jobsDF, peopleDF("id").equalTo(jobsDF("personId")).and(peopleDF("age").gt(jobsDF("minimumAge"))), "left_outer")

Niebezpieczeństwa

Skoro joiny są takie proste i przyjemne, to czy można ich używać bez ograniczeń i bez głębszej refleksji? Otóż… jest dokładnie odwrotnie. Napisałem na początku, że mechanizm ten wycisnął niejedną łzę u inżynierów korzystających ze sparka. Oto dlaczego:

Nie będziemy się tutaj rozwodzić nad tym co i jak w detalu. Sprawa jest prosta: join to uruchomienie shufflingu – czyli przerzucania danych między partycjami oraz nodami. To z kolei bardzo zasobożerna sprawa i warto jej unikać. Na ile oczywiście się da, bo najczęściej się po prostu nie da.

W kolejnych artykułach zagłębimy się w kwestie wydajnościowe właśnie. Na ten moment, z racji że są to podstawy, polecam przejrzeć kod pod kątem akcji, które wykonujemy po joinie. Być może warto po połączeniu tabel zrobić bowiem cache(), persist() lub checkpoint(). Gdybyśmy wywoływali dwukrotnie akcje, dwukrotnie będzie zrobiony także join – a po co, jak nie trzeba?;-).

Ciąg dalszy…

Mam szczerą nadzieję, że wiesz już teraz wszystko czego trzeba, aby zacząć przygodę z joinami w Sparku. Oczywiście w kolejnych artykułach zagłębimy się nieco w tym zagadnieniu – zostań z nami przez newsletter!

Jeśli czujesz, że Twoje wiedza na temat Sparka jest nieuporządkowana i frustruje Cię to, to mam dobrą wiadomość. Organizujemy szkolenia dla firm z zakresu… właśnie Apache Spark. Jeśli masz dobrego szefa, podeślij mu kontakt do nas – możemy pojawić się u Ciebie w firmie i pokazać wszystko od 0 do Spark Inżyniera;-). Strona ze szkoleniem dostępna jest tutaj.

Tymczasem zapraszam do naszego newslettera. Zostańmy w kontakcie i… razem budujmy naszą wspólną, polską społeczność Big Data!

 

Loading