1818
1919
2020
21+
22+
23+
24+
2125### 2. 异步发送 API
2226
2327- ** KafkaProducer** 需要创建一个生产者对象,用来发送数据
3236</dependency >
3337```
3438
39+ #### (1)不带回调函数的异步(AsyncProducer)
40+
3541``` java
3642package com.tian.kafka.producer ;
3743
@@ -70,6 +76,8 @@ public class AsyncProducer {
7076}
7177```
7278
79+ #### (2)带回调函数的异步(CallbackProducer)
80+
7381``` java
7482package com.tian.kafka.producer ;
7583
@@ -79,13 +87,13 @@ import org.apache.kafka.common.serialization.StringSerializer;
7987import java.util.Properties ;
8088
8189/**
82- * 带回调函数的异步 Producer API
90+ * 带回调函数的异步Producer API
8391 */
8492public class CallbackProducer {
8593 public static void main (String [] args ) {
8694 Properties props = new Properties ();
8795 props. put(ProducerConfig . BOOTSTRAP_SERVERS_CONFIG ,
88- " hadoop101:9092,hadoop102:9092,hadoop103 :9092" );
96+ " 192.168.72.133 :9092" );
8997 props. put(ProducerConfig . KEY_SERIALIZER_CLASS_CONFIG ,
9098 StringSerializer . class. getName());
9199 props. put(ProducerConfig . VALUE_SERIALIZER_CLASS_CONFIG ,
@@ -105,8 +113,9 @@ public class CallbackProducer {
105113 else e. printStackTrace();
106114 }
107115 });
108- producer . close();
116+
109117 }
118+ producer. close();
110119 }
111120}
112121```
@@ -115,6 +124,8 @@ public class CallbackProducer {
115124
116125### 3. 同步发送 API
117126
127+ #### (1)同步发送(SyncProducer)
128+
118129``` java
119130package com.tian.kafka.producer ;
120131
@@ -339,7 +350,7 @@ public class AsyncManualCommitOffset {
339350### 3. 自定义存储 offset
340351
341352 Kafka 0.9 版本之前,offset 存储在 zookeeper,0.9 版本之后,默认将 offset 存储在 Kafka 的一个内置的 topic 中。除此之外,Kafka 还可以选择自定义存储 offset。
342- Offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalance。
353+ offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalance。
343354 当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance。
344355 消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。
345356 要实现自定义存储 offset,需要借助 ConsumerRebalanceListener,以下为示例代码,其中提交和获取 offset 的方法,需要根据所选的 offset 存储系统自行实现。
@@ -368,6 +379,7 @@ public class CustomConsumer {
368379 props. put(" value.deserializer" , " org.apache.kafka.common.serialization.StringDeserializer" );
369380
370381 KafkaConsumer<String , String > consumer = new KafkaConsumer<> (props);
382+ // 消费者订阅主题
371383 consumer. subscribe(Arrays . asList(" first" ), new ConsumerRebalanceListener () {
372384
373385 // 该方法会在 Rebalance 之前调用
0 commit comments