KafkaでSyslogを受け取らせようとしてるのだが、CentOS 7からKafkaにログを送る際rsyslogのバージョンが7系なので、そのまま送るのはちょっと厳しい(rsyslog 8.x系だったらomkafkaモジュールが使える)。 さすがにそんな気軽にほいほいrsyslogのバージョン変更するのもアレなので、定番であるfluentdをあいだに入れてやることにする。

一応、fluentdではsyslogでログを受け付けることができるのだが、udpしか受け付けられないのでログの暗号化ができない。 このため、localhostのrsyslogからログを受け付けてやるようにする。

イメージ的には以下のような感じ。 kafkaと同居させて、rsyslogサーバで受け付けたログをkafkaに渡してやるようにする。 rsyslogで受け付けたログをファイルに書き出して読み込むことでも実現可能なのだが、ログフォーマットが一定である必要があるため、inputでtype syslogを使ってlocalhost内でログを送信させる。 (ASCII Flow Infinity便利だわやっぱ)

+-----------------------------------------------------------------+
                  |--------------------+ +-------------------+ +--------------------|
                  ||                   | |                   | |                   ||
+-----------+     || +---------------+ | | +---------------+ | | +---------------+ ||
|  rsyslog  | --> || |    rsyslog    | | | |    rsyslog    | | | |    rsyslog    | ||
+-----------+     || +-------+-------+ | | +-------+-------+ | | +-------+-------+ ||
                  ||         |         | |         |         | |         |         ||
                  ||         v         | |         v         | |         v         ||
                  || +-------+-------+ | | +-------+-------+ | | +-------+-------+ ||
                  || |    fluentd    | | | |    fluentd    | | | |    fluentd    | ||
                  || +-------+-------+ | | +-------+-------+ | | +-------+-------+ ||
                  ||         |         | |         |         | |         |         ||
                  ||         v         | |         v         | |         v         ||
                  || +-------+-------+ | | +-------+-------+ | | +-------+-------+ ||
                  || |     kafka     | | | |     kafka     | | | |     kafka     | ||
                  || +---------------+ | | +---------------+ | | +---------------+ ||
                  ||                   | |                   | |                   ||
                  |--------------------+ +-------------------+ +--------------------|
                  +-----------------------------------------------------------------+

1. fluentdのインストール・設定

fluentdのインストールは簡単で、インストールスクリプトがあるからそれをキックするだけで入れられる。 この段階で、インストール後に必要となるプラグインについても入れてしまおう。

curl -L http://toolbelt.treasuredata.com/sh/install-redhat-td-agent2.sh | sh
yum install -y gcc
/opt/td-agent/embedded/bin/gem install fluent-plugin-kafka
/opt/td-agent/embedded/bin/gem install zookeeper

td-agentという名称でインストールされているので注意。 /opt配下にもいろいろファイルが配置されるが、設定ファイルは/etc/td-agent/td-agent.confになる。 こちらを編集して設定してやる。

今回の場合はsyslogで受け付けてkafkaで渡してやるので、以下の記述を追記してやろう。 rsyslog側でログファイルに出力するので、そのファイルを読み込む設定とする。

/etc/td-agent/td-agent.conf
<source> @type syslog port 5140 bind 127.0.0.1 tag syslog </source> <match syslog> @type kafka zookeeper localhost:2181 default_topic syslog </match>

設定ファイル編集後、サービスを起動する。

systemctl start td-agent
systemctl enable td-agent

2. rsyslogの設定

fluentdのインストールをしたら、rsyslogの設定を変更してrsyslogサーバとして動作、およびlocalhostのfluentdのinputに飛ばすようにする。 (今回は、後で暗号化もできるようにtcpで受信するようにする)。

以下、設定例。

/etc/rsyslog.conf
# Provides TCP syslog reception $ModLoad imtcp # コメントアウト解除 $InputTCPServerRun 514 # コメントアウト解除 ~ 略 ~ # ### begin forwarding rule ### # The statement between the begin ... end define a SINGLE forwarding # rule. They belong together, do NOT split them. If you create multiple # forwarding rules, duplicate the whole block! # Remote Logging (we use TCP for reliable delivery) # # An on-disk queue is created for this action. If the remote host is # down, messages are spooled to disk and sent when it is up again. $ActionQueueFileName fwdRule1 # unique name prefix for spool files # コメントアウト解除 $ActionQueueMaxDiskSpace 1g # 1gb space limit (use as much as possible) # コメントアウト解除 $ActionQueueSaveOnShutdown on # save messages to disk on shutdown # コメントアウト解除 $ActionQueueType LinkedList # run asynchronously # コメントアウト解除 $ActionResumeRetryCount -1 # infinite retries if host is down # コメントアウト解除 # remote host is: name/ip:port, e.g. 192.168.0.1:514, port optional *.* @localhost:5140 # コメントアウト解除・向き先変更 # ### end of the forwarding rule ###

設定ファイル編集後、rsyslogを再起動する。

systemctl restart rsyslog

3. ログの受信を確認する

さて、これでrsyslogサーバにsyslogを送ることで、同居しているKafkaクラスタにログが保管されるようになった。 試しにkafkacatで取得した値がこちら(BS-PUB-CENT7-01ってのが)。

{"host":"BS-PUB-CENT7-01","ident":"root","message":"test12"}
{"host":"BS-PUB-CLUSTERTEST01","ident":"kafka-server-start.sh","message":"[2017-01-28 12:11:21,363] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)"}
{"host":"BS-PUB-CLUSTERTEST01","ident":"kafka-server-start.sh","message":"[2017-01-28 12:21:21,363] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)"}
{"host":"BS-PUB-CENT7-01","ident":"root","message":"test123"}

このように、JSON形式で保存されている。 タイムスタンプは保持されてないので、Kafkaの次工程のシステム(Graylogとか)が取得したタイミングになるのかな? あと、大量(1000件くらい)のログを一気に送ってみたところ、どうもrsyslogまでは受け取れてるのだが、td-agent→Kafkaとそれぞれの過程でいくつか取りこぼしが発生していた。 rsyslog→td-agentについては、普通にtailでファイルを読み込んであげたほうがいいかもしれない。