読者です 読者をやめる 読者になる 読者になる

Embulk 0.3 & 0.4 の新機能 - リジュームとJavaプラグイン

つい先日*1、Embulk の新しいメジャーバージョンを2つリリースしました。

これらのバージョンでは、データ転送ミドルウェア勉強会で得られたフィードバックを元に、リジューム機能Javaプラグイン機能、そして プラグインテンプレートジェネレータ を追加しています。

リジューム機能

大きなデータをロードする場合、大部分のデータのロードには成功するが、一部だけ失敗してしまうことは良くあることです。ネットワーク障害、サーバの過負荷などの他に、エラー処理が不完全であるなど原因は様々考えられますが、そのためだけに全データをすべてロードし直すのは大変な手間です。

そこでEmbulkでは、分割された複数のタスクのうちの一部だけが失敗した場合に、それらのタスクを後からリトライできる仕組みを導入しました。

使い方は、embulk run に --resume--state PATH オプションを指定するだけです:

embulk run config.yml --resume-state resume.yml

もしロードが一部失敗した場合は、指定したファイルに実行状態が保存されます。これをリトライしたい場合は、 まったく同じコマンドを再実行 すればOKです:

embulk run config.yml --resume-state resume.yml

それでも失敗してしまった場合には、また resume.yml ファイルが更新されるので、また同じコマンドを実行すればOKです。

一部ではなくすべてのタスクが失敗してしまった場合には、resume.yml ファイルは作成されません。しかしいずれにしても、まったく同じコマンドでリトライできます。

もしもリトライを諦めて中間データを掃除したい場合は、run の代わりに cleanup を実行してください:

embulk cleanup config.yml --resume-state resume.yml

リジュームの内部実装

例えば3つのタスクのうちの最後のタスクが失敗した場合は、このようなresume.ymlファイルが作成されます:

in_schema:  # 入力スキーマ
  - {index: 0, name: file, type: string}
  - {index: 1, name: hostname, type: string}
  - {index: 2, name: col0, type: long}
  - {index: 3, name: col1, type: double}
out_schema: # 出力スキーマ
  - {index: 0, name: file, type: string}
  - {index: 1, name: hostname, type: string}
  - {index: 2, name: col0, type: long}
  - {index: 3, name: col1, type: double}
in_task:  # inputプラグインのtask
  files: [file1, file2, file3]
  hostname: null
out_task:  # outputプラグインのtask
  TimeZone: UTC
in_reports:  # 成功したinputタスクのcommit reports
  - {records: 10}  # 成功したタスクのレポート
  - {records: 10}  # 成功したタスクのレポート
  - null  # 失敗した分
out_reports:  # 成功したoutputタスクのcommit reports
  - {}  # 成功したタスクのレポート
  - {}  # 成功したタスクのレポート
  - null  # 失敗した分
exec_task:
  transaction_time: '2015-02-24 20:49:04.729 UTC'
  transaction_time_zone: UTC

in_reportsout_reports には、実行したタスクのレポート(CommitReport)が書かれています。ここがnullであるタスクは、前回の実行時に失敗したタスクです。resume時には、これらのタスクのみを再実行します。タスクを再実行して成功すれば、そのnullを実際のレポートに置き換えることができるわけです。

そうして全てのタスクのレポートが揃ったら、それらのレポートを集めてプラグインに返し、トランザクションをコミットさせます。

並列バッチデータ転送OSSのEmbulkをソースコードリーディングしてみる(その3:run概要&データソース概要 でも触れられていますが、タスク(TaskSource) や CommitReport などのデータをファイルに保存し、後で復元してやり直したり、ネットワーク越しに転送して分散実行できることが、Embulk の実装上の大きな特徴です。

HadoopのInputSplitやConfiguration、PrestoのHandleなどにも見られるように、タスク・オブジェクトのシリアライズはEmbulkのような並列・分散処理フレームワークにおいて必須の実装課題です。しかしシリアライズコードの実装は煩雑な上にバグると発見しにくく、できるだけ素直に隠蔽するべきです。Embulkでは jackson-databindProxy、独特な継承構造によって、それなりにうまく解決していると思います。Embulk が提供する Task インタフェースと @Config を使っている限り、プラグインでは意識する必要が無いようになっています。

Javaプラグイン機能

Embulk 0.3以前はRubyでしかプラグインを書くことができませんでしたが、0.4からJavaでも書けるようになりました。

Embulk のプラグインローダーは、新しいClassLoaderを各プラグインごとに作ります。これが何を意味するかというと、プラグインごとに異なるバージョンのライブラリを同時に使える と言うことです。

例えば、Hadoop 1.x 系の HDFS からデータを読み出す file input plugin を書いたとします。そのデータを Hadoop 2.x 系の HDFS に書き出す file output plugin も書いたとします。これらのプラグインを同時にロードするとどうなるでしょうか? ふつうのJavaアプリケーションでは、依存ライブラリのバージョンが衝突し、おそらくうまく動きません。しかしEmbulkでは、各プラグインごとに別々の実行環境を作るため、異なるバージョンのライブラリを同時に使用することが可能です。

プラグインテンプレートジェネレータ

Embulk には、プラグインの開発を簡単にするために、プラグインのひな形を作成する機能が実装されています。 例えば、Rubyでデータ加工を行うプラグインを書きたいと思ったら、次のように始めることができます:

embulk new ruby-filter myfilter

これによって ./embulk-filter-myfilter ディレクトリが自動的に作成され、ほぼ動く状態のコードも同時に生成されます。 あとは実際に必要なコードを足していくだけで、新しいプラグインを開発できます。

*1:1ヶ月前