このポストについて

データ基盤移行について書いていくシリーズです。
シリーズ一覧はこちらから。

前回 Part 3. アーキテクチャ編ではどういったシステム構成にしたかを書きました。
今回はその技術スタックへと移行するための苦労と効率化について書きます。

(次は CI/CD の話をすると書きましたが…スマンありゃウソだった)

スコープ

今回はやや小さいスコープの話です。
データ基盤における ETL (ELT) 処理の移行作業を対象としています。
移行作業における工数的な課題を AI ワークフローを作って効率化して軽減したという話になります。
ETL 以外の移行作業は今回はスコープ外となります。

課題

旧データ基盤から新データ基盤へと table およびそれを更新するための処理を移行するにあたり工数面での課題が2つあります。

  • 技術スタックの移行
  • column 命名などの標準化

これらについて述べます。

技術スタックの移行

データ基盤の移行において、新旧の環境で技術スタックは次のようになっています。

  • 旧データ基盤
    • ETL: Glue Job
  • 新データ基盤
    • ELT: dbt-databricks

つまり Glue Job の Python コードを dbt model、つまり SQL に翻訳する必要があり、それなりに手間がかかります。
さらにこの Python コードは次のような問題もあり、移行のハードルを上げます。

  • UDF を実装して特殊な処理を行っているケースがある
  • Spark の API だけでなく Glue の API をふんだんに使っている (なるべく Spark に寄せればいいものを…)
  • (ここ数年の業務で見た中で一番というぐらいに) コード品質が低い

column 命名などの標準化

旧データ基盤は利用者への配慮があまりない状態で table の schema が作られており、利用者にとって使いにくいものとなっていました。
それを改善するため、新データ基盤では次のようなルールを導入しました。

  • 時刻を表す column は timestamp 型にし、column 名を <過去分詞形>_at とする
    • ex. created_at
  • 複数の要素を表す column は array 型にし、column 名を <複数形> とする
    • ex. features
  • etc.

これらルールの適用により利用者にとっての利便性が向上しますが、一方でモデリング時に手間と注意力を要します。

移行対象の table が数件程度ならこのような対応も人手で行えますが、数十〜の table を対応するには工数がかかりすぎます。(それでも少ない方ですが)
とはいえ従来的なプログラミングで簡単に変換できるようなものでもありません。
そこで AI ワークフローを作ることにしました。

AI ワークフローって?

Anthropic のブログ記事 Building Effective AI Agents \ Anthropic において、エージェントと対比する形でワークフローについて述べられています。

  • Workflows are systems where LLMs and tools are orchestrated through predefined code paths.
  • Agents, on the other hand, are systems where LLMs dynamically direct their own processes and tool usage, maintaining control over how they accomplish tasks.

(エージェントの定義はいろいろあるようですが) 今回作ったのはここで言うエージェントほど自由ではなく、ワークフローの範疇となります。
同記事で挙げられている Workflow: Evaluator-optimizer を含んだワークフローを構築します。

ソリューション

ワークフロー

LangGraph を使って次のようなワークフローを実装しました。

flowchart TD
    start((Start))
    retrieve_target_table["①移行対象の旧データ基盤 table の schema を取得"]
    retrieve_migrated_table["②移行済みの旧データ基盤 table の schema を取得"]
    retrieve_migrated_dbt_model["③移行済みの dbt model を取得"]
    generate_dbt_model["④LLM による dbt model 生成"]
    sql_evaluator["⑤LLM による生成された dbt model の評価"]
    end_((End))
    start --> retrieve_target_table
    retrieve_target_table --> generate_dbt_model
    start --> retrieve_migrated_table
    retrieve_migrated_table --> retrieve_migrated_dbt_model
    retrieve_migrated_dbt_model --> generate_dbt_model
    generate_dbt_model --> sql_evaluator
    sql_evaluator -- dbt model が適切 --> end_
    sql_evaluator -- dbt model が不適切 --> generate_dbt_model

AI ワークフローと言いつつ、LLM を使っているステップは④と⑤だけだったりします。
各ステップを解説します。

①移行対象の旧データ基盤 table の schema を取得

最初に分岐した左のラインでは移行したい table の情報を取得します。
table 情報はどこから取ってきてもいいのですが、OpenMetadata を導入していたためそちらから取ってきました。
この table の schema を入力として dbt model を生成します。

本来は Glue Job の Python コードを入力として与えるのが正当なやり方だと思いますが、ここではそうしません。
複数ファイルに散らばる Python コードを読ませるのが多少手間なのと、(ここ数年の業務で見た中で一番というぐらいに) コード品質が低いコードを LLM に読ませるのが忍びないからです。

じゃあ何の情報から dbt model を作るの?というのが右のラインになります。

②移行済みの旧データ基盤 table の schema を取得

右のラインではすでに移行が完了した table についての情報を取得します。
まずは移行対象と同様に schema 情報を OpenMetadata から取得します。

③移行済みの dbt model を取得

次に移行済みの dbt model を取得します。
これはローカルに clone された git repository からファイルとして取得する想定です。

さて、この②と③により dbt model 生成の入出力の例が得られました。
これにより dbt model 生成のプロンプトで few-shot prompting ができるようになります。
few-shot prompting は簡単に言うと、LLM への指示に対する入出力の例を提示することにより、得たい出力を得やすくするという手法です。

④LLM による dbt model 生成

ここでようやく LLM の出番です。
プロンプトには

  • dbt model を作れという指示
  • 指示の背景
  • column 命名などの標準化ルール
  • few-shot prompting の入出力例
  • 入力となる移行対象 table の schema

を与えます。
そうすると何らかの dbt model (SQL) っぽいものが出力されました。

⑤LLM による生成された dbt model の評価

④で終わってもよかったのですが、生成物を LLM で評価するというフェーズも追加しました。
④のプロンプトとその出力の dbt model を LLM に与え、「タスクの実行結果は適切だったと思うか?」と問いかけます。
適切なら終了します。めでたしめでたし。

不適切と判断された場合はその理由つけて④に差し戻し、評価結果に配慮させながら④をやり直します。
無限ループにならないよう N 回繰り返すと失敗するような制御も必要です。

この LLM に評価させるというのが前述の Building Effective AI Agents \ Anthropic における Workflow: Evaluator-optimizer となります。
agent desing pattern においては self-reflection とも言われます。

技術スタック

ワークフローの実装には次のような技術スタックを使いました。

  • LangGraph: ワークフローの構成
  • LangChain: ワークフローの node 内での prompt 生成、LLM 呼び出し、結果成形などの一連の処理
  • Amazon Bedrock: LLM は Bedrock 経由で Claude 3.7 Sonnet を利用

これらを使ったワークフロー実装はそこまで難しいものではなく、だいたい1週間ちょっとぐらいでコーディングしました。

ワークフローの導入効果

一部の table 移行で試したところ、ちゃんと測ったわけではありませんが体感で50~80%ぐらいは工数削減できたかなと思います。
比較的ローコストで大きく工数を削減できたのでこの結果には満足しています。
勉強したての AI ワークフローのユースケースとして実際に役に立つものを作れたのも良かったです。

few-shot prompting だのみなので、他の dbt model と異なる特殊な処理をしているケースには対応が難しく、そういった場合は人間様の出番になります。
まあ作業を100%置き換えることを目指していたわけではないため、それは良しとしています。

何回か試した中では評価で不適切とみなされることはありませんでした。
これは dbt model 生成とその評価に同じ LLM を使っているからかもしれません。
人間も自分の仕事を自分で評価すると甘くなります。
self-reflection ではなく、評価に別の LLM を使うという crossーreflection という agent design pattern にするといいかもしれません。

Q&A

Q. ワークフロー作らんでも Cursor や Cline みたいなエージェントに命令したらええんちゃうん?

A.
おそらくそのやり方でもできるでしょう。
もっと言うと Glue Job の Python コードを含むプロジェクトのファイルをまるっとコンテキストとして持てるのであれば、精度も上がるかもしれません。
ただ残念ながら弊社は出遅れており、そういったソフトウェア開発エージェントがまだ利用できない状態です…

ワークフローを自前で作ることのメリットを強いて上げるのであれば、決まった処理の流れを曖昧な自然言語ではなくコードで定義できるというところになるでしょうか。

Q. Glue Job は Spark やろ?せやったらそのままのコードで Databricks に移行できるんちゃうん?

A.
Apache Spark はプログラミングモデルは DataFrame 形式で非常にとっつきやすいのですが、アーキテクチャを理解していないとパフォーマンスの問題に対応できません。
アーキテクチャを理解せずに Spark を使っている開発者も割と多いです。
弊社もそういう状況で Spark を理解するエンジニアを安定的に雇用するのも困難なため、このデータ基盤移行によりコードとしての Spark は捨てて ELT については完全に SQL 化する運用にシフトしようとしています。

また、前述のとおり Spark のコードと言っても Glue Job の API が結構入り込んでいてポータビリティが低くなってしまっていました。
さらに (ここ数年の業務で見た中で一番というぐらいに) コード品質が低く、これを残したくない気持ちがありました。

Q. AI ワークフローはどないして勉強したらええのん?

A.
今回の技術選定、ひいてはそもそものワークフロー作成のアイディアは以下の書籍に大きく影響を受けています。

個人的にはとても面白い内容でした。
AI エージェントやワークフローについて学ぶのにいい書籍だと思います。(が、この分野は古くなるのも早いので注意)

この書籍について書いたポストもご参照いただければと。

次回予告

次こそ CI/CD の話をします!