docker kafka 入门实践

1 前置条件

Linux-centos7 && docker && docker compose

2. 镜像

3. docker-compose.yml示例

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 172.18.55.63 # 此ip是我自己的内网ID
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

备注:亦可参照https://hub.docker.com/r/wurstmeister/kafka/站点的信息

3.启动

在docker-compose.yml所在的目录执行以下命令:

docker-compose up -d

运行:

docker ps -a

查看docker的容器信息:

CONTAINER ID        IMAGE                                     COMMAND                  CREATED             STATUS              PORTS                                                                        NAMES
ffa65c7bce19        wurstmeister/kafka                        "start-kafka.sh"         6 days ago          Up 6 days           0.0.0.0:32768->9092/tcp                                                      kafka_kafka_1
6d9cd607fc78        wurstmeister/zookeeper                    "/bin/sh -c '/usr/sb…"   6 days ago          Up 6 days           22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:32769->2181/tcp                          kafka_zookeeper_1

4. 测试

  • 进入容器
docker exec -it kafka_kafka_1 /bin/bash
  • 创建topic
$KAFKA_HOME/bin/kafka-topics.sh --create --topic xiaojun --zookeeper kafka_zookeeper_1:2181 --replication-factor 1 --partitions 1
  • 查看topic信息
$KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper kafka_zookeeper_1 --topic xiaojun

输出:

Topic:xiaojun   PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: xiaojun  Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
  • 发布消息[发布完按ctrl+C退出]
$KAFKA_HOME/bin/kafka-console-producer.sh --topic=xiaojun --broker-list kafka_kafka_1:9092

我发布了三条信息:

bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --from-beginning --topic xiaojun
Today I am very upset
Tomorrow is another day
Can you help me
  • 消费信息
    1 从头消费
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --from-beginning --topic xiaojun

输出:

bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --from-beginning --topic xiaojun
Today I am very upset
Tomorrow is another day
Can you help me

2 按offset消费

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --partition 0 --offset 2 --topic xiaojun

输出:

bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --partition 0 --offset 2 --topic xiaojun
Can you help me
  • 删除topic
$KAFKA_HOME/bin/kafka-topics.sh --delete --topic test  --zookeeper kafka_zookeeper_1:2181

5. PHP实践

1. Producer:
<?php

$rk = new RdKafka\Producer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");

$topic = $rk->newTopic("test");

for ($i = 0; $i < 10; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    $rk->poll(0);
}

while ($rk->getOutQLen() > 0) {
    $rk->poll(50);
}

?>
2. Consumer:
<?php

$conf = new RdKafka\Conf();

// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;

         case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
             echo "Revoke: ";
             var_dump($partitions);
             $kafka->assign(NULL);
             break;

         default:
            throw new \Exception($err);
    }
});

// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set('group.id', 'myConsumerGroup');

// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '127.0.0.1');

$topicConf = new RdKafka\TopicConf();

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

// Subscribe to topic 'test'
$consumer->subscribe(['test']);

echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";

while (true) {
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

?>

6 最后简要说一下kafka快的原理

6.1 先说几个概念

  • producer:生产者
  • consumer:消费者
  • topic:主题,可以理解为队列的名称
  • broker:集群的一个实例,可以是指定的服务器IP+端口
  • group:消费者可以是一个群组

6.2 原理

参考:
kafka效率
为什么Kafka那么快
总结:

写入优化

读写数据的效率瓶颈主要在于磁盘IO,所谓磁盘IO,需要经历机械磁盘的磁臂移动(寻道时间)和旋转延迟(寻址时间),如果是随机IO,那么这个时间耗费就更大了,但是呢,如果是顺序IO,则可以大大加快读写操作的效率。

以上说的简单的优化产生了数量级的加速。 批处理导致更大的网络数据包,更大的顺序磁盘操作,连续的内存块等,这些都允许Kafka将随机消息写入的突发流转换成流向消费者的线性写入。

当然,即便是顺序写入硬盘,硬盘的访问速度还是不可能追上内存。所以Kafka的数据并不是实时的写入硬盘,它充分利用了现代操作系统分页存储来提高I/O效率。

Kafka还使用了Memory Mapped Files进行写入优化。

Memory Mapped Files(后面简称mmap)也被翻译成内存映射文件,在64位操作系统中一般可以表示20G的数据文件,它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候进行操作)。

通过mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存),也不必关心内存的大小有虚拟内存为我们兜底。
使用这种方式可以获取很大的I/O提升,省去了用户空间到内核空间复制的开销(调用文件的read会把数据先放到内核空间的内存中,然后再复制到用户空间的内存中。)。但这种做法也有一个很明显的缺陷——不可靠,写到mmap中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘。Kafka提供了一个参数——producer.type来控制是不是主动flush,如果Kafka写入到mmap之后就立即flush然后再返回Producer叫同步(sync);写入mmap之后立即返回Producer不调用flush叫异步(async)。

同时,Kafka采用由生产者,经纪人和消费者共享的标准化二进制消息格式(样数据块就可以在它们之间自由传输,无需转换)来避免无效率的字节复制。

读取优化

另外一个很重要的优化是:
通常来说,数据从文件传输到socket的公共数据路径是:

    1. 操作系统将数据从磁盘读入到内核空间的页缓存
    1. 应用程序将数据从内核空间读入到用户空间缓存中
    1. 应用程序将数据写回到内核空间到socket缓存中
    1. 操作系统将数据从socket缓冲区复制到网卡缓冲区,以便将数据经网络发出

这样做明显是低效的,这里有四次拷贝,两次系统调用。如果使用sendfile,再次拷贝可以被避免:允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的。

以上就是所谓的sendfile和zero copy。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,193评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,306评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,130评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,110评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,118评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,085评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,007评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,844评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,283评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,508评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,395评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,985评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,630评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,797评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,653评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,553评论 2 352

推荐阅读更多精彩内容