
Apache Flink の DataStream API 利用時の CSV ファイル読み込み
ストリーム処理における CSV ファイルの読み込み Apache Flink は unbounded なストリームデータを処理するためのフレームワークだ。 しかし現実的な application を開発する場合、ストリームデータに加えて static なファイルや DB 等を読み込みたいこともある。 star schema における dimension table 的な情報をストリームに結合したい場合 等が考えられる。 このポストでは Flink で DataStream API ベースでの実装において CSV ファイルを読むことを考える。 Flink は現時点の stable である v1.11 を想定。 CSV ファイルを読む方法 DataStream API ベースの実装で CSV ファイルを読むには StreamExecutionEnvironment のメソッドである readFile() を使う。 overload された同名のメソッドがいくつか存在するが、次の2つの引数が特に重要だろう。 まず1つめは FileInputFormat<OUT> inputFormat であり、こちらは data stream の生成に用いる入力フォーマットを指定する。 おそらく最も一般的なのが TextInputFormat だと思われる。 もちろん単なる text として CSV ファイルを読み込み、後続の処理で各レコードを parse することも可能だが CSV 用の入力フォーマットがいくつか用意されているようだ。 PojoCsvInputFormat RowCsvInputFormat TupleCsvInputFormat なんとなく名前でわかると思うが、それぞれ readFile() の結果として返される DataStreamSource が内包する型が異なる。 これについては後述の実験にて確認する。 ...




