Największy problem Dataframe w Sparku? Oczywiście – brak jasności, jaka schema jest w aktualnym DFie. W związku z tym często wykorzystuje się Dataset[T], gdzie T to konkretna Case Class. Dzięki temu, jeśli wywołujemy metodę, która zwraca Dataset[T], możemy być przekonani, że wiemy jakim typem operujemy, jaką nasz obiekt ma schemę.
Niestety, tak dobrze jest tylko w teorii. W praktyce Spark pozwala wyjść poza ramy schematu i dodać lub odjąć kolumny, które nie istnieją w case class T. Jest to bardzo mylące i osoby które o tym niewiedzą mogą przeżyć niezłą eksplozję mózgu. Pytanie – czy da się coś z tym zrobić? Jak najbardziej. Chociaż nie oficjalną ścieżką, bo to właśnie ona wprowadza w błąd.
Standardowe castowanie Dataframe do Dataset[T]
Na początku zobaczmy jak to powinno wyglądać według planu “standardowego”. W przykładzie będziemy się posługiwać case classą “Person”, która ma id, name i age. To do niej będziemy “równać” wszystkie Dataframy. Wykorzystamy do tego trzy DFy:
- perfectDF – będzie miał dokładnie takie kolumny jak powinniśmy mieć wykorzystując Person.
- bigDF – będzie zawierał jedną kolumnę więcej – “lastName”
- smallDF – będzie zawierał jedynie kolumny “id,name”. Będzie więc brakowało kolumny “age”
Dodam jeszcze, że cały kod pisany jest w Scali.
Poniżej przedstawiam uproszczone utworzenie pierwszego Dataframe’a. Reszta jest na podobnej zasadzie.
val perfectDF: Dataset[Row] = Seq( (1, "John", "26"), (2, "Anna", "28") ).toDF("id", "name", "age")
Czas na najważniejszą rzecz: oficjalnym i najbardziej popularnym sposobem na zamianę DF w DS[T] jest wywołanie df.as[T]. Aby to zrobić, musimy wcześniej zaimportować implicit._. Można ewentualnie zrobić także df.as[T](Encoders.product[T]).
Po takim “myku” spodziewamy się, że otrzymamy zmienną z typem Dataset[T]. W naszym przypadku, po zastosowaniu perfectDF.as[Person] faktycznie tak się dzieje. Wywołujemy więc “show()” aby sprawdzić jak wyglądają nasze dane i… wszystko gra. Mamy więc Dataset o jasnej schemie, gdzie możemy odwoływać się do konkretnych pól.
Problem zaczyna pojawiać się w sytuacji, gdy nasz Dataframe nie jest idealnie wpasowany do schematu klasy T. Przykładowo – zmienna bigDF to Dataframe, który zwiera kolumny “id”, “name”, “lastName”, “age”. Po wywołaniu metody bigDF.as[Person] możemy się spodziewać, że kolumna zostanie obcięta. Wszak w wyniku operacji dostajemy Dataset[Person], a klasa Person nie zawiera pola “lasName”. Niestety, jest zupełnie inaczej.
val bigDF: Dataset[Row] = Seq( (1, "John", "Smith", "27"), (3, "Anna", "Smith", "1") ).toDF("id", "name", "lastname", "age") val bigDFAsPerson: Dataset[Person] = bigDF.as[Person] bigDFAsPerson.show() bigDFAsPerson.printSchema()
Efekt – w postaci schemy oraz wyglądu tabelki – można obserwować poniżej:
+---+----+--------+---+ | id|name|lastname|age| +---+----+--------+---+ | 1|John| Smith| 27| | 3|Anna| Smith| 1| +---+----+--------+---+ root |-- id: integer (nullable = false) |-- name: string (nullable = true) |-- lastname: string (nullable = true) |-- age: string (nullable = true)
Sprawa ma się inaczej, gdy spróbujemy dokonać konwersji do klasy z Dataframe, który ma zbyt mało kolumn:
val smallDF: Dataset[Row] = Seq( (1, "maro"), (3, "ignacy") ).toDF("id", "name") val smallDFAsPerson: Dataset[Person] = smallDF.as[Person] smallDFAsPerson.show() smallDF.printSchema()
Efekt: ponieważ nie ma jednej z kolumn, Spark rzuci wyjątek. A konkretniej AnalysisException.
21/08/30 21:25:11 INFO CodeGenerator: Code generated in 25.8197 ms Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`age`' given input columns: [id, name]; at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:110) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:107) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:278) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:278) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277) . . . at App$.main(App.scala:44) at App.main(App.scala)
Sprawdzony sposób na zamianę Dataframe do Dataset[T]
Na szczęście – jest możliwość, aby to naprawić. Aby nie przedłużać powiem tylko, że należy po prostu napisać swój własny kod, dzięki któremu rozszerzymy nieco Dataframe. Najpierw jednak rozpiszmy sobie jakie chcemy spełnić warunki.
Po konwersji z Dataframe, nowo utworzony Dataset:
- Nie będzie zawierał niepotrzebnych kolumn.
- Będzie miał nowo utworzone kolumny. Będą one wypełnione nullami.
Aby to zrobić, utwórz object SparkExtensions, a następnie dodaj kod:
import scala.reflect.runtime.universe.TypeTag import org.apache.spark.sql.{DataFrame, Dataset, Encoders, Row} object SparkExtensions { implicit class ExtendedDataFrame(df: DataFrame) { def to[T <: Product: TypeTag]: Dataset[T] = { import df.sparkSession.implicits._ import org.apache.spark.sql.functions.col val columnsUniqueInT = Encoders.product[T].schema.diff(df.schema) val withAdditionalColumns: Dataset[Row] = columnsUniqueInT .foldLeft(df)((previousDF, column) => previousDF.withColumn(column.name, lit(null).cast(column.dataType))) withAdditionalColumns.select(Encoders.product[T] .schema .map(f => col(f.name)): _*) .as[T] } } }
Co tu się dzieje? Przede wszystkim trzy rzeczy:
- Znajdujemy unikalne kolumny – robimy to poprzez różnicę tego co jest w naszej case class oraz w schemacie dataframe.
- Tworzymy Dataframe z dodatkowymi kolumnami, które wypełniamy nullami i castujemy do odpowiednich typów
- Tworzymy Dataframe (na bazie [2]), w którym wybieramy jedynie te kolumny, które są przewidziane w klasie T.
Jak tego użyć? Musimy zrobić dwie rzeczy: zaimportować nasz object SparkExtensions (import functions.SparkExtensions._), a następnie przy Dataframe zamiast “.as[Person]” zrobić “.to[Person]”.
val bigDFToPerson: Dataset[Person] = bigDF.to[Person] bigDFToPerson.show() bigDFToPerson.printSchema()
Efekt:
+---+----+---+ | id|name|age| +---+----+---+ | 1|John| 27| | 3|Anna| 1| +---+----+---+ root |-- id: integer (nullable = false) |-- name: string (nullable = true) |-- age: string (nullable = true)
UWAGA! Taka opcja zakłada, że wszystko musi zgadzać się w 100%.
Jak wiadomo, pole w Datasetach sparkowych składa się z 3 rzeczy:
- Nazwy
- Typu
- Określenia, czy jest nullable czy nie – czyli czy może przyjmować null jako wartość.
Dla tych, którzy wolą nieco bardziej liberalną wersję, przygotowałem też metodę, która pozwala okrajać nie uwzględniając trzeciej rzeczy.
def to[T <: Product: TypeTag]: Dataset[T] = { import df.sparkSession.implicits._ import org.apache.spark.sql.functions.col val columnsFromT = Encoders.product[T].schema.map(c=> (c.name,c.dataType)) val columnsFromDF = df.schema.map(c=> (c.name, c.dataType)) val columnsUniqueInT = columnsFromT.diff(columnsFromDF) val withAdditionalColumns: Dataset[Row] = columnsUniqueInT.foldLeft(df)((previousDF, column) => previousDF.withColumn(column._1, lit(null).cast(column._2))) withAdditionalColumns.select(Encoders.product[T] .schema .map(f => col(f.name)): _*) .as[T] }
Zdaję sobie sprawę, że robi się już nieco mało estetycznie, ale zapewniam – działa;-). Stosuje się dokładnie tak samo jak w przypadku poprzedniej wersji.
Liczę głęboko, że taka metoda się przyda – w projekcie w którym pracowałem, problem typów był naprawdę mocno odczuwalny. Cóż – po więcej, choć zwykle nieco bardziej “wysokopoziomowych” artykułów – zapraszam do newslettera;-). Powodzenia!