Pulling log data from Kafka with fluentd as a Consumer

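There seem to be many ways to fetch data from Kafka; this time I will have fluentd pull it.
Basically nothing changes from the push case: we use fluent-plugin-kafka. For reference, the commands to install td-agent and the required plugins are below (CentOS 7).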

curl -L http://toolbelt.treasuredata.com/sh/install-redhat-td-agent2.sh | sh
yum install -y gcc
/opt/td-agent/embedded/bin/gem install fluent-plugin-kafka
/opt/td-agent/embedded/bin/gem install zookeeper

A configuration file that uses Kafka as the source is written as follows.
(Here, for now, the matched events are simply written out to a file.)

●/etc/td-agent/td-agent.conf

<source>
@type kafka
brokers broker1:port,broker2:port,..
topics syslog
format json
</source>
<match *.**>
@type file
path /var/log/fluent/myapp
time_slice_format %Y%m%d
time_slice_wait 10m
time_format %Y/%m/%d %H:%M:%S.%z
compress gzip
utc
</match>
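
Before restarting, it can help to confirm that every host listed in `brokers` is reachable from the fluentd machine. The following is only a rough sketch: `split_brokers` and `check_brokers` are hypothetical helper names, the probe relies on bash's `/dev/tcp` feature, and `broker1:9092,broker2:9092` is a placeholder value (9092 is the port that appears in the Kafka log output in this post; substitute your own list).

```shell
#!/bin/bash
# Hypothetical helpers, not part of fluentd or fluent-plugin-kafka.

# Split a "host:port,host:port" broker list into one "host port" pair per line.
split_brokers() {
  printf '%s\n' "$1" | tr ',' '\n' | while IFS=: read -r host port; do
    if [ -n "$host" ] && [ -n "$port" ]; then
      printf '%s %s\n' "$host" "$port"
    fi
  done
}

# Try to open a TCP connection to each broker (bash-specific /dev/tcp).
check_brokers() {
  split_brokers "$1" | while read -r host port; do
    if timeout 3 bash -c "exec 3<>/dev/tcp/$host/$port" 2>/dev/null; then
      echo "OK   $host:$port"
    else
      echo "FAIL $host:$port"
    fi
  done
}

# Example (placeholder broker list):
# check_brokers "broker1:9092,broker2:9092"
```

A `FAIL` line only indicates that the TCP port could not be opened within three seconds; it says nothing about whether the topic exists.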

 

After that, restart td-agent.

systemctl restart td-agent
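
If nothing shows up after the restart, the td-agent log is the first place to look. As a small sketch, the hypothetical `fluentd_problems` filter below keeps only the warn, error, and fatal lines. It assumes fluentd's usual `[level]:` log prefix and the td-agent default log path `/var/log/td-agent/td-agent.log`; adjust both if your setup differs.

```shell
#!/bin/bash
# Hypothetical helper: print only warn/error/fatal lines from a fluentd log on stdin.
fluentd_problems() {
  # "|| true" keeps the exit status at 0 when no line matches.
  grep -E '\[(warn|error|fatal)\]:' || true
}

# Example (assumed default td-agent log path):
# fluentd_problems < /var/log/td-agent/td-agent.log
```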

 

If logs are being written to the specified path, everything is working.

[root@BS-PUB-CENT7-02 ~]# tail -F /var/log/fluent/myapp.20170131.b5476b8059d69dc91
2017/01/31 22:49:39.+0000       syslog  {"host":"BS-PUB-CLUSTERTEST03","ident":"zookeeper-server-start.sh","message":"[2017-02-01 07:49:00,000] INFO Expiring session 0x159d12390070117, timeout of 6000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)"}
2017/01/31 22:49:39.+0000       syslog  {"host":"BS-PUB-CLUSTERTEST03","ident":"zookeeper-server-start.sh","message":"[2017-02-01 07:49:00,001] INFO Processed session termination for sessionid: 0x159d12390070118 (org.apache.zookeeper.server.PrepRequestProcessor)"}
2017/01/31 22:49:39.+0000       syslog  {"host":"BS-PUB-CLUSTERTEST03","ident":"zookeeper-server-start.sh","message":"[2017-02-01 07:49:00,001] INFO Processed session termination for sessionid: 0x159d12390070117 (org.apache.zookeeper.server.PrepRequestProcessor)"}
2017/01/31 22:49:39.+0000       syslog  {"host":"BS-PUB-CLUSTERTEST03","ident":"kafka-server-start.sh","message":"[2017-02-01 07:49:00,181] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)"}
2017/01/31 22:49:39.+0000       syslog  {"host":"BS-PUB-CLUSTERTEST03","ident":"kafka-server-start.sh","message":"[2017-02-01 07:49:00,182] INFO Registered broker 3 at path /brokers/ids/3 with addresses: PLAINTEXT -> EndPoint(BS-PUB-CLUSTERTEST03.BLACKNON.LOCAL,9092,PLAINTEXT) (kafka.utils.ZkUtils)"}
2017/01/31 22:49:39.+0000       syslog  {"host":"BS-PUB-CLUSTERTEST03","ident":"kafka-server-start.sh","message":"[2017-02-01 07:49:00,182] INFO done re-registering broker (kafka.server.KafkaHealthcheck$SessionExpireListener)"}
2017/01/31 22:49:39.+0000       syslog  {"host":"BS-PUB-CLUSTERTEST03","ident":"kafka-server-start.sh","message":"[2017-02-01 07:49:00,183] INFO Subscribing to /brokers/topics path to watch for new topics (kafka.server.KafkaHealthcheck$SessionExpireListener)"}
2017/01/31 22:49:39.+0000       syslog  {"host":"BS-PUB-CLUSTERTEST03","ident":"kafka-server-start.sh","message":"[2017-02-01 07:49:00,265] INFO New leader is 3 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)"}
2017/01/31 22:49:45.+0000       syslog  {"host":"BS-PUB-CLUSTERTEST02","ident":"zookeeper-server-start.sh","message":"[2017-02-01 07:48:54,000] INFO Expiring session 0x159d12179d60013, timeout of 6000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)"}
2017/01/31 22:49:45.+0000       syslog  {"host":"BS-PUB-CLUSTERTEST02","ident":"zookeeper-server-start.sh","message":"[2017-02-01 07:48:54,001] INFO Processed session termination for sessionid: 0x159d12179d60013 (org.apache.zookeeper.server.PrepRequestProcessor)"}
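
To get a quick summary of which hosts the pulled records came from, something like the following may be enough. It is a minimal sketch with a hypothetical function name, `count_by_host`. It assumes the three columns in the file are separated by tabs (time, tag, JSON record) and pulls out `host` with a simple `sed` pattern rather than a real JSON parser, so it is only suitable for eyeballing.

```shell
#!/bin/bash
# Hypothetical helper: count records per "host" value in the file output read on stdin.
# Assumes each line is: time<TAB>tag<TAB>JSON record.
count_by_host() {
  cut -f3 \
    | sed -n 's/.*"host":"\([^"]*\)".*/\1/p' \
    | sort \
    | uniq -c \
    | awk '{ print $2, $1 }'
}

# Example (file name taken from the tail command above):
# count_by_host < /var/log/fluent/myapp.20170131.b5476b8059d69dc91
```

Each output line is a host name followed by its record count.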

 


Written by blacknon

