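There seem to be several ways to get data out of Kafka; this time I'll have fluentd pull the data from Kafka. It works basically the same way as pushing, using fluent-plugin-kafka. For reference, the commands to install td-agent and the required plugins are below (CentOS 7).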

curl -L http://toolbelt.treasuredata.com/sh/install-redhat-td-agent2.sh | sh
yum install -y gcc
/opt/td-agent/embedded/bin/gem install fluent-plugin-kafka
/opt/td-agent/embedded/bin/gem install zookeeper

A configuration that uses Kafka as the source looks like the following. (For now, the match section simply writes the events out to a file.)

/etc/td-agent/td-agent.conf
<source>
  @type kafka
  brokers broker1:port,broker2:port,..
  topics syslog
  format json
</source>

<match *.**>
  @type file
  path /var/log/fluent/myapp
  time_slice_format %Y%m%d
  time_slice_wait 10m
  time_format %Y/%m/%d %H:%M:%S.%z
  compress gzip
  utc
</match>

Then restart td-agent.

systemctl restart td-agent

If the logs are being written under the specified path, it's working.

[root@BS-PUB-CENT7-02 ~]# tail -F /var/log/fluent/myapp.20170131.b5476b8059d69dc91
2017/01/31 22:49:39.+0000       syslog  {"host":"BS-PUB-CLUSTERTEST03","ident":"zookeeper-server-start.sh","message":"[2017-02-01 07:49:00,000] INFO Expiring session 0x159d12390070117, timeout of 6000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)"}
2017/01/31 22:49:39.+0000       syslog  {"host":"BS-PUB-CLUSTERTEST03","ident":"zookeeper-server-start.sh","message":"[2017-02-01 07:49:00,001] INFO Processed session termination for sessionid: 0x159d12390070118 (org.apache.zookeeper.server.PrepRequestProcessor)"}
2017/01/31 22:49:39.+0000       syslog  {"host":"BS-PUB-CLUSTERTEST03","ident":"zookeeper-server-start.sh","message":"[2017-02-01 07:49:00,001] INFO Processed session termination for sessionid: 0x159d12390070117 (org.apache.zookeeper.server.PrepRequestProcessor)"}
2017/01/31 22:49:39.+0000       syslog  {"host":"BS-PUB-CLUSTERTEST03","ident":"kafka-server-start.sh","message":"[2017-02-01 07:49:00,181] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)"}
2017/01/31 22:49:39.+0000       syslog  {"host":"BS-PUB-CLUSTERTEST03","ident":"kafka-server-start.sh","message":"[2017-02-01 07:49:00,182] INFO Registered broker 3 at path /brokers/ids/3 with addresses: PLAINTEXT -> EndPoint(BS-PUB-CLUSTERTEST03.BLACKNON.LOCAL,9092,PLAINTEXT) (kafka.utils.ZkUtils)"}
2017/01/31 22:49:39.+0000       syslog  {"host":"BS-PUB-CLUSTERTEST03","ident":"kafka-server-start.sh","message":"[2017-02-01 07:49:00,182] INFO done re-registering broker (kafka.server.KafkaHealthcheck$SessionExpireListener)"}
2017/01/31 22:49:39.+0000       syslog  {"host":"BS-PUB-CLUSTERTEST03","ident":"kafka-server-start.sh","message":"[2017-02-01 07:49:00,183] INFO Subscribing to /brokers/topics path to watch for new topics (kafka.server.KafkaHealthcheck$SessionExpireListener)"}
2017/01/31 22:49:39.+0000       syslog  {"host":"BS-PUB-CLUSTERTEST03","ident":"kafka-server-start.sh","message":"[2017-02-01 07:49:00,265] INFO New leader is 3 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)"}
2017/01/31 22:49:45.+0000       syslog  {"host":"BS-PUB-CLUSTERTEST02","ident":"zookeeper-server-start.sh","message":"[2017-02-01 07:48:54,000] INFO Expiring session 0x159d12179d60013, timeout of 6000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)"}
2017/01/31 22:49:45.+0000       syslog  {"host":"BS-PUB-CLUSTERTEST02","ident":"zookeeper-server-start.sh","message":"[2017-02-01 07:48:54,001] INFO Processed session termination for sessionid: 0x159d12179d60013 (org.apache.zookeeper.server.PrepRequestProcessor)"}
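
Each line in the output above has three columns: the formatted time, the tag (the Kafka topic name, `syslog`), and the record as JSON. If only the JSON part is needed, for example to pass it on to another tool, something like the following rough sketch should work. It assumes the columns are separated by tabs, which as far as I know is the default for fluentd's file output. The function name `extract_records` is made up for this example.

```shell
# Print only the JSON record (third column) of each line written by the file output.
# Assumption: columns are tab-separated (time, tag, record).
# "extract_records" is a hypothetical helper name, not part of fluentd.
extract_records() {
  # cut uses a tab as its default delimiter; -s skips lines that contain no tab,
  # and -f3- keeps everything from the third field onward.
  cut -s -f3- "$@"
}
```

It can be run against the buffer file shown above, e.g. `extract_records /var/log/fluent/myapp.20170131.b5476b8059d69dc91`. Since `compress gzip` is set, files that have already been flushed are gzip-compressed, so those would need to go through something like `zcat <file> | extract_records`.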