rsyslogでomkafkaを使用して直接Kafkaにログを転送する

rsyslogには、バージョン8.7からはデフォルトでKafkaに連携するモジュール「omkafka」というものが用意されている。 今回は、これを使ってrsyslogから直接Kafkaにログを転送する。

なお、CentOS 7の場合だとrsyslogは7系が使われているので、別途8系をインストールしてやる必要があるようだ(まぁ、当たり前だがやる前にちゃんと事前にテストをするように…)。 rsyslogのバージョン8.7以降をインストールしたら、設定ファイル「/etc/rsyslog.conf」に以下の内容を記述してやる(brokerは複数指定可能)。

/etc/rsyslog.conf
#### MODULES #### module(load="imuxsock") module(load="imklog") module(load="imfile") module(load="omkafka") #### GLOBAL DIRECTIVES #### # Use default timestamp format $ActionFileDefaultTemplate RSYSLOG_TraditionalFileFormat # File syncing capability is disabled by default. This feature is usually not required, # not useful and an extreme performance hit #$ActionFileEnableSync on # Include all config files in /etc/rsyslog.d/ $IncludeConfig /etc/rsyslog.d/*.conf #### RULES #### # Log all kernel messages to the console. # Logging much else clutters up the screen. #kern.* /dev/console # Log anything (except mail) of level info or higher. # Don't log private authentication messages! *.info;mail.none;authpriv.none;cron.none /var/log/messages # The authpriv file has restricted access. authpriv.* /var/log/secure # Log all the mail messages in one place. mail.* /var/log/maillog # Log cron stuff cron.* /var/log/cron # Everybody gets emergency messages *.emerg :omusrmsg:* # Save news errors of level crit and higher in a special file. uucp,news.crit /var/log/spooler # Save boot messages also to boot.log local7.* /var/log/boot.log ### omkafka setting #### input(type="imfile" File="/var/log/message" Tag="syslog" ) template(name="json_lines" type="list" option.json="on") { constant(value="{") constant(value="\"timestamp\":\"") property(name="timereported" dateFormat="rfc3339") constant(value="\",\"message\":\"") property(name="msg") constant(value="\",\"host\":\"") property(name="hostname") constant(value="\",\"severity\":\"") property(name="syslogseverity-text") constant(value="\",\"facility\":\"") property(name="syslogfacility-text") constant(value="\",\"syslog-tag\":\"") property(name="syslogtag") constant(value="\"}") } main_queue( queue.workerthreads="1" # threads to work on the queue queue.dequeueBatchSize="100" # max number of messages to process at once queue.size="10000" # max queue size ) action( broker=["BS-PUB-CLUSTERTEST01.BLACKNON.LOCAL:9092","BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL:9092","BS-PUB-CLUSTERTEST03.BLACKNON.LOCAL:9092"] type="omkafka" topic="syslog" template="json_lines" )

なお、SELinuxが有効になっているとうまく動かない場合があるので、検証であれば事前に無効化しておくこと。

setenforce 0
sed -i '/^SELINUX=/s/enforcing/disabled/g' /etc/selinux/config

設定完了後、サービスを再起動する。

systemctl restart rsyslog

これで、無事rsyslogから直接Kafkaにログが飛ぶようになった。