仕事でPub-Subのシステム構築が必要になりそうなので、この用途でのデファクトスタンダートとなってきてるKafkaクラスタをCentOS 7で構築してみる。 構成は3台構成、ZooKeeperとの同居(こちらも3台構成)とする。 ホスト名はとりあえずBS-PUB-CLUSTERTEST01~03としている。

1. 前提パッケージのインストール

Kafkaを動作させるにはJavaが必要なようなので、事前にOpenJDKのインストールをする。 以下のコマンドを実行する。

yum install -y java-1.8.0-openjdk.x86_64

JAVA_HOME、JRE_HOMEの環境変数を設定する。

cat << EOF >> /etc/profile
export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk
export JRE_HOME=/usr/lib/jvm/jre
EOF
source /etc/profile

2. Kafkaのインストール・設定

JAVAの準備ができたら、Kafkaのダウンロードを行おう。 以下は現時点での最新版のため、都度こちらで最新版のリンクを確認すること。 また、ログを設置するディレクトリも作成してしまおう。

wget http://ftp.jaist.ac.jp/pub/apache/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
tar xzvf kafka_2.11-0.10.1.1.tgz
mv kafka_2.11-0.10.1.1 /opt/kafka
mkdir /opt/kafka/logs

ディレクトリ移動後、設定ファイル(/opt/kafka/config/server.properties)の編集を行う。 設定を編集する項目と値については、以下のようにする。

  • BS-PUB-CLUSTERTEST01

    broker.id=1
    log.dirs=/opt/kafka/logs
    zookeeper.connect=BS-PUB-CLUSTERTEST01.BLACKNON.LOCAL:2181,BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL:2181,BS-PUB-CLUSTERTEST03.BLACKNON.LOCAL:2181
  • BS-PUB-CLUSTERTEST02

    broker.id=2
    log.dirs=/opt/kafka/logs
    zookeeper.connect=BS-PUB-CLUSTERTEST01.BLACKNON.LOCAL:2181,BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL:2181,BS-PUB-CLUSTERTEST03.BLACKNON.LOCAL:2181
  • BS-PUB-CLUSTERTEST03

    broker.id=3
    log.dirs=/opt/kafka/logs
    zookeeper.connect=BS-PUB-CLUSTERTEST01.BLACKNON.LOCAL:2181,BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL:2181,BS-PUB-CLUSTERTEST03.BLACKNON.LOCAL:2181

設定ファイルを作成したら、以下のコマンドでサービスファイルを作成してしまおう。

  • /etc/systemd/system/kafka.service

    cat << EOF > /etc/systemd/system/kafka.service
    [Unit]
    Description=Apache Kafka server (broker)
    Documentation=http://kafka.apache.org/documentation.html
    Requires=network.target remote-fs.target
    After=network.target remote-fs.target kafka-zookeeper.service
    
    [Service]
    Type=simple
    User=root
    Group=root
    Environment=JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk
    ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
    ExecStop=/opt/kafka/bin/kafka-server-stop.sh
    
    [Install]
    WantedBy=multi-user.target
    EOF
  • /etc/systemd/system/kafka-zookeeper.service

    cat << EOF > /etc/systemd/system/kafka-zookeeper.service
    [Unit]
    Description=Apache Zookeeper server (Kafka)
    Documentation=http://zookeeper.apache.org
    Requires=network.target remote-fs.target 
    After=network.target remote-fs.target
    
    [Service]
    Type=simple
    User=root
    Group=root
    Environment=JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk
    ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
    ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
    
    [Install]
    WantedBy=multi-user.target
    EOF

ファイル作成後、以下のコマンドでサービスを起動する。

systemctl daemon-reload
systemctl start kafka-zookeeper
systemctl status kafka-zookeeper
systemctl start kafka
systemctl status kafka

[root@BS-PUB-CLUSTERTEST02 ~]# chown -R kafka. /opt/kafka/
[root@BS-PUB-CLUSTERTEST02 ~]# systemctl daemon-reload
[root@BS-PUB-CLUSTERTEST02 ~]# systemctl start kafka-zookeeper.service
[root@BS-PUB-CLUSTERTEST02 ~]# systemctl status kafka-zookeeper.service
● kafka-zookeeper.service - Apache Zookeeper server (Kafka)
   Loaded: loaded (/etc/systemd/system/kafka-zookeeper.service; disabled; vendor preset: disabled)
   Active: active (running) since 水 2017-01-25 00:39:08 JST; 4s ago
     Docs: http://zookeeper.apache.org
 Main PID: 17113 (java)
   CGroup: /system.slice/kafka-zookeeper.service
           mq17113 /usr/lib/jvm/jre-1.8.0-openjdk/bin/java -Xmx512M -Xms512M -server -XX:+Us...

 1月 25 00:39:09 BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL zookeeper-server-start.sh[17113]: [201...
 1月 25 00:39:09 BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL zookeeper-server-start.sh[17113]: [201...
 1月 25 00:39:09 BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL zookeeper-server-start.sh[17113]: [201...
 1月 25 00:39:09 BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL zookeeper-server-start.sh[17113]: [201...
 1月 25 00:39:09 BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL zookeeper-server-start.sh[17113]: [201...
 1月 25 00:39:09 BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL zookeeper-server-start.sh[17113]: [201...
 1月 25 00:39:09 BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL zookeeper-server-start.sh[17113]: [201...
 1月 25 00:39:09 BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL zookeeper-server-start.sh[17113]: [201...
 1月 25 00:39:09 BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL zookeeper-server-start.sh[17113]: [201...
 1月 25 00:39:09 BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL zookeeper-server-start.sh[17113]: [201...
Hint: Some lines were ellipsized, use -l to show in full.
[root@BS-PUB-CLUSTERTEST02 ~]# systemctl start kafka
[root@BS-PUB-CLUSTERTEST02 ~]# systemctl status kafka
● kafka.service - Apache Kafka server (broker)
   Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: disabled)
   Active: active (running) since 水 2017-01-25 00:40:20 JST; 2s ago
     Docs: http://kafka.apache.org/documentation.html
 Main PID: 17353 (java)
   CGroup: /system.slice/kafka.service
           mq17353 /usr/lib/jvm/jre-1.8.0-openjdk/bin/java -Xmx1G -Xms1G -server -XX:+UseG1G...

 1月 25 00:40:23 BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL kafka-server-start.sh[17353]: at scala...
 1月 25 00:40:23 BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL kafka-server-start.sh[17353]: at scala...
 1月 25 00:40:23 BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL kafka-server-start.sh[17353]: at kafka...
 1月 25 00:40:23 BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL kafka-server-start.sh[17353]: at kafka...
 1月 25 00:40:23 BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL kafka-server-start.sh[17353]: at kafka...
 1月 25 00:40:23 BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL kafka-server-start.sh[17353]: at kafka...
 1月 25 00:40:23 BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL kafka-server-start.sh[17353]: at kafka...
 1月 25 00:40:23 BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL kafka-server-start.sh[17353]: at kafka...
 1月 25 00:40:23 BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL kafka-server-start.sh[17353]: at kafka...
 1月 25 00:40:23 BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL kafka-server-start.sh[17353]: [2017-01...
Hint: Some lines were ellipsized, use -l to show in full.

ひとまず、これでサービスは起動した。 最後に、サービスを動作するにあたり、使用するポートを開放しておこう。

firewall-cmd --permanent --add-port=2181/tcp
firewall-cmd --permanent --add-port=9092/tcp
firewall-cmd --reload

3. 動作確認

さて、無事にサービスは起動したので、ちゃんと動くかどうか確かめてみよう。 簡単なテストには、kafkacatというツールを使うと楽だ。

とりあえず、(aptで入って楽なので)手元のUbuntuにインストールしてしまう。

sudo apt-get install kafkacat

インストール後、以下のコマンドでsyslogをZooKeeperに転送させてみよう。 ログの転送先をBS-PUB-CLUSTERTEST01、取得先をBS-PUB-CLUSTERTEST02にしてやっても、クラスタ化されているので問題なくログが取れることを確認する。

tail -f /var/log/syslog | kafkacat -P -t syslog -b BS-PUB-CLUSTERTEST01.BLACKNON.LOCAL:9092 &
logger test123
logger test456
kafkacat -C -t syslog -b BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL:9092
blacknon@BS-PUB-UBUNTU-01:~$ tail -f /var/log/syslog | kafkacat -P -t syslog -b BS-PUB-CLUSTERTEST01.BLACKNON.LOCAL:9092 &
[1] 19967
blacknon@BS-PUB-UBUNTU-01:~$ logger test123
blacknon@BS-PUB-UBUNTU-01:~$ logger test456
blacknon@BS-PUB-UBUNTU-01:~$ kafkacat -C -t syslog -b BS-PUB-CLUSTERTEST02.BLACKNON.LOCAL:9092
Jan 25 00:49:12 BS-PUB-UBUNTU-01 systemd[1]: Created slice User Slice of blacknon.
Jan 25 00:49:12 BS-PUB-UBUNTU-01 systemd[1]: Starting User Manager for UID 1000...
Jan 25 00:49:12 BS-PUB-UBUNTU-01 systemd[1]: Started Session 158 of user blacknon.
Jan 25 00:59:11 BS-PUB-UBUNTU-01 blacknon: test123
Jan 25 00:59:13 BS-PUB-UBUNTU-01 blacknon: test456
Jan 25 00:59:38 BS-PUB-UBUNTU-01 blacknon: test123
Jan 25 00:59:43 BS-PUB-UBUNTU-01 blacknon: test456

無事、動作してるようだ。 ちょこちょこと今後もいじってみようと思う。