Każdy kto zmuszony został korzystać ze starej wersji Sparka (<2.0) wie jak wiele wody musi upłynąć, aby zrobić niekiedy – wydawałoby się – proste rzeczy. Liczę, że poniższe zestawienie pomoże w rozwiązaniu choć części problemów.
Spark 1.X (1.6) – czyli przygotuj się na nowe (stare?) doznania.
Właśnie dowiedziałeś się, że znalazłeś się w nowym projekcie. Oczywiście musi z Twojej strony paść pierwsze, zapobiegawcze pytanie: “w jakich technologiach będę robić?”. Odpowiedź która przychodzi sprawia, że rozpływasz się niczym aktorka w reklamie czekolady: “Spark, będzie Spark…”.
Znakomicie! Szkoda tylko, że nikt nie wspomniał co to za Spark. Chodzi bowiem o starego dobrego Sparka 1.6, weterana Big Data. Zasłużony dla branży, odznaczony tysiącem orderów procesingu, choć czasem wolelibyśmy podziwiać go jedynie na starych fotografiach.
Czy jednak Spark <2.0 jest taki zły? Z mojego doświadczenia powiem dość prosto: jeśli przestawisz się i zdobędziesz zawczasu odpowiednią wiedzę, unikniesz gorzkich rozczarowań. Nie są to jednak rzeczy, które drastycznie zmieniają posługiwanie się Apache Spark i ciągle jest to solidna, przemyślana technologia do zrównoleglenia obliczeń. Jeśli pracowałeś/aś wcześniej głównie na 2.X (o “trójce” nie wspominam), to mam dobrą wiadomość: nie przeżyjesz szoku. Co najwyżej od czasu do czasu zmarszczysz czoło i sapniesz pod nosem to i owo. Na szczęście twórcy Sparka bardzo dobrze zadbali o kompatybilność wsteczną. Pracując tu, nie musimy zmagać się z koszmarami znanymi z innych technologii (pozdrawiam wszystkich programistów Scala Play – trzymajcie się tam!).
Poniższa lista jest moim (a także kilku osób które poprosiłem o radę – dzięki wielkie Panowie) subiektywnym zbiorem informacji, które spowodują, że praca w nowym projekcie ze “starym sprzętem” będzie naprawdę fajna. Mam przynajmniej taką nadzieję:-).
Subiektywna lista zmian w starym Sparku, które warto znać
Zacznijmy od spisania wszystkich rzeczy. Potem niektóre z nich (te, które nie są oczywiste zaraz po przywołaniu do tablicy;-)) opiszę nieco dokładniej.
- Brak możliwości wczytywania z CSV do DataFrame.
- Inna praca ze SparkContext / SparkSession
- Spark 1.X działa ze Scalą 2.10 (Spark 2.X – 2.11, Spark 3.X – 12) – oczywiście teoretycznie podział jest bardziej płynny, ale bardzo możliwe, że pracując ze Sparkiem 1.X, zmuszony/a będziesz robić to we współpracy ze starszą wersją Scali.
- Rozwoj Machine Learning od Sparka 2.0
- Spark Structured Streaming zostaje wprowadzony dopiero od Sparka 2.0.
- Od wersji 2.0 Dataframe staje się aliasem dla Dataset[Row], obie kolekcje mają ujednolicony interfejs.
- Poniżej 2.0 gdzie indziej znajdywały się encodery.
- Po prostu… wydajność. W codziennej, inżynierskiej robocie tego nie widzimy, ale przy procesowaniu to ma znaczenie.
Brak możliwości wczytywania z CSV do Dataframe
Jak wczytać dane z CSV? To proste!
val sampleDF = spark.read .option("header","true") .csv("sample_data.csv")
No jednak… nie. To jest proste, jeśli korzystasz ze Sparka >=2.0. Niestety, w “jedynkach” sprawa miała się nieco gorzej. Nie dysponujemy po prostu odpowiednią biblioteką, która dałaby nam dostęp do źródła typu csv. Na szczęście łatwo to naprawić.
Przede wszystkim – otwieramy nasz build.sbt (jeśli używamy sbt, inaczej maven itd.) i dodajemy odpowiednią bibliotekę:
libraryDependencies ++= Seq( ... "com.databricks" % "spark-csv_2.10" % "1.5.0", ... )
Następnie, już podczas wczytywania danych:
sqlContext.read .format("com.databricks.spark.csv") .option("header", "true") .load(sample_data.csv)
I już:-). Zauważ tylko, że wykorzystujemy tu sqlContext, a nie sparkSession.
Inna praca ze SparkContext / SparkSession
A właściwie to… po prostu brak SparkSession. Został dodany od Sparka 2.0, więc jeśli używasz nieco bardziej leciwych wersji – zapomnij, że coś takiego istnieje. Zamiast tego musisz nauczyć się żonglować SparkContext oraz SQLContext.
Jak to wygląda w praktyce? Na przykład tak:
val conf = new SparkConf().setAppName("RDFSparkAppName") val sc: SparkContext = new SparkContext(conf) val sqlContext: SQLContext = new org.apache.spark.sql.SQLContext(sc) // UDFs REGISTERING val pseudonymisationUDF: PseudonymisationUDF = new PseudonymisationUDF sqlContext.udf.register("pseudonymisationUDF", pseudonymisationUDF, DataTypes.StringType)
// DATAFRAME CREATING val sampleDF: DataFrame = sqlContext.read .format("com.databricks.spark.csv") .option("header", "true") .load("sample_data.csv")
// RDD CREATING val sampleRDD: RDD[Array[Byte]] = sc.parallelize(ids) .map(i=> Bytes.toBytes(i))
W większości będzie trzeba operować na sqlContext, ale jak widać zdarzają się sytuacje, gdy trzeba użyć także sparkContext. Warto wiedzieć jednak, że sparkContext da się wyciągnąć z sqlContext.
Rozwój Machine Learning od Spark 2.0
I tu istotna zmiana: popularny moduł Spark MlLib, oparty o RDD, od sparka 2.0 przechodzi w stan utrzymania (maintanance). Choć w dokumentacji sparka 2.X możemy przeczytać, że prawdopodobnie od Sparka 3 MlLib będzie wyłączony – tak się nie stało. Ciągle możemy go używać, jednak nie jest już rozwijany.
W zamian jednak, od Sparka 2.0 mamy do dyspozycji Spark ML. To moduł oparty o Dataframe. Co ciekawe – Spark Ml nie jest oficjalną nazwą, natomiast jest to pakiet, w którym znajdziemy interesujące nas klasy.
Zmiany w Dataset i Dataframe
Oczywiście nie można nie wspomnieć o najważniejszej zmianie. Jeśli korzystasz ze Sparka <2, najprawdopodobniej będziesz używać RDD. Jest jednak opcja, żeby pracowć z Dataframe. Należy jednak pamiętać, że największa magia optymalizacyjna zaczyna się w tym temacie dopiero od Spark 2.0. Od tego momentu Dataframe staje się aliasem dla Dataset[Row], zaś Dataset i Dataframe mają ujednolicony interfejs.
Encodery w innym miejscu w Sparku 1.X
Encodery wykorzystujemy w Sparku aby przekonwertować DataFrame a Dataset. Robimy to za pomocą metody “as[T]”, gdzie “T” to konkretna case class. Co prawda nie jest to najlepszy sposób na otrzymanie silnego, klarownego Dataset, ale jest oficjalny, dlatego go tutaj przedstawiam.
Spark 1.X
case class Person(id: Int, name: String) val encoder = org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[Person] // creating some DF val someDF = ... val dataset = someDF.as(encoder)
Spark >= 2.X
case class Person(id: Int, name: String) val encoder = org.apache.spark.sql.Encoders.product[Person] // creating some DF val someDF = ... val dataset = someDF.as(encoder)
Wydajność
Pisząc kod oczywiście nie mamy styczności z wydajnością. Nie zmienia to jednak postaci rzeczy, że podczas wykonywania joba to właśnie ona ma kluczowe znaczenie. I tutaj po prostu warto pamiętać, że pierwsze Sparki były znacznie, znacznie wolniejsze i posiadały mniejsze możliwości.
W tym eksperymencie porównywano Sparka 1.6 oraz 2.0. Sumowano miliard liczb, dokonywano joina oraz sprawdzano groupBy. Największe wrażenie robią dwa pierwsze testy – wyniki w tabelce.
Primitive |
Spark 1.6 |
Spark 2.0 |
Sum |
15.6 |
0.68 |
Join |
58.12 seconds |
0.98 |
Warto oczywiście pamiętać, że taki eksperyment niesie szereg wątpliwości (np. kwestia nodów). Nie zmienia to jednak postaci rzeczy, że między sparkiem 1.6 a 2.0 jest spora różnica. Wynika ona z optymalizacji, jakie zostały przeprowadzone pod spodem Datasetów oraz Dataframów. Co warto wiedzieć – między ostatnią wersją 2.X oraz 3.0 także wprowadzono optymalizacje, dzięki którym możliwości Sparka stały się jeszcze większe. Twórcy chwalą się nawet, że 3 jest nawet 17x szybszy, co zostało wykazane w teście TPCDS.
Podsumowanie
Jeśli już siedzisz w projekcie ze Sparkiem 1.X – nie masz wyjścia, trzeba robić. Pamiętaj, że nie jest taki zły, a pod kątem inżynierskim jest nawet dość mocno podobny. Bez obaw, po prostu postaraj się pamiętać o tym jakie rzeczy mogą Cię zaskoczyć i… rób swoje. Powodzenia!
A – właśnie. Jeśli jesteś zainteresowany/a tego typu rzeczami, zapisz się na newsletter RDF. Wspólnie tworzymy polską społeczność Big Data. Liczę, że od dzisiaj także z Tobą;-).