Kafka consumer groups and consumers
Published: 2019-06-26


consumer group

consumer instance

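A consumer group has one or more consumers, and a group can subscribe to one or more topics. Within a group, each partition of a subscribed topic is consumed by exactly one consumer. So how are partitions mapped to consumers?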

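Suppose consumer group cg1 (group.id=cg1) subscribes to topic1. cg1 has three consumers, c1, c2, and c3, and topic1 has five partitions, p1 through p5. Which partition or partitions of topic1 does c1 consume? Which consumer in cg1 consumes p1?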

The Kafka 2.2.0 source has a PartitionAssignor interface (package org.apache.kafka.clients.consumer.internals, in the kafka-clients jar). It has two implementations: AbstractPartitionAssignor (same package, same jar) and StreamsPartitionAssignor (package org.apache.kafka.streams.processor.internals, in the kafka-streams jar). AbstractPartitionAssignor has three subclasses, all in package org.apache.kafka.clients.consumer of the kafka-clients jar: RangeAssignor, RoundRobinAssignor, and StickyAssignor. Each of these concrete classes implements a different assignment strategy.
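To make the cg1 question concrete, here is a minimal sketch of a range-style and a round-robin-style split for one topic. It is a simplified model written for this post, not Kafka's own code: the class AssignmentSketch and its methods are hypothetical, and it assumes every consumer subscribes to the same single topic and that both lists are already sorted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of two assignment strategies; NOT Kafka's actual implementation.
final class AssignmentSketch {

    // Range-style: each consumer gets a contiguous block of partitions;
    // the first (partitions % consumers) consumers get one extra partition.
    static Map<String, List<String>> range(List<String> consumers, List<String> partitions) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        int perConsumer = partitions.size() / consumers.size();
        int extra = partitions.size() % consumers.size();
        int start = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = perConsumer + (i < extra ? 1 : 0);
            result.put(consumers.get(i), new ArrayList<>(partitions.subList(start, start + count)));
            start += count;
        }
        return result;
    }

    // Round-robin-style: partitions are dealt out one at a time, in order.
    static Map<String, List<String>> roundRobin(List<String> consumers, List<String> partitions) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < partitions.size(); i++) {
            result.get(consumers.get(i % consumers.size())).add(partitions.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("c1", "c2", "c3");
        List<String> partitions = List.of("p1", "p2", "p3", "p4", "p5");
        System.out.println("range:       " + range(consumers, partitions));
        System.out.println("round-robin: " + roundRobin(consumers, partitions));
    }
}
```

In this model the range split gives c1 the partitions p1 and p2, c2 gets p3 and p4, and c3 gets p5, while the round-robin split gives c1 the partitions p1 and p4. A real consumer picks its strategy through the partition.assignment.strategy configuration property.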

 

On the consumer side, a record is represented by ConsumerRecord, whose fields are:

String topic, int partition, long offset, long timestamp, TimestampType timestampType, int serializedKeySize, int serializedValueSize, Headers headers, K key, V value, Optional<Integer> leaderEpoch

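Of these, leaderEpoch is the hardest to understand. It holds the epoch of the partition leader at the time the record was written, if that information is available. The epoch is a counter that increases each time the partition gets a new leader.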

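A KafkaConsumer instance is a Kafka consumer client that consumes records from a Kafka cluster. The client transparently handles broker failures and transparently adapts as the topic partitions it fetches move within the cluster. Consumers in the same consumer group load-balance consumption among themselves. A consumer maintains TCP connections to the Kafka brokers in order to fetch data. The client must be closed after use, otherwise it leaks those connections. Unlike the producer client, the consumer client is not thread-safe.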

Offsets and Consumer Position
Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition. For example, a consumer which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There are actually two notions of position relevant to the user of the consumer:
The position of the consumer (see position(TopicPartition)) gives the offset of the next record that will be given out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances every time the consumer receives messages in a call to poll(Duration).
The committed position is the last offset that has been stored securely. Should the process fail and restart, this is the offset that the consumer will recover to. The consumer can either automatically commit offsets periodically, or it can choose to control this committed position manually by calling one of the commit APIs (e.g. commitSync() and commitAsync(OffsetCommitCallback)).
This distinction gives the consumer control over when a record is considered consumed.
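The difference between the two positions is easiest to see in a toy model. The sketch below does not use the Kafka client at all: OffsetSketch and its methods are hypothetical names, the "partition" is an in-memory list whose index plays the role of the offset, and a restart is simulated by building a new object from the committed offset only.

```java
import java.util.List;

// Toy model of one consumer on one partition; all names here are hypothetical.
final class OffsetSketch {
    private final List<String> log;  // partition contents; list index == offset
    private long position;           // offset of the next record to hand out
    private long committed;          // last offset stored durably

    OffsetSketch(List<String> log, long committed) {
        this.log = log;
        this.committed = committed;
        this.position = committed;   // a (re)started consumer resumes from the committed offset
    }

    // Returns the next record and advances the position, or null at the end of the log.
    String poll() {
        if (position >= log.size()) {
            return null;
        }
        return log.get((int) position++);
    }

    // Manual commit: persist the current position.
    void commit() {
        committed = position;
    }

    long position() { return position; }
    long committed() { return committed; }

    public static void main(String[] args) {
        List<String> log = List.of("r0", "r1", "r2", "r3", "r4", "r5");
        OffsetSketch consumer = new OffsetSketch(log, 0);
        consumer.poll();
        consumer.poll();             // consumed r0 and r1
        consumer.commit();           // committed offset is now 2
        consumer.poll();
        consumer.poll();
        consumer.poll();             // consumed r2..r4, position is now 5
        System.out.println("position=" + consumer.position()
                + " committed=" + consumer.committed());

        // Simulated crash and restart: only the committed offset survives.
        OffsetSketch restarted = new OffsetSketch(log, consumer.committed());
        System.out.println("first record after restart: " + restarted.poll());
    }
}
```

After the simulated restart the model hands out r2 again, because r2 through r4 were consumed but never committed. Committing only after processing therefore trades possible redelivery for not losing records.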

A simple example:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.56.100:9092");
        props.setProperty("group.id", "my-test-consumer-group");
        // Offsets are committed automatically, every 100 ms.
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "100");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources closes the consumer if the loop exits with an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("test"));
            while (true) {
                // Wait up to one second for new records.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record);
                }
            }
        }
    }
}

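The consumer client pulls data from the Kafka cluster by calling poll(Duration timeout), which returns a ConsumerRecords object. The ConsumerRecords class implements Iterable<ConsumerRecord<K, V>>. On the consumer side, a single record is a ConsumerRecord instance, and ConsumerRecords is a collection of records.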

 

Reposted from: https://www.cnblogs.com/koushr/p/5873375.html
