博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka之生产者消费者示例
阅读量:6457 次
发布时间:2019-06-23

本文共 3412 字,大约阅读时间需要 11 分钟。

本例以kafka2.10_0.10.0.0为例,不同版本的kafka Java api有些区别!

 

增加maven依赖

org.apache.kafka
kafka_2.10
0.10.0.0

生产者

package com.zns.kafka;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.common.serialization.StringSerializer;public class KafkaProducerTest {    public static String topicName = "test";    public static void main(String[] args) {        Properties props = new Properties();        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");// 9092是kafka默认端口        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());        KafkaProducer
producer = new KafkaProducer
(props); for (int i = 1; i <= 10; i++) { ProducerRecord
message = new ProducerRecord
(topicName, "hello world " + i); producer.send(message); producer.flush(); } }}

 

 

消费者

package com.zns.kafka;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.common.serialization.StringSerializer;import kafka.consumer.ConsumerConfig;import kafka.consumer.ConsumerIterator;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;import kafka.serializer.StringDecoder;import kafka.utils.VerifiableProperties;public class KafkaConsumerTest {    public static String topicName = "test";    public static void main(String[] args) {        Properties props = new Properties();        props.put("zookeeper.connect", "127.0.0.1:2181");// 2181是zookeeper默认端口        props.put("group.id", "test-group");        props.put("zookeeper.session.timeout.ms", "100000");        props.put("zookeeper.sync.time.ms", "200");        props.put("auto.commit.interval.ms", "1000");        props.put("auto.offset.reset", "smallest");        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());        ConsumerConfig config = new ConsumerConfig(props);        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);        Map
topicCountMap = new HashMap
(); topicCountMap.put(topicName, new Integer(1)); StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties()); Map
>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder); KafkaStream
stream = consumerMap.get(topicName).get(0); ConsumerIterator
it = stream.iterator(); while (it.hasNext()) { System.out.println("收到消息:" + it.next().message()); } }}

 

 

确保启动运行了zookeeper和kafka

先后启动运行生产者和消费者,可以看到消费者端接收到了消息...

转载于:https://www.cnblogs.com/zengnansheng/p/10389745.html

你可能感兴趣的文章
leetcode124二叉树最大路径和
查看>>
AngularJS笔记整理 内置指令与自定义指令
查看>>
shell与正则表达式
查看>>
第三篇:白话tornado源码之请求来了
查看>>
表示数值的字符串
查看>>
JQUERY AJAX请求
查看>>
超级账本Fabric区块链用弹珠游戏Marbles 部署
查看>>
控制圈复杂度的9种重构技术总结
查看>>
数据分析--数字找朋友
查看>>
18年selenium3+python3+unittest自动化测试教程(下)
查看>>
memcache数据库和redis数据库的区别(理论)
查看>>
我的友情链接
查看>>
MyBatis+Spring结合
查看>>
Office 365之SkyDrive Pro
查看>>
无缝滚动实现原理分析【公告栏】
查看>>
Java Web 高性能开发
查看>>
CentOS 4.4双网卡绑定,实现负载均衡
查看>>
Scala之柯里化和隐式转换
查看>>
获取androdmanifest里面的meta-data
查看>>
mysql拷贝表的几种方式
查看>>