Error sending fetch request xxx to node : org.apache.kafka.common.errors.DisconnectException.

  • 396
  • 2023年5月18日12:56:40

这个问题是我在flink任务上报的错, 乍一看是kafka连接出了问题.

推测

我们的业务数据量比较大, 集群网络负载比较高, 推测是网络抖动导致的连接超时.
先看看gpt的推测:

这个错误通常是由于Kafka broker与消费者之间的连接中断或网络故障引起的。下面是一些可能的解决方法:
检查Kafka broker的健康状态:
确定Kafka broker是否运行,并检查其健康状态。你可以使用Kafka自带的工具来检查broker的状态,例如kafka-topics.sh或kafka-console-consumer.sh等。
检查网络连接:
检查Kafka broker和消费者之间的网络连接是否正常,并确保没有任何中断或问题。你可以尝试使用ping命令测试网络连接是否正常。
检查防火墙设置:
检查防火墙设置是否正确,确保Kafka broker和消费者之间的通信端口没有被阻止。如果有必要,请添加正确的防火墙规则以允许通信。
调整Kafka消费者的配置:
你可以尝试调整Kafka消费者的配置,例如增加max.poll.records或max.poll.interval.ms等参数的值,以减少消费者与broker之间的通信频率,从而减少连接中断的可能性。
如果以上方法都不起作用,你可以尝试升级Kafka版本或联系Kafka社区获取更多支持。

解决方法

既然是网络抖动造成的问题, 那么只要延长超时时间就可以缓解, 我们在连接kafka时加个超时配置:

properties.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");

其中, ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG 你也可以替换成 "request.timeout.ms"
request.timeout.ms是Kafka中一个与生产者和消费者相关的配置参数,它定义了Kafka broker应该等待请求的响应时间长度,单位为毫秒。如果在该时间内没有收到响应,Kafka broker将返回一个错误给生产者或消费者。该参数默认值为30000毫秒(30秒),这意味着如果Kafka broker在30秒内没有响应生产者或消费者的请求,它将返回一个错误。

在Kafka中,生产者和消费者通过向Kafka broker发送请求来与其交互。如果Kafka broker在一定时间内无法响应请求,则生产者或消费者将收到一个错误。该错误通常是由于网络故障、Kafka broker负载过高或请求处理时间过长等原因引起的。

你可以通过修改request.timeout.ms参数的值来更改Kafka生产者和消费者请求的超时时间。但是,你需要注意的是,如果你将该值设置得太小,可能会导致请求频繁超时并返回错误。如果将该值设置得太大,则可能会导致请求占用过长时间的资源,并且可能会影响系统的性能和可靠性。

观察

重启服务,观察一个小时发现没有再出现这个问题.

本文来自凡蜕博客(https://blog.ysboke.cn), 转载请带上地址.。
匿名

发表评论

匿名网友