Back of Hercules in main square in Florence, Italy.

Apache Flink の Backpressure の仕組みについて調べた

ストリーム処理のフレームワークが備える backpressure という機能がある。 このポストでは Apache Flink の backpressure について調べたことを記載する。 Backpressure の目的 backpressure はストリーム処理システムにおける負荷管理の仕組みの一つ。 一時的な入力データ量の増大に対応する。 インターネットユーザの行動履歴やセンサーデータなどは常に一定量のデータが流れているわけではなく、単位時間あたりのデータ量は常に変動している。 一時的にスパイクしてデータ量が増大するようなことも起こりうる。 複数の operator からなる dataflow graph により構成されるストリーム処理システムにおいては、処理スピードのボトルネックとなる operator が存在する。 一時的に入力データ量が増えてボトルネックの operator の処理速度を上回ってしまった場合に、データの取りこぼしが発生するのを防ぐのが backpressure の目的となる。 Backpressure の仕組み Buffer-based ここでは以前のブログでも紹介した、ストリーム処理で必要とされる機能について書かれた Fragkoulis et al. 1 を引用して一般論としての backpressure について述べたい。 上流/下流の operator をそれぞれ producer, consumer とする。 producer, consumer (それらの subtask と言ってもいいかも) がそれぞれ異なる物理マシンに deploy されているケースが Figure 12b となる。 各 subtask は input と output の buffer を持っており、 producer は処理結果を output buffer に書き出す TCP 等の物理的な接続でデータを送信 consumer 側の output buffer にデータを格納 consumer がそれを読み込んで処理する というような流れになる。 buffer はマシンごとの buffer pool で管理されており、input/output で buffer が必要となった場合はこの buffer pool に buffer が要求される。 ...

2月 28, 2021 · soonraah
CSV

Apache Flink の DataStream API 利用時の CSV ファイル読み込み

ストリーム処理における CSV ファイルの読み込み Apache Flink は unbounded なストリームデータを処理するためのフレームワークだ。 しかし現実的な application を開発する場合、ストリームデータに加えて static なファイルや DB 等を読み込みたいこともある。 star schema における dimension table 的な情報をストリームに結合したい場合 等が考えられる。 このポストでは Flink で DataStream API ベースでの実装において CSV ファイルを読むことを考える。 Flink は現時点の stable である v1.11 を想定。 CSV ファイルを読む方法 DataStream API ベースの実装で CSV ファイルを読むには StreamExecutionEnvironment のメソッドである readFile() を使う。 overload された同名のメソッドがいくつか存在するが、次の2つの引数が特に重要だろう。 まず1つめは FileInputFormat<OUT> inputFormat であり、こちらは data stream の生成に用いる入力フォーマットを指定する。 おそらく最も一般的なのが TextInputFormat だと思われる。 もちろん単なる text として CSV ファイルを読み込み、後続の処理で各レコードを parse することも可能だが CSV 用の入力フォーマットがいくつか用意されているようだ。 PojoCsvInputFormat RowCsvInputFormat TupleCsvInputFormat なんとなく名前でわかると思うが、それぞれ readFile() の結果として返される DataStreamSource が内包する型が異なる。 これについては後述の実験にて確認する。 ...

12月 1, 2020 · soonraah
Stream

ストリーム処理システムに求められる機能性、および Apache Flink におけるその対応

はじめに このポストではストリーム処理の survay 論文の話題に対して Apache Flink における例を挙げて紹介する。 論文概要 Fragkoulis, M., Carbone, P., Kalavri, V., & Katsifodimos, A. (2020). A Survey on the Evolution of Stream Processing Systems. 2020年の論文。 過去30年ぐらいのストリーム処理のフレームワークを調査し、その発展を論じている。 ストリーム処理に特徴的に求められるいくつかの機能性 (functionality) についてその実現方法をいくつか挙げ、比較的古いフレームワークと最近のフレームワークでの対比を行っている。 このポストのスコープ このポストでは前述のストリーム処理システムに求められる機能性とそれがなぜ必要となるかについて簡単にまとめる。 論文ではそこからさらにその実現方法がいくつか挙げられるが、ここでは個人的に興味がある Apache Flink ではどのように対処しているかを見ていく。 ちなみに論文中では Apache Flink はモダンなフレームワークの1つとしてちょいちょい引き合いに出されている。 ここでは Flink v1.11 をターゲットとする。 以下では論文で挙げられている機能性に沿って記載していく。 Out-of-order Data Management Out-of-order ストリーム処理システムにやってくるデータの順序は外的・内的要因により期待される順序になっていないことがある。 外的要因としてよくあるのはネットワークの問題。 データソース (producer) からストリーム処理システムに届くまでのルーティング、負荷など諸々の条件により各レコードごとに転送時間は一定にはならない。 各 operator の処理などストリーム処理システムの内的な要因で順序が乱されることもある。 out-of-order は処理の遅延や正しくない結果の原因となることがある。 out-of-order を管理するためにストリーム処理システムは処理の進捗を検出する必要がある。 “進捗” とはある時間経過でレコードの処理がどれだけ進んだかというもので、レコードの順序を表す属性 A (ex. event time) により定量化される。 ある期間で処理された最古の A を進捗の尺度とみなすことができる。 ...

11月 7, 2020 · soonraah
Stream

バッチ処理おじさんがストリーム処理のシステムを開発するにあたって調べたこと

ほとんどバッチ処理しか書いたことのない者だがストリーム処理のシステムを開発することになった。 それにあたって独学で調べたことなどまとめておく。 ストリーム処理とは そもそも “ストリーム処理” とは何を指しているのか。 以下の引用が簡潔に示している。 a type of data processing engine that is designed with infinite data sets in mind. Nothing more. – Streaming 101: The world beyond batch こちらは “streaming system” について述べたものだが、つまり終わりのないデータを扱うのがストリーム処理ということである。 例えば web サービスから生まれ続けるユーザ行動ログを逐次的に処理するというのがストリーム処理。 web サービスが終了しないかぎりはユーザ行動ログの生成には終わりがない。 これに対して “1日分のユーザ行動ログ” 等のように有限の量のデータを切り出して処理する場合、これはバッチ処理となる。 ストリーム処理とバッチ処理の違いは扱うデータが無限なのか有限なのかということだ。 この後触れていくが、この終わりのないデータを継続的に処理し続けるというところにバッチ処理にはない難しさがある。 なぜストリーム処理なのか なぜストリーム処理なのか。 ひとえに逐次的な入力データに対する迅速なフィードバックが求められているからと言えるだろう。 迅速なフィードバックがビジネス上のメリットとなることは自明だ。 SNS の配信 カーシェアリングにおける配車や料金設定 クレジットカードや広告クリックなどの不正検知 もしこれらの application が例えば hourly のバッチ処理で実装されていたらどうだろうか。 まあ待っていられない。 ...

9月 6, 2020 · soonraah
Apache Flink

Apache Flink の Temporary Table Function を用いた stream data と static data の join

前回の記事 では Apache Flink における stream data と static data の join において、DataStream API における broadcast state pattern を使う方法を示した。 今回の記事では Table API の temporal table function を用いた実験を行う。 Table API Table API は名前のとおりで class Table を中心として SQL-like な DSL により処理を記述するという、DataStream API より high-level な API となっている。 これらの関係は Apaceh Spark の RDD と DataFrame (DataSet) の関係に似ている。 SQL-like な API で記述された処理が実行時に最適化されて low-level API の処理に翻訳されるところも同じだ。 RDB の table の概念を元にしているものと考えられるが、本質的に table の概念とストリーム処理はあまりマッチしないと思う。 table はある時点のデータセット全体を表すのに対し、ストリーム処理ではやってくるレコードを逐次的に処理したい。 ここを合わせているため、ストリーム処理における Table API による処理の挙動の理解には注意が必要だ。 Streaming Concepts 以下のドキュメントを確認しておきたい。 ...

8月 16, 2020 · soonraah
Apache Flink

Apache Flink の Broadcast State Pattern を用いた stream data と static data の join

star schema における fact table と dimension table の join のようなことを Apache Flink におけるストリーム処理で行いたい。 stream data と static data の join ということになる。 ただし dimension table 側も更新されるため、完全な static というわけではない。 このポストでは Flink v1.11 を前提とした。 join の方法 今回は DataStream API でこれを実現することを考える。 Flink のドキュメントを読むと broadcast state pattern でできそうだ。 The Broadcast State Pattern やり方としては次のようになる。 static data のファイルを FileProcessingMode.PROCESS_CONTINUOUSLY で読み込み DataStream 化 1 を broadcast() stream data の DataStream と 2 を connect() static data を PROCESS_CONTINUOUSLY で読むのは変更を得るため。 PROCESS_ONCE で読んでしまうとストリーム処理の開始時に1回読むだけになり、dimension table の変更を得られない。 このあたりの仕様については Data Sources を参照。 ...

8月 6, 2020 · soonraah