Stream

バッチ処理おじさんがストリーム処理のシステムを開発するにあたって調べたこと

ほとんどバッチ処理しか書いたことのない者だがストリーム処理のシステムを開発することになった。 それにあたって独学で調べたことなどまとめておく。 ストリーム処理とは そもそも “ストリーム処理” とは何を指しているのか。 以下の引用が簡潔に示している。 a type of data processing engine that is designed with infinite data sets in mind. Nothing more. – Streaming 101: The world beyond batch こちらは “streaming system” について述べたものだが、つまり終わりのないデータを扱うのがストリーム処理ということである。 例えば web サービスから生まれ続けるユーザ行動ログを逐次的に処理するというのがストリーム処理。 web サービスが終了しないかぎりはユーザ行動ログの生成には終わりがない。 これに対して “1日分のユーザ行動ログ” 等のように有限の量のデータを切り出して処理する場合、これはバッチ処理となる。 ストリーム処理とバッチ処理の違いは扱うデータが無限なのか有限なのかということだ。 この後触れていくが、この終わりのないデータを継続的に処理し続けるというところにバッチ処理にはない難しさがある。 なぜストリーム処理なのか なぜストリーム処理なのか。 ひとえに逐次的な入力データに対する迅速なフィードバックが求められているからと言えるだろう。 迅速なフィードバックがビジネス上のメリットとなることは自明だ。 SNS の配信 カーシェアリングにおける配車や料金設定 クレジットカードや広告クリックなどの不正検知 もしこれらの application が例えば hourly のバッチ処理で実装されていたらどうだろうか。 まあ待っていられない。 ...

9月 6, 2020 · soonraah
A/B Testing

A/B テストの運用が重くてつらいという話

前提 ここでは web システムで使われている機械学習のモデルやアルゴリズムを改善するための online の A/B テストを考える。 具体的に述べると web 広告における CTR 予測や EC サイトのレコメンデーション等が対象である。 よくあるやつ。 web システムにおいて online の A/B テストは KPI 改善の根幹でありとても重要だ。 それが重くなるとつらい、という話。 ここで「重い」と言っているのは計算資源のことではなく、A/B テストを実施する担当者の運用コストについて。 A/B テストの運用が重い場合のデメリット デメリット 1. KPI 改善が遅くなる デメリットと言えばこれが一番大きい。 単純に A/B テストを1回まわすのに時間がかかってしまうし、それがゆえに online の A/B テストに入るまでの offline のテストが厚くなりここでも時間がかかってしまう。 KPI 改善に時間がかかるというのはつまり売上や利益を大きくするのに時間がかかってしまうということである。 デメリット 2. KPI 改善における offline テストの比重が大きくなる 前述のとおりだが online の A/B テストが重いとそこで失敗できなくなり、結果としてその前段の offline のテストを厚くするということになる。 offline のテストが厚いことの何が問題だろうか。 ここで前提としている CTR 予測やレコメンデーションのようなタスクの場合、offline のデータは既存のモデルやアルゴリズムの影響を受けることになる。 例えばレコメンデーションの場合を考えると、新しいモデルを offline で評価するための実験データの正例 (コンテンツの閲覧等) は既存モデルによって生み出される。 既存モデルが「このコンテンツがいいよ」といってユーザに出したリスト、その中からコンテンツの閲覧が行われ正例となるからだ。 このような状況下での offline テストにおいては既存モデルと近い好みを持ったモデルのスコアが高くなる傾向がある。 ...

8月 23, 2020 · soonraah
Apache Flink

Apache Flink の Temporary Table Function を用いた stream data と static data の join

前回の記事 では Apache Flink における stream data と static data の join において、DataStream API における broadcast state pattern を使う方法を示した。 今回の記事では Table API の temporal table function を用いた実験を行う。 Table API Table API は名前のとおりで class Table を中心として SQL-like な DSL により処理を記述するという、DataStream API より high-level な API となっている。 これらの関係は Apaceh Spark の RDD と DataFrame (DataSet) の関係に似ている。 SQL-like な API で記述された処理が実行時に最適化されて low-level API の処理に翻訳されるところも同じだ。 RDB の table の概念を元にしているものと考えられるが、本質的に table の概念とストリーム処理はあまりマッチしないと思う。 table はある時点のデータセット全体を表すのに対し、ストリーム処理ではやってくるレコードを逐次的に処理したい。 ここを合わせているため、ストリーム処理における Table API による処理の挙動の理解には注意が必要だ。 Streaming Concepts 以下のドキュメントを確認しておきたい。 ...

8月 16, 2020 · soonraah
Apache Flink

Apache Flink の Broadcast State Pattern を用いた stream data と static data の join

star schema における fact table と dimension table の join のようなことを Apache Flink におけるストリーム処理で行いたい。 stream data と static data の join ということになる。 ただし dimension table 側も更新されるため、完全な static というわけではない。 このポストでは Flink v1.11 を前提とした。 join の方法 今回は DataStream API でこれを実現することを考える。 Flink のドキュメントを読むと broadcast state pattern でできそうだ。 The Broadcast State Pattern やり方としては次のようになる。 static data のファイルを FileProcessingMode.PROCESS_CONTINUOUSLY で読み込み DataStream 化 1 を broadcast() stream data の DataStream と 2 を connect() static data を PROCESS_CONTINUOUSLY で読むのは変更を得るため。 PROCESS_ONCE で読んでしまうとストリーム処理の開始時に1回読むだけになり、dimension table の変更を得られない。 このあたりの仕様については Data Sources を参照。 ...

8月 6, 2020 · soonraah

あまり大きな Pull Request を作ってほしくない

GitHub Flow ベースの開発においてはあまり大きな pull request を作ってほしくないという話。 なんというか今更わざわざ言わなくてもいいんだけど… 仕事で何度か大きな pull request が投げられているのを見てしまったので、それはあまりよくないよというのを自分でも指摘しやすくするためにまとめておく。 Reference 最初に参考資料を挙げておく。 100 Duck-Sized Pull Requests 「巨大プルリク1件vs細かいプルリク100件」問題を考える(翻訳) Optimal pull request size これらを読めば特に私から言うこともないのだが… これらに書いてあるとおりだが補足しておく。 Pull Request を小分けにしたときのメリット module を適切に切り出すモチベーションが得られる 次のような話がある。 共通化という考え方はアンチパターンを生み出すだけ説 これを理由に module を分けるべきところが分けられず、いろんなことができてしまうベタッと大きな module が生まれるのを見た。 もちろんよく考えられていない共通化は駄目だが、上記ポストでは一方で ただし共通化という名の下におこなわれるのは「同じロジックを持つコードをまとめる」行為であって、抽象化のようにそのコード単位の意味を捉える作業はその範疇にない。抽象化というのはロジックを意味単位ごとにひとくくりにしていく行為で、これがどういうことなのかは次以降で述べていく。 – 共通化という考え方はアンチパターンを生み出すだけ説 - タオルケット体操 と述べており抽象化、つまりコードの意味を考慮した上で適切な単位でまとめておくことは否定していない。 pull request を小さく分けるという行為のためには module や機能を適度なまとまりで切り分けること、つまり抽象化を考えていくことが必要となる。 したがって何でもできてしまう大きな module が生まれるのを防ぐ方向に働く。 (もちろんここで駄目な共通化がなされてしまうこともあるだろう) リリースまでの期間が短く済む 前述の参考資料においても pull request が大きいと ...

8月 3, 2020 · soonraah

勉強会メモ: Spark Meetup Tokyo #3 Online

2020-07-31 にオンライン開催された Spark Meetup Tokyo #3 Online に参加した。 感想などメモしておく。 全体感 トピックとしては主に Spark 3.0 Spark + AI Summit 2020 Spark 周辺要素 といったところだろうか。 最近のコミュニティの動向や関心を日本語で聞くことができてよかった。 運営 & スピーカーの皆様、ありがとうございます。 発表 発表資料は公開されたら追加していく。 SPARK+AI Summit 2020 イベント概要 スピーカー: @tokyodataguy さん Summit は金融業界の参加者が増えているらしい Spark で最も使われている言語は Python とのことだったが、Databricks の notebook サービスの話だった プロダクションコードではまた違うのだろう Spark 3.0 の update をざっくりと Spark 周辺要素の話をざっくりと Koalas Delta Lake Redash MLflow Introducing Koalas 1.0 Introducing Koalas 1.0 (and 1.1) from Takuya UESHIN スピーカー: @ueshin さん ...

8月 1, 2020 · soonraah

Spark DataFrame クエリの弱い分離レベル

Spark バッチ処理の問題を調べていたら分離レベルという概念にたどりついた。 分離レベルについて調べたので、Spark の問題の内容と絡めて記しておく。 考えてみれば当たり前でたいした話ではない。 分離レベルとは トランザクションの挙動についての暗黙の理解 アドホックな分析クエリやプロダクションコード中のクエリを書くとき、その単一のクエリのトランザクションにおいて「同時に実行されている別のクエリの commit 前の状態や commit 結果に影響され、このクエリの結果がおかしくなるかもしれない」ということは通常考えない。 トランザクションはデータベースのある時点の状態に対して正しく処理される、というほぼ無意識の理解をおそらくほとんどの開発者が持っている。 多くの場合この理解は間違っていない。 それはなぜかというと DB 等のデータ処理フレームワークがある強さの分離レベルを提供しているからである。 いろいろな分離レベル ACID 特性のうちの1つ、分離性 (Isolation) の程度を表すのが分離レベル。 トランザクション中に行われる操作の過程が他の操作から隠蔽されることを指し、日本語では分離性、独立性または隔離性ともいう。より形式的には、独立性とはトランザクション履歴が直列化されていることと言える。この性質と性能はトレードオフの関係にあるため、一般的にはこの性質の一部を緩和して実装される場合が多い。 – Wikipedia ACID (コンピュータ科学) 分離レベルには名前のついたものがいくつかあり、分離性の保証の強さが異なる。 具体的にはトランザクションの並行性の問題への対応力が異なる。 名著「データ指向アプリケーションデザイン」の第7章で分離レベルについて詳しく述べられているので、以下ではそちらからの引用。 分離レベルを弱い順に並べる。 read uncommitted このレベルではダーティライトは生じませんが、ダーティリードは妨げられません。 read committed データベースからの読み取りを行った際に見えるデータは、コミットされたもののみであること(ダーティリードは生じない)。 データベースへの書き込みを行う場合、上書きするのはコミットされたデータのみであること(ダーティライトは生じない)。 snapshot isolation スナップショット分離の考え方は、それぞれのトランザクションがデータベースの一貫性のあるスナップショットから読み取りを行うというものです。すなわち、トランザクションが読み取るデータは、すべてそのトランザクションの開始時点のデータベースにコミット済みのものだけということです。 serializability この分離レベルはトランザクションが並行して実行されていても、最終的な答えはそれぞれが1つずつ順番に、並行ではなく実行された場合と同じになることを保証します。 日本語で「分離レベル」を検索すると snapshot isolation の代わりに repeatable read が出てくる事が多い。 しかし repeatable read の名前は実装によって意味が違っていたりして扱いが難しいらしい。 ...

7月 19, 2020 · soonraah
Apache Spark

Apache Spark 3.0.0 について調べた

はじめに Apache Spark 3.0.0 がリリースされました。 Spark Release 3.0.0 release note を見て個人的に気になったところなど簡単に調べました。 書いてみると Databricks の記事へのリンクばっかになってしまった… 全体感 こちらの記事を読めば全体感は OK. Introducing Apache Spark 3.0 公式の release note には Python is now the most widely used language on Spark. とあってそうなん?ってなったけど、こちらの記事だと Python is now the most widely used language on Spark and, consequently, was a key focus area of Spark 3.0 development. 68% of notebook commands on Databricks are in Python. と書いてありどうやら Databricks の notebook の話らしく、だったらまあそうかなという感じ。 プロダクトコードへの実装というよりは、アドホック分析や検証用途の話なんでしょう。 ...

7月 12, 2020 · soonraah