if(){}else if(){}else{thrownew new xxException()} 为什么这样行不通?

以下试题来自:
单项选择题给出下列的不完整的方法,则哪个声明可以被加入①行完成此方法的声明(
② {success=connect();
③ if(success==-1){
④ throw new TimedOutException();
⑥ }A.public void method()B.public void method()throws ExceptionC.public void method()throw TimedOutExceptionD.public throw TimedOutException void method()
为您推荐的考试题库
你可能感兴趣的试题
1A.intB.voidC.booleanD.static2A.便于随机存取B.花费的存储空间较顺序存储少C.便于插入和删除操作D.数据元素的物理顺序与逻辑顺序相同
热门相关试卷
最新相关试卷输入关键字进行搜索
使用php start.php start时在终端无法看到抛错信息
代码如下:
if&(!empty($auth))&{
&&&&&&&&try{
&&&&&&&&&&&&$re_obj-&auth($auth);
&&&&&&&&catch(\Exception&$e){
&&&&&&&&&&&&throw&new&\Exception('redis密码错误或连接错误!');
是不是不能这样用?还是有其他的使用方法?
debug方式能看到输出。
另外最好把异常自己捕获下,不然进程就退出了,影响性能。
要回复问题请先或
关注: 2 人
Powered by有什么好的解决方案if (null == attend) {
throw new IncompleteStaffException(1);
if (null == password) {
throw new IncompleteStaffException(2);
if (null == name) {
throw new IncompleteStaffException(3);
if (null == corpMail) {
throw new IncompleteStaffException(4);
if (null == idCard) {
throw new IncompleteStaffException(5);
if (null == education) {
throw new IncompleteStaffException(6);
if (null == jobs) {
throw new IncompleteStaffException(7);
if (null == exp) {
throw new IncompleteStaffException(8);
if (null == department) {
throw new IncompleteStaffException(9);
if (null == state) {
throw new IncompleteStaffException(10);
if (null == dateEn) {
throw new IncompleteStaffException(11);
用hibernate作为持久层实现吗?可以试试hibernate validation:// In your User object@NotnullObject attend;@NotnullS@NotnullS...
private boolean _staffValidation(Staff staff) throws RepeatAttendException, RepeatCompanyEmailException, RepeatLicenseException, RepeatIdentityCardException, IncompleteStaffException, InvalidStaffProperty {
String method = "_staffValidation";
if (null == staff) {
// Required properties validation
String attend = staff.getAttend();
String password = staff.getPassword();
String name = staff.getName();
String corpMail = staff.getCorpMail();
String idCard = staff.getIdCard();
Integer education = staff.getEducation();
Integer jobs = staff.getJobs();
Integer exp = staff.getExp();
Integer department = staff.getDepartment();
Integer state = staff.getState();
Date dateEn = staff.getDateEn();
_logger.debug(method, "attend=" + attend);
_logger.debug(method, "password=" + password);
_logger.debug(method, "name=" + name);
_logger.debug(method, "corpMail=" + corpMail);
_logger.debug(method, "idCard=" + idCard);
_logger.debug(method, "education=" + education);
_logger.debug(method, "jobs=" + jobs);
_logger.debug(method, "exp=" + exp);
_logger.debug(method, "department=" + department);
_logger.debug(method, "state=" + state);
_logger.debug(method, "dateEn=" + dateEn);
// Null properties check
if (null == attend) {
throw new IncompleteStaffException(1);
if (null == password) {
throw new IncompleteStaffException(2);
if (null == name) {
throw new IncompleteStaffException(3);
if (null == corpMail) {
throw new IncompleteStaffException(4);
if (null == idCard) {
throw new IncompleteStaffException(5);
if (null == education) {
throw new IncompleteStaffException(6);
if (null == jobs) {
throw new IncompleteStaffException(7);
if (null == exp) {
throw new IncompleteStaffException(8);
if (null == department) {
throw new IncompleteStaffException(9);
if (null == state) {
throw new IncompleteStaffException(10);
if (null == dateEn) {
throw new IncompleteStaffException(11);
// Properties validation
if (!ValidatorUtil.isAttend(attend)) {
throw new InvalidStaffProperty(1);
if (!ValidatorUtil.isPassword(password)) {
throw new InvalidStaffProperty(2);
if (!ValidatorUtil.isName(name)) {
throw new InvalidStaffProperty(3);
if (!ValidatorUtil.isEmail(corpMail)) {
throw new InvalidStaffProperty(4);
if (!ValidatorUtil.isIdentityCard(idCard)) {
throw new InvalidStaffProperty(5);
//exp must between after has been born 15years later and dateEn
int dateEnYear = Integer.parseInt(DateUtil.parseString(dateEn).substring(0, 4));
int afterBirth15 = Integer.parseInt(idCard.substring(6, 10)) + 15;
if (exp & afterBirth15 || exp & dateEnYear) {
throw new InvalidStaffProperty(6);
if (!corpMail.endsWith("@cn.ufinity.com")) {
throw new InvalidStaffProperty(7);
boolean bool = _notRequiredPropertiesValidation(staff);
if (bool) {
_repeatStaffProperties(staff);
_logger.debug(method,
"Validation failure, Not required properties invalid!");
}[该贴被xuezhongde于 17:15修改过]
我以为,没必要剥离validation到entity外面,validation logic是entity的一部分PS,对你的这个private boolean _staffValidation(Staff staff) throws RepeatAttendException,RepeatCompanyEmailException, RepeatLicenseException,RepeatIdentityCardException, IncompleteStaffException,InvalidStaffProperty 我无语...
该验证是解析Excel后,将一行信息封装成Staff对象,要验证Staff的有效性,你的好方法应该是怎么处理?
企业级软件架构解决之道
如有意见请与我们联系 Powered by JdonFrameworkkafka源码分析之consumer的源码
Consumer的client端
Propertiesprops=newProperties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,&localhost:9092&);
props.put(ConsumerConfig.GROUP_ID_CONFIG,&DemoConsumer&);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,&false&);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,&30000&);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
&org.apache.kafka.common.serialization.ByteArrayDeserializer&);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
&org.apache.kafka.common.serialization.ByteArrayDeserializer&);
consumer=newKafkaConsumer&&(props);
this.topic=
consumer.subscribe(Collections.singletonList(this.topic));
//下面的传入一个listener这个部分的注释如果需要对partition在当前的consumer中分配或者取消分配时做一定的操作时(比如取消分配时提交offset),可以实现这个接口。
//subscribe(Listtopics,ConsumerRebalanceListenerlistener)
while(true){
ConsumerRecordsrecords=consumer.poll(1000);
for(ConsumerRecordrecord:records){
System.out.println(&Receivedmessage:(&+record.key()+&,&+record.value()
+&)atoffset&+record.offset());
consumer.commitSync()
生成KafkaConsumer实例
@SuppressWarnings(&unchecked&)
privateKafkaConsumer(ConsumerConfigconfig,
DeserializerkeyDeserializer,
DeserializervalueDeserializer){
log.debug(&StartingtheKafkaconsumer&);
根据配置信息,得到如下三个配置的配置值,并检查配置的合法:
1,读取request.timeout.ms配置项的值,默认值为40秒。用于配置请求的超时时间。
2,读取session.timeout.ms配置项的值,默认值为30秒,用于配置当前的consumer的session的超时时间,也就是client端多长时间不给server发送心跳就表示这个client端超时。
3,读取fetch.max.wait.ms配置项的值,默认值为500ms。用于配置从server中读取数据最长的等待时间。
this.requestTimeoutMs=config.getInt(
ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
intsessionTimeOutMs=config.getInt(
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
intfetchMaxWaitMs=config.getInt(
ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
如果请求超时时间不是一个大于session的超时时间的值或者请求超时时间不是一个大于fetch的最大等待时间的值时,表示requestTimeoutMs的配置不合法,直接throwexception.
if(this.requestTimeoutMs&=sessionTimeOutMs||
this.requestTimeoutMs&=fetchMaxWaitMs)
thrownewConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG
+&shouldbegreaterthan&+ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG
+&and&+ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
this.time=newSystemTime();
MetricConfigmetricConfig=newMetricConfig().samples(
config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(
ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
TimeUnit.MILLISECONDS);
这里得到对应的consumer的client端id的client.id配置,如果这个值没有配置时,默认随机生成一个。
clientId=config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
if(clientId.length()&=0)
clientId=&consumer-&+CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
Listreporters=config.getConfiguredInstances(
ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
reporters.add(newJmxReporter(JMX_PREFIX));
this.metrics=newMetrics(metricConfig,reporters,time);
读取retry.backoff.ms配置的值,默认值为100ms,用于配置重试的间隔周期。
this.retryBackoffMs=config.getLong(
ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
根据重试的间隔周期加上metadata.max.age.ms配置项的值生成Metadata实例,
配置metadata.max.age.ms项默认值为5分钟,用于设置metadata定期重新读取的生命周期。
this.metadata=newMetadata(retryBackoffMs,
config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));
读取bootstrap.servers配置的要读取的kafkabrokers的配置列表,并根据broker的连接信息,生成Cluster实例,并把Cluster实例更新到metadata的实例。
Listaddresses=ClientUtils.parseAndValidateAddresses(
config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses),0);
StringmetricGrpPrefix=&consumer&;
MapmetricsTags=newLinkedHashMap();
metricsTags.put(&client-id&,clientId);
生成NetworkClient的实例,生成实例需要如下几个配置文件:
1,配置项connections.max.idle.ms,默认值9分钟,用于配置连接最大的空闲时间(每个连接的最大连接队列为100)。
2,配置项reconnect.backoff.ms,默认值50ms,用于配置连接断开后重新连接的间隔时间。
3,配置项send.buffer.bytes,默认值128kb,用于配置SOCKET的SO_SNDBUF发送数据的缓冲区大小。
4,配置项receive.buffer.bytes,默认值32kb,用于配置SOCKET的SO_RCVBUF接收数据的缓冲区大小。
5,读取request.timeout.ms配置项的值,默认值为40秒。用于配置请求的超时时间。
ChannelBuilderchannelBuilder=ClientUtils.createChannelBuilder(
config.values());
NetworkClientnetClient=newNetworkClient(
newSelector(config.getLong(
ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
metrics,time,metricGrpPrefix,metricsTags,channelBuilder),
this.metadata,
100,//afixedlargeenoughvaluewillsuffice
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),time);
根据retry.backoff.ms配置的值,生成一个ConsumerNetworkClient的实例。
this.client=newConsumerNetworkClient(netClient,metadata,time,
retryBackoffMs);
读取auto.offset.reset配置项的值,默认值为latest。可配置(&latest&,&earliest&,&none&),这个配置用于在读取partition的offset超出范围时,对offset进行重置的规则。
OffsetResetStrategyoffsetResetStrategy=OffsetResetStrategy.valueOf(
config.getString(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
生成用于管理订阅的topic的partition的状态管理的,用于管理partition的状态与当前的offset的信息。
this.subscriptions=newSubscriptionState(offsetResetStrategy);
生成用于管理相同的一个groupId下的多个client端的partition的分区控制,
通过partition.assignment.strategy配置,默认实例为RangeAssignor。
Listassignors=config.getConfiguredInstances(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
PartitionAssignor.class);
生成用于对consumer进行协调的实例,这个实例依赖如下配置:
1,配置项group.id,用于配置consumer对应的订阅的组名称,相同的组的多个client端进行协调消费处理。
2,读取session.timeout.ms配置项的值,默认值为30秒,用于配置当前的consumer的session的超时时间,也就是client端多长时间不给server发送心跳就表示这个client端超时。
3,配置项heartbeat.interval.ms,默认值3秒,用于定时向server发送心跳的时间间隔。
4,根据retry.backoff.ms配置的值来设置读取信息失败的重试间隔。
5,配置项enable.auto.commit,默认值true,设置是否自动提交消费过的offset的值的设置。
5,配置项auto.commit.interval.ms,默认值5秒,如果设置有自动提交offset时,自动提交的间隔时间。
this.coordinator=newConsumerCoordinator(this.client,
config.getString(ConsumerConfig.GROUP_ID_CONFIG),
config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
assignors,
this.metadata,
this.subscriptions,
metricGrpPrefix,
metricsTags,
this.time,
retryBackoffMs,
newConsumerCoordinator.DefaultOffsetCommitCallback(),
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
根据key.deserializer配置与value.deserializer配置的key,value的反序列化的配置,生成反序列化消息的实例。这个类必须是实现Deserializer接口的类。
if(keyDeserializer==null){
this.keyDeserializer=config.getConfiguredInstance(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
this.keyDeserializer.configure(config.originals(),true);
config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
this.keyDeserializer=keyD
if(valueDeserializer==null){
this.valueDeserializer=config.getConfiguredInstance(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
this.valueDeserializer.configure(config.originals(),false);
config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
this.valueDeserializer=valueD
生成用于具体读取消息的读取实例,这个实例依赖如下几个配置信息:
1,配置项fetch.min.bytes,默认值1,用于设置每次读取的最小byte数。
2,配置项fetch.max.wait.ms,默认值500ms,用于设置每次读取的最大等待时间。
3,配置项max.partition.fetch.bytes,默认值1MB,用于设置每个partition每次读取的最大的数据量。
4,配置项check.crcs,默认值true,用于设置是否校验数据的完整性。
根据retry.backoff.ms配置的值来设置读取信息失败的重试间隔。
this.fetcher=newFetcher&&(this.client,
config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
this.keyDeserializer,
this.valueDeserializer,
this.metadata,
this.subscriptions,
metricGrpPrefix,
metricsTags,
this.time,
this.retryBackoffMs);
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX,clientId);
log.debug(&Kafkaconsumercreated&);
}catch(Throwablet){
//callclosemethodsifinternalobjectsarealreadyconstructed
//thisistopreventresourceleak.seeKAFKA-2121
close(true);
//nowpropagatetheexception
thrownewKafkaException(&Failedtoconstructkafkaconsumer&,t);
要对一个topic进行订阅时,在consumer生成后,通过调用如下的函数来进行订阅,
第二个参数是对一个partition的分配与取消分析时的监听操作,可用于监听对分配到当前的consumer或者取消时的其它操作。
publicvoidsubscribe(Listtopics,ConsumerRebalanceListenerlistener){
acquire();
if(topics.isEmpty()){
//treatsubscribingtoemptytopiclistasthesameasunsubscribing
this.unsubscribe();
log.debug(&Subscribedtotopic(s):{}&,Utils.join(topics,&,&));
通过对订阅的状态管理的组件对需要订阅的topic进行处理。
把订阅的topic集合添加到subscriptions实例中的subscription集合与groupSubscription集合中,设置needsPartitionAssignment属性的值为true,表示需要重新分配partition在当前的consumer中。
this.subscriptions.subscribe(topics,listener);
把订阅的topic集合添加到metadata的topics的集合中,如果传入的topic集合有不包含在当前的metadata中的topic时,表示metadata需要更新,设置metadata的needUpdate属性为true.
metadata.setTopics(subscriptions.groupSubscription());
release();
读取订阅的topic的消息
外部调用入口
读取订阅的topic的消息时通过KafkaConsumer中的poll函数来进行读取,这个函数需要传入一个timeout的时间值,必须是一个大于0的值。
publicConsumerRecordspoll(longtimeout){
acquire();
if(timeout&0)
thrownewIllegalArgumentException(&Timeoutmustnotbenegative&);
//pollfornewdatauntilthetimeoutexpires
longstart=time.milliseconds();
longremaining=
这里根据设置的超时时间,读取订阅的topic中对应此consumer分配的partition的数据,每次读取根据读取的时长,来计算这个while可用于超时的剩余等待时间,如果读取到数据,就直接返回这个数据,while迭代停止,否则等到指定的超时时间到达。
这里检查group是否被加入到kafkaleader中,检查订阅的topic是否被分配对应的消费的partitions,
从当前的cache中读取records的消息记录,如果读取不到,通过fetcher发起一个请求,并重新读取。
这里如果cache中没有数据时,会先通过fetcher.initFetchers进行一次数据的fetch的消费请求,最长等待的超时时间就是传入的剩余的超时时间。如果没有读取到消费,这里返回的值是一个空的集合。
Map<topicpartition,list<consumerrecord&&records=</topicpartition,list<consumerrecord
pollOnce(remaining);
if(!records.isEmpty()){
如果当前的cache中已经被到数据(或者cache中没有数据也会发起一次请求,并等待响应结果),
这里重新向server进行fetch数据的请求,用于异步向server发起一个请求,并把请求的结果数据存储到cache中,下一次fetchRecords时,直接使用这个cache的数据。
fetcher.initFetches(metadata.fetch());
client.quickPoll();
returnnewConsumerRecords&&(records);
longelapsed=time.milliseconds()-
remaining=timeout-
}while(remaining&0);
如果在指定的超时时间内没有读取到数据,直接返回一个空的集合。
returnConsumerRecords.empty();
release();
读取要消费的记录
在外部调用poll函数时,会通过pollOnce函数来得到一组要消费的数据。
privateMap<topicpartition,list<consumerrecord&&pollOnce(longtimeout){</topicpartition,list<consumerrecord
这里首先检查与kafkaleader中的consumerCoordinator的实例进行交互的client是否初始化,如果没有或者连接被断开,这里需要重新对连接进行初始化操作。
可参见consumer的groupcoordinator中的检查并初始化coordinator的连接部分。
//TODO:Sub-requestsshouldtakeintoaccountthepolltimeout(KAFKA-1894)
coordinator.ensureCoordinatorKnown();
如果consumer是直接针对一个topic进行的订阅时,partitionsAutoAssigned函数返回的值是true.
如果这个函数返回的值是true时,通过coordinator执行partition的分配。
这里通过ConsumerCoordinator中的consumer的group加入部分来进行处理,会执行如下操作:
1,向coordinator对应的broker发起joinGroup的请求,
2,根据joinGroup得到的响应如果是leader时,对当前的group所有的consumers进行订阅的topic的partition的分配,
3,向coordinator进行syncGroup的操作,把分配的每一个consumer对应的partition同步到broker中。
//ensurewehavepartitionsassignedifweexpectto
if(subscriptions.partitionsAutoAssigned())
coordinator.ensurePartitionAssignment();
这里检查是否有这个consumer需要消费的partition集合中,是否有没有初始化需要fetch的offset的partition,如果有,执行updateFetchPositions来更新这些个partition.
得到当前消费的partition对应的offset的位置的函数处理流程:
1,检查subscriptions(订阅的状态)中needsFetchCommittedOffsets属性是否为true,如果为true表示有partition需要更新fetch的offset的值。
2,根据当前的groupId与这个consumer对应的partitions发起一个OffsetFetchRequest请求向coordinator对应的broker节点并得到每个partition响应的offset的metadata信息,如果这个partition对应的消费记录的offset已经过期或者不存在是,返回的是一个NoOffset的实例(offset为-1)。
3,根据得到的partition的集合对应的offset的值,
更新到partition的状态管理TopicPartitionState实例的committed属性中。这里需要注意的是如果对应的partition返回的offset是NoOffset的实例时,offset的值为-1,这个时候不会被设置到committed的属性中,这个时候默认情况下committed的值还是一个null值。
4,,设置subscriptions对应的订阅状态管理中的needsFetchCommittedOffsets属性的值为false.表示fetchCommittedoffset的过程已经被执行在当前分配的partitions中。
5,根据得到的committed的offset的值,迭代分配的partition的集合,设置每一个partition开始进行数据读取的position的值,
5,1,如果partition对应的管理状态的TopicPartitionState实例中awaitingReset属性值为true,这种情况表示这个partition没有得到对应的partition的offset(过期或第一次消费,或者说这个offset超过了当前partition中log的offset的范围),这个时候向这个partition对应的leader节点发起一个ListOffsetRequest(其实就是OffsetRequest)请求,这个请求直接得到当前partition的log中最大或者最小的offset的值。得到具体最大或是最小的offset的值通过auto.offset.reset配置项的值,默认值为latest。
执行seek函数,更新这个partition对应的TopicPartitionState实例中position的值为请求返回的offset值,设置awaitingReset属性值为false,设置hasValidPosition属性值为true,表示这个partition已经seek到指定要读取的位置。seek作用于把partition的offset设置到一个开始读取的位置上。
5,2,如果partition对应的TopicPartitionState状态实例中committed属性值为null,表示这个partition是第一次消费或者说这个partition的上次消费时间已经达到过期的时间,被删除掉了,设置awaitingReset属性值为true,表示这个partition需要重置offset.执行offset的重置操作,通过resetOffset函数。向这个partition对应的leader节点发起一个ListOffsetRequest(其实就是OffsetRequest)请求,这个请求直接得到当前partition的log中最大或者最小的offset的值。得到具体最大或是最小的offset的值通过auto.offset.reset配置项的值,默认值为latest。
执行seek函数,更新这个partition对应的TopicPartitionState实例中position的值为请求返回的offset值,设置awaitingReset属性值为false,设置hasValidPosition属性值为true,表示这个partition已经seek到指定要读取的位置。seek作用于把partition的offset设置到一个开始读取的位置上。
5,3,这个情况表示committed属性的值是一个正常的offset的值,得到这个committed的offset的值,执行seek函数,把position属性的值设置为这个committed的offset的值。设置awaitingReset属性值为false,设置hasValidPosition属性值为true,表示这个partition已经seek到指定要读取的位置。
//fetchpositionsifwehavepartitionswe&#39;resubscribedtothatwe
//don&#39;tknowtheoffsetfor
if(!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());
//initanynewfetches(won&#39;tresendpendingfetches)
Clustercluster=this.metadata.fetch();
Map<topicpartition,list<consumerrecord&&records=</topicpartition,list<consumerrecord
fetcher.fetchedRecords();
if(!records.isEmpty()){
fetcher.initFetches(cluster);
client.poll(timeout);
returnfetcher.fetchedRecords();
从Fetcher的cache中读取已经加载到的数据
这个部分在consumer在执行poll函数-&pollOnce函数时,会先从通过Fetcher实例中的fetchedRecords函数来从cache中读取已经加载过来的消息。下面来看看这个函数的处理流程:
publicMap<topicpartition,list<consumerrecord&&fetchedRecords(){
if(this.subscriptions.partitionAssignmentNeeded()){</topicpartition,list<consumerrecord
如果订阅的状态中needsPartitionAssignment属性的值为true表示partition需要重新进行分配,直接返回一个空的集合。
returnCollections.emptyMap();
Map<topicpartition,list<consumerrecord&&drained=newHashMap&&();</topicpartition,list<consumerrecord
1,这里检查是否有position的值超出了对应的partition的offset的范围,
迭代offsetOutOfRangePartitions集合,这个集合中存储了上一次超出范围的partition的offset的集合,取出当前这个partition对应的position的值,如果这个值与这个集合中对应的partition的offset的值相同时,throw一个OffsetOutOfRangeExceptionexception.同时清空这个offsetOutOfRangePartitions集合。
throwIfOffsetOutOfRange();
2,检查是否有未认证通过的topics的集合(unauthorizedTopics不为空),
如果有throw一个TopicAuthorizationException的exception,
同时清空TopicAuthorizationException这个集合。
throwIfUnauthorizedTopics();
3,检查recordTooLargePartitions集合(存储有消息记录超过指定大小的partition的集合)是否有值,如果有值,表示有消息大小超出了限制,throw一个RecordTooLargeException的exception.同时清空这个集合。
throwIfRecordTooLarge();
流程执行到这里,表示上面的3个检查都已经完成,消息记录正常,读取cache中已经加载过来的消息,并迭代每一条消息进行包装并返回。
for(PartitionRecordspart:this.records){
这里先检查对应这条消息的partition是否在当前的consumer是一个分配的partition,如果不是,直接continue,这个情况有可能发生在cache完成后,partition进行了重新分配,原来在这个consumer进行消费的partition被rebalance到其它的consumer中。
if(!subscriptions.isAssigned(part.partition)){
log.debug(&Notreturningfetchedrecordsforpartition{}sinceitis
nolongerassigned&,part.partition);
得到当前这个partition消息到的position的值,也就是开始的offset的值。
//notethattheconsumedpositionshouldalwaysbeavailable
//aslongasthepartitionisstillassigned
longposition=subscriptions.position(part.partition);
如果当前的partition的不是处理fetchable的状态(被暂停或者position没有准备好),打印一个提示日志。
if(!subscriptions.isFetchable(part.partition)){
log.debug(&Notreturningfetchedrecordsforassignedpartition{}
sinceitisnolongerfetchable&,part.partition);
}elseif(part.fetchOffset==position){
这种情况下,表示当前这个partition中开始进行fetch的offset的值与当前consumer中存储的position的位置是一个相同的值,得到这个partition中加载到的所有的消息中的最后一条消息的offfset,并把这个offset加一,得到的nextOffset的值就是下一次要读取的开始位置。
longnextOffset=part.records.get(
part.records.size()-1).offset()+1;
log.trace(&Returningfetchedrecordsatoffset{}forassigned
partition{}andupdate&+
&positionto{}&,position,part.partition,nextOffset);
把这个partition已经加载过来的消息的集合添加到要返回的map中这个partition对应的位置。
List<consumerrecord&records=drained.get(part.partition);
if(records==null){
records=part.records;
drained.put(part.partition,records);
records.addAll(part.records);
}</consumerrecord
更新这个对应的partition的下一次读取数据的开始位置为nextOffset的值,这个nextOffset其实就是当前partition已经加载到的数据的最后一条记录的offset的值加一。
subscriptions.position(part.partition,nextOffset);
log.debug(&Ignoringfetchedrecordsfor{}atoffset{}sincethe
currentpositionis{}&,
part.partition,part.fetchOffset,position);
最后,清空cache,并返回对cache进行处理后的集合,这个集合中存储了每个partition对应的读取到的消息集合。
this.records.clear();
异步加载数据到cunsumer的缓存
在每次执行poll操作来加载数据时,从cache中读取完成数据后,或者说当前的cache中没有数据时,会通过Fetcher中的initFetchers函数来进行数据的异步加载操作。
下面先看看这个函数的发起fetch的请求部分:
publicvoidinitFetches(Clustercluster){
首先根据cluster中所有的节点生成FetchRequest的请求,这里迭代当前的consumer中所有要消息的partition并得到对应的node节点,根据node节点中包含的partition(leader的副本对应的节点)为key,value是这个节点中所有的partition的节点的请求信息,每一个节点生成一个FetchRequest的信息(包含要消费的partition对应的node),
如果partition对应的leader的node在consumer的metadata中不存在,表示这个元数据需要被更新,设置metadata实例中的needUpdate属性为true,并跳过这个节点的分配,
这里的createFetchRequests函数中返回的只包含目前metadata中包含有元数据的节点的集合。
for(Map.EntryfetchEntry:
createFetchRequests(cluster).entrySet()){
finalFetchRequestfetch=fetchEntry.getValue();
向所有的已经连接的node节点发起FETCH的请求。如果请求成功,执行handleFetchResponse的函数处理来对响应的结果进行解析。
client.send(fetchEntry.getKey(),ApiKeys.FETCH,fetch)
.addListener(newRequestFutureListener(){
publicvoidonSuccess(ClientResponseresponse){
handleFetchResponse(response,fetch);
publicvoidonFailure(RuntimeExceptione){
log.debug(&Fetchfailed&,e);
接下来看看fetch请求响应成功的响应结果的处理函数:
privatevoidhandleFetchResponse(ClientResponseresp,FetchRequestrequest){
inttotalBytes=0;
inttotalCount=0;
FetchResponseresponse=newFetchResponse(resp.responseBody());
这里迭代这个broker的响应信息,每一个partition的响应表示一次迭代。
for(Map.Entryentry:
response.responseData().entrySet()){
TopicPartitiontp=entry.getKey();
FetchResponse.PartitionDatapartition=entry.getValue();
if(!subscriptions.isFetchable(tp)){
log.debug(&Ignoringfetchedrecordsforpartition{}sinceitisnolonger
fetchable&,tp);
}elseif(partition.errorCode==Errors.NONE.code()){
这个情况,表示partition正常fetch到了数据,对fetch到的数据进行解析,并进行cache操作。
得到这个请求开始的offset,也就是上一次fetch时记录下来的position的值。
longfetchOffset=request.fetchData().get(tp).offset;
得到当前订阅中对应这个partition的position的值,这个值必须与这一次请求的offset的值相同,因为fetch的请求是通过这个position当成的开始的offset的值,如果这个值不相同,表示这个线程中间可能产生了其它的并发问题。
Longposition=subscriptions.position(tp);
if(position==null||position!=fetchOffset){
log.debug(&Discardingfetchresponseforpartition{}sinceitsoffset
{}doesnotmatch&+
&theexpectedoffset{}&,tp,fetchOffset,position);
对这个partition响应的消息集合进行解析,并迭代这个消息集合,把每一条消息添加到一个临时的parsed的集合中,当对这个响应的消息集合迭代完成后,这个parsed的临时集合不为空时,把这个集合添加到consumer的cache的records集合中,记录这个partition开始fetch的offset,对应的partition与消息集合。
如果迭代完成后,响应消息的buffer中还有未进行处理的数据,表示这次请求响应的数据大于了设置的最大的响应消息的大小,把这个partition添加到recordTooLargePartitions集合中,表示这个partition这次的消息响应结果太大。
intbytes=0;
ByteBufferbuffer=partition.recordSet;
MemoryRecordsrecords=MemoryRecords.readableRecords(buffer);
List<consumerrecord&parsed=newArrayList&&();
for(LogEntrylogEntry:records){
//Skipthemessagesearlierthancurrentposition.
if(logEntry.offset()&=position){
parsed.add(parseRecord(tp,logEntry));
bytes+=logEntry.size();
if(!parsed.isEmpty()){
log.trace(&Addingfetchedrecordforpartition{}withoffset{}to</consumerrecord
bufferedrecordlist&,tp,position);
ConsumerRecordrecord=parsed.get(parsed.size()-1);
this.records.add(newPartitionRecords&&(fetchOffset,tp,parsed));
this.sensors.recordsFetchLag.record(
partition.highWatermark-record.offset());
}elseif(buffer.limit()&0){
//wedidnotreadasinglemessagefromanon-emptybuffer
//becausethatmessage&#39;ssizeislargerthanfetchsize,inthiscase
//recordthisexception
this.recordTooLargePartitions.put(tp,fetchOffset);
this.sensors.recordTopicFetchMetrics(tp.topic(),bytes,parsed.size());
totalBytes+=
totalCount+=parsed.size();
如果执行fetch操作,对应这个fetch的partition响应的代码是如下两个代码时,表示这个partition的leader发生了变化,设置metadata的needUpdate属性为true,表示需要重新更新这个topic对应的metadata的信息。
elseif(partition.errorCode==Errors.NOT_LEADER_FOR_PARTITION.code()
||partition.errorCode==Errors.UNKNOWN_TOPIC_OR_PARTITION.code()){
this.metadata.requestUpdate();
如果执行fetch操作时,请求过去的position的offset超出了这个partition的offset的范围时,
如果auto.offset.reset配置的值不是NONE时,设置这个partition的状态中的awaitingReset属性为true,设置position的值为Null,设置hasValidPosition属性为false,设置重置offset时,读取offset的值LATEST是或者是EARLIEST,通过得到auto.offset.reset配置的值。表示这个partition需要重置offset.否则把这个partition添加到offsetOutOfRangePartitions集合中,表示这个partition的offset请求超出了log中offset的范围。
elseif(partition.errorCode==Errors.OFFSET_OUT_OF_RANGE.code()){
longfetchOffset=request.fetchData().get(tp).offset;
if(subscriptions.hasDefaultOffsetResetPolicy())
subscriptions.needOffsetReset(tp);
this.offsetOutOfRangePartitions.put(tp,fetchOffset);
log.info(&Fetchoffset{}isoutofrange,resettingoffset&,fetchOffset);
如果响应的代码是TOPIC_AUTHORIZATION_FAILED代码时,表示当前的consumer没有操作这个partition对应的topic,直接把这个partition对应的topic添加到unauthorizedTopics集合中。
elseif(partition.errorCode==Errors.TOPIC_AUTHORIZATION_FAILED.code()){
log.warn(&Notauthorizedtoreadfromtopic{}.&,tp.topic());
unauthorizedTopics.add(tp.topic());
}elseif(partition.errorCode==Errors.UNKNOWN.code()){
log.warn(&Unknownerrorfetchingdatafortopic-partition{}&,tp);
thrownewIllegalStateException(&Unexpectederrorcode&+
partition.errorCode+&whilefetchingdata&);
this.sensors.bytesFetched.record(totalBytes);
this.sensors.recordsFetched.record(totalCount);
this.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
this.sensors.fetchLatency.record(resp.requestLatencyMs());
ConsumerNetworkClient中的poll函数
这个函数用于具体的请求的发送操作,传入参数中的第三个参数表示是否需要计算并执行consumer中定义的一些定时器任务,比如心跳与设置有自动提交offset。
privatevoidpoll(longtimeout,longnow,booleanexecuteDelayedTasks){
通过trySend来执行请求队列中需要发送的所有的请求。在这个过程中,如果请求对应的broker的节点还没有建立连接或者连接失效,会重新创建这个broker的连接。
//sendalltherequestswecansendnow
trySend(now);
//ensurewedon&#39;tpollanylongerthanthedeadlinefor
//thenextscheduledtask
timeout=Math.min(timeout,delayedTasks.nextTimeout(now));
这里的clientPoll函数处理几下几个操作:
1,如果metadata被标记为需要更新时,执行metadata的更新操作重新读取这个consumer中订阅的所有的topic的metadata的信息。
2,对发送成功,接收到broker响应,连接被关闭,连接被打开,请求超时的selector进行处理,如果处理的是连接被关闭或者请求超时时,会关闭对应的node的连接,同时设置metadata的needUpdate属性为true,这个时候需要重新更新metadata.
clientPoll(timeout,now);
now=time.milliseconds();
这里处理node连接失败的节点的请求,把这个节点的请求移出。
checkDisconnects(now);
检查是否有定时间隔达到需要进行执行的定时器任务,如果有执行这个任务。
//executescheduledtasks
if(executeDelayedTasks)
delayedTasks.poll(now);
//tryagaintosendrequestssincebufferspacemayhavebeen
//clearedoraconnectfinishedinthepoll
trySend(now);
对失败的请求进行处理,主要是调用对应请求的回调函数来进行处理。
//failallrequeststhatcouldn&#39;tbesent
failUnsentRequests();
consumer的groupcoordinator
ConsumerCoordinator实例生成
在KafkaConsumer实例生成时,会生成一个ConsumerCoordinator的实例,这个实例用于管理group的加入,partition的分配,当前的client的心跳管理。
在KafkaConsumer实例生成时:
这个实例依赖如下配置:
1,配置项group.id,用于配置consumer对应的订阅的组名称,相同的组的多个client端进行协调消费处理。
2,读取session.timeout.ms配置项的值,默认值为30秒,用于配置当前的consumer的session的超时时间,也就是client端多长时间不给server发送心跳就表示这个client端超时。
3,配置项heartbeat.interval.ms,默认值3秒,用于定时向server发送心跳的时间间隔。
4,根据retry.backoff.ms配置的值来设置读取信息失败的重试间隔。
5,配置项enable.auto.commit,默认值true,设置是否自动提交消费过的offset的值的设置。
5,配置项auto.commit.interval.ms,默认值5秒,如果设置有自动提交offset时,自动提交的间隔时间。
this.coordinator=newConsumerCoordinator(this.client,
config.getString(ConsumerConfig.GROUP_ID_CONFIG),
config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
assignors,
this.metadata,
this.subscriptions,
metricGrpPrefix,
metricsTags,
this.time,
retryBackoffMs,
newConsumerCoordinator.DefaultOffsetCommitCallback(),
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
实例初始化的处理流程:
super(client,
sessionTimeoutMs,
heartbeatIntervalMs,
metricGrpPrefix,
metricTags,
retryBackoffMs);
this.metadata=
this.metadata.requestUpdate();
this.metadataSnapshot=newMetadataSnapshot();
this.subscriptions=
this.defaultOffsetCommitCallback=defaultOffsetCommitC
this.autoCommitEnabled=autoCommitE
this.assignors=
这里生成一个用于当metadata的内容发生变化时,也就是当订阅发生变化时,设置订阅状态管理组件中的needsPartitionAssignment属性为true,表示partition需要重新进行分配。
addMetadataListener();
根据是否设置有自动提交offset来生成自动提交offset的线程。
this.autoCommitTask=autoCommitEnabled?
newAutoCommitTask(autoCommitIntervalMs):null;
this.sensors=newConsumerCoordinatorMetrics(metrics,metricGrpPrefix,
metricTags);
这个实例的上层实例的初始化:
publicAbstractCoordinator(ConsumerNetworkClientclient,
StringgroupId,
intsessionTimeoutMs,
intheartbeatIntervalMs,
Metricsmetrics,
StringmetricGrpPrefix,
MapmetricTags,
longretryBackoffMs){
this.client=
this.time=
this.generation=OffsetCommitRequest.DEFAULT_GENERATION_ID;
this.memberId=JoinGroupRequest.UNKNOWN_MEMBER_ID;
this.groupId=groupId;
this.coordinator=null;
this.sessionTimeoutMs=sessionTimeoutMs;
生成用于发送心跳的管理组件。
this.heartbeat=newHeartbeat(this.sessionTimeoutMs,heartbeatIntervalMs,
time.milliseconds());
用于发送心跳的task.
this.heartbeatTask=newHeartbeatTask();
this.sensors=newGroupCoordinatorMetrics(metrics,metricGrpPrefix,
metricTags);
this.retryBackoffMs=retryBackoffMs;
这个实例中,默认初始化完成时,rejoinNeeded的值为true,表示需要对group进行加入操作,这个操作用于分配对应的partitions到当前的consumer中。
这里的rejoinNeeded的属性,如果join成功后,属性会被设置成false,如果失败(joinorsync)这个属性会被重新设置为true.
privatebooleanneedsJoinPrepare=true;
privatebooleanrejoinNeeded=true;
检查并初始化coordinator的连接
这个检查的过程在KafkaConsumer中fetch消息时,通过pollOnce函数进行调用。
由ConsumerCoordinator中的ensureCoordinatorKnown函数来实现:
publicvoidensureCoordinatorKnown(){
这里的coordinatorUnknown函数用于判断ConsumerCoordinator实例中的coordinator(对应这个groupId的partition的leader的node信息)属性是否为空或者是一个失败的连接,如果是,这里就一直迭代得到这个group的metadata的内容。
while(coordinatorUnknown()){
这里,如果请求不成功,一直迭代。
RequestFuturefuture=sendGroupMetadataRequest();
client.poll(future);
if(future.failed()){
if(future.isRetriable())
client.awaitMetadataUpdate();
throwfuture.exception();
接下来看看sendGroupMetadataRequest函数的流程:
在这个函数中,通过生成GroupCoordinatorRequest的请求。这个请求的请求key为GroupCoordinator,传入的参数就是当前的consumer对应的消费者的groupId的值。
privateRequestFuturesendGroupMetadataRequest(){
//initiatethegroupmetadatarequest
//findanodetoaskaboutthecoordinator
Nodenode=this.client.leastLoadedNode();
if(node==null){
//fromconfiguration?
returnRequestFuture.noBrokersAvailable();
//createagroupmetadatarequest
log.debug(&Issuinggroupmetadatarequesttobroker{}&,node.id());
GroupCoordinatorRequestmetadataRequest=
newGroupCoordinatorRequest(this.groupId);
returnclient.send(node,ApiKeys.GROUP_COORDINATOR,metadataRequest)
.compose(newRequestFutureAdapter(){
publicvoidonSuccess(ClientResponseresponse,
RequestFuturefuture){
handleGroupMetadataResponse(response,future);
接收对Group的metadata的响应:
这个操作通过handleGroupMetadataResponse函数来进行处理:
privatevoidhandleGroupMetadataResponse(ClientResponseresp,
RequestFuturefuture){
log.debug(&Groupmetadataresponse{}&,resp);
如果coordinatorUnknown的返回值是false,表示这个group对应的消费者offset记录的partition的leader已经存在,不处理。
if(!coordinatorUnknown()){
//Wealreadyfoundthecoordinator,soignoretherequest
future.complete(null);
这里读取kafkaserver响应回来的数据,这主要包含两部分数据,响应代码与此groupId对应存储消费元数据的topic的partition对应的leader的kafkaserver节点。
GroupCoordinatorResponsegroupCoordinatorResponse=
newGroupCoordinatorResponse(resp.responseBody());
//useMAX_VALUE-node.idasthecoordinatoridtomimicseparateconnections
//forthecoordinatorintheunderlyingnetworkclientlayer
//TODO:thisneedstobebetterhandledinKAFKA-1935
shorterrorCode=groupCoordinatorResponse.errorCode();
如果server返回来的代码是NONE,表示这是一个没有错误的请求处理,得到这个groupId对应要写入数据的partition的leader的连接信息,并生成这个broker的socket连接。
把HeartbeatTask(向节点发送心跳的管理组件)加入到调度器中,用于定时向server发送心跳。
if(errorCode==Errors.NONE.code()){
this.coordinator=newNode(Integer.MAX_VALUE-
groupCoordinatorResponse.node().id(),
groupCoordinatorResponse.node().host(),
groupCoordinatorResponse.node().port());
client.tryConnect(coordinator);
//startsendingheartbeatsonlyifwehaveavalidgeneration
if(generation&0)
heartbeatTask.reset();
future.complete(null);
}elseif(errorCode==Errors.GROUP_AUTHORIZATION_FAILED.code()){
future.raise(newGroupAuthorizationException(groupId));
future.raise(Errors.forCode(errorCode));
管理与group对应的partition的leader节点的心跳
当group加入并成功完成对partitions的分配后,会启动用于心跳管理的HeartbeatTask定时器。
当执行ConsumerNetworkClient的poll函数时,如果executeDelayedTasks参数传入是true时,会检查当前的执行时间是否达到了task的超时时间的设置,如果达到这个设置时,会执行这个task的run函数来处理对应的操作。
publicvoidrun(finallongnow){
如果joinGroup没有成功,或者需要进行重新join或者coordinator对应的broker节点的连接失效时,直接返回,不处理。
if(generation&0||needRejoin()||coordinatorUnknown()){
//noneedtosendtheheartbeatwe&#39;renotusingauto-assignmentorifweare
//awaitingarebalance
如果heartbeat实例中,最后一次发送消息的时间已经超过了超时的设置时间,直接停止掉与coordinator的连接,这种情况下表示coordinator对应的broker节点失效了。
if(heartbeat.sessionTimeoutExpired(now)){
//wehaven&#39;treceivedasuccessfulheartbeatinonesessioninterval
//somarkthecoordinatordead
coordinatorDead();
这里根据上一次的心跳时间与当前时间的间隔检查是否达到了要发送心跳的时间,如果没有达到,根据下一次的执行时间为调度的时间,设置到调度器中等待下次的调度处理。
if(!heartbeat.shouldHeartbeat(now)){
//wedon&#39;tneedtoheartbeatnow,sorescheduleforwhenwedo
client.schedule(this,now+heartbeat.timeToNextHeartbeat(now));
流程执行到这里,表示需要向coordinator对应的broker节点发起心跳,设置当前的心跳发送时间为当前时间。
heartbeat.sentHeartbeat(now);
requestInFlight=true;
向coordinator对应的broker节点发送HeartbeatRequest请求。如果请求处理成功响应,设置最后一次接收心跳响应的时间为当前时间,表示这是最后一次成功向coordinator节点请求数据。
如果处理失败(我们主要考虑是请求心跳过去后,发现group又新join了一个consumer进去),这个时候设置rejoinNeeded属性的值为true,表示需要重新进行joinGroup的操作,重新来进行consumer对应的partitions的分配操作。
如果处理失败的错误是coordinator节点没有准备好,表示这个leader节点可能是刚切换完成,或者说group对应的partition的leader已经发生了变化,关闭掉coordinator的连接,这里表示需要重新得到group的metadata的信息,并重新创建与新的coordinator的连接。
RequestFuturefuture=sendHeartbeatRequest();
future.addListener(newRequestFutureListener(){
publicvoidonSuccess(Voidvalue){
requestInFlight=false;
longnow=time.milliseconds();
heartbeat.receiveHeartbeat(now);
longnextHeartbeatTime=now+heartbeat.timeToNextHeartbeat(now);
client.schedule(HeartbeatTask.this,nextHeartbeatTime);
publicvoidonFailure(RuntimeExceptione){
requestInFlight=false;
client.schedule(HeartbeatTask.this,
time.milliseconds()+retryBackoffMs);
Consumer的group加入
当执行KafkaConsumer中的poll函数时,会检查partition是否被执行分配,如果没有时,会通过ConsumerCoordinator实例中的ensureActiveGroup函数来把group加入到对应groupId的partition中,并根据group的id与consumer的个数,对当前的consumer进行partition的分配。
publicvoidensureActiveGroup(){
检查rejoinNeeded属性的值是否是false,如果是false表示这个订阅已经被执行过分配,或者说不需要重新执行group的Join操作。
if(!needRejoin())
if(needsJoinPrepare){
属性needsJoinPrepare在这个实例默认初始化时的值为true,也就是在初始化时,这个流程会被执行。
memberId的值默认为UNKNOWN_MEMBER_ID。generation的默认值为DEFAULT_GENERATION_ID。
执行group加入前的准备工作:
1,如果配置有自动提交offset时,先disable掉自动提交的线程,并对当前consumer中cache中的partition的offset进行提交操作。
2,如果订阅topic时,设置有ConsumerRebalanceListener接口的实现,调用接口中的onPartitionsRevoked函数把当前consumer中所有已经分配的partition传入并执行下线前的操作(这个接口在partition下线时,让用户自定义下线前对partition的处理,比如手动指定offset时用于提交要下线的partition的offset)。
3,设置groupSubscription集合的值为subscription集合的值,
并设置needsPartitionAssignment属性的值为true,表示需要分配partition.
最后设置needsJoinPrepare属性为false.
onJoinPrepare(generation,memberId);
needsJoinPrepare=false;
这里根据属性的值,如果这个值一直没有被更新为false时,这里一直开始迭代,向broker发起加入group的请求。
while(needRejoin()){
ensureCoordinatorKnown();
如果coordinator这个节点现在已经有请求正在被执行,等待请求执行完成。
if(client.pendingRequestCount(this.coordinator)&0){
client.awaitPendingRequests(this.coordinator);
这里生成一个JoinGroupRequest请求(JOIN_GROUP),并向coordinator的节点发起这个请求。
接收这个请求通过JoinGroupResponseHandler来处理。这里的处理主要会执行如下几个步骤:
1,向coordinator发起一个joinGroup的操作,
2,根据joinGroup的响应结果,在leader的consumer中执行所有消费这个group的consumer的partition的分配操作,并向coorninator中发起一个syncGroup的操作(leaderconsumer与follower的consumer都会发起,并只有leader进行处理,follower的请求等待leader的请求接收到后统一处理完成后每个consumer才会接收到响应)
3,每个consumer得到syncGroup后给自己分配的partition的分配信息,并执行onJoinComplete函数。
RequestFuturefuture=performGroupJoin();
client.poll(future);
if(future.succeeded()){
在执行group的加入操作完成后的处理,包含join-&assignorpartitions-&sync操作完成后,
这个onJoinComplete函数的处理流程:
1,解析出响应过来对应此memberId对应的partitions的分配信息,并设置订阅的needsFetchCommittedOffsets属性为true,表示需要加载消费者已经提过过的offset的值。
2,把接收到的partitions的分配信息设置到订阅的assignment集合中,每一个partition都对应一个TopicPartitionState实例的值,用来存储这个partition的消费状态,包含消费到的offset,commit的offset等,更新订阅的needsPartitionAssignment属性值为false,表示分配完成。
3,如果启用有自动提交offset的配置时,启动自动提交offset的定时器,用于定时自动提交offset.
4,如果设置有consumer的rebalance的listener时,执行listener的onPartitionsAssigned函数。
onJoinComplete(generation,memberId,protocol,future.value());
启动用于member与groupleaderbroker的节点心跳管理的定时执行器。
needsJoinPrepare=true;
heartbeatTask.reset();
流程执行到这里,说明这次请求得到了一个失败的返回,根据返回的错误代码,检查是否需要重新向broker发起一个joinGroup的操作。
RuntimeExceptionexception=future.exception();
if(exceptioninstanceofUnknownMemberIdException||
exceptioninstanceofRebalanceInProgressException||
exceptioninstanceofIllegalGenerationException)
elseif(!future.isRetriable())
time.sleep(retryBackoffMs);
接下来看看接收joinGroup请求的响应监听处理实例JoinGroupResponseHandler:
publicvoidhandle(JoinGroupResponsejoinResponse,
RequestFuturefuture){
//processtheresponse
shorterrorCode=joinResponse.errorCode();
根据coordinator的server端返回的响应代码,进行相应的处理流程。
if(errorCode==Errors.NONE.code()){
如果流程执行到这里,表示server端处理joinGroup的请求正常结束,向当前的client端返回了消息,
得到joinGroup后对当前的client端成员生成的memberId的值与generation的值,并设置当当前的ConsumerCoordinator的实例中。
log.debug(&Joinedgroup:{}&,joinResponse.toStruct());
AbstractCoordinator.this.memberId=joinResponse.memberId();
AbstractCoordinator.this.generation=joinResponse.generationId();
AbstractCoordinator.this.rejoinNeeded=false;
AbstractCoordinator.this.protocol=joinResponse.groupProtocol();
sensors.joinLatency.record(response.requestLatencyMs());
这里根据当前的consumer的member是否是group的leader来进行相应的处理。
if(joinResponse.isLeader()){
onJoinLeader(joinResponse).chain(future);
如果当前的consumer的member不是leader的member时,执行onJoinFollower来完成对join后的处理。
onJoinFollower().chain(future);
这里处理如果group在server端正在执行rebalance的操作时,这种情况下server端对group中的所有的member会执行清空处理,等待consumer重新发送joinGroup的操作过去。
elseif(errorCode==Errors.GROUP_LOAD_IN_PROGRESS.code()){
log.debug(&Attempttojoingroup{}rejectedsincecoordinatorisloading
thegroup.&,groupId);
//backoffandretry
future.raise(Errors.forCode(errorCode));
这里的处理错误代码如果是UNKNOWN_MEMBER_ID值时,表示这是一个新加入的group,如果当前的consumer传入的memberId的值不是默认的值时,会得到这个响应代码。等待consumer重新发送joinGroup的操作过去。
elseif(errorCode==Errors.UNKNOWN_MEMBER_ID.code()){
//resetthememberidandretryimmediately
AbstractCoordinator.this.memberId=JoinGroupRequest.UNKNOWN_MEMBER_ID;
log.info(&Attempttojoingroup{}failedduetounknownmemberid,
resettingandretrying.&,
future.raise(Errors.UNKNOWN_MEMBER_ID);
如果请求的group对应的状态是DEAD的状态时,表示这个group已经被标记为死亡的状态,这种情况表示说当前的group对应的partition的brokerleader发生了变化,设置当前的coordinator节点信息属性为null,表示需要在下一次读取时,重新得到这个group对应的leader的broker节点。
elseif(errorCode==Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
||errorCode==Errors.NOT_COORDINATOR_FOR_GROUP.code()){
//re-discoverthecoordinatorandretrywithbackoff
coordinatorDead();
log.info(&Attempttojoingroup{}failedduetoobsoletecoordinator
information,retrying.&,
future.raise(Errors.forCode(errorCode));
}elseif(errorCode==Errors.INCONSISTENT_GROUP_PROTOCOL.code()
||errorCode==Errors.INVALID_SESSION_TIMEOUT.code()
||errorCode==Errors.INVALID_GROUP_ID.code()){
//logtheerrorandre-throwtheexception
Errorserror=Errors.forCode(errorCode);
log.error(&Attempttojoingroup{}faileddueto:{}&,
groupId,error.exception().getMessage());
future.raise(error);
}elseif(errorCode==Errors.GROUP_AUTHORIZATION_FAILED.code()){
future.raise(newGroupAuthorizationException(groupId));
//unexpectederror,throwtheexception
future.raise(newKafkaException(
&Unexpectederrorinjoingroupresponse:&+Errors.forCode(
joinResponse.errorCode()).exception().getMessage()));
对joinGroup后,当前的consumer是leadermember时的处理:
在得到broker对joinGroup的成功响应后,如果返回的member标记着当前的consumer是leader节点,
执行onJoinLeader函数来进行操作,
privateRequestFutureonJoinLeader(JoinGroupResponsejoinResponse){
如果当前的consumer是这个group中的leader的member的节点时,通过leader来进行partition的分配工作,这里得到每一个member根据其消费的topic进行partition分配后,每个member的partition的分配信息。
MapgroupAssignment=performAssignment(
joinResponse.leaderId(),
joinResponse.groupProtocol(),
joinResponse.members());
根据当前的leader的memberId,group的id与当前的member对应的generation的值,生成一个SyncGroupRequest的请求,并通过函数向coordinator对应的broker节点发送请求syncGroup的请求。
SyncGroupRequestrequest=newSyncGroupRequest(groupId,generation,
memberId,groupAssignment);
log.debug(&IssuingleaderSyncGroup({}:{})tocoordinator{}&,
ApiKeys.SYNC_GROUP,request,this.coordinator.id());
returnsendSyncGroupRequest(request);
}catch(RuntimeExceptione){
returnRequestFuture.failure(e);
preformAssignment函数,用于执行对group的所有的member的partition进行分配:
protectedMapperformAssignment(StringleaderId,
StringassignmentStrategy,
MapallSubscriptions){
得到用于进行partition的分配的实例,默认是RangeAssignor实例。
PartitionAssignorassignor=lookupAssignor(assignmentStrategy);
if(assignor==null)
thrownewIllegalStateException(&Coordinatorselectedinvalidassignment
protocol:&+assignmentStrategy);
得到当前group中所有的consumer所有消费的topic的集合。
SetallSubscribedTopics=newHashSet&&();
这里根据memberId存储每一个member对应的消费的topcis与assignor的实例名称信息。
Mapsubscriptions=newHashMap&&();
这里开始迭代此group中每一个member的实例,并取出每一个member订阅的topci的集合存储起来。
for(Map.EntrysubscriptionEntry:
allSubscriptions.entrySet()){
Subscriptionsubscription=ConsumerProtocol.deserializeSubscription(
subscriptionEntry.getValue());
subscriptions.put(subscriptionEntry.getKey(),subscription);
allSubscribedTopics.addAll(subscription.topics());
更新groupSubscription集合的topic信息为当前的group中所有的member消费的topci的集合。
this.subscriptions.groupSubscribe(allSubscribedTopics);
把所有的topic集合添加到metadata的实例中,如果metadata中当前的topics的集合中不包含当前添加进行的部分topic时,设置needUpdate属性的值为true,表示metadata需要被更新。
metadata.setTopics(this.subscriptions.groupSubscription());
如果说metadata中有topic被更新,这里等待metadata更新完成,在ConsumerNetworkClient中通过clientPoll进行调用时,会执行NetworkClient中的poll函数(maybeUpdate函数),这个函数根据是否需要更新metadata来生成一个MetadataRequest请求,得到group中所有的consumer需要的topic的metadata的信息。这个过程会一直等待,直到metadata更新成功。
client.ensureFreshMetadata();
log.debug(&Performing{}assignmentforsubscriptions{}&,assignor.name(),
subscriptions);
通过assignor的实现来完成分配工作,根据所有的consumer订阅的topic的集合与当前的cluster中的节点信息。assignor的实现通过partition.assignment.strategy配置,默认是RangeAssignor.
分配执行流程:
1,得到所有member订阅过的所有的topic的集合,并得到每一个topic对应的partition的个数。
2,根据每个topic对应的partition的个数与member对应的topic的集合进行分配:
2,1,先根据每一个member对应的topic的集合,得到每个topic对应的consumer的集合,
2,2,根据生成出来的topic对consumer的集合进行迭代,得到每一个迭代的topic的partition的个数,并根据topic对应的consumer的集合进行排序,计算出每一个consumer平均要分配的partition的个数与剩余的partition的个数,
2,2,1迭代每一个topic中的consumer,开始针对这一个consumer进行partition的分配,这里使用一个示例来说明分配情况:
假如,一个topic有11个partition,三个consumer时,
A,这时候,每个conumser平均分配3个partition还于下2个partition,
B,对第一个consumer进行分配时,原则上应该是从0开始分配,长度是3,但是由于还于下2个partition,那么第一个consumer分配就是从0开始,分配长度为4个partition,这里迭代的consumer的下标为0,
开始位置:平均个数&0+(0=min(下标,于下个数)),长度:平均个数+(0+1&于下个数?0:1)=1
C,对第二个consumer进行分配时,这里根据平均值计算出来应该是从3开始,但是由于于下了2个partition,上一次分配时长度加了1,因此,这里开始的位置从4开始,根据于下的partition时,分配长度也是4个partition,这时,还剩下3个partition,迭代的下标为1,
开始位置:平均个数&1+(1=min(下标,于下个数)),长度:平均个数+(1+1&于下个数?0:1)=1
D,最后,对第三个consumer进行分配,这里迭代的下标是2,
开始位置:平均个数&2+(2=min(下标,于下个数)),长度:平均个数+(2+1&于下个数?0:1)=0
3,最后,这个函数返回的值是一个map,key就是每个consumer的member的id,value是对应的TopicPartition的集合。
Mapassignment=assignor.assign(metadata.fetch(),
subscriptions);
log.debug(&Finishedassignment:{}&,assignment);
把针对每一个member分配好的partition集合的Assignment实例进行序列化,并返回这个member的分配集合。
MapgroupAssignment=newHashMap&&();
for(Map.EntryassignmentEntry:assignment.entrySet()){
ByteBufferbuffer=ConsumerProtocol.serializeAssignment(
assignmentEntry.getValue());
groupAssignment.put(assignmentEntry.getKey(),buffer);
returngroupA
对joinGroup后,当前的consumer是非leadermember时的处理:
通过onJoinFollower函数来进行处理,这种情况下,当前的consumer的member是一个从节点,只对leader对其分配的partition的数据进行消费。
在非leader的consumer中,向coonrdinator的broker发起syncGroup的操作时,在follower的member中发起syncGroup的请求时,传入的consumer的分配信息为一个空的集合。
privateRequestFutureonJoinFollower(){
//sendfollower&#39;ssyncgroupwithanemptyassignment
SyncGroupRequestrequest=newSyncGroupRequest(groupId,generation,
memberId,Collections.emptyMap());
log.debug(&IssuingfollowerSyncGroup({}:{})tocoordinator{}&,
ApiKeys.SYNC_GROUP,request,this.coordinator.id());
returnsendSyncGroupRequest(request);
提交完成消费的offset
通过执行KafkaConsumer中的commitSync函数来提交已经消费到的每个partition中最后一个offset的值。针对这个函数的处理时,如果是手动进行调用,需要自行进行trycatch的操作,否则这个过程中有可能会导致程序无法正常被执行。
publicvoidcommitSync(){
acquire();
commitSync(subscriptions.allConsumed());
release();
首先看看SubscriptionState实例中的allConsumed函数:
这里迭代这个consumer订阅的所有的topic分配的partition的集合,并取出这个partition中消费到的最后一个offset记录的position的值(只有在consumer执行poll后,这个值才会被更新,就是消费掉的最后一条记录对应的offset的值),把每一个partition对应的offset生成到一个map集合中,返回给上层的调用函数。
publicMapallConsumed(){
MapallConsumed=newHashMap&&();
for(Map.Entryentry:
assignment.entrySet()){
TopicPartitionStatestate=entry.getValue();
if(state.hasValidPosition)
allConsumed.put(entry.getKey(),newOffsetAndMetadata(state.position));
returnallC
接下来看看commitSync提交对应的partition记录到的最后一个消费的offset的流程:
publicvoidcommitSync(finalMapoffsets){
acquire();
这个函数中直接通过coordinator对应的broker节点的连接信息提交offsets,这个由ConsumerCoordinator实现。
coordinator.commitOffsetsSync(offsets);
release();
ConsumerCoordinator中的commitOffsetsSync函数,生成提交当前consumer中已经消费的offset的请求:
publicvoidcommitOffsetsSync(Mapoffsets){
if(offsets.isEmpty())
while(true){
首先检查coordinator的broker是否是连接的状态,如果不是先对coordinator进行连接。
ensureCoordinatorKnown();
生成一个OffsetCommitRequest请求,这个请求中针对每一个partition对应的offset生成请求的内容,并向coordinator对应的broker节点发起这个请求,
处理这个请求的响应通过OffsetCommitResponseHandler实例来完成。
RequestFuturefuture=sendOffsetCommitRequest(offsets);
client.poll(future);
if(future.succeeded())
if(!future.isRetriable())
throwfuture.exception();
time.sleep(retryBackoffMs);
接收Offset提交的响应处理流程:
publicvoidhandle(OffsetCommitResponsecommitResponse,RequestFuturefuture){
sensors.commitLatency.record(response.requestLatencyMs());
SetunauthorizedTopics=newHashSet&&();
迭代对每一个partition的offset的提交的响应代码进行处理。
for(Map.Entryentry:
commitResponse.responseData().entrySet()){
TopicPartitiontp=entry.getKey();
OffsetAndMetadataoffsetAndMetadata=this.offsets.get(tp);
longoffset=offsetAndMetadata.offset();
Errorserror=Errors.forCode(entry.getValue());
if(error==Errors.NONE){
如果请求提交offsets正常被server端进行处理,更新这个partition对应的TopicPrtitionState中的committed属性的值为提交时对应的position属性的值。。
log.debug(&Committedoffset{}forpartition{}&,offset,tp);
if(subscriptions.isAssigned(tp))
//updatethelocalcacheonlyifthepartitionisstillassigned
subscriptions.committed(tp,offsetAndMetadata);
}elseif(error==Errors.GROUP_AUTHORIZATION_FAILED){
log.error(&Unauthorizedtocommitforgroup{}&,groupId);
future.raise(newGroupAuthorizationException(groupId));
}elseif(error==Errors.TOPIC_AUTHORIZATION_FAILED){
unauthorizedTopics.add(tp.topic());
}elseif(error==Errors.OFFSET_METADATA_TOO_LARGE
||error==Errors.INVALID_COMMIT_OFFSET_SIZE){
//raisetheerrortotheuser
log.info(&Offsetcommitforgroup{}failedonpartition{}dueto{},
willretry&,groupId,tp,error);
future.raise(error);
}elseif(error==Errors.GROUP_LOAD_IN_PROGRESS){
这种情况是group发生了leader的变化,group的cache信息正在被加载,需要进行重试。
//justretry
log.info(&Offsetcommitforgroup{}faileddueto{},willretry&,
groupId,error);
future.raise(error);
}elseif(error==Errors.GROUP_COORDINATOR_NOT_AVAILABLE
||error==Errors.NOT_COORDINATOR_FOR_GROUP
||error==Errors.REQUEST_TIMED_OUT){
这种情况表示group已经不在原来的节点中,需要断开与原来的coordinator的broker的连接,并进行重试。
log.info(&Offsetcommitforgroup{}faileddueto{},willfindnew
coordinatorandretry&,groupId,error);
coordinatorDead();
future.raise(error);
}elseif(error==Errors.UNKNOWN_MEMBER_ID
||error==Errors.ILLEGAL_GENERATION
||error==Errors.REBALANCE_IN_PROGRESS){
这种情况表示当前的group中有consumer上线或者下线,需要对partition进行重新分配。
//needtore-joingroup
log.error(&Error{}occurredwhilecommittingoffsetsforgroup{}&,error,
subscriptions.needReassignment();
future.raise(newCommitFailedException(&Commitcannotbecompleteddue
togrouprebalance&));
log.error(&Errorcommittingpartition{}atoffset{}:{}&,tp,offset,
error.exception().getMessage());
future.raise(newKafkaException(&Unexpectederrorincommit:&+
error.exception().getMessage()));
if(!unauthorizedTopics.isEmpty()){
log.error(&Unauthorizedtocommittotopics{}&,unauthorizedTopics);
future.raise(newTopicAuthorizationException(unauthorizedTopics));
future.complete(null);}

我要回帖

更多关于 if函数怎么用 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信