CentOS 7にfluntedをインストールしてrsyslogでログを転送する
Pocket

KafkaでSyslogを受け取らせようとしてるのだが、CentOS 7からKafkaにログを送る際、rsyslogのバージョンが7系なのでそのまま送るのはちょっと厳しい(rsyslog 8.x系だったらomkafkaモジュールが使える)。
さすがにそんな気軽にほいほいrsyslogのバージョン変更するのもアレなので、定番であるfluentdをあいだに入れてやることにする。

一応、fluentdではsyslogでログを受け付けることができるのだが、udpしか受け付けられないのでログの暗号化ができない。このため、localhostのrsyslogからログを受け付けてやるようにする。
イメージ的には、以下のような感じ。kafkaと同居させて、rsyslogサーバで受け付けたログをkafkaに渡してやるようにする。rsyslogで受け付けたログをファイルに書き出して、それを読み込むことでも実現可能なのだが、ログフォーマットが一定である必要があるため、inputでtype syslogを使ってlocalhost内でログを送信させる。
(ASCII Flow Infinity便利だわやっぱ)

+-----------------------------------------------------------------+
|--------------------+ +-------------------+ +--------------------|
||                   | |                   | |                   ||
+-----------+     || +---------------+ | | +---------------+ | | +---------------+ ||
|  rsyslog  | --> || |    rsyslog    | | | |    rsyslog    | | | |    rsyslog    | ||
+-----------+     || +-------+-------+ | | +-------+-------+ | | +-------+-------+ ||
||         |         | |         |         | |         |         ||
||         v         | |         v         | |         v         ||
|| +-------+-------+ | | +-------+-------+ | | +-------+-------+ ||
|| |    fluentd    | | | |    fluentd    | | | |    fluentd    | ||
|| +-------+-------+ | | +-------+-------+ | | +-------+-------+ ||
||         |         | |         |         | |         |         ||
||         v         | |         v         | |         v         ||
|| +-------+-------+ | | +-------+-------+ | | +-------+-------+ ||
|| |     kafka     | | | |     kafka     | | | |     kafka     | ||
|| +---------------+ | | +---------------+ | | +---------------+ ||
||                   | |                   | |                   ||
|--------------------+ +-------------------+ +--------------------|
+-----------------------------------------------------------------+

 

1.fluentdのインストール・設定

fluentdのインストールは簡単で、インストールスクリプトがあるからそれをキックするだけで入れられる。
この段階で、インストール後に必要となるプラグインについても入れてしまおう。

curl -L http://toolbelt.treasuredata.com/sh/install-redhat-td-agent2.sh | sh
yum install -y gcc
/opt/td-agent/embedded/bin/gem install fluent-plugin-kafka
/opt/td-agent/embedded/bin/gem install zookeeper

 

td-agentという名称でインストールされているので注意。/opt配下にもいろいろファイルが配置されるが、設定ファイルは/etc/td-agent/td-agent.confになる。
こちらを編集して設定してやる。

今回の場合、syslogで受け付けてkafkaで渡してやるので、以下記述を追記してやろう。
rsyslog側でログファイルに出力するので、そのファイルを読み込む設定とする。

●/etc/td-agent/td-agent.conf

<source>
@type syslog
port 5140
bind 127.0.0.1
tag syslog
</source>
<match syslog>
@type kafka
zookeeper localhost:2181
default_topic syslog
</match>

 

設定ファイル編集後、サービスを起動する。

systemctl start td-agent
systemctl enable td-agent
Sponsored Links

2.rsyslogの設定

fluentdのインストールをしたら、rsyslogの設定を変更してrsyslogサーバとして動作、およびlocalhostのfluentdのinputに飛ばすようにする。
(今回は、後で暗号化もできるようにtcpで受信するようにする)。

以下、設定例。

●/etc/rsyslog.conf

# Provides TCP syslog reception
$ModLoad imtcp # コメントアウト解除
$InputTCPServerRun 514 # コメントアウト解除
~ 略 ~
# ### begin forwarding rule ###
# The statement between the begin ... end define a SINGLE forwarding
# rule. They belong together, do NOT split them. If you create multiple
# forwarding rules, duplicate the whole block!
# Remote Logging (we use TCP for reliable delivery)
#
# An on-disk queue is created for this action. If the remote host is
# down, messages are spooled to disk and sent when it is up again.
$ActionQueueFileName fwdRule1 # unique name prefix for spool files        # コメントアウト解除
$ActionQueueMaxDiskSpace 1g   # 1gb space limit (use as much as possible) # コメントアウト解除
$ActionQueueSaveOnShutdown on # save messages to disk on shutdown         # コメントアウト解除
$ActionQueueType LinkedList   # run asynchronously                        # コメントアウト解除
$ActionResumeRetryCount -1    # infinite retries if host is down          # コメントアウト解除
# remote host is: name/ip:port, e.g. 192.168.0.1:514, port optional
*.* @localhost:5140 # コメントアウト解除・向き先変更
# ### end of the forwarding rule ###

 

設定ファイル編集後、rsyslogを再起動する。

systemctl restart rsyslog

 

3.ログの受信を確認する

さて、これでrsyslogサーバにsyslogを送ることで、同居しているKafkaクラスタにログが保管されるようになった。
試しにkafkacatで取得した値がこちら(BS-PUB-CENT7-01ってのが)。

{"host":"BS-PUB-CENT7-01","ident":"root","message":"test12"}
{"host":"BS-PUB-CLUSTERTEST01","ident":"kafka-server-start.sh","message":"[2017-01-28 12:11:21,363] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)"}
{"host":"BS-PUB-CLUSTERTEST01","ident":"kafka-server-start.sh","message":"[2017-01-28 12:21:21,363] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)"}
{"host":"BS-PUB-CENT7-01","ident":"root","message":"test123"}

 

このように、JSON形式で保存されている。
タイムスタンプは保持されてないので、Kafkaの次工程のシステム(Graylogとか)が取得したタイミングになるのかな?
あと、大量(1000件くらい)のログを一気に送ってみたところ、どうもrsyslogまでは受け取れてるのだが、td-agent→Kafkaとそれぞれの過程でいくつか取りこぼしが発生していた。rsyslog→td-agentについては、普通にtailでファイルを読み込んであげたほうがいいかもしれない。

 

Pocket

Written by blacknon

インフラ系のSE。一時期はプログラマ。 仮想化とオープンソースに興味あり。一日中寝てたい今日このごろ。 スペインとかで働きたいなぁ…(シエスタがあるので)

Leave a Comment

メールアドレスが公開されることはありません。

*