Przygotowywałem się ostatnio do kolejnego szkolenia ze Sparka. Chciałem dla swoich kursantów przygotować solidną rozpiskę tego co i jak robić w spark submit. Niemałe było moje zdziwienie, gdy dokumentacja ani stackoverflow nie odpowiedziały poważnie na moje prośby i błagania. Cóż… czas zatem samodzielnie połatać i przedstawić od podstaw kwestię spark submit – czyli tego jak uruchamiać nasze aplikacje (joby) sparkowe na serwerze. Zapraszam!
Spark Submit – podstawy
Zacznijmy od podstaw. Kiedy piszemy naszą aplikację Sparkową, robimy to w swoim IDE. Przychodzi jednak moment, w którym musimy wysłać ją na serwer. Pytanie – jak uruchamiać poprawnie joba sparkowego na klastrze, który zawiera wiele nodów?
Służy temu polecenie spark submit. Uruchamiając je musimy mieć jedynie dostęp do naszego pliku jar lub pliku pythonowego. Wszystko poniżej będę robił na plikach jar. Musimy to polecenie wykonywać oczywiście na naszym klastrze, gdzie zainstalowane są biblioteki sparkowe. Dobrze, żebyśmy mieli także dostęp do jakiegoś cluster managera – np. YARNa.
W ramach spark submita musimy określić kilka podstawowych rzeczy:
master – czyli gdzie znajduje się master. W rzeczywistości wybieramy tutaj także nasz cluster manager (np. YARN).
deploy-mode – client lub cluster. W jakim trybie chcemy uruchomić nasz job.
class – od jakiej klasy chcemy wystartować
plik wykonywalny – czyli np. jar.
To są podstawy które muszą być. To jednak dopiero początek prawdziwej zabawy z ustawieniami joba w spark submit. W samym środku możemy wskazać znacznie więcej ustawien, dzięki którym nadamy odpowiedni kształt naszej aplikacji (jak choćby kwestia pamięcie w driverze czy w executorach). Mały hint: chociaż można za każdym razem wpisywać wszystko od początku, polecam zapisać spark submit w jednym pliku, (np. run.sh) a następnie uruchamiać ten konkretny plik. To pozwoli nie tylko archiwizować, ale przede wszystkim wygodnie korzystać z bardzo długiego polecenia.
Poniżej pokazuję przykład spark submit do zastosowania.
Tak jak napisałem, opcji w spark submit jest naprawdę, naprawdę wiele. Każda z nich znaczy co innego (choć występuje dużo podobieństw) i poniżej zbieram je wszystkie (lub większość) “do kupy”. Robię to m.in. dlatego, że chcąc znaleźć znaczenie wielu z nich, musiałem nurkować w różnych źródłach. Zatem nie dziękuj – korzystaj;-).
Poniżej najczęstsze ustawinia dla spark submit.
–executor-cores
Liczba corów przydzielonych do każdego executora. UWAGA! Opcja dostępna tylko dla Spark Standalone oraz YARN
–driver-cores
Liczba corów dla drivera. Dostępne tylko w trybie cluster i tylko dla Spark Standalone oraz YARN
–files
Pliki które dołączamy do sparka z lokalnego systemu. Potem pliki te są przesłane każdemu worker nodowi, dzieki czemu możemy je wykorzystać w kodzie.
–verbose
Pokazuje ustawienia przy starcie joba.
–conf
ustawienia konfiguracyjne (więcej o tym w rozdziale poniżej)
–supervise
Jeśli podane, restartuje drivera w przypadku awarii. UWAGA! Dostępne tylko dla Spark Standalone oraz Mesos.
–queue
Określenie YARNowej kolejki na której ma być startowany. UWAGA! Dostępne tylko dla YARN.
–packages
Lista oddzielonych przecinkami adresów mavena dla plików jar, które chcemy przyłączyć do drivera lub executorów. Najpierw będzie przeszukiwał lokalne repo mavena, potem centralne oraz zdalne repozytoria podane w –repositories.Format: groupId:artifactId:version.
–exclude-pachages
Lista dependencji, które mają być excludowane z listy –packages. Kolejne elementy oddzielone są przecinkiem i mają nastepujący format: groupId:artifactId.
–repositories
Lista oddzielonych przecinkami repozytoriów do których uderzamy przy okazji ustawienia –packages.
Poza ustawieniami służącymi do odpowiedniego zasubmitowania joba, można przekazać także kilka kwestii konfiguracyjnych. Dodajemy je za pomocą ustawienia –conf w spark-submit. Następnie podajemy klucz-wartość. Może być podana bezpośrednio lub przy użyciu cudzysłowów, np:
Spark Submit służy nam do tego, żeby uruchamiać naszego joba na klastrze. Możemy do tego wykorzystać jakiś zewnętrzny cluster manager, lub zrobić to przy pomocy Spark Standalone. Spark Standalone to taki tryb Sparka, w którym jest on postawiony na klastrze jako realnie działający serwis (z procesami itd). Cluster managerem może być YARN ale może być to także Mesos, Kubernetes i inne.
Za każdym razem gdy chcemy użyć sparka, musimy wskazać mastera (poprzez wlaśnie ustrawienie master w spark submit). Jeśli korzystamy z YARNa, sprawa jest banalnie prosta – do ustawienia master przekazujemy po prostu yarn. Reszta jest zrobiona.
Rzecz wygląda ciut trudniej, gdy chcemy wskazać za pomocą innych cluster managerów.
W niektórych dystrybucjach należy dodać także parę rzeczy, jednak są to już dodatkowe operacje. Jeśli będzie chęć – z przyjemnościa pochylę się nad bardziej szczeółowym porównaniem wyżej wymienionych cluster managerów;-).
Głodny Sparka?
Mam nadzieję, że rozwiązałem Twój problem ze zrozumieniem Spark Submit oraz sensu poszczególnych właściwości. Jeśli jesteś wyjątkowo głodny/a Sparka – daj znać szefowi. Przekonaj go, żeby zapisał Ciebie i Twoich kolegów/koleżanki na szkolenie ze Sparka. Omawiamy całą budowę od podstaw, pracujemy dużo i intensywnie na ciekawych danych, a wszystko robimy w miłej, sympatycznej atmosferze;-) – Zajrzyj tutaj!
A jeśli chcesz pozostać z nami w kontakcie – zapisz się na newsletter lub obserwuj RDF na LinkedIn. Koniecznie, zrób to i razem twórzmy polską społeczność Big Data!
Nie ma znaczenia czy dopiero zaczynasz swoją przygodę z Big Data, czy masz już doświadczenie, czy jesteś inżynierem, czy patrzysz na branżę stricte biznesowo. W każdym przypadku przyjdzie taki moment, w którym poczujesz się zagubiony mnogością technologii oraz tym jak bardzo są “niedookreślone”. W tym materiale postaram się wprowadzić względny porządek i przeprowadzić Cię suchą stopą przez bagno technologii obsługujących duże dane. Bierz zatem kubek mocnej jak otchłań Data Lake’a kawy – i zaczynamy!
Metodologia
Małe zastrzeżenie…
Zacznijmy od bardzo podstawowego zastrzeżenia: to co tu zaproponuję, może być bardzo łatwo podważone. Co więcej – to co tu pokażę, będzie z całą pewnością niepełne. BA! Niepełne? Dobre sobie… to będzie zaledwie muśnięcie Big Datowej rzeczywistości. Wszystko to wynika z faktu, że branża przyjęła już naprawdę pokaźne rozmiary i wprowadzenie poważnej metodologii porządkującej pojęcia oraz technologie, wymagałoby pracy dyplomowej. Być może nawet doktoratu. Czemu? Cóż – tego jest po prostu najzwyczajniej w świecie tak dużo i odpowiadają na tak wiele potrzeb, że łączenie technologii staje się prawdziwą sztuką.
Zatem powiedzmy sobie: ten artykuł jest dla Ciebie, jeśli wiesz coś niecoś o Big Data, ale wszystko zaczęło się mieszać. Zastanawiasz się nad tym które z topowych technologii służą do czego i jak można je połączyć. Z takim podejściem – zaczynamy!
Podział ze względu na przeznaczenie
Dzielić technologie można ze względu na wiele rzeczy – możemy podzielić patrząc na języki programowania, możemy podzielić patrząc czy jest to technologia chmurowa, a można poszukać pod kątem popularności. Można też – i tak zrobimy – podzielić ze względu na przeznaczenie, cel jakiemu służą.
Mój podział będzie następujący:
Storages
Bazy danych (nierelacyjne)
Full-text search
Przetwarzanie danych
Komunikacja z danymi
Schedulers
Messaging
Technologie analityczne
Jeszcze raz zaznaczę: jest to z całą pewnością obraz niepełny. Jest jeszcze trochę obszarów, które nie zostały tu wzięte pod uwagę. Ten pozwoli jednak złapać pewien punkt zaczepienia – i o to chodzi;-).
Storages
Czemu służą?: Storages (nie mam pojęcia jak przetłumaczyć to poprawnie, poza dość prostackimi “magazynami danych”) służą przechowywaniu ogromnych ilości danych w sposób możliwie prosty.
Krótki komentarz: Temat storages nie jest dobrze zdefiniowany. Niekiedy jako storage traktuje się wszystko, co przechowuje dane, a więc także bazy danych. Ja wyodrębniłem tu jednak “data storage” jako “prosty” system, który pozwala przechowywać dane w sposób mniej złożony, niż bazy. Należą więc do tego wszelkiego rodzaju rozproszone systemy plików, Data Lakes itd.
Przedstawiciele:
HDFS (Hadoop Distributed File System)
ADLS gen 2 (Azure Data Lake Storage gen 2)
Amazon S3 (na AWS)
Google Cloud Storage (na GCP)
Delta Lake
Kudu (wymienione także w Bazach Danych)
Ozone (wymienione także w Bazach Danych)
Bazy danych (noSql)
Czemu służą?: Bazy danych służą przechowywaniu ogromnych ilości danych. Różnią się jednak nieco od Storages. Ich przeznaczenie zawiera bardziej ustrukturyzowaną formę przechowywania danych, a także możliwości bardziej zaawansowanej manipulacji danymi (przeglądania, usuwania pojedynczych rekordów itd).
Krótki komentarz: W temacie baz danych mamy bardzo dużo i coraz więcej technologii, które mogą nas interesować. Niektóre z nich nieco mieszają się ze Storages. To są właśnie te płynne granice o których już wspominałem. UWAGA! Wspominam tu jedynie o stricte big datowych, nierelacyjnych bazach danych. Nie znajdziemy tu więc popularnego mysql czy postgresql. Mamy wiele rodzajów baz danych – przede wszystkim key-value store, graph db, document store.
Przedstawiciele:
HBase
Accumulo
MongoDB
Cassandra
CosmosDB (Azure)
Dynamo DB (AWS)
Google Cloud Datastore (GCP)
Kudu (wymienione także w Storages)
Ozone (wymienione także w Storages)
Neo4j
Druid
Full Text Searches
Czemu służą? Technologie full-text search (przeszukiwania pełno-tekstowego) także (znów!) odpowiadają za przechowywanie danych. Tym razem jednak przechowywanie zaprojektowane jest tak, aby dało się to potem bardzo dobrze przeszukiwać. Szczególnie mocny akcent położony jest na przeszukiwanie tekstu wraz z różnymi funkcjami wbudowanymi, tak aby nie było szczególnie trudne zbudowane wyszukiwarki zawierającej wyszukiwanie podobnych wyrazów czy uwzględnianie literówek.
Krótki komentarz: W przeciwieństwie do pozostałych obszarów, full-text searche zdają się być zdominowane przez dwie technologie. Co więcej – obie zbudowane są na tym samym silniku. Nie oznacza to jednak, że jest to jedyna oferta na rynku! Co ciekawe, full-text searche mogą stanowić także znakomity mix przydatny do analizy danych. Ciekawym przykładem jest zastosowanie Elasticsearcha w NASA (konkretniej JPL) m.in. do analizy danych przysyłanych przez łaziki.
Przedstawiciele:
Lucene – nie jest samodzielną osobną technologią, a raczej silnikiem, na którym powstały inne.
Elasticsearch
Solr
Sphinx
Przetwarzanie danych (processing)
Czemu służą? Technologie do przetwarzania danych oczywiście… przetwarzają dane;-). Oczywiście mowa tu o bardzo dużych danych. W związku z tym technologie te zwykorzystują mechanizmy zrównoleglania obliczeń. Można te technologie wykorzystywać do ogromnej ilości celów. Od strandardowego czyszczenia, przez harmonizację (sprowadzenie datasetów do wspólnej postaci pod kątem schematu), opracowywanie raportów statystycznych, aż po wykorzystywanie algorytmów sztucznej inteligencji.
Krótki komentarz: Technologie do przetwarzania danych podzielimy z grubsza na dwa rodzaje: batchowe i streamingowe. Batchowe to te, których zadaniem jest pobrać dużą paczkę danych, “przemielić je” i zwrócić wynik. Streamingowe natomiast działają w trybie ciągłym. W przeciwieństwie do pierwszego rodzaju – “nie kończą się”.
Przedstawiciele:
Spark
Spark Structured Streaming – choć zawiera się w pierwszym punkcie, zasługuje na osobne wyróżnienie.
Kafka Strams – świetnie wspólgra z Kafką. Dodatkowo cechuje się daleko posuniętą prostotą.
Flink
Storm
Map Reduce – choć obecnie nie jest już raczej implementowany w nowych systemach, to znajduje się w galerii sław i nie można o nim nie wspomnieć!
Komunikacja z danymi (interfejsy SQL-like)
Czemu służą? Technologie które mam na myśli powodują, że w prostszy sposób możemy dostać się do danych, które normalnie przechowywane są w postaci plików (lub w innej postaci, natomiast wciąż kiepskiej w kontekście pracy z danymi). Przykładem jest, gdy chcemy składować pliki na HDFS, ale zależy nam na zachowaniu możliwości pracy z tymi danymi (prostych operacji przeszukiwania, dodawania itd). Technologie te dostarczają często interfejs obsługi danych składowanych w różnych miejscach, podobny do SQL.
Przedstawiciele:
Hive
Impala
Shark
BigSQL
Schedulery
Czemu służą? Kiedy tworzymy joby, bardzo często mamy potrzebe ustawienia ich aby były uruchamiane o tej samej porze. Temu właśnie służą między innymi schedulery.
Krótki komentarz: Poza prostymi funkcjami określania kiedy jakie joby powinny zostać uruchomione, schedulery pozwalają także ustawić całą ścieżkę zależności w uruchamianiu jobów. Np. “jeśli zaciąganie danych zostanie ukończone, rozpcoznij czyszczenie, a potem harmonizację. Jeśli na którymś etapie coś pójdzie nie tak, wyślij maila z alertem”. Do tego dochodzą jeszcze możliwości (lepszego lub gorszego) monitoringu tych jobów oraz całych workflowów.
Przedstawiciele:
Oozie
Airflow
Luigi
Jenkins (częściowo)
Pinball (stworzony przez Pinterest, natomiast nie jest obecnie aktywnie przez pinterest rozwijany)
Step Functions (AWS)
Workflows (GCP)
Logic Apps (Azure)
Messaging
Czemu służą? Technologie do messagingu czy też kolejkowania, to technologie, które – nieco banalizując – są “punktem przesyłu” wielu danych. Wykorzystuje się je szczególnie często w kontekście przetwarzania streamingowego danych. Kiedy produkujemy jakieś dane, nie musimy się zastanawiać gdzie mają być dalej przetworzone. Wystarczy wykorzystać technologię kolejkowania i już. To jakie inne procesy podepną się pod ten “punkt przesyłu” to już zupełnie inna sprawa.
Krótki komentarz: Bardzo często technologie te zestawiane są z frameworkami do przetwarzania streamingowego. Wymienione zostały m.in. parę punktów wyżej (np. Spark Structured Streaming, Flink czy Kafka Streams). Warto tu dodać, że technologie tego typu są także często wykorzystywane w procesie IoT (internetu rzeczy – Internet of Things), gdy poszczególne urządzenia mogą raportować o swojej aktywności.
Przedstawiciele:
Kafka
RabbitMQ
Event Hub (Azure)
Kinesis (AWS)
Pub/Sub (GCP)
IBM MQ (IBM)
Technologie analityczne (BI – Business Intelligence)
Czemu służą? Za pomocą narzędzi analitycznych możemy tworzyć dashboardy, które pomogą nam analizować wcześniej zebrane dane.
Krótki komentarz: Warto pamiętać właśnie o takim aspekcie jak “wcześniej zebrane dane”. Nie wystarczy, że będziemy mieli aplikację analityczną. Aby w pełni wykorzystać jej potencjał, należy zawczasu pomyśleć o tym jak powinien wyglądać nasz pipeline, aby odpowiednie dane (nie za duże, nie za małe, odpowiednio ustrukturyzowane itd.) mogły zostać przez narzędzie BI zaciągnięte.
Przedstawiciele:
Apache Superset
Power BI (Azure)
Amazon QuickSite (AWS)
Google Data Studio (GCP)
Holistics
Looker
Tableau
I jak się w tym wszystkim nie zagubić?
Mam nadzieję, że tym artykułem chociaż odrobinę pomogłem w uporządkowaniu spojrzenia na świat Big Data. Mnóstwo technologii nie zostało tutaj ujętych. Jest to jednak dobry punkt startowy;-). Jeśli widzisz potrzebę, aby coś tutaj dodać lub zmienić – daj sobie swobodę napisania o tym w komentarzu:-).
W naszej podróży po Sparku musiała przyjść pora na ten przystanek. “Cache vs persist” to absolutnie podstawowe pytanie na każdej rozmowie rekrutacyjnej ze Sparka. Co ważniejsze – to naprawdę przydatne narzędzie dla każdego inżyniera big data, który posługuje się tym frameworkiem! Dzięki niemu przyspieszysz swojego joba, zmniejszysz ryzyko nieoczekiwanego fuckupu. No i najważniejsze – podczas code review pokażesz, że znasz się na rzeczy i nie jesteś tu z przypadku. Żartuję oczywiście.
Nie traćmy więc czasu i sprawdźmy co to za dobrodziejstwo, owe osławione cache i persist. Całą serię “zrozumieć sparka” poznasz tutaj.
Krótka powtórka – akcje i transformacje
Zanim przejdziemy do sedna sprawy – najpierw krótkie przypomnienie jak działa Spark – w pigułce. Aby to zrozumieć, fundamentalne są trzy rzeczy:
Spark działa na swoich własnych strukturach, “kolekcjach”. To przede wszystkim RDD, które przypominają “listy” lub “tablice” (nie w konkrecie, natomiast grunt że nie jest to kolekcja typu mapa), ale także Datasety i Dataframy (które są aliasem dla Dataset[Row]). Napisałem, że “przede wszystkim”, ponieważ pod spodem DSów i DFów leżą właśnie RDD. To na tych strukturach musimy pracować, żeby całość ładnie działała.
Na tych strukturach możemy wykonywać dwa typy operacji:
transformacje – czyli operacje, które wykonują pewne obliczenia, natomiast nie są wykonywane w momencie ich wywołania w kodzie.
akcje – operacje, które zwykle są jakimś rodzajem operacji “wyjścia”. I co istotne – w tym momencie wykonywane są wszystkie transformacje po kolei.
Takie podejście lazy evaluation zastosowane jest, aby Spark mógł odpowiednio zoptymalizować te operacje. Spisuje je w drzewku DAG (Directed Acyclic Graph – graf skierowany, acykliczny).
Tutaj pojawia się właśnie podstawowy problem – może się okazać, że akcji będziemy mieli kilka. I za każdym razem Spark będzie musiał wykonywać te same transformacje. To oczywiście powoduje, że job staje się niewydajny, a moc obliczeniowa jest niepotrzebnie przepalana.
Cache i persist w Sparku
I właśnie tutaj wkracza cachowanie. Możemy na danym zbiorze danych wywołać .cache() lub .persist() aby zachować go w pamięci. Gdyby potem pojawiło się kilka akcji, Spark będzie pobierał dane z konkretnego punktu, a nie procesował je wszystkimi transformacjami od początku.
cache vs persist
Aby to zrobić, mamy do dyspozycji dwie funkcje: cache() oraz persist(). Cóż – prawdę mówiąc jest jeszcze trzecia (.checkpoint()) ale na ten moment ją zostawimy;-). Tak więc powtórzmy – cache i persist pozwalają “zatrzymać” dataset w kodzie. Od momentu wywołania na naszych danych (RDD, Dataset lub Dataframe) cache lub persist – wywołanie akcji nie będzie skutkowało ponownym przeliczaniem transformacji, które były napisane przed tym punktem.
Pora jednak wyjaśnić sobie różnicę między cache a persist. Otóż – mówiąc prosto – persist pozwala nam ustawić poziom na jakim chcemy przechowywać dane (storage level), natomiast cache ma domyślne wywołanie. Tak naprawdę pod spodem cache wywołuje… właśnie persist!
Jaki storage level ma cache? To zależy od tego czy wywoujemy go na RDD czy Dataset/Dataframe. Jeśli RDD – domyślnie mamy MEMORY_ONLY, natomiast w przypadku Datasetów – MEMORY_AND_DISK.
Kod cache i persist dla Dataset i Dataframe
Jak wygląda wywołanie w kodzie? Przede wszystkim, jeśli wywołujemy cache() na Dataframe, to jest to najzwyklejszy w świecie alias dla wywołania funkcji persist(). I nie zmieniło się to od dawna. Poniżej kod z “bebechów” Sparka 2.4.3.
/**
* Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).
*
* @group basic
* @since 1.6.0
*/
def cache(): this.type = persist()
Jak wygląda natomiast persist()? W tym przypadku mamy do czynienia z dwoma funkcjami: pierwsza to funkcja, która przyjmuje argument typu StorageLevel, natomiast druga jest funkcją bez argumentów. Wywołuje ona funkcję cacheQuery() znajdującą się w CacheManager, która dokonuje całej “magii”. Jeśli nie podamy jej Storage Level, zostanie przypisany standardowy – czyli “MEMORY_AND_DISK”.
Poniżej implementacje obu funkcji persist().
def persist(): this.type = {
sparkSession.sharedState.cacheManager.cacheQuery(this)
this
}
def persist(newLevel: StorageLevel): this.type = {
sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
this
}
Kod cache i presist dla RDD
W kontekście RDD cache() także jest aliasem dla persist(). Znów mamy także do czynienia z dwiema funkcjami persist() – jedną bezargumentową, drugą przyjmującą argument typu StorageLevel. Tym razem jednak różnica jest taka, że standardowym, domyślnym poziomem jest “MEMORY_ONLY” (nie jak w przypadku datasetów “MEMORY_AND_DISK”).
Można nawet dostrzec “niezwykle elegancki” komentarz, który pokusił się zostawić w samym środku funkcji jeden z inżynierów tworzących Sparka;-).
def persist(newLevel: StorageLevel): this.type = {
if (isLocallyCheckpointed) {
// This means the user previously called localCheckpoint(), which should have already
// marked this RDD for persisting. Here we should override the old storage level with
// one that is explicitly requested by the user (after adapting it to use disk).
persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
} else {
persist(newLevel, allowOverride = false)
}
}
Storage levels
Z implementacji funkcji persist() wiemy, że kluczową sprawą jest Storage Level – czyli poziom na jakim chcemy przechowywać nasze dane. Oto jakie poziomy możemy wybrać:
MEMORY_ONLY – przechowuje RDD lub Datasety jako deserializowane obiekty w pamięci JVM. Jeśli nie ma wystarczająco dużo dostępnej pamięci, niektóre partycje nie zostaną zapisane i zostaną potem ponownie obliczone w razie potrzeby.
MEMORY_ONLY_2 – To samo co MEMORY_ONLY, natomiast każda partycja jest replikowana na dwa nody.
MEMORY_ONLY_SER – to samo co MEMORY_ONLY, natomiast przechowuje dane jako serializowane obiekty w pamięci JVM. Potrzebuje mniej pamięci niż [1].
MEMORY_ONLY_SER_2 – analogicznie.
MEMORY_AND_DISK – w tym przypadku dane są przechowywane jako deserializowane obiekty w pamięci JVM. W tym przypadku jednak, w przeciwieństwie do [1], jeśli nie wystarczy miejsca, nadmiarowe partycje są przechowywane na dysku. To prowadzi do mniejszej wydajności (wolniejszy proces), ponieważ angażujemy tu operacje I/O.
Analogicznie do 1-4 mamy także MEMORY_AND_DISK_SER, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2.
DISK_ONLY – w tym przypadku dane są przechowywane jedynie na dysku. Oczywiście najwolniejszy wariant.
DISK_ONLY_2 – analogicznie, tworzone są replikacje.
Zostań prawdziwym sparkowym wojownikiem
Jeśli dotarłeś do tego punktu, to prawdopodobnie jesteś naprawdę zainteresowany rozwojem w Sparku. Trzeba uczciwie powiedzieć, że to złożona, duża technologia. Jeśli chcesz zgłębić ją bardziej – przekonaj swojego szefa, żeby zapisał Ciebie i Twoich kolegów/koleżanki na szkolenie ze Sparka. Omawiamy całą budowę od podstaw, pracujemy dużo i intensywnie na ciekawych danych, a wszystko robimy w miłej, sympatycznej atmosferze;-) – Zajrzyj tutaj!
A jeśli chcesz pozostać z nami w kontakcie – zapisz się na newsletter lub obserwuj RDF na LinkedIn. Koniecznie, zrób to i razem twórzmy polską społeczność Big Data!
Inflacja po raz pierwszy (od dawna) weszła “pod strzechy” – nie jest już jedynie tematem dyskusji eksperckich. Wręcz przeciwnie – od kilku miesięcy jest bohaterką pierwszych stron gazet w całym kraju – z brukowcami włącznie. Powodem jest znaczne przyspieszenie utraty wartości waluty, co w przypadku naszej historii wzbudza szczególnie nieprzyjemne skojarzenia. Dodatkowo niektórzy zarzucają stronie rządowej, że oficjalna inflacja jest zaburzona.
Chciałbym dzisiaj zaproponować pewne rozwiązanie, które pomogłoby nam w analizie inflacji, a co za tym idzie – w odpowiedniej kontroli nad nią. Wszystko co poniżej to pewna ogólna wizja, która może posłużyć jako inspiracja. Jeśli jest chęć i zapotrzebowanie, bardzo chętnie się w tą wizję zagłębię architektonicznie i inżyniersko. Zachęcam także do kontaktu, jeśli TY jesteś osobą zainteresowaną tematem;-).
Krótka lekcja: jak liczona jest inflacja?
Czym jest inflacja?
Zanim przejdziemy do rozwiązania, zacznijmy od problemu. Czym jest inflacja i jak liczy ją GUS? Przede wszystkim najważniejsze to zrozumieć, że inflacja to spadek wartości pieniądza w czasie. Dzieje się tak w sposób praktyczny poprzez wzrost cen. Za ten sam chleb, wodę – musimy zapłacić więcej. I teraz to najważniejsze: inflacja jest inna dla każdego z nas. Każdy z nas ma bowiem inny portfel.
Jeśli zestawimy samotnego programistę oraz rodzinę wielodzietną, gdzie Tata zarabia jako architekt a Mama jako tłumaczka, ich budżety bedą zupełnie inne. Nawet jeśli zarabiają podobne kwoty, w rodzinie większy udział prawdopodobnie będzie na pieluchy, przedmioty szkolne i parę innych rzeczy. W przypadku młodego singla z solidną pensją, do tego z rozrywkowym podejściem do życia, znacznie większy procent budżetu zajmie alkohol, hotele, imprezy itd. Jeśli ceny alkoholi pójdą w górę o 40%, dla niektórych inflacja będzie nie do zniesienia, dla innych z kolei może nie zostać nawet zauważona.
Jak liczone jest CPI (inflacja konsumencka)?
Aby zaradzić tego typu problemom, GUS wylicza coś takiego jak CPI (Consumer Price Index) – indeks zmiany cen towarów i usług konsumpcyjnych. W skrócie mówimy inflacja CPI, czyli inflacja konsumencka. Warto zaznaczyć tutaj jeszcze, że zupełnie inna może być inflacja odczuwana na poziomie budżetów firm (a właściwie dla różnych firm jest także oczywiście różna).
Nie będziemy się tutaj wgryzać zbyt mocno w to jak dokładnie wylicza się inflację CPI. Skupimy się jedynie na paru najważniejszych rzeczach, które przydadzą nam się do późniejszej odpowiedzi na nasz inflacyjny problem;-). Dla zainteresowanych polecam solidniejsze omówienia:
Na nasze potrzeby powiedzmy sobie bardzo prosto, w jaki sposób GUS liczy CPI. Potrzeba do tego podstawowej rzeczy, czyli koszyka inflacyjnego. Taki koszyk to grupy towarów, które podlegają badaniu. GUS oblicza to na podstawie ankiet wysyłanych przez 30 000 osób. Już tutaj powstaje pewien problem – ankiety te mogą być wypełniane nierzetelnie.
Następnie, jeśli mamy już grupy produktów, musimy wiedzieć jak ich ceny zmieniają się w skali całego kraju. Aby to zrobić, wyposażeni w tablety ankieterzy, od 5 do 22 dnia każdego miesiąca, ruszają do akcji – a konkretnie do wytypowanych wcześniej punktów (np. sklepów spożywczych) w konkretnych rejonach. W 2019 roku badanie prowadzono w 207 rejonach w całej Polsce.
Big Data w służbie jej królew… to znaczy w służbie Rządu RP
Taka metodologia prowadzi do bardzo wielu wątpliwości. Badanie GUSu zakrojone jest na bardzo szeroką skalę. Mimo to jednak wciąż są to jedynie wybrane gospodarstwa oraz wybrane punkty sprzedaży. Chcę tutaj podkreślić, że nie podejrzewam naszych statystyków o manipulacje. Może jednak dałoby się zrobić tą samą pracę lepiej, bardziej precyzyjnie i znacznie mniejszym kosztem?
Cyfryzacja paragonowa – czyli jak Rząd przenosi nasze zakupy do baz danych?
Zanim przejdziemy dalej, powiedzmy najpierw coś, z czego być może większość z nas sobie nie zdaje sprawy. Ostatnie lata to stopniowe przechodzenie z tradycyjnych kas fiskalnych na kasy wirtualne oraz online. Na potrzeby artykułu nie będę wyjaśniał różnic. To co nas interesuje to fakt, że oba typy kas, zakupy raportują bezpośrednio do Rządu. Na ten moment objęta jest tym stanem rzeczy gastronomia, ale docelowo ma to objąć (wedle mojej wiedzy) także inne sektory posługujące się paragonami.
Jak zbudować system liczący inflację?
Przenieśmy się mentalnie do momentu, w którym każda, lub niemal każda sprzedaż jest odnotowywana przez państwo i zapisywana w tamtejszej bazie danych (najprawdopodobniej nierelacyjnej). Można to wykorzystać, aby zbudować system, który pozwoli nam liczyć inflację pozbawioną potrzeby wysyłania armii ankieterów. Co więcej – pozwoli to zrobić dokładniej oraz da nam potężne narzędzie analityczne!
Bazy danych / Storage
Przede wszystkim – zakładam, że wszystkie dane trzymane są w jakiejś nierelacyjnej bazie danych – na przykład w Apache HBase. Może to być jednak także rozproszony system plików, jsk HDFS. W takiej bazie powinny być trzymane wszystkie dane dotyczące transakcji – paragony, faktury, JPK itd. Osobną sprawą pozostają informacje dotyczące firm i inne dane, które są bardziej “ogólne” – dotykają mniejszej liczby podmiotów i nie są tak detaliczne.
Te dane, ze względu na niewielką liczbę i bardzo klarowną strukturę, można trzymać w bazie relacyjnej (np. PostgreSQL). Można jednak także jako osobną tabelę HBase, choć z przyczyn analitycznych (o których potem) znacznie lepiej będzie zrobić to w bazie relacyjnej. Można także zastosować rozwiązanie hybrydowe – wszystkie dane dotyczące firm trzymać w bazie nierelacyjnej, jako swoistym “magazynie”, natomiast pewną wyspecyfikowaną, odchudzoną esencję – w bazie relacyjnej.
Dodatkowo zakładam, że koszyk inflacyjny jest już wcześniej przygotowany. Da się ten proces uprościć poprzez informatyzację ankiet – jest to już zresztą robione (według wiedzy jaką mam). Taki koszyk można trzymać w bazie relacyjnej, ze względu na relatywnie niewielką liczbę danych (W 2021 roku zawierał on 12 grup produktów. Nawet jeśli w każdej z nich byłoby parę tysięcy produktów, liczby będą sięgać maksymalnie kilkudziesięciu tysięcy, niezbyt rozbudowanych rekordów).
W dalszej części dodam jeszcze możliwość tworzenia kolejnych koszyków i w takiej sytuacji prawdopodobnie należałoby już je wydzielić do osobnej bazy nierelacyjnej. W dalszym ciągu jednak ogólne adnotacje mogłyby pozostać w bazie relacyjnej (tak, żeby można było np. sprawnie wyciągnąć dane z HBase po rowkey, czyli id).
Jeśli zdecydujemy się na zastosowanie bazy row-key, jak HBase, uważam że i tak zaistnieje potrzeba wykorzystania HDFS (może być tak, że w HBase będzie wygodniej pierwotnie umieszczać pliki paragonowe). Będziemy tu umieszczać kolejne etapy przetworzonych danych z konkretnych okresów.
Jeszcze inną opcją jest zastosowanie Apache Kudu, który mógłby nieco zrównoważyć problemy HBase i HDFS i zastąpić oba byty w naszym systemie. Jak widać, opcji jest dużo;-)
Przygotowanie danych
Kiedy mamy już dane zebrane w przynajmniej dwóch miejscach, powinniśmy je przygotować. Same z siebie stanowią jedynie zbiór danych, głównie tekstowych. W drugim etapie należy te dane przetworzyć, oczyścić i doprowadzić do postaci, w jakiej ponownie będziemy mogli dokonać już finalnej analizy inflacji.
Finałem tego etapu powinny być dane, które będą pogrupowane tak, żebyśmy mogli je później wykorzystać. Wstępna, proponowana struktura wyglądać może następująco:
Okres badania
Grupa produktów
Punkt sprzedaży
towar
Musimy więc wyciągnąć surowe dane (z HBase), przetworzyć je, a następnie zapisać jako osobny zestaw – proponuję tu HDFS. Jak to uczynić? Możemy do tego celu wykorzystać Apache Spark oraz connector HBase Spark przygotowany przez Clouderę. Następnie dane muszą być poddane serii transformacji, dzięki którym dane:
Zostaną wydzielone jako osobne paragony
Zostaną podzielone na produkty
Poddane będą oczyszczeniu z wszelkich “śmieci” uniemożliwiających dalszą analizę
Wykryta zostanie grupa produktów dla każdego z nich
Pogrupowane zostaną po grupie produktów oraz okresie
Na końcu dane zapisujemy do HDFS. Wstępna struktura katalogów:
Dane przygotowane
Dane całościowe
okres
Tutaj umieszczamy plik *.parquet lub *.orc
Liczenie inflacji
Skoro mamy już przygotowane dane, czas policzyć inflację. Do tego celu także wykorzystamy Apache Spark, dzięki któremu możemy w zrównoleglony sposób przetwarzać dane. W najbardziej ogólnym kształcie sprawa wygląda dość prosto:
Łączymy się z bazą danych (relacyjną), w której trzymamy konkretny koszyk
Wybieramy okres za jaki chcemy policzyć inflację
Pobieramy dane z HDFS/Kudu, które okresem odpowiadają [2].
Wybieramy grupy produktów zgodne z koszykiem [1]
Przeliczamy inflację za pomocą danych, które są już solidnie przygotowane.
I teraz ważne: efekt zapisujemy do relacyjnej bazy danych.
Analiza
Czemu akurat do relacyjnej bazy danych? Odpowiedź wydaje się oczywista:
Dane będą niewielkie – choć od raz umożna powiedzieć, że z naszego procesu można wycisnąć więcej niż tylko wynik 6.8% 😉 – jest też sporo rzeczy przy okazji, takie jak jakie produkty wzrosły najmocniej, w jakich regionach, co ma największą zmiennośći itd.
Dane będą solidnie ustrukturyzowane
Dane umieszczone w relacyjnych bazach pozwalają na znacznie lepszą i prostszą analizę.
I właśnie ten trzeci punkt powinien nas zainteresować najmocniej. Można bowiem na klastrze zainstalować jakieś narzędzie BI, spiąć z bazą i… udostępnić analitykom. Takim narzędziem może być (znów open sourcowy) Apache Superset. Przynajmniej na dobry początek. W drugim rzucie należałoby się pokusić o zbudowanie dedykowanej aplikacji analitycznej. To jednak można zostawić na później. Na etap, w którym analitycy będą już zaznajomieni z systemem i będą mogli włączyć się w czynny proces budowy nowego narzędzia.
Rozwój
Wyżej opisałem podstawowy kształt systemu do badania inflacji. Warto jednak nie zatrzymywać się na tym i pomyśleć jak można tą analizę wynieść na wyższy poziom. Podstawową prawdą na temat inflacji jest to, że każdy ma swoją, więc nie da się dokładnie obliczać jak pieniądz traci na wartości. Cóż… dlaczego nie możnaby tego zmienić? Wszak mając do dyspozycji WSZYSTKIE (oczywiście upraszczając) dane, można zrobić “nieskończenie wiele” koszyków inflacyjnych.
Czym mogłoby być te koszyki inflacyjne? Kilka propozycji.
Dlaczego nie wykorzystać rozwoju technologii Big Datowych do podnoszenia świadomości finansowej oraz obywatelskiej Polaków? Niech każdy będzie mógł na specjalnym portalu wyklikać swój własny koszyk i regularnie otrzymywać powiadomienia dotyczące swojej własnej inflacji. Takie podejście byłoby bardzo nowatorskie i z pewnością wybilibyśmy się na tle innych państw.
Koszyki mogą powstawać dla różnych grup społecznych. Dzięki temu można będzie dokładniej badać przyczyny rozwarstwienia społecznego, niż jedynie osławiony współczynnik Giniego, czy także inne, powiedzmy sobie szczerze – skromne narzędzia, na podstawie których wyciągane są bardzo mocne dane.
Koszyki dla firm – oczywistym jest, że firmy mają znacząco różny koszyk od ludzi. Jest oczywiście inflacja przemysłowa (PPI), natomiast dotyczy ona produkcji przemysłowej (i to w dośćwąskim zakresie). Dzięki wyborowi produktów w “naszym systemie” można będzie obliczyć także jak bardzo wartość pieniądza spada dla różnych rodzajów firm.
Potencjalne korzyści
Powyżej opisałem przykładowy system, który pozwoliłby nam wynieść analizę inflacji na zupełnie inny poziom. Poniżej chciałbym zebrać w jedno miejsce kilka najważniejszych korzyści, jakie niosłyby takie zmiany:
Mniejsze koszty – cykliczne uruchamianie jobów mających na celu sprawdzenie inflacji to koszt znacznie mniejszy, niż utrzymywanie armii ankieterów.
Dokładniejsza inflacja – precyzja liczenia inflacji weszłaby na zupełnie inny poziom. Oczywiście na początku należałoby przez kilka lat liczyć w obu systemach, aby sprawdzić jak bardzo duże są różnice.
Różne modele inflacji – a więc koszyki o których pisałem powyżej, które spowodują, że przestanie być prawdziwa teza o tym, że “liczenie prawdziwej inflacji nie jest możliwe”.
Regionalizacja inflacji – Inflacja inflacji nie jest równa. Zupełnie inaczej ceny mogą się kształtować w różnych województwach. Również i to mógłby liczyć “nasz system”.
Większe możliwości analityczne – stopy nie są jedynym narzędziem, który można użyć w walce z inflacją. Ekonomiści wskazują, że poza wysokością stóp procentowych także inne czynniki wpływają na inflację. Są to m.in. skala dodruku pieniądza, rozwój świadczeń socjalnych, regulacje gospodarcze czy wysokość podatków pośrednich. Dzięki Big Datowemu systemowi, Rząd zyskałby znacznie większe możliwości analityczne do śledzenia wpływu swoich zmian na gospodarkę.
Wyższe morale i poczucie, że państwo gra “w pierwszej lidze”– unowocześnia swoje działanie na poziom niespotykany w innych krajach.
Potencjalne zagrożenia
Rozwój zaawansowanych systemów to oczywiście także zagrożenia. O tych najważniejszych poniżej:
Możliwości analityczne muszą wiązać się z większą inwigilacją – i jest to chyba największy problem. Im większe możliwości analizy chcemy sobie zafundować, tym głębiej trzeba zinfiltrować życie obywateli. Oczywiście przed infiltracją współcześnie uciec się nie da, ale należy zawsze mieć na uwadze mądre wyważanie.
Koszt utrzymania systemu – To system zbierania bardzo dużych danych i analizy ich. Wymagać będzie zaawansowanych klastrów obliczeniowych oraz odpowiedniego zespołu administracyjnego. Na pensjach – dodajmy – zdecydowanie rynkowych, nie urzędniczych.
Kwestia bezpieczeństwa – czasami zapominamy, że informatyzacja państwa to problem w bezpieczeństwie danych. Jeśli jesnak dane dotyczące zakupów i tak miałyby być zbierane – czemu ich nie wykorzystać?
Big Data to nasza wielka szansa – także w sektorze rządowym
Jesteśmy Państwem, które w wielu miejscach wiecznie próbuje nadgonić resztę (choć w wielu także tą resztę przegoniło). Big Data może pozwolić na działać lepiej, szybciej, precyzyjniej i… taniej. Wykorzystajmy tą szansę. Za nami poglądowy pomysł na to jak zbudować jeden z takich systemów, które mogłyby pomóc nam budować nowoczesne państwo. Jeśli chcesz dowiadywać się o innych pomysłach, które ukazują Big Data w odpowiednim kontekście, zapraszam na nasz profil LinkedIn oraz do zapisania się na newsletter RDF. Do zobaczenia na szlaku BD!;-)