1、kafka.properties
bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=all
retries=0
group.id=g_graffic1
enable.auto.commit=true
auto.commit.interval.ms=30000
kafka.topics=traffc
zookeeper.sync.time.ms=250
num.io.threads=12
batch.size=65536
buffer.memory=524288
log.retention.hours=5
2、Producer
import java.text.DecimalFormat
import java.util
import java.util.Calendar
import com.alibaba.fastjson.JSON
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import utils.PropertyUtil
import scala.util.Random
object Producer {
  /**
   * Entry point: emits one simulated traffic event to the Kafka topic
   * configured under the "kafka.topics" key.
   */
  def main(args: Array[String]): Unit = {
    // Load Kafka client configuration (kafka.properties on the classpath).
    val properties = PropertyUtil.properties
    // Create the Kafka producer.
    val producer = new KafkaProducer[String, String](properties)
    try {
      // Build one simulated event as a JSON object.
      // NOTE(review): randomMonitorId / randomSpeed are assumed to be defined
      // elsewhere in this file (not visible in this chunk) — confirm.
      val jsonMap = new util.HashMap[String, String]()
      jsonMap.put("monitor_id", randomMonitorId)
      jsonMap.put("speed", randomSpeed)
      val event = JSON.toJSON(jsonMap)
      // Send to the configured topic.
      producer.send(new ProducerRecord[String, String](PropertyUtil.getProperties("kafka.topics"), event.toString))
      // Throttle the simulated send rate.
      Thread.sleep(300)
    } finally {
      // Close in finally so buffered records are flushed and network/thread
      // resources are released — the original leaked the producer.
      producer.close()
    }
  }
}
3、PropertyUtil
import java.util.Properties
object PropertyUtil {
  // Shared configuration, eagerly loaded from kafka.properties on the classpath.
  val properties = new Properties
  try {
    // getSystemResourceAsStream returns null when the resource is missing;
    // guard explicitly instead of relying on the catch to swallow the NPE.
    val stream = ClassLoader.getSystemResourceAsStream("kafka.properties")
    if (stream != null) {
      try properties.load(stream)
      finally stream.close() // the original never closed the stream
    }
  } catch {
    // Log the failure instead of discarding it: the original's
    // `e.getStackTrace` was a no-op that silently dropped the error.
    case e: Exception => e.printStackTrace()
  }

  /** Looks up a configuration value; returns null when the key is absent. */
  def getProperties(key: String): String = properties.getProperty(key)
}
4、pom.xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.8.2.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.41</version>
</dependency>