Kafka java api-生产者代码

前面做过命令行让生产者发送消息,现在使用java api来进行消息的生产,以及解释kafka高性能是如何实现(来源于学习资料)。
使用shell创建topic和发送消息如下:

#参数:zookeeper连接地址和端口号,副本数(包括自身),使用几个partition,topic的名称
[root@mini1 bin]# ./kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 2 --partitions 3 --topic orderMq
Created topic "orderMq".
[root@mini1 bin]# kafka-console-producer.sh --broker-list mini1:9092 --topic orderMq
hello tom
hi jerry
spring 
hhaah
xixi
nini

下面使用java api来发送消息
注:如果topic已经存在那么肯定就不创建了,但是不存在则会创建。

<dependencies>
       <dependency>
           <groupId>org.apache.kafka</groupId>
           <artifactId>kafka_2.8.2</artifactId>
           <version>0.8.1</version>
       </dependency>
   </dependencies>

public class KafkaProducerSimple {
    public static void main(String[] args) {
        /**
         * 1、指定当前kafka producer生产的数据的目的地
         *  创建topic可以输入以下命令,在kafka集群的任一节点进行创建。
         *  bin/kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 2 --partitions 3 --topic test
         */
        String TOPIC = "orderMq";
        /**
         * 2、读取配置文件
         */
        Properties props = new Properties();
        /*
         * key.serializer.class默认为serializer.class  key的序列化使用哪个类
         */
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        /*
         * kafka broker对应的主机,格式为host1:port1,host2:port2
         */
        props.put("metadata.broker.list", "mini1:9092,mini2:9092,mini3:9092");
        /*
         * request.required.acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1
         * 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。
         * 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。
         * 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。
         * 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。
         * 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。
         * -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。
         * 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失
         */
        props.put("request.required.acks", "1");
        /*
         * 可选配置,如果不配置,则使用默认的partitioner partitioner.class
         * 默认值:kafka.producer.DefaultPartitioner
         * 用来把消息分到各个partition中,默认行为是对key进行hash。
         */
        props.put("partitioner.class", "com.scu.kafka.MyLogPartitioner");
//        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
        /**
         * 3、通过配置文件,创建生产者
         */
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        /**
         * 4、通过for循环生产数据
         */
        for (int messageNo = 1; messageNo < 100000; messageNo++) {
            /**
             * 5、调用producer的send方法发送数据
             * 注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发
             */
            producer.send(new KeyedMessage<String, String>(TOPIC, messageNo + "", "appid" + UUID.randomUUID() + "itcast"));
        }
    }
}

public class MyLogPartitioner implements Partitioner {
    private static Logger logger = Logger.getLogger(MyLogPartitioner.class);

    public MyLogPartitioner(VerifiableProperties props) {
    }

    /**
     *
     * @param obj 传来的key 用它来进行hash分到partition
     * @param numPartitions 几个partition 如果集群中已存在该topic,那么partition数为原本存在数,否则默认是2
     * @return 生产到哪个partition
     */
    public int partition(Object obj, int numPartitions) {
 //使用下面被注释掉的代码,则类似于hadoop的partition分发方式,hash取模去发到对应序号的partition,这里使用1则表示发送到orderMQ-1的topic
//        return Integer.parseInt(obj.toString())%numPartitions;
        return 1;
    }

}

启动kafka集群,执行main方法,去集群中查看。

[root@mini1 orderMQ-1]# ll
总用量 14296
-rw-r--r--. 1 root root 10485760 11月 22 07:53 00000000000000000000.index
-rw-r--r--. 1 root root 14610099 11月 22 07:53 00000000000000000000.log
[root@mini1 orderMQ-1]# ll
总用量 14696
-rw-r--r--. 1 root root 10485760 11月 22 07:53 00000000000000000000.index
-rw-r--r--. 1 root root 15012813 11月 22 07:53 00000000000000000000.log
[root@mini1 orderMQ-1]# ll
总用量 15184
-rw-r--r--. 1 root root 10485760 11月 22 07:53 00000000000000000000.index
-rw-r--r--. 1 root root 15513339 11月 22 07:53 00000000000000000000.log
[root@mini1 orderMQ-1]# ll
总用量 15448
-rw-r--r--. 1 root root 10485760 11月 22 07:53 00000000000000000000.index
-rw-r--r--. 1 root root 15783297 11月 22 07:53 00000000000000000000.log
[root@mini1 orderMQ-1]# ll
总用量 16288
-rw-r--r--. 1 root root 10485760 11月 22 07:53 00000000000000000000.index
-rw-r--r--. 1 root root 16643559 11月 22 07:53 00000000000000000000.log
[root@mini1 orderMQ-1]# ll
总用量 16600
-rw-r--r--. 1 root root 10485760 11月 22 07:53 00000000000000000000.index
-rw-r--r--. 1 root root 16961019 11月 22 07:53 00000000000000000000.log

看到消息在不断增加。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,172评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,346评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,788评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,299评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,409评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,467评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,476评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,262评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,699评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,994评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,167评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,499评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,149评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,387评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,028评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,055评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,719评论 13 425
  • 人荒废起来,真是挺可悲的,不想动,不想写日记!觉得自己的人生没有任何可被记录的意义。我在想昨天面试的那个地方到底可...
    艾莎菈阅读 184评论 0 1
  • 问题:假设你手上有价值3000元的自家产品(实习公司的或自己创业的)可以作为奖品,你会为它包装个什么样的噱头、利益...
    烟柳Artemis阅读 173评论 0 0
  • 恢复功能 当我们在使用vim编辑时,vim会在被编辑的文件目录下再新建一个名为.filename.swp的文件。如...
    zshanjun阅读 323评论 0 0