| Hostname (role) | IP address |
| --- | --- |
| zk-01 | 10.10.1.10 |
| zk-02 | 10.10.1.11 |
| zk-03 | 10.10.1.12 |
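If the three nodes cannot already resolve each other's hostnames, the mapping from the table above can be added to /etc/hosts on every node (an optional sketch; skip it if DNS already covers these names):
cat >> /etc/hosts <<'EOF'
10.10.1.10 zk-01
10.10.1.11 zk-02
10.10.1.12 zk-03
EOF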
Prerequisites
#Disable swap
swapoff -a #disable immediately (temporary)
sed -i 's#^\/dev\/mapper\/centos-swap#\#\/dev\/mapper\/centos-swap#' /etc/fstab #comment out the swap entry so it stays disabled after reboot
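A quick check that swap really is off (swapon --show should print nothing, and the Swap line in free should read 0):
swapon --show
free -h | grep -i swap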
#Install the JDK
wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
tar -xf jdk-17_linux-x64_bin.tar.gz
mv jdk-17.* /usr/local/jdk-17 #move the extracted directory (its name includes the exact version), not the tarball
echo 'export PATH=/usr/local/jdk-17/bin:$PATH' >> /etc/profile
source /etc/profile
java -version
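The ZooKeeper and Kafka launcher scripts use JAVA_HOME when it is set, so exporting it alongside PATH is a small optional addition (a sketch, assuming the /usr/local/jdk-17 path used above):
echo 'export JAVA_HOME=/usr/local/jdk-17' >> /etc/profile
source /etc/profile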
Step 1: Download and extract the packages
#Download the ZooKeeper package (use the -bin distribution; the plain source tarball does not contain the compiled jars)
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
#Download the Kafka package (wget needs a direct download URL, not the closer.cgi mirror-selection page)
wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.12-3.0.0.tgz
#Extract the ZooKeeper package and move it to the target directory
tar -xf apache-zookeeper-3.6.3-bin.tar.gz
mv apache-zookeeper-3.6.3-bin /usr/local/zookeeper
#Extract the Kafka package and move it to the target directory
tar -xf kafka_2.12-3.0.0.tgz
mv kafka_2.12-3.0.0 /usr/local/kafka
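The configuration files and systemd units below all point at directories under /data, which the tarballs do not create; a sketch creating them on every node:
mkdir -p /data/zookeeper/logs /data/zookeeper/config
mkdir -p /data/kafka/data /data/kafka/logs /data/kafka/config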
Step 2: Edit the configuration files
#Edit the ZooKeeper config file zoo.cfg (the systemd unit in Step 3 expects it at /data/zookeeper/config/zoo.cfg)
tickTime=2000
dataDir=/data/zookeeper
dataLogDir=/data/zookeeper/logs
maxClientCnxns=0
clientPort=2181
initLimit=5
syncLimit=2
autopurge.purgeInterval=1
autopurge.snapRetainCount=10
server.1=10.10.1.10:2888:3888
server.2=10.10.1.11:2888:3888
server.3=10.10.1.12:2888:3888
#Write the myid file (it must live in dataDir)
echo '1' > /data/zookeeper/myid #each host gets a different myid; use 2 and 3 on the other two nodes
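If passwordless SSH between the nodes is available, the per-node myid values can be written in one pass (a sketch; the hostnames come from the table above):
id=1
for host in zk-01 zk-02 zk-03; do
  ssh "$host" "mkdir -p /data/zookeeper && echo $id > /data/zookeeper/myid"
  id=$((id+1))
done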
#Edit the Kafka config file server.properties (the systemd unit in Step 3 expects it at /data/kafka/config/server.properties)
broker.id=1 #valid range 1-255; for a 3-node cluster use 1, 2, 3
broker.rack=dc1
listeners=PLAINTEXT://:9092 #port the broker listens on
advertised.listeners=PLAINTEXT://10.10.1.10:9092 #address advertised to clients; use each node's own IP
compression.type=producer #keep whatever compression codec the producer used
auto.create.topics.enable=true #allow topics to be created automatically
unclean.leader.election.enable=false #never elect an out-of-sync replica as leader, for reliability
controlled.shutdown.enable=true
delete.topic.enable=true #allow topics to be deleted
group.initial.rebalance.delay.ms=3000
group.max.session.timeout.ms=900000
default.replication.factor=2
offsets.topic.replication.factor=3
num.partitions=2 #default number of partitions per topic
num.io.threads=12 #threads that process requests, including disk I/O
num.network.threads=64 #threads that receive and send requests over the network
num.recovery.threads.per.data.dir=2
replica.fetch.max.bytes=104857600
message.max.bytes=104857600 #largest record batch size the broker will accept
socket.receive.buffer.bytes=102400 #socket receive buffer
socket.request.max.bytes=104857600 #maximum size of a single socket request
socket.send.buffer.bytes=102400 #socket send buffer
log.segment.bytes=67108864
log.segment.delete.delay.ms=1000
log.retention.hours=72 #time-based log retention
log.retention.check.interval.ms=60000
log.retention.bytes=500000 #size-based retention per partition, in bytes; note this is smaller than one segment (67108864), so adjust it to the retention size you actually want
log.dirs=/data/kafka/data #directory where Kafka stores partition data
zookeeper.connect=10.10.1.10:2181,10.10.1.11:2181,10.10.1.12:2181 #list the whole ZooKeeper ensemble, not just the local node
zookeeper.connection.timeout.ms=6000
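The same server.properties is used on all three brokers with two per-node values changed; a sketch of the overrides, using the hosts from the table above:
#on zk-02 (10.10.1.11)
broker.id=2
advertised.listeners=PLAINTEXT://10.10.1.11:9092
#on zk-03 (10.10.1.12)
broker.id=3
advertised.listeners=PLAINTEXT://10.10.1.12:9092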
Step 3: Configure the startup scripts
#ZooKeeper startup script; save as /etc/systemd/system/zookeeper.service
[Unit]
Description=Zookeeper Daemon
Documentation=http://zookeeper.apache.org
Requires=network.target
After=network.target
[Service]
Type=forking
WorkingDirectory=/usr/local/zookeeper
User=root
Group=root
Environment=ZOO_LOG_DIR=/data/zookeeper/logs
Environment=ZK_SERVER_HEAP=1000
Environment=ZK_CLIENT_HEAP=256
Environment="JVMFLAGS=-Djava.net.preferIPv4Stack=true"
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start /data/zookeeper/config/zoo.cfg
ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop /data/zookeeper/config/zoo.cfg
ExecReload=/usr/local/zookeeper/bin/zkServer.sh restart /data/zookeeper/config/zoo.cfg
TimeoutSec=180
Restart=on-failure
SuccessExitStatus=143
LimitCORE=infinity
LimitNOFILE=655360
LimitNPROC=655360
[Install]
WantedBy=default.target
# Kafka startup script; save as /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
WorkingDirectory=/usr/local/kafka
User=root
Group=root
Environment="KAFKA_HEAP_OPTS=-Xmx6G -Xms6G"
Environment=LOG_DIR=/data/kafka/logs
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /data/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
TimeoutSec=300
Restart=on-failure
SuccessExitStatus=143
LimitCORE=infinity
LimitNOFILE=655360
LimitNPROC=655360
[Install]
WantedBy=default.target
Step 4: Start the Kafka cluster
systemctl daemon-reload
systemctl start zookeeper
systemctl start kafka
systemctl enable zookeeper
systemctl enable kafka
#Check service status
systemctl status kafka
systemctl status zookeeper
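Once all three nodes are up, a quick functional check (a sketch run from any node; the topic name cluster-check is just an example):
#ZooKeeper: one node should report Mode: leader, the other two Mode: follower
/usr/local/zookeeper/bin/zkServer.sh status /data/zookeeper/config/zoo.cfg
#Kafka: create a small test topic and confirm all three brokers hold replicas
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 10.10.1.10:9092 --create --topic cluster-check --partitions 3 --replication-factor 3
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 10.10.1.10:9092 --describe --topic cluster-check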
Step 5: Upgrade the Kafka cluster
#Before upgrading, add the following to server.properties to pin the protocol and message format to the version currently running
inter.broker.protocol.version=<current version>
log.message.format.version=<current version>
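For example, since the cluster installed above runs Kafka 3.0.0, the pinned values before the upgrade would be:
inter.broker.protocol.version=3.0
log.message.format.version=3.0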
#Download the binary package of the target version
wget https://archive.apache.org/dist/kafka/<new version>/kafka_2.12-<new version>.tgz
#Extract it
tar -xf kafka_2.12-<new version>.tgz
#Back up server.properties first, then move the old installation aside and put the new one in its place; the backup details are not repeated here
mv /usr/local/kafka /usr/local/kafka.old
mv kafka_2.12-<new version> /usr/local/kafka
#After every broker has been switched to the new binaries, copy the backed-up config back into the Kafka directory and restart the brokers one at a time
systemctl restart kafka
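"One at a time" means a rolling restart: restart a broker, wait for it to rejoin and for under-replicated partitions to drain, then move on to the next. A sketch assuming passwordless SSH to the hosts in the table above:
for host in zk-01 zk-02 zk-03; do
  ssh "$host" 'systemctl restart kafka'
  #wait until no partitions are under-replicated before touching the next broker
  until [ -z "$(/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 10.10.1.10:9092 --describe --under-replicated-partitions)" ]; do
    sleep 5
  done
done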
#Once the whole cluster runs the new binaries, bump the pinned versions in server.properties
inter.broker.protocol.version=<new version>
log.message.format.version=<new version>
#Restart the brokers one at a time again to complete the upgrade
systemctl restart kafka
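A quick way to confirm the brokers are running the new version afterwards (a sketch; both tools ship with the Kafka distribution):
/usr/local/kafka/bin/kafka-topics.sh --version
/usr/local/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server 10.10.1.10:9092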