読者です 読者をやめる 読者になる 読者になる

日本OSS貢献者賞を受賞しました

このたび、第10回 日本OSS貢献者賞 を受賞いたしました。

非同期メッセージングライブラリ「MessagePack」、分散Key-Valueストア「kumofs」、ログコレクタ「Fluentd」、バルクデータローダ「Embulk」、Presto向けゲートウェイサーバ「Prestogres」など、大学生時代から現在に至るまで、多数のOSSプロジェクトを立ち上げ、開発を主導している。2010年度 日本OSS奨励賞を受賞。

推薦していただいた皆様、審査員の皆様、ありがとうございます。もちろん、OSSは1人で作っている物ではありません。かつては名も無いOSSに果敢にも参加していただいた皆様、本当にありがとうございます。

何年も前からコミュニティを支えていただいているMessagePackの開発者の皆様や、Fluentdのコミッタやコントリビュータの皆様EmbulkプラグインFluentdプラグインの開発に携わる皆様、ドキュメントやブログ記事を書いている皆様、OSSを継続できる環境を作っているTreasure Dataの同僚、色々な人々の努力で成り立っています。

いつもいつも、お世話になっております。まだまだ新しいOSSを設計していく所存です。これからもよろしくお願いします!

Embulkでやりたいことリスト(2015年7月版)

バルクロード機能

1つの設定ファイルで複数ジョブを実行する

例えば users.csv と histories.csv の2つのファイルを、それぞれPostgreSQLにある users と histories の2つのテーブル にロードしたいというようなユースケースに対応する機能。

設定ファイルの構文はissueに書いてあるように、default: に書き並べた設定に対して、jobs: に書いた設定をマージしたものを実際の設定ファイルとして実行していく方法で良さそう。しかし、fliters: は配列なので、default: に書かれた filters: に jobs: に書かれた filters: をどうマージするか、あまり良い案がない。常に後ろに追加すれば良いわけでは無いと思うので。

filtersが配列になっているのが良くないので、これを transform: {name: config, name: config, ...} というmapにすれば、うまくマージできる。例えば、

jobs:
    accounts:
        in:
            path_prefix: accounts
            parser:
                type: csv
                columns:
                - {name: id, type: long}
                - {name: surename, type: string}
                - {name: givenname, type: string}
        transforms:
            remove_my_sensitive_columns:
                columns: [id]
        out:
            table: accounts
    payments:
        in:
            path_prefix: payments.csv
            parser:
                type: csv
                columns:
                - {name: id, type: long}
                - {name: timestamp, type: timestamp, format: format: '%Y-%m-%d %H:%M:%S.%N'}
        out:
            table: payments
    requests:
        in:
            path_prefix: requests.csv
            parser:
                type: csv
                columns:
                - {name: request_id, type: string}
                - {name: account_id, type: long}
                - {name: timestamp, type: timestamp, format: format: '%Y-%m-%d %H:%M:%S.%N'}
        transforms:
            remove_my_sensitive_columns:
                columns: [account_id]
        out:
            table: requests
in:
    type: file
transform:
  remove_my_sensitive_columns:
    type: filter_column
    columns: []
out:
    type: postgresql
    table: ${table_name}

複数のジョブを実行するときに、全部を1つのトランザクションとして扱うか、1つ1つのジョブを別々のトランザクションと考えて逐次実行するかは、決まっていない。全部を1つのトランザクションにする場合、全ジョブが完了するまでcommitできないので、実装は比較的複雑になる。逐次実行する場合は、resume fileのパスをどういうコマンドラインで指定してもらうか、どのタイミングで消すかが良く分かっていない。

エラーレコードの扱いをカスタマイズ可能にする

設定ファイルの構文は、issueに書いてある error: フィールドにoutputプラグインを書く方法で良いと思う。デフォルトでは embulk-output-warning みたいなbuilt-inプラグインが指定されていることにして、警告を出して無視する。カスタマイズ次第で、embulk-output-abort を指定してエラーにさせるとか、embulk-output-file を指定してファイルに保存できるようになる。

どうやって実装するかが悩ましい。とりあえず、embulk-formatter-csv と embulk-parser-csv に実装してみて、良さそうなAPIを考えて、それを全プラグインで使えるようにmix-inかutility classとして切り出す方針が良さそうだと思っている。

JSON型サポート

arrayやmap型などの型付きの複合型を入れるのも良いのだけども、どうせスキーマレスな型は必要になるはずだから、とりあえずJSON型を追加し、arrayやmapなどもそこに突っ込む方針で良いと思う。

プラグインAPIの互換性を壊さずに型を追加できるようにするAPIAdded SwitchPageReader and SwitchPageBuilder utilities. by frsyuki · Pull Request #191 · embulk/embulk · GitHub)も考えたけども、それは要らない気がしている。なぜなら、Javaは静的型付け言語だが処理系は動的で、実行時にメソッドが実装されているかどうかチェックできるから、古いAPIに準拠したプラグインに対する処理をembulk本体側に入れておけば(e.g. メソッドが実装されていなかったらstringに変換してしまう)、後方互換性を維持したままJSON型は追加できる。

json型はPostgreSQLと同様に、arrayやmap以外に文字列などのスカラ型も保存できて良いと思う。fluentdと同様に、内部エンコーディングはmsgpackにすればいい。

BigDecimal型サポート

いるかな?

--no-commit, --no-run オプションの追加

誰かコードを書くんだ…!

プラグインAPI

テストフレームワーク

Help wanted。

プラグインタイプをRubyで書けるようにする

DecoderとEncoderは、Rubyで書けなくても特に問題ない気がしているけども、FileInputとFileOutputは書けた方が良さそう。

行ベースのパーサプラグインを簡単に書くmixinかユーティリティ

fluentdで言うところのmixinのような仕組みが、今のところないので、こういう便利系APIをどうやって足したら良いのかの指針がない。まずそこを少し考えた方がいい。

TimestampParser/Formatter APIをstaticに作れるようにする

欲しいプラグイン

Presto filter

filterプラグインは結構書くのが大変で、もっと便利なAPIを追加しないといけない。しかし他方で、大体のユースケースSQLが書ければ解決できる。

そこで、embulkのfilterプラグインとして、SQLの処理系であるPrestoを使うプラグインがあれば、filterプラグインを書かなくても色々な処理ができるようになる。

具体的な設定ファイルは、例えば:

in:
    type: file
    ...
filters:
    - type: presto
      query: |-
           select ip, address, time + elapsed as end_time from in where address != '127.0.0.1'
out:
    type: stdout

のようになる(なって欲しい)。

line-filter decoder

CSVパーサなど、parserプラグインにデータを渡す前に、行を加工したりスキップしたいことが、たまにある。

例えば、行のフォーマットが

2015-06-01 23:15:81    {"col1":"val1","col2":2}

というように日付 + JSONになっている場合に、日付部分を削除してからJSONパーサプラグインに渡したい、というケース。

parserプラグインに渡す前のデータは、decoderプラグインで加工できる。そういう加工をするプラグインを一つ作ればいい。

各種改善

outputプラグインの並列化

タスクの個数(taskCount)はinputプラグインが決めるのだけども、ファイルが1つしか無いケースや、embulk-input-jdbc など並列化に対応していないプラグインを使っていると、outputプラグインの動作までシングルスレッドで動くことになってしまう。

よくあるoutputの処理が遅いケースでは、inputは1スレッドでもoutputをマルチスレッドで動かしたいことが良くある。

実はtaskCountは、executorプラグインのレイヤーで変更することができる。executorプラグインは、inputプラグインから受け取ったデータをoutputプラグインに渡す部分をカスタマイズできるのだけども、そこでデータを複数スレッドに分散させればいい。

LocalExecutorPluginを改造すればできる。少し難しいのは、resumeはタスク単位で行われるので、resumeした結果が冪等になるように、データの分散はinput + 設定ファイルに対して決定性のあるアルゴリズムで行わなければならないところだけ。

統計情報の収集

処理したレコード数、バイト数、CPU使用率、メモリ使用量などの統計情報の収集。

Dropwizardのmetricsを組み込めば良さそう。依存関係も少ないし。

組み込み方に関する具体的な設計は、まだ何も無い。

LineDecoderの最適化

issueに貼ってある参考実装みたいにしたい。

現在のLineDecoderは、厳密にはデータの末端にある1行の扱いがおかしい。

その他

Github issuesに登録してあるissue全部:

Re: 論理削除はなぜ「筋が悪い」か

Kazuhoさんの論理削除はなぜ「筋が悪い」かを読んで。

UPDATEが発生しないテーブルならば、削除フラグを使った実装手法でも現在の状態と更新ログを別々に表現でき、結果として効率と過去の情報を参照できるメリットを簡潔に両立できるのではないか、という話。

大前提として全く同意なのだけども、今あるテーブルにdeleted_atを足すだけで、過去のレコードを復旧可能なようにしたい>< みたいに思っちゃった僕のような人間が実際に取るべき実装手法は何か、あるいは、それを想定して今やっておくべきテーブル設計はどういうものか!?というのが最後の疑問。

まずUPDATEがなければ、immutableなマスタ、更新ログ、「現時点のビュー」の3テーブルは、例えば次のようになる(PostgreSQLの場合):

-- immutableなマスタ。
create table records (
  id serial primary key,
  another_col uuid not null
);

-- 更新ログ。deleted_atカラムがある。
create table record_delete_events (
  record_id int primary key references (records.id),
  deleted_at timestamp with time zone not null default now()
);
create index delete_events_deleted_at on record_delete_events (deleted_at);

-- 現時点のビュー。マスタから更新ログに含まれるidを取り除いたVIEW。
create view live_records as
select * from records
where id not exists (select * from record_delete_events where record_id = id);

-- 削除する操作。更新ログにINSERT
INSERT INTO record_delete_events (record_id) values (1);

削除フラグを使った実装手法でも、まったく同じスキーマを定義できる:

-- immutableなマスタ。deleted_atカラムは無視する。
create table records (
  id serial primary key,
  another_col uuid not null,
  deleted_at timestamp with time zone defualt null
);
create index records_deleted_at on records (deleted_at) where deleted_at is not null;

-- 更新ログ。マスタからdeleted_atがセットされたレコードを抽出したVIEW。
create view record_delete_events as
select id as record_id, deleted_at from records where deleted_at is not null;

-- 現時点のビュー。マスタからdeleted_atがセットされていないレコードを抽出したVIEW。
create view live_records as
select id, another_col from records where records where deleted_at is null;

-- 削除する操作。UPDATEでdeleted_atをセット
UPDATE records SET deleted_at = now() WHERE id=1;

上記のクエリは、live_recordsからprimary key以外の条件でSELECTする際に、部分インデックスrecords_deleted_atが必要になるので最適化が効きにくくなる弊害が残るが、これは次の実装手法で解決できるはず:

-- 現時点のビュー。新しいレコードはここに入れる。
create table live_records (
  id serial primary key,
  another_col uuid not null
);

-- 更新ログ。削除したレコードはこっちに移す。
create table deleted_records (
  record_id id primary key,
  another_col uuid not null,
  deleted_at timestamp with time zone not null default now()
);
create index records_deleted_at on deleted_records (deleted_at);

create view record_delete_events as
select record_id, deleted_at from deleted_records;

-- immutableなマスタ。liveとdeletedをUNION ALLしたビュー。
create view records as
select id, another_col from live_records
union all
select record_id as id, another_col from deleted_records;

-- 削除する操作:
WITH deleted (
  DELETE FROM live_records
  WHERE id=1
  RETURNING *
)
INSERT INTO deleted_records (record_id, another_col)
SELECT id, another_col

ただし、PostgreSQLでは、UPDATEの負荷とDELETE+INSERTの負荷があまり変わらないと仮定する。削除する操作が複雑になっているが、これはFUNCTIONを作っておくことで回避できる:

CREATE FUNCTION delete_record (delete_record_id int not null) as $$
WITH deleted (
  DELETE FROM live_records
  WHERE id=delete_record_id
  RETURNING *
)
INSERT INTO deleted_records (record_id, another_col)
SELECT id, another_col
$$ language sql;

select delete_record(1);

VIEWではなくINHERITでも実装できるはず。おそらくこうなる:

-- immutableなマスタ。親テーブルからのSELECTは、子テーブルのレコードを含む
create table records (
  id serial primary key,
  another_col uuid not null
);

-- 現時点のビュー。子テーブル。
create table live_records (
)
inherits (records);

-- 更新ログ。子テーブル。
create table deleted_records (
  deleted_at timestamp with time zone not null default now()
)
inherits (records);

create view record_delete_events as
select id as record_id, deleted_at from deleted_records;

ここで疑問は、

  • 効率的にSELECTや更新ができるスキーマを作ろうとすると、VIEWやFUNCTIONなど、側に実装するコードが増えてくる。それらのコードは、上記のようにDB側に実装しても良い(するべき)だろうか?それともアプリケーションに実装するべきだろうか?
  • WebサービスとかTreasure Dataのようなサービス事業者だと、テーブルを最初に設計するときにあまり時間をかけられないので、後から見直して最適化していくことが多い。テーブルのスキーマは停止時間なしで変更する手法をいくつか思いつくが(PostgreSQLなら)、上記のレコードを削除する操作などはアプリケーションの変更を伴うので難しい(アプリケーションとDBのスキーマをアトミックに変更できない)。
    • レコードの削除や追加は、どのようなSQLの構文で表現しておくと良いだろうか?全部FUNCTIONにしておくのは煩雑である*1
    • とりあえず論理削除でもいいから、後で何とかしようと思ったときに、後から変更しやすいスキーマ設計(または後から変更できなくて困るタイプのスキーマ設計)は、どんなスキーマだろうか?

*1:でもPlazmaDBの実装は、ほぼ全部FUNCTIONになっている… Plazma - Treasure Data’s distributed analytical database -

Embulk 0.3 & 0.4 の新機能 - リジュームとJavaプラグイン

つい先日*1、Embulk の新しいメジャーバージョンを2つリリースしました。

これらのバージョンでは、データ転送ミドルウェア勉強会で得られたフィードバックを元に、リジューム機能Javaプラグイン機能、そして プラグインテンプレートジェネレータ を追加しています。

リジューム機能

大きなデータをロードする場合、大部分のデータのロードには成功するが、一部だけ失敗してしまうことは良くあることです。ネットワーク障害、サーバの過負荷などの他に、エラー処理が不完全であるなど原因は様々考えられますが、そのためだけに全データをすべてロードし直すのは大変な手間です。

そこでEmbulkでは、分割された複数のタスクのうちの一部だけが失敗した場合に、それらのタスクを後からリトライできる仕組みを導入しました。

使い方は、embulk run に --resume--state PATH オプションを指定するだけです:

embulk run config.yml --resume-state resume.yml

もしロードが一部失敗した場合は、指定したファイルに実行状態が保存されます。これをリトライしたい場合は、 まったく同じコマンドを再実行 すればOKです:

embulk run config.yml --resume-state resume.yml

それでも失敗してしまった場合には、また resume.yml ファイルが更新されるので、また同じコマンドを実行すればOKです。

一部ではなくすべてのタスクが失敗してしまった場合には、resume.yml ファイルは作成されません。しかしいずれにしても、まったく同じコマンドでリトライできます。

もしもリトライを諦めて中間データを掃除したい場合は、run の代わりに cleanup を実行してください:

embulk cleanup config.yml --resume-state resume.yml

リジュームの内部実装

例えば3つのタスクのうちの最後のタスクが失敗した場合は、このようなresume.ymlファイルが作成されます:

in_schema:  # 入力スキーマ
  - {index: 0, name: file, type: string}
  - {index: 1, name: hostname, type: string}
  - {index: 2, name: col0, type: long}
  - {index: 3, name: col1, type: double}
out_schema: # 出力スキーマ
  - {index: 0, name: file, type: string}
  - {index: 1, name: hostname, type: string}
  - {index: 2, name: col0, type: long}
  - {index: 3, name: col1, type: double}
in_task:  # inputプラグインのtask
  files: [file1, file2, file3]
  hostname: null
out_task:  # outputプラグインのtask
  TimeZone: UTC
in_reports:  # 成功したinputタスクのcommit reports
  - {records: 10}  # 成功したタスクのレポート
  - {records: 10}  # 成功したタスクのレポート
  - null  # 失敗した分
out_reports:  # 成功したoutputタスクのcommit reports
  - {}  # 成功したタスクのレポート
  - {}  # 成功したタスクのレポート
  - null  # 失敗した分
exec_task:
  transaction_time: '2015-02-24 20:49:04.729 UTC'
  transaction_time_zone: UTC

in_reportsout_reports には、実行したタスクのレポート(CommitReport)が書かれています。ここがnullであるタスクは、前回の実行時に失敗したタスクです。resume時には、これらのタスクのみを再実行します。タスクを再実行して成功すれば、そのnullを実際のレポートに置き換えることができるわけです。

そうして全てのタスクのレポートが揃ったら、それらのレポートを集めてプラグインに返し、トランザクションをコミットさせます。

並列バッチデータ転送OSSのEmbulkをソースコードリーディングしてみる(その3:run概要&データソース概要 でも触れられていますが、タスク(TaskSource) や CommitReport などのデータをファイルに保存し、後で復元してやり直したり、ネットワーク越しに転送して分散実行できることが、Embulk の実装上の大きな特徴です。

HadoopのInputSplitやConfiguration、PrestoのHandleなどにも見られるように、タスク・オブジェクトのシリアライズはEmbulkのような並列・分散処理フレームワークにおいて必須の実装課題です。しかしシリアライズコードの実装は煩雑な上にバグると発見しにくく、できるだけ素直に隠蔽するべきです。Embulkでは jackson-databindProxy、独特な継承構造によって、それなりにうまく解決していると思います。Embulk が提供する Task インタフェースと @Config を使っている限り、プラグインでは意識する必要が無いようになっています。

Javaプラグイン機能

Embulk 0.3以前はRubyでしかプラグインを書くことができませんでしたが、0.4からJavaでも書けるようになりました。

Embulk のプラグインローダーは、新しいClassLoaderを各プラグインごとに作ります。これが何を意味するかというと、プラグインごとに異なるバージョンのライブラリを同時に使える と言うことです。

例えば、Hadoop 1.x 系の HDFS からデータを読み出す file input plugin を書いたとします。そのデータを Hadoop 2.x 系の HDFS に書き出す file output plugin も書いたとします。これらのプラグインを同時にロードするとどうなるでしょうか? ふつうのJavaアプリケーションでは、依存ライブラリのバージョンが衝突し、おそらくうまく動きません。しかしEmbulkでは、各プラグインごとに別々の実行環境を作るため、異なるバージョンのライブラリを同時に使用することが可能です。

プラグインテンプレートジェネレータ

Embulk には、プラグインの開発を簡単にするために、プラグインのひな形を作成する機能が実装されています。 例えば、Rubyでデータ加工を行うプラグインを書きたいと思ったら、次のように始めることができます:

embulk new ruby-filter myfilter

これによって ./embulk-filter-myfilter ディレクトリが自動的に作成され、ほぼ動く状態のコードも同時に生成されます。 あとは実際に必要なコードを足していくだけで、新しいプラグインを開発できます。

*1:1ヶ月前

並列データ転送ツール『Embulk』リリース!

こんにちは。古橋です。

先日の*1 データ転送ミドルウェア勉強会で、新しいオープンソースツール Embulk をリリースしました。

Embulk は、リアルタイムなログ収集では常識となった fluentd のバッチ版のようなツールで、ファイルやデータベースからデータを吸い出し、別のストレージやデータベースにロードするためのコンパクトなツールです。

fluentd と同様にプラグイン型のアーキテクチャを採用 しているため、RubyJavaで簡単なコードを書くことで、様々なファイルフォーマットやストレージに対応することができます。一方で fluentd とは異なり、高速性トランザクション制御スキーマを使ったデータのバリデーション などにこだわっており、1発実行、あるいは日次や1時間毎に実行するバルク処理に特化しています。

なぜEmbulkを作ったのか?

fluentdをリリースした2011年と比較すると、データ収集を取り巻く問題は飛躍的に改善されました。fluentdにログを流し込んでさえいれば、あとはfluentdの設定次第でどんなストレージやサービスにもログを転送できるようになりました。

つまり、用途もなく“とりあえず” 集め始めたログであっても、それがfluentdに入ってさえいれば、集計やグラフ化監視とアラート、さらには複雑な 異常値検出SQLによるストリーミングデータ処理 など、必要になればすぐに価値ある情報を取り出すことが可能です。カンだけに頼らないデータ活用時代においては、何はなくともまずデータの収集基盤を作ることは、非常に重要なステップです。

しかしその一方で、

  • ダウンロードしてきた『CSVファイル』を1回ロードして解析してみたい
    • 実は10GBもあるので大変。
    • 加えて値の変換にカスタムな処理が必要。
  • fluentdを導入したけど大量に残った過去データもロードして解析したい
    • 実は数年分のデータが溜まっている。
    • 全部バイナリデータで扱いに困っている。
  • 日次のバッチ処理の一部にデータの転送処理がある
    • 実は誰かが書いたか分からないスクリプトがたまに失敗していて困る。
    • リトライするにもオプションが多すぎて良く分からない。
  • 異なるストレージにデータを同期したい
    • MySQLにあるデータをElasticsearchにバルクロードしたい
    • ローカルのOracleにあるデータをSalesforce.comに転送したい
    • ローカルのHadoopからTreasure Dataに移行したい
    • MongoDBからPostgreSQLに移行したい

など、fluentdでカバーすることが難しい問題も多くあることが分かってきました。

そこで、データ収集にまつわる残りすべての問題を解決することを目指したツールがEmbulkです。

Embulkの特徴

プラグインアーキテクチャ

Embulkの第一の特徴はプラグインアーキテクチャです。入力、出力、フィルタ(データ加工)などのプラグインを書くことで足りない機能を補完し、現場で使えるツールに拡張していくことができます。

それらのプラグインは、オープンソースソフトウェアとして自由にリリースすることができます。Embulkのリリースからわずか3週間しか経っていないにもかかわらず、なんと既に17個ものプラグインがリリースされています。

データ処理には、壊れた値(例外データ)の扱いや、エラー処理やリカバリ、日付フォーマットの変換、NULL値の扱い、リトライ、二重ロードを防ぐ冪等性の制御…などなど、実は難しい処理が数多くあります。これらを頭の中に秘伝のタレとして蓄積するだけでなく、多くの人と共有することで、ノウハウが詰まったスクリプトを継続的にメンテナンスできるようにしたいという願いがあります。

Embulk plugin architecture

高速な並列・分散処理

Embulkは、1回の処理を複数のタスクに分割し、並列に実行する仕組みを備えています。これらのタスクを実行するExecutorプラグインを追加することで、HadoopやYARN、あるいはSun GridやMPIなどの分散処理環境を使い、大規模データを高速にバルク処理することが可能になります(※なる予定です。未実装。たぶんv0.6くらい)

また、プラグインRubyJRuby)で書きますが、Embulkのコア部分はJavaで書かれており、単一スレッドの性能にもこだわっています。もちろん、プラグインまでJavaで書くこともできます(v0.4〜)。特に性能を追求する場合はJavaで書いた方が良いかもしれません。

プラグインJVMで動く言語なら何でも良いので、ScalaClojureでも書ける…はず。

guess機構 — 設定ファイルの推測と提案

CSVは、RFCでフォーマットが定義されているにも関わらず、細部が微妙に異なるフォーマットが氾濫していることで悪名高いフォーマットです。これを扱うためにEmbulkのCSVファイルパーサプラグインには、8つもの挙動オプションがあります。挙動オプションの他にも、全カラムの型やカラム名、日付フォーマットの指定などが必要になり、設定項目はどうしても多くなってしまいます。

こういった設定作業を簡単にするために、Embulkには guess という仕組みがあります(v0.1〜。v0.5〜強化予定)。1度少しだけデータを読み込み、自動的に設定ファイルを生成してくれます。このように推測された設定を、必要なら少し手直ししてから使うことで、ゼロから設定を書くよりもずっと素早くデータ処理を始められます。

また、実行フェーズとguessフェーズを分離することによって、一度設定ファイルを固定してしまえば、その後は非決定的な動作がなく、データによらず正しく動き続けるという利点もあります。

どの程度の推測が可能なのかは、実際にEmbulkのQuick Startを試してみてください。

Embulk architecture

リトライとリジュー

データサイズが大きくなればなるほど、リトライの重要性は増してきます。数百個のタスクがすべて1回で成功するように祈る代わりに、Embulkでは失敗したタスクだけを後からやり直すリジューム機能を備えています(v0.3〜)。

オープンソース

Embulkはオープンソースソフトウェアです。ソースコードはすべてGithubにあります。何か問題が起きたならば、ソースコードを見て対応することができますし、必要なら直して再ビルドすることまでできます*2

またEmbulkはオープンソースの大原則に従い、複雑な製品で遠大な問題を解決するよりも、シンプルなツールで目の前の問題を解決するツールを目指しています。それ以上のことは他のツールと連携して解決する方向性です。

Embulkプロジェクトへの参加には、色々な方法があります:

  • プラグインを書く
  • pull-requestを送る
  • 他のソフトウェアと連携するツールを書く
  • twitterでつぶやく
  • ブログを書く
    • こんなプラグインを書いた
    • こんな環境で使った
    • ここが便利そう! ここが足りない!
    • 動いた! 動かない!

リリースして3週間しか経っていない、まだまだ若いソフトウェアです。様々な形での参加を待っています!

あわせて読みたい

既にいくつものブログ記事を書いていただいています。ありがとうございます!

*1:少し時間が経ってしまいましたが^^;

*2:その場合はpull-requestを送るのを忘れずに

データ転送ミドルウェア勉強会

Treasure Data, Inc. 古橋貞之です。
来たる1月27日、新しいOSSツール Embulk をリリースします。

EmbulkはFluentdのバッチ処理版のようなツールで、CSVデータやアクセスログなどの構造化データを高い信頼性で転送することができるコンパクトなツールです。
入力元、出力先、ファイルフォーマット、圧縮方式などをプラグインで拡張することができ、S3上のCSVファイル、PostgreSQL、Elasticsearch、Salesforce.com、Treasure Dataなど、異種のストレージやサービスの間でデータを転送・同期することが可能になります。

Fluentdとは異なって、1発実行、あるいは1時間や1日毎で実行するバルク処理に特化しており、

などの拡張を備えています。

  • 1回で使い捨てられる割には面倒すぎるデータ変換スクリプト
  • 中途半端なエラー処理実装でかろうじて運用され続けるcronスクリプト
  • 本気で作ってみたけど特定用途向きすぎて再利用できないデータ同期アプリケーション

これらの経験知をプラグインとしてパッケージ化し、人類共通の資産として評価・再利用・継続的な改善を可能にするツールです。

1月27日の勉強会では、Fluentdのえらい人である @repeatedly からFluentdのv1に向けたロードマップについて、データ転送の大御所 小野和俊さんからHULFTについての解説もあります。

会場は、新宿・渋谷・品川・丸の内のどこからも行きやすいSAPジャパンビル:

データ転送ミドルウェア勉強会 - dots.[ドッツ]


講演者枠も1つ空いています。データ転送について一言ある方を募集中です。 @repeatedly@frsyuki までご連絡ください。


ちなみに、その前の1/20にはPresto meetupもあります:

Presto Meetup - dots.[ドッツ]

続・リトライと冪等性のデザインパターン - リトライはいつ成功するか

三度の飯よりエラー処理。古橋です。

大変好評をいただいた序章リトライと冪等性のデザインパターンの続編です。
前回はほぼ前置きでしたが、今回は冪等でない操作を冪等にする具体的なテクニックもまとめていきます。

パターン2:エラーを区別してDELETEを冪等にする

リソースに常に一意なIDが振られていれば、Deleteを冪等にするのは難しくない。そもそも同じリソースを2度削除することはできない。

一つ注意するべきなのは、削除されたリソースのIDが再利用されるケースでは、Deleteの冪等性は保証されない。例えば、kill -KILL <pid> コマンドはDelete系のAPIと考えられるが、pidは再利用されるので、何度も繰り返すと意図しないプロセスを殺してしまう可能性がある。

一般にIDの生成は非常に難しい問題だが、Deleteに関してのみ言えば再利用されなければいいので、単調増加する整数(AUTO INCREMENT)で問題ない。

ただし、DeleteもCreateと同様に、クライアント側のエラーハンドリングは少し難しい。「指定されたIDは既に削除済みだ」というエラーを受け取ったとき、それが以前から存在していなかったのが原因なのか、リトライしたことが原因なのかは区別するできない。これもケースバイケースだが、やはり決定性なエラーなのでリトライしてはいけない。

つまりDelete系のAPIでは、HTTPの404 Not Foundに相当するエラーコードや例外クラスを定義しておき、クライアントが区別できるようにしてしておいた方がいい*1

パターン3:操作をまとめて冪等にする

状態を持たないCreateやDeleteを冪等にする方法は簡単すぎるので、書くまでも無かったかもしれない。しかし世の中はそれほどシンプルでは無いので、次のようなケースが良く発生する:

1. 新しい空のプロジェクトを作る(Create)
2. 作ったプロジェクトにアイテムAを5個加える(Append)
3. 作ったプロジェクトにアイテムBを10個加える(Append)

一般にAppendやIncrementなどの操作は冪等にするのが難しい。言い換えれば、以前の状態に依存して操作後の状態が変わる操作は、冪等にするのが難しい*2

上記の例では、ステップ2.や3.が失敗した場合にその操作をリトライすると、予定よりも多くのアイテムが入ったプロジェクトが出来上がってしまう可能性がある。これではマズい。

こういう場合は、一連の操作をまとめてリトライできないか考えてみる。例えば上記の例では、Append中に失敗した場合は、一連の操作を一度最初からやり直すことで冪等にすることができる:

1. 新しい空のプロジェクトPを作る(Create)→成功
2. プロジェクトPにアイテムAを5個加える(Append)→成功
3. プロジェクトPにアイテムBを10個加える(Append)→失敗!
4. プロジェクトPを削除し(Delete)、最初からやり直す

あるいは、すべての操作をまとめた1つの処理を作ってしまう方法でも良い:

1. アイテムAが5個とアイテムBが10個入ったプロジェクトPを作る(Create)

パターン4:操作を細かくして信頼性を高める

前節では操作を大きな粒度にまとめてリトライする方法を紹介したが、リトライの粒度を大きくするほど信頼性は落ちることに注意する必要がある。

例えば、以下のような近ごろ大変良くあるケースを考えてみる:

1. 新しいテーブルを作る(Create)
2. 作ったテーブルにレコードを10,000件加える(Append)
3. 作ったテーブルにレコードを10,000件加える(Append)
4. 作ったテーブルにレコードを10,000件加える(Append)
....
50,000. 合計5億件加える

構造的には前節と同じなので、全体をまとめることで安全にリトライできる。

ここで、10,000件のAppend操作が、0.01%の確率で失敗してしまうと仮定する。一連の5万回の操作が一度も失敗せずに完遂する確率は何%だろうか?

各操作が独立だとすれば、99.99%の5万乗なので、たったの0.67% となる。一連の操作全体を毎回リトライしていたら、いつもどこかで失敗してしまう。こういうプログラムはいつまで経っても終わらないプログラムと言われる。残念、使い物にならない。

つまり、リトライの粒度を大きくするほどリトライのオーバーヘッドは大きく膨らみ、全体としてスループットは低下する。このような場合では、個々の操作を冪等にし、細かい粒度でリトライできるようにする必要がある。1つの解決策として、一つ一つの操作に名前を付けることで、AppendにCreateに変更する方法がある:

1. レコードを10,000件含んだ部分データSplit-1を作る(Create)
2. レコードを10,000件含んだ部分データSplit-2を作る(Create)
3. レコードを10,000件含んだ部分データSplit-3を作る(Create)
...
50000. 作成したSplit-1〜Split-50000含んだテーブルTを作成する(Create)

しかしこのように、AppendやIncrementを冪等にする仕組みは大がかりになりがちなので、本当に細かい粒度でリトライする必要があるかどうかは熟考が必要。場合によっては最終奥義『トランザクション』に頼った方が良いケースもある。いかにシンプルで妥当な方法を見つけ出すかは、プログラマの腕の見せ所かもしれない。

パターン5:操作ログとリクエストIDでUPDATEを冪等にする

次回に続く…

*1:あわせて読んでおきたいコメント:https://twitter.com/kazuho/status/476942055373934595 https://twitter.com/frsyuki/status/476944444676001793

*2:逆に言えば、以前の状態を無視して(依存せずに)新しいセットを宣言する操作は、簡単に冪等にできる。そういうAPIには、操作後の状態を単純に宣言する名前を付けることができる。例えば setXyz、enableXyz、ensureXyzIsCreated、ensureXyzIsDeleted などのようになる。