Embulkでやりたいことリスト(2015年7月版)

バルクロード機能

1つの設定ファイルで複数ジョブを実行する

例えば users.csv と histories.csv の2つのファイルを、それぞれPostgreSQLにある users と histories の2つのテーブル にロードしたいというようなユースケースに対応する機能。

設定ファイルの構文はissueに書いてあるように、default: に書き並べた設定に対して、jobs: に書いた設定をマージしたものを実際の設定ファイルとして実行していく方法で良さそう。しかし、fliters: は配列なので、default: に書かれた filters: に jobs: に書かれた filters: をどうマージするか、あまり良い案がない。常に後ろに追加すれば良いわけでは無いと思うので。

filtersが配列になっているのが良くないので、これを transform: {name: config, name: config, ...} というmapにすれば、うまくマージできる。例えば、

jobs:
    accounts:
        in:
            path_prefix: accounts
            parser:
                type: csv
                columns:
                - {name: id, type: long}
                - {name: surename, type: string}
                - {name: givenname, type: string}
        transforms:
            remove_my_sensitive_columns:
                columns: [id]
        out:
            table: accounts
    payments:
        in:
            path_prefix: payments.csv
            parser:
                type: csv
                columns:
                - {name: id, type: long}
                - {name: timestamp, type: timestamp, format: format: '%Y-%m-%d %H:%M:%S.%N'}
        out:
            table: payments
    requests:
        in:
            path_prefix: requests.csv
            parser:
                type: csv
                columns:
                - {name: request_id, type: string}
                - {name: account_id, type: long}
                - {name: timestamp, type: timestamp, format: format: '%Y-%m-%d %H:%M:%S.%N'}
        transforms:
            remove_my_sensitive_columns:
                columns: [account_id]
        out:
            table: requests
in:
    type: file
transform:
  remove_my_sensitive_columns:
    type: filter_column
    columns: []
out:
    type: postgresql
    table: ${table_name}

複数のジョブを実行するときに、全部を1つのトランザクションとして扱うか、1つ1つのジョブを別々のトランザクションと考えて逐次実行するかは、決まっていない。全部を1つのトランザクションにする場合、全ジョブが完了するまでcommitできないので、実装は比較的複雑になる。逐次実行する場合は、resume fileのパスをどういうコマンドラインで指定してもらうか、どのタイミングで消すかが良く分かっていない。

エラーレコードの扱いをカスタマイズ可能にする

設定ファイルの構文は、issueに書いてある error: フィールドにoutputプラグインを書く方法で良いと思う。デフォルトでは embulk-output-warning みたいなbuilt-inプラグインが指定されていることにして、警告を出して無視する。カスタマイズ次第で、embulk-output-abort を指定してエラーにさせるとか、embulk-output-file を指定してファイルに保存できるようになる。

どうやって実装するかが悩ましい。とりあえず、embulk-formatter-csv と embulk-parser-csv に実装してみて、良さそうなAPIを考えて、それを全プラグインで使えるようにmix-inかutility classとして切り出す方針が良さそうだと思っている。

JSON型サポート

arrayやmap型などの型付きの複合型を入れるのも良いのだけども、どうせスキーマレスな型は必要になるはずだから、とりあえずJSON型を追加し、arrayやmapなどもそこに突っ込む方針で良いと思う。

プラグインAPIの互換性を壊さずに型を追加できるようにするAPIAdded SwitchPageReader and SwitchPageBuilder utilities. by frsyuki · Pull Request #191 · embulk/embulk · GitHub)も考えたけども、それは要らない気がしている。なぜなら、Javaは静的型付け言語だが処理系は動的で、実行時にメソッドが実装されているかどうかチェックできるから、古いAPIに準拠したプラグインに対する処理をembulk本体側に入れておけば(e.g. メソッドが実装されていなかったらstringに変換してしまう)、後方互換性を維持したままJSON型は追加できる。

json型はPostgreSQLと同様に、arrayやmap以外に文字列などのスカラ型も保存できて良いと思う。fluentdと同様に、内部エンコーディングはmsgpackにすればいい。

BigDecimal型サポート

いるかな?

--no-commit, --no-run オプションの追加

誰かコードを書くんだ…!

プラグインAPI

テストフレームワーク

Help wanted。

プラグインタイプをRubyで書けるようにする

DecoderとEncoderは、Rubyで書けなくても特に問題ない気がしているけども、FileInputとFileOutputは書けた方が良さそう。

行ベースのパーサプラグインを簡単に書くmixinかユーティリティ

fluentdで言うところのmixinのような仕組みが、今のところないので、こういう便利系APIをどうやって足したら良いのかの指針がない。まずそこを少し考えた方がいい。

TimestampParser/Formatter APIをstaticに作れるようにする

欲しいプラグイン

Presto filter

filterプラグインは結構書くのが大変で、もっと便利なAPIを追加しないといけない。しかし他方で、大体のユースケースSQLが書ければ解決できる。

そこで、embulkのfilterプラグインとして、SQLの処理系であるPrestoを使うプラグインがあれば、filterプラグインを書かなくても色々な処理ができるようになる。

具体的な設定ファイルは、例えば:

in:
    type: file
    ...
filters:
    - type: presto
      query: |-
           select ip, address, time + elapsed as end_time from in where address != '127.0.0.1'
out:
    type: stdout

のようになる(なって欲しい)。

line-filter decoder

CSVパーサなど、parserプラグインにデータを渡す前に、行を加工したりスキップしたいことが、たまにある。

例えば、行のフォーマットが

2015-06-01 23:15:81    {"col1":"val1","col2":2}

というように日付 + JSONになっている場合に、日付部分を削除してからJSONパーサプラグインに渡したい、というケース。

parserプラグインに渡す前のデータは、decoderプラグインで加工できる。そういう加工をするプラグインを一つ作ればいい。

各種改善

outputプラグインの並列化

タスクの個数(taskCount)はinputプラグインが決めるのだけども、ファイルが1つしか無いケースや、embulk-input-jdbc など並列化に対応していないプラグインを使っていると、outputプラグインの動作までシングルスレッドで動くことになってしまう。

よくあるoutputの処理が遅いケースでは、inputは1スレッドでもoutputをマルチスレッドで動かしたいことが良くある。

実はtaskCountは、executorプラグインのレイヤーで変更することができる。executorプラグインは、inputプラグインから受け取ったデータをoutputプラグインに渡す部分をカスタマイズできるのだけども、そこでデータを複数スレッドに分散させればいい。

LocalExecutorPluginを改造すればできる。少し難しいのは、resumeはタスク単位で行われるので、resumeした結果が冪等になるように、データの分散はinput + 設定ファイルに対して決定性のあるアルゴリズムで行わなければならないところだけ。

統計情報の収集

処理したレコード数、バイト数、CPU使用率、メモリ使用量などの統計情報の収集。

Dropwizardのmetricsを組み込めば良さそう。依存関係も少ないし。

組み込み方に関する具体的な設計は、まだ何も無い。

LineDecoderの最適化

issueに貼ってある参考実装みたいにしたい。

現在のLineDecoderは、厳密にはデータの末端にある1行の扱いがおかしい。

その他

Github issuesに登録してあるissue全部: