部署kafka集群

主机名(角色)IP地址
zk-0110.10.1.10
zk-0210.10.1.11
zk-0310.10.1.12

必要条件

#关闭swap
swapoff  #临时关闭
sed -i 's#^\/dev\/mapper\/centos-swap#\#\/dev\/mapper\/centos-swap#' /etc/fstab #永久关闭
#安装jdk
wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
tar -xf jdk-17_linux-x64_bin.tar.gz
mv jdk-17_linux-x64_bin.tar.gz /usr/local/jdk-17
echo "PATH=/usr/local/jdk-17/bin:$PATH" >> /etc/profile
source /etc/profile
java -version

第一步:下载并解压安装包

#下载zookeeper安装包
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3.tar.gz
#下载kafka安装包
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/3.0.0/kafka_2.12-3.0.0.tgz
#解压zk安装包并放在指定目录
tar -xf apache-zookeeper-3.6.3.tar.gz
mv apache-zookeeper-3.6.3  /usr/local/zookeeper
#解压kafka安装包并放在指定目录
tar -xf kafka_2.12-3.0.0.tgz
mv kafka_2.12-3.0.0. /usr/local/kafka

第二步:修改配置文件

#修改zk配置文件zoo.cfg
tickTime=2000
dataDir=/data/zookeeper
dataLogDir=/data/zookeeper/logs
maxClientCnxns=0
clientPort=2181
initLimit=5
syncLimit=2
autopurge.purgeInterval=1
autopurge.snapRetainCount=10
server.1=10.10.1.10:2888:3888
server.2=10.10.1.11:2888:3888
server.3=10.10.1.12:2888:3888
#配置myid
echo '1' > myid  #不同主机配置不同的myid 其他两个节点配置2,3

#修改kafka配置文件server.properties
broker.id=1      #id范围1-255 3节点通常选择1,2,3
broker.rack=dc1


listeners=PLAINTEXT://:9092  #暴露端口
advertised.listeners=PLAINTEXT://10.10.1.10:9092

compression.type=producer   #数据压缩类型为生产模式

auto.create.topics.enable=true   #开启自动创建topic
unclean.leader.election.enable=false  #增加可靠性
controlled.shutdown.enable=true    
delete.topic.enable=true     #开启可以删除topic

group.initial.rebalance.delay.ms=3000
group.max.session.timeout.ms=900000

default.replication.factor=2
offsets.topic.replication.factor=3

num.partitions=2  #配置分区
num.io.threads=12 #实际处理网络请求的线程数
num.network.threads=64   #broker后台转发网络请求的线程数
num.recovery.threads.per.data.dir=2

replica.fetch.max.bytes=104857600
message.max.bytes=104857600  #broker最大能接收多少请求

socket.receive.buffer.bytes=102400  #socket接收缓冲
socket.request.max.bytes=104857600  #socket最大请求字节
socket.send.buffer.bytes=102400  #socket发送缓冲

log.segment.bytes=67108864
log.segment.delete.delay.ms=1000
log.retention.hours=72  #日志保留时长
log.retention.check.interval.ms=60000
log.retention.bytes= 500000  #按保存的日志文件大小,当日志大小超过设置数字,超过的日志数据将删除
log.dirs=/data/kafka/data   #日志目录

zookeeper.connect=127.0.0.1:2181
zookeeper.connection.timeout.ms=6000

第三步:配置启动脚本

#zookeeper启动脚本
[Unit]
Description=Zookeeper Daemon
Documentation=http://zookeeper.apache.org
Requires=network.target
After=network.target

[Service]    
Type=forking
WorkingDirectory=/usr/local/zookeeper
User=root
Group=root

Environment=ZOO_LOG_DIR=/data/zookeeper/logs
Environment=ZK_SERVER_HEAP=1000
Environment=ZK_CLIENT_HEAP=256
Environment="JVMFLAGS=-Djava.net.preferIPv4Stack=true"

ExecStart=/usr/local/zookeeper/bin/zkServer.sh start /data/zookeeper/config/zoo.cfg
ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop /data/zookeeper/config/zoo.cfg
ExecReload=/usr/local/zookeeper/bin/zkServer.sh restart /data/zookeeper/config/zoo.cfg

TimeoutSec=180
Restart=on-failure
SuccessExitStatus=143

LimitCORE=infinity
LimitNOFILE=655360
LimitNPROC=655360

[Install]
WantedBy=default.target

# kafka启动脚本
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]    
Type=simple
WorkingDirectory=/usr/local/kafka
User=root
Group=root

Environment="KAFKA_HEAP_OPTS=-Xmx6G -Xms6G"
Environment=LOG_DIR=/data/kafka/logs

ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /data/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh

TimeoutSec=300
Restart=on-failure
SuccessExitStatus=143

LimitCORE=infinity
LimitNOFILE=655360
LimitNPROC=655360

[Install]
WantedBy=default.target

第四步:启动kafka集群

systemctl daemon-reload
systemctl start zookeeper
systemctl start kafka
systemctl enable zookeeper
systemctl enable kafka
#查看启动状态
systemctl status kafka
systemctl status zookeeper

第五步:升级kafka集群

#server.properties配置文件新增
inter.broker.protocol.version=<当前版本号>
log.message.format.version=<当前版本号>
#下载升级的二进制包
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/3.0.0/kafka_<升级版本号>.tgz
#解压并覆盖到kafka目录
tar -xf https://www.apache.org/dyn/closer.cgi?path=/kafka/3.0.0/kafka_<升级版本号>.tgz
#替换前,先备份server.properties,这里不在赘述
mv -f kafka_<升级版本号> /usr/local/kafka
#kafka集群都替换后,将备份的配置文件替换到kafka目录,依次重启kafka
systemctl restart kafka
#修改配置文件
inter.broker.protocol.version=<升级版本号>
log.message.format.version=<升级版本号>
#依次重启集群kafka
systemctl restart kafka
此条目发表在kafka分类目录。将固定链接加入收藏夹。

发表评论