Embulk 0.3 & 0.4 の新機能 - リジュームとJavaプラグイン
つい先日*1、Embulk の新しいメジャーバージョンを2つリリースしました。
これらのバージョンでは、データ転送ミドルウェア勉強会で得られたフィードバックを元に、リジューム機能、Javaプラグイン機能、そして プラグインテンプレートジェネレータ を追加しています。
リジューム機能
大きなデータをロードする場合、大部分のデータのロードには成功するが、一部だけ失敗してしまうことは良くあることです。ネットワーク障害、サーバの過負荷などの他に、エラー処理が不完全であるなど原因は様々考えられますが、そのためだけに全データをすべてロードし直すのは大変な手間です。
そこでEmbulkでは、分割された複数のタスクのうちの一部だけが失敗した場合に、それらのタスクを後からリトライできる仕組みを導入しました。
使い方は、embulk run に --resume--state PATH オプションを指定するだけです:
embulk run config.yml --resume-state resume.yml
もしロードが一部失敗した場合は、指定したファイルに実行状態が保存されます。これをリトライしたい場合は、 まったく同じコマンドを再実行 すればOKです:
embulk run config.yml --resume-state resume.yml
それでも失敗してしまった場合には、また resume.yml ファイルが更新されるので、また同じコマンドを実行すればOKです。
一部ではなくすべてのタスクが失敗してしまった場合には、resume.yml ファイルは作成されません。しかしいずれにしても、まったく同じコマンドでリトライできます。
もしもリトライを諦めて中間データを掃除したい場合は、run の代わりに cleanup を実行してください:
embulk cleanup config.yml --resume-state resume.yml
リジュームの内部実装
例えば3つのタスクのうちの最後のタスクが失敗した場合は、このようなresume.ymlファイルが作成されます:
in_schema: # 入力スキーマ - {index: 0, name: file, type: string} - {index: 1, name: hostname, type: string} - {index: 2, name: col0, type: long} - {index: 3, name: col1, type: double} out_schema: # 出力スキーマ - {index: 0, name: file, type: string} - {index: 1, name: hostname, type: string} - {index: 2, name: col0, type: long} - {index: 3, name: col1, type: double} in_task: # inputプラグインのtask files: [file1, file2, file3] hostname: null out_task: # outputプラグインのtask TimeZone: UTC in_reports: # 成功したinputタスクのcommit reports - {records: 10} # 成功したタスクのレポート - {records: 10} # 成功したタスクのレポート - null # 失敗した分 out_reports: # 成功したoutputタスクのcommit reports - {} # 成功したタスクのレポート - {} # 成功したタスクのレポート - null # 失敗した分 exec_task: transaction_time: '2015-02-24 20:49:04.729 UTC' transaction_time_zone: UTC
in_reports と out_reports には、実行したタスクのレポート(CommitReport)が書かれています。ここがnullであるタスクは、前回の実行時に失敗したタスクです。resume時には、これらのタスクのみを再実行します。タスクを再実行して成功すれば、そのnullを実際のレポートに置き換えることができるわけです。
そうして全てのタスクのレポートが揃ったら、それらのレポートを集めてプラグインに返し、トランザクションをコミットさせます。
並列バッチデータ転送OSSのEmbulkをソースコードリーディングしてみる(その3:run概要&データソース概要 でも触れられていますが、タスク(TaskSource) や CommitReport などのデータをファイルに保存し、後で復元してやり直したり、ネットワーク越しに転送して分散実行できることが、Embulk の実装上の大きな特徴です。
HadoopのInputSplitやConfiguration、PrestoのHandleなどにも見られるように、タスク・オブジェクトのシリアライズはEmbulkのような並列・分散処理フレームワークにおいて必須の実装課題です。しかしシリアライズコードの実装は煩雑な上にバグると発見しにくく、できるだけ素直に隠蔽するべきです。Embulkでは jackson-databind、Proxy、独特な継承構造によって、それなりにうまく解決していると思います。Embulk が提供する Task インタフェースと @Config を使っている限り、プラグインでは意識する必要が無いようになっています。
Javaプラグイン機能
Embulk 0.3以前はRubyでしかプラグインを書くことができませんでしたが、0.4からJavaでも書けるようになりました。
Embulk のプラグインローダーは、新しいClassLoaderを各プラグインごとに作ります。これが何を意味するかというと、プラグインごとに異なるバージョンのライブラリを同時に使える と言うことです。
例えば、Hadoop 1.x 系の HDFS からデータを読み出す file input plugin を書いたとします。そのデータを Hadoop 2.x 系の HDFS に書き出す file output plugin も書いたとします。これらのプラグインを同時にロードするとどうなるでしょうか? ふつうのJavaアプリケーションでは、依存ライブラリのバージョンが衝突し、おそらくうまく動きません。しかしEmbulkでは、各プラグインごとに別々の実行環境を作るため、異なるバージョンのライブラリを同時に使用することが可能です。
プラグインテンプレートジェネレータ
Embulk には、プラグインの開発を簡単にするために、プラグインのひな形を作成する機能が実装されています。 例えば、Rubyでデータ加工を行うプラグインを書きたいと思ったら、次のように始めることができます:
embulk new ruby-filter myfilter
これによって ./embulk-filter-myfilter ディレクトリが自動的に作成され、ほぼ動く状態のコードも同時に生成されます。 あとは実際に必要なコードを足していくだけで、新しいプラグインを開発できます。
*1:1ヶ月前