star schema における fact table と dimension table の join のようなことを Apache Flink におけるストリーム処理で行いたい。
stream data と static data の join ということになる。
ただし dimension table 側も更新されるため、完全な static というわけではない。

このポストでは Flink v1.11 を前提とした。

join の方法

今回は DataStream API でこれを実現することを考える。
Flink のドキュメントを読むと broadcast state pattern でできそうだ。

やり方としては次のようになる。

  1. static data のファイルを FileProcessingMode.PROCESS_CONTINUOUSLY で読み込み DataStream
  2. 1 を broadcast()
  3. stream data の DataStream と 2 を connect()

static data を PROCESS_CONTINUOUSLY で読むのは変更を得るため。
PROCESS_ONCE で読んでしまうとストリーム処理の開始時に1回読むだけになり、dimension table の変更を得られない。
このあたりの仕様については Data Sources を参照。

その後は broadcast state pattern にそのまま従う。
したがって BroadcastProcessFunction or KeyedBroadcastProcessFunction を実装する必要がある。
その中で static data を取り込んで state として持ち、stream data 側で参照すればよい。

2つのデータの各 term に対する関係性を以下に示す。

in star schemastream or staticbroadcast or not
dimension tablestatic databroadcasted
fact tablestream datanon-broadcasted

実験

実験概要

stream data として株価のデータを考える。
適当に乱数で作った株価が “GOOGL” 等の ticker とともに流れてくる。
一方、会社情報が記載された dimension table 的なファイルも用意する。
流れ続ける株価データに対して ticker を key にして会社情報を紐付ける、ということを行う。

コード

上記を実行するためのコードを以下に示す。
実行可能なプロジェクトを GitHub に置いたので興味があればどうぞ。

Entry Point

main() の実装。
MiniClusterWithClientResource は本来は単体テスト用だが、簡単に local で cluster を動かすためにここで使用している。

package example

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.test.util.MiniClusterWithClientResource

object FlinkJoinTest {
  // See https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/testing.html#testing-flink-jobs
  private val FlinkCluster = new MiniClusterWithClientResource(
    new MiniClusterResourceConfiguration
    .Builder()
      .setNumberSlotsPerTaskManager(2)
      .setNumberTaskManagers(1)
      .build
  )

  def main(args: Array[String]): Unit = {
    FlinkCluster.before()

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    val companiesMasterFilePath = "data/companies.csv"

    val companies = readCompaniesMaster(companiesMasterFilePath, env)
      .broadcast(new MapStateDescriptor(
        "CompanyState",
        classOf[String],    // ticker
        classOf[String]     // name
      ))

    // the broadcast state pattern
    // See https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/broadcast_state.html
    env
      .fromCollection(new UnboundedStocks)
      .connect(companies)
      .process(new StockBroadcastProcessFunction)
      .print()

    env.execute("flink join test")

    FlinkCluster.after()
  }

  private def readCompaniesMaster(companiesMasterFilePath: String,
                                  env: StreamExecutionEnvironment): DataStream[Company] = {
    env
      .readFile(
        new TextInputFormat(new Path(companiesMasterFilePath)),
        companiesMasterFilePath,
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        10 * 1000
      )
      .map { line =>
        val items = line.split(",")
        Company(items(0), items(1))
      }
  }
}

Records

各種レコードを表す case class。
UnboundedStocks は一定のインターバルで Stock を無限に返す iterator であり、stream data 生成に利用する。

case class Company(ticker: String, name: String)

case class Stock(ticker: String, price: Double)

/**
 * Iterator to generate unbounded stock data
 */
class UnboundedStocks extends Iterator[Stock] with Serializable {
  override def hasNext: Boolean = true  // unbounded

  override def next(): Stock = {
    Thread.sleep(1000)
    val tickers = Seq("GOOGL", "AAPL", "FB", "AMZN")
    val ticker = tickers(Random.nextInt(tickers.size))  // one of GAFA
    val price = 100 + Random.nextDouble() * 300         // random price
    Stock(ticker, price)
  }
}

BroadcastProcessFunction

肝である BroadcastProcessFunction の実装。

package example

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector

class StockBroadcastProcessFunction
  extends BroadcastProcessFunction[Stock, Company, (String, String, Double)] {

  private val StateDescriptor = new MapStateDescriptor(
    "CompanyState",
    classOf[String],    // ticker
    classOf[String]     // name
  )

  override def processElement(value: Stock,
                              ctx: BroadcastProcessFunction[Stock, Company, (String, String, Double)]#ReadOnlyContext,
                              out: Collector[(String, String, Double)]): Unit = {
    val companyName = ctx.getBroadcastState(StateDescriptor).get(value.ticker)
    out.collect((value.ticker, Option(companyName).getOrElse("-"), value.price))
  }

  override def processBroadcastElement(value: Company,
                                       ctx: BroadcastProcessFunction[Stock, Company, (String, String, Double)]#Context,
                                       out: Collector[(String, String, Double)]): Unit = {
    ctx.getBroadcastState(StateDescriptor).put(value.ticker, value.name)
  }
}

実行結果

上記を実行すると以下のような stream と static が結合された結果レコードが流れ続ける。

2> (FB,Facebook,158.76057838239333)
1> (GOOGL,Google,288.4271251901199)
2> (AAPL,Apple,191.00515338617706)
1> (FB,Facebook,121.98205452369652)
2> (FB,Facebook,140.05023554456997)

この状態で会社情報が記載されている data/companies.csv を更新することを考える。
例えば “GOOGL” の社名を “Google” から “Alphabet” に変更して保存してみた。
するとしばらくしてその修正が反映された結果が流れてくるようになる。

1> (GOOGL,Alphabet,288.1008081843843)
2> (AMZN,Amazon,137.11135563851838)
1> (GOOGL,Alphabet,121.78368168964735)
2> (FB,Facebook,236.53483047124948)
1> (FB,Facebook,220.44300865769645)

static data の更新が反映されることが確認できた。
今回は10秒に1回のインターバルで元ファイルを確認するようにファイルを読んでいるため、変更してからそれが反映されるまで最大10秒程度かかる。

懸念点

join はできたが次のような懸念点がある。

  • レコードの削除に対応していない
    • state を上書きしているだけなので companies.csv から削除されたレコードは感知できない
  • ファイルの更新時に処理が重くなる可能性がある
    • companies.csv の更新タイミングでその中の全レコードを処理してしまう
  • checkpoint が大きくなる
    • state が broadcast されているため、task ごとに重複した state が保存されてしまう
    • See Important Considerations

まとめ

このように broadcast state pattern によって stream data と static data との join 処理を行うことができた。
ただし、まだちゃんと調べていないが DataStream API ではなく Table API を使えばもう少しカジュアルな感じで近いことができるかもしれない。
気が向いたらそちらも試してみる。

追記

Table API を使った場合についての記事を追加しました。