为什么使用消息队列?以用户下单购买商品的行为举例,在使用微服务架构时,我们需要调用多个服务 。传统的调用方式是同步调用,这会存在一定的性能问题

文章插图
使用消息队列可以实现异步的通信方式,相比于同步的通信?式,异步的?式可以让上游快速成功,极大提高系统的吞吐量 。在分布式系统中,通过下游多个服务的分布式事务的保障,也能保障业务执行之后的最终?致性

文章插图
Kafka 概述1. 介绍Kafka 是?个分布式的、?持分区的(partition)、多副本的 (replica),基于 zookeeper 协调的分布式消息系统,它最大的特性就是可以实时处理大量数据以满足各类需求场景:
- 日志收集:使用 Kafka 收集各种服务的日志,并通过 kafka 以统一接口服务的方式开放给各种 consumer,例如 hadoop、Hbase、Solr 等
- 消息系统:解耦和生产者和消费者、缓存消息等
- 用户活动跟踪:Kafka 经常被用来记录 web 用户或者 app 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后订阅者通过订阅这些 topic 来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘
- 运营指标:Kafka 也经常用来记录运营监控数据,包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告
需要修改配置文件,进?到 config 目录内,修改 server.properties
# broker.id 属性在 kafka 集群中必须唯一broker.id= 0# kafka 部署的机器 ip 和提供服务的端口号listeners=PLAINTEXT://192.168.65.60:9092# kafka 的消息存储文件log.dir=/usr/local/data/kafka-logs# kafka 连接 zookeeper 的地址zookeeper.connect= 192.168.65.60:2181server.properties 核心配置详解:PropertyDefaultDescriptionbroker.id0每个 broker 都可以用?个唯?的非负整数 id 进行标识,作为 broker 的 名字log.dirs/tmp/kafka-logskafka 存放数据的路径,这个路径并不是唯?的,可以是多个,路径之间只需要使?逗号分隔即可;每当创建新 partition 时,都会选择在包含最少 partitions 的路径下进行listenersPLAINTEXT://192.168.65.60:9092server 接受客户端连接的端?,ip 配置 kafka 本机 ip 即可zookeeper.connectlocalhost:2181zooKeeper 连接字符串的格式为:hostname:port,此处 hostname 和 port 分别是 ZooKeeper 集群中某个节点的 host 和 port;zookeeper 如果是集群,连接?式为 hostname1:port1,hostname2:port2,hostname3:port3log.retention.hours168每个日志文件删除之前保存的时间,默认数据保存时间对所有 topic 都?样num.partitions1创建 topic 的默认分区数default.replication.factor1?动创建 topic 的默认副本数量,建议设置为?于等于 2min.insync.replicas1当 producer 设置 acks 为 -1 时,min.insync.replicas 指定 replicas 的最小数目(必须确认每?个 repica 的写数据都是成功的),如果这个数目没有达到,producer 发送消息会产生异常delete.topic.enablefalse是否允许删除主题进入到 bin 目录下,使用命令来启动
./kafka-server-start.sh -daemon../config/server.properties验证是否启动成功:进入到 zk 中的节点看 id 是 0 的 broker 有没有存在(上线)ls /brokers/ids/实现消息的生产和消费1. 主题 Topictopic 可以实现消息的分类,不同消费者订阅不同的 topic

文章插图
执行以下命令创建名为
test 的 topic,这个 topic 只有一个 partition,并且备份因子也设置为 1./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replication-factor 1 --partitions 1 --topic test查看当前 kafka 内有哪些 topic./kafka-topics.sh --list --zookeeper 172.16.253.35:21812. 发送消息把消息发送给 broker 中的某个 topic,打开?个 kafka 发送消息的客户端,然后开始?客户端向 kafka 服务器发送消息kafka 自带了一个 producer 命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到 kafka 集群中 。在默认情况下,每一个行会被当做成一个独立的消息
./kafka-console-producer.sh --broker-list 172.16.253.38:9092 --topic test3. 消费消息对于 consumer,kafka 同样也携带了一个命令行客户端,会将获取到内容在命令中进行输出,默认是消费最新的消息 。使用 kafka 的消费者客户端,从指定 kafka 服务器的指定 topic 中消费消息方式一:从最后一条消息的 偏移量+1 开始消费
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --topic test方式二:从头开始消费./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --from-beginning --topic test消息的发送方会把消息发送到 broker 中,broker 会存储消息,消息是按照发送的顺序进行存储 。因此消费者在消费消息时可以指明主题中消息的偏移量 。默认情况下,是从最后一个消息的下一个偏移量开始消费4. 单播消息一个消费组里只有一个消费者能消费到某一个 topic 中的消息,可以创建多个消费者,这些消费者在同一个消费组中
./kafka-console-consumer.sh --bootstrap-server 10.31.167.10:9092 --consumer-property group.id=testGroup --topic test5. 多播消息在一些业务场景中需要让一条消息被多个消费者消费,那么就可以使用多播模式 。kafka 实现多播,只需要让不同的消费者处于不同的消费组即可./kafka-console-consumer.sh --bootstrap-server 10.31.167.10:9092 --consumer-property group.id=testGroup1 --topic test./kafka-console-consumer.sh --bootstrap-server 10.31.167.10:9092 --consumer-property group.id=testGroup2 --topic test6. 查看消费组及信息# 查看当前主题下有哪些消费组./kafka-consumer-groups.sh --bootstrap-server 10.31.167.10:9092 --list# 查看消费组中的具体信息:比如当前偏移量、最后一条消息的偏移量、堆积的消息数量./kafka-consumer-groups.sh --bootstrap-server 172.16.253.38:9092 --describe --group testGroup【Kafka 学习笔记】
文章插图
- Currennt-offset:当前消费组的已消费偏移量
- Log-end-offset:主题对应分区消息的结束偏移量(HW)
- Lag:当前消费组未消费的消息数

文章插图
- 生产者将消息发送给 broker,broker 会将消息保存在本地的日志文件中
/usr/local/kafka/data/kafka-logs/主题-分区/00000000.log - 消息的保存是有序的,通过 offset 偏移量来描述消息的有序性
- 消费者消费消息时也是通过 offset 来描述当前要消费的那条消息的位置
主题与分区主题 Topic 在 kafka 中是?个逻辑概念,kafka 通过 topic 将消息进行分类 。不同的 topic 会被订阅该 topic 的消费者消费 。但是有?个问题,如果说这个 topic 的消息非常多,消息是会被保存到 log 日志文件中的,这会出现文件过大的问题,因此,kafka 提出了 Partition 分区的概念

文章插图
通过 partition 将?个 topic 中的消息分区来存储,这样的好处有多个:
- 分区存储,可以解决存储文件过大的问题
- 提供了读写的吞吐量:读和写可以同时在多个分区进?
./kafka-topics.sh --create --zookeeper localhost:2181 --partitions 2 --topic test1通以下命令查看 topic 的分区信息./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1分区的作用:- 可以分布式存储
- 可以并行写
- 提交到哪个分区:通过 hash 函数:
hash(consumerGroupId) % __consumer_offsets 主题的分区数 - 提交到该主题中的内容是:key 是 consumerGroupId + topic + 分区号,value 就是当前 offset 的值
- 文件中保存的消息,默认保存七天,七天到后消息会被删除
Kafka 集群1. 搭建创建三个 server.properties 文件
# 0 1 2broker.id=2# 9092 9093 9094listeners=PLAINTEXT://192.168.65.60:9094# kafka-logs kafka-logs-1 kafka-logs-2log.dir=/usr/local/data/kafka-logs-2通过命令启动三台 broker./kafka-server-start.sh -daemon../config/server0.properties./kafka-server-start.sh -daemon../config/server1.properties./kafka-server-start.sh -daemon../config/server2.properties搭建完后通过查看 zk 中的 /brokers/ids 看是否启动成功2. 副本下面的命令,在创建主题时,除了指明了主题的分区数以外,还指明了副本数,分别是:一个主题,两个分区、三个副本
./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic通过查看主题信息,其中的关键数据:
文章插图
- replicas:当前副本所存在的 broker 节点
- leader:副本里的概念
- 每个 partition 都有一个 broker 作为 leader
- 消息发送方要把消息发给哪个 broker,就看副本的 leader 是在哪个 broker 上面,副本里的 leader 专门用来接收消息
- 接收到消息,其他 follower 通过 poll 的方式来同步数据
- follower:leader 处理所有针对这个 partition 的读写请求,而 follower 被动复制 leader,不提供读写(主要是为了保证多副本数据与消费的一致性),如果 leader 所在的 broker 挂掉,那么就会进行新 leader 的选举
- isr:可以同步的 broker 节点和已同步的 broker 节点,存放在 isr 集合中

文章插图
4. 集群消息的发送
./kafka-console-producer.sh --broker-list 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --topic my-replicated-topic5. 集群消息的消费./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --from-beginning --topic my-replicated-topic6. 分区消费组消费者的细节
文章插图
- ?个 partition 只能被?个消费组中的?个消费者消费,目的是为了保证消费的顺序性,但是多个 partion 的多个消费者的消费顺序性是得不到保证的
- 一个消费者可以消费多个 partition,如果消费者挂了,那么会触发rebalance机制,由其他消费者来消费该分区
- 消费组中消费者的数量不能比一个 topic 中的 partition 数量多,否则多出来的消费者消费不到消息
Java 中使用 Kafka1. 生产者1.1 引入依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency>1.2 生产者发送消息/** * 消息的发送方 */public class MyProducer {private final static String TOPIC_NAME = "my-replicated-topic";public static void main(String[] args) throws ExecutionException, InterruptedException {// 1.设置参数Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.31.167.10:9092,10.31.167.10:9093,10.31.167.10:9094");// 把发送的 key 从字符串序列化为字节数组props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 把发送消息 value 从字符串序列化为字节数组props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 2.创建?产消息的客户端,传?参数Producer<String, String> producer = new KafkaProducer<String, String>(props);// 3.创建消息// key: 作?是决定了往哪个分区上发// value: 具体要发送的消息内容ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "mykeyvalue", "hellokafka");// 4.发送消息,得到消息发送的元数据并输出RecordMetadata metadata = https://tazarkount.com/read/producer.send(producerRecord).get();System.out.println("同步?式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());}}1.3 发送消息到指定分区ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0 , order.getOrderId().toString(), JSON.toJSONString(order));如果未指定分区,则会通过业务 Key 的 hash 运算,得出要发送的分区,公式为:hash(key)%partitionNum1.4 同步发送消息?产者同步发消息,在收到 kafka 的 ack 告知发送成功之前将?直处于阻塞状态
// 等待消息发送成功的同步阻塞方法RecordMetadata metadata = https://tazarkount.com/read/producer.send(producerRecord).get();System.out.println("同步方式发送消息结果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" +metadata.offset());1.5 异步发送消息异步发送,?产者发送完消息后就可以执?之后的业务,broker 在收到消息后异步调用生产者提供的 callback 回调方法// 指定发送分区ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0 , order.getOrderId().toString(),JSON.toJSONString(order));// 异步回调方式发送消息producer.send(producerRecord, new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.err.println("发送消息失败:" +exception.getStackTrace());}if (metadata != null) {System.out.println("异步方式发送消息结果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset());}}});1.6 生产者中的 ack 的配置在同步发送的前提下,生产者在获得集群返回的 ack 之前会?直阻塞,那么集群什么时候返回 ack 呢?此时 ack 有三个配置:- acks = 0:表示 producer 不需要等待任何 broker 确认收到消息的回复,就可以继续发送下一条消息,性能最高,但最容易丢消息
- acks = 1:至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入 。就可以继续发送下一条消息 。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失
- acks = -1 或 all:需要等待
min.insync.replicas(默认为 1 ,推荐配置大于等于2)这个参数配置的副本个数都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据 。这是最强的数据保证,一般是金融级别,或跟钱打交道的场景才会使用这种配置
props.put(ProducerConfig.ACKS_CONFIG, "1");// 发送失败,默认会重试三次,每次间隔 100msprops.put(ProducerConfig.RETRIES_CONFIG, 3);props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100)1.7 消息发送的缓冲区
文章插图
- kafka 默认会创建?个消息缓冲区,用来存放要发送的消息,缓冲区是 32m
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); - kafka 本地线程会在缓冲区中?次拉 16k 的数据,发送到 broker
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); - 如果线程拉不到 16k 的数据,间隔 10ms 也会将已拉到的数据发到 broker
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
public class MySimpleConsumer {private final static String TOPIC_NAME = "my-replicated-topic";private final static String CONSUMER_GROUP_NAME = "testGroup";public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094");// 消费分组名props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 1.创建?个消费者的客户端KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);// 2.消费者订阅主题列表consumer.subscribe(Arrays.asList(TOPIC_NAME));while (true) {/** 3. poll() API 是拉取消息的?轮询*/ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {// 4.打印消息System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = https://tazarkount.com/read/%s%n", record.partition(), record.offset(), record.key(), record.value());}}}}2.2 自动提交和手动提交 offset无论是自动提交还是手动提交,都需要把所属的 消费组 + 消费的某个主题 + 消费的某个分区 + 消费的偏移量 提交到集群的 _consumer_offsets 主题里面- 自动提交:消费者 poll 消息下来以后自动提交 offset
// 是否自动提交 offset,默认就是 trueprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");// 自动提交 offset 的间隔时间props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");注意:如果消费者还没消费完 poll 下来的消息就自动提交了偏移量,此时消费者挂了,于是下?个消费者会从已提交的 offset 的下?个位置开始消费消息,之前未被消费的消息就丢失掉了
- 手动提交:需要把自动提交的配置改成 false
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");手动提交又分成了两种:
- 手动同步提交
在消费完消息后调用同步提交的方法,当集群返回 ack 前?直阻塞,返回 ack 后表示提交成功,执行之后的逻辑
while (true) {/** poll() API 是拉取消息的?轮询*/ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = https://tazarkount.com/read/%s%n", record.partition(),record.offset(), record.key(), record.value());}// 所有的消息已消费完if (records.count() > 0) { // 有消息// ?动同步提交 offset, 当前线程会阻塞直到 offset 提交成功// ?般使?同步提交, 因为提交之后?般也没有什么逻辑代码了consumer.commitSync(); // ====阻塞=== 提交成功}} - 手动异步提交
在消息消费完后提交,不需要等到集群 ack,直接执行之后的逻辑,可以设置?个回调方法,供集群调用
while (true) {/** poll() API 是拉取消息的?轮询*/ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = https://tazarkount.com/read/%s%n", record.partition(), record.offset(), record.key(), record.value());}// 所有的消息已消费完if (records.count() > 0) {// 手动异步提交 offset,当前线程提交 offset 不会阻塞,可以继续处理后?的程序逻辑consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {System.err.println("Commit failed for " + offsets);System.err.println("Commit failed exception: " + exception.getStackTrace());}}});}}
- 手动同步提交
// ?次 poll 最?拉取消息的条数,可以根据消费速度的快慢来设置props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500)可以根据消费速度的快慢来设置,如果两次 poll 的时间超出了 30s 的时间间隔,kafka 会认为其消费能力过弱,将其踢出消费组,将分区分配给其他消费者代码中设置了长轮询的时间是 1000 毫秒
while (true) {/** poll() API 是拉取消息的?轮询*/ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = https://tazarkount.com/read/%s%n", record.partition(), record.offset(), record.key(), record.value());}}- 如果?次 poll 到 500 条,就直接执行 for 循环
- 如果这?次没有 poll 到 500 条,且时间在1秒内,那么长轮询继续 poll,要么到 500 条,要么到 1s
- 如果多次 poll 都没达到 500 条,且 1 秒时间到了,那么直接执行 for 循环
// consumer 给 broker 发送?跳的间隔时间props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);// kafka 如果超过 10 秒没有收到消费者的?跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000)2.5 指定分区消费consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));2.6 消息回溯消费也即从头开始消费消息consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));2.7 指定偏移量消费consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);2.8 从指定时间点消费根据时间,去所有的 partition 中确定该时间对应的 offset,然后去所有的 partition 中找到该 offset 之后的消息开始消费List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);// 从一小时前开始消费long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;Map<TopicPartition, Long> map = new HashMap<>();for (PartitionInfo par : topicPartitions) {map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);}Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {TopicPartition key = entry.getKey();OffsetAndTimestamp value = https://tazarkount.com/read/entry.getValue();if (key == null || value == null) {continue;}// 根据消费?的 timestamp 确定 offsetLong offset = value.offset();System.out.println("partition-" + key.partition() + "|offset-" + offset);if (value != null) {consumer.assign(Arrays.asList(key));consumer.seek(key, offset);}}2.9 新消费组的消费 offset 规则新消费组中的消费者在启动以后,默认会从当前分区的最后?条消息的 offset+1 开始消费(消费新消息),可以通过以下的设置,让新的消费者第?次从头开始消费,之后开始消费新消息(最后消费的位置的偏移量 +1)- Latest:默认的,消费新消息
- earliest:第?次从头开始消费,之后开始消费新消息(最后消费的位置的偏移量 +1)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");SpringBoot 中使用 Kafka1. 引入依赖
<dependency> <groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>2. 编写配置文件server: port: 8080spring: kafka:bootstrap-servers: 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094producer: # ?产者retries: 3 # 设置?于0的值,则客户端会将发送失败的记录重新发送batch-size: 16384buffer-memory: 33554432acks: 1# 指定消息key和消息体的编解码?式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: default-groupenable-auto-commit: falseauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 500listener:# 当每?条记录被消费者监听器(ListenerConsumer)处理之后提交# RECORD# 当每?批 poll() 的数据被消费者监听器(ListenerConsumer)处理之后提交# BATCH# 当每?批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间?于TIME时提交# TIME# 当每?批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量?于等于COUNT时提交# COUNT# TIME | COUNT 有?个条件满?时提交# COUNT_TIME# 当每?批poll()的数据被消费者监听器(ListenerConsumer)处理之后, ?动调?Acknowledgment.acknowledge()后提交# MANUAL# 手动调?Acknowledgment.acknowledge()后?即提交,?般使?这种# MANUAL_IMMEDIATEack-mode: MANUAL_IMMEDIATE redis:host: 172.16.253.213. 编写生产者import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/msg")public class MyKafkaController {private final static String TOPIC_NAME = "my-replicated-topic";@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@RequestMapping("/send")public String sendMessage(){kafkaTemplate.send(TOPIC_NAME,0,"key","this is a message!");return "send success!";}}4. 编写消费者import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.support.Acknowledgment;import org.springframework.stereotype.Component;@Componentpublic class MyConsumer {@KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1")public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {String value = https://tazarkount.com/read/record.value();System.out.println(value);System.out.println(record);// 手动提交offsetack.acknowledge();}}配置消费主题、分区和偏移量@KafkaListener(groupId = "testGroup", topicPartitions = { @TopicPartition(topic = "topic1", partitions = {"0", "1"}), @TopicPartition(topic = "topic2", partitions = "0",partitionOffsets =@PartitionOffset(partition = "1", initialOffset = "100"))},concurrency = "3") // concurrency 就是同组下的消费者个数,就是并发消费数,建议?于等于分区总数public void listenGroupPro(ConsumerRecord<String, String> record, Acknowledgment ack) {String value = https://tazarkount.com/read/record.value();System.out.println(value);System.out.println(record);// 手动提交offsetack.acknowledge();}Kafka 集群 Controller、Rebalance、HW1. ControllerKafka 集群中的 broker 在 zookeeper 中创建临时序号节点,序号最小的节点(最先创建的节点)将作为集群的 controller,负责管理整个集群中的所有分区和副本的状态:
- 当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本,选举的规则是从 isr 集合中最左边获取
- 当集群中有 broker 新增或减少,controller 会同步信息给其他 broker
- 当集群中有分区新增或减少,controller 会同步信息给其他 broker
在触发 rebalance 机制之前,消费者消费哪个分区有三种分配策略:
- range:通过公式来计算某个消费者消费哪个分区,公式为:前面的消费者是
(分区总数/消费者数量)+1,之后的消费者是分区总数/消费者数量 - 轮询:大家轮着来
- sticky:粘合策略,如果需要 rebalance,会在之前已分配的基础上调整,不会改变之前的分配情况 。如果这个策略没有开,那么就要全部重新分配,所以建议开启

文章插图
Kafka 线上问题优化1. 防止消息丢失生产者:
- 使用同步发送
- 把 ack 设成 1 或者 all,并且设置同步的分区数 >= 2
- 把自动提交改成手动提交
解决方案:
- 生产者关闭重试,但会造成丢消息,不建议
- 消费者解决非幂等性消费问题,所谓非幂等性,就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,可以用唯一主键或分布式锁来实现
消费者:主题只能设置?个分区,消费组中只能有?个消费者
4. 解决消息积压所谓消息积压,就是消息的消费者的消费速度远赶不上生产者的生产消息的速度,导致 kafka 中有大量的数据没有被消费 。随着没有被消费的数据堆积越多,消费者寻址的性能会越来越差,最后导致整个 kafka 对外提供的服务的性能很差,从而造成其他服务也访问速度变慢,造成服务雪崩
解决方案:
- 在这个消费者中,使用多线程,充分利用机器的性能消费消息
- 通过业务的架构设计,提升业务层面消费的性能
- 创建多个消费组,多个消费者,部署到其他机器上,?起消费,提高消费者的消费速度
- 创建?个消费者,该消费者在 kafka 另建?个主题,配上多个分区,多个分区再配上多个消费者 。该消费者将 poll 下来的消息,不进行消费,直接转发到新建的主题上 。此时,新的主题的多个分区的多个消费者就开始?起消费了

文章插图
5. 实现延时队列假设一个应用场景:订单创建后,超过 30 分钟没有支付,则需要取消订单,这种场景可以通过延时队列来实现,实现方案如下:

文章插图
- 在 Kafka 创建相应的主题,比如该主题的超时时间定为 30 分钟
- 消费者消费该主题的消息(轮询)
- 消费者消费消息时,判断消息的创建时间和当前时间是否超过 30 分钟(前提是订单没有完成支付)
- 超过:数据库修改订单状态为已取消
- 没有超过:记录当前消息的 offset,并不再继续消费之后的消息 。等待 1 分钟后,再次从 Kafka 拉取该 offset 及之后的消息,继续判断,以此反复
- 春季老年人吃什么养肝?土豆、米饭换着吃
- 三八妇女节节日祝福分享 三八妇女节节日语录
- 老人谨慎!选好你的“第三只脚”
- 校方进行了深刻的反思 青岛一大学生坠亡校方整改校规
- 脸皮厚的人长寿!有这特征的老人最长寿
- 长寿秘诀:记住这10大妙招 100%增寿
- 春季老年人心血管病高发 3条保命要诀
- 眼睛花不花要看四十八 老年人怎样延缓老花眼
- 香槟然能防治老年痴呆症? 一天三杯它人到90不痴呆
- 老人手抖的原因 为什么老人手会抖
