データ転送ミドルウェア勉強会
Treasure Data, Inc. 古橋貞之です。
来たる1月27日、新しいOSSツール Embulk をリリースします。
EmbulkはFluentdのバッチ処理版のようなツールで、CSVデータやアクセスログなどの構造化データを高い信頼性で転送することができるコンパクトなツールです。
入力元、出力先、ファイルフォーマット、圧縮方式などをプラグインで拡張することができ、S3上のCSVファイル、PostgreSQL、Elasticsearch、Salesforce.com、Treasure Dataなど、異種のストレージやサービスの間でデータを転送・同期することが可能になります。
Fluentdとは異なって、1発実行、あるいは1時間や1日毎で実行するバルク処理に特化しており、
などの拡張を備えています。
- 1回で使い捨てられる割には面倒すぎるデータ変換スクリプト
- 中途半端なエラー処理実装でかろうじて運用され続けるcronスクリプト
- 本気で作ってみたけど特定用途向きすぎて再利用できないデータ同期アプリケーション
- …
これらの経験知をプラグインとしてパッケージ化し、人類共通の資産として評価・再利用・継続的な改善を可能にするツールです。
1月27日の勉強会では、Fluentdのえらい人である @repeatedly からFluentdのv1に向けたロードマップについて、データ転送の大御所 小野和俊さんからHULFTについての解説もあります。
会場は、新宿・渋谷・品川・丸の内のどこからも行きやすいSAPジャパンビル:
データ転送ミドルウェア勉強会 - dots.[ドッツ]
講演者枠も1つ空いています。データ転送について一言ある方を募集中です。 @repeatedly か @frsyuki までご連絡ください。
ちなみに、その前の1/20にはPresto meetupもあります:
Presto Meetup - dots.[ドッツ]
続・リトライと冪等性のデザインパターン - リトライはいつ成功するか
三度の飯よりエラー処理。古橋です。
大変好評をいただいた序章リトライと冪等性のデザインパターンの続編です。
前回はほぼ前置きでしたが、今回は冪等でない操作を冪等にする具体的なテクニックもまとめていきます。
パターン2:エラーを区別してDELETEを冪等にする
リソースに常に一意なIDが振られていれば、Deleteを冪等にするのは難しくない。そもそも同じリソースを2度削除することはできない。
一つ注意するべきなのは、削除されたリソースのIDが再利用されるケースでは、Deleteの冪等性は保証されない。例えば、kill -KILL <pid> コマンドはDelete系のAPIと考えられるが、pidは再利用されるので、何度も繰り返すと意図しないプロセスを殺してしまう可能性がある。
一般にIDの生成は非常に難しい問題だが、Deleteに関してのみ言えば再利用されなければいいので、単調増加する整数(AUTO INCREMENT)で問題ない。
ただし、DeleteもCreateと同様に、クライアント側のエラーハンドリングは少し難しい。「指定されたIDは既に削除済みだ」というエラーを受け取ったとき、それが以前から存在していなかったのが原因なのか、リトライしたことが原因なのかは区別するできない。これもケースバイケースだが、やはり決定性なエラーなのでリトライしてはいけない。
つまりDelete系のAPIでは、HTTPの404 Not Foundに相当するエラーコードや例外クラスを定義しておき、クライアントが区別できるようにしてしておいた方がいい*1。
パターン3:操作をまとめて冪等にする
状態を持たないCreateやDeleteを冪等にする方法は簡単すぎるので、書くまでも無かったかもしれない。しかし世の中はそれほどシンプルでは無いので、次のようなケースが良く発生する:
1. 新しい空のプロジェクトを作る(Create)
2. 作ったプロジェクトにアイテムAを5個加える(Append)
3. 作ったプロジェクトにアイテムBを10個加える(Append)
一般にAppendやIncrementなどの操作は冪等にするのが難しい。言い換えれば、以前の状態に依存して操作後の状態が変わる操作は、冪等にするのが難しい*2。
上記の例では、ステップ2.や3.が失敗した場合にその操作をリトライすると、予定よりも多くのアイテムが入ったプロジェクトが出来上がってしまう可能性がある。これではマズい。
こういう場合は、一連の操作をまとめてリトライできないか考えてみる。例えば上記の例では、Append中に失敗した場合は、一連の操作を一度最初からやり直すことで冪等にすることができる:
1. 新しい空のプロジェクトPを作る(Create)→成功
2. プロジェクトPにアイテムAを5個加える(Append)→成功
3. プロジェクトPにアイテムBを10個加える(Append)→失敗!
4. プロジェクトPを削除し(Delete)、最初からやり直す
あるいは、すべての操作をまとめた1つの処理を作ってしまう方法でも良い:
1. アイテムAが5個とアイテムBが10個入ったプロジェクトPを作る(Create)
パターン4:操作を細かくして信頼性を高める
前節では操作を大きな粒度にまとめてリトライする方法を紹介したが、リトライの粒度を大きくするほど信頼性は落ちることに注意する必要がある。
例えば、以下のような近ごろ大変良くあるケースを考えてみる:
1. 新しいテーブルを作る(Create)
2. 作ったテーブルにレコードを10,000件加える(Append)
3. 作ったテーブルにレコードを10,000件加える(Append)
4. 作ったテーブルにレコードを10,000件加える(Append)
....
50,000. 合計5億件加える
構造的には前節と同じなので、全体をまとめることで安全にリトライできる。
ここで、10,000件のAppend操作が、0.01%の確率で失敗してしまうと仮定する。一連の5万回の操作が一度も失敗せずに完遂する確率は何%だろうか?
各操作が独立だとすれば、99.99%の5万乗なので、たったの0.67% となる。一連の操作全体を毎回リトライしていたら、いつもどこかで失敗してしまう。こういうプログラムはいつまで経っても終わらないプログラムと言われる。残念、使い物にならない。
つまり、リトライの粒度を大きくするほどリトライのオーバーヘッドは大きく膨らみ、全体としてスループットは低下する。このような場合では、個々の操作を冪等にし、細かい粒度でリトライできるようにする必要がある。1つの解決策として、一つ一つの操作に名前を付けることで、AppendにCreateに変更する方法がある:
1. レコードを10,000件含んだ部分データSplit-1を作る(Create)
2. レコードを10,000件含んだ部分データSplit-2を作る(Create)
3. レコードを10,000件含んだ部分データSplit-3を作る(Create)
...
50000. 作成したSplit-1〜Split-50000含んだテーブルTを作成する(Create)
しかしこのように、AppendやIncrementを冪等にする仕組みは大がかりになりがちなので、本当に細かい粒度でリトライする必要があるかどうかは熟考が必要。場合によっては最終奥義『トランザクション』に頼った方が良いケースもある。いかにシンプルで妥当な方法を見つけ出すかは、プログラマの腕の見せ所かもしれない。
パターン5:操作ログとリクエストIDでUPDATEを冪等にする
*1:あわせて読んでおきたいコメント:https://twitter.com/kazuho/status/476942055373934595 https://twitter.com/frsyuki/status/476944444676001793
*2:逆に言えば、以前の状態を無視して(依存せずに)新しいセットを宣言する操作は、簡単に冪等にできる。そういうAPIには、操作後の状態を単純に宣言する名前を付けることができる。例えば setXyz、enableXyz、ensureXyzIsCreated、ensureXyzIsDeleted などのようになる。
リトライと冪等性のデザインパターン
リトライを肴に一晩酒が飲める古橋です。
大規模なデータに触れることが日常茶飯事になっている今日この頃。この分野のおもしろいところは、いつまで経っても終わらないプログラムを簡単に作れてしまうことかもしれません。エラー処理、リトライそして冪等性*1の3つを抑えていないプログラムは、小規模なデータなら問題ないが、データ量が多くなると使い物にならなくなる可能性が大です。
大規模データをバッチ処理するケース以外でも、リトライは一般にプログラムの信頼性に関わる重要な問題です。
そんなわけで、リトライに関わるいくつかのデザインパターンを、連載でまとめておこうと思います*2。
では、第1回は背景から:
なぜリトライが必要なのか
プログラムは色々な理由で失敗する。例えば、
- A) 通信先のプログラムが高負荷すぎて応答できなかった
- B) メモリを消費しすぎてメモリ確保に失敗した。またはOOM KIllerに殺された
- C) 通信先のサーバがハードウェア的に落ちていた
- D) データの転送中にネットワークエラーが起きた
- E) 読み込んだデータが壊れていた
- F) 設定が間違っていた
- G) プログラムがバグっていた
など。一部の問題はプログラミングや運用の努力で未然に回避できる可能性がある。A)はキャパシティプランニングでミスっているか監視が足りていないし、B)はメモリ管理の実装が甘い。
とは言え、正直なところそこまで完璧に実装していられないし、ハードウェア故障や想定できなかった突発的な高負荷など、どうしようも無いことも多い*3。
堅牢なプログラムを手早く作ろうとすると、リトライでカバーするのが妥当なケースは数多い。
リトライと冪等性
しかし、不用意にリトライすると問題が起きる。具体的には、こんなことが起こりうる:
- クライアントが要求を発行した。
- サーバはリクエストを正常に受け付けたが、高負荷過ぎてすぐには処理できなかった。
- クライアントがリトライした。
- 同じリクエストが2度実行されてしまった。
例えば、新しいアイテムを作り、そのIDを返すという要求を不用意にリトライすると、サーバの負荷やネットワークエラーなどの状況によって、同じアイテムが1度に2つ作成されてしまったりする可能性がある。これではマズい。
そこで、リトライを実装する場合には処理を冪等にすることが重要になる。すなわち、同じ要求を複数回行っても結果が同じになるようにする。
例えば、新しいアイテムを作る処理は、ID=xyzで識別される新しいアイテムを作成する という処理に変更できないだろうか? これなら、同じ要求を何回繰り返しても同じアイテムが2つ作成されてしまうことは無い。
冪等にするやり方はアプリケーションによって変わってくる。トランザクションの粒度を考えるのと同じように、それなりに大きな処理をまとめないと冪等にできないことも多い。そこでここでは、リトライと冪等性に関するいくつかのパターンをまとめてみる。
パターン1:IDを付けてCREATEを冪等にする
CRUDのC(Create)を冪等にするには、リソースに一意な名前を付ければいい。前節で挙げた新しいアイテムを作成する処理は、IDが既に存在したらエラーを返すようにサーバを実装しておけば、クライアントは安全にリトライできる。
クライアント側のエラー処理は少し難しい。なぜなら、クライアントが「同じIDが既に存在する(ので新しいリソースを作成できない)」というエラーを受け取ったとき、以前から同じIDが存在していたのが原因なのか、リトライした結果として重複してしまったのかを区別することができない。これはケースバイケースで対処するしか無い。無視しても良いし、警告をレポートしても良いが、決定性なエラーなのでリトライしてはいけない。
言い換えれば、Create系のAPIは、HTTPの409 Conflictに相当するようなエラーコードを定義しておき、クライアントはそれを他のエラーとは区別して扱う必要がある。クライアント側の実装は後回しにしてもいいが、もし何か新しいAPIセットを設計することがあれば、エラーコードや例外クラスにConflictを含めておいた方がいい。
パターン2:エラーを区別してDELETEを冪等にする
デシリアライズ速度の比較 ByteBuffer vs DirectBuffer vs Unsafe vs C
OpenJDK や Hotspot VM には sun.misc.Unsafe という内部APIがあり*1、これを使うと ByteBuffer.getInt や ByteBuffer.getLong よりも高速にバイト列から整数値をデコードできるという。これを駆使することで、Cで実装された拡張ライブラリに匹敵する速度を出せるらしい。
それが本当なら、データ圧縮やハッシュ関数、シリアライザ/デシリアライザなどの実装を高速化できる。例えば、lz4 や xxhash のJava実装が Unsafe API を使用している*2:jpountz/lz4-java
Prestoも、中間データのシリアライズ/デシリアライズにはすべて Unsafe API を使っている*3。
そこで、実際にベンチマークしてみた。
ベンチマーク内容
結果
- ByteBuffer heap: new byte[…]で確保したメモリを、ByteBuffer.wrapして整数値をデコード
- ByteBuffer direct: ByteBuffer.allocateDirectで確保したメモリから、整数値をデコード
- Unsafe heap: new byte[…]で確保したメモリから、Unsafe APIを使って整数値をデコード
- Unsafe direct: ByteBuffer.allocateDirectで確保したメモリから、Unsafe APIを使って整数値をデコード
- C shift: ビットシフトを使って、バイト列から整数値をデコード
- C load: memcpyとエンディアン変換関数(bswap_64、__DARWIN_OSSwapInt64、_byteswap_uint64など)を使って、バイト列から整数値をデコード
確かにUnsafeを使うとCに匹敵する性能が出る。これは予想以上に速い。
一方でヒープメモリからのByteBuffer.getIntは、かなり遅いことが分かる。Unsafe APIではヒープメモリでもdirect bufferでも差はないので、ヒープメモリからのByteBufferをUnsafeに変更すれば、2倍近く性能が向上することになる。
Cにはstrict-aliasingルールがあるので、バイト列から整数値をデコードするコードは複雑になる。コンパイラが賢いので冗長に見える割には速度が出るのだが、ポータブルで安全なコードを書くのは結構難しい。strict-aliasingルールについて詳しくは:
実行環境
- OS: Mac OS X 10.8.5
- CPU: Intel Core i7 2.7 GHz
- Memory: 16GB DDR3 1600MHz
- Oracle JDK 1.7.0 u45 b18 Hotspot 64-Bit Server VM
- gcc-llvm 4.2.1 (i686-apple-darwin11-llvm-gcc-4.2 (GCC) 4.2.1 (Based on Apple Inc. build 5658) (LLVM build 2336.11.00))
===
後日追記:あわせて読みたい:何故JVM(HotSpot)のUnsafe APIは速いのか
*1:Dalvikにあるかどうかは調査していない。誰か教えてください。
*2:ちなみにlz4とxxhashの開発者は同一人物。
*3:Prestoが依存している airlift/slice ライブラリがUnsafe APIをラップしたクラスを実装している。airliftを開発しているのはPrestoと同じメンバー。
Presto 0.54を1台のサーバで起動する
Presto | Distributed SQL Query Engine for Big Data v0.54 を1台のサーバ上で動かす手順です。
Mac OS X でも動きます(と言うよりMac OS Xを想定しています)
JDK 7 が必要になるので、事前にインストールしておいてください。Oracle JDK(Mac OS X版)を使っていますが、OpenJDKでも動くはずです。
もし読むのが面倒なら、これを実行すればOKです:https://gist.github.com/frsyuki/8001572
githubからソースコードをcloneする
git clone https://github.com/facebook/presto.git cd presto git checkout v0.54
JDKのバージョンを7にする
export JAVA_HOME="$(/usr/libexec/java_home -v 1.7.0)"
presto-serverをビルドする
version="$(xpath pom.xml "/project/version/text()")" pushd presto-server mvn package assembly:assembly -DdescriptorId=bin -Dtest=skip -DfailIfNoTests=false popd
他のコンポーネントを改変したい場合は、presto-server/ディレクトリではなくルートディレクトリでビルド(assembly:assembly)する必要があります。
これで presto-server/target/presto-server-0.54-SNAPSHOT.tar.gz ファイルができあがるので、presto-deploy/ ディレクトリに展開しておきます:
rm -rf presto-deploy mkdir -p presto-deploy pushd presto-deploy tar zxvf "../presto-server/target/presto-server-$version.tar.gz"
設定ファイルを書いたりする
最終的には、presto-discovery, presto-coordinator, presto-worker の3つのプロセスを動かします。今回は、それぞれにディレクトリを作ることにします。
それぞれのディレクトリには etc/ ディレクトリを作り、設定ファイルを置いておく必要があります。ここにサンプルの設定を固めた物を置いておきました:
config.tar.gz
これを展開、presto-coordinator, presto-worker ディレクトリを作成します:
wget https://gist.github.com/frsyuki/8001572/raw/c4524795d31a23f37365b0ad850c08899614ab98/config.tar.gz tar zxvf config.tar.gz cp -a presto-server-$version/* presto-coordinator/ cp -a presto-server-$version/* presto-worker/ rm -rf presto-server-$version
discovery-server を配備する
Prestoを起動するために必要なコンポーネントである discovery-server は、Prestoとは完全に疎結合化されており、別プロジェクトからリリースされています。これをダウンロード & 展開します:
curl "http://search.maven.org/remotecontent?filepath=io/airlift/discovery/discovery-server/1.16/discovery-server-1.16.tar.gz" | tar zxvf - mv discovery-server-*/* discovery-server/ rm -rf discovery-server-*
起動する
3つのプロセスを起動します:
cd discovery-server
./bin/launcher run
cd presto-coordinator
./bin/launcher run
cd presto-worker
./bin/launcher run
クライアントを起動する
これで走ります:
java -jar presto-cli-executable.jar --server http://127.0.0.1:8880/ --catalog native --schema default
SELECT 1 などが動きます。
Next step
サンプルの設定では jmx コネクタと native コネクタを有効化してあります。
HDFSからデータを読み出すには hive-hadoop1 または hive-cdh4 コネクタを有効化する必要があります。
Running Presto v0.54 on a single server
This is a procedure to run Presto | Distributed SQL Query Engine for Big Data v0.54 on a single server.
Presto runs on Mac OS X (rather I assume Mac OS X).
You need to install JDK 7 in advance. I used Oracle JDK (for Mac OS X) but OpenJDK should also work well.
If you don't have time to read this short article, you can simplly run this script: https://gist.github.com/frsyuki/8001572
Clone source code from Github
git clone https://github.com/facebook/presto.git cd presto git checkout v0.54
Build presto-server
version="$(xpath pom.xml "/project/version/text()")" pushd presto-server mvn package assembly:assembly -DdescriptorId=bin -Dtest=skip -DfailIfNoTests=false popd
If you modify other components, you need to build (run assembly:assembly) on the root directory of presto instead of presto-server directory.
OK, I got presto-server/target/presto-server-0.54-SNAPSHOT.tar.gz file. Extract it to presto-deploy/ directory:
rm -rf presto-deploy mkdir -p presto-deploy pushd presto-deploy tar zxvf "../presto-server/target/presto-server-$version.tar.gz"
Writing configuration files
Presto needs at least 3 processes: presto-discovery, presto-coordinator, and presto-worker. Here creates one directory for each processes.
Each directory needs ./etc/ directory that contains some config files. I uploaded tar.gz of the files here: config.tar.gz
Extract it and create presto-coordinator and presto-worker directories:
wget https://gist.github.com/frsyuki/8001572/raw/c4524795d31a23f37365b0ad850c08899614ab98/config.tar.gz tar zxvf config.tar.gz cp -a presto-server-$version/* presto-coordinator/ cp -a presto-server-$version/* presto-worker/ rm -rf presto-server-$version
Deploy discovery-server
Presto depends on a process called discovery-server but it's completely separated from Presto itself. Another OSS project releases it. Download it & extract:
curl "http://search.maven.org/remotecontent?filepath=io/airlift/discovery/discovery-server/1.16/discovery-server-1.16.tar.gz" | tar zxvf - mv discovery-server-*/* discovery-server/ rm -rf discovery-server-*
Launch
I ran 3 processes:
./discovery-server/bin/launcher run
./presto-coordinator/bin/launcher run
./presto-worker/bin/launcher run
Run client
java -jar presto-cli-executable.jar --server http://127.0.0.1:8880/ --catalog native --schema default
That's it. You can run SQL like SELECT 1.
Next step
The sample configuration I uploaded enables jmx connector and native connector.
You need to enable hive-hadoop1 or hive-cdh4 connector to read data from HDFS.
Fluentd Casual Talks #3
Fluentd Casual Talks #3 でしゃべってきました。会場提供していただいた DeNA さん、主催の@tagomorisさんありがとうございましたm(_ _)m
Ustreamで録画されているので、見逃した人はここで見られます:
v11 preview1リリース
fluentd v11 perview1 をリリースしました。
会場でもデモしましたが、インストール方法は:
gem install fluentd -v v0.11.0.preview1
です。まだαリリースな状態ですが、とりあえず起動すると何が起こるのか分かる状況にはなっています。
コードはこちら:https://github.com/fluent/fluentd/tree/v11
データの更新履歴をRDBMSからfluentdに流すfluent-plugin-sql
Fluentd Advent Calendar 9日目。担当の古橋です。
Fluentd v11の情報は Fluentd Casual Talks #3 at :D でお話しすることにして、今回はFluentdの大幅な性能向上を可能にするMultiprocessプラグインを紹介…しようと思っていたら@niku4i さんに先を越されてしまったので!今回はSQL inputプラグインを紹介します。
SQL inputプラグインとは?
SQL inputプラグインは、SELECT文を定期的に実行することで、RDBMSから最近更新されたレコードや最近追加されたレコードを定期的に取り出してFluentdに流すことができるプラグインです。内部では"前回読み出したレコード"を記憶しており、前回読み出したタイミングより後になって更新/追加されたレコードを定期的に読み出します。
何に使える?
例えば、ユーザー情報を保存するテーブルがあったとします。アプリケーションはレコードを更新するたびにupdated_atカラムを更新することにします。ここでSQL inputプラグインを使えば、ユーザー情報の更新ログをFluentdに流し込むことができます。もちろんその後、HDFSに書き込んだり、Kibana+ElasticSearchに保存して検索可能にしたりできますね:
設定
先述の例の場合、次のような設定ファイルになります:
<source> type sql # RDBMSの設定 adapter mysql2 host localhost database app_test username root password xyz # SELECTを実行する間隔 select_interval 6s # 1回のSELECTで読み出すレコード数 select_limit 500 # 最後に読み込んだレコードの保存先 state_file /var/run/fluentd/sql_state # 対象のテーブル <table> table users tag users.updated # 更新判定に使うカラム update_column updated_at # ログの時刻に使うカラム time_column updated_at </table> </source>
update_columnパラメータに更新判定に使うカラムを指定してください。ここにAUTO INCREMENTなプライマリキーなどを指定すると、最近更新されたレコードの代わりに、最近追加されたレコードを読み出すことができます。
対応しているRDBMS
実装にはActiveRecordを使っているので、ActiveRecordが対応しているRDBMSなら使えます。
fluent-plugin-sql gemはデフォルトでMySQL(mysql2)とPostgreSQLのドライバに依存しているので、これらのRDBMSであれば追加でgemをインストールせずに使えます。
インデックス必須!
更新判定に使うカラム(update_column)には、インデックスが設定されていないと、毎回フルテーブルスキャンが走ることになるので注意しましょう。
次は、@k1LoW さんです!:しまった!!いきなり「Apacheのアクセスログを提出して欲しい」と言われたら? (Fluentd Advent Calendar 2013 Day10)