# Basic operations
Reference: http://kafka.apache.org/quickstart#quickstart_multibroker
Installing, configuring, and starting Kafka
http://kafka.apache.org/quickstart
- Start ZooKeeper
root:~/kafka$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
- Start Kafka
root:~/kafka_2.12-0.10.2.1$ ./bin/kafka-server-start.sh config/server.properties
The broker.id in server.properties must be unique for each broker in the cluster.
- Create a topic:
root:~/kafka_2.12-0.10.2.1$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
- Start a producer
root:~/kafka_2.12-0.10.2.1$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
- Start a consumer
root:~/kafka_2.12-0.10.2.1$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Type a string in the producer terminal; the consumer terminal then receives the messages that the producer sent.
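To make the effect of --from-beginning concrete, here is a toy in-memory model of a topic as an append-only log. It is only a sketch: `ToyTopic` and its methods are hypothetical names and do not use the Kafka client API. It assumes a consumer started without --from-beginning begins reading at the end of the log.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only (hypothetical class, not the Kafka API):
// a topic partition behaves like an append-only list addressed by offset.
class ToyTopic {
    private final List<String> log = new ArrayList<>();

    // Producer side: append a message and return the offset it was stored at.
    long append(String message) {
        log.add(message);
        return log.size() - 1;
    }

    // Offset that the next appended message will receive.
    long endOffset() {
        return log.size();
    }

    // Consumer side: read every message from the given offset to the end.
    List<String> readFrom(long offset) {
        return new ArrayList<>(log.subList((int) offset, log.size()));
    }

    public static void main(String[] args) {
        ToyTopic test = new ToyTopic();
        test.append("first");
        long latest = test.endOffset(); // where a consumer without --from-beginning would start
        test.append("second");

        System.out.println(test.readFrom(0));      // like --from-beginning: [first, second]
        System.out.println(test.readFrom(latest)); // only messages sent afterwards: [second]
    }
}
```

The real console consumer keeps polling for new messages; the sketch reads once to keep the offset idea visible.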
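Viewing topic information
Create a topic with a replication factor of 3 (this needs at least three running brokers):
root:~/kafka_2.12-0.10.2.1$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Describe the topic: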
root:~/kafka_2.12-0.10.2.1$ ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
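The first line summarizes the topic; each following line describes one partition.
leader: the broker that handles all reads and writes for the partition (here, the broker with broker.id=2)
replicas: the brokers that replicate the partition's log (here, the brokers with broker.id = 2, 0, 1)
isr: the "in-sync" replicas, that is, the subset of the replica list that is currently alive and eligible to become leader (here, the brokers with broker.id = 2, 0, 1)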
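The fields above can be pulled out of a partition line programmatically. The following is a minimal sketch, assuming the line has exactly the layout shown in the output above; `PartitionInfo` is a hypothetical helper, not a Kafka class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Kafka): parses one partition line
// printed by `kafka-topics.sh --describe`.
class PartitionInfo {
    // Matches "Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1".
    private static final Pattern LINE = Pattern.compile(
            "Partition:\\s*(\\d+)\\s+Leader:\\s*(\\d+)\\s+Replicas:\\s*([\\d,]+)\\s+Isr:\\s*([\\d,]*)");

    final int partition;
    final int leader;             // broker.id of the leader
    final List<Integer> replicas; // broker.ids that hold a copy of the log
    final List<Integer> isr;      // in-sync subset of the replicas

    PartitionInfo(int partition, int leader, List<Integer> replicas, List<Integer> isr) {
        this.partition = partition;
        this.leader = leader;
        this.replicas = replicas;
        this.isr = isr;
    }

    static PartitionInfo parse(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.find()) {
            throw new IllegalArgumentException("not a partition line: " + line);
        }
        return new PartitionInfo(Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2)),
                ids(m.group(3)), ids(m.group(4)));
    }

    // Turns "2,0,1" into [2, 0, 1]; an empty string gives an empty list.
    private static List<Integer> ids(String csv) {
        List<Integer> out = new ArrayList<>();
        for (String s : csv.split(",")) {
            if (!s.isEmpty()) {
                out.add(Integer.parseInt(s));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        PartitionInfo p = parse(
                "Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1");
        // prints: leader=2 replicas=[2, 0, 1] isr=[2, 0, 1]
        System.out.println("leader=" + p.leader + " replicas=" + p.replicas + " isr=" + p.isr);
    }
}
```

The summary line (the one with PartitionCount) does not match the pattern, so `parse` rejects it rather than returning a half-filled result.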
Setting up a multi-broker Kafka cluster
- Create a config file for the new broker
cp config/server.properties config/server-1.properties
Change the following settings:
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1
- Start the new broker
bin/kafka-server-start.sh config/server-1.properties
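Every additional broker needs its own ID, port, and log directory. The sketch below generates those overrides for several brokers. `BrokerConfigs` is a hypothetical helper; the port rule (9092 + broker ID) and the directory naming are assumptions that simply extend the values used for broker 1, and the log directory is written under `log.dirs`, the key defined in the shipped server.properties.

```java
import java.util.Properties;

// Hypothetical helper: builds the per-broker overrides for config/server-N.properties.
class BrokerConfigs {
    static Properties overridesFor(int brokerId) {
        Properties p = new Properties();
        p.setProperty("broker.id", Integer.toString(brokerId));           // must be unique per broker
        p.setProperty("listeners", "PLAINTEXT://:" + (9092 + brokerId));  // assumed port rule
        p.setProperty("log.dirs", "/tmp/kafka-logs-" + brokerId);         // assumed directory naming
        return p;
    }

    public static void main(String[] args) {
        String[] keys = {"broker.id", "listeners", "log.dirs"};
        for (int id = 1; id <= 2; id++) {
            Properties p = overridesFor(id);
            System.out.println("# config/server-" + id + ".properties");
            for (String key : keys) {
                System.out.println(key + "=" + p.getProperty(key));
            }
        }
    }
}
```

The printed lines are meant to be pasted over the matching keys in a copy of config/server.properties; the rest of that file stays unchanged.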
Importing data
- Set up the connectors
root:~/kafka_2.12-0.10.2.1$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
- Start a consumer that listens on the target topic
root:~/kafka_2.12-0.10.2.1$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
- Write to the input file
root:~/kafka_2.12-0.10.2.1$ echo "line">>test.txt
connect-file-source.properties defines where the data is imported from, and connect-file-sink.properties defines where the data is saved.
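To see how the two files fit together, the sketch below loads their contents and prints the resulting data path. The property values are the defaults as I recall them from the distribution, so treat them as an assumption and compare them with the files in your own config/ directory. `ConnectFileConfigs` is a hypothetical helper and does not use the Kafka Connect API.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper: reads the two connector configs and shows the data path.
class ConnectFileConfigs {
    // Assumed contents of config/connect-file-source.properties.
    static final String SOURCE = String.join("\n",
            "name=local-file-source",
            "connector.class=FileStreamSource",
            "tasks.max=1",
            "file=test.txt",
            "topic=connect-test");

    // Assumed contents of config/connect-file-sink.properties.
    static final String SINK = String.join("\n",
            "name=local-file-sink",
            "connector.class=FileStreamSink",
            "tasks.max=1",
            "file=test.sink.txt",
            "topics=connect-test");

    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for an in-memory reader
        }
        return p;
    }

    // Source file -> topic -> sink file.
    static String describeFlow(Properties source, Properties sink) {
        return source.getProperty("file") + " -> topic " + source.getProperty("topic")
                + " -> " + sink.getProperty("file");
    }

    public static void main(String[] args) {
        System.out.println(describeFlow(parse(SOURCE), parse(SINK)));
    }
}
```

Under these assumed values, each line appended to test.txt is published to connect-test, which is why the console consumer above listens on that topic.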
Processing data
- Create the input topic
root:~/kafka_2.12-0.10.2.1$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-file-input
- Feed the producer from a file
root:~/kafka_2.12-0.10.2.1$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
- Create the application
root:~/kafka_2.12-0.10.2.1$ vi org.apache.kafka.streams.examples.wordcount.WordCountDemo
- Run the application
root:~/kafka_2.12-0.10.2.1$ bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
- Consume the output
root:~/kafka_2.12-0.10.2.1$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --property print.key=true
all 1
lead 1
to 1
hello 1
streams 2
join 1
kafka 3
summit 1
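Stream processing means further processing messages that have already been sent to a topic; here the resulting word counts are written to the streams-wordcount-output topic.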
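The counting itself can be sketched in plain Java, without the Streams API. The three input lines are an assumption (the notes do not show file-input.txt; these are the sample lines from the official quickstart), and `WordCountSketch` is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of the word count; it does not use Kafka Streams.
class WordCountSketch {
    static Map<String, Long> count(String... lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            // Lower-case each line and split it on runs of non-word characters.
            for (String word : line.toLowerCase().split("\\W+")) {
                if (word.isEmpty()) {
                    continue;
                }
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Assumed input, taken from the official quickstart.
        Map<String, Long> counts = count(
                "all streams lead to kafka",
                "hello kafka streams",
                "join kafka summit");
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}
```

With this input the totals agree with the consumer output above (kafka 3, streams 2, every other word 1), although the order of the lines may differ. The real demo computes the same totals incrementally as records arrive.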
Java development
Reference: http://kafka.apache.org/documentation
The Kafka source code includes Java example programs.
- Add the dependencies
<!--kafka-->
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.10 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.2.1</version>
</dependency>
<!--kafka-->
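stream
- Add a Producer and a Consumer, and create the topic whose messages the stream will process
- Create a KafkaStreams object
- Send messages from the Producer and start KafkaStreams
Using KStream
- Process each message record: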
// KStream example
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class WordCountDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Unique ID of this Streams application (it is not a topic name)
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount1");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
        // Note: To re-run the demo, you need to use the offset reset tool:
        // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> source = builder.stream("topic1"); // input topic
        processMessage(source);
        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }

    private static void processMessage(KStream<String, String> source) throws FileNotFoundException {
        String dir = WordCountDemo.class.getResource("").getPath();
        final PrintWriter printWriter = new PrintWriter(dir + "outputfile");
        // Called once per record: write "|key| |value|" and flush immediately
        source.foreach(new ForeachAction<String, String>() {
            @Override
            public void apply(String key, String value) {
                printWriter.append("|").append(key).append("|").append(" ").append("|").append(value).append("|").append("\n");
                printWriter.flush();
            }
        });
        source.print(); // print each record to the console
        // Write each record to a file; a separate file name is used here so that
        // it does not overwrite the PrintWriter output above
        source.writeAsText(dir + "outputfile.txt");
    }
}
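Kafka use cases
Reference: http://kafka.apache.org/uses#uses_commitlog
- Messaging
- Real-time website activity tracking
- Metrics and monitoring
- Log aggregation
- Stream processing
- Event sourcing (events are kept in order)
- Commit log for distributed systems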
Building from source
- Download the source
git clone http://git-wip-us.apache.org/repos/asf/kafka.git <kafka.project.dir>
- Build with Gradle (gradle idea generates the IntelliJ IDEA project files)
- cd <kafka.project.dir>
- gradle idea