Lake

データレイク関連の OSS - Delta Lake, Apache Hudi, Apache Kudu

はじめに 前回のポストではデータレイクとはどういうものかというのを調べた。 今回はデータレイクの文脈でどのような OSS が注目されているのかを見ていきたい。 以下は NTT データさんによる講演資料であり、その中で「近年登場してきた、リアルタイム分析に利用可能なOSSストレージレイヤソフト」というのが3つ挙げられている。 大規模データ活用向けストレージレイヤソフトのこれまでとこれから(NTTデータ テクノロジーカンファレンス 2019 講演資料、2019/09/05) from NTT DATA Technology & Innovation Delta Lake Apache Hudi Apache Kudu これらはすべて論理的なストレージレイヤーを担う。 こちらの講演資料に付け足すようなこともないかもしれないが、このポストではデータレイクという文脈から自分で調べて理解した内容をまとめるということを目的にする。 当然 Hadoop, Hive, Spark 等もデータレイクの文脈において超重要だが、「データレイク」という言葉がよく聞かれるようになる前から普及していたのでこのポストでは触れないことにする。 Delta Lake https://delta.io/ Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark™ and big data workloads. Delta Lake Delta Lake は Apache Spark の読み書きに ACID な transaction を提供するストレージレイヤーの OSS である。 Databricks が作り、2019年4月に v0.1.0 がリリースされたのが最初だ。 使い方はめちゃ簡単で、dependency を設定した上で Spark で ...

1月 26, 2021 · soonraah
Lake

いまさらながらのデータレイク

最近よく聞かれるようになった「データレイク」という概念にあまりついていけていなかったため、いまさらながらざっと調べてみた。 データレイクとは Wikipedia によると最初にこの言葉を使ったのは Pentaho 社の CTO である James Dixon らしい。 その時の彼のブログ (10年前…) を読むと、既にあったデータマートに対して Only a subset of the attributes are examined, so only pre-determined questions can be answered. The data is aggregated so visibility into the lowest levels is lost –Pentaho, Hadoop, and Data Lakes - James Dixon’s Blog というような問題意識からデータレイクというコンセプトを提案したようだ。 最近?のデータレイクについてはベンダー等の記事が参考になる。 データレイクとは - AWS データレイクとは? - talend データレイクとは?データレイクの落とし穴と効果 - Informatica 書籍だと『AWSではじめるデータレイク: クラウドによる統合型データリポジトリ構築入門』がいいだろうか。 データレイクの概要と AWS が考えている構築・運用がざっとわかる。 Amazon で検索した限りだと現時点でタイトルに「データレイク」を含む和書はこれのみだった。 ...

12月 31, 2020 · soonraah
CSV

Apache Flink の DataStream API 利用時の CSV ファイル読み込み

ストリーム処理における CSV ファイルの読み込み Apache Flink は unbounded なストリームデータを処理するためのフレームワークだ。 しかし現実的な application を開発する場合、ストリームデータに加えて static なファイルや DB 等を読み込みたいこともある。 star schema における dimension table 的な情報をストリームに結合したい場合 等が考えられる。 このポストでは Flink で DataStream API ベースでの実装において CSV ファイルを読むことを考える。 Flink は現時点の stable である v1.11 を想定。 CSV ファイルを読む方法 DataStream API ベースの実装で CSV ファイルを読むには StreamExecutionEnvironment のメソッドである readFile() を使う。 overload された同名のメソッドがいくつか存在するが、次の2つの引数が特に重要だろう。 まず1つめは FileInputFormat<OUT> inputFormat であり、こちらは data stream の生成に用いる入力フォーマットを指定する。 おそらく最も一般的なのが TextInputFormat だと思われる。 もちろん単なる text として CSV ファイルを読み込み、後続の処理で各レコードを parse することも可能だが CSV 用の入力フォーマットがいくつか用意されているようだ。 PojoCsvInputFormat RowCsvInputFormat TupleCsvInputFormat なんとなく名前でわかると思うが、それぞれ readFile() の結果として返される DataStreamSource が内包する型が異なる。 これについては後述の実験にて確認する。 ...

12月 1, 2020 · soonraah
Profit

機械学習の精度と利益と倫理とイシューと

ちょっと昔話 かつて参画したプロジェクトの話。 そのプロジェクトでは他社から受注した受託開発として機械学習系のシステムを開発していた。 当時としては新しいフレームワークを使い、かなり頑張ってなんとか納期内で完成させた。 その中の1つの機能として A/B テストができるようにしていた。 パラメータチューニングによりパフォーマンスを改善することを想定していた。 しかし結局その機能は使われることがなかった。 なぜか。 A/B テストを実施するためのクライアントの追加の予算がつかなかったためである。 受託なのでなおさらなのだが、売上にならなければ工数をかけるこはできない。 工数を使ってパフォーマンス改善することはできなかった。 手はあるのに。 機械学習の精度は必ずしも利益に結びつかない この昔話で何が言いたいかというと、機械学習の精度改善は必ずしも利益に結びつかないということである。 そのことを示しているとても素晴らしい資料がこちら。 機械学習の精度と売上の関係 from Tokoroten Nakayama 前述の昔話の例はこの資料で言うところの③ロジスティック型 (=外注) となる。 いったん売上が立った後、追加予算がつかなかったので精度改善では売上は増えなかったのだ。 倫理感による精度改善 受託開発を主としている組織であれば工数にはシビアなので、売上の立たない工数をかけることはあまりないだろう。 (よっぽどの炎上鎮火とかでなければ) しかし自社で製品やサービスを作って提供しているような組織の場合、利益にならない精度改善をしているのを時折見かける。 なぜそのようなことが起こるかと言うと多くの場合はデータサイエンティスト/機械学習エンジニアとしての倫理感からなのではないだろうか。 「◯◯予測という機能なのでできるだけ良い予測精度を示すべきだ」 「ユーザには気づかれない部分だが精度が悪いので改善したい」 倫理感や興味が先行してしまっているのだ。 しかしその精度を上げた先に利益があるとは限らない。 機械学習で職を得ている人間は自分の仕事を機械学習の精度を上げるゲームだとみなす傾向があるように思う。 例えばインターネット広告の CTR 予測。 これは予測精度が高いほど利益は改善するし、広告主に価値も提供できる。 精度改善に倫理と利益が伴っている、とても機械学習がハマる例だと思う。 本来はこれらを兼ね備えているのが良い適用先であるはずだ。 イシューは行き渡っているのか 利益に結びつかない、または間接的にしか結びつかないような精度改善をやることが許されるというのは組織に余裕があるということで悪いことではないのかもしれない。 しかし単によいイシューの設定ができてないだけという可能性もある。 自社で製品やサービスを作って提供しているような組織において、単純なロジスティック回帰でコアなところのビジネスを大きく加速させることができた時期を過ぎると機械学習で解くのに適したよい問題を恒常的に見つけ出すのは実は難しいのではないだろうかと最近考えるようになった。 ビジネスの領域拡大よりも既存領域への機械学習の適用の方が速いということは十分ありうる。 もちろんチームの規模にもよる。 機械学習チームの人的リソースの規模に対して機械学習で解くべきよいイシューを見つけ出せているのか、ということだ。 少し前にちょっと話題になったこちらの件もイシューが大事だと言っている。 全ての機械学習の論文は新しいアルゴリズムを提案しているのですか? - Quora キャリアの行く末 事業会社においてビジネスの領域拡大よりも既存領域への機械学習の適用の方が速く、よいイシューを提供しにくいということがよく起こるのであれば、機械学習チームのリソースは余剰気味になりやすいということになる。 これが続くと今後機械学習しかやらない人材の市場価値は下がっていくのかもしれない。 もしくは自社で製品やサービスを持っている組織ではなく、受託開発やコンサルが主戦場になっていくのかもしれない。 何にせよ特定のプロダクトに commit したいのであれば機械学習エンジニアは機械学習以外のスキルも磨いていく必要があるように思う。 おわりに 見える範囲にいる人が利益にならない精度改善をしているのを横目で見てこのようなことを考えていた。 難しいけどできるだけ金を生んでいきたい。

11月 12, 2020 · soonraah
Stream

ストリーム処理システムに求められる機能性、および Apache Flink におけるその対応

はじめに このポストではストリーム処理の survay 論文の話題に対して Apache Flink における例を挙げて紹介する。 論文概要 Fragkoulis, M., Carbone, P., Kalavri, V., & Katsifodimos, A. (2020). A Survey on the Evolution of Stream Processing Systems. 2020年の論文。 過去30年ぐらいのストリーム処理のフレームワークを調査し、その発展を論じている。 ストリーム処理に特徴的に求められるいくつかの機能性 (functionality) についてその実現方法をいくつか挙げ、比較的古いフレームワークと最近のフレームワークでの対比を行っている。 このポストのスコープ このポストでは前述のストリーム処理システムに求められる機能性とそれがなぜ必要となるかについて簡単にまとめる。 論文ではそこからさらにその実現方法がいくつか挙げられるが、ここでは個人的に興味がある Apache Flink ではどのように対処しているかを見ていく。 ちなみに論文中では Apache Flink はモダンなフレームワークの1つとしてちょいちょい引き合いに出されている。 ここでは Flink v1.11 をターゲットとする。 以下では論文で挙げられている機能性に沿って記載していく。 Out-of-order Data Management Out-of-order ストリーム処理システムにやってくるデータの順序は外的・内的要因により期待される順序になっていないことがある。 外的要因としてよくあるのはネットワークの問題。 データソース (producer) からストリーム処理システムに届くまでのルーティング、負荷など諸々の条件により各レコードごとに転送時間は一定にはならない。 各 operator の処理などストリーム処理システムの内的な要因で順序が乱されることもある。 out-of-order は処理の遅延や正しくない結果の原因となることがある。 out-of-order を管理するためにストリーム処理システムは処理の進捗を検出する必要がある。 “進捗” とはある時間経過でレコードの処理がどれだけ進んだかというもので、レコードの順序を表す属性 A (ex. event time) により定量化される。 ある期間で処理された最古の A を進捗の尺度とみなすことができる。 ...

11月 7, 2020 · soonraah
Stream

バッチ処理おじさんがストリーム処理のシステムを開発するにあたって調べたこと

ほとんどバッチ処理しか書いたことのない者だがストリーム処理のシステムを開発することになった。 それにあたって独学で調べたことなどまとめておく。 ストリーム処理とは そもそも “ストリーム処理” とは何を指しているのか。 以下の引用が簡潔に示している。 a type of data processing engine that is designed with infinite data sets in mind. Nothing more. – Streaming 101: The world beyond batch こちらは “streaming system” について述べたものだが、つまり終わりのないデータを扱うのがストリーム処理ということである。 例えば web サービスから生まれ続けるユーザ行動ログを逐次的に処理するというのがストリーム処理。 web サービスが終了しないかぎりはユーザ行動ログの生成には終わりがない。 これに対して “1日分のユーザ行動ログ” 等のように有限の量のデータを切り出して処理する場合、これはバッチ処理となる。 ストリーム処理とバッチ処理の違いは扱うデータが無限なのか有限なのかということだ。 この後触れていくが、この終わりのないデータを継続的に処理し続けるというところにバッチ処理にはない難しさがある。 なぜストリーム処理なのか なぜストリーム処理なのか。 ひとえに逐次的な入力データに対する迅速なフィードバックが求められているからと言えるだろう。 迅速なフィードバックがビジネス上のメリットとなることは自明だ。 SNS の配信 カーシェアリングにおける配車や料金設定 クレジットカードや広告クリックなどの不正検知 もしこれらの application が例えば hourly のバッチ処理で実装されていたらどうだろうか。 まあ待っていられない。 ...

9月 6, 2020 · soonraah
A/B Testing

A/B テストの運用が重くてつらいという話

前提 ここでは web システムで使われている機械学習のモデルやアルゴリズムを改善するための online の A/B テストを考える。 具体的に述べると web 広告における CTR 予測や EC サイトのレコメンデーション等が対象である。 よくあるやつ。 web システムにおいて online の A/B テストは KPI 改善の根幹でありとても重要だ。 それが重くなるとつらい、という話。 ここで「重い」と言っているのは計算資源のことではなく、A/B テストを実施する担当者の運用コストについて。 A/B テストの運用が重い場合のデメリット デメリット 1. KPI 改善が遅くなる デメリットと言えばこれが一番大きい。 単純に A/B テストを1回まわすのに時間がかかってしまうし、それがゆえに online の A/B テストに入るまでの offline のテストが厚くなりここでも時間がかかってしまう。 KPI 改善に時間がかかるというのはつまり売上や利益を大きくするのに時間がかかってしまうということである。 デメリット 2. KPI 改善における offline テストの比重が大きくなる 前述のとおりだが online の A/B テストが重いとそこで失敗できなくなり、結果としてその前段の offline のテストを厚くするということになる。 offline のテストが厚いことの何が問題だろうか。 ここで前提としている CTR 予測やレコメンデーションのようなタスクの場合、offline のデータは既存のモデルやアルゴリズムの影響を受けることになる。 例えばレコメンデーションの場合を考えると、新しいモデルを offline で評価するための実験データの正例 (コンテンツの閲覧等) は既存モデルによって生み出される。 既存モデルが「このコンテンツがいいよ」といってユーザに出したリスト、その中からコンテンツの閲覧が行われ正例となるからだ。 このような状況下での offline テストにおいては既存モデルと近い好みを持ったモデルのスコアが高くなる傾向がある。 ...

8月 23, 2020 · soonraah
Apache Flink

Apache Flink の Temporary Table Function を用いた stream data と static data の join

前回の記事 では Apache Flink における stream data と static data の join において、DataStream API における broadcast state pattern を使う方法を示した。 今回の記事では Table API の temporal table function を用いた実験を行う。 Table API Table API は名前のとおりで class Table を中心として SQL-like な DSL により処理を記述するという、DataStream API より high-level な API となっている。 これらの関係は Apaceh Spark の RDD と DataFrame (DataSet) の関係に似ている。 SQL-like な API で記述された処理が実行時に最適化されて low-level API の処理に翻訳されるところも同じだ。 RDB の table の概念を元にしているものと考えられるが、本質的に table の概念とストリーム処理はあまりマッチしないと思う。 table はある時点のデータセット全体を表すのに対し、ストリーム処理ではやってくるレコードを逐次的に処理したい。 ここを合わせているため、ストリーム処理における Table API による処理の挙動の理解には注意が必要だ。 Streaming Concepts 以下のドキュメントを確認しておきたい。 ...

8月 16, 2020 · soonraah
Apache Flink

Apache Flink の Broadcast State Pattern を用いた stream data と static data の join

star schema における fact table と dimension table の join のようなことを Apache Flink におけるストリーム処理で行いたい。 stream data と static data の join ということになる。 ただし dimension table 側も更新されるため、完全な static というわけではない。 このポストでは Flink v1.11 を前提とした。 join の方法 今回は DataStream API でこれを実現することを考える。 Flink のドキュメントを読むと broadcast state pattern でできそうだ。 The Broadcast State Pattern やり方としては次のようになる。 static data のファイルを FileProcessingMode.PROCESS_CONTINUOUSLY で読み込み DataStream 化 1 を broadcast() stream data の DataStream と 2 を connect() static data を PROCESS_CONTINUOUSLY で読むのは変更を得るため。 PROCESS_ONCE で読んでしまうとストリーム処理の開始時に1回読むだけになり、dimension table の変更を得られない。 このあたりの仕様については Data Sources を参照。 ...

8月 6, 2020 · soonraah

あまり大きな Pull Request を作ってほしくない

GitHub Flow ベースの開発においてはあまり大きな pull request を作ってほしくないという話。 なんというか今更わざわざ言わなくてもいいんだけど… 仕事で何度か大きな pull request が投げられているのを見てしまったので、それはあまりよくないよというのを自分でも指摘しやすくするためにまとめておく。 Reference 最初に参考資料を挙げておく。 100 Duck-Sized Pull Requests 「巨大プルリク1件vs細かいプルリク100件」問題を考える(翻訳) Optimal pull request size これらを読めば特に私から言うこともないのだが… これらに書いてあるとおりだが補足しておく。 Pull Request を小分けにしたときのメリット module を適切に切り出すモチベーションが得られる 次のような話がある。 共通化という考え方はアンチパターンを生み出すだけ説 これを理由に module を分けるべきところが分けられず、いろんなことができてしまうベタッと大きな module が生まれるのを見た。 もちろんよく考えられていない共通化は駄目だが、上記ポストでは一方で ただし共通化という名の下におこなわれるのは「同じロジックを持つコードをまとめる」行為であって、抽象化のようにそのコード単位の意味を捉える作業はその範疇にない。抽象化というのはロジックを意味単位ごとにひとくくりにしていく行為で、これがどういうことなのかは次以降で述べていく。 – 共通化という考え方はアンチパターンを生み出すだけ説 - タオルケット体操 と述べており抽象化、つまりコードの意味を考慮した上で適切な単位でまとめておくことは否定していない。 pull request を小さく分けるという行為のためには module や機能を適度なまとまりで切り分けること、つまり抽象化を考えていくことが必要となる。 したがって何でもできてしまう大きな module が生まれるのを防ぐ方向に働く。 (もちろんここで駄目な共通化がなされてしまうこともあるだろう) リリースまでの期間が短く済む 前述の参考資料においても pull request が大きいと Developers would put off reviews until they had time/energy and the development process would come to a halt. – 100 Duck-Sized Pull Requests ...

8月 3, 2020 · soonraah