Java|Java Kafka 消费积压监控的示例代码
后端代码:
Monitor.java代码:
package com.suncreate.kafkaConsumerMonitor.service;

import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;

/**
 * Kafka consumption monitor for one (topic, consumer group) pair.
 *
 * <p>Each call to {@link #monitor(boolean)} queries the end offsets and the committed offsets
 * of every partition of the topic, derives the total lag, the produce/consume speed
 * (messages per second since the previous call) and the consume/produce ratio in percent.
 *
 * <p>Thread-safety: {@link KafkaConsumer} and {@link SimpleDateFormat} are not thread-safe,
 * and {@code monitor} can be invoked from more than one thread (scheduled task and HTTP
 * request), so {@code monitor}, {@code getList} and {@code setList} are synchronized on
 * this instance.
 *
 * @author suxiang
 */
public class Monitor {
    private static final Logger log = LoggerFactory.getLogger(Monitor.class);

    /** Maximum number of samples kept in the history list. */
    private static final int MAX_HISTORY_SIZE = 500;

    private String servers;
    private String topic;
    private String groupId;

    /** Wall-clock time (ms) of the previous sample; 0 until the first sample was taken. */
    private long lastTime;
    private long lastTotalLag = 0L;
    private long lastLogSize = 0L;
    private long lastOffset = 0L;
    private double lastRatio = 0;
    private long speedLogSize = 0L;
    private long speedOffset = 0L;
    private String time;
    private List<ConsumerInfo> list;

    private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    private KafkaConsumer<String, String> consumer;
    private List<TopicPartition> topicPartitionList;

    // Locale.ROOT pins '.' as the decimal separator: the formatted text is fed back into
    // Double.parseDouble, which would throw in locales that format decimals with ','.
    private final DecimalFormat decimalFormat =
            new DecimalFormat("0.00", DecimalFormatSymbols.getInstance(Locale.ROOT));

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    public long getLastTotalLag() {
        return lastTotalLag;
    }

    public double getLastRatio() {
        return lastRatio;
    }

    public String getTopic() {
        return topic;
    }

    public String getGroupId() {
        return groupId;
    }

    public long getSpeedLogSize() {
        return speedLogSize;
    }

    public long getSpeedOffset() {
        return speedOffset;
    }

    /**
     * Returns a snapshot of the recorded samples, oldest first.
     *
     * <p>A copy is returned so that callers can iterate (e.g. serialize) it while the
     * scheduler thread keeps appending to the internal list.
     *
     * @return a new list containing the recorded samples
     */
    public synchronized List<ConsumerInfo> getList() {
        return new ArrayList<>(list);
    }

    public synchronized void setList(List<ConsumerInfo> list) {
        this.list = list;
    }

    /**
     * Creates the monitor, opens a Kafka consumer and resolves the partitions of the topic.
     *
     * @param servers Kafka bootstrap servers
     * @param topic   topic to monitor
     * @param groupId consumer group whose committed offsets are inspected
     */
    public Monitor(String servers, String topic, String groupId) {
        this.servers = servers;
        this.topic = topic;
        this.groupId = groupId;
        this.list = new ArrayList<>();

        // Consumer
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.servers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumer = new KafkaConsumer<>(properties);

        // Query the topic partitions
        topicPartitionList = new ArrayList<>();
        List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
        if (partitionInfoList == null) {
            // Some client versions return null instead of an empty list for an unknown topic.
            log.error("No partition metadata for topic {}", topic);
            return;
        }
        for (PartitionInfo partitionInfo : partitionInfoList) {
            topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }
    }

    /**
     * Takes one sample: refreshes total lag, speeds and ratio of this topic/group.
     *
     * <p>Any exception is logged and swallowed so that one failing monitor does not stop
     * the others.
     *
     * @param addToList when {@code true} the sample is appended to the history list
     *                  (capped at {@value #MAX_HISTORY_SIZE} entries, oldest dropped first)
     */
    public synchronized void monitor(boolean addToList) {
        try {
            long startTime = System.currentTimeMillis();

            // Query the log size (end offset) of every partition
            Map<Integer, Long> endOffsetMap = new HashMap<>();
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitionList);
            for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
                endOffsetMap.put(entry.getKey().partition(), entry.getValue());
            }

            // Query the committed (consumed) offset of every partition
            Map<Integer, Long> commitOffsetMap = new HashMap<>();
            for (TopicPartition topicPartition : topicPartitionList) {
                OffsetAndMetadata committed = consumer.committed(topicPartition);
                // committed() returns null when the group never committed for this partition;
                // count it as offset 0 so the whole partition is reported as lag.
                long committedOffset = committed == null ? 0L : committed.offset();
                commitOffsetMap.put(topicPartition.partition(), committedOffset);
            }

            long endTime = System.currentTimeMillis();
            log.info("查询logSize和offset耗时:" + (new DecimalFormat("0.000")).format((endTime - startTime) / 1000.0) + " 秒");

            startTime = System.currentTimeMillis();

            // Accumulate the lag
            long totalLag = 0L;
            long logSize = 0L;
            long offset = 0L;
            if (endOffsetMap.size() == commitOffsetMap.size()) {
                for (Map.Entry<Integer, Long> entry : endOffsetMap.entrySet()) {
                    long endOffset = entry.getValue();
                    long commitOffset = commitOffsetMap.get(entry.getKey());
                    totalLag += endOffset - commitOffset;
                    logSize += endOffset;
                    offset += commitOffset;
                }
            } else {
                log.error("Topic:" + topic + "consumer:" + consumer + "topic partitions lost");
            }

            log.info("Topic:" + topic + "logSize:" + logSize + "offset:" + offset + "totalLag:" + totalLag);

            if (lastTime > 0) {
                long elapsedMillis = System.currentTimeMillis() - lastTime;
                if (elapsedMillis > 0) {
                    double elapsedSeconds = elapsedMillis / 1000.0;
                    speedLogSize = (long) ((logSize - lastLogSize) / elapsedSeconds);
                    speedOffset = (long) ((offset - lastOffset) / elapsedSeconds);
                }

                // The ratio is only refreshed while data is being produced; otherwise the
                // previous value is kept.
                if (speedLogSize > 0) {
                    String strRatio = decimalFormat.format(speedOffset * 100 / (speedLogSize * 1.0));
                    lastRatio = Double.parseDouble(strRatio);
                    log.info("Topic:" + topic + "speedLogSize:" + speedLogSize + "speedOffset:" + speedOffset + "百分比:" + strRatio + "%");
                }
            }

            lastTime = System.currentTimeMillis();
            lastTotalLag = totalLag;
            lastLogSize = logSize;
            lastOffset = offset;

            endTime = System.currentTimeMillis();
            log.info("计算耗时:" + (new DecimalFormat("0.000")).format((endTime - startTime) / 1000.0) + " 秒");

            if (addToList) {
                this.setTime(simpleDateFormat.format(new Date()));
                this.list.add(new ConsumerInfo(this.getTopic(), this.getGroupId(), this.getLastTotalLag(),
                        this.getLastRatio(), this.getSpeedLogSize(), this.getSpeedOffset(), this.getTime()));
                if (this.list.size() > MAX_HISTORY_SIZE) {
                    this.list.remove(0);
                }
            }
        } catch (Exception e) {
            log.error("Monitor error", e);
        }
    }
}
MonitorService.java代码:
package com.suncreate.kafkaConsumerMonitor.service;

import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;

/**
 * Holds one {@link Monitor} per monitored (topic, consumer group) pair and exposes
 * their current values and recorded history.
 */
@Service
public class MonitorService {
    private static final Logger log = LoggerFactory.getLogger(MonitorService.class);

    @Value("${kafka.consumer.servers}")
    private String servers;

    private List<Monitor> monitorList;

    /**
     * Creates the monitors once the {@code servers} property has been injected.
     * Each {@link Monitor} is constructed with the servers, a topic and a consumer group id.
     */
    @PostConstruct
    private void init() {
        monitorList = new ArrayList<>();
        monitorList.add(new Monitor(servers, "wifiData", "wifi-kafka-hbase"));
        monitorList.add(new Monitor(servers, "KK_PASS_INFO_TYCC", "EXTRACT-SAMPLE"));
        monitorList.add(new Monitor(servers, "KK_PASS_INFO_TYCC", "dblrecog-upload2vcn"));
        monitorList.add(new Monitor(servers, "KK_PASS_INFO_TYCC_FILTER", "yisa20210521000001"));
        monitorList.add(new Monitor(servers, "KK_PASS_INFO_TYCC_FILTER", "kafka-filter-check-19"));
        monitorList.add(new Monitor(servers, "motorVehicle", "unifiedstorage-downloader"));
        monitorList.add(new Monitor(servers, "motorVehicle", "full-vehicle-data-storage-kafka2ch"));
        monitorList.add(new Monitor(servers, "motorVehicle", "vehicle_store"));
        monitorList.add(new Monitor(servers, "motorVehicle", "vcn-sk-upload-luyang"));
        monitorList.add(new Monitor(servers, "motorVehicle", "vcn-sk-upload-yaohai"));
        monitorList.add(new Monitor(servers, "motorVehicle", "vcn-sk-upload-baohe"));
        monitorList.add(new Monitor(servers, "peopleFace", "kafka-filter-check-19"));
    }

    /**
     * Runs every monitor once.
     *
     * @param addToList forwarded to {@link Monitor#monitor(boolean)}
     */
    public void monitorOnce(boolean addToList) {
        for (Monitor monitor : monitorList) {
            monitor.monitor(addToList);
        }
    }

    /**
     * Builds one {@link ConsumerInfo} per monitor from the monitor's current values.
     *
     * @return a new list with one entry per monitored topic/group
     */
    public List<ConsumerInfo> getConsumerList() {
        List<ConsumerInfo> list = new ArrayList<>();
        for (Monitor monitor : monitorList) {
            list.add(new ConsumerInfo(monitor.getTopic(), monitor.getGroupId(), monitor.getLastTotalLag(),
                    monitor.getLastRatio(), monitor.getSpeedLogSize(), monitor.getSpeedOffset(),
                    monitor.getTime()));
        }
        return list;
    }

    /**
     * Returns the recorded history of the first monitor matching the given topic and group.
     *
     * @param topic   topic name
     * @param groupId consumer group id
     * @return the matching monitor's list, or an empty list when no monitor matches
     */
    public List<ConsumerInfo> getDetails(String topic, String groupId) {
        for (Monitor monitor : monitorList) {
            if (monitor.getTopic().equals(topic) && monitor.getGroupId().equals(groupId)) {
                return monitor.getList();
            }
        }
        return new ArrayList<>();
    }
}
MonitorConfig.java代码:
package com.suncreate.kafkaConsumerMonitor.task; import com.suncreate.kafkaConsumerMonitor.service.MonitorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.scheduling.support.CronTrigger; import java.text.SimpleDateFormat; @Configuration@EnableSchedulingpublic class MonitorConfig implements SchedulingConfigurer {private static final Logger logger = LoggerFactory.getLogger(MonitorConfig.class); private String cronExpression = "0 */3 * * * ?"; //private String cronExpression = "*/20 * * * * ?"; private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); @Autowiredprivate MonitorService monitorService; @Overridepublic void configureTasks(ScheduledTaskRegistrar taskRegistrar) {taskRegistrar.addTriggerTask(() -> {monitorService.monitorOnce(true); }, triggerContext -> new CronTrigger(cronExpression).nextExecutionTime(triggerContext)); }}
MonitorController.java代码:
package com.suncreate.kafkaConsumerMonitor.controller;

import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;
import com.suncreate.kafkaConsumerMonitor.model.LayuiData;
import com.suncreate.kafkaConsumerMonitor.service.MonitorService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

/**
 * REST endpoints under {@code /monitor} that expose the data of {@link MonitorService}.
 */
@RestController
@RequestMapping("/monitor")
public class MonitorController {
    @Autowired
    private MonitorService monitorService;

    /**
     * Returns the current values of all monitored consumer groups.
     *
     * @return the consumer list wrapped in a {@link LayuiData}
     */
    @GetMapping("/getConsumers")
    public LayuiData getConsumers() {
        List<ConsumerInfo> list = monitorService.getConsumerList();
        return new LayuiData(list);
    }

    /**
     * Runs all monitors once without appending the sample to the history
     * ({@code addToList} is {@code false}).
     */
    @GetMapping("/monitorOnce")
    public void monitorOnce() {
        monitorService.monitorOnce(false);
    }

    /**
     * Returns the recorded history of one topic/group.
     *
     * @param topic   topic name
     * @param groupId consumer group id
     * @return the history wrapped in a {@link LayuiData}
     */
    @GetMapping("/getDetails")
    public LayuiData getDetails(String topic, String groupId) {
        List<ConsumerInfo> list = monitorService.getDetails(topic, groupId);
        return new LayuiData(list);
    }
}
pom.xml文件(有些东西没用到或者备用,没有删):
<?xml version="1.0" encoding="UTF-8"?>
<!--
  NOTE(review): the XML tags of this file were lost when the article was extracted; the
  element names below are reconstructed from the surviving values and standard Maven
  layout. The two property names in particular are assumptions - confirm against the
  original project.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
    </parent>

    <groupId>com.suncreate</groupId>
    <artifactId>kafka-consumer-monitor</artifactId>
    <version>1.0</version>
    <name>kafka-consumer-monitor</name>
    <description>Kafka消费积压监控预警</description>

    <properties>
        <java.version>1.8</java.version>
        <elasticsearch.version>6.1.4</elasticsearch.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.54</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.1.4</version>
        </dependency>
        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>ojdbc6</artifactId>
            <version>11.1.0.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.11.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
前端使用了 Layui 和 ECharts 展示表格和图表
index.css代码:
/* Section title */
.div-title {
    font-size: 18px;
    margin-top: 10px;
    margin-left: 10px;
}

/* Right-aligned container */
.div-right {
    text-align: right;
}

/* Red text */
.span-red {
    color: #ff0000;
}
index.html代码(展示topic、消费者组Consumer GroupId、Total Lag、Kafka数据生产速度、Kafka数据消费速度等):
Title - 锐客网 Kafka 监控
detail.html代码(展示单个消费者组的Total Lag、生产速度、消费速度以及Total Lag趋势图):
Title - 锐客网 明细
效果图:
消费者组列表:

文章图片
消费者组明细:

文章图片
【Java|Java Kafka 消费积压监控的示例代码】到此这篇关于Java Kafka 消费积压监控的文章就介绍到这了,更多相关Java Kafka 消费监控内容请搜索脚本之家以前的文章或继续浏览下面的相关文章,希望大家以后多多支持脚本之家!
推荐阅读
- JAVA(抽象类与接口的区别&重载与重写&内存泄漏)
- 事件代理
- Java|Java OpenCV图像处理之SIFT角点检测详解
- java中如何实现重建二叉树
- 数组常用方法一
- 【Hadoop踩雷】Mac下安装Hadoop3以及Java版本问题
- Java|Java基础——数组
- RxJava|RxJava 在Android项目中的使用(一)
- java之static、static|java之static、static final、final的区别与应用
- Java基础-高级特性-枚举实现状态机