当前位置: 首页 > news >正文

西安市建设协会网站美食网站案例

西安市建设协会网站,美食网站案例,wordpress登录可见内容,wordpress+4.4.1+中文目录 前言#xff1a; 位移提交#xff1a; 小结#xff1a; 参考资料 前言#xff1a; Consumer 需要向 Kafka 汇报自己的位移数据#xff0c;这个汇报过程被称为提交位移#xff08;Committing Offsets#xff09;。因为 Consumer 能够同时消费多个分区的数据 位移提交  小结 参考资料  前言 Consumer 需要向 Kafka 汇报自己的位移数据这个汇报过程被称为提交位移Committing Offsets。因为 Consumer 能够同时消费多个分区的数据所以位移的提交实际上是在分区粒度上进行的即 Consumer 需要为分配给它的每个分区提交各自的位移数据。 位移提交  提交位移主要是为了表征 Consumer 的消费进度这样当 Consumer 发生故障重启之后就能够从 Kafka 中读取之前提交的位移值然后从相应的位移处继续消费从而避免整个消费过程重来一遍。 因为位移提交非常灵活你完全可以提交任何位移值但由此产生的后果你也要一并承担。假设你的 Consumer 消费了 10 条消息你提交的位移值却是 20那么从理论上讲位移介于 1119 之间的消息是有可能丢失的相反地如果你提交的位移值是 5那么位移介于 59 之间的消息就有可能被重复消费。所以Kafka 只会“无脑”地接受你提交的位移。 从用户的角度来说位移提交分为自动提交和手动提交从 Consumer 端的角度来说位移提交分为同步提交和异步提交。  所谓自动提交就是指 Kafka Consumer 在后台默默地为你提交位移作为用户的你完全不必操心这些事而手动提交则是指你要自己提交位移Kafka Consumer 压根不管。 自动提交的代码示意如下 Properties props new Properties();props.put(bootstrap.servers, localhost:9092);props.put(group.id, test);props.put(enable.auto.commit, true);props.put(auto.commit.interval.ms, 2000);props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);KafkaConsumerString, String consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(foo, bar));while (true) {ConsumerRecordsString, String records consumer.poll(100);for (ConsumerRecordString, String record : records)System.out.printf(offset %d, key %s, value %s%n, record.offset(), record.key(), record.value());} 和自动提交相反的就是手动提交了。开启手动提交位移的方法就是设置 enable.auto.commit 为 false。但是仅仅设置它为 false 还不够因为你只是告诉 Kafka Consumer 不要自动提交位移而已你还需要调用相应的 API 手动提交位移。 最简单的 API 就是 KafkaConsumer#commitSync()。该方法会提交 KafkaConsumer#poll() 返回的最新位移。从名字上来看它是一个同步操作即该方法会一直等待直到位移被成功提交才会返回。如果提交过程中出现异常该方法会将异常信息抛出。下面这段代码展示了 commitSync() 的使用方法 while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息try {consumer.commitSync();} catch (CommitFailedException e) {handle(e); // 处理提交失败异常} } 可见调用 consumer.commitSync() 方法的时机是在你处理完了 poll() 方法返回的所有消息之后。如果你莽撞地过早提交了位移就可能会出现消费数据丢失的情况。那么你可能会问自动提交位移就不会出现消费数据丢失的情况了吗它能恰到好处地把握时机进行位移提交吗为了搞清楚这个问题我们必须要深入地了解一下自动提交位移的顺序。 一旦设置了 enable.auto.commit 为 trueKafka 会保证在开始调用 poll 方法时提交上次 poll 返回的所有消息。从顺序上来说poll 方法的逻辑是先提交上一批消息的位移再处理下一批消息因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于它可能会出现重复消费。 在默认情况下Consumer 每 5 秒自动提交一次位移。现在我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后所有 Consumer 从上一次提交的位移处继续消费但该位移已经是 3 秒前的位移数据了故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率但这么做只能缩小重复消费的时间窗口不可能完全消除它。这是自动提交机制的一个缺陷。 手动提交位移它的好处就在于更加灵活你完全能够把控位移提交的时机和频率。但是它也有一个缺陷就是在调用 commitSync() 时Consumer 程序会处于阻塞状态直到远端的 Broker 返回提交结果这个状态才会结束。在任何系统中因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈会影响整个应用程序的 TPS。当然你可以选择拉长提交间隔但这样做的后果是 Consumer 的提交频率下降在下次 Consumer 重启回来后会有更多的消息被重新消费。  为了解决上述同步提交位移的问题我们可以选择使用异步提交位移的方式KafkaConsumer#commitAsync() 调用 commitAsync() 之后它会立即返回不会阻塞因此不会影响 Consumer 应用的 TPS。由于它是异步的Kafka 提供了回调函数callback供你实现提交之后的逻辑比如记录日志或处理异常等。下面这段代码展示了调用 commitAsync() 的方法 while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息consumer.commitAsync((offsets, exception) - {if (exception ! null)handle(exception);}); } commitAsync 是否能够替代 commitSync 呢答案是不能。commitAsync 的问题在于出现问题时它不会自动重试。因为它是异步操作倘若提交失败后自动重试那么它重试时提交的位移值可能早已经“过期”或不是最新值了。 显然如果是手动提交我们需要将 commitSync 和 commitAsync 组合使用才能达到最理想的效果原因有两个 我们可以利用 commitSync 的自动重试来规避那些瞬时错误比如网络的瞬时抖动Broker 端 GC 等。因为这些问题都是短暂的自动重试通常都会成功因此我们不想自己重试而是希望 Kafka Consumer 帮我们做这件事。我们不希望程序总处于阻塞状态影响 TPS。  try {while(true) {ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息commitAysnc(); // 使用异步提交规避阻塞} } catch(Exception e) {handle(e); // 处理异常 } finally {try {consumer.commitSync(); // 最后一次提交使用同步阻塞式提交} finally {consumer.close(); } } 这段代码同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交我们调用 commitAsync() 避免程序阻塞而在 Consumer 要关闭前我们调用 commitSync() 方法执行同步阻塞式的位移提交以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后我们既实现了异步无阻塞式的位移管理也确保了 Consumer 位移的正确性 设想这样一个场景你的 poll 方法返回的不是 500 条消息而是 5000 条。那么你肯定不想把这 5000 条消息都处理完之后再提交位移因为一旦中间出现差错之前处理的全部都要重来一遍。这类似于我们数据库中的事务处理。很多时候我们希望将一个大事务分割成若干个小事务分别提交这能够有效减少错误恢复的时间。 就拿刚刚提过的那个例子来说如何每处理 100 条消息就提交一次位移呢在这里我以 commitAsync 为例展示一段代码实际上commitSync 的调用方法和它是一模一样的。 private MapTopicPartition, OffsetAndMetadata offsets new HashMap(); int count 0; …… while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String record: records) {process(record); // 处理消息offsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() 1)ifcount % 100 0consumer.commitAsync(offsets, null); // 回调处理逻辑是nullcount;} } 小结 Kafka Consumer 的位移提交是实现 Consumer 端语义保障的重要手段。位移提交分为自动提交和手动提交而手动提交又分为同步提交和异步提交。在实际使用过程中推荐你使用手动提交机制因为它更加可控也更加灵活。另外建议你同时采用同步提交和异步提交两种方式这样既不影响 TPS又支持自动重试改善 Consumer 应用的高可用性。 参考资料  18 | Kafka中位移提交那些事儿-极客时间
http://www.fuzeviewer.com/news/22415/

相关文章:

  • 卧龙区2015网站建设价格韩国庆祝出线
  • 济南seo网站排名优化工具英国网站域名
  • 做网站千篇一律gta5办公室网站建设中
  • 上海网站建设在哪里网站设计是什么意思
  • 做搜狗pc网站淘宝网站推广怎么做
  • 泰安可信的网站建设教学ppt模板免费下载完整版
  • 大神做的动漫网站深圳高品质网站建设服务
  • 婚纱网站目录优化专门做西装网站
  • 新一代 网站备案深圳市龙岗区住房和建设局网站
  • seo品牌优化整站优化vc域名建站的网站
  • 网站域名后缀有哪些中富国建设有限公司网站
  • 网站后台样式受大众喜欢的域名备案加急
  • 越秀区建设水务局网站寿县城乡建设局网站
  • 好网站建设公司业务自媒体135官网手机版下载
  • 构建可用于生产环境的AI智能体
  • 红色文化网站建设郑州建设局官网
  • 酒店团购的网站建设网络搭建就业前景
  • 做网站的公司高创泰安市住宅与房产信息网
  • 应对 “读放大” 问题的新方法 —— OceanBase 中的 Merge-On-Write 表
  • 延庆网站建设wordpress 付费下载付费阅读
  • 关于茶文化网站建设的背景短网址生成平台
  • 中文电子商务网站模板我要找人做网站的主页
  • 淘宝客cms网站怎么做高端网站建设公司价格
  • 百度统计网站概况如何建设个人免费网站教程视频
  • 卖渔具的亲戚做网站建筑工程网教育网
  • 亿唐网不做网站做品牌考试题做网站猫要做端口映射吗
  • 优秀的网站设计分析如何做建议的网站
  • 门户网站程序企业所得税怎么计算
  • 郑州哪家做网站最好欧派整装大家居装修公司加盟
  • 网站开发设计模板网络建站一般多少钱