rsyslogでomkafkaを使用して直接Kafkaにログを転送する

rsyslogには、バージョン8.7からはデフォルトでKafkaに連携するモジュール「omkafka」というものが用意されている。
今回は、これを使ってrsyslogから直接Kafkaにログを転送する。

なお、CentOS 7の場合だとrsyslogは7系が使われているので、別途8系をインストールしてやる必要があるようだ(まぁ、当たり前だがやる前にちゃんと事前にテストをするように…)。
rsyslogのバージョン8.7以降をインストールしたら、設定ファイル「/etc/rsyslog.conf」に以下の内容を記述してやる(brokerは複数指定可能)。

Sponsored Links

●/etc/rsyslog.conf

#### MODULES ####
module(load="imuxsock")
module(load="imklog")
module(load="imfile")
module(load="omkafka")
#### GLOBAL DIRECTIVES ####
# Use default timestamp format
$ActionFileDefaultTemplate RSYSLOG_TraditionalFileFormat
# File syncing capability is disabled by default. This feature is usually not required,
# not useful and an extreme performance hit
#$ActionFileEnableSync on
# Include all config files in /etc/rsyslog.d/
$IncludeConfig /etc/rsyslog.d/*.conf
#### RULES ####
# Log all kernel messages to the console.
# Logging much else clutters up the screen.
#kern.*                                                 /dev/console
# Log anything (except mail) of level info or higher.
# Don't log private authentication messages!
*.info;mail.none;authpriv.none;cron.none                /var/log/messages
# The authpriv file has restricted access.
authpriv.*                                              /var/log/secure
# Log all the mail messages in one place.
mail.*                                                  /var/log/maillog
# Log cron stuff
cron.*                                                  /var/log/cron
# Everybody gets emergency messages
*.emerg                                                 :omusrmsg:*
# Save news errors of level crit and higher in a special file.
uucp,news.crit                                          /var/log/spooler
# Save boot messages also to boot.log
local7.*                                                /var/log/boot.log
### omkafka setting ####
input(type="imfile"
File="/var/log/message"
Tag="syslog"
)
template(name="json_lines" type="list" option.json="on") {
constant(value="{")
constant(value="\"timestamp\":\"")
property(name="timereported" dateFormat="rfc3339")
constant(value="\",\"message\":\"")
property(name="msg")
constant(value="\",\"host\":\"")
property(name="hostname")
constant(value="\",\"severity\":\"")
property(name="syslogseverity-text")
constant(value="\",\"facility\":\"")
property(name="syslogfacility-text")
constant(value="\",\"syslog-tag\":\"")
property(name="syslogtag")
constant(value="\"}")
}
main_queue(
queue.workerthreads="1"      # threads to work on the queue
queue.dequeueBatchSize="100" # max number of messages to process at once
queue.size="10000"           # max queue size
)
action(
broker=["BS-PUB-CLUSTERTEST01.BLACKNON.LOCAL:9092","BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL:9092","BS-PUB-CLUSTERTEST03.BLACKNON.LOCAL:9092"]
type="omkafka"
topic="syslog"
template="json_lines"
)

 

なお、SELinuxが有効になっているとうまく動かないので、事前に無効化しておくこと。

setenforce 0
sed -i '/^SELINUX=/s/enforcing/disabled/g' /etc/selinux/config

 

設定完了後、サービスを再起動する。

systemctl restart rsyslog

 

これで、無事rsyslogから直接Kafkaにログが飛ぶようになった。

 


Written by blacknon

インフラ系のSE。一時期はプログラマ。 仮想化とオープンソースに興味あり。一日中寝てたい今日このごろ。 スペインとかで働きたいなぁ…(シエスタがあるので)

Leave a Comment

メールアドレスが公開されることはありません。

*