December 20, 2017

Apache Sparkのcolumnar storageの解説

この記事は?

この記事は、Data Platform Advent Calendar 2017の20日目の記事です。

この記事の内容は?

2018年の早い時期にリリース予定のApache Spark 2.3に入る、Columnar storage(Spark内部ではColumnVectorクラスで定義されています)の改善に関するまとめです。

結局何がいいたいの?

  • ColumnVectorは、Spark 2.2までは内部構造体として使われていました。Apache Parquetのデータを読み込んだ際の内部的なデータ置き場、に使われていました。
    Spark 2.3では、下記のような拡張があります。
  • ColumnVectorは、publicなデータ構造とAPIになります。
  • ColumnVectorは、Apache Arrowとのデータのやり取りにも使われます。
  • Pandasのデータとのやり取りが高速になります。つまり、Pandasを使ったpysparkの実行が高速になります。
  • DataFrame/Dataset.cache()のインメモリキャッシュのデータ読み出しもColumnVectorを使うことによって効率的になり、cache()を使ったプログラムの実行が高速化されます。

本論

列単位のデータ保存について

Sparkでは、アプリケーションプログラマがプログラムを書く・データを保存する際処理の単位として、行(row)が基本となっています。一方、データベースの分野では、長年の研究・実装の経験から、列(column)単位でデータを保存する、処理するのが有効である場合が多い、ことが知られています。例えば、下記のようなメリットがあります。

  1. 列方向にデータを見ると、同じデータ(0など)が存在することが多いので圧縮しやすい
  2. 列方向に同じ型のデータが並んでいるので、同じ種類の処理について、並列処理可能なチャンスが存在する

Sparkでは、2.2までに内部的には、下記のような場合に列単位のデータ構造を使ってデータ保存を行っていました。

  1. Apache Parquetのデータを読み込んだ際の、内部的なデータ置き場
  2. DataFrame/Dataset.cache()を実行した場合の、インメモリキャッシュのデータ格納先(この場合、圧縮も行われます)

Spark 2.3では、列データを使うことによる利点を、いろいろな形でさらに活用しようとしています。

Apache Spark 2.3では何が変わるの?

ColumnVectorクラスの公開

Spark 2.3では、Spark summit west 2017で意向を表明したようにように、列データフォーマットのためのColumnVectorクラスを、public APIとして公開する予定です。このクラスは今までも存在していましたが、内部クラスとして使われていて、公開されていませんでした。topic JIRAエントリも作られ、着々と作業が進んでいます。 これによって、オレオレデータ構造を、abstractなColumnVectorクラスの実装クラスとして書くことができれば、Sparkのプログラムから低オーバヘッドでデータの読み出しができます。なお、ColumnVectorクラスはデータ読み出しだけをサポートしていて、書込みはサポートしていません。

Apache ArrowのサポートとPySparkの高速化

これまで、ColumnVectorを使った外部読み込みのデータは、Apache Parquetだけをサポートしていました(Apache ORCもcolumn storageですが、現在のSpark(2.3でも)の内部では行データに変換して扱っています)。Spark 2.3では、Apache Arrowというインメモリの列データフォーマットもサポートします(Apache Arrowに関しては、こちらなどをごらんください)。 Spark 2.3ではColumnVectorに関連するクラスを用いて、列データフォーマットのApache Arrowが持つデータを高速に読み書きできます。
そして、この機能を利用して、従来からの性能上の懸念事項の1つであったPySparkの実行速度の遅さ、の改善が行われました。この遅さは、昔から指摘されていて、PythonプロセスとJavaプロセスの間のデータフォーマット交換の非効率さ、によって引き起こされるものでした。この非効率さを解消するために、PythonとSparkの間のデータフォーマット交換自体をを無くしてしまいました。具体的には、Python側でpandasのAPIでアクセスされるデータをApache Arrow上に置き、そのデータをSpark側でも(内部的に)ColumnVectorを使いApache Arrowを経由してアクセスしています(プルリクはこちら。その結果、3~100倍も性能が向上する、とDataBrickのブログでも取り上げられています。 使い方は簡単で、今までUDFの前で、@udf("double")と宣言していたのを、@pandas_udf('double')と書き直すだけです。先程のブログやこちらのnotebook にもプログラム例があります。 この成果については、まだSpark 2.3がリリースされていないにも関わらず、Spark Summit Europe 2017のKeynoteの3ページ目、で取り上げられました。

この機能に関しては、@ueshinさんと@BryanCutler(IBMのSpark Technology Center)の貢献が大変大きかったと思います。私も、初期段階にちょっとだけ実装の方針についてコメントしました。

DataFrame/Dataset.cache()からの、データ読み出しの高速化

はじめに書いたように、Sparkではpandas上に置かれたデータを、Apache Parquetからデータを読み込んだ場合、ColumnVectorを通してアクセス可能な列データフォーマットにデータを格納して、queryなどの処理を行う際に列データフォーマットから直接データを読み出して処理を行っています。
さらに、cache()が実行されたときにも、インメモリキャッシュに列データフォーマットで保存されています。しかし、実装の歴史的な都合上、このデータはColumnVectorを経由してアクセスが行われていませんでした。さらに、列データフォーマットから一度行データフォーマットに変換してから、queryなどの処理が行われています。技術的には、このデータ変換は不要です。
この cache()の場合に、このデータ変換を取りさるプルリクエストを私が1年9ヶ月前に投げた後、いろいろな議論を経てようやくマージされました。簡単なベンチマークで、約3倍速くなっています。現時点ではスカラ変数の場合だけ有効で、配列の場合はまだ議論中です。

あとがき

こんな改善も含まれるApache Spark 2.3ですが、いつリリースされるのでしょうか?
12月中頃に2.3用のブランチ作成、年明けにテストしてリリース、というのが11月頃の予定でしたが、12/20現在まだブランチは作成されていません。