このポストについて

前回の記事 dlt 入門 - ELT の Extract と Load を担う data load tool では dlt の概要を説明した。
この記事ではそれを踏まえ、dlt を使って CSV ファイルを BigQuery に load するという一連の開発作業をやってみる。

現実の CSV はそのまま使えなかったりするので、BigQuery に入れるまでに泥臭い前処理のような作業があることが多い。
そのへんもまとめて dlt でやってみるとこんな感じ、というのが示せるとよい。

やりたいこと

個人で管理しているお金の情報を個人用の BigQuery に置きたい、という要件を想定。

データ概要

具体的には MoneyForward のデータを load していく。
個人では API を利用できないので、web UI から export できる CSV のデータで収入・支出詳細と資産推移月次を対象とする。
CSV の export 方法は以下を参照。

データの内容は次のようになっている。

  • 収入・支出詳細_2023-11-01_2023-11-30.csv
"計算対象","日付","内容","金額(円)","保有金融機関","大項目","中項目","メモ","振替","ID"
"1","2023/11/30","AMAZON.CO.JP","-2830","楽天カード","食費","食料品","","0","EPv92ZjQcOxgWQx_cLbhD1"
"1","2023/11/24","東京ガス","-4321","楽天カード","水道・光熱費","ガス・灯油代","","0","r6wuQPfrIRS6aFpNYZE5Eh"
"1","2023/11/24","給与 カ) フロッグログ","700000","みずほ銀行","収入","給与","","0","doettKpYyNp0Tml9KQQXm1"

ヘッダーがあり、各列に名前が付いている。
encoding が CP932 であることに注意。
ID の列があるので、行の識別に使えそう。

  • 資産推移月次.csv
"日付","合計(円)","預金・現金・暗号資産(円)","株式(現物)(円)","投資信託(円)","債券(円)","年金(円)","ポイント(円)"
"2023/12/17","11505000","5000000","300000","5000000","200000","1000000","5000"
"2023/12/16","11505000","5000000","300000","5000000","200000","1000000","5000"
"2023/12/15","11505000","5000000","300000","5000000","200000","1000000","5000"

encoding などについては同じ。
当月についてはすべての 日付 がある一方、それ以前については 2023/11/30, 2023/10/31, … のように月末だけが含まれている。
日付 で行の識別ができる。

これらはまず最初に GCS bucket に手動で置くものとする。

開発手順

CSV そのままだと schema まわりで問題が起こりそうなので、そのあたり try & error で解決したい。
というのをいきなり BigQuery 上でやると手間もかかるし汚くなるので、最初は試験的にローカル環境の DuckDB に load するようにする。
DuckDB でうまくいったら BigQuery へと移行する。

全体の流れとしては次のようになる。

  1. dlt のインストール
  2. DuckDB 用の pipeline project を作成
  3. pipeline の実装
  4. configuration
  5. pipeline の実行
  6. Streamlit app による結果の確認
  7. schema の調整
  8. 不要レコードのフィルタリング
  9. incremental loading への対応
  10. BigQuery への移行
  11. BigQuery への load の確認

開発作業

1. dlt のインストール

destination としては DuckDB, BigQuery を使うので、以下のようにして一緒にインストールする。
(Poetry の例)

poetry add 'dlt[duckdb,bigquery]'

インストールされたバージョンを確認。

$ dlt --version                  
dlt 0.3.25

また、gsfs, pandas, streamlit, google-cloud-bigquery-storage も必要になるのでインストールしておく。

2. pipeline project を作成

次のコマンドで pipeline project を用意する。

$ dlt init filesystem duckdb

これは verified source として Filesystem、destination として DuckDB を指定して pipeline project を作るという意味。
Filesystem はローカルのファイルシステムや S3, GCS のようなクラウドストレージからファイルを読むことが可能。

このコマンドが成功すると次のようなディレクトリ構造が作られる。

.
├── .dlt
│   ├── .sources
│   ├── config.toml
│   └── secrets.toml
├── .gitignore
├── filesystem
│   ├── README.md
│   ├── __init__.py
│   ├── helpers.py
│   ├── readers.py
│   └── settings.py
└── filesystem_pipeline.py

filesystem/ 以下には Filesystem を使うための関数定義や README などが生成されている。
filesystem_pipeline.py には pipeline の実装例がある。
これを元に修正していってもいいが、今回は新しく money_forward_pipeline.py を用意して実装していく。

3. pipeline の実装

money_forward_pipeline.py は次のように実装した。

import dlt
from filesystem import filesystem, read_csv, readers


class MFFileSpec:
    def __init__(self, file_name: str, table_name: str):
        self.file_name = file_name
        self.table_name = table_name


def run_pipeline() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="money_forward",
        destination="duckdb",
        dataset_name="raw_money_forward",
        full_refresh=True,
    )
    load_info = pipeline.run(money_forward())
    print(load_info)
    

@dlt.source
def money_forward() -> dlt.extract.source.DltSource:
    file_specs = [
        MFFileSpec("収入・支出詳細_*.csv", "income_expence_details"),
        MFFileSpec("資産推移月次.csv", "monthly_assets"),
    ]
    
    for file_spec in file_specs:
        records = filesystem(
            file_glob=f"money_forward/{file_spec.file_name}",
        ) | read_csv(encoding="cp932")

        yield records.with_name(file_spec.table_name)


if __name__ == "__main__":
    run_pipeline()

pipeline を作り、run() するときに source として money_forward() を与えている。
試験的に何度か table を作り直すので full_refresh=True を指定している。
ちなみに source は resource をグルーピングしたものであり、resource ごとに table が作られると思っていい。

money_forward() の中では収入・支出詳細と資産推移月次のそれぞれに対して resource が作られる。
filesystem() でファイルをリストアップし、それを read_csv() に渡してデータを読んでいる。

ここで GCS の bucket や認証情報が指定されていないことに気づくかもしれない。
関数 filesystem() の定義は次のようになっている。

@dlt.resource(
    primary_key="file_url", spec=FilesystemConfigurationResource, standalone=True
)
def filesystem(
    bucket_url: str = dlt.secrets.value,
    credentials: Union[FileSystemCredentials, AbstractFileSystem] = dlt.secrets.value,
    file_glob: Optional[str] = "*",
    files_per_page: int = DEFAULT_CHUNK_SIZE,
    extract_content: bool = False,
) -> Iterator[List[FileItem]]:
    ...

bucket_url, credentials のデフォルト値は dlt.sercrets.value になっている。
これは設定ファイルや環境変数などから取得された設定値を使うことを意味する。

4. configuration

認証情報などの設定は dlt の設定ファイルや環境変数で与えることができる。
ここでは設定ファイルを使うことにする。
bucket や認証情報を次のように記載した。

  • .dlt/config.toml
[sources.money_forward_pipeline]
bucket_url = "gs://<BUCKET_NAME>"
  • .dlt/secrets.toml
[sources.credentials]
client_email = "<CLIENT_EMAIL>"
private_key = "<PRIVATE_KEY>"
project_id = "<PROJECT_ID>"

.dlt/secrets.toml の情報から GcpServiceAccountCredentials の object が自動で生成され、それが filesystem() に渡されるようになる。
認証情報の各値を得るには GCP で service account を作り、key を用意してやる必要がある。
service account には必要な権限を付与しておく。
当然だが .dlt/secrets.toml は GitHub 等に push してはいけない。

ちなみに設定周りの便利な機能として、足りない設定があったときに設定できる場所の一覧を教えてくれるというものがある。
例えば bucket_url を書き忘れて pipeline を実行した場合、次のようなエラーメッセージが出力される。

Following fields are missing: ['bucket_url'] in configuration with spec FilesystemConfigurationResource
        for field "bucket_url" config providers and keys were tried in following order:
                In Environment Variables key MONEY_FORWARD__SOURCES__MONEY_FORWARD_PIPELINE__FILESYSTEM__BUCKET_URL was not found.
                In Environment Variables key MONEY_FORWARD__SOURCES__MONEY_FORWARD_PIPELINE__BUCKET_URL was not found.
                In Environment Variables key MONEY_FORWARD__SOURCES__BUCKET_URL was not found.
                In Environment Variables key MONEY_FORWARD__BUCKET_URL was not found.
                In secrets.toml key money_forward.sources.money_forward_pipeline.filesystem.bucket_url was not found.
                In secrets.toml key money_forward.sources.money_forward_pipeline.bucket_url was not found.
                In secrets.toml key money_forward.sources.bucket_url was not found.
                In secrets.toml key money_forward.bucket_url was not found.
                In config.toml key money_forward.sources.money_forward_pipeline.filesystem.bucket_url was not found.
                In config.toml key money_forward.sources.money_forward_pipeline.bucket_url was not found.
                In config.toml key money_forward.sources.bucket_url was not found.
                In config.toml key money_forward.bucket_url was not found.
                In Environment Variables key SOURCES__MONEY_FORWARD_PIPELINE__FILESYSTEM__BUCKET_URL was not found.
                In Environment Variables key SOURCES__MONEY_FORWARD_PIPELINE__BUCKET_URL was not found.
                In Environment Variables key SOURCES__BUCKET_URL was not found.
                In Environment Variables key BUCKET_URL was not found.
                In secrets.toml key sources.money_forward_pipeline.filesystem.bucket_url was not found.
                In secrets.toml key sources.money_forward_pipeline.bucket_url was not found.
                In secrets.toml key sources.bucket_url was not found.
                In secrets.toml key bucket_url was not found.
                In config.toml key sources.money_forward_pipeline.filesystem.bucket_url was not found.
                In config.toml key sources.money_forward_pipeline.bucket_url was not found.
                In config.toml key sources.bucket_url was not found.
                In config.toml key bucket_url was not found.

bucket_url が設定される可能性のある環境変数や設定ファイル中のパスが優先度順で一覧となって表示されている。
これにより何か設定が間違っているときにどこに書き足せばいいかがわかるようになっている。
ちなみにパスの違いはスコープの違いを表していて、

                In secrets.toml key sources.money_forward_pipeline.filesystem.bucket_url was not found.
                In secrets.toml key sources.money_forward_pipeline.bucket_url was not found.

の2つを比べると前者は pipeline money_forward_pipelinefilesystem という resource のための bucket_url だが、後者はそれより広く filesystem 以外でも参照できるものとなっている。

設定まわりの挙動について、詳しくは Configuration | dlt Docs を参照のこと。

5. pipeline の実行

ここまでで pipeline を実行する準備が整ったので実行してみる。

$ python money_forward_pipeline.py

成功していれば次のようなログが出力される。

Pipeline money_forward completed in 2.96 seconds
1 load package(s) were loaded to destination duckdb and into dataset raw_money_forward_20231218112802
The duckdb destination used duckdb:////Users/sonoyou/Dev/python/dlt_sample/money_forward.duckdb location to store data
Load package 1702898884.890725 is LOADED and contains no failed jobs

6. Streamlit app による結果の確認

意図したとおりにデータが load されたのか結果を確認する方法はいくつかあるが、ここでは Streamlit を使ってみる。

$ dlt pipeline money_forward show

このコマンドを実行すると web ブラウザが立ち上がり、Streamlit app が表示される。
左のサイドバーで “Explore data” を選択すると DuckDB と接続し、pipeline により作られた table の schema やデータを見ることができる。
さらにクエリを実行することも可能。便利。

Streamlit app の画面: Exprole data

Streamlit app の画面: Exprole data

“Load info” を選択すると load 時の統計情報などを見ることができる。

Streamlit app の画面: Load info

Streamlit app の画面: Load info

さて、前者の画面で table income_expence_detals (収入・支出詳細) の schema を見ると明らかにおかしいことがわかる。
こちらを修正していく必要がある。

日本語 column 名の修正

_dlt の prefix がついた column は dlt の管理用なのでいったん無視するとして、それ以外だと x, id のみしかない。
元の CSV のヘッダーに記載されていた column 名は日本語表記になっており、1つだけ ID という英字表記の column があった。
どうやら日本語表記の column 名に問題がありそうだ。

column 名を指定する方法はいくつかあるが、read_csv() の引数 names を使うことにした。
関数 money_forward() を次のように修正した。

@dlt.source
def money_forward() -> dlt.extract.source.DltSource:
    file_specs = [
        MFFileSpec(
            file_name="収入・支出詳細_*.csv",
            table_name="income_expence_details",
            columns=["is_calc_target", "date", "details", "amount_yen", "financial_institution", "main_group", "mid_group", "memo", "transfer", "id"],
        ),
        MFFileSpec(
            file_name="資産推移月次.csv",
            table_name="monthly_assets",
            columns=["date", "total_yen", "deposit_cash_crypto_yen","stock_yen","investment_trust_yen","bond_yen","pension_yen","point_yen"],
        ),
    ]
    
    for file_spec in file_specs:
        records = filesystem(
            file_glob=f"money_forward/{file_spec.file_name}",
        ) | read_csv(encoding="cp932", names=file_spec.columns, header=0)

        yield records.with_name(file_spec.table_name)

class MFFileSpec に field columns を追加し、read_csv()names として渡すようにした。
columns には CSV のヘッダー情報を英訳したものをハードコードしている。
こちらの read_csv() では内部的に pandas が使われており、pandas.read_csv() の引数を取ることができる。
names で明示的に column 名を指定した形になる。

再度 pipeline を実行し、できあがった table income_expence_details の schema を確認してみる。

namedata_typenullable
0is_calc_targetbiginttrue
1datetexttrue
2detailstexttrue
3amount_yenbiginttrue
4financial_institutiontexttrue
5main_grouptexttrue
6mid_grouptexttrue
7transferbiginttrue
8idtexttrue
9_dlt_load_idtextfalse
10_dlt_idtextfalse
11memotexttrue

(なぜか memo が末尾になっているのが気になるが)
大丈夫そうに見えるけど、もう少し手直しする。

7. schema の調整

上記の column date に注目したい。
これは日付を表す column だが、型が text になっている。
date 型として扱えるようにしたい。

ここでは add_map() により resource に対して変換をかませる対応を行った。
money_forward_pipeline.py に次の変換用関数を追加する。

def convert_date(d: dict) -> dict:
    if "date" in d:
        d["date"] = datetime.datetime.strptime(d["date"], "%Y/%m/%d").date()
    return d

元の値としては "2023/11/30" のような文字列が入っているため、これを Python の datetime.date に変換する。
関数 money_forward() の中で resource を yield しているところに ad_map() でこれを適用。

    for file_spec in file_specs:
        records = filesystem(
            file_glob=f"money_forward/{file_spec.file_name}",
        ) | read_csv(encoding="cp932", names=file_spec.column_names, header=0)

        yield records.add_map(convert_date).with_name(file_spec.table_name)

再度 pipeline を実行すると column date の型が date になることが確認できた。

ただし、本来はこのような schema 調整は import_schema_path, export_schema_path による YAML の編集で対応する方が好ましい。
今回はそれがうまくいかなかったので add_map() による変換を使うやり方にした。
詳しくは Adjust a schema | dlt Docs を参照。

ちなみに同じ要領で add_map() により個人情報をマスキングしたりもできる。

8. 不要レコードのフィルタリング

ここで資産推移月次の CSV ファイルについて思い出してみる。

当月についてはすべての 日付 がある一方、それ以前については 2023/11/30, 2023/10/31, … のように月末だけが含まれている。

月次である以上、月ごとのデータのみが含まれていることが好ましい。
つまり当月の 2023/12/17, 2023/12/17 のような途中のレコードは不要であり、2023/11/30, 2023/10/31 のような月末のレコードのみを残したい。
それには add_filter() を使う。

次のようなフィルタリング用の関数を用意する。
date の値の日付が月末だった場合のみ True を返す。

def is_last_day_of_month(d: dict) -> bool:
    if "date" in d:
        date = d["date"]
        return date.day == calendar.monthrange(date.year, date.month)[1]
    return True

やはりこれを add_filter() により resource に加えるわけだが、今回は table monthly_assets (資産推移月次) にのみ適用したい。
次のようにした。

    for file_spec in file_specs:
        records = filesystem(
            file_glob=f"money_forward/{file_spec.file_name}",
        ) | read_csv(encoding="cp932", names=file_spec.column_names, header=0)

        records = records.add_map(convert_date)
        if file_spec.table_name == "monthly_assets":
            records = records.add_filter(is_last_day_of_month)

        yield records.with_name(file_spec.table_name)

これで pipeline を再実行したところ、table monthly_assets に月末以外のレコードが含まれないようになった。

9. incremental loading への対応

DWH への load を運用するにあたり、load を incremental に行えるかについても考えないといけない。
例えば 収入・支出詳細_2023-11-01_2023-11-30.csv にレコードの追加や変更があった場合、同じファイルを再度 load したい。
資産推移月次.csv については export 時に毎回全期間書き出しなのでこちらも同じファイルを load することになる。
このとき、単に新しくレコードが追加されるだけだと、例えば同じ買い物が2回計上されてしまうなどの問題が生じる。

dlt では incremental loading をサポートしており、primary_key (または merge_key) により load 後のレコードの重複を避けることができる。
前述のとおり収入・支出詳細は ID、資産推移月次は 日付 でレコードを一意にできる。

@dlt.source
def money_forward() -> dlt.extract.source.DltSource:
    file_specs = [
        MFFileSpec(
            file_name="収入・支出詳細_*.csv",
            table_name="income_expence_details",
            columns=["is_calc_target", "date", "details", "amount_yen", "financial_institution", "main_group", "mid_group", "memo", "transfer", "id"],
            primary_keys=["id"],
        ),
        MFFileSpec(
            file_name="資産推移月次.csv",
            table_name="monthly_assets",
            columns=["date", "total_yen", "deposit_cash_crypto_yen","stock_yen","investment_trust_yen","bond_yen","pension_yen","point_yen"],
            primary_keys=["date"],
        ),
    ]
    
    for file_spec in file_specs:
        records = filesystem(
            file_glob=f"money_forward/{file_spec.file_name}",
        ) | read_csv(encoding="cp932", names=file_spec.columns, header=0)

        records = records.add_map(convert_date)
        if file_spec.table_name == "monthly_assets":
            records = records.add_filter(is_last_day_of_month)

        @dlt.transformer(name=file_spec.table_name, primary_key=file_spec.primary_keys, write_disposition="merge")
        def dummy(items):
            return items

        yield records | dummy

class MFFileSpecprimary_keys を追加した。
生成される resource に primary_keys を指定するために @dlt.transformer として関数 dummy() を定義した。
dummy() は transform の処理自体は何もしないが、primary_key および write_disposition を指定している。

write_disposition は incremental loading の挙動を決めるパラメータであり、"replace", "append", "merge" の3つが指定できる。
ここでは primary_keys を使って追加・変更されたレコードを一意にしたかったので "merge" を指定した。
詳しくは Pipeline Tutorial | dlt Docs を参照。
DuckDB, BigQuery ともにすべての write_disposition をサポートしているが、destination によってはサポートされないものもあるので注意。

(dummy() なんか使わずにもっときれいにできる方法があるかもしれないが…)

10. BigQuery への移行

ローカルの DuckDB でやりたいことができるようになってきたのでいよいよ BigQuery へと移行する。
2点変更すればよい。
まずは pipeline の destination 指定を "duckdb" から "bigquery" に変更する。

    pipeline = dlt.pipeline(
        pipeline_name="money_forward",
        destination="bigquery",
        dataset_name="raw_money_forward",
        full_refresh=True,
    )

加えて .dlt/secrets.toml に destination 用の設定を追加する。

[destination.bigquery]
location = "asia-northeast1"

[destination.bigquery.credentials]
client_email = "<CLIENT_EMAIL>"
private_key = "<PRIVATE_KEY>"
project_id = "<PROJECT_ID>"

以上で destination 変更が完了。
pipeline を再実行すると成功した。

11. BigQuery への load の確認

BigQuery にデータが load されているかを確認する。
もちろん前述の Streamlit app から見る方法でも確認できるが、Cloud Shell 上から bq コマンドで確認する。
まずは dataset を確認。

$ bq ls
                 datasetId                  
 ------------------------------------------ 
  raw_money_forward_20231219112438          
  raw_money_forward_20231219112438_staging 

dlt.pipeline() で指定した dataset 名に timestamp っぽい文字列がついた名前で dataset が作成されている。
この timestamp は邪魔だと思ったが、除外する方法が分からなかった。
load ごとに dataset を分けるべきという思想なのだろうか…?


[追記]
dlt.pipeline() の引数で full_refresh=True になっていたのが dataset 名に timestamp がついてしまう原因だった。
full_refresh=True は名前のとおり完全に作り直す挙動になっており、try & error で何度も作り直すためにつけていた。
True のときに dataset 名が一意になるよう、timestamp をつけるという挙動は理解できる。
これを False にすると timestamp のない raw_money_forward という名前で dataset が作成されることを確認した。


末尾に _staging がついているものは incremental loading のための一時的なデータ置き場のようなもの。
dadtaset raw_money_forward_20231219112438 の table 一覧を見てみる。

$ bq query --nouse_legacy_sql 'select table_name from raw_money_forward_20231219112438.INFORMATION_SCHEMA.TABLES'
+------------------------+
|       table_name       |
+------------------------+
| _dlt_loads             |
| _dlt_version           |
| monthly_assets         |
| _dlt_pipeline_state    |
| income_expence_details |
+------------------------+

monthly_assets, income_expence_details の2つの table が作成されていることを確認した。
データもちゃんと入っている模様。

_dlt の prefix を持つ table には dlt 関連の管理情報が含まれている。
意図して見ることは少ないと思うが、何か問題が起こったときには参照することになるだろう。

というわけで BiqQuery への load まで成功した。
いろいろ編集したが、最終的な money_forwared_pipeline.py のコードを貼っておく。

import calendar
import datetime
import typing

import dlt
from filesystem import filesystem, read_csv, readers


class MFFileSpec:
    def __init__(self, file_name: str, table_name: str, columns: typing.List[str], primary_keys: typing.List[str]):
        self.file_name = file_name
        self.table_name = table_name
        self.columns = columns
        self.primary_keys = primary_keys


def run_pipeline() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="money_forward",
        destination="bigquery",
        dataset_name="raw_money_forward",
        full_refresh=False,
    )
    load_info = pipeline.run(money_forward())
    print(load_info)
    

@dlt.source
def money_forward() -> dlt.extract.source.DltSource:
    file_specs = [
        MFFileSpec(
            file_name="収入・支出詳細_*.csv",
            table_name="income_expence_details",
            columns=["is_calc_target", "date", "details", "amount_yen", "financial_institution", "main_group", "mid_group", "memo", "transfer", "id"],
            primary_keys=["id"],
        ),
        MFFileSpec(
            file_name="資産推移月次.csv",
            table_name="monthly_assets",
            columns=["date", "total_yen", "deposit_cash_crypto_yen","stock_yen","investment_trust_yen","bond_yen","pension_yen","point_yen"],
            primary_keys=["date"],
        ),
    ]
    
    for file_spec in file_specs:
        records = filesystem(
            file_glob=f"money_forward/{file_spec.file_name}",
        ) | read_csv(encoding="cp932", names=file_spec.columns, header=0)

        records = records.add_map(convert_date)
        if file_spec.table_name == "monthly_assets":
            records = records.add_filter(is_last_day_of_month)

        @dlt.transformer(name=file_spec.table_name, primary_key=file_spec.primary_keys, write_disposition="merge")
        def dummy(items):
            return items

        yield records | dummy


def convert_date(d: dict) -> dict:
    if "date" in d:
        d["date"] = datetime.datetime.strptime(d["date"], "%Y/%m/%d").date()
    return d


def is_last_day_of_month(d: dict) -> bool:
    if "date" in d:
        date = d["date"]
        return date.day == calendar.monthrange(date.year, date.month)[1]
    return True


if __name__ == "__main__":
    run_pipeline()

まとめ

ドキュメントを見ながら一通りのことができるものを実装することができた。
コードに関しては慣れればもう少しきれいに書くことができそうな気がする。

最後に destination を DuckDB から BiqQuery に変更する作業はとても簡単で体験が良かった。
現在業務で DWH の移行を考えているが、こういう機能があると移行がとても楽だし、DuckDB のようなローカルで気楽に検証できる環境に切り替えられるのもすごくいい。
DDD みを感じる。

もちろん GCS 上の CSV ファイルを BiqQuery に読み込む方法は公式で提供されているので、dlt は必須ではない。
しかし上記のように容易に destination が変えられたり verified source が提供されていたりというところや、前処理のようなごちゃごちゃしたこと pipeline の定義と一緒に Python でどうとでも書けるところにメリットがある。
また今回は触れていないが schema evolution についての配慮もある。

仕事で使ってみてもいいと思った。
ただし dlt が動くマシンにデータを載せることになるので、基本的にはあまり大きなデータの移動には向かない。
ライトなユースケースがマッチするだろう。