ストリーム処理における CSV ファイルの読み込み

Apache Flink は unbounded なストリームデータを処理するためのフレームワークだ。
しかし現実的な application を開発する場合、ストリームデータに加えて static なファイルや DB 等を読み込みたいこともある。
star schema における dimension table 的な情報をストリームに結合したい場合 等が考えられる。

このポストでは Flink で DataStream API ベースでの実装において CSV ファイルを読むことを考える。
Flink は現時点の stable である v1.11 を想定。

CSV ファイルを読む方法

DataStream API ベースの実装で CSV ファイルを読むには StreamExecutionEnvironment のメソッドである readFile() を使う。
overload された同名のメソッドがいくつか存在するが、次の2つの引数が特に重要だろう。

まず1つめは FileInputFormat<OUT> inputFormat であり、こちらは data stream の生成に用いる入力フォーマットを指定する。
おそらく最も一般的なのが TextInputFormat だと思われる。
もちろん単なる text として CSV ファイルを読み込み、後続の処理で各レコードを parse することも可能だが CSV 用の入力フォーマットがいくつか用意されているようだ。

  • PojoCsvInputFormat
  • RowCsvInputFormat
  • TupleCsvInputFormat

なんとなく名前でわかると思うが、それぞれ readFile() の結果として返される DataStreamSource が内包する型が異なる。
これについては後述の実験にて確認する。

次に FileProcessingMode watchType も見ておきたい。
この引数ではデータソースの監視についてのモードを指定する。
モードは2つある。

  • FileProcessingMode.PROCESS_CONTINUOUSLY
    • 対象のファイルが更新され、その更新に追随する必要がある場合に利用
    • 指定のインターバルでファイルの更新をチェック
    • 更新があった場合はファイル全体を読む
  • FileProcessingMode.PROCESS_ONCE
    • 対象のファイルの更新がない、更新について考えない場合に利用
    • 最初に一度だけファイルを読む

おそらく多くの場合は前者が必要になるのではないだろうか。
利用にあたっては更新があった場合にファイル全体が読まれるということに注意が必要だ。
例えばファイル末尾にレコードを1件追加するような更新であったとしても、全レコードが再度ストリームに流されるということである。
詳しくは ドキュメント を参照。

これはファイル全体で1つの atomic な単位だとみなされているものと思われる。
レコード単位で処理していくストリーム処理にファイルというバルクな単位のデータを流そうとしているのでこうなってしまう。
そう考えるとやはり static なファイルのデータは dimension table として情報を付加するような、ストリームの本川に合流する支川のような使い方が想定されているのだろう。

ちなみに CsvReader というものもあるが、こちらは DataSet API、つまりバッチ処理向けのようなので今回は扱わない。

実験

実際にコードを書いて readFile() で CSV を読んでみる。
ここでは PojoCsvInputFormatTupleCsvInputFormat を切り替えられるようにした。

コード

package com.example.entry

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.{PojoCsvInputFormat, TupleCsvInputFormat}
import org.apache.flink.api.java.tuple.Tuple3
import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo, TupleTypeInfo}
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

import scala.collection.JavaConverters._
import scala.concurrent.duration._

/**
 * The experiment to read CSV file by Flink.
 * It reads CSV file as POJOs or tuples and just prints on console.
 */
object ReadCsvFileExperimentRunner {
  /** POJO */
  case class Company(name: String, ticker: String, numEmployees: Int)

  /** Tuple */
  type CompanyTuple = Tuple3[String, String, Int]

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    val companiesFilePath = "data/companies.csv"
    val interval = 10.seconds

    args.headOption match {
      case None | Some("pojo") =>
        val inputFormat = createPojoCsvInputFormat(companiesFilePath)
        env
          .readFile(inputFormat, companiesFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, interval.toMillis)
          .map(_.toString)
          .print()
      case Some("tuple") =>
        val inputFormat = createTupleCsvInputFormat(companiesFilePath)
        env
          .readFile(inputFormat, companiesFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, interval.toMillis)
          .map(_.toString)
          .print()
      case _ =>
        throw new RuntimeException(s"Unsupported input format: ${args(0)}")
    }

    env.execute()
  }

  private def createPojoCsvInputFormat(csvFilePath: String): PojoCsvInputFormat[Company] = {
    val clazz = classOf[Company]
    val pojoFields = Seq(
      new PojoField(clazz.getDeclaredField("name"), BasicTypeInfo.STRING_TYPE_INFO),
      new PojoField(clazz.getDeclaredField("ticker"), BasicTypeInfo.STRING_TYPE_INFO),
      new PojoField(clazz.getDeclaredField("numEmployees"), BasicTypeInfo.INT_TYPE_INFO)
    ).asJava
    val pojoTypeInfo = new PojoTypeInfo[Company](clazz, pojoFields)
    val fieldNames = Array("name", "ticker", "numEmployees")

    val inputFormat = new PojoCsvInputFormat[Company](new Path(csvFilePath), pojoTypeInfo, fieldNames)
    inputFormat.setSkipFirstLineAsHeader(true)
    inputFormat
  }

  private def createTupleCsvInputFormat(csvFilePath: String): TupleCsvInputFormat[CompanyTuple] = {
    val types = Seq(
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO
    )
    val tupleTypeInfo = new TupleTypeInfo[CompanyTuple](classOf[CompanyTuple], types: _*)

    val inputFormat = new TupleCsvInputFormat[CompanyTuple](new Path(csvFilePath), tupleTypeInfo)
    inputFormat.setSkipFirstLineAsHeader(true)
    inputFormat
  }
}

ポイントは POJO 版も tuple 版も型情報を作ってやる必要があるということだ。
それぞれ PojoTypeInfo, TupleTypeInfo を用意してやる必要があり、これがやや癖があって面倒。
あるフィールドを数値として読むことは可能だが、日付の parse のようなことはできないようである。
というのを考えると TextInputFormat で読んで自分で parse するのと比べてあまりうれしくないような…

データ

実験用のデータとして会社情報を示す簡単な CSV ファイルを適当に作って data/companies.csv に配置。

name,ticker,num_employees
Alphabet Inc,GOOG,98771
Apple Inc,AAPL,147000
Facebook Inc,FB,49942
Amazon.com Inc,AMZN,798000

実行

まずは POJO 版を実行してみた。
プログラムが起動するとすぐに以下が出力された。

[info] running com.example.entry.ReadCsvFileExperimentRunner pojo
2> Company(Alphabet Inc,GOOG,98771)
1> Company(Facebook Inc,FB,49942)
2> Company(Apple Inc,AAPL,147000)
1> Company(Amazon.com Inc,AMZN,798000)

Company インスタンスとして CSV ファイルの内容を取得できている。
プログラムは止まっていないが CSV ファイルの内容を一通り吐き出したところで出力は止まった。
ここで CSV ファイルに次の1行を追加してみる。

Microsoft Corporation,MSFT,163000

すると出力は

[info] running com.example.entry.ReadCsvFileExperimentRunner pojo
2> Company(Alphabet Inc,GOOG,98771)
1> Company(Facebook Inc,FB,49942)
2> Company(Apple Inc,AAPL,147000)
1> Company(Amazon.com Inc,AMZN,798000)
2> Company(Alphabet Inc,GOOG,98771)
1> Company(Amazon.com Inc,AMZN,798000)
2> Company(Apple Inc,AAPL,147000)
1> Company(Microsoft Corporation,MSFT,163000)
2> Company(Facebook Inc,FB,49942)

となり、最初に出力された4行に加えて新たに5行追加された。
CSV ファイルには1行追加しただけだが、既存の行も含む CSV ファイル全体が再度出力された。
ドキュメントに記載されているとおりの仕様となっている。

tuple 版で実行すると出力は次のようになった。

[info] running com.example.entry.ReadCsvFileExperimentRunner tuple
1> (Facebook Inc,FB,49942)
1> (Amazon.com Inc,AMZN,798000)
2> (Alphabet Inc,GOOG,98771)
2> (Apple Inc,AAPL,147000)

tuple として読めているようだ。
watchType が同じなので CSV ファイルの更新についての挙動は同様だった。

ちなみに実行可能なプロジェクトは GitHub に置いている。
sbt 'runMain com.example.entry.ReadCsvFileExperimentRunner pojo' または sbt 'runMain com.example.entry.ReadCsvFileExperimentRunner tuple' で実行できる。
(Ctrl + C で終了)

まとめ

TextInputFormat で読んで自分で parse するのと比べ、*CsvInputFormat を使う方法はコーディングとしてはあまりメリットが感じられなかった。
また、ストリーム処理においてやはりファイルというデータソースは傍流なんだなという感じ。

ちなみに Table API で CSV を読むこともおそらく可能。
気が向いたら書く。