読者です 読者をやめる 読者になる 読者になる

データの更新履歴をRDBMSからfluentdに流すfluent-plugin-sql

Fluentd Advent Calendar 9日目。担当の古橋です。
Fluentd v11の情報は Fluentd Casual Talks #3 at :D でお話しすることにして、今回はFluentdの大幅な性能向上を可能にするMultiprocessプラグインを紹介…しようと思っていたら@niku4i さんに先を越されてしまったので!今回はSQL inputプラグインを紹介します。

SQL inputプラグインとは?

SQL inputプラグインは、SELECT文を定期的に実行することで、RDBMSから最近更新されたレコード最近追加されたレコードを定期的に取り出してFluentdに流すことができるプラグインです。内部では"前回読み出したレコード"を記憶しており、前回読み出したタイミングより後になって更新/追加されたレコードを定期的に読み出します。

SQL input plugin for Fluentd event collector - github

何に使える?

例えば、ユーザー情報を保存するテーブルがあったとします。アプリケーションはレコードを更新するたびにupdated_atカラムを更新することにします。ここでSQL inputプラグインを使えば、ユーザー情報の更新ログをFluentdに流し込むことができます。もちろんその後、HDFSに書き込んだりKibana+ElasticSearchに保存して検索可能にしたりできますね:

f:id:viver:20131209110556p:plain

設定

先述の例の場合、次のような設定ファイルになります:

<source>
  type sql

  # RDBMSの設定
  adapter mysql2
  host localhost
  database app_test
  username root
  password xyz

  # SELECTを実行する間隔
  select_interval 6s

  # 1回のSELECTで読み出すレコード数
  select_limit 500

  # 最後に読み込んだレコードの保存先
  state_file /var/run/fluentd/sql_state

  # 対象のテーブル
  <table>
    table users
    tag users.updated

    # 更新判定に使うカラム
    update_column updated_at

    # ログの時刻に使うカラム
    time_column updated_at
  </table>
</source>

update_columnパラメータに更新判定に使うカラムを指定してください。ここにAUTO INCREMENTなプライマリキーなどを指定すると、最近更新されたレコードの代わりに、最近追加されたレコードを読み出すことができます。

対応しているRDBMS

実装にはActiveRecordを使っているので、ActiveRecordが対応しているRDBMSなら使えます。
fluent-plugin-sql gemはデフォルトでMySQL(mysql2)とPostgreSQLのドライバに依存しているので、これらのRDBMSであれば追加でgemをインストールせずに使えます。

インデックス必須!

更新判定に使うカラム(update_column)には、インデックスが設定されていないと、毎回フルテーブルスキャンが走ることになるので注意しましょう。

次は、@k1LoW さんです!:しまった!!いきなり「Apacheのアクセスログを提出して欲しい」と言われたら? (Fluentd Advent Calendar 2013 Day10)