Spark バッチ処理の問題を調べていたら分離レベルという概念にたどりついた。
分離レベルについて調べたので、Spark の問題の内容と絡めて記しておく。
考えてみれば当たり前でたいした話ではない。
分離レベルとは
トランザクションの挙動についての暗黙の理解
アドホックな分析クエリやプロダクションコード中のクエリを書くとき、その単一のクエリのトランザクションにおいて「同時に実行されている別のクエリの commit 前の状態や commit 結果に影響され、このクエリの結果がおかしくなるかもしれない」ということは通常考えない。
トランザクションはデータベースのある時点の状態に対して正しく処理される、というほぼ無意識の理解をおそらくほとんどの開発者が持っている。
多くの場合この理解は間違っていない。
それはなぜかというと DB 等のデータ処理フレームワークがある強さの分離レベルを提供しているからである。
いろいろな分離レベル
ACID 特性のうちの1つ、分離性 (Isolation) の程度を表すのが分離レベル。
トランザクション中に行われる操作の過程が他の操作から隠蔽されることを指し、日本語では分離性、独立性または隔離性ともいう。より形式的には、独立性とはトランザクション履歴が直列化されていることと言える。この性質と性能はトレードオフの関係にあるため、一般的にはこの性質の一部を緩和して実装される場合が多い。
– Wikipedia ACID (コンピュータ科学)
分離レベルには名前のついたものがいくつかあり、分離性の保証の強さが異なる。
具体的にはトランザクションの並行性の問題への対応力が異なる。
名著「データ指向アプリケーションデザイン」の第7章で分離レベルについて詳しく述べられているので、以下ではそちらからの引用。
分離レベルを弱い順に並べる。
read uncommitted
このレベルではダーティライトは生じませんが、ダーティリードは妨げられません。
read committed
- データベースからの読み取りを行った際に見えるデータは、コミットされたもののみであること(ダーティリードは生じない)。
- データベースへの書き込みを行う場合、上書きするのはコミットされたデータのみであること(ダーティライトは生じない)。
snapshot isolation
スナップショット分離の考え方は、それぞれのトランザクションがデータベースの一貫性のあるスナップショットから読み取りを行うというものです。すなわち、トランザクションが読み取るデータは、すべてそのトランザクションの開始時点のデータベースにコミット済みのものだけということです。
serializability
この分離レベルはトランザクションが並行して実行されていても、最終的な答えはそれぞれが1つずつ順番に、並行ではなく実行された場合と同じになることを保証します。
日本語で「分離レベル」を検索すると snapshot isolation の代わりに repeatable read が出てくる事が多い。
しかし repeatable read の名前は実装によって意味が違っていたりして扱いが難しいらしい。
分離レベルと race condition の関係
以下に各分離レベルとトランザクションの並行性の問題 (race condition) の関係を示す。
各 race condition の説明については割愛するが、複数のトランザクションが並行して実行されることにより起こりうる期待されていない挙動だと思えばよい。
○はその分離レベルにおいてその race condition が発生しないことを示す。
△は条件によっては発生する。
dirty read | dirty write | read skew (nonrepeatable read) | lost update | write skew | phantom read | |
---|---|---|---|---|---|---|
read uncommitted | ○ | - | - | - | - | - |
read committed | ○ | ○ | - | - | - | - |
snapshot isolation | ○ | ○ | ○ | △ | - | △ |
serializability | ○ | ○ | ○ | ○ | ○ | ○ |
下に行くほど強い分離レベルとなっている。
分離レベルが強くなるほど race condition が発生しにくくなるが、一方で lock 等によりパフォーマンスが下がっていくというトレードオフがある。
各種データベースの分離レベル
ここでは MySQL と Hive においてどの分離レベルが提供されているかを見てみる。
MySQL の場合
MySQL の分離レベルについては以下のドキュメントで述べられている。
MySQL (というか InnoDB?) では次の4つの分離レベルを設定することができる。
READ UNCOMMITTED
READ COMMITTED
REPEATABLE READ
(default)SERIALIZABLE
デフォルトの分離レベルは REPEATABLE READ
だが、これは前述の snapshot isolation に相当するらしい。
分離レベルは、例えば set transaction
構文により次のようにして指定できる。
set transaction isolation level SERIALIZABLE;
この場合は現在のセッション内で実行される次のトランザクションについて適用される。
すべてのセッションやすべてのトランザクション等の指定もできる。
詳しくは以下。
Hive の場合
Hive についてはドキュメントに次のような記載がある。
At this time only snapshot level isolation is supported. When a given query starts it will be provided with a consistent snapshot of the data.
– Hive Transactions
Hive は snapshot isolation のみ提供しているとのこと。
The default DummyTxnManager emulates behavior of old Hive versions: has no transactions and uses hive.lock.manager property to create lock manager for tables, partitions and databases.
– Hive Transactions
lock は小さくとも partition の単位になるのだろうか。
であるとすると予想通りだが MySQL よりもいかつい挙動になっている。
このように多くの DB では snapshot isolation の分離レベルが基本となっている。
Spark クエリの分離レベル
では Spark のクエリはどうだろうか。
ここからようやく本題となる。
read committed 相当
Spark において DataFrame
を用いたデータ処理を記述するとき、それは1つの SQL クエリを書くのと近い感覚になる。
そもそも DataFrame
は SQL-like な使い心地を目的として作られた API だから当然だ。
DataFrame
で記述された処理は実行時に RDD
として翻訳されるが、分離レベルを考えるにあたって RDD
の特性がキーとなってくる。
By default, each transformed RDD may be recomputed each time you run an action on it.
– RDD Operations
あまり良い説明を見つけられなかったが、1回の action を伴う処理においても同じ RDD
が複数回参照されるとき、その RDD
までの計算は通常やり直されることになる。
したがって例えば self join のようなことをするとき、同じデータソースを2回読みに行くということが起こってしまう。
HDFS や S3 上のファイル、JDBC 経由の外部 DB など Spark は様々なデータソースを扱うことができるが、通常 Spark がその2回の読み込みに対して lock をかけたりすることはできない。
つまり non-repeatable read や phantom read を防ぐことができない。
read committed という弱い分離レベルに相当するということになってしまう。
分離レベルという言葉はトランザクションという概念に対して使われるものであり、DataFrame
のクエリをトランザクションと呼んでいいのかはわからない。
なので分離レベルという言葉をここで使うのが適切でないかもしれないということは述べておく。
検証
MySQL からデータを読み取り Spark で処理することを考える。
まず local の MySQL で次のような table を用意する。
mysql> describe employees;
+---------------+---------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+---------------+---------+------+-----+---------+-------+
| id | int(11) | NO | PRI | NULL | |
| salary | int(11) | YES | | NULL | |
| department_id | int(11) | YES | | NULL | |
+---------------+---------+------+-----+---------+-------+
3 rows in set (0.03 sec)
部署 (department) ごとの給料 (salary) の平均から各従業員の給料がどれくらい離れているかの差分を見たいものとする。
Spark のコードは次のようになる。
Spark のバージョンはこれを書いている時点での最新 3.0.0 とした。
package com.example
import org.apache.spark.sql.functions.avg
import org.apache.spark.sql.SparkSession
object IsolationLevelExperiment {
def main(args: Array[String]): Unit = {
// Prepare SparkSession
val spark = SparkSession
.builder()
.appName("Isolation Level Experiment")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// Read from MySQL
val dfEmployee = spark
.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost")
.option("dbtable", "db_name.employees")
.option("user", "user_name")
.option("password", "********")
.option("driver", "com.mysql.cj.jdbc.Driver")
.load
.cache
// Get average salary
val dfAvg = dfEmployee
.groupBy($"department_id")
.agg(avg($"salary").as("avg_salary"))
// Calculate diff
val dfResult = dfEmployee
.as("e")
.join(
dfAvg.as("a"),
$"e.department_id" === $"a.department_id",
"left_outer"
)
.select(
$"e.id",
$"e.department_id",
($"e.salary" - $"a.avg_salary").as("salary_diff")
)
// Output results
dfResult.show
spark.stop()
}
}
このコードを実行する前に MySQL の general query log を ON にする。
mysql> set global general_log = 'ON';
Query OK, 0 rows affected (0.02 sec)
mysql> show global variables like 'general_log%';
+------------------+--------------------------------------+
| Variable_name | Value |
+------------------+--------------------------------------+
| general_log | ON |
| general_log_file | /usr/local/var/mysql/MacBook-Pro.log |
+------------------+--------------------------------------+
2 rows in set (0.01 sec)
これによって MySQL に対して発行されたクエリがログとして記録されるようになる。
直感的に snapshot isolation になっているのであれば MySQL に対する select 文は1回だけ発行されるはずである。
しかし前述のとおり RDD
や DataFrame
の処理は途中の状態を通常保存せず、同じ RDD
や DataFrame
を参照していたとしても再計算される。
上記コードの例だと dfEmployee
が2回参照されている。
コードを実行すると general query log には次のように、データ取得のための select 文が2つ記録されていた。
それぞれ join()
の左右の table のデータソースを示している。
8 Query SELECT `id`,`salary`,`department_id` FROM test_fout_dsp.employees
7 Query SELECT `salary`,`department_id` FROM test_fout_dsp.employees WHERE (`department_id` IS NOT NULL)
2つの select 文はそれぞれ別のクエリ、トランザクションとして発行されている。
したがって前者の select 文が実行された後、後者の select 文が実行される前に別のトランザクションにより employees
が更新されたり挿入・削除されたりすると non-repeatable read や phantom read が発生してしまうのである。
今回はデータソースへのアクセスを確認するためにデータソースとして MySQL を使ったが、同じことはファイルや他の DB など別のデータソースで起こりうる。
回避策
プロダクト運用上、non-repeatable read や phantom read が発生しうる状況というのは多くの場合で厄介である。
一時的なデータソースの状態に依存して問題が発生するため、バグの原因追求がとても困難だからだ。
見た目上の分離レベルを強くし、これらを避けるには2つの方法が考えられる。
immutable なデータにのみアクセスする
単純な話で non-repeatable read や phantom read が発生するようなデータソースを参照しなければよい。
例えばユーザの行動ログのような蓄積されていくデータが hour 単位で partitioning されているような場合、基本的に過去の partition に対しては変更や挿入・削除は行われない。
このような partition にアクセスする分には前述のような厄介な問題は起こらない。
cache する
データソースから読み取った結果の DataFrame
に対して cache()
または persist()
をするとよい。
Spark SQL can cache tables using an in-memory columnar format by calling spark.catalog.cacheTable("tableName") or dataFrame.cache().
– Caching Data In Memory
前述のコードで dfEmployee
に対して .cache()
をした場合、MySQL へのデータ取得のための select 文の発行は1回になった。
大きなデータソースを cache()
するときだけメモリや HDD 容量に気をつけておきたい。
まとめ
DataFrame
はいわゆる RDBMS を操作しているように見えてしまうところが難しいところだろうか。
「データ指向アプリケーションデザイン」に至極真っ当なことが書いてあったので、その言葉を借りて締めておく。
何も考えずにツールを信じて依存するのではなく、並行性の問題にはどういったものがあるのか、そしてそれらを回避するにはどうしたら良いのかを、しっかり理解しなければなりません。そうすれば、信頼性があり、正しく動作するアプリケーションを手の届くツールを使って構築できるようになります。
– Martin Kleppmann データ指向アプリケーションデザイン