Szukanie podobieństw w tekstach przy pomocy Spark ML – efekty

Szukanie podobieństw w tekstach przy pomocy Spark ML – efekty

Co ty na to, żeby zbudować system, dzięki któremu wyszukujemy podobne wypowiedzi polityków? Tak dla sprawdzenia – jeśli jeden coś powiedział, poszukamy czy jego oponenci nie mówili przypadkiem podobnie. Dzięki temu być może oczyścimy trochę debatę – świadomość, że nasi przedstawiciele nie różnią się aż tak bardzo, może być bardzo orzeźwiająca. Jednak taki mechanizm to dużo danych do przetworzenia i zinterpretowania. Dodatkowo tekstowych.

Dziś w artykule o podobnym problemie przy wykorzystaniu Apache Spark. Porozmawiamy więc o sztucznej inteligencji – a konkretniej machine learning, natural language processing (NLP) oraz text similarity. Wyjątkowo jednak nie w kontekście pythona, a właście Scali i Sparka.

Text Similarity (AI) w Apache Spark

Wróćmy do problemu podobieństw wypowiedzi polityków. Oczywiście musimy najpierw zebrać dane. Ile może ich być? Jeśli bazujemy na krótkich wypowiedziach – ogromne ilości. Wszystko zależy od tego jak bardzo chcemy się cofać w czasie i jak wiele osób wziąć pod lupę (Sejm? Senat? Rząd? Polityków lokalnych? A może zagranicznych?). Sumarycznie można się pokusić jednak o miliony, dziesiątki a nawet setki milionów krótkich wypowiedzi. A im bardziej w używaniu jest Twitter, tym więcej.

Pytanie, czy do tak dużych zbiorów można użyć bibliotek Pythonowych? Wiele z nich bazuje na jednej maszynie, bez możliwości naturalnego przetwarzania rozproszonego. Oczywiście nie wszystkie i z pewnością jest tam najmocniej rozwinięte środowisko do NLP. Na tym blogu skupimy się dziś jednak na mało popularnym pomyśle. Sprawdzimy na ile naprawdę poważnym rozwiązaniem może być Apache Spark w świecie machine learning. Dziś pokażę owoc eksperymentu nad przetwarzaniem tekstu w Apache Spark.

Po pierwsze: efekt

Zanim wskażę jakie techniki można zastosować, spójrzmy co udało się osiągnąć.

Zacznijmy od podstawowej rzeczy

  1. Bazujemy na zbiorze, który ma ~204 tysiące krótkich tekstów – konkretnie tweetów.
  2. Tweety dotyczą trzech dziedzin tematycznych:
    • COVID – znakomita większość (166543 – 81,7%)
    • Finanse – pewna część (28874 – 14,1%)
    • Grammy’s – margines (8490 – 4,2%)
  3. W ramach systemu przekazujemy tekst od użytkownika. W odpowiedzi dostajemy 5 najbardziej podobnych tweetów.

Efekty

Poniżej kilka efektów. Chcę zauważyć, że sporą rolę odgrywa tutaj kwestia nierówności zbiorów. Dane związane z ceremonią przyznania nagród Grammy’s są właściwie marginalne (nieco ponad 4%). Tweety COVIDowe zapełniają natomiast nasz zbiór w ponad 80%. Jest to istotne, gdyż przy sprawdzaniu efektywności najbardziej naturalnym odniesieniem jest zwykłe prawdopodobieństwo. W zbiorze 100 “najbardziej podobnych” tekstów (do jakiegokolwiek), ok 80 powinno być związanych z COVID-19, nieco ponad 10 to najpewniej finansowe, natomiast muzyczne będą w liczbie kilku sztuk.

Text Similarity w Apache Spark na przykładzie wywołania tweetów związanych z COVID-19

Fraza covidowa, najprostsza

Wyszukiwania zacznijmy od najprostszego podejścia: frazą wyszukiwaną niech będzie podobna do tej, o której wiemy, że istnieje w podanym zbiorze. Liczba w nawiasie to stopień podobieństwa – od -1 do 1 (gdzie 1 to identyczne).

Fraza: Praying for health and recovery of @ChouhanShivraj . #covid #covidPositive (zmiany są bardzo drobne).

Podobne wykryte frazy:

  1. Praying for good health and recovery of @ChouhanShivraj (0.9456217146059263)
  2. Prayers needed for our vascular surgeon fighting Covid @gpianomd #COVID19 #Prayers #frontlinedoctors (0.8043357071420172)
  3. Prayers for your good health and speedy recovery from #COVID19 @BSYBJP (0.801822609000082)
  4. Hon’ble @CMMadhyaPradesh Shri @ChouhanShivraj Ji tested #COVID19 positive. Praying for his speedy recovery. (0.7740378229093525)
  5. I pray for Former President Shri @CitiznMukherjee speedy recovery. Brain tumor wounds ji a lot, God may heal his p…  (0.7626450268959205)

Jak widać każda z tych fraz pochodzi z grupy COVIDowych. Dodatkowo dotyczy pragnień szybkiego powrotu do zdrowia oraz modlitwy za cierpiących.

Fraza finansowa, trudniejsza

Przejdźmy do czegoś odrobinę trudniejszego – sprawdźmy coś finansowego. Niech będzie to fraza, którą absolutnie wymyślę od początku do końca.

Fraza: Ford’s earnings grow another quarter

Podobne wykryte frazy:

  1. XLE Goes Positive Brent UP Big & WTI UP Big Rally $XOM ExxonMobil Buy Now for the Rest of 2018 GASOLINE INVENTORIE… (0.7579525402567442)
  2. Morgan Stanley Begins Covering Murphy Oil $MUR Stock. “Shares to Hit $26.0” (0.7211353533183933)
  3. Seaport Global Securities Lowers Cabot Oil & Gas Q2 2018 Earnings Estimates to $0.15 EPS (Previously $0.17).… (0.7211353533183933)
  4. William E. Ford Purchases 1000 Shares of BlackRock Inc. $BLK Stock (0.7195004202231048)
  5. Anadarko Petroleum Is On A Buyback Binge $APC (0.7187907206133348)

W tym przypadku podobieństwa są znacznie mniejsze. Warto zauważyć jednak dwie rzeczy: Po pierwsze – system wskazuje, że podobieństwa są mniejsze (0.76 to dużo mniej niż 0.95). Prawdopodobnie bardzo podobne po prostu więc nie istnieją. Druga rzecz – wszystkie podobne tweety pochodzą ze zbioru finansowych! Zbioru, który stanowi ok 14% całości danych. Pozwala to nabrać przekonania, że odpowiedzi nie są przypadkowe.

Fraza muzyczna, najtrudniejsza

Na koniec – najtrudniejsze zadanie ze wszystkich. Wybierzemy zdanie, które teoretycznie powinno pasować do zbioru będącego marginesem całości – do Grammy’s. Dodatkowo zdanie to wymyślę całkowicie. A niech tam – niech dotyczy najwspanialszej piosenkarki w dziejach! Oczywiście moim, zupełnie subiektywnym i amatorskim okiem;-).

Fraza: Amy Lee is the greatest singer of all time!

  1. Christina Aguilera & Norah Jones were the only multiple recipients for ‘Best Female Pop Vocal Performance’ in the 2000s. (0.7306395709876714)
  2. @billboardcharts @justinbieber @billieeilish @oliviarodrigo @taylorswift13 @kanyewest Taylor the only real queen let’s be honest she deserves the Grammy for evermore but the #GRAMMYs wont give her. (0.7019156211438091)
  3. #GRAMMYs keep doing dirty to Lana Del Rey? Even though her talent is among the best this world has to offer (0.6868772967554752)
  4. Kylie Minogue deserved at least one nomination for Magic #GRAMMYs (0.6820704278110573)
  5. The answer will always be YES. #GRAMMYs #TwitterSpaces #SmallBusinesses #BlackOwned #adele #bts (0.6816903814884498)

I to właśnie te wyniki, przyznam, najmocniej wprowadziły mnie w euforię i ekscytację, gdy je zobaczyłem. I to nie tylko z powodu mojego niekłamanego uczucia do wokalistki Evanescence. Gdy spojrzymy na to “zdrowym, chłopskim okiem”, nie ma tutaj słowa o Grammy’s. Nie ma też szczególnego podobieństwa w słowach między pięcioma wymienionymi tweetami. Jest za to… kontekst. Jest podobieństwo tematyczne, jest znaczenie sensu.

A to wszystko naprawdę niedużym kosztem:-).

Text Similarity w Apache Spark na przykładzie wywołania tweetów muzycznych (z Grammy’s)

Apache Spark a text similarity – wykorzystane techniki

No dobrze, ale przejdźmy do konkretów – co należy zrobić, aby dostać takie wyniki? Tu zaproszę od razu do następnego artykułu, w którym pokażę dokładniej jak to zrobić. Dzisiejszy potraktujmy jako zajawkę. Żeby nie przeoczyć następnego – zapisz się na newsletter;-).

 

Loading

Po dość długich staraniach i wyeliminowaniu kilku ewidentnie beznadziejnych podejść, za sprawą kolegi Adama (za co ukłony w jego stronę) zacząłem czytać o embeddingach USE (Universal Sentence Encoder). Od razu powiem, że moją podstawową działką jest Big Data rozumiane jako składowanie i przetwarzanie danych. Sztuczna inteligencja to dopiero temat, który badam i definitywnie nie jestem w nim specem (choć parę kursów w tym kierunku ukończyłem i coś niecoś działałem). Liczę jednak, że obcowanie ze specami w tej działce (takimi jak właśnie Adam;-)) pomoże w eksploracji tego ciekawego gruntu.

Wróćmy jednak do USE. To była istne objawienie. Warto zaznaczyć, dla tych którzy nie do końca są zaznajomieni z tematyką machine learning, że komputer oczywiście tak naprawdę nie rozumie tekstu. Żeby mógł wyszukiwać podobieństwa, dzielić na grupy, uczyć się klas itd – potrzebne są liczby. Stąd wziął się pomysł sprowadzania tekstów do wektorów i różnego rodzaju wectorizerów – mechanizmów, które sprowadzają tekst do wektorów. Wektorów, czyli tablic jednowymiarowych o określonej długości (tu można się pomylić. Wielowymiarowe wektory dalej są jednowymiarowymi tablicami). Nieco bardziej rozbudowaną wersją są embeddingi, które mogą przechowywać w sobie wektory, natomiast które posiadają dodatkowe cechy pomocne. Jedną z nich (kluczową?) jest to, że słowa które chcemy zamienić na liczby, nabierają kontekstu. Pomaga to szczególnie mocno w niektórych przypadkach – na przykład naszych tweetów, które zawierają krótkie, czasami niezbyt treściwe przekazy. Jeśli będziemy je porównywali w prosty, czysto “statystyczny” sposób zestawiając wyrazy, nie uzyskamy odpowiedniego efektu.

Machine Learning w Apache Spark

Aby korzystać z dobrodziejstw ludzkości w zakresie machine learning, w tym text similarity w Apache Spark, należy wykorzystać bibliotekę Spark MlLib (w repozytorium Mavena dostępna tutaj). Tylko tutaj UWAGA! Wewnątrz biblioteki MlLib dostępne są dwa “rozgałęzienia”:

  1. Spark MlLib – starsza (choć wciąż utrzymywana) wersja, operująca bezpośrednio na RDD.
  2. Spark ML – nowocześniejsza część biblioteki. Możemy tutaj pisać operując na Datasetach i Dataframe’ach.

Wracając do technik – jednym z embeddingów jest właśnie USE. Jest on znacznie znacznie lepszym rozwiązaniem niż nieco podstarzały word2Vec, o innych, prostszych (np. Count Vectorizer) nie wspominając. Problem? Ano właśnie – nie wchodzi on w skład podstawowej biblioteki MlLib. Nie jest to jednak problem nie do przeskoczenia. Istnieje w Internecie gotowy zestaw rozwiązań, które poszerzają podstawowe biblioteki Sparkowe. Mam tu na myśli John Snow Labs. Udostępniają oni naprawdę imponująca liczbę algorytmów, które po prostu możemy wykorzystać – i to z całkiem niezłym skutkiem. Omówienie poszczególnych algorytmów można znaleźć tutaj. Samą bibliotekę do przetwarzania tekstu, czyli Spark-NLP zaciągniemy bez problemu z głównego repozytorium Mavena. To dzięki niej możemy rozwiązać bardzo wiele problemów, m.in. text-similarity w Apache Spark;-)

Jak technicznie dokładnie to zrobić, pokażę w kolejnym artykule. Już teraz zapraszam do subskrybowania;-).

Cosine Similarity

Skoro tylko udało mi się już porządnie sprowadzić tekst do jakiś ludzkich kształtów (czyli liczb;-)), należało znaleźć najlepszy z nich. Najlepszy – czyli najbardziej podobny do takiego, który wprowadzę. Dość dużo spędziłem czasu na szukaniu różnych gotowych rozwiązań. W końcu zdecydowałem się na zastosowanie czystej matematyki. Mowa tu o cosine similarity. Nie mam pojęcia jak to się tłumaczy na polski, a “podobieństwo kosinusowe” mi po prostu nie brzmi (ani nie znalazłem żeby tak się mówiło).

Z grubsza sprawa jest dość prosta – chodzi o to, żeby znaleźć podobieństwo między dwoma (niezerowymi) wektorami osadzonymi na jakiejś płaszczyźnie. Nie jest to żadna technika rakietowa i nie dotyczy ani NLP, ani nawet machine learning. Dotyczy zwykłej, prostej, nudnej matmy i można się zapoznać nawet na wikipedii.

Wzór na cosine similarity wygląda następująco:

{\displaystyle {\text{cosine similarity}}=S_{C}(A,B):=\cos(\theta )={\mathbf {A} \cdot \mathbf {B} \over \|\mathbf {A} \|\|\mathbf {B} \|}={\frac {\sum \limits _{i=1}^{n}{A_{i}B_{i}}}{{\sqrt {\sum \limits _{i=1}^{n}{A_{i}^{2}}}}{\sqrt {\sum \limits _{i=1}^{n}{B_{i}^{2}}}}}},}

Efekt jest prosty: wynik jest od -1 do 1. Im bliżej 1 tym bliższe są oba wektory. Problem? Oczywiście – w Sparku nie ma implementacji;-). Na szczęście jest to wzór na tyle prosty, że można go sobie zaimplementować samemu. Ja to zrobiłem w ramach UDF.

Podsumowanie

I to tyle! Tak więc można sprawę uprościć: zebrałem tweety, użyłem USE (od John Snow Labs) oraz cosine similarity. W efekcie dostałem naprawdę solidne wyniki podobieństwa. I to nie tylko jeśli chodzi o sam tekst, ale przede wszystkim jego znaczenie.

Już w najbliższym artykule pokażę jak dokładnie napisałem to w Sparku. Jeśli interesują Cię zagadnienia dotyczące Sparka, pamiętaj, że prowadzimy bardzo ciekawe szkolenia – od podstaw. Pracujemy z prawdziwymi danymi, na prawdziwych klastrach no i… cóż, uczymy się prawdziwego fachu;-). Jeśli interesuje Cię to – Zajrzyj tutaj!

Zostań z nami na dłużej. Razem budujmy polskie środowisko Big Data;-). Jeśli chcesz pozostać z nami w kontakcie – zapisz się na newsletter lub obserwuj RDF na LinkedIn. Koniecznie, zrób to i razem twórzmy polską społeczność Big Data!

 

Loading
Inspiracja: prawdziwe datasety, które pomogą Ci w nauce Big Data

Inspiracja: prawdziwe datasety, które pomogą Ci w nauce Big Data

Któż z nas nie miał w szkole dosyć matematycznych zadań o “Ali Kasi i Małgosi, które dzieliły między sobą truskawki”? Albo na statystyce o obliczaniu prawdopodobieństwa stosunku “kul białych do kul czarnych które pozostaną w urnie po wyciągnięciu jednej z nich”? Niestety, nieżyciowe (czy gorzej – pseudożyciowe) przykłady zabijają piękno nauki. Nauki, która jest przecież wspaniałym narzędziem do poznawania i budowania świata.

Prawdziwe datasety do nauki Big Data – czemu warto?

Dokładnie tak samo jest w Big Data. Poznając technologie, często bazujemy na przykładach nudnych, oklepanych, o których wiemy, że nie sprawią nam żadnych niespodzianek. Są to “zbiory danych” które tworzymy sami. W locie, na potrzeby przykładu. Nierealne, w zbyt dużej liczbie potrafiące przyprawić o mdłości.

Oczywiście proste, jasne przykłady też są potrzebne! Sam je na szkoleniach stosuję. Warto jednak od samego początku obcować z prawdziwymi danymi. Choćby dlatego, że takie dane przeważnie nie są najpiękniejsze. Mają swoje wady, brudy, dziury. Mają więc wszystko to, co cechuje prawdziwe dane. Te, z którymi będziemy się zmagać w komercyjnych projektach. Dane, które zaskakują. Dane, które sprawiają problemy i zmuszają do wytężenia mózgownicy.

Poza tym jednak, są to dane, które najzwyczajniej w świecie są po prostu… ciekawe. Pracując z nimi możemy się czegoś dowiedzieć. Niekoniecznie musi nam się to przydać podczas najbliższej randki z Żoną czy w trakcie spotkania z kumplami w pubie. Wystarczy jednak, że cokolwiek o świecie dowiemy się dzięki naszej pracy z danymi. Satysfakcja gwarantowana. Podobnie zresztą jak to, że zaczną nam wpadać do głowy nowe pomysły, które pomogą nam w analizie danych.

Poniżej prezentuję listę kilku zestawów danych z których można skorzystać, które urozmaicą naszą naukę Big Data;-). Dla smaczku dodam jeszcze, ze w wielu przypadkach datasety te są świetnie znane moim kursantom. Wykorzystuję je  – m.in. szkoleniach ze Sparka – i sprawdzają się znakomicie.

Dane z Netflixa

Od przeglądania seriali Netflixa znacznie lepiej wejść na Bigdatowy szlak walki z potworami obliczeń i odszukać niespodzianki w danych, które na temat platformy znamy.

Kto nie korzystał z Netflixa? Ten czasoumilacz już dawno przestał być jedynie towarzyszem rozrywkowych wieczorów. Obecnie jest jednym z największych nośników i propagatorów kultury (co oczywiście ma swoje plusy i minusy). Czy nie byłoby fajnie popracować z danymi na temat jego filmów, reżyserów, dat i innych ciekawych rzeczy?

Źródło: Kaggle.

Pobieranie: netflix_titles.csv.

Wielkość: 3.4 MB.

Kolumny:

show_id
type
title
director
cast
country
date_added
release_year
rating
duration
listed_in
description

 

Przestępstwa ze zbiorów policji z Bostonu (crimes)

Jeśli kogoś nie rajcuje świat seriali, to może coś poważniejszego? Proponuję wcielić się w rolę urzędnika lub analityka kryminalnego. Zbadajmy, w jakim dystrykcie strzelaniny odgrywały największą rolę w poszczególnych latach. I nie tylko to, bo także całą masę innych rzeczy. Do zestawu danych dorzucony jest zbiór offense codes.

Źródło: Jak w poprzednim punkcie, Kaggle.

Pobieranie: crime oraz offense_codes.

Wielkość: 58 mb.

Kolumny:

incident_number
offense_code
offense_code_group
offense_description
district
reporting_area
shooting
occured_on_date
year
month
day_of_week
hour
ucr_part
street
lat
long
location

 

Użytkownicy telekomów (telecom users)

Być może przestraszyłeś/aś się nieco ponurych tematów, które podsunąłem wyżej. W takim razie mam coś bardzo przyziemnego. Czas na analizę użytkowników telekomów. Dataset znacznie mniejszy, natomiast wciąż ciekawy i można tu spędzić chwilę agregując i monitorując;-).

Źródło: Oczywiście niezawodny Kaggle.

Pobieranie: telecom_users

Wielkość: <1MB

Kolumny:

customerID
gender
SeniorCitizen
Partner
Dependents
tenure
PhoneService
MultipleLines
InternetService
OnlineSecurity
OnlineBackup
DeviceProtection
TechSupport
StreamingTV
StreamingMovies
Contract
PaperlessBilling
PaymentMethod
MonthlyCharges
TotalCharges
Churn

Tweety

Osobiście uważam, że Twitter to jedno z najlepszych źródeł danych do pracy z Big Data. Szczególnie, jeśli mówimy o zrobieniu większego projektu na samym początku drogi. Wynika to z faktu, że API (choć ma ograniczenia) pozwala w dłuższej perspektywie zgromadzić naprawdę duże ilości danych. Do tego są to dane które są dość dobrze ustrukturyzowane, ale nie aż tak jakbyśmy mieli je dostać w idealnie przygotowanej relacyjnej bazie danych. Poza tym prezentują realną wartość wyrażanych ludzkich emocji, wiedzy, przemyśleń. Jeśli chcesz zobaczyć mój system do analizy twittera, kliknij tutaj;-).

Tylko leniuch przy dzisiejszych możliwościach narzeka na brak solidnego materiału do pracy;-)

Dziś jednak nie o pełnym potencjale API Twitterowego, a o przykładowych zbiorach tweetów (statusów). Ja ostatnio na potrzeby swoich eksperymentów NLP pobrałem 3 zbiory danych: dotyczące COVID, dotyczące finansów oraz Grammy’s. Jak na przykładowe zbiory do ćwiczeń, dane są imponujące i zawierają ponad 100 000 tweetów.

Źródło: Kaggle.

Pobieranie: covid19_tweets, financial, GRAMMYs_tweets

Wielkość: Łącznie ~80 mb

Kolumn nie załączam z prostego powodu: w każdym z datasetów są nieco inne. Warto osobiście załadować (np. do Sparka) i popatrzeć.

Wiedźmińskie imiona

Na koniec załączam “dataset” który jest być może wątkiem humorystycznym bardziej niż realnymi danymi. Jeśli jednak człowiek kreatywny, to i z tym sobie poradzi;-). Poniżej do pobrania udostepniam listę ponad 100 imion z uniwersum Wiedźmina. Po prostu imiona, nic więcej. Można jednak dorobić sztuczne id, wylosować zawody lub upodobania i poprzypisywać do… no cóż, chociażby do tweetów z punktu wyżej.

Moim zdaniem grunt, żeby nauka była owocna, ale i dawała trochę radości i zabawy. A co jak co, ale akurat praca z danymi to może być zarówno koszmarnie nudny spektakl jak i najprawdziwsza zabawa:-).

Pobieranie: nazwy postaci z Wiedźmina.

TO TYLE. Mam nadzieję, że datasety które podrzucam przydadzą Ci się i nieco ubarwią naukę Big Data. Jeśli chcesz zostać w kontakcie – zapisz się na newsletter lub obserwuj RDF na LinkedIn. Koniecznie, zrób to i razem twórzmy polską społeczność Big Data!

 

Loading
HBase: jak zbudowany jest model danych?

HBase: jak zbudowany jest model danych?

Jest rok 2005 – inżynierowie Google publikują przełomowy dokument. “Big Table Paper” opisuje jak zbudowana powinna być baza danych, żeby mogła obsługiwać ogromne ilości danych. Z dokumentu tego natychmiast korzystają dwa ośrodki mające istotny wpływ na rozwój branży. Pierwszy z nich to NSA – amerykańska Agencja Bezpieczeństwa Narodowego, znana powszechnie z olbrzymiego systemu inwigilacji oraz poprzez postać Edwarda Snowdena. Drugi to fundacja Apache wraz ze swoim projektem Hadoop, który jest fundamentem współczesnego Big Data. W NSA powstaje Accumulo, w Apache HBase.

Ta ostatnia baza błyskawicznie zdobywa popularność i pozwala na przechowywanie potężnych ilości danych. Jak działa HBase i jego model? Jak wygląda struktura danych? W kolejnych artykułach weźmiemy pod lupę architekturę oraz różne HBasowe zagwozdki.

HBase – model danych

Zanim przejdziemy do architektury, warto poznać model jaki kryje się za danymi w HBase. Model ten jest bowiem z jednej strony niezbyt intuicyjny, z drugiej sam w sobie bardzo dużo mówi o tym jakie dane powinniśmy trzymać w bazie.

Rodzajów nierelacyjnych baz danych jest całkiem sporo. Gdy będziemy szukać informacji na temat HBase znajdziemy dwa opisy. Po pierwsze – że HBase to baza kolumnowa (column oriented). Po drugie – że to baza typu klucz-wartość (key-value store).

Ogólna budowa struktury HBase

Moim zdaniem znacznie bardziej fortunne byłoby stwierdzenie, że jest to baza zorientowana na column-familie (column familie oriented database) niż kolumnowa. Problem polega na tym, że coś takiego jak column familie oriented w powszechnych metodykach nie istnieje. Najmocniej przemawia jednak do mnie key-value store i to z dwóch powodów.

Po pierwsze – wynika to z Big Table Paper i tak właśnie przedstawia się największa alternatywa HBase, czyli Accumulo. Po drugie – ten model naprawdę ma w swojej strukturze klucz i wartość.

Jak to wygląda w praktyce? Zanim przejdziemy dalej, dwa podstawowe pojęcia:

  1. Namespace – czyli inaczej “baza danych”. Na tej samej instancji możemy mieć bazę związaną ze statusami z Twittera oraz osobną bazę na kwestie finansowe.
  2. Table – czyli swojska tabelka. Tabele są z grubsza tym czym tabelki w innych bazach, czyli  pewnym opisem zestawu danych. W “normalnych” bazach tabele mają zawsze kolumny. Tu także, jednak z pewną ważną modyfikacją…

Baza typu klucz-wartość (key-value store)

Zacznijmy od podstawowej rzeczy: wszystkie wiersze w tabeli zbudowane są na zasadzie klucz-wartość. Kluczem jest rowkey, czyli unikatowy w skali tabeli id. Wartością natomiast wszystkie dane zawarte w tym wierszu. Oddaje to dość prosty, poniższy rysunek.

HBase to baza typu klucz-wartość (key-value store).

Żeby zrozumieć dobrze na czym naprawdę polega struktura danych w HBase należy wziąć pod lupę owo “value”. Można spodziewać się, że albo siedzi tam jedna, konkretna wartość (np. liczba, tekst itd), albo że spotkamy tam kolumny. Otóż… pudło! Owszem, kolumny tam znajdziemy, ale niekoniecznie tak bezpośrednio.

W HBase kolumny pogrupowane są w “rodziny”, czyli column-families (cf). Dopiero pod cf znajdują się określone kolumny. I teraz uwaga! Znajdują się, jednak w żaden sposób nie są wymuszone, czy zdefiniowane w strukturze tabeli. Pojedynczy wiersz ma następującą strukturę.

Struktura wierszy tabeli w Apache HBase

Kolumny jednak dodawane są podczas… no właśnie, podczas dodawania konkretnego wiersza. Na etapie schematu (schemy) wymuszone mamy jedynie rowkey oraz column families. Efekt jest taki, że każdy wiersz może mieć inne kolumny (choć muszą mieścić się w ramach tych samych column families). Taka struktura ma oczywiście swoje wady – a konkretnie potencjalny bałagan. Należy bardzo uważać podczas pracy na takich danych, aby nie starać się “na siłę” odwołać do kolumn których może nei być.

Z drugiej strony ma to jednak daleko idące zalety, szczególnie w świecie Big Data. Można wykorzystać HBase jako zbiornik na dane, które są delikatnie ustrukturyzowane. Dane, które mają bardzo ogólną strukturę, a w środku mogą się nieco różnić. To pozwala umieszczać na przykład dane w pierwszym kroku ETL (extract, zaraz po zaciągnięciu ze źródła, z delikatnym “retuszem”).

Poznaj HBase dokładniej i zacznij z niego korzystać

To wszystko! Dzisiejszy artykuł bardzo krótki, jedynie wprowadzający do tematyki HBase. Tak naprawdę stanowi on niezbędną podstawę pod kolejny, na temat architektury HBase. Koniecznie zapisz się na nasz newsletter, aby nie przegapić;-).

 

Loading

Jeśli chcesz poznać HBase od podstaw, pod okiem specjalisty – zapraszam na nasze szkolenie. Nie tylko krok po kroku w usystematyzowany sposób poznasz jak obsługiwać HBase. Zrobisz także dużo ciekawych ćwiczeń na prawdziwej infrastrukturze Big Data, co znacząco przybliży Cię do świata realnego. Przekonaj swojego szefa i rozpocznij swoją przygodę z HBase!

Na dziś to tyle – jeszcze raz zachęcam do newslettera i powodzenia z HBase!

Zrozumieć sparka: uruchamianie joba na serwerze (spark submit)

Zrozumieć sparka: uruchamianie joba na serwerze (spark submit)

Przygotowywałem się ostatnio do kolejnego szkolenia ze Sparka. Chciałem dla swoich kursantów przygotować solidną rozpiskę tego co i jak robić w spark submit. Niemałe było moje zdziwienie, gdy dokumentacja ani stackoverflow nie odpowiedziały poważnie na moje prośby i błagania. Cóż… czas zatem samodzielnie połatać i przedstawić od podstaw kwestię spark submit – czyli tego jak uruchamiać nasze aplikacje (joby) sparkowe na serwerze. Zapraszam!

Spark Submit – podstawy

Zacznijmy od podstaw. Kiedy piszemy naszą aplikację Sparkową, robimy to w swoim IDE. Przychodzi jednak moment, w którym musimy wysłać ją na serwer. Pytanie – jak uruchamiać poprawnie joba sparkowego na klastrze, który zawiera wiele nodów?

Służy temu polecenie spark submit. Uruchamiając je musimy mieć jedynie dostęp do naszego pliku jar lub pliku pythonowego. Wszystko poniżej będę robił na plikach jar. Musimy to polecenie wykonywać oczywiście na naszym klastrze, gdzie zainstalowane są biblioteki sparkowe. Dobrze, żebyśmy mieli także dostęp do jakiegoś cluster managera – np. YARNa.

W ramach spark submita musimy określić kilka podstawowych rzeczy:

  1. master – czyli gdzie znajduje się master. W rzeczywistości wybieramy tutaj także nasz cluster manager (np. YARN).
  2. deploy-mode – client lub cluster. W jakim trybie chcemy uruchomić nasz job.
  3. class – od jakiej klasy chcemy wystartować
  4. plik wykonywalny – czyli np. jar.

To są podstawy które muszą być. To jednak dopiero początek prawdziwej zabawy z ustawieniami joba w spark submit. W samym środku możemy wskazać znacznie więcej ustawien, dzięki którym nadamy odpowiedni kształt naszej aplikacji (jak choćby kwestia pamięcie w driverze czy w executorach). Mały hint: chociaż można za każdym razem wpisywać wszystko od początku, polecam zapisać spark submit w jednym pliku, (np. run.sh) a następnie uruchamiać ten konkretny plik. To pozwoli nie tylko archiwizować, ale przede wszystkim wygodnie korzystać z bardzo długiego polecenia.

Poniżej pokazuję przykład spark submit do zastosowania.

spark-submit \
--class App \
--master yarn \
--deploy-mode "client" \
--driver-memory 5G \
--executor-memory 15G \
rdfsuperapp.jar

Jak dokładnie rozpisać ustawienia joba?

Tak jak napisałem, opcji w spark submit jest naprawdę, naprawdę wiele. Każda z nich znaczy co innego (choć występuje dużo podobieństw) i poniżej zbieram je wszystkie (lub większość) “do kupy”. Robię to m.in. dlatego, że chcąc znaleźć znaczenie wielu z nich, musiałem nurkować w różnych źródłach. Zatem nie dziękuj – korzystaj;-).

Poniżej najczęstsze ustawinia dla spark submit.

–executor-cores Liczba corów przydzielonych do każdego executora. UWAGA! Opcja dostępna tylko dla Spark Standalone oraz YARN
–driver-cores Liczba corów dla drivera. Dostępne tylko w trybie cluster i tylko dla Spark Standalone oraz YARN
–files Pliki które dołączamy do sparka z lokalnego systemu. Potem pliki te są przesłane każdemu worker nodowi, dzieki czemu możemy je wykorzystać w kodzie.
–verbose Pokazuje ustawienia przy starcie joba.
–conf ustawienia konfiguracyjne (więcej o tym w rozdziale poniżej)
–supervise Jeśli podane, restartuje drivera w przypadku awarii. UWAGA! Dostępne tylko dla Spark Standalone oraz Mesos.
–queue Określenie YARNowej kolejki na której ma być startowany. UWAGA! Dostępne tylko dla YARN.
–packages Lista oddzielonych przecinkami adresów mavena dla plików jar, które chcemy przyłączyć do drivera lub executorów. Najpierw będzie przeszukiwał lokalne repo mavena, potem centralne oraz zdalne repozytoria podane w –repositories.Format: groupId:artifactId:version.
–exclude-pachages Lista dependencji, które mają być excludowane z listy –packages. Kolejne elementy oddzielone są przecinkiem i mają nastepujący format: groupId:artifactId.
–repositories Lista oddzielonych przecinkami repozytoriów do których uderzamy przy okazji ustawienia –packages.

Poza ustawieniami służącymi do odpowiedniego zasubmitowania joba, można przekazać także kilka kwestii konfiguracyjnych. Dodajemy je za pomocą ustawienia –conf w spark-submit. Następnie podajemy klucz-wartość. Może być podana bezpośrednio lub przy użyciu cudzysłowów, np:

--conf spark.eventLog.enabled=false
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps"

Jak uruchamiać Sparka za pomocą YARNa i bez?

Spark Submit służy nam do tego, żeby uruchamiać naszego joba na klastrze. Możemy do tego wykorzystać jakiś zewnętrzny cluster manager, lub zrobić to przy pomocy Spark Standalone. Spark Standalone to taki tryb Sparka, w którym jest on postawiony na klastrze jako realnie działający serwis (z procesami itd). Cluster managerem może być YARN ale może być to także Mesos, Kubernetes i inne.

Za każdym razem gdy chcemy użyć sparka, musimy wskazać mastera (poprzez wlaśnie ustrawienie master w spark submit). Jeśli korzystamy z YARNa, sprawa jest banalnie prosta – do ustawienia master przekazujemy po prostu yarn. Reszta jest zrobiona.

Rzecz wygląda ciut trudniej, gdy chcemy wskazać za pomocą innych cluster managerów.

  1. Spark Standalone: spark://<ip>:<port> np –master spark//207.184.161.138:7077
  2. Kubernetes: –master k8s://https://<k8s-apiserver-host>:<port>
  3. Mesos: –master mesos://<host>:<port> (port: 5050)
  4. YARN: –master yarn

W niektórych dystrybucjach należy dodać także parę rzeczy, jednak są to już dodatkowe operacje. Jeśli będzie chęć – z przyjemnościa pochylę się nad bardziej szczeółowym porównaniem wyżej wymienionych cluster managerów;-).

Głodny Sparka?

Mam nadzieję, że rozwiązałem Twój problem ze zrozumieniem Spark Submit oraz sensu poszczególnych właściwości. Jeśli jesteś wyjątkowo głodny/a Sparka – daj znać szefowi. Przekonaj go, żeby zapisał Ciebie i Twoich kolegów/koleżanki na szkolenie ze Sparka. Omawiamy całą budowę od podstaw, pracujemy dużo i intensywnie na ciekawych danych, a wszystko robimy w miłej, sympatycznej atmosferze;-) – Zajrzyj tutaj!

A jeśli chcesz pozostać z nami w kontakcie – zapisz się na newsletter lub obserwuj RDF na LinkedIn. Koniecznie, zrób to i razem twórzmy polską społeczność Big Data!

 

Loading
W podróży Big Data – jak odnaleźć się w dżungli technologii?

W podróży Big Data – jak odnaleźć się w dżungli technologii?

Nie ma znaczenia czy dopiero zaczynasz swoją przygodę z Big Data, czy masz już doświadczenie, czy jesteś inżynierem, czy patrzysz na branżę stricte biznesowo. W każdym przypadku przyjdzie taki moment, w którym poczujesz się zagubiony mnogością technologii oraz tym jak bardzo są “niedookreślone”. W tym materiale postaram się wprowadzić względny porządek i przeprowadzić Cię suchą stopą przez bagno technologii obsługujących duże dane. Bierz zatem kubek mocnej jak otchłań Data Lake’a kawy – i zaczynamy!

Metodologia

Małe zastrzeżenie…

Zacznijmy od bardzo podstawowego zastrzeżenia: to co tu zaproponuję, może być bardzo łatwo podważone. Co więcej – to co tu pokażę, będzie z całą pewnością niepełne. BA! Niepełne? Dobre sobie… to będzie zaledwie muśnięcie Big Datowej rzeczywistości. Wszystko to wynika z faktu, że branża przyjęła już naprawdę pokaźne rozmiary i wprowadzenie poważnej metodologii porządkującej pojęcia oraz technologie, wymagałoby pracy dyplomowej. Być może nawet doktoratu. Czemu? Cóż – tego jest po prostu najzwyczajniej w świecie tak dużo i odpowiadają na tak wiele potrzeb, że łączenie technologii staje się prawdziwą sztuką.

Zatem powiedzmy sobie: ten artykuł jest dla Ciebie, jeśli wiesz coś niecoś o Big Data, ale wszystko zaczęło się mieszać. Zastanawiasz się nad tym które z topowych technologii służą do czego i jak można je połączyć. Z takim podejściem – zaczynamy!

Podział ze względu na przeznaczenie

Dzielić technologie można ze względu na wiele rzeczy – możemy podzielić patrząc na języki programowania, możemy podzielić patrząc czy jest to technologia chmurowa, a można poszukać pod kątem popularności. Można też – i tak zrobimy – podzielić ze względu na przeznaczenie, cel jakiemu służą.

Mój podział będzie następujący:

  1. Storages
  2. Bazy danych (nierelacyjne)
  3. Full-text search
  4. Przetwarzanie danych
  5. Komunikacja z danymi
  6. Schedulers
  7. Messaging
  8. Technologie analityczne

Jeszcze raz zaznaczę: jest to z całą pewnością obraz niepełny. Jest jeszcze trochę obszarów, które nie zostały tu wzięte pod uwagę. Ten pozwoli jednak złapać pewien punkt zaczepienia – i o to chodzi;-).

Storages

Czemu służą?: Storages (nie mam pojęcia jak przetłumaczyć to poprawnie, poza dość prostackimi “magazynami danych”) służą przechowywaniu ogromnych ilości danych w sposób możliwie prosty.

Krótki komentarz: Temat storages nie jest dobrze zdefiniowany. Niekiedy jako storage traktuje się wszystko, co przechowuje dane, a więc także bazy danych. Ja wyodrębniłem tu jednak “data storage” jako “prosty” system, który pozwala przechowywać dane w sposób mniej złożony, niż bazy. Należą więc do tego wszelkiego rodzaju rozproszone systemy plików, Data Lakes itd.

Przedstawiciele:

  1. HDFS (Hadoop Distributed File System)
  2. ADLS gen 2 (Azure Data Lake Storage gen 2)
  3. Amazon S3 (na AWS)
  4. Google Cloud Storage (na GCP)
  5. Delta Lake
  6. Kudu (wymienione także w Bazach Danych)
  7. Ozone (wymienione także w Bazach Danych)

Bazy danych (noSql)

Czemu służą?: Bazy danych służą przechowywaniu ogromnych ilości danych. Różnią się jednak nieco od Storages. Ich przeznaczenie zawiera bardziej ustrukturyzowaną formę przechowywania danych, a także możliwości bardziej zaawansowanej manipulacji danymi (przeglądania, usuwania pojedynczych rekordów itd).

Krótki komentarz: W temacie baz danych mamy bardzo dużo i coraz więcej technologii, które mogą nas interesować. Niektóre z nich nieco mieszają się ze Storages. To są właśnie te płynne granice o których już wspominałem. UWAGA! Wspominam tu jedynie o stricte big datowych, nierelacyjnych bazach danych. Nie znajdziemy tu więc popularnego mysql czy postgresql. Mamy wiele rodzajów baz danych – przede wszystkim key-value store, graph db, document store.

Przedstawiciele:

  1. HBase
  2. Accumulo
  3. MongoDB
  4. Cassandra
  5. CosmosDB (Azure)
  6. Dynamo DB (AWS)
  7. Google Cloud Datastore (GCP)
  8. Kudu (wymienione także w Storages)
  9. Ozone (wymienione także w Storages)
  10. Neo4j
  11. Druid

Full Text Searches

Czemu służą? Technologie full-text search (przeszukiwania pełno-tekstowego) także (znów!) odpowiadają za przechowywanie danych. Tym razem jednak przechowywanie zaprojektowane jest tak, aby dało się to potem bardzo dobrze przeszukiwać. Szczególnie mocny akcent położony jest na przeszukiwanie tekstu wraz z różnymi funkcjami wbudowanymi, tak aby nie było szczególnie trudne zbudowane wyszukiwarki zawierającej wyszukiwanie podobnych wyrazów czy uwzględnianie literówek.

Krótki komentarz: W przeciwieństwie do pozostałych obszarów, full-text searche zdają się być zdominowane przez dwie technologie. Co więcej – obie zbudowane są na tym samym silniku. Nie oznacza to jednak, że jest to jedyna oferta na rynku! Co ciekawe, full-text searche mogą stanowić także znakomity mix przydatny do analizy danych. Ciekawym przykładem jest zastosowanie Elasticsearcha w NASA (konkretniej JPL) m.in. do analizy danych przysyłanych przez łaziki.

Przedstawiciele:

  1. Lucene – nie jest samodzielną osobną technologią, a raczej silnikiem, na którym powstały inne.
  2. Elasticsearch
  3. Solr
  4. Sphinx

Przetwarzanie danych (processing)

Czemu służą? Technologie do przetwarzania danych oczywiście… przetwarzają dane;-). Oczywiście mowa tu o bardzo dużych danych. W związku z tym technologie te zwykorzystują mechanizmy zrównoleglania obliczeń. Można te technologie wykorzystywać do ogromnej ilości celów. Od strandardowego czyszczenia, przez harmonizację (sprowadzenie datasetów do wspólnej postaci pod kątem schematu), opracowywanie raportów statystycznych, aż po wykorzystywanie algorytmów sztucznej inteligencji.

Krótki komentarz: Technologie do przetwarzania danych podzielimy z grubsza na dwa rodzaje: batchowe i streamingowe. Batchowe to te, których zadaniem jest pobrać dużą paczkę danych, “przemielić je” i zwrócić wynik. Streamingowe natomiast działają w trybie ciągłym. W przeciwieństwie do pierwszego rodzaju – “nie kończą się”.

Przedstawiciele:

  1. Spark
  2. Spark Structured Streaming – choć zawiera się w pierwszym punkcie, zasługuje na osobne wyróżnienie.
  3. Kafka Strams – świetnie wspólgra z Kafką. Dodatkowo cechuje się daleko posuniętą prostotą.
  4. Flink
  5. Storm
  6. Map Reduce – choć obecnie nie jest już raczej implementowany w nowych systemach, to znajduje się w galerii sław i nie można o nim nie wspomnieć!
https://cdn.analyticsvidhya.com/wp-content/uploads/2020/11/repartition.jpg
Klasyka gatunku. Witamy w Sparku!;-)

Komunikacja z danymi (interfejsy SQL-like)

Czemu służą? Technologie które mam na myśli powodują, że w prostszy sposób możemy dostać się do danych, które normalnie przechowywane są w postaci plików (lub w innej postaci, natomiast wciąż kiepskiej w kontekście pracy z danymi). Przykładem jest, gdy chcemy składować pliki na HDFS, ale zależy nam na zachowaniu możliwości pracy z tymi danymi (prostych operacji przeszukiwania, dodawania itd). Technologie te dostarczają często interfejs obsługi danych składowanych w różnych miejscach, podobny do SQL.

Przedstawiciele:

  1. Hive
  2. Impala
  3. Shark
  4. BigSQL

Schedulery

Czemu służą? Kiedy tworzymy joby, bardzo często mamy potrzebe ustawienia ich aby były uruchamiane o tej samej porze. Temu właśnie służą między innymi schedulery.

Krótki komentarz:  Poza prostymi funkcjami określania kiedy jakie joby powinny zostać uruchomione, schedulery pozwalają także ustawić całą ścieżkę zależności w uruchamianiu jobów. Np. “jeśli zaciąganie danych zostanie ukończone, rozpcoznij czyszczenie, a potem harmonizację. Jeśli na którymś etapie coś pójdzie nie tak, wyślij maila z alertem”. Do tego dochodzą jeszcze możliwości (lepszego lub gorszego) monitoringu tych jobów oraz całych workflowów.

Przedstawiciele:

  1. Oozie
  2. Airflow
  3. Luigi
  4. Jenkins (częściowo)
  5. Pinball (stworzony przez Pinterest, natomiast nie jest obecnie aktywnie przez pinterest rozwijany)
  6. Step Functions (AWS)
  7. Workflows (GCP)
  8. Logic Apps (Azure)

Messaging

Czemu służą? Technologie do messagingu czy też kolejkowania, to technologie, które – nieco banalizując – są “punktem przesyłu” wielu danych. Wykorzystuje się je szczególnie często w kontekście przetwarzania streamingowego danych. Kiedy produkujemy jakieś dane, nie musimy się zastanawiać gdzie mają być dalej przetworzone. Wystarczy wykorzystać technologię kolejkowania i już. To jakie inne procesy podepną się pod ten “punkt przesyłu” to już zupełnie inna sprawa.

Krótki komentarz: Bardzo często technologie te zestawiane są z frameworkami do przetwarzania streamingowego. Wymienione zostały m.in. parę punktów wyżej (np. Spark Structured Streaming, Flink czy Kafka Streams). Warto tu dodać, że technologie tego typu są także często wykorzystywane w procesie IoT (internetu rzeczy – Internet of Things), gdy poszczególne urządzenia mogą raportować o swojej aktywności.

Przedstawiciele:

  1. Kafka
  2. RabbitMQ
  3. Event Hub (Azure)
  4. Kinesis (AWS)
  5. Pub/Sub (GCP)
  6. IBM MQ (IBM)

Technologie analityczne (BI – Business Intelligence)

Czemu służą? Za pomocą narzędzi analitycznych możemy tworzyć dashboardy, które pomogą nam analizować wcześniej zebrane dane.

Krótki komentarz: Warto pamiętać właśnie o takim aspekcie jak “wcześniej zebrane dane”. Nie wystarczy, że będziemy mieli aplikację analityczną. Aby w pełni wykorzystać jej potencjał, należy zawczasu pomyśleć o tym jak powinien wyglądać nasz pipeline, aby odpowiednie dane (nie za duże, nie za małe, odpowiednio ustrukturyzowane itd.) mogły zostać przez narzędzie BI zaciągnięte.

Przedstawiciele:

  1. Apache Superset
  2. Power BI (Azure)
  3. Amazon QuickSite (AWS)
  4. Google Data Studio (GCP)
  5. Holistics
  6. Looker
  7. Tableau

I jak się w tym wszystkim nie zagubić?

Mam nadzieję, że tym artykułem chociaż odrobinę pomogłem w uporządkowaniu spojrzenia na świat Big Data. Mnóstwo technologii nie zostało tutaj ujętych. Jest to jednak dobry punkt startowy;-). Jeśli widzisz potrzebę, aby coś tutaj dodać lub zmienić – daj sobie swobodę napisania o tym w komentarzu:-).

Jeśli chcesz tworzyć polską społeczność Big Data – odwiedź nas koniecznie na LinkedIn oraz zapisz się do newslettera!

 

Loading
Zrozumieć Sparka: cache vs persist

Zrozumieć Sparka: cache vs persist

W naszej podróży po Sparku musiała przyjść pora na ten przystanek. “Cache vs persist” to absolutnie podstawowe pytanie na każdej rozmowie rekrutacyjnej ze Sparka. Co ważniejsze – to naprawdę przydatne narzędzie dla każdego inżyniera big data, który posługuje się tym frameworkiem! Dzięki niemu przyspieszysz swojego joba, zmniejszysz ryzyko nieoczekiwanego fuckupu. No i najważniejsze – podczas code review pokażesz, że znasz się na rzeczy i nie jesteś tu z przypadku. Żartuję oczywiście.

Nie traćmy więc czasu i sprawdźmy co to za dobrodziejstwo, owe osławione cache i persist. Całą serię “zrozumieć sparka” poznasz tutaj.

Krótka powtórka – akcje i transformacje

Zanim przejdziemy do sedna sprawy – najpierw krótkie przypomnienie jak działa Spark – w pigułce. Aby to zrozumieć, fundamentalne są trzy rzeczy:

  1. Spark działa na swoich własnych strukturach, “kolekcjach”. To przede wszystkim RDD, które przypominają “listy” lub “tablice” (nie w konkrecie, natomiast grunt że nie jest to kolekcja typu mapa), ale także Datasety i Dataframy (które są aliasem dla Dataset[Row]). Napisałem, że “przede wszystkim”, ponieważ pod spodem DSów i DFów leżą właśnie RDD. To na tych strukturach musimy pracować, żeby całość ładnie działała.
  2. Na tych strukturach możemy wykonywać dwa typy operacji:
    • transformacje – czyli operacje, które wykonują pewne obliczenia, natomiast nie są wykonywane w momencie ich wywołania w kodzie.
    • akcje – operacje, które zwykle są jakimś rodzajem operacji wyjścia”. I co istotne – w tym momencie wykonywane są wszystkie transformacje po kolei.
  3. Takie podejście lazy evaluation zastosowane jest, aby Spark mógł odpowiednio zoptymalizować te operacje. Spisuje je w drzewku DAG (Directed Acyclic Graph – graf skierowany, acykliczny).

Tutaj pojawia się właśnie podstawowy problem – może się okazać, że akcji będziemy mieli kilka. I za każdym razem Spark będzie musiał wykonywać te same transformacje. To oczywiście powoduje, że job staje się niewydajny, a moc obliczeniowa jest niepotrzebnie przepalana.

Cache i persist w Sparku

I właśnie tutaj wkracza cachowanie. Możemy na danym zbiorze danych wywołać .cache() lub .persist() aby zachować go w pamięci. Gdyby potem pojawiło się kilka akcji, Spark będzie pobierał dane z konkretnego punktu, a nie procesował je wszystkimi transformacjami od początku.

cache vs persist

Aby to zrobić, mamy do dyspozycji dwie funkcje: cache() oraz persist(). Cóż – prawdę mówiąc jest jeszcze trzecia (.checkpoint()) ale na ten moment ją zostawimy;-). Tak więc powtórzmy – cache i persist pozwalają “zatrzymać” dataset w kodzie. Od momentu wywołania na naszych danych (RDD, Dataset lub Dataframe) cache lub persist – wywołanie akcji nie będzie skutkowało ponownym przeliczaniem transformacji, które były napisane przed tym punktem.

Pora jednak wyjaśnić sobie różnicę między cache a persist. Otóż – mówiąc prosto – persist pozwala nam ustawić poziom na jakim chcemy przechowywać dane (storage level), natomiast cache ma domyślne wywołanie. Tak naprawdę pod spodem cache wywołuje… właśnie persist!

Jaki storage level ma cache? To zależy od tego czy wywoujemy go na RDD czy Dataset/Dataframe. Jeśli RDD – domyślnie mamy MEMORY_ONLY, natomiast w przypadku Datasetów – MEMORY_AND_DISK.

Kod cache i persist dla Dataset i Dataframe

Jak wygląda wywołanie w kodzie? Przede wszystkim, jeśli wywołujemy cache() na Dataframe, to jest to najzwyklejszy w świecie alias dla wywołania funkcji persist(). I nie zmieniło się to od dawna. Poniżej kod z “bebechów” Sparka 2.4.3.

/**
   * Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).
   *
   * @group basic
   * @since 1.6.0
   */
  def cache(): this.type = persist()

Jak wygląda natomiast persist()? W tym przypadku mamy do czynienia z dwoma funkcjami: pierwsza to funkcja, która przyjmuje argument typu StorageLevel, natomiast druga jest funkcją bez argumentów. Wywołuje ona funkcję cacheQuery() znajdującą się w CacheManager, która dokonuje całej “magii”. Jeśli nie podamy jej Storage Level, zostanie przypisany standardowy – czyli “MEMORY_AND_DISK”.

Poniżej implementacje obu funkcji persist().

def persist(): this.type = {
    sparkSession.sharedState.cacheManager.cacheQuery(this)
    this
  }

def persist(newLevel: StorageLevel): this.type = {
    sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
    this
  }

Kod cache i presist dla RDD

W kontekście RDD cache() także jest aliasem dla persist(). Znów mamy także do czynienia z dwiema funkcjami persist() – jedną bezargumentową, drugą przyjmującą argument typu StorageLevel. Tym razem jednak różnica jest taka, że standardowym, domyślnym poziomem jest “MEMORY_ONLY” (nie jak w przypadku datasetów “MEMORY_AND_DISK”).

Można nawet dostrzec “niezwykle elegancki” komentarz, który pokusił się zostawić w samym środku funkcji jeden z inżynierów tworzących Sparka;-).

def persist(newLevel: StorageLevel): this.type = {
    if (isLocallyCheckpointed) {
      // This means the user previously called localCheckpoint(), which should have already
      // marked this RDD for persisting. Here we should override the old storage level with
      // one that is explicitly requested by the user (after adapting it to use disk).
      persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
    } else {
      persist(newLevel, allowOverride = false)
    }
  }

Storage levels

Z implementacji funkcji persist() wiemy, że kluczową sprawą jest Storage Level – czyli poziom na jakim chcemy przechowywać nasze dane. Oto jakie poziomy możemy wybrać:

  1. MEMORY_ONLY – przechowuje RDD lub Datasety jako deserializowane obiekty w pamięci JVM. Jeśli nie ma wystarczająco dużo dostępnej pamięci, niektóre partycje nie zostaną zapisane i zostaną potem ponownie obliczone w razie potrzeby.
  2. MEMORY_ONLY_2 – To samo co MEMORY_ONLY, natomiast każda partycja jest replikowana na dwa nody.
  3. MEMORY_ONLY_SER – to samo co MEMORY_ONLY, natomiast przechowuje dane jako serializowane obiekty w pamięci JVM. Potrzebuje mniej pamięci niż [1].
  4. MEMORY_ONLY_SER_2 – analogicznie.
  5. MEMORY_AND_DISK – w tym przypadku dane są przechowywane jako deserializowane obiekty w pamięci JVM. W tym przypadku jednak, w przeciwieństwie do [1], jeśli nie wystarczy miejsca, nadmiarowe partycje są przechowywane na dysku. To prowadzi do mniejszej wydajności (wolniejszy proces), ponieważ angażujemy tu operacje I/O.
  6. Analogicznie do 1-4 mamy także MEMORY_AND_DISK_SER, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2.
  7. DISK_ONLY – w tym przypadku dane są przechowywane jedynie na dysku. Oczywiście najwolniejszy wariant.
  8. DISK_ONLY_2 – analogicznie, tworzone są replikacje.

Zostań prawdziwym sparkowym wojownikiem

Jeśli dotarłeś do tego punktu, to prawdopodobnie jesteś naprawdę zainteresowany rozwojem w Sparku. Trzeba uczciwie powiedzieć, że to złożona, duża technologia. Jeśli chcesz zgłębić ją bardziej – przekonaj swojego szefa, żeby zapisał Ciebie i Twoich kolegów/koleżanki na szkolenie ze Sparka. Omawiamy całą budowę od podstaw, pracujemy dużo i intensywnie na ciekawych danych, a wszystko robimy w miłej, sympatycznej atmosferze;-) – Zajrzyj tutaj!

A jeśli chcesz pozostać z nami w kontakcie – zapisz się na newsletter lub obserwuj RDF na LinkedIn. Koniecznie, zrób to i razem twórzmy polską społeczność Big Data!

 

Loading
Zrozumieć Sparka: czym różnią się podobne mechanizmy? Distinct vs dropDuplicates i inne.

Zrozumieć Sparka: czym różnią się podobne mechanizmy? Distinct vs dropDuplicates i inne.

Pracując ze sparkiem bardzo często spotykamy mechanizmy, które budzą naszą konsternację z powodu bardzo dużego podobieństwa. Czy każde z tych mechanizmów ma inne działanie? Jeśli tak, to jakie są praktyczne różnice? Postanowiłem zanurkować nieco w kod i sprawdzić najbardziej popularne funkcje. Zapraszam!

Aha… to oczywiście artykuł w serii “Zrozumieć Sparka”. Poprzednio omawiałem joiny – zainteresowanych zapraszam tutaj.

Porządkujemy – czyli orderBy vs sort

Pierwszy mechanizm, który bierzemy na tapet to sortowanie. Spark daje nam przynajmniej dwie funkcje, które spełniają to zadanie: orderBy() oraz sort(). Czym się różnią?

Tutaj sprawa jest banalnie prosta – orderBy() to po prostu alias dla sort(). Mówi nam o tym poniższy fragment kodu:

/**
   * Returns a new Dataset sorted by the given expressions.
   * This is an alias of the `sort` function.
   *
   * @group typedrel
   * @since 2.0.0
   */
  @scala.annotation.varargs
  def orderBy(sortExprs: Column*): Dataset[T] = sort(sortExprs : _*)

Mamy dwie funkcje orderBy() – obie wywołują sort().

Zmieniamy nazwy kolumn: withColumnRenamed vs alias

Załóżmy, że mamy dataframe i nie pasuje nam nazwa jednej z kolumn. W takiej sytuacji mamy do dyspozycji dwie – cztery metody: withColumnRenamed(), alias(), as() oraz name(). Zajmijmy się najpierw różnicami między withColumnRenamed oraz alias.

  1. Zwracany typ:
    • alias() to funkcja zwracająca typ Column,
    • withColumnRenamed() zwraca Dataset[Row] (czyli Dataframe).
  2. Co dzieje się w środku:
    • alias() jest jedynie… aliasem dla funkcji name(). Nie robi nic innego. O tym co robi name() rozdział niżej.
    • withColumnRenamed() ma logikę, która zmienia schemat. Warto dodać, że sama zmiana nazwy kolumny dzieje się poprzez wykorzystanie funkcji as(). To – zdradzę przedwcześnie – jest alias dla aliasu. A alias jest aliasem dla name(). WOW! No nic… poniżej podrzucam jak zbudowana jest funkcja withColumnRenamed():

      def withColumnRenamed(existingName: String, newName: String): DataFrame = {
          val resolver = sparkSession.sessionState.analyzer.resolver
          val output = queryExecution.analyzed.output
          val shouldRename = output.exists(f => resolver(f.name, existingName))
          if (shouldRename) {
            val columns = output.map { col =>
              if (resolver(col.name, existingName)) {
                Column(col).as(newName)
              } else {
                Column(col)
              }
            }
            select(columns : _*)
          } else {
            toDF()
          }
        }
  3. Kiedy stosować:
    • alias() stosujemy wtedy, gdy możemy odwołać się do konkretnej kolumny – klasycznym przykładem jest zastosowanie funkcji select(). Tak jak poniżej:
      peopleDF.select(col("name").alias("firstName"))
    • withColumnRenamed() wywołujemy na Dataframe. W efekcie dostajemy nowy Dataframe ze zmienioną nazwą kolumny. Wygląda to tak jak poniżej:
      peopleDF.withColumnRenamed("name", "firstName")

 

Zmienianie nazw ciąg dalszy – as vs alias vs name

Skoro już znamy różnice między withColumnRenamed oraz alias, warto przejść do kolejnych meandrów sparkowego labiryntu. Tym razem poszukajmy czym różnią się od siebie trzy funkcje: as(), alias() oraz name().

Otóż, tym razem sprawa jest prostsza. Albo i bardziej zaskakująca, sam(/a) już musisz to rozstrzygnąć. Otóż – nie ma różnic. A konkretniej as() jest aliasem dla name(), tak samo jak alias() jest aliasem dla name().

Funkcja name() natomiast jest dość prostym mechanizmem. Jeśli obecna kolumna ma jakieś metadane powiązane z nią, zostaną propagowane na nową kolumnę. W  implementacji użyty jest Alias – warto jednak pamiętać, że nie chodzi tu o funkcję alias(), a o case classę Alias. Implementacja funkcji name() poniżej:

def name(alias: String): Column = withExpr {
    expr match {
      case ne: NamedExpression => Alias(expr, alias)(explicitMetadata = Some(ne.metadata))
      case other => Alias(other, alias)()
    }
  }

Usuwamy duplikaty w Sparku – distinct vs dropDuplicates

Prędzej czy później przychodzi taki moment, że w efekcie różnych transformacji dostajemy identyczne obiekty w naszych dataframach. Jest to szczególnie uciążliwe, gdy musimy pracować na unikatowych danych. Poza tym jednak najzwyczajniej w świecie niepotrzebnie zajmują one miejsce. Aby tego uniknąć, usuwamy duplikaty. Tylko jak to zrobić w Sparku? Istnieją dwa sposoby – distinct() oraz dropDuplicates().

Distinct jest najprostszą formą usuwania duplikatów. Tutaj sprawa jest prosta – na dataframe wywołujemy funkcję distinct(), po czym Spark wyszukuje rekordy, które w całości się pokrywają i zostawia tylko jeden z nich. Tak więc w tym przypadku wszystkie kolumny muszą się pokrywać, aby rekord został uznany za duplikat.

Implementacja:

sampleDF.distinct()

Sprawa ma się nieco inaczej, jeśli weźmiemy na warsztat funkcję dropDuplicates(). W tym przypadku możemy wybrać które kolumny bierzemy pod uwagę. W takiej sytuacji “duplikatem” może być np. osoba o tym samym imieniu i nazwisku, ale z innym PESELem.

Implementacja z przykładowymi kolumnami:

sampleDF.dropDuplicates("name", "age")

Co ważne – po takim wywołaniu, zwrócony dataframe oczywiście będzie zawierał wszystkie kolumny, nie tylko wyszczególnione w “dropDuplicates()”.

Tak więc, podsumowując – różnica polega na tym, że distinct() bierze wszystkie kolumny, zaś dropDuplicates() pozwala nam wybrać o które kolumny chodzi.

Explode vs ExplodeOuter

Niekiedy posługując się dataframe, stykamy się z kolumną, która zawiera listę – np. listę imion, liczb itd. Czasami w takiej sytuacji potrzebujemy, żeby te dane znalazły się w osobnych wierszach. Aby to zrobić, używamy funkcji explode – a właściwie jednej z dwóch “eksplodujących” funkcji.

Różnica jest bardzo prosta: explode() pominie nulle, natomiast explode_outer() rozbuduje naszą strukturę także o nulle.

Tak więc, jeśli chcemy rozbić nasze dane pomijając pry tym wartości typu null – wybierzemy explode().

Podsumowanie

Mam nadzieję, że zestawienie powyższych metod rozwieje różne wątpliwości, które mogą niekiedy przychodzić. Spark to złożona, duża technologia. Jeśli chcesz zgłębić ją bardziej – przekonaj swojego szefa, żeby zapisał Ciebie i Twoich kolegów/koleżanki na szkolenie ze Sparka. Omawiamy całą budowę od podstaw, pracujemy na ciekawych danych, a wszystko robimy w miłej, sympatycznej atmosferze;-) – Zajrzyj tutaj!

A jeśli chcesz pozostać z nami w kontakcie – zapisz się na newsletter. Koniecznie, zrób to!

 

Loading
Zrozumieć Sparka: joiny

Zrozumieć Sparka: joiny

Spark to rozległa i złożona technologia. Zrozumienie go wymaga zarówno silnych podstaw teoretycznych, jak i konkretnej, mozolnej praktyki. W serii “Zrozumieć Sparka” postaram się dogłębnie przybliżyć różne aspekty tego zacnego frameworku do przetwarzania danych.

Zaczynamy “z grubej rury” od czegoś, co wycisnęło niejedną łzę u Inżynierów Big Data. Chodzi oczywiście o joiny – mechanizm znany także z tradycyjnego SQL. Z pewnością pojawią się na ten temat także artykuły zaawansowane, dlatego dzisiaj zaczniemy dość lekko – od krótkiego omówienia typów joinów. Wiem, wiem – niby temat znany. W praktyce jednak czasem niedopowiedziany. Niech więc zostanie rozwiązany od podstaw;-).

Czym są joiny w Sparku?

Joiny to mechanizm znany dobrze ludziom pracującym z relacyjnymi bazami danych. Polega na łączeniu tabel. Wyobraź sobie, że masz tabelę, w której są zebrane dane o pracownikach – pesele, imiona, nazwiska, wykształcenie… Zaraz obok możesz zobaczyć tabelkę, w której widać dane dotyczące stanowisk – nazwy stanowisk, pensje, nazwa departamentów w firmie, pesele itd.

Jeśli chcemy pracować na danych dotyczących spraw stanowiskowych w kontekście ludzi – musimy te dwie tabele połączyć. I właśnie temu służą joiny;-). Problematyka w teorii dość prosta, w praktyce ma kilka zagwozdek o których warto wspomnieć. Przede wszystkim właśnie ta podstawowa – czyli różnice w joinach.

Typy joinów w Sparku

Chcąc połączyć tabele, mamy do dyspozycji kilka typów takiego mechanizmu. Podstawowe to inner join, left (outer) join, right (outer) join, full outer join oraz cross join. Dodatkowo możemy w sparku skorzystać z jeszcze dwóch: semi joina oraz anti joina. Omówienie każdego z nich będzie prostsze, jeśli spojrzymy najpierw na ten popularny w internecie diagram.

Wyjaśnijmy czym są owe kółeczka oznaczone jako A oraz B. Jak już napisałem, w tradycyjnych bazach danych będą to tabele. W Sparku będą to Dataframy lub Datasety. Skupimy się tutaj na Dataframach (alias dla Dataset[Row]), gdyż jest to struktura tabelaryczna, stricte przygotowana właśnie pod pracę na tabelach.

Na powyższym obrazku mamy zawsze dwie strony – lewą (A) i prawą (B). Dla wyjaśnienia: to który dataframe jest po lewej a który po prawej to rzecz czysto umowna, w zależności od tego po której stronie jaki dataframe umieścimy w kodzie. Podstawowy schemat tworzenia joina wygląda bowiem następująco:

firstDF.join(secondDF, columns, "joinType")

tak więc w takiej sytuacji “firstDF” będzie “lewym” dataframe, a “secondDF” będzie “prawy”.

Zanim przejdziemy do omówienia poszczególnych typów, zachęcam do przyjrzenia się powyższemu obrazkowi. Jeśli natomiast zapamiętać będzie ciężko, być może poniższe dzieło odrobinę pomoże:

 

W przykładach będę posługiwał się następującymi strukturami:

  1. Dataframe peopleDF zawiera id, firstName, lastName i age.
  2. Dataframe jobsDF zawiera id oraz job – czyli nazwę zawodu (np. “teacher”).

Wyglądają mniej więcej tak:

 

 

Inner Join

Zaczynamy od najprostszego typu, będącego jednocześnie domyślnym podczas łączenia tabel w Sparku. Innej join doprowadza do połączenia tych rekordów, które są wspólne – czyli wtedy, gdy warunek zostanie spełniony.

Ponieważ w Sparku inner join jest domyślny, nie musimy go określać podczas łączenia tabel. Obie poniższe implementacje są poprawne:

val innerJoin: Dataset[Row] = peopleDF.join(jobsDF, "id")
val innerJoin2: Dataset[Row] = peopleDF.join(jobsDF, Seq("id"), "inner")

Generują one następujący efekt:

Left (outer) join

Left join, czy też posługując się pełną nazwą – left outer join, to mechanizm, który łączy obie tabele po konkretnych kolumnach, natomiast zapewnia, że wszystko z lewej strony będzie miało miejsce w finalnej tabeli. Jeśli więc nie uda się powiązać (“zmatchować”) rekordów, kolumny z “prawej strony” będą wypełnione nullami.

Implementacja wygląda następująco:

val leftJoin: Dataset[Row] = peopleDF.join(jobsDF, Seq("id"), "left_outer")

I daje efekt w postaci takiej tabelki:

Zwróć uwagę na to że rekordy w peopleDF kończą się na id 18, natomiast tabela job posiada id od 1 do 13, a potem 19 i 20. Na tym przykładzie widać, że część nie powiązana została wypełniona nullami.

Right (outer) join

W przypadku right joina sprawa jest prosta – to po prostu “lustrzane odbicie” left joina. Łączy on więc obie tabele po konkretnych kolumnach, natomiast zapewnia, że wszystko z prawej strony znajdzie swoje miejsce w finalnej tabeli. Jeśli więc któreś rekordy z prawej tabeli nie będą miały swojego odpowiednika (match nie zostanie zrealizowany/“nie zmatchuje się”), to w tabeli finalnej kolumny z lewej w tych rekordach będą nullami.

Implementacja:

val rightJoin: Dataset[Row] = peopleDF.join(jobsDF, Seq("id"), "right_outer")

Nawet jeśli powyższy opis brzmi jak skomplikowany, poniższy efekt powinien wszystko rozjaśnić:

Full outer  join

Full outer join jest przeciwieństwem inner joina – bierze wszystko. Tak więc można powiedzieć, że jest sklejeniem left i right joinów wraz z usunięciem duplikatów.

Implementacja:

val fullJoin: Dataset[Row] = peopleDF.join(jobsDF, Seq("id"), "full")

Efekt:

Cross join (cartesian/iloczyn kartezjański)

Cross join nazywany jest także iloczynem kartezjańskim. Pochodzi to od matematycznego mechanizmu operacji na macierzach – właśnie iloczynu kartezjańskiego. Dla ciekawskich, z matematyczną definicją oraz kilkoma ćwiczeniami można zapoznać się tutaj.

Dla nas jednak chyba łatwiejsza będzie “definicja zbanalizowana by Maro” – czyli proste “łączymy każdy rekord z każdym”W związku z tym, że każdy z każdym, nie ma tu potrzeby (ani miejsca) by wpisać kolumnę, po której łączymy.

Implementacja:

val crossJoin: Dataset[Row] = peopleDF.crossJoin(jobsDF)

Jak widać, efekt jest znacznie bardziej okazały, niż przy pozostałych typach łączeń. Przedstawiam tylko część, natomiast w tym przypadku rekordów jest aż 270:

Semi join

Pozostały jeszcze dwa typy joinów, zwykle pomijane. Chociaż są znacznie rzadziej spotykane, warto moim zdaniem znać oba, gdyż oba mogą okazać się naprawdę przydatne. Jeśli taka potrzeba zajdzie, po prostu zastosujemy przygotowaną już konstrukcję, zamiast robić “skok przez plecy” z selectem, równaniami, wherami i filtrami.

Pierwszym z nich jest Semi Join. To nic innego jak inner join, który… pomija kolumny z lewej strony. Tak więc po zmatchowaniu rekordów, brane są pod uwagę tylko kolumny z lewej strony.

Implementacja:

val semiJoin: Dataset[Row] = peopleDF.join(jobsDF, Seq("id"), "left_semi")

Efekt:

Anti Join

Ostatnim, najciekawszym moim zdaniem, joinem – jest anti join. Jest on pewnego rodzaju “dopełnieniem” semi-joina. Albo jego przeciwieństwem – zależy jak patrzeć. Chodzi w każdym razie  to, że chociaż kolumny są takie same – a więc jedynie z lewej strony – to… w finalnej tabeli znajdują się tylko rekordy, które nie zostały zmatchowane, nie da się ich powiązać między tabelami.

Implementacja:

val antiJoin: Dataset[Row] = peopleDF.join(jobsDF, Seq("id"), "left_anti")

Efekt:

Implementacja inaczej

Warto zauważyć jedną rzecz: wszędzie powyżej poszedłem na łatwiznę i łączyłem po takiej samej kolumnie. Może się jednak okazać, że nie będzie takich samych kolumn! Cóż wtedy zrobić? Opcji jest jak zawsze kilka. Przede wszystkim dwie (poniższe konstrukcje nie odpowiadają już schematom których trzymaliśmy się przez większość artykułu):

Opcja nr 1: zróbmy tak, żeby były takie same kolumny. Czyli po prostu zmieńmy nazwy kolumn, które mają być matchowane.

val leftJoin: Dataset[Row] = peopleDF.join(jobsDF.withColumnRenamed("personId", "id"),
      Seq("id"), "left_outer")

Opcja nr 2: powiedzmy, które kolumny mają być matchowane. Tak, to zdecydowanie brzmi rozsądniej:

val leftJoin: Dataset[Row] = peopleDF.join(jobsDF, peopleDF("id") === jobsDF("personId"), "left_outer")

To samo można także zrobić za pomocą następującej składni:

val leftJoin: Dataset[Row] = peopleDF.join(jobsDF, peopleDF("id").equalTo(jobsDF("personId")), "left_outer")

Kolejną rzeczą jaką warto rozważyć jest łączenie po kilku tabelach. Możemy to zrobić bardzo prosto:

val leftJoin: Dataset[Row] = peopleDF.join(jobsDF, Seq("id", "pesel"), "left_outer")

Można także budować bardziej skomplikowane warunki joinów:

val leftJoin: Dataset[Row] = peopleDF.join(jobsDF, peopleDF("id").equalTo(jobsDF("personId")).and(peopleDF("age").gt(jobsDF("minimumAge"))), "left_outer")

Niebezpieczeństwa

Skoro joiny są takie proste i przyjemne, to czy można ich używać bez ograniczeń i bez głębszej refleksji? Otóż… jest dokładnie odwrotnie. Napisałem na początku, że mechanizm ten wycisnął niejedną łzę u inżynierów korzystających ze sparka. Oto dlaczego:

Nie będziemy się tutaj rozwodzić nad tym co i jak w detalu. Sprawa jest prosta: join to uruchomienie shufflingu – czyli przerzucania danych między partycjami oraz nodami. To z kolei bardzo zasobożerna sprawa i warto jej unikać. Na ile oczywiście się da, bo najczęściej się po prostu nie da.

W kolejnych artykułach zagłębimy się w kwestie wydajnościowe właśnie. Na ten moment, z racji że są to podstawy, polecam przejrzeć kod pod kątem akcji, które wykonujemy po joinie. Być może warto po połączeniu tabel zrobić bowiem cache(), persist() lub checkpoint(). Gdybyśmy wywoływali dwukrotnie akcje, dwukrotnie będzie zrobiony także join – a po co, jak nie trzeba?;-).

Ciąg dalszy…

Mam szczerą nadzieję, że wiesz już teraz wszystko czego trzeba, aby zacząć przygodę z joinami w Sparku. Oczywiście w kolejnych artykułach zagłębimy się nieco w tym zagadnieniu – zostań z nami przez newsletter!

Jeśli czujesz, że Twoje wiedza na temat Sparka jest nieuporządkowana i frustruje Cię to, to mam dobrą wiadomość. Organizujemy szkolenia dla firm z zakresu… właśnie Apache Spark. Jeśli masz dobrego szefa, podeślij mu kontakt do nas – możemy pojawić się u Ciebie w firmie i pokazać wszystko od 0 do Spark Inżyniera;-). Strona ze szkoleniem dostępna jest tutaj.

Tymczasem zapraszam do naszego newslettera. Zostańmy w kontakcie i… razem budujmy naszą wspólną, polską społeczność Big Data!

 

Loading

 

Zamiana Dataframe w Dataset w Sparku. Taka, która działa.

Zamiana Dataframe w Dataset w Sparku. Taka, która działa.

Największy problem Dataframe w Sparku? Oczywiście – brak jasności, jaka schema jest w aktualnym DFie. W związku z tym często wykorzystuje się Dataset[T], gdzie T to konkretna Case Class. Dzięki temu, jeśli wywołujemy metodę, która zwraca Dataset[T], możemy być przekonani, że wiemy jakim typem operujemy, jaką nasz obiekt ma schemę.

Niestety, tak dobrze jest tylko w teorii. W praktyce Spark pozwala wyjść poza ramy schematu i dodać lub odjąć kolumny, które nie istnieją w case class T. Jest to bardzo mylące i osoby które o tym niewiedzą mogą przeżyć niezłą eksplozję mózgu. Pytanie – czy da się coś z tym zrobić? Jak najbardziej. Chociaż nie oficjalną ścieżką, bo to właśnie ona wprowadza w błąd.

Standardowe castowanie Dataframe do Dataset[T]

Na początku zobaczmy jak to powinno wyglądać według planu “standardowego”. W przykładzie będziemy się posługiwać case classą “Person”, która ma id, name i age. To do niej będziemy “równać” wszystkie Dataframy. Wykorzystamy do tego trzy DFy:

  1. perfectDF – będzie miał dokładnie takie kolumny jak powinniśmy mieć wykorzystując Person.
  2. bigDF – będzie zawierał jedną kolumnę więcej – “lastName”
  3. smallDF – będzie zawierał jedynie kolumny “id,name”. Będzie więc brakowało kolumny “age”

Dodam jeszcze, że cały kod pisany jest w Scali.

Poniżej przedstawiam uproszczone utworzenie pierwszego Dataframe’a. Reszta jest na podobnej zasadzie.

val perfectDF: Dataset[Row] = Seq(
      (1, "John", "26"),
      (2, "Anna", "28")
    ).toDF("id", "name", "age")

Czas na najważniejszą rzecz: oficjalnym i najbardziej popularnym sposobem na zamianę DF w DS[T] jest wywołanie df.as[T]. Aby to zrobić, musimy wcześniej zaimportować implicit._. Można ewentualnie zrobić także df.as[T](Encoders.product[T]).

Po takim “myku” spodziewamy się, że otrzymamy zmienną z typem Dataset[T]. W naszym przypadku, po zastosowaniu perfectDF.as[Person] faktycznie tak się dzieje. Wywołujemy więc “show()” aby sprawdzić jak wyglądają nasze dane i… wszystko gra. Mamy więc Dataset o jasnej schemie, gdzie możemy odwoływać się do konkretnych pól.

Problem zaczyna pojawiać się w sytuacji, gdy nasz Dataframe nie jest idealnie wpasowany do schematu klasy T. Przykładowo – zmienna bigDF to Dataframe, który zwiera kolumny “id”, “name”, “lastName”, “age”. Po wywołaniu metody bigDF.as[Person] możemy się spodziewać, że kolumna zostanie obcięta. Wszak w wyniku operacji dostajemy Dataset[Person], a klasa Person nie zawiera pola “lasName”. Niestety, jest zupełnie inaczej.

val bigDF: Dataset[Row] = Seq(
      (1, "John", "Smith", "27"),
      (3, "Anna", "Smith", "1")
    ).toDF("id", "name", "lastname", "age")

val bigDFAsPerson: Dataset[Person] = bigDF.as[Person]
bigDFAsPerson.show()
bigDFAsPerson.printSchema()

Efekt – w postaci schemy oraz wyglądu tabelki – można obserwować poniżej:

+---+----+--------+---+
| id|name|lastname|age|
+---+----+--------+---+
|  1|John|   Smith| 27|
|  3|Anna|   Smith|  1|
+---+----+--------+---+

root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- age: string (nullable = true)

Sprawa ma się inaczej, gdy spróbujemy dokonać konwersji do klasy z Dataframe, który ma zbyt mało kolumn:

val smallDF: Dataset[Row] = Seq(
  (1, "maro"),
  (3, "ignacy")
).toDF("id", "name")

val smallDFAsPerson: Dataset[Person] = smallDF.as[Person]

smallDFAsPerson.show()
smallDF.printSchema()

Efekt: ponieważ nie ma jednej z kolumn, Spark rzuci wyjątek. A konkretniej AnalysisException.

21/08/30 21:25:11 INFO CodeGenerator: Code generated in 25.8197 ms
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`age`' given input columns: [id, name];
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:110)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:107)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:278)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:278)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)  
.
.
.
    at App$.main(App.scala:44)
    at App.main(App.scala)

 

Sprawdzony sposób na zamianę Dataframe do Dataset[T]

Na szczęście – jest możliwość, aby to naprawić. Aby nie przedłużać powiem tylko, że należy po prostu napisać swój własny kod, dzięki któremu rozszerzymy nieco Dataframe. Najpierw jednak rozpiszmy sobie jakie chcemy spełnić warunki.

Po konwersji z Dataframe, nowo utworzony Dataset:

  1. Nie będzie zawierał niepotrzebnych kolumn.
  2. Będzie miał nowo utworzone kolumny. Będą one wypełnione nullami.

Aby to zrobić, utwórz object SparkExtensions, a następnie dodaj kod:

import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, Row}

object SparkExtensions {
  implicit class ExtendedDataFrame(df: DataFrame) {
    def to[T <: Product: TypeTag]: Dataset[T] = {
      import df.sparkSession.implicits._
      import org.apache.spark.sql.functions.col
      val columnsUniqueInT = Encoders.product[T].schema.diff(df.schema)
      val withAdditionalColumns: Dataset[Row] = columnsUniqueInT .foldLeft(df)((previousDF, column) =>
        previousDF.withColumn(column.name,
          lit(null).cast(column.dataType)))
      withAdditionalColumns.select(Encoders.product[T]
        .schema
        .map(f => col(f.name)): _*)
        .as[T]
    }
  }
}

Co tu się dzieje? Przede wszystkim trzy rzeczy:

  1.  Znajdujemy unikalne kolumny – robimy to poprzez różnicę tego co jest w naszej case class oraz w schemacie dataframe.
  2. Tworzymy Dataframe z dodatkowymi kolumnami, które wypełniamy nullami i castujemy do odpowiednich typów
  3. Tworzymy Dataframe (na bazie [2]), w którym wybieramy jedynie te kolumny, które są przewidziane w klasie T.

Jak tego użyć? Musimy zrobić dwie rzeczy: zaimportować nasz object SparkExtensions (import functions.SparkExtensions._), a następnie przy Dataframe zamiast “.as[Person]” zrobić “.to[Person]”.

val bigDFToPerson: Dataset[Person] = bigDF.to[Person]
bigDFToPerson.show()
bigDFToPerson.printSchema()

Efekt:

+---+----+---+
| id|name|age|
+---+----+---+
|  1|John| 27|
|  3|Anna|  1|
+---+----+---+

root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)

 

UWAGA! Taka opcja zakłada, że wszystko musi zgadzać się w 100%.

Jak wiadomo, pole w Datasetach sparkowych składa się z 3 rzeczy:

  1. Nazwy
  2. Typu
  3. Określenia, czy jest nullable czy nie – czyli czy może przyjmować null jako wartość.

Dla tych, którzy wolą nieco bardziej liberalną wersję, przygotowałem też metodę, która pozwala okrajać nie uwzględniając trzeciej rzeczy.

def to[T <: Product: TypeTag]: Dataset[T] = {
      import df.sparkSession.implicits._
      import org.apache.spark.sql.functions.col
      val columnsFromT = Encoders.product[T].schema.map(c=> (c.name,c.dataType))
      val columnsFromDF = df.schema.map(c=> (c.name, c.dataType))
      val columnsUniqueInT = columnsFromT.diff(columnsFromDF)
      val withAdditionalColumns: Dataset[Row] = columnsUniqueInT.foldLeft(df)((previousDF, column) =>
        previousDF.withColumn(column._1,
          lit(null).cast(column._2)))
      withAdditionalColumns.select(Encoders.product[T]
        .schema
        .map(f => col(f.name)): _*)
        .as[T]
    }

Zdaję sobie sprawę, że robi się już nieco mało estetycznie, ale zapewniam – działa;-). Stosuje się dokładnie tak samo jak w przypadku poprzedniej wersji.

Liczę głęboko, że taka metoda się przyda – w projekcie w którym pracowałem, problem typów był naprawdę mocno odczuwalny. Cóż – po więcej, choć zwykle nieco bardziej “wysokopoziomowych” artykułów – zapraszam do newslettera;-). Powodzenia!

 

Loading
Najczęstsze problemy ze starym Sparkiem (1.X) – co warto wiedzieć, zaczynając projekt

Najczęstsze problemy ze starym Sparkiem (1.X) – co warto wiedzieć, zaczynając projekt

Każdy kto zmuszony został korzystać ze starej wersji Sparka (<2.0) wie jak wiele wody musi upłynąć, aby zrobić niekiedy – wydawałoby się – proste rzeczy. Liczę, że poniższe zestawienie pomoże w rozwiązaniu choć części problemów.

Spark 1.X (1.6) – czyli przygotuj się na nowe (stare?) doznania.

Właśnie dowiedziałeś się, że znalazłeś się w nowym projekcie. Oczywiście musi z Twojej strony paść pierwsze, zapobiegawcze pytanie: “w jakich technologiach będę robić?”. Odpowiedź która przychodzi sprawia, że rozpływasz się niczym aktorka w reklamie czekolady: “Spark, będzie Spark…”.

Znakomicie! Szkoda tylko, że nikt nie wspomniał co to za Spark. Chodzi bowiem o starego dobrego Sparka 1.6, weterana Big Data. Zasłużony dla branży, odznaczony tysiącem orderów procesingu, choć czasem wolelibyśmy podziwiać go jedynie na starych fotografiach.

Czy jednak Spark <2.0 jest taki zły? Z mojego doświadczenia powiem dość prosto: jeśli przestawisz się i zdobędziesz zawczasu odpowiednią wiedzę, unikniesz gorzkich rozczarowań. Nie są to jednak rzeczy, które drastycznie zmieniają posługiwanie się Apache Spark i ciągle jest to solidna, przemyślana technologia do zrównoleglenia obliczeń. Jeśli pracowałeś/aś wcześniej głównie na 2.X (o “trójce” nie wspominam), to mam dobrą wiadomość: nie przeżyjesz szoku. Co najwyżej od czasu do czasu zmarszczysz czoło i sapniesz pod nosem to i owo. Na szczęście twórcy Sparka bardzo dobrze zadbali o kompatybilność wsteczną. Pracując tu, nie musimy zmagać się z koszmarami znanymi z innych technologii (pozdrawiam wszystkich programistów Scala Play – trzymajcie się tam!).

Poniższa lista jest moim (a także kilku osób które poprosiłem o radę – dzięki wielkie Panowie) subiektywnym zbiorem informacji, które spowodują, że praca w nowym projekcie ze “starym sprzętem” będzie naprawdę fajna. Mam przynajmniej taką nadzieję:-).

Subiektywna lista zmian w starym Sparku, które warto znać

Zacznijmy od spisania wszystkich rzeczy. Potem niektóre z nich (te, które nie są oczywiste zaraz po przywołaniu do tablicy;-)) opiszę nieco dokładniej.

  1. Brak możliwości wczytywania z CSV do DataFrame.
  2. Inna praca ze SparkContext / SparkSession
  3. Spark 1.X działa ze Scalą 2.10 (Spark 2.X – 2.11, Spark 3.X – 12) – oczywiście teoretycznie podział jest bardziej płynny, ale bardzo możliwe, że pracując ze Sparkiem 1.X, zmuszony/a będziesz robić to we współpracy ze starszą wersją Scali.
  4. Rozwoj Machine Learning od Sparka 2.0
  5. Spark Structured Streaming zostaje wprowadzony dopiero od Sparka 2.0.
  6. Od wersji 2.0 Dataframe staje się aliasem dla Dataset[Row], obie kolekcje mają ujednolicony interfejs.
  7. Poniżej 2.0 gdzie indziej znajdywały się encodery.
  8. Po prostu… wydajność. W codziennej, inżynierskiej robocie tego nie widzimy, ale przy procesowaniu to ma znaczenie.

Brak możliwości wczytywania z CSV do Dataframe

Jak wczytać dane z CSV? To proste!

val sampleDF = spark.read
      .option("header","true")
      .csv("sample_data.csv")

No jednak… nie. To jest proste, jeśli korzystasz ze Sparka >=2.0. Niestety, w “jedynkach” sprawa miała się nieco gorzej. Nie dysponujemy po prostu odpowiednią biblioteką, która dałaby nam dostęp do źródła typu csv. Na szczęście łatwo to naprawić.

Przede wszystkim – otwieramy nasz build.sbt (jeśli używamy sbt, inaczej maven itd.) i dodajemy odpowiednią bibliotekę:

libraryDependencies ++= Seq(
  ...
  "com.databricks" % "spark-csv_2.10" % "1.5.0",
  ...
)

Następnie, już podczas wczytywania danych:

sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .load(sample_data.csv)

I już:-). Zauważ tylko, że wykorzystujemy tu sqlContext, a nie sparkSession.

Inna praca ze SparkContext / SparkSession

A właściwie to… po prostu brak SparkSession. Został dodany od Sparka 2.0, więc jeśli używasz nieco bardziej leciwych wersji – zapomnij, że coś takiego istnieje. Zamiast tego musisz nauczyć się żonglować SparkContext oraz SQLContext.

Jak to wygląda w praktyce? Na przykład tak:

val conf = new SparkConf().setAppName("RDFSparkAppName")

val sc: SparkContext = new SparkContext(conf)
val sqlContext: SQLContext = new org.apache.spark.sql.SQLContext(sc)

// UDFs REGISTERING
val pseudonymisationUDF: PseudonymisationUDF = new PseudonymisationUDF
sqlContext.udf.register("pseudonymisationUDF", pseudonymisationUDF, DataTypes.StringType)

// DATAFRAME CREATING val sampleDF: DataFrame = sqlContext.read .format("com.databricks.spark.csv") .option("header", "true") .load("sample_data.csv")
// RDD CREATING val sampleRDD: RDD[Array[Byte]] = sc.parallelize(ids) .map(i=> Bytes.toBytes(i))

W większości będzie trzeba operować na sqlContext, ale jak widać zdarzają się sytuacje, gdy trzeba użyć także sparkContext. Warto wiedzieć jednak, że sparkContext da się wyciągnąć z sqlContext.

Rozwój Machine Learning od Spark 2.0

I tu istotna zmiana: popularny moduł Spark MlLib, oparty o RDD, od sparka 2.0 przechodzi w stan utrzymania (maintanance). Choć w dokumentacji sparka 2.X możemy przeczytać, że prawdopodobnie od Sparka 3 MlLib będzie wyłączony – tak się nie stało. Ciągle możemy go używać, jednak nie jest już rozwijany.

W zamian jednak, od Sparka 2.0 mamy do dyspozycji Spark ML. To moduł oparty o Dataframe. Co ciekawe – Spark Ml nie jest oficjalną nazwą, natomiast jest to pakiet, w którym znajdziemy interesujące nas klasy.

Zmiany w Dataset i Dataframe

Oczywiście nie można nie wspomnieć o najważniejszej zmianie. Jeśli korzystasz ze Sparka <2, najprawdopodobniej będziesz używać RDD. Jest jednak opcja, żeby pracowć z Dataframe. Należy jednak pamiętać, że największa magia optymalizacyjna zaczyna się w tym temacie dopiero od Spark 2.0. Od tego momentu Dataframe staje się aliasem dla Dataset[Row], zaś Dataset i Dataframe mają ujednolicony interfejs.

Encodery w innym miejscu w Sparku 1.X

Encodery wykorzystujemy w Sparku aby przekonwertować DataFrame a Dataset. Robimy to za pomocą metody “as[T]”, gdzie “T” to konkretna case class. Co prawda nie jest to najlepszy sposób na otrzymanie silnego, klarownego Dataset, ale jest oficjalny, dlatego go tutaj przedstawiam.

Spark 1.X

case class Person(id: Int, name: String)

val encoder = org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[Person]

// creating some DF
val someDF = ...

val dataset = someDF.as(encoder)

Spark >= 2.X

case class Person(id: Int, name: String)
val encoder = org.apache.spark.sql.Encoders.product[Person]

// creating some DF val someDF = ...

val dataset = someDF.as(encoder)

Wydajność

Pisząc kod oczywiście nie mamy styczności z wydajnością. Nie zmienia to jednak postaci rzeczy, że podczas wykonywania joba to właśnie ona ma kluczowe znaczenie. I tutaj po prostu warto pamiętać, że pierwsze Sparki były znacznie, znacznie wolniejsze i posiadały mniejsze możliwości.

W tym eksperymencie porównywano Sparka 1.6 oraz 2.0. Sumowano miliard liczb, dokonywano joina oraz sprawdzano groupBy. Największe wrażenie robią dwa pierwsze testy – wyniki w tabelce.

 

Primitive

Spark 1.6

Spark 2.0

Sum

15.6
seconds

0.68
seconds

Join

58.12  seconds

0.98
seconds

Warto oczywiście pamiętać, że taki eksperyment niesie szereg wątpliwości (np. kwestia nodów). Nie zmienia to jednak postaci rzeczy, że między sparkiem 1.6 a 2.0 jest spora różnica. Wynika ona z optymalizacji, jakie zostały przeprowadzone pod spodem Datasetów oraz Dataframów. Co warto wiedzieć – między ostatnią wersją 2.X oraz 3.0 także wprowadzono optymalizacje, dzięki którym możliwości Sparka stały się jeszcze większe. Twórcy chwalą się nawet, że 3 jest nawet 17x szybszy, co zostało wykazane w teście TPCDS.

Podsumowanie

Jeśli już siedzisz w projekcie ze Sparkiem 1.X – nie masz wyjścia, trzeba robić. Pamiętaj, że nie jest taki zły, a pod kątem inżynierskim jest nawet dość mocno podobny. Bez obaw, po prostu postaraj się pamiętać o tym jakie rzeczy mogą Cię zaskoczyć i… rób swoje. Powodzenia!

A – właśnie. Jeśli jesteś zainteresowany/a tego typu rzeczami, zapisz się na newsletter RDF. Wspólnie tworzymy polską społeczność Big Data. Liczę, że od dzisiaj także z Tobą;-).

 

Loading