Kafka面试题----Kafka是如何保证顺序消费的

news/2025/2/25 6:59:36

在 Kafka 中,默认情况下消息是按分区进行顺序存储和读取的,但全局顺序消费(即所有分区的消息按顺序消费)较难实现。下面分别介绍 Kafka 按分区顺序消费以及实现全局顺序消费的相关内容

按分区顺序消费

Kafka 本身可以保证单个分区内的消息是顺序写入和顺序读取的,以下是其原理和实现要点:

原理

  • 消息写入:Kafka 生产者在发送消息时,如果指定了分区,消息会被顺序追加到该分区的日志文件末尾。Kafka 的分区日志是一个只允许追加写入的文件,这种设计保证了消息在分区内的顺序性。
  • 消息读取:Kafka 消费者从分区中按偏移量(offset)顺序读取消息,偏移量是消息在分区内的唯一标识,消费者按照偏移量从小到大的顺序读取消息,从而保证了消息消费的顺序性。

实现要点

  • 生产者配置:生产者在发送消息时,需要明确指定消息要发送到的分区。可以通过自定义分区器或者直接指定分区号来实现。
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class OrderedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        String topic = "test-topic";
        int partition = 0; // 指定分区号

        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, partition, "key-" + i, "value-" + i);
            producer.send(record);
        }

        producer.close();
    }
}
  • 消费者配置:消费者需要确保按顺序处理消息,并且在处理完一条消息后再处理下一条消息。同时,要避免手动调整偏移量,以免破坏消息的顺序。
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class OrderedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "test-topic";
        consumer.subscribe(Collections.singletonList(topic));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    // 处理完一条消息后再处理下一条
                }
                consumer.commitSync(); // 同步提交偏移量
            }
        } finally {
            consumer.close();
        }
    }
}

全局顺序消费

要实现全局顺序消费,需要将所有消息发送到同一个分区,因为 Kafka 只能保证单个分区内的消息顺序性。但这种方式会带来性能瓶颈,因为单个分区的处理能力是有限的。

实现要点

  • 生产者配置:生产者需要将所有消息都发送到同一个分区,可以通过自定义分区器来实现。
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class GlobalOrderedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("partitioner.class", "com.example.SinglePartitionPartitioner");

        Producer<String, String> producer = new KafkaProducer<>(props);
        String topic = "test-topic";

        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key-" + i, "value-" + i);
            producer.send(record);
        }

        producer.close();
    }
}

// 自定义分区器,将所有消息发送到同一个分区
class SinglePartitionPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 0; // 所有消息都发送到分区0
    }

    @Override
    public void close() {}

    @Override
    public void configure(java.util.Map<String, ?> configs) {}
}
  • 消费者配置:只需要一个消费者实例来消费该分区的消息,避免多个消费者同时消费同一个分区导致的顺序问题。
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class GlobalOrderedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "test-topic";
        consumer.subscribe(Collections.singletonList(topic));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    // 处理完一条消息后再处理下一条
                }
                consumer.commitSync(); // 同步提交偏移量
            }
        } finally {
            consumer.close();
        }
    }
}

http://www.niftyadmin.cn/n/5865127.html

相关文章

机器人“战场”:创新、落地与未来

从1999年的机器管家&#xff0c;2001年的机器人小孩大卫&#xff0c;到2015年拥有自我意识的“查派”&#xff0c;在科幻电影里&#xff0c;人们赋予了对机器人的各种形象和想象。2018年&#xff0c;尽管只是实验室的试验品&#xff0c;但波士顿动力机器狗Spot的视频还是在国内…

python 判断 字符串在字典列表中

在Python中&#xff0c;如果你想判断一个字符串是否存在于一个字典列表中&#xff0c;你可以通过遍历这个列表并检查每个字典是否包含你想要找的字符串键来实现。这里有几种方法可以做到这一点&#xff1a; 方法1&#xff1a;使用any()函数 你可以使用any()函数和字典的get方法…

【MySQL】第九弹---掌握SQL关键操作:更新、删除、插入与聚合分析的秘诀

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】【C详解】【Linux系统编程】【MySQL】 目录 1 Update 2 Delete 2.1 删除数据 2.2 截断表 3 插入查询结果 4 聚合函数 5 group by子句的使用 1 Update 语法…

【AI+智造】DeepSeek价值重构:当采购与物控遇上数字化转型的化学反应

作者&#xff1a;Odoo技术开发/资深信息化负责人 日期&#xff1a;2025年2月24日 引言&#xff1a;从事企业信息化工作16年&#xff0c;我见证过无数企业从手工台账到ERP系统的跨越。但真正让采购和物控部门脱胎换骨的&#xff0c;是融合了Deepseek AI的Odoo数字化解决方案——…

如何在 CMake 上设置新项目以获得成功

选择正确的构建系统可以决定项目的成功与否。从处理依赖项到确保正确编译和链接代码&#xff0c;正确的构建系统可以节省 你时间并避免潜在的麻烦。 在众多可用选项中&#xff0c;CMake 构建系统脱颖而出&#xff0c;可以处理复杂的跨平台项目。 本教程介绍了 CMake 的受欢迎…

【C】堆的应用1 -- 堆排序

之前学习了堆&#xff0c;堆的一棵以顺序结构存储的完全二叉树&#xff0c;堆本身又氛围大根堆和小根堆&#xff0c;假设以大根堆为例&#xff0c;由于堆顶部元素是一棵二叉树里面最大的元素&#xff0c;所以如果每次都取堆顶的元素&#xff0c;那么取出的元素就是一个降序排列…

量子计算的数学基础:复数、矩阵和线性代数

量子计算是基于量子力学原理的一种新型计算模式,它与经典计算机在信息处理的方式上有着根本性的区别。在量子计算中,信息的最小单位是量子比特(qubit),而不是传统计算中的比特。量子比特的状态是通过量子力学中的数学工具来描述的,因此,理解量子计算的数学基础对于深入学…

【Java项目】基于Spring Boot的家具销售电商系统

【Java项目】基于Spring Boot的家具销售电商系统 技术简介&#xff1a;采用Spring Boot框架、Java技术、MySQL数据库等实现。 系统简介&#xff1a;家具销售电商系统主要实现了管理员模块、用户模块二大部分。1、管理员&#xff1a;首页、个人中心、家具分类管理、热销家具管理…