これはドリコム Advent Calendar 2019の16日目です。
15日目は金井航輝さんによる新卒がプロダクトに入って色々試してみた話です。

はじめに

こんにちは、サーバーサイドエンジニアの吉岡(irohiroki)です。

ゲームで使われるデータのうち、キャラクターの強さやアイテムの価値など、運営側が決定するものをマスターデータと呼びます。ゲームのサービスを運営していると、マスターデータをあるDBから読み出し、変換して別のDBに投入する処理が必要になることがあります。本稿ではこのようなマスターデータの変換を高速に実行するための基本的な考え方を実際の例をもとに説明します。

なお例はRubyで書かれており、説明を簡単にするためRuby on Railsの用語も使用します。

例に挙げる処理

例に挙げる処理はMySQLからSQLiteにデータを変換しながら移すもので、MySQL側はRailsアプリで使われているため、ActiveRecordのクラスが定義されています。以下ではこの処理を「M2S」と呼ぶことにします。

処理の流れは大まかに書くと下のようになります。

  1. テーブル毎に内容をActiveRecordのクラスで変換しながらYAMLファイルにダンプ
  2. YAMLファイルを他のマシンに転送
  3. YAMLファイルを読み込み、SQL文を生成してSQLiteに書き込み


概念図を下に示します。
なおMySQLサーバは独立したホストにあり、ネットワーク越しにアクセスします。

処理の要点を抜き出すと以下のようになります(2の転送処理は省略しました。また機密のため識別子を改変し、細かいメソッドの記載を省いています)。特徴的な行に番号を振り、下で説明します。


klasses.each do |klass|  # (1)
  File.write(
    "#{klass}.yml",
    klass.localize.map {|record|  # (2)
      ["data_#{record.id}", JSON.parse(record.to_sqlite_json)]  # (3)
    }.to_h.stringify_keys.to_yaml
  )
end
# 元のコードではここでYAMLファイルを転送
sqlite = SQLite3::Database.new("orig.sqlite")
klasses.each do |klass|
  table_name = table_name_of(klass)
  records = YAML.load_file("#{klass}.yml").values
  column_names = records.first.keys

  records.each_slice(1000) do |_records|
    sqlite.execute("INSERT INTO #{table_name} #{column_join(column_names)} VALUES #{value_join(_records)};")
  end
end


(1)のklassesはActiveRecordのクラスの配列です。
(2)のlocalizeは他のテーブルから翻訳を取得するスコープのようなメソッドになっています。
(3)のto_sqlite_jsonは各カラムの値を変換し、JSON形式で返します。

計測

上の処理をsqlite:generate_test1というRakeタスクにして、以下のような別のRakeタスクsqlite:benchで処理時間を計測しました。後述する他のバージョンも計測できるよう、計測対象を変数で指定できるようにしてあります。

task :bench do
  before = Time.now
  Rake::Task["sqlite:generate_test#{ENV["NUM"]}"].invoke
  p Time.now - before
end


3回実行した結果は以下のようになりました。

$ NUM=1 bundle exec rake sqlite:bench
39.942817
$ NUM=1 bundle exec rake sqlite:bench
37.682338
$ NUM=1 bundle exec rake sqlite:bench
38.397434


平均は約38.67秒です。

高速化の方針

ここではM2Sを以下のような観点で高速化していきます。

読み出し

多くのDBは複数のデータを高速に読み書きするための方法を備えています。まずは利用しているDBのマニュアルを精読し、目的にあった方法を選ぶべきです。

MySQLには高速な読み出し方法としてmysqldumpというコマンドが用意されています。デフォルトの出力形式のSQLの他にCSVやXMLも選択できます。WHERE節を指定して出力する行を絞ることもできますが、カラムを絞ったり値を関数に通して変換したりはできません。

カラムを絞ったり値を変換したりする必要があるなら、mysqlコマンドが使えないか検討します。mysqlコマンドは-eオプションでSQLを与えればインタラクティブモードにはならず、標準出力に結果を出力して終了します。出力のフォーマットも-Bオプションをつければタブ区切りにできるほか、-rオプションでバイナリを出力することもできるので、処理の都合に合わせて選択できます。

ActiveRecordのようなORマッパーを使うとオブジェクトを生成するオーバーヘッドが生じるので、最後の選択肢にします。M2SではRailsアプリ一部として実装されているメソッドで値を変換するため、そのままActiveRecordを使うことにしました(つまり本稿の範囲では読み出しの高速化はしません)。

書き込み

書き込みについても読み出しと同様、利用するDBについて情報を集め、最適な方法を選びます。SQLiteについてはCSVファイルをインポートする機能があるので、これを使う方法を検討します。

並行・並列処理

一般に「並行」はある瞬間に同時に計算を実行している状態、「並列」は交互に処理することで観察の時間を延ばしたとき同時に処理しているように見える状態を指します。並行処理は複数のCPUコアがなければ(量子プロセッサでもない限り)実現できませんが、並列処理は1CPUコアでもできることになります。

M2Sに含まれる処理には互いに独立なものがあるので、複数のコアを使えるなら並行処理によって高速化できます。また、ドライブやネットワークへのアクセスで待ち時間が生じるはずなので、その時間に別の処理の実行(=並列化)を検討します。

オブジェクトの数

Rubyはガーベッジコレクションを備えた言語なので、廃棄するオブジェクトが多いほどCPUリソースを消費します。つまり捨てるオブジェクトが少ない方が高速になります。特にループの中の使い捨てオブジェクトに注意します。

エスケープ

特定の文字をエスケープする処理は、データ中の全文字を対象かどうか判定し、対象だったときにエスケープ文字を挿入しなければならないので、その分CPUを使います。例えばJSONの生成処理は、どんなに最適化しても全文字を走査しなければなりません。一方でMessagePackのような文字列長をメタデータとして添えるプロトコルはその分のリソースが不要です。高速化に当たって、可能な限りエスケープ処理を避けることにします。

以上の方針に基づき、以下で具体的な改修方法を説明します。

SQLiteの書き込みを最適化

SQLite DBへの書き込みのために、M2SではSQL文を生成しています。しかしSQlite DBへの書き込み方法はこれだけではありません。より高速な方法としてCSVファイルのインポートが用意されています。この方法を使うには、例えばActiveRecordのインスタンスにCSVの行を生成させ、テーブル毎にCSVファイルを生成すれば済みます。具体的には下のようになります。番号を振った行は下で説明します。

child_pid, child_in = pty_spawn("sqlite3 csv_import.sqlite")  # (1)
child_in.puts ".mode csv"

klasses.each do |klass|
  File.open("#{klass}.csv", "w") do |f|
    klass.localize.each.with_index(1) do |record, i|
      f.write(i.to_s + "," + record.to_sqlite_csv)  # (2)
    end
  end

  child_in.puts ".import #{klass}.csv #{table_name_of(klass)}"
end

child_in.puts ".quit"  # (3)
Process.wait child_pid


(1) SQLiteのCSVインポートはSQLではなくCLIのコマンドでサポートされているため、本スクリプトとCLIを疑似端末(Pseudo TTY)で接続し、コマンドを流し込んでいます。pty_spawnの実装は下の通りです。

def pty_spawn(command)
  require "pty"
  master, slave = PTY.open
  read, write = IO.pipe
  pid = spawn(command, in: read, out: slave)
  read.close
  slave.close
  [pid, write, master]
end


(2)では1から始まる通し番号をデータに加えています。この番号が入るカラムはDBスキーマではAUTOINCREMENTになっているので、INSERT文でデータを入れる場合は不要だったのですが、CSVインポートでは必要になります。

(3)でCLIに終了コマンドを送り、その次の行で終了を待っています。このように終了を待たないと、例えばCLIがクラッシュしたとき検知できませんし、書き込みが終わる前にスクリプトが終了したように見えるため計測結果も不正確になってしまいます。

計測

上のバージョンで処理時間を計測したところ、下のようになりました。

$ NUM=2 bundle exec rake sqlite:bench
28.847359
$ NUM=2 bundle exec rake sqlite:bench
28.581118
$ NUM=2 bundle exec rake sqlite:bench
28.146963


平均28.52秒です。短縮されていますが、このバージョンは後述するようにマルチプロセスになってしまっており、CSVインポートだけの成果ではありません。また、元のコードがJSON、YAMLを経てINSERT文を生成しているのに対し、ActiveRecordのクラスが直接CSVを出力することも短縮に貢献しているでしょう。

SQLiteの書き込みを高速化する他の方法

CSVインポートを使えず、INSERT文を繰り返す場合は以下のような高速化の方法がありますが、今回は利用していないため紹介にとどめます。

1つのtransactionにする

INSERTが完了するにはドライブへの書き込みが完了しなければなりませんが、複数のINSERTを1つのtransactionの中で実行すればドライブへのアクセス回数が減るため高速になります。

PRAGMA synchronous=OFF

このコマンドはメモリ上のデータとドライブの同期を緩めます。高速になる代わりに、プロセスやマシンがクラッシュしたときデータが壊れる可能性が上がります。

PRAGMA journal_mode=MEMORY

ジャーナル(データの整合性を保つための情報)をドライブに保存せずメモリに保持します。プロセスがクラッシュしたときデータが壊れる可能性が上がります。

並行・並列化

マルチプロセス

CRubyで複数のCPUコアを同時に使うには、forkなどによってプロセスを増やすことになります(JRubyではスレッドでも別のコアを利用できます)。実はSQLiteのCLIは別のプロセスで非同期に実行しているので、CSVインポートのコードは既にマルチプロセスになっています。2コアのマシンを使っている場合はCSVインポートにした時点ですべてのコアを使っていることになりますが、もっとコアがあるマシンを使っているならCSVの作成をテーブル別に並行化できます。

処理を別のプロセスで実行した場合、結果を受け取る方法を考慮しなければなりませんが、ParallelというGemを使えば結果の受け取りや処理待ちといった雑事を任せられます。Parallelを使ってM2Sを2コアで実行するには下のように書きます。

ActiveRecord::Base.clear_all_connections! # (1)

Parallel.each(klasses, in_processes: 2) do |klass| # (2)
  klass.connection.reconnect! # (1')
  child_pid, child_in = pty_spawn("sqlite3 parallelx2.sqlite") # (3)
  child_in.puts ".mode csv"

  File.open("#{klass}.csv", "w") do |f|
    klass.localize.each.with_index(1) do |record, i|
      f.write(i.to_s + "," + record.to_sqlite_csv)
    end
  end

  child_in.puts <<~CLI
    .import #{klass}.csv #{table_name_of(klass)}
    .quit
  CLI
  Process.wait child_pid
end


(1)でDBへの接続を切っているのは、プロセス毎に接続しなおす(1′)ためです。DBへの接続はプロセス間で共有できないので、このように切断してから再接続します。

(2)がParallelを使っている行で、klassesを2プロセスで分担することを表します。

(3)でklass毎にSQLite CLIを起動しているのは、最後のklassを処理するプロセスが不定なことに起因します。最後のklassを処理した後にはCLIに「.quit」を送信し、CLIプロセスの終了を待たなければなりません。しかしそれが2つのプロセスのうちどちらになるか分からないし、Parallelにはすべての処理が終わった後に呼ばれるようなフックもありません。仕方がないのでklass毎にCLIを起動、停止しています。

計測

M2Sを動かせる環境が2コアだったため、このバージョンの計測は省略します。

マルチスレッド

複数のプロセスを使ったとしても、それぞれのプロセスがドライブやネットワークにアクセスすると応答待ちになり、コアがアイドル状態になってしまいます。このようなI/O待ち時間を有効に利用するにはマルチスレッド化することになります。

スレッドもプロセスと同じように複数の処理のコンテキストを保持するための仕組みですが、プロセスがそれぞれ別のコアを使うのに対し、CRubyのスレッドは1つのコアを共有します。複数のスレッドを作ると、そのうち1つがコアを使って処理を進め、他のスレッドは待ち状態になります。そしてある瞬間に動くスレッドが切り替わり、それまで動いていたスレッドは待ち状態になります。

スレッドが切り替わるタイミングは処理系に依存しますが、プログラマにも分かるタイミングの一つがI/O待ちになる時です。つまりCRubyの処理系は、ドライブやネットワークにアクセスしてI/O待ちになると、CPUを必要とする他のスレッドに処理を切り替えます。そうしてI/O待ちの時間を有効活用するわけです。

ではM2Sに最適なスレッドの数はいくつでしょうか。それを知るには、I/O待ちになる箇所を数えて1を加えます。M2SでI/O待ちになるのは下の2箇所です。

  • klassを通じたDBアクセス
  • CSVファイルに書き込むf.write


CSVファイルを作る「File.open」やSQLite CLIにコマンドを送る「child_in.puts」はI/Oが発生しますが、ブロックしないので除外します(child_in.putsが送る文字数が擬似端末のバッファ容量を超えればブロックしますが、今回の例ではそのような状況になりません)。

よって最適なスレッドの数は3です。M2Sを3つのスレッドを使うように書き直すと下のようになります。

child_pid, child_in = pty_spawn("sqlite3 threadx3.sqlite")
child_in.puts ".mode csv"

mutex = Mutex.new
klasses  # (1)

Parallel.each(klasses, in_threads: 3) do |klass|
  File.open("#{klass}.csv", "w") do |f|
    klass.connection_pool.with_connection do  # (2)
      klass.localize.each.with_index(1) do |record, i|
        f.write(i.to_s + "," + record.to_sqlite_csv)
      end
    end
  end

  mutex.synchronize do  # (3)
    child_in.puts ".import #{klass}.csv #{table_name_of(klass)}"
  end
end

child_in.puts ".quit"
Process.wait child_pid


(1)はスレッドを使う前にActiveRecordのクラスをロードする行です。「ActiveRecordのクラスのロード」とは、そのクラスの実装が書かれたrbファイルを読み、クラス名となる定数をメモリに登録し、ActiveRecordとして動作するためのメソッドを定義するなどの初期化を指します。この初期化処理はマルチスレッドに対応していないので、この処理の最中にスレッドが切り替わってはいけません。よってスレッドを作る前にロードしています。

(2)で明示的にconnection_poolから接続を取得しているのは、スレッド毎に別のDB接続を使うためです。複数のDB接続を使う仕組みはRailsに組み込まれており、接続を共有するより効率がいいからです。

(3)ではSQLite CLIへのコマンド送信を排他制御しています。起動するSQLite CLIのプロセスは1つで、コマンド送信中にスレッドが切り替わるのを抑制しています。

計測

実行結果は下のようになりました。

$ NUM=3 bundle exec rake sqlite:bench
28.281086
$ NUM=3 bundle exec rake sqlite:bench
27.599667
$ NUM=3 bundle exec rake sqlite:bench
28.299619


平均28.06秒で、CSVインポート版からさらに短縮できました。残った28秒余りの処理には改修していないMySQLからの読み出しと値の変換が含まれているため、詳細を調査すればさらに短縮できる可能性があります。

オブジェクト数の削減

RubyにはGC.statというメソッドがあり、ガーベッジコレクションに関する情報をハッシュで返してくれます。破棄されたオブジェクトの数は:total_freed_objectsというキーで得られるので、この値を処理の前後で比較すれば、処理の中で破棄されたオブジェクトの数がわかります。

例えば今回の計測タスクに加えると以下のようになります。

task :bench do
  GC.start
  freed_obj_before = GC.stat(:total_freed_objects)

  before = Time.now
  Rake::Task["sqlite:generate_test#{ENV["NUM"]}"].invoke
  p Time.now - before

  GC.start
  p GC.stat(:total_freed_objects) - freed_obj_before
end


この方法で最初のYAMLを介したM2Sを計測したところ、破棄されたオブジェクトの数は27,936,167個でした。一方マルチスレッド版は18,396,552個で、約34%の削減になっていました。

処理全体での削減なのでどこが最も効果的だったのかはわかりませんが、おそらくINSERT文の生成などは大きく影響したことが想像されます。詳しく調べるには計測する範囲を狭くするとよいでしょう。また、GC::Profiler.reportというメソッドではガーベッジコレクションにかかった時間を表示することもできますので、参考にしてください。

エスケープ

最初のM2Sのコードをよく見ると、YAMLにする前にJSONにしてることがわかります(record.to_sqlite_jsonの部分)。前述の通りJSONはエスケープが必要なフォーマットです。

YAMLには文字列の表現方法が複数あり、その一部はエスケープが必要です。表現方法の選択は実装依存なので、どの程度エスケープ処理されるかは自明ではありません。

マルチスレッド版のM2SはJSONもYAMLも使いませんが、CSVで出力します。残念ながらCSVはエスケープが必要なフォーマットです。

以上の通り、今回の改修ではエスケープ処理を排除できませんでしたが、YAML生成中のエスケープは削減できた可能性があります。

まとめ

マスターデータの変換処理を高速化するときの観点として、DBアクセス、並行・並列化、オブジェクト数、エスケープ処理を挙げ、実際の改修例を示しました。例の中ではファイルやDB接続を共有する場合/共有しない場合のコードの書き方についても説明しました。参考になれば幸いです。

明日は ikaikairuka さんの記事です。
ドリコムでは一緒に働くメンバーを募集しています!募集一覧はコチラを御覧ください!