ほとんどバッチ処理しか書いたことのない者だがストリーム処理のシステムを開発することになった。
それにあたって独学で調べたことなどまとめておく。

ストリーム処理とは

そもそも “ストリーム処理” とは何を指しているのか。
以下の引用が簡潔に示している。

a type of data processing engine that is designed with infinite data sets in mind. Nothing more.
Streaming 101: The world beyond batch

こちらは “streaming system” について述べたものだが、つまり終わりのないデータを扱うのがストリーム処理ということである。

例えば web サービスから生まれ続けるユーザ行動ログを逐次的に処理するというのがストリーム処理。
web サービスが終了しないかぎりはユーザ行動ログの生成には終わりがない。

これに対して “1日分のユーザ行動ログ” 等のように有限の量のデータを切り出して処理する場合、これはバッチ処理となる。
ストリーム処理とバッチ処理の違いは扱うデータが無限なのか有限なのかということだ。
この後触れていくが、この終わりのないデータを継続的に処理し続けるというところにバッチ処理にはない難しさがある。

なぜストリーム処理なのか

なぜストリーム処理なのか。
ひとえに逐次的な入力データに対する迅速なフィードバックが求められているからと言えるだろう。
迅速なフィードバックがビジネス上のメリットとなることは自明だ。

  • SNS の配信
  • カーシェアリングにおける配車や料金設定
  • クレジットカードや広告クリックなどの不正検知

もしこれらの application が例えば hourly のバッチ処理で実装されていたらどうだろうか。
まあ待っていられない。

一般的なストリーム処理の構成

モダンな…と言っていいのかわからないが、ストリーム処理を行うための一般的なシステムは次の3つの要素で構成される。

  • producer
  • broker
  • consumer

producer は最初にレコードを生成する、ストリームデータの発生源となるものである。
例えばログを生成する web application であったり、何らかのセンサーを持つ IoT 機器であったりがこれに該当する。
producer は絶え間なくログを生成し、それを broker へと送る。

broker は producer から送られたログを格納し、任意のタイミングで取り出せるようにするものである。
誤解を恐れずに言うとメッセージキューに近いイメージだ。
Apache Kafka クラスタや Amazon Kinesis Data Streams 等がこれに該当する。

consumer は broker からログを取り出し、それに対し何かしらの処理を行うものだ。
time window 集計であったりログからの異常検知であったり、処理した結果として何かビジネス上意味があるものを得るのである。
これを行うフレームワークとしては Spark Streaming や Apache Flink 等がメジャーなのだろうか。

producer と consumer の間に broker を挟むメリットとしては次のようなことが挙げられる。

  • producer が M 個、consumer が N 個の場合に M * N の関係になるところを broker を挟めば M + N にできる
    • producer, consumer に多数のシステムがあったとしても各自は broker との接続だけを考えればよい
  • 任意のタイミングでデータを読み出せる
  • producer または consumer に問題が発生してもデータロスが起こりにくくできる
    • その分 broker には高い可用性が求められる
      • Kafka はクラスタで冗長構成
      • Kinesis Data Streams は複数 AZ でレプリケーション

時間の概念

ストリーム処理では時間の概念がいくつかあり、集計などの処理をどの時間をベースにして実行するのか、意識する必要がある。

  • event time
    • producer 側でログイベントが発生した時間
  • ingestion time
    • broker にそのログイベントのレコードが挿入された時間
  • processing time
    • consumer 側でレコードを処理した時間

processing time を使うのが一番簡単なのだが、おそらく分析系の処理であれば window 集計等では event time を使うことが多いのではないだろうか。

ingestion time はおそらく実際のプロダクトではあまり使われないのではと思われる。
(ネットワークのパフォーマンスを見るぐらい?)

Windowing

ストリーム処理の中で sum, count などを伴う集計処理を行う場合、通常は時間方向の window で切って処理するということになるのではないだろうか。
window で切らずに完全なデータセットがそろうまで待つことはできないし、データが来るたびに逐次的に全体の結果を更新するしていくというのも割に合わない。

window の切り方もいくつかある。

  • tumbling window
    • 固定長でオーバーラップしない
  • sliding window
    • 固定長でオーバーラップを含む
  • session window
    • いわゆる web の session のように、ある種のイベントがある期間発生しないことにより window が区切られる

これらについては Flink のドキュメントが図もあってわかりやすい。

個人的な感想だが、この time window の集計がない単なる map 的なストリーム処理であれば traditional なアーキテクチャでも難しくはない。
しかし time window 集計が必要となった場合は Spark Streaming 等のモダンなフレームワークが威力を発揮してくる。

Watermark

時間で window を切るときは、前述のどの時間の定義を用いるかを考えなければいけない。
processing time を用いる場合は簡単だが event time はやや難しい。
consumer 側では event のレコードがどれくらい遅れてやってくるかわからないためだ。

ネットワークその他の影響により、event のレコードが producer -> broker -> consumer という経路で consumer に届くまでの時間というのは一定にはならない。
また、古い event が新しい event より後に届くというように順番が前後することも起こりうる。
ここで “watermark” という考え方が必要になってくる。

A watermark with a value of time X makes the statement: "all input data with event times less than X have been observed."
Streaming 102: The world beyond batch

ある processing time において「event time X より前のレコードはすべて到着したよ」というのが watermark である。
別の言い方をすると watermark により event のレコードがどの程度遅延してもよいかが定義される。

event time X より前のレコードが真の意味ですべて到着した、というのは難しい。
実際には heuristic にどの程度遅れていいかを決め、それより遅れた場合はある event time 期間における window 処理には含めないということになる。

watermark の決め方はフレームワーク次第だろうか。
例えば Spark Structured Streaming の例だと図もあって比較的わかりやすい。

Schema Evolution

何らかの業務システムや web システム等をある程度運用したことがある人ならわかると思うが、データの schema というのはナマモノだ。
一度決めたら終わりというわけではなくプロダクトやビジネスの変化に応じて変化していく。
カラムが増えたり、削除されたり、名前や型が変わったり…
このようにデータの構造が変化していくこと、またはそれを扱うことを “schema evolution” という。

バッチ処理において schema の変更に追従することを考えるのはそれほど難しくない。
hourly のバッチ処理であったとしても、バッチ処理とバッチ処理の間の時間で application を更新すればいいだけだ。
(が、実際に行うのは困難が伴うことも多い)

ではストリーム処理ではどうだろうか。
いわゆるストリーム処理においては処理と処理の間というものがなく、application がずっと稼働しっぱなしということになる。
バッチ処理のような更新はできない。
もっと言うと producer で生まれた新しい schema のレコードがいつ届くかもわからない。

おそらくこの問題には2つの対応方法がある。
1つめは consumer 側のシステムで前方互換性を保つという方法である。
この場合、新しいフィールドは必ず末尾に追加される等、producer 側での schema 更新についてある程度のルールが必要となるだろう。
producer 側で生成されるレコードの schema の変更が必ず事前にわかるというのであれば後方互換性でもいいが、多くの場合は難しい。
ところで前方互換と後方互換、どっちがどっちなのか覚えられません。

2つめの方法として schema 情報をレコード自体に入れ込んでしまうという方法もある。
Apach Avro のような serialization の方法を取っているとレコード自体に schema の情報を付与することができる。

おそらく最もエレガントにこれをやるのが Confluent の Schema Registry という機能だ。
producer から送出されるレコードには schema ID を付与する。
schema の実体は Schema Registry という broker とは別の場所で管理されており、consumer 側では受け取ったレコードに付与されている schema ID と Schema Registry に登録されている shcema の実体を参照してレコードを deserialize することができる。

Deploy

ストリーム処理を行うシステムは終わりのないデータを処理するためのものであり、ずっと動き続けることが期待されている。
しかし通常システムは一度立ち上げれば終わりということではなく、運用されている中で更新していく必要がある。

ずっと動かしながらどのように deploy, release するのか。
この問題は主に consumer 側のシステムで配慮が必要になると思われる。
正直これについてはちゃんと調べられていないが、2点ほど述べておきたい。

まず1点目、application を中断・更新・再開するのにどの程度の時間がかかるのかを知っておく必要があるということ。
アーキテクチャやフレームワーク、処理の内容や checkpoint (後述) を使うか等によりこの時間は変わってくる。
一例だが、AWS 環境において

  • AWS Glue + Spark Structured Streaming
  • Amazon Kinesis Data Analytics + Flink

の比較をしたことがある。
前者は再開に数分かかったのに対し、後者は1分未満で再開できた。
再開までの時間が十分に短いと判断できるのであればそのまま deploy, release してしまっていいだろう。

一方そうでない場合はどうすべきかという話が2点目。
再開までの時間が長く、システム要件的に許容できないというのであれば、release 時は二重で動かすというような措置が必要かもしれない。
おそらく Blue-Green Deployment のようなことを考えることになるだろう。

Checkpoint

前述のとおり、ストリーム処理を行うシステムはずっと動き続けることが期待されている。
しかし予定された application の更新や不測のエラー等、何らかの理由で一時的に中断されるということが実際の運用中には起こる。

中断されたとき速やかに復帰する仕組みとして “checkpoint” というものがいくつかの consumer 側のフレームワークで提供されている。
雑に説明すると、処理のある時点における進捗や内部状態などをディスク等に永続化し、そこから処理を再開できるようにするものである。

上記は Spark Structured Streaming と Flink の例だ。
checkpoint には次のようなメリットがあり、運用上有用だと言える。

  • 内部の状態を保持しているため、速やかに復帰できる
  • 中断した位置から再開できるので出力に穴が開かない

一方で落とし穴もある。
checkpoint では内部の状態が永続化されるわけだが、内部の状態というのは当然 application の実装が決めているものである。
application のコードを変更したとき、変更の内容によっては永続化された checkpoint と application が合わなくなることがあるのだ。
未定義の挙動となることもあるので、checkpoint の運用には十分に配慮する必要がある。
どのような変更なら checkpoint が安全に利用できるのかはフレームワークのドキュメントに記載があるので確認しておきたい。

RDB の世界との折り合い

みんな大好きな RDB の世界では table を操作してデータの処理を行う。
基本的には table というものはある時点における完全なデータセットを表すものである。 (ex. isolation)
他方、ストリーム処理はやってきたデータを逐次的に処理するものである (mini-batch の場合もあるが)。

直感的にこの2つは相性が悪そうに見える。
しかし Spark や Flink では table ベースの操作でストリーム処理を行うための API が提供されている。
おそらく

  • ストリーム処理の周辺のデータソースとして RDB が存在する
  • RDB 的な table 操作があまりにも浸透している

というところが API が必要である理由なのだろう。

ストリームデータを table 的に扱うというのが、やや直感的な理解をしにくいものとなっている。
フレームワークのドキュメントを確認しておきたい。

例えば Spark Structured Streaming であれば処理の出力のための3つの output mode が示されている。

  • Append mode: 追加された行だけ出力
  • Complete mode: table 全体を出力
  • Update mode: 更新された行だけ出力

どれを選ぶかにより必要とする内部メモリの大きさも影響される。

まとめ

思ったより長文になってしまった。
結局ストリーム処理の難しさは以下の2点に尽きるだろう。

  • 複数の時間の概念
  • 常時稼働のシステム

独学なので抜け漏れがあったり、話が新しくなかったりすることもあると思われる。

参考