前回の記事 では Apache Flink における stream data と static data の join において、DataStream API における broadcast state pattern を使う方法を示した。
今回の記事では Table API の temporal table function を用いた実験を行う。

Table API

Table API は名前のとおりで class Table を中心として SQL-like な DSL により処理を記述するという、DataStream API より high-level な API となっている。
これらの関係は Apaceh Spark の RDDDataFrame (DataSet) の関係に似ている。
SQL-like な API で記述された処理が実行時に最適化されて low-level API の処理に翻訳されるところも同じだ。

RDB の table の概念を元にしているものと考えられるが、本質的に table の概念とストリーム処理はあまりマッチしないと思う。
table はある時点のデータセット全体を表すのに対し、ストリーム処理ではやってくるレコードを逐次的に処理したい。
ここを合わせているため、ストリーム処理における Table API による処理の挙動の理解には注意が必要だ。
Streaming Concepts 以下のドキュメントを確認しておきたい。

Temporal Table Function

star schema における、変更されうる dimension table を stream data と結合する方法として、temporal table という仕組みが提供されている。
ドキュメントでは為替レートの例が示されている。
stream でやってくる fact table 的なレコードに対して、為替のように時々刻々と変化する dimension table をそのレコードの時刻における snap shot としてぶつけるような形となる。

レコードの時刻としては processing time または event time を扱うことができる。
event time の場合であっても watermark で遅延の許容を定義できるため dimension table のすべての履歴を状態として保持する必要はなく、processing time または event time の watermark に応じて過去の履歴は捨てることが可能となっている。

Table API において temporal table を使うには temporal table function という形を取ることになる。

実験

実験概要

やることは 前回の記事 とまったく同じで乱数で作った株価のデータを扱う。
前回と違うのは DataStream API ではなく Table API で処理を記述したところである。

コード

上記を実行するためのコードを以下に示す。
実行可能なプロジェクトは GitHub に置いておいた。

Entry Point

toTable() により入力データの DataStreamTable に変換した後、処理を記述した。
func が temporal table function に当たる。
今回は processing time を基準として join しているが、実際のシステムでは event time を基準としたいことが多いのではないだろうか。

package example

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, _}
import org.apache.flink.table.api.{AnyWithOperations, EnvironmentSettings, FieldExpression, call}
import org.apache.flink.test.util.MiniClusterWithClientResource

object FlinkTableJoinTest {
  // See https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/testing.html#testing-flink-jobs
  private val FlinkCluster = new MiniClusterWithClientResource(
    new MiniClusterResourceConfiguration
    .Builder()
      .setNumberSlotsPerTaskManager(2)
      .setNumberTaskManagers(1)
      .build
  )

  def main(args: Array[String]): Unit = {
    FlinkCluster.before()

    // for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.setParallelism(2)

    // create settings
    val setting = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    // create a TableEnvironment
    val tableEnv = StreamTableEnvironment.create(env, setting)

    // create a Table instance for Company
    val companiesMasterFilePath = "data/companies.csv"
    val companies = readCompaniesMaster(companiesMasterFilePath, env)
      .toTable(tableEnv, $"ticker".as("c_ticker"), $"name", $"c_proc_time".proctime)

    // temporal table function
    val func = companies.createTemporalTableFunction($"c_proc_time", $"c_ticker")

    // create a Table instance for Stock
    val stocks = env
      .fromCollection(new UnboundedStocks)
      .toTable(tableEnv, $"ticker".as("s_ticker"), $"price", $"s_proc_time".proctime)

    // join with a temporal table function
    val results = stocks
      .joinLateral(call(func, $"s_proc_time"), $"s_ticker" === $"c_ticker")
      .select($"s_ticker", $"name", $"price")
      .toAppendStream[(String, String, Double)]
      .print

    env.execute()

    FlinkCluster.after()
  }

  private def readCompaniesMaster(companiesMasterFilePath: String,
                                  env: StreamExecutionEnvironment): DataStream[Company] = {
    env
      .readFile(
        new TextInputFormat(new Path(companiesMasterFilePath)),
        companiesMasterFilePath,
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        10 * 1000
      )
      .map { line =>
        val items = line.split(",")
        Company(items(0), items(1))
      }
  }
}

実行結果

上記を実行すると以下のような stream と static が結合された結果レコードが流れ続ける。

2> (AMZN,Amazon,110.05826176785374)
2> (AMZN,Amazon,237.82717323588966)
1> (FB,Facebook,147.96046700184428)
1> (GOOGL,Google,393.58555322242086)
2> (AMZN,Amazon,104.18843434881401)

前回と同様に data/companies.csv の中身を更新するとその結果が反映される。
削除が反映されないのも同じだった。
おそらく physical な処理としてはほぼ同じようになっていると思われる。

まとめ

前回と同様の stream data と static data の join を、Table API + temporal table function で行えることを確認した。
temporal table function の概念さえ把握できれば Straem API のときに比べて簡潔に処理を記述できた。