star schema における fact table と dimension table の join のようなことを Apache Flink におけるストリーム処理で行いたい。
stream data と static data の join ということになる。
ただし dimension table 側も更新されるため、完全な static というわけではない。
このポストでは Flink v1.11 を前提とした。
join の方法
今回は DataStream API でこれを実現することを考える。
Flink のドキュメントを読むと broadcast state pattern でできそうだ。
やり方としては次のようになる。
- static data のファイルを
FileProcessingMode.PROCESS_CONTINUOUSLY
で読み込みDataStream
化 - 1 を
broadcast()
- stream data の
DataStream
と 2 をconnect()
static data を PROCESS_CONTINUOUSLY
で読むのは変更を得るため。PROCESS_ONCE
で読んでしまうとストリーム処理の開始時に1回読むだけになり、dimension table の変更を得られない。
このあたりの仕様については Data Sources を参照。
その後は broadcast state pattern にそのまま従う。
したがって BroadcastProcessFunction
or KeyedBroadcastProcessFunction
を実装する必要がある。
その中で static data を取り込んで state として持ち、stream data 側で参照すればよい。
2つのデータの各 term に対する関係性を以下に示す。
in star schema | stream or static | broadcast or not |
---|---|---|
dimension table | static data | broadcasted |
fact table | stream data | non-broadcasted |
実験
実験概要
stream data として株価のデータを考える。
適当に乱数で作った株価が “GOOGL” 等の ticker とともに流れてくる。
一方、会社情報が記載された dimension table 的なファイルも用意する。
流れ続ける株価データに対して ticker を key にして会社情報を紐付ける、ということを行う。
コード
上記を実行するためのコードを以下に示す。
実行可能なプロジェクトを GitHub に置いたので興味があればどうぞ。
Entry Point
main()
の実装。MiniClusterWithClientResource
は本来は単体テスト用だが、簡単に local で cluster を動かすためにここで使用している。
package example
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.test.util.MiniClusterWithClientResource
object FlinkJoinTest {
// See https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/testing.html#testing-flink-jobs
private val FlinkCluster = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration
.Builder()
.setNumberSlotsPerTaskManager(2)
.setNumberTaskManagers(1)
.build
)
def main(args: Array[String]): Unit = {
FlinkCluster.before()
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
val companiesMasterFilePath = "data/companies.csv"
val companies = readCompaniesMaster(companiesMasterFilePath, env)
.broadcast(new MapStateDescriptor(
"CompanyState",
classOf[String], // ticker
classOf[String] // name
))
// the broadcast state pattern
// See https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/broadcast_state.html
env
.fromCollection(new UnboundedStocks)
.connect(companies)
.process(new StockBroadcastProcessFunction)
.print()
env.execute("flink join test")
FlinkCluster.after()
}
private def readCompaniesMaster(companiesMasterFilePath: String,
env: StreamExecutionEnvironment): DataStream[Company] = {
env
.readFile(
new TextInputFormat(new Path(companiesMasterFilePath)),
companiesMasterFilePath,
FileProcessingMode.PROCESS_CONTINUOUSLY,
10 * 1000
)
.map { line =>
val items = line.split(",")
Company(items(0), items(1))
}
}
}
Records
各種レコードを表す case class。UnboundedStocks
は一定のインターバルで Stock
を無限に返す iterator であり、stream data 生成に利用する。
case class Company(ticker: String, name: String)
case class Stock(ticker: String, price: Double)
/**
* Iterator to generate unbounded stock data
*/
class UnboundedStocks extends Iterator[Stock] with Serializable {
override def hasNext: Boolean = true // unbounded
override def next(): Stock = {
Thread.sleep(1000)
val tickers = Seq("GOOGL", "AAPL", "FB", "AMZN")
val ticker = tickers(Random.nextInt(tickers.size)) // one of GAFA
val price = 100 + Random.nextDouble() * 300 // random price
Stock(ticker, price)
}
}
BroadcastProcessFunction
肝である BroadcastProcessFunction
の実装。
package example
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector
class StockBroadcastProcessFunction
extends BroadcastProcessFunction[Stock, Company, (String, String, Double)] {
private val StateDescriptor = new MapStateDescriptor(
"CompanyState",
classOf[String], // ticker
classOf[String] // name
)
override def processElement(value: Stock,
ctx: BroadcastProcessFunction[Stock, Company, (String, String, Double)]#ReadOnlyContext,
out: Collector[(String, String, Double)]): Unit = {
val companyName = ctx.getBroadcastState(StateDescriptor).get(value.ticker)
out.collect((value.ticker, Option(companyName).getOrElse("-"), value.price))
}
override def processBroadcastElement(value: Company,
ctx: BroadcastProcessFunction[Stock, Company, (String, String, Double)]#Context,
out: Collector[(String, String, Double)]): Unit = {
ctx.getBroadcastState(StateDescriptor).put(value.ticker, value.name)
}
}
実行結果
上記を実行すると以下のような stream と static が結合された結果レコードが流れ続ける。
2> (FB,Facebook,158.76057838239333)
1> (GOOGL,Google,288.4271251901199)
2> (AAPL,Apple,191.00515338617706)
1> (FB,Facebook,121.98205452369652)
2> (FB,Facebook,140.05023554456997)
この状態で会社情報が記載されている data/companies.csv
を更新することを考える。
例えば “GOOGL” の社名を “Google” から “Alphabet” に変更して保存してみた。
するとしばらくしてその修正が反映された結果が流れてくるようになる。
1> (GOOGL,Alphabet,288.1008081843843)
2> (AMZN,Amazon,137.11135563851838)
1> (GOOGL,Alphabet,121.78368168964735)
2> (FB,Facebook,236.53483047124948)
1> (FB,Facebook,220.44300865769645)
static data の更新が反映されることが確認できた。
今回は10秒に1回のインターバルで元ファイルを確認するようにファイルを読んでいるため、変更してからそれが反映されるまで最大10秒程度かかる。
懸念点
join はできたが次のような懸念点がある。
- レコードの削除に対応していない
- state を上書きしているだけなので
companies.csv
から削除されたレコードは感知できない
- state を上書きしているだけなので
- ファイルの更新時に処理が重くなる可能性がある
companies.csv
の更新タイミングでその中の全レコードを処理してしまう
- checkpoint が大きくなる
- state が broadcast されているため、task ごとに重複した state が保存されてしまう
- See Important Considerations
まとめ
このように broadcast state pattern によって stream data と static data との join 処理を行うことができた。
ただし、まだちゃんと調べていないが DataStream API ではなく Table API を使えばもう少しカジュアルな感じで近いことができるかもしれない。
気が向いたらそちらも試してみる。
追記
Table API を使った場合についての記事を追加しました。