Spark バッチ処理の問題を調べていたら分離レベルという概念にたどりついた。
分離レベルについて調べたので、Spark の問題の内容と絡めて記しておく。
考えてみれば当たり前でたいした話ではない。

分離レベルとは

トランザクションの挙動についての暗黙の理解

アドホックな分析クエリやプロダクションコード中のクエリを書くとき、その単一のクエリのトランザクションにおいて「同時に実行されている別のクエリの commit 前の状態や commit 結果に影響され、このクエリの結果がおかしくなるかもしれない」ということは通常考えない。
トランザクションはデータベースのある時点の状態に対して正しく処理される、というほぼ無意識の理解をおそらくほとんどの開発者が持っている。

多くの場合この理解は間違っていない。
それはなぜかというと DB 等のデータ処理フレームワークがある強さの分離レベルを提供しているからである。

いろいろな分離レベル

ACID 特性のうちの1つ、分離性 (Isolation) の程度を表すのが分離レベル。

トランザクション中に行われる操作の過程が他の操作から隠蔽されることを指し、日本語では分離性、独立性または隔離性ともいう。より形式的には、独立性とはトランザクション履歴が直列化されていることと言える。この性質と性能はトレードオフの関係にあるため、一般的にはこの性質の一部を緩和して実装される場合が多い。
Wikipedia ACID (コンピュータ科学)

分離レベルには名前のついたものがいくつかあり、分離性の保証の強さが異なる。
具体的にはトランザクションの並行性の問題への対応力が異なる。
名著「データ指向アプリケーションデザイン」の第7章で分離レベルについて詳しく述べられているので、以下ではそちらからの引用。


分離レベルを弱い順に並べる。

  • read uncommitted

    このレベルではダーティライトは生じませんが、ダーティリードは妨げられません。

  • read committed

    1. データベースからの読み取りを行った際に見えるデータは、コミットされたもののみであること(ダーティリードは生じない)。
    2. データベースへの書き込みを行う場合、上書きするのはコミットされたデータのみであること(ダーティライトは生じない)。
  • snapshot isolation

    スナップショット分離の考え方は、それぞれのトランザクションがデータベースの一貫性のあるスナップショットから読み取りを行うというものです。すなわち、トランザクションが読み取るデータは、すべてそのトランザクションの開始時点のデータベースにコミット済みのものだけということです。

  • serializability

    この分離レベルはトランザクションが並行して実行されていても、最終的な答えはそれぞれが1つずつ順番に、並行ではなく実行された場合と同じになることを保証します。

日本語で「分離レベル」を検索すると snapshot isolation の代わりに repeatable read が出てくる事が多い。
しかし repeatable read の名前は実装によって意味が違っていたりして扱いが難しいらしい。

分離レベルと race condition の関係

以下に各分離レベルとトランザクションの並行性の問題 (race condition) の関係を示す。
各 race condition の説明については割愛するが、複数のトランザクションが並行して実行されることにより起こりうる期待されていない挙動だと思えばよい。
○はその分離レベルにおいてその race condition が発生しないことを示す。
△は条件によっては発生する。

dirty readdirty writeread skew (nonrepeatable read)lost updatewrite skewphantom read
read uncommitted-----
read committed----
snapshot isolation-
serializability

下に行くほど強い分離レベルとなっている。
分離レベルが強くなるほど race condition が発生しにくくなるが、一方で lock 等によりパフォーマンスが下がっていくというトレードオフがある。

各種データベースの分離レベル

ここでは MySQL と Hive においてどの分離レベルが提供されているかを見てみる。

MySQL の場合

MySQL の分離レベルについては以下のドキュメントで述べられている。

MySQL (というか InnoDB?) では次の4つの分離レベルを設定することができる。

  • READ UNCOMMITTED
  • READ COMMITTED
  • REPEATABLE READ (default)
  • SERIALIZABLE

デフォルトの分離レベルは REPEATABLE READ だが、これは前述の snapshot isolation に相当するらしい。
分離レベルは、例えば set transaction 構文により次のようにして指定できる。

set transaction isolation level SERIALIZABLE;

この場合は現在のセッション内で実行される次のトランザクションについて適用される。
すべてのセッションやすべてのトランザクション等の指定もできる。
詳しくは以下。

Hive の場合

Hive についてはドキュメントに次のような記載がある。

At this time only snapshot level isolation is supported. When a given query starts it will be provided with a consistent snapshot of the data.
Hive Transactions

Hive は snapshot isolation のみ提供しているとのこと。

The default DummyTxnManager emulates behavior of old Hive versions: has no transactions and uses hive.lock.manager property to create lock manager for tables, partitions and databases.
Hive Transactions

lock は小さくとも partition の単位になるのだろうか。
であるとすると予想通りだが MySQL よりもいかつい挙動になっている。

このように多くの DB では snapshot isolation の分離レベルが基本となっている。

Spark クエリの分離レベル

では Spark のクエリはどうだろうか。
ここからようやく本題となる。

read committed 相当

Spark において DataFrame を用いたデータ処理を記述するとき、それは1つの SQL クエリを書くのと近い感覚になる。
そもそも DataFrame は SQL-like な使い心地を目的として作られた API だから当然だ。

DataFrame で記述された処理は実行時に RDD として翻訳されるが、分離レベルを考えるにあたって RDD の特性がキーとなってくる。

By default, each transformed RDD may be recomputed each time you run an action on it.
RDD Operations

あまり良い説明を見つけられなかったが、1回の action を伴う処理においても同じ RDD が複数回参照されるとき、その RDD までの計算は通常やり直されることになる。

したがって例えば self join のようなことをするとき、同じデータソースを2回読みに行くということが起こってしまう。
HDFS や S3 上のファイル、JDBC 経由の外部 DB など Spark は様々なデータソースを扱うことができるが、通常 Spark がその2回の読み込みに対して lock をかけたりすることはできない。

つまり non-repeatable read や phantom read を防ぐことができない。
read committed という弱い分離レベルに相当するということになってしまう。

分離レベルという言葉はトランザクションという概念に対して使われるものであり、DataFrame のクエリをトランザクションと呼んでいいのかはわからない。
なので分離レベルという言葉をここで使うのが適切でないかもしれないということは述べておく。

検証

MySQL からデータを読み取り Spark で処理することを考える。
まず local の MySQL で次のような table を用意する。

mysql> describe employees;
+---------------+---------+------+-----+---------+-------+
| Field         | Type    | Null | Key | Default | Extra |
+---------------+---------+------+-----+---------+-------+
| id            | int(11) | NO   | PRI | NULL    |       |
| salary        | int(11) | YES  |     | NULL    |       |
| department_id | int(11) | YES  |     | NULL    |       |
+---------------+---------+------+-----+---------+-------+
3 rows in set (0.03 sec)

部署 (department) ごとの給料 (salary) の平均から各従業員の給料がどれくらい離れているかの差分を見たいものとする。
Spark のコードは次のようになる。
Spark のバージョンはこれを書いている時点での最新 3.0.0 とした。

package com.example

import org.apache.spark.sql.functions.avg
import org.apache.spark.sql.SparkSession

object IsolationLevelExperiment {
  def main(args: Array[String]): Unit = {
    // Prepare SparkSession
    val spark = SparkSession
      .builder()
      .appName("Isolation Level Experiment")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Read from MySQL
    val dfEmployee = spark
      .read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost")
      .option("dbtable", "db_name.employees")
      .option("user", "user_name")
      .option("password", "********")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .load
      .cache

    // Get average salary
    val dfAvg = dfEmployee
      .groupBy($"department_id")
      .agg(avg($"salary").as("avg_salary"))

    // Calculate diff
    val dfResult = dfEmployee
      .as("e")
      .join(
        dfAvg.as("a"),
        $"e.department_id" === $"a.department_id",
        "left_outer"
      )
      .select(
        $"e.id",
        $"e.department_id",
        ($"e.salary" - $"a.avg_salary").as("salary_diff")
      )

    // Output results
    dfResult.show

    spark.stop()
  }
}

このコードを実行する前に MySQL の general query log を ON にする。

mysql> set global general_log = 'ON';
Query OK, 0 rows affected (0.02 sec)

mysql> show global variables like 'general_log%';
+------------------+--------------------------------------+
| Variable_name    | Value                                |
+------------------+--------------------------------------+
| general_log      | ON                                   |
| general_log_file | /usr/local/var/mysql/MacBook-Pro.log |
+------------------+--------------------------------------+
2 rows in set (0.01 sec)

これによって MySQL に対して発行されたクエリがログとして記録されるようになる。

直感的に snapshot isolation になっているのであれば MySQL に対する select 文は1回だけ発行されるはずである。
しかし前述のとおり RDDDataFrame の処理は途中の状態を通常保存せず、同じ RDDDataFrame を参照していたとしても再計算される。
上記コードの例だと dfEmployee が2回参照されている。

コードを実行すると general query log には次のように、データ取得のための select 文が2つ記録されていた。
それぞれ join() の左右の table のデータソースを示している。

8 Query     SELECT `id`,`salary`,`department_id` FROM test_fout_dsp.employees
7 Query     SELECT `salary`,`department_id` FROM test_fout_dsp.employees WHERE (`department_id` IS NOT NULL)

2つの select 文はそれぞれ別のクエリ、トランザクションとして発行されている。
したがって前者の select 文が実行された後、後者の select 文が実行される前に別のトランザクションにより employees が更新されたり挿入・削除されたりすると non-repeatable read や phantom read が発生してしまうのである。

今回はデータソースへのアクセスを確認するためにデータソースとして MySQL を使ったが、同じことはファイルや他の DB など別のデータソースで起こりうる。

回避策

プロダクト運用上、non-repeatable read や phantom read が発生しうる状況というのは多くの場合で厄介である。
一時的なデータソースの状態に依存して問題が発生するため、バグの原因追求がとても困難だからだ。
見た目上の分離レベルを強くし、これらを避けるには2つの方法が考えられる。

immutable なデータにのみアクセスする

単純な話で non-repeatable read や phantom read が発生するようなデータソースを参照しなければよい。
例えばユーザの行動ログのような蓄積されていくデータが hour 単位で partitioning されているような場合、基本的に過去の partition に対しては変更や挿入・削除は行われない。
このような partition にアクセスする分には前述のような厄介な問題は起こらない。

cache する

データソースから読み取った結果の DataFrame に対して cache() または persist() をするとよい。

Spark SQL can cache tables using an in-memory columnar format by calling spark.catalog.cacheTable("tableName") or dataFrame.cache().
Caching Data In Memory

前述のコードで dfEmployee に対して .cache() をした場合、MySQL へのデータ取得のための select 文の発行は1回になった。
大きなデータソースを cache() するときだけメモリや HDD 容量に気をつけておきたい。

まとめ

DataFrame はいわゆる RDBMS を操作しているように見えてしまうところが難しいところだろうか。
「データ指向アプリケーションデザイン」に至極真っ当なことが書いてあったので、その言葉を借りて締めておく。

何も考えずにツールを信じて依存するのではなく、並行性の問題にはどういったものがあるのか、そしてそれらを回避するにはどうしたら良いのかを、しっかり理解しなければなりません。そうすれば、信頼性があり、正しく動作するアプリケーションを手の届くツールを使って構築できるようになります。
– Martin Kleppmann データ指向アプリケーションデザイン