Kafka 消费者启动后与服务器的交互流程涉及多个关键步骤,主要包括初始化、查找组协调器、加入消费者组、分区分配、心跳维持、拉取数据和提交偏移量等。以下是详细的流程说明:
KafkaConsumer
的构造函数,传入配置参数创建消费者实例。bootstrap.servers
(Kafka集群地址)、group.id
(消费者组ID)、key.deserializer
、value.deserializer
等。subscribe
方法:消费者通过调用subscribe
方法订阅一个或多个主题,也可以使用正则表达式来匹配多个主题。FindCoordinator
请求:消费者向Kafka集群中的任意Broker发送FindCoordinator
请求,请求中包含消费者组ID。/** * 表示 内部主题 __consumer_offsets 的分区数量,默认初始化值是50(顺带一提__consumer_offsets 副本因子默认值是3) * 初始值为 -1,表示尚未设置。 * 使用 volatile 关键字确保在多线程环境中对该变量的修改是可见的。 */ private volatile int numPartitions = -1; /** * 内部主题 __consumer_offsets 的各个分区分布在各个Broker服务器上,算出当前消费者组的协调器在哪个服务器 * 消费者组协调器所在brokerId = 消费者组id的哈希值 % 50 */ coordinator_broker_id = Utils.abs(groupId.hashCode()) % numPartitions public static int abs(int n) { return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n); }
FindCoordinator
响应中的地址信息,连接到组协调器。(1)发送JoinGroup请求:当消费者启动并调用poll
方法时,如果它尚未加入消费者组,或者需要重新加入(例如,因为再平衡),它会向组协调器发送JoinGroup
请求。这个请求包含消费者的group.id
、订阅的主题列表以及消费者使用的分区分配策略。
(2)等待响应:组协调器收到JoinGroup
请求后,会等待一段时间,以允许其他消费者也发送他们的JoinGroup
请求。这个等待时间是为了收集同一消费者组内所有消费者的信息。
(3)选择Leader:对于同一个消费者组的第一次JoinGroup
请求,协调器会选择第一个消费者作为Leader。Leader负责为组内的所有消费者分配分区。Leader的选择基于消费者的JoinGroup
请求顺序。
(4)分区分配策略:Leader消费者收到协调器的响应后,会根据提供的分区分配策略(如Range
、RoundRobin
等)和所有消费者的订阅信息来决定分区的分配方案。
(1)发送SyncGroup请求:Leader消费者将分区分配方案通过SyncGroup请求发送给组协调器。随后,组内的其他消费者也发送SyncGroup请求,但不包含分区分配方案。
(2)协调器广播分区分配方案:组协调器接收到SyncGroup请求后,将leader消费者的分区分配方案广播给消费者组内的所有消费者。
Fetch
请求:消费者向分配给它的分区的Leader Broker发送Fetch
请求,请求包含拉取数据的偏移量。enable.auto.commit=true
),消费者会定期自动提交消费的偏移量。commitSync
或commitAsync
方法提交偏移量。Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test-topic")); // 订阅主题 // 消费者加入消费者组并开始消费的过程是在第一次调用poll方法时触发的 ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } // 提交偏移量 consumer.commitSync();
在上述代码中,消费者通过调用subscribe
方法订阅了主题test-topic
,然后通过调用poll
方法触发了加入消费者组的完整流程,包括查找组协调器、加入消费者组、分区分配、拉取数据和提交偏移量等步骤。
Kafka消费者启动后与服务器的交互流程是一个复杂的过程,涉及与组协调器的多次交互。这个流程确保了消费者能够正确地加入消费者组,分区能够被合理地分配给消费者组内的消费者,并且在消费者组成员变化时能够进行适当的再平衡,同时保证了消费者能够从正确的位置继续消费数据。
上一篇:linux权限
下一篇:Linux系统运维常用命令