002 Java操作kafka客户端

news/2025/2/27 9:34:52

kafka_0">Java操作kafka客户端

文章目录

  • Java操作kafka客户端
    • 3.Java操作kafka客户端
      • 1.引入依赖
      • 2. Kafka服务配置
      • 3、生产者(Producer)实现
        • 1. 基础配置与发送消息
        • 2. 关键配置说明
      • 4.消费者(Consumer)实现
        • 1. 基础配置与消费消息
        • 2. 关键配置说明
      • 3.auto.offset.reset参数可选值及行为
        • 1.代码示例与行为验证
          • 1. 配置为 `earliest`
          • 2. 配置为 `latest`
          • 3. 配置为 `none`
        • 2.关键注意事项
          • 1. Offset 提交机制的影响
          • 2. 消费者组隔离性
          • 3. 命令行验证 Offset
        • 3、生产环境最佳实践
        • 4、常见问题解答
          • Q:配置了 `latest`,为什么还能消费到旧消息?
          • Q:如何让消费者组永久保留 Offset?
      • 5.主题管理示例(AdminClient)
      • 6.最佳实践与注意事项
      • 7.关于flush和close方法的说明

来源参考的deepseek,如有侵权联系立删

kafka_4">3.Java操作kafka客户端

Java API提供以下核心接口:

  • Producer API:发送消息。
  • Consumer API:订阅消息。
  • Streams API:流式处理。
  • Admin API:管理Topic和集群。

1.引入依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version> 
</dependency>

2. Kafka服务配置

确保已启动Zookeeper和Kafka服务,默认端口分别为21819092

3、生产者(Producer)实现

1. 基础配置与发送消息

无需提前创建topic

java">import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerDemo {
    public static void main(String[] args) {
        // 1. 配置生产者参数
        Properties props = new Properties();
        // Broker地址
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 消息确认机制
        props.put("acks", "all");
        // 重试次数
        props.put("retries", 3);

        // 2. 创建生产者实例
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // 3. 构造消息并发送
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(
                        "test-topic", // 主题名称
                        "key-" + i,   // 消息键
                        "value-" + i  // 消息值
                );
                // 异步发送(可改用get()同步等待)
                producer.send(record, (metadata, exception) -> {
                    if (exception == null) {
                        System.out.printf("消息发送成功:topic=%s, partition=%d, offset=%d%n",
                                metadata.topic(), metadata.partition(), metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                });
            }
            producer.flush(); // 确保所有消息发送完成
        }
    }
}

在这里插入图片描述

2. 关键配置说明
参数说明
bootstrap.serversBroker地址列表,多个用逗号分隔
key.serializer键的序列化类(如StringSerializer)
value.serializer值的序列化类
acks消息持久化确认机制(0/1/all
retries发送失败后的重试次数
batch.size批量发送的消息大小(字节)

4.消费者(Consumer)实现

1. 基础配置与消费消息
java">import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerDemo {
    public static void main(String[] args) {
        // 1. 配置消费者参数
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "test-group"); // 消费者组ID
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest"); // 从最早消息开始消费
        props.put("enable.auto.commit", "false");   // 关闭自动提交偏移量

        // 2. 创建消费者实例
        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic")); // 订阅主题

            while (true) {
                // 3. 轮询消息(超时时间100ms)
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("收到消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
                // 4. 手动提交偏移量(同步提交)
                consumer.commitSync();
            }
        }
    }
}

在这里插入图片描述
在这里插入图片描述

可收到实时消费的消息,但队列中消息并没有移除,

  • 消息保留规则由 Broker 配置控制,与消费者无关。
  • 消费者 Offset 仅标记消费进度,不会删除消息。
  • 通过 kafka-consumer-groups.sh 工具监控消费状态。
  • 生产环境中,合理设置 log.retention.hourslog.retention.bytes
2. 关键配置说明
参数说明
group.id消费者组ID,相同组内共享分区
auto.offset.reset无偏移量时的策略(earliest/latest
enable.auto.commit是否自动提交偏移量(建议false手动控制)
max.poll.records单次poll最大消息数

3.auto.offset.reset参数可选值及行为

作用典型场景
earliest从分区的最早消息开始消费(从头消费)需要处理 Topic 中所有历史消息
latest从分区的最新消息开始消费(仅消费新消息)实时处理最新数据,忽略历史消息
none抛出异常(NoOffsetForPartitionException需要严格确保 Offset 有效性
1.代码示例与行为验证
1. 配置为 earliest
java">props.put("auto.offset.reset", "earliest");

参数生效的触发条件

场景auto.offset.reset 是否生效消费起始位置
消费者组首次启动(无 Offset)根据参数值(earliest/latest
Offset 已提交且有效(未过期)从已提交 Offset 继续消费
Offset 已过期(消息被删除)根据参数值重新定位

行为

  • 如果消费者组首次启动,会从 Topic 每个分区的第一条消息开始消费。
  • 如果 Offset 过期(例如消息被删除),会从现存的最早消息开始消费。

适用场景

  • 数据回放(重放全部历史数据)
  • 测试环境需要消费完整数据集
2. 配置为 latest
java">props.put("auto.offset.reset", "latest");

行为

  • 如果消费者组首次启动,只消费启动后新写入的消息。
  • 如果 Offset 过期,会从当前最新消息开始消费。

适用场景

  • 生产环境实时处理(避免处理历史积压数据)
  • 日志收集系统(只需最新日志)
3. 配置为 none
java">props.put("auto.offset.reset", "none");

行为

  • 如果 Offset 无效,直接抛出 NoOffsetForPartitionException
  • 需手动处理异常或确保 Offset 始终有效。

适用场景

  • 高可靠性系统(需严格监控 Offset 有效性)
2.关键注意事项
1. Offset 提交机制的影响
  • 如果启用了自动提交 (enable.auto.commit=true),消费者会定期提交 Offset。
    重复消费风险:若消息处理失败但 Offset 已提交,会导致消息丢失。
  • 推荐做法
java">  props.put("enable.auto.commit", "false"); // 关闭自动提交
  // 处理完消息后手动提交 Offset
  consumer.commitSync();
2. 消费者组隔离性
  • 不同group.id的 Offset 互相独立。例如:
    • 消费者组 A(group.id=group1)配置为 latest → 只消费新消息。
    • 消费者组 B(group.id=group2)配置为 earliest → 可以消费全部消息。
3. 命令行验证 Offset

通过 Kafka 工具查看消费者组的 Offset:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group your-group-id

输出示例:

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
test-topic      0          5000            10000           5000
  • LAG:未消费的消息数量。若 LAG 持续增长,说明消费速度跟不上生产速度。

3、生产环境最佳实践
  1. 明确业务需求
    • 需要重放数据 → earliest
    • 仅处理实时数据 → latest
  2. 监控 Offset 提交
    • 使用 kafka-consumer-groups.sh 定期检查 LAG。
    • 集成监控系统(如 Prometheus + Grafana)。
  3. 防御性代码
java">   try {
       while (true) {
           ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
           // 处理消息
           consumer.commitSync(); // 同步提交
       }
   } catch (NoOffsetForPartitionException e) {
       // 处理 Offset 无效的极端情况
       logger.error("Offset 无效,需人工介入!", e);
   }

4、常见问题解答
Q:配置了 latest,为什么还能消费到旧消息?
  • 可能原因
    消费者组之前已提交过 Offset,且当前 Offset 指向旧消息位置。
  • 解决
    重置消费者组 Offset:
  bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --group your-group-id --reset-offsets --to-latest --execute --topic test-topic
Q:如何让消费者组永久保留 Offset?
  • Kafka 默认行为
    Offset 存储在内部 Topic __consumer_offsets 中,默认保留时间为 7 天。
  • 修改保留策略
  # 修改 Offset 保留时间(单位:毫秒)
  bin/kafka-configs.sh --bootstrap-server localhost:9092 \
    --entity-type topics --entity-name __consumer_offsets \
    --alter --add-config retention.ms=604800000

5.主题管理示例(AdminClient)

java">import org.apache.kafka.clients.admin.*;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaAdminDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 创建主题(3分区,1副本)
            NewTopic newTopic = new NewTopic("test-topic2", 3, (short) 1);
            CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));
            result.all().get(); // 阻塞等待创建完成
            System.out.println("主题创建成功");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在这里插入图片描述

6.最佳实践与注意事项

  1. 生产者优化
    • 启用压缩(compression.type=snappy)减少网络开销。
    • 合理设置batch.sizelinger.ms提高吞吐量。
  2. 消费者可靠性
    • 使用手动提交偏移量,避免消息丢失或重复消费。
    • 处理CommitFailedException,防止因处理超时导致提交失败。
  3. 序列化选择
    • 默认支持String、ByteArray等序列化器。
    • 复杂对象推荐使用JSON(Jackson)或Avro。
  4. 消费者组管理
    • 通过kafka-consumer-groups.sh工具监控消费进度。
    • 避免频繁重平衡(Rebalance),调整session.timeout.ms参数。

7.关于flush和close方法的说明

  • flush():强制发送缓冲区中所有未发送的消息(同步等待发送完成)
  • close():释放生产者占用的所有资源(包括线程、网络连接、内存等)

若未调用close()可能导致:

  • 线程泄漏:生产者后台的Sender线程未终止
  • 连接泄漏:与Broker的TCP连接未关闭
  • 内存泄漏:未释放消息缓冲区内存

可通过jstackVisualVM工具检查线程状态验证。

关键区别说明

方法作用是否必须调用是否自动包含对方功能
flush()清空发送缓冲区,确保所有消息被发送可选(按需调用)❌ 不释放资源
close()关闭生产者并释放资源必须调用✅ 内部会自动调用flush()

正确写法(推荐):

java">try (Producer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(record);
    producer.flush(); // 显式清空缓冲区(可选)
} // 自动调用close(),包含flush()

错误写法(资源泄漏):

java">Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(record);
producer.flush(); 
// 忘记调用close() → 线程/连接未释放!

最佳实践建议

1.优先使用try-with-resources(Java 7+特性):

java">   try (Producer<String, String> producer = new KafkaProducer<>(props)) {
       // 发送消息...
   } // 自动调用close()

这是最安全的写法,无需手动调用flush()close()

2.需要立即发送时

java">   producer.send(record);
   producer.flush(); // 强制立即发送(如实时系统关键消息)
   // ...其他操作...
   producer.close(); // 仍需显式关闭

3.不要依赖finalize()
Kafka客户端的finalize()方法已废弃,不能保证资源释放。

4.KafkaProducer.close()源码:

java">public void close() {
    close(Duration.ofMillis(Long.MAX_VALUE)); // 默认无限等待
}

public void close(Duration timeout) {
    // ...
    flush();    // 内部自动调用flush()
    client.close(); // 释放网络资源
    metrics.close(); // 关闭监控指标
    // ...
}

http://www.niftyadmin.cn/n/5869888.html

相关文章

IO和NIO

IO和NIO在定义、面向对象、模式、选择器等方面存在显著的差异。以下是对IO和NIO的详细对比&#xff1a; 一、定义 IO&#xff08;Input/Output&#xff09;&#xff1a; IO是计算机中的输入/输出&#xff08;Input/Output&#xff09;的简称&#xff0c;指的是计算机系统与外…

python文件的基本操作,文件读写

1.文件 1.1文件就是存储在某种长期存储设备上的一段数据 1.2文件操作 打开文件-->读写文件-->关闭文件 注意&#xff1a;可以只打开和关闭文件不进行任何操作 1.3文件对象的方法 1.open():创建一个file对象&#xff0c;默认以只读模式打开 2.read(n):n表示从文件中…

内存泄漏问题分享

在前端开发中&#xff0c;内存泄漏&#xff08;Memory Leak&#xff09;是指由于代码问题导致浏览器无法回收不再使用的内存&#xff0c;从而影响网页的性能&#xff0c;导致页面变慢&#xff0c;甚至崩溃。前端内存泄漏通常由以下几种原因引起&#xff0c;理解和修复这些问题对…

核弹级技术革命——搭配deepseek-r1满血版的腾讯云ai助手(codex)仅用14天独立开发出适配ARM架构的微内核操作系统!

&#x1f680; 编程革命已至&#xff1a;双核AI代码引擎开启效率新纪元 &#x1f680; 当代码生成速度提升600%、缺陷率下降75%成为现实&#xff0c;全球开发者正在见证AI生产力的核爆时刻&#xff01;DeepSeek ProMax AI 代码引擎与腾讯云CodeX双剑合璧&#xff0c;以实测数据…

Uniapp 小程序:语音播放与暂停功能的实现及优化方案

界面部分 //开启语音 <button class"open" v-if"showPlayfalse" click"playText">这是开启播放的图片</button >//关闭语音 <button class"close" v-if"showPlaytrue" click"stopText">这是…

Rust 并发编程:使用消息传递进行线程间数据共享

一、通道&#xff08;Channel&#xff09;的基本概念 一个通道可以想象成一条单向水道或河流&#xff1a;有一个 发送端&#xff08;transmitter&#xff09; 和一个 接收端&#xff08;receiver&#xff09;。发送端好比河流上游&#xff0c;负责把“橡皮鸭”丢进水里&#x…

PhotoLine绿色版 v25.00:全能型图像处理软件的深度解析

在图像处理领域,PhotoLine以其强大的功能和紧凑的体积,赢得了国内外众多用户的喜爱。本文将为大家全面解析PhotoLine绿色版 v25.00的各项功能,帮助大家更好地了解这款全能型的图像处理软件。 一、迷你体积,强大功能 PhotoLine被誉为迷你版的Photoshop,其体积虽小,但功能却…

WiFi IEEE 802.11协议精读:IEEE 802.11-2007,6,MAC service definition MAC服务定义

继续精读IEEE 802.11-2007 6&#xff0c;MAC service definition MAC服务定义 6.1 MAC服务概述 6.1.1 数据服务 此服务为对等逻辑链路控制&#xff08;LLC&#xff09;实体提供交换MAC服务数据单元&#xff08;MSDU&#xff09;的能力。为支持此服务&#xff0c;本地媒体访…