Kafka
- 开启zookeeper:
bin/zookeeper-server-start.sh
config/zookeeper.properties - 开启server:
bin/kafka-server-start.sh
config/server.properties - 查看Topic:
bin/kafka-topics.sh --list --zookeeper localhost:2181 - 创建一个Topic: b
in/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic AnalyticsData - 向Kafka输入数据:
- 开启Consumer:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic AnalyticsData --from-beginning
原理
Kafka是一种高吞吐量的分布式发布订阅消息系统:
高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
支持通过kafka服务器和消费机集群来分区消息。
支持Hadoop并行数据加载。
kafka本身有内置zookeeper。
一个典型的Kafka集群中包含若干Producer(可以是web前端产生的PageView,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
因此Kafka提供两种策略删除旧数据。一是基于时间,二是基于Partition文件大小。
例如可以通过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可在Partition文件超过1GB时删除旧数据。
Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer属于不同的Consumer Group即可。
def createProducer[K, V](bootstrapServers: String, keySerializer: String = KAFKA_SERIALIZATION_STRINGSERIALIZER, valueSerializer: String = KAFKA_SERIALIZATION_STRINGSERIALIZER) =
{
val properties = new Properties()
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer)
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") new KafkaProducer[K, V](properties)
}
Flume
Flume是Cloudera提供的一个分布式、可靠、高可用的海量日志采集、聚合、传输的日志收集系统,支持在日志系统中定制各类数据发送方,用于收集数据。Flume提供对数据进行简单处理,并写到各种数据接收方。Flume提供了从console(控制台)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日志系统,支持TCP和UDP等2种模式),exec(命令执行)等数据源上收集数据的能力,在我们的系统中目前使用exec方式进行日志采集。Flume的数据接受方,可以是console(控制台)、text(文件)、dfs(HDFS文件)、RPC(Thrift-RPC)和syslogTCP(TCP syslog日志系统)等。在我们系统中由kafka来接收。