服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

云服务器|WEB服务器|FTP服务器|邮件服务器|虚拟主机|服务器安全|DNS服务器|服务器知识|Nginx|IIS|Tomcat|

服务器之家 - 服务器技术 - 服务器知识 - 云服务器(Linux)安装部署Kafka的详细过程

云服务器(Linux)安装部署Kafka的详细过程

2023-03-06 16:30shadc 服务器知识

这篇文章主要介绍了云服务器(Linux)安装部署Kafka的详细过程,kafka的安装需要依赖于jdk,需要在服务器上提前安装好该环境,这里使用用jdk1.8,本文给大家详细介绍感兴趣的朋友跟随小编一起看看吧

云服务器(Linux)安装部署Kafka

前期准备

kafka的安装需要依赖于jdk,需要在服务器上提前安装好该环境,这里使用用jdk1.8。

下载安装包

官网地址:

较新的版本已自带Zookeeper,无需额外下载。这里使用3.2.0做演示。

云服务器(Linux)安装部署Kafka的详细过程

注意要下载Binary downloads标签下的tgz包,Source download标签下的包为源码。无法直接运行,需要编译。

上载安装包到云服务器

使用ssh连接工具将kafka_2.12-3.2.0.tgz这个包上传到云服务器上的一个目录。

云服务器(Linux)安装部署Kafka的详细过程

打开命令行,进入到放有压缩包的目录,执行

  1. tar -zxvf kafka_2.12-3.2.0.tgz

配置kafka

然后使用cd命令进入到/kafka_2.12-3.2.0/config/下,使用

  1. vi server.properties

编辑配置文件。

云服务器(Linux)安装部署Kafka的详细过程

删除listeners和advertised前方的#号,改成如下配置:

  1. listeners=PLAINTEXT://云服务器内网ip:9092(本地访问用本地ip)
  2. # 如果要提供外网访问则必须配置此项
  3. advertised.listeners=PLAINTEXT://云服务器公网ip:9092(若要远程访问需配置此项为云服务器的公网ip)
  4. # zookeeper连接地址,集群配置格式为ip:port,ip:port,ip:port
  5. zookeeper.connect=云服务器公网ip:2181

开放云服务器端口

在云服务器控制台内进入安全组页面,添加两条新的入站规则,tcp/9092和tcp/2181

开放linux防火墙端口

先查看使用的防火墙类型iptables/firewalld

iptables操作命令

  1. 1.打开/关闭/重启防火墙
  2.  
  3. 开启防火墙(重启后永久生效):chkconfig iptables on
  4.  
  5. 关闭防火墙(重启后永久生效):chkconfig iptables off
  6.  
  7. 开启防火墙(即时生效,重启后失效):service iptables start
  8.  
  9. 关闭防火墙(即时生效,重启后失效):service iptables stop
  10.  
  11. 重启防火墙:service iptables restartd
  12.  
  13. 2.查看打开的端口
  14.  
  15. /etc/init.d/iptables status
  16. 3.开启端口
  17.  
  18. iptables -A INPUT -p tcp --dport 8080 -j ACCEPT
  19. 4.保存并重启防火墙
  20. /etc/rc.d/init.d/iptables save
  21. /etc/init.d/iptables restart

Centos7默认安装了firewalld,如果没有安装的话,可以使用 yum install firewalld firewalld-config进行安装。

操作指令如下:

  1. 1.启动防火墙
  2.  
  3. systemctl start firewalld
  4. 2.禁用防火墙
  5.  
  6. systemctl stop firewalld
  7. 3.设置开机启动
  8.  
  9. systemctl enable firewalld
  10. 4.停止并禁用开机启动
  11.  
  12. sytemctl disable firewalld
  13. 5.重启防火墙
  14.  
  15. firewall-cmd --reload
  16.  
  17. 6.查看状态
  18.  
  19. systemctl status firewalld或者 firewall-cmd --state
  20. 7.在指定区域打开端口(记得重启防火墙)
  21.  
  22. firewall-cmd --zone=public --add-port=80/tcp(永久生效再加上 --permanent)

打开tcp/9092和tcp/2181这两个端口后,重启防火墙,并查看开放的端口确实生效。

启动kafka服务

cd命令进入kafka_2.12-3.2.0目录下,执行

  1. bin/zookeeper-server-start.sh config/zookeeper.properties

启动zookeeper,不加-daemon方便排除启动错误,新建一个shell窗口,进入该目录再执行

  1. bin/kafka-server-start.sh config/server.properties

启动kafka,若打印日志未报错,若未出现error日志,说明启动成功。

测试单机连通性

  1. 查询kafka下所有的topic
  2. bin/kafka-topics.sh --list --zookeeper ip:port
  3. 因为kafka使用zookeeper作为配置中心,一些topic信息需要查询该kafka对应的zookeeper
  4. 创建topic
  5. bin/kafka-topics.sh --create --zookeeper ip:port --replication-factor 1 --partitions 1 --topic test
  6. 开启生产者
  7. bin/kafka-console-producer.sh --broker-list cos100:9092 --topic test
  8. 开启消费者
  9. bin/kafka-console-consumer.sh --bootstrap-server cos100:9092 --topic test

Springboot连接kafak

在pom.xml文件中引入kafka依赖

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. <version>2.9.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.kafka</groupId>
  8. <artifactId>kafka-clients</artifactId>
  9. <version>3.2.0</version>
  10. </dependency>

在application.yml配置文件中配置kafka

  1. server:
  2. port: 8080
  3.  
  4. spring:
  5. kafka:
  6. bootstrap-servers: 云服务器外网ip地址:9092
  7. producer: # 生产者
  8. retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
  9. batch-size: 16384
  10. buffer-memory: 33554432
  11. acks: 1
  12. # 指定消息key和消息体的编解码方式
  13. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  14. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  15. consumer:
  16. group-id: default-group
  17. enable-auto-commit: false
  18. auto-offset-reset: earliest
  19. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  20. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  21. listener:
  22. # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
  23. # RECORD
  24. # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
  25. # BATCH
  26. # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
  27. # TIME
  28. # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
  29. # COUNT
  30. # TIME | COUNT 有一个条件满足时提交
  31. # COUNT_TIME
  32. # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
  33. # MANUAL
  34. # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
  35. # MANUAL_IMMEDIATE
  36. ack-mode: manual_immediate

生产者

  1. @RestController
  2. public class KafkaController {
  3. private final static String TOPIC_NAME = "test-topic";
  4.  
  5. @Autowired
  6. private KafkaTemplate<String, String> kafkaTemplate;
  7.  
  8. @RequestMapping("/send")
  9. public String send(@RequestParam("msg") String msg) {
  10. kafkaTemplate.send(TOPIC_NAME, "key", msg);
  11. return String.format("消息 %s 发送成功!", msg);
  12. }
  13. }

消费者

  1. @Component
  2. public class DemoConsumer {
  3. /**
  4. * @param record record
  5. * @KafkaListener(groupId = "testGroup", topicPartitions = {
  6. * @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
  7. * @TopicPartition(topic = "topic2", partitions = "0",
  8. * partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
  9. * },concurrency = "6")
  10. * //concurrency就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数
  11. */
  12. @KafkaListener(topics = "test-topic", groupId = "testGroup1")
  13. public void listentestGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
  14. String value = record.value();
  15. System.out.println("testGroup1 message: " + value);
  16. System.out.println("testGroup1 record: " + record);
  17. //手动提交offset,一般是提交一个banch,幂等性防止重复消息
  18. // === 每条消费完确认性能不好!
  19. ack.acknowledge();
  20. }
  21.  
  22. //配置多个消费组
  23. @KafkaListener(topics = "test--topic", groupId = "testGroup2")
  24. public void listentestGroup2(ConsumerRecord<String, String> record, Acknowledgment ack) {
  25. String value = record.value();
  26. System.out.println("testGroup2 message: " + value);
  27. System.out.println("testGroup2 record: " + record);
  28. //手动提交offset
  29. ack.acknowledge();
  30. }
  31. }

使用swagger测试发送消息

云服务器(Linux)安装部署Kafka的详细过程

控制台打印消息

云服务器(Linux)安装部署Kafka的详细过程

到此这篇关于云服务器(Linux)安装部署Kafka的详细过程的文章就介绍到这了,更多相关Linux安装Kafka内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://www.cnblogs.com/ndchao/archive/2022/11/14/chaos1.html

延伸 · 阅读

精彩推荐