rsyslogでomkafkaを使用して直接Kafkaにログを転送する
Pocket

rsyslogには、バージョン8.7からはデフォルトでKafkaに連携するモジュール「omkafka」というものが用意されている。
今回は、これを使ってrsyslogから直接Kafkaにログを転送する。

なお、CentOS 7の場合だとrsyslogは7系が使われているので、別途8系をインストールしてやる必要があるようだ(まぁ、当たり前だがやる前にちゃんと事前にテストをするように…)。
rsyslogのバージョン8.7以降をインストールしたら、設定ファイル「/etc/rsyslog.conf」に以下の内容を記述してやる(brokerは複数指定可能)。

Sponsored Links

●/etc/rsyslog.conf

#### MODULES ####
module(load="imuxsock")
module(load="imklog")
module(load="imfile")
module(load="omkafka")

#### GLOBAL DIRECTIVES ####

# Use default timestamp format
$ActionFileDefaultTemplate RSYSLOG_TraditionalFileFormat

# File syncing capability is disabled by default. This feature is usually not required,
# not useful and an extreme performance hit
#$ActionFileEnableSync on

# Include all config files in /etc/rsyslog.d/
$IncludeConfig /etc/rsyslog.d/*.conf


#### RULES ####

# Log all kernel messages to the console.
# Logging much else clutters up the screen.
#kern.*                                                 /dev/console

# Log anything (except mail) of level info or higher.
# Don't log private authentication messages!
*.info;mail.none;authpriv.none;cron.none                /var/log/messages

# The authpriv file has restricted access.
authpriv.*                                              /var/log/secure

# Log all the mail messages in one place.
mail.*                                                  /var/log/maillog


# Log cron stuff
cron.*                                                  /var/log/cron

# Everybody gets emergency messages
*.emerg                                                 :omusrmsg:*

# Save news errors of level crit and higher in a special file.
uucp,news.crit                                          /var/log/spooler

# Save boot messages also to boot.log
local7.*                                                /var/log/boot.log


### omkafka setting ####

input(type="imfile"
  File="/var/log/message"
  Tag="syslog"
)

template(name="json_lines" type="list" option.json="on") {
  constant(value="{")
  constant(value="\"timestamp\":\"")
  property(name="timereported" dateFormat="rfc3339")
  constant(value="\",\"message\":\"")
  property(name="msg")
  constant(value="\",\"host\":\"")
  property(name="hostname")
  constant(value="\",\"severity\":\"")
  property(name="syslogseverity-text")
  constant(value="\",\"facility\":\"")
  property(name="syslogfacility-text")
  constant(value="\",\"syslog-tag\":\"")
  property(name="syslogtag")
  constant(value="\"}")
}

main_queue(
  queue.workerthreads="1"      # threads to work on the queue
  queue.dequeueBatchSize="100" # max number of messages to process at once
  queue.size="10000"           # max queue size
)

action(
  broker=["BS-PUB-CLUSTERTEST01.BLACKNON.LOCAL:9092","BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL:9092","BS-PUB-CLUSTERTEST03.BLACKNON.LOCAL:9092"]
  type="omkafka"
  topic="syslog"
  template="json_lines"
)

 

なお、SELinuxが有効になっているとうまく動かないので、事前に無効化しておくこと。

setenforce 0
sed -i '/^SELINUX=/s/enforcing/disabled/g' /etc/selinux/config

 

設定完了後、サービスを再起動する。

systemctl restart rsyslog

 

これで、無事rsyslogから直接Kafkaにログが飛ぶようになった。

 

Pocket

Written by blacknon

インフラ系のSE。一時期はプログラマ。 仮想化とオープンソースに興味あり。一日中寝てたい今日このごろ。 スペインとかで働きたいなぁ…(シエスタがあるので)

Leave a Comment

メールアドレスが公開されることはありません。