Spring Boot与Kafka实战手把手指南

间隙填充
正睿科技  发布时间:2024-06-07 10:48:52  浏览数:336

关于正睿.png

安装kafka

启动Kafka本地环境需Java 8+以上

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。

Kafka启动方式有Zookeeper和Kraft,两种方式只能选择其中一种启动,不能同时使用。

Kafka下载

image1_521.png

解压tar -xzf kafka_2.13-3.7.0.tgz

image2_528.png

一、Zookeeper启动Kafka(kafka内置zookeeper)

Kafka依赖Zookeeper

1、启动Zookeeper 2、启动Kafka

使用kafka自带Zookeeper启动

./zookeeper-server-start.sh ../config/zookeeper.properties &

./zookeeper-server-stop.sh ../config/zookeeper.properties

image3_515.png

./kafka-server-start.sh ../config/server.properties &

image4_473.png

./kafka-server-stop.sh ../config/server.properties

二、Zookeeper服务器启动Kafka

Zookeeper服务器安装

https://zookeeper.apache.org/

image5_446.png

https://dlcdn.apache.org/zookeeper/zookeeper-3.9.2/apache-zookeeper-3.9.2-bin.tar.gz

tar zxvf apache-zookeeper-3.9.2-bin.tar.gz

image6_403.png

image7_376.png

配置Zookeeper服务器

cp zoo_sample.cfg zoo.cfg

image8_347.png

启动Zookeeper服务器

image9_286.png


./zkServer.sh start
修改Zookeeper端口


image10_263.png

Zoo.cfg添加内容
admin.serverPort=8099
apache-zookeeper-3.9.2-bin/bin目录下重启Zookeeper


image11_233.png

Zookeeper服务器启动kafka
/opt/kafka_2.13-3.7.0/bin目录下
./kafka-server-start.sh ../config/server.properties &

image12_221.png

Kafka配置文件server.properties

image13_196.png

三、使用KRaft启动Kafka

image14_189.png

UUID通用唯一识别码(Universally Unique Identifier)
1、生成Cluster UUID(集群UUID):./kafka-storage.sh random-uuid

image15_176.png

2.格式化kafka日志目录:
./kafka-storage.sh format -t 3pMJGNJcT0uLIBsZhbucjQ -c ../config/kraft/server.properties

image16_164.png

3.启动kafka:
./kafka-server-start.sh ../config/kraft/server.properties &

image17_148.png

springboot集成kafka

image1_581.png

创建topic时,若不指定topic的分区(partition)数量使,则默认为1个分区(partition)

image2_582.png

修改server.properties文件

vim server.properties

listeners=PLAINTEXT://0.0.0.0:9092

image3_578.png

advertised.listeners=PLAINTEXT://192.168.68.133:9092

image4_531.png

image5_497.png

image6_444.png

springboot加入依赖kafka

<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>    

加入spring-kafka依赖后,springboot自动装配好kafkaTemplate的Bean

image7_412.png

application.yml配置连接kafka

spring: kafka: bootstrap-servers: 192.168.68.133:9092    

生产者

发送消息

image8_380.png

<Resource> private KafkaTemplate<String,String> kafkaTemplate; <Test> void kafkaSendTest(){ kafkaTemplate.send("kafkamsg01","hello kafka"); }    

image9_315.png

消费者

接收消息

image10_290.png

<Component> public class KafkaConsumer { <KafkaListener topics = "kafkamsg01","test", groupId = "123"> public void consume(String message){ System.out.println("接收到消息:"+message); } }

若没有配置groupid
Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.

image11_258.png

image12_245.png

<Component> public class KafkaConsumer { <KafkaListener topics = "kafkamsg01","test", groupId = "123"> public void consume(String message){ System.out.println("接收到消息:"+message); }

}

image13_216.png

问题没解决? 我们帮您!

如果您在本文中未能找到解决当前问题的办法,不用担心——正睿专业技术支持团队随时待命

服务项目.png

获取更多帮助

文章来源:华为云社区 作者:QGS