SparkStreaming从kafka读取文件流时(Java),默认是utf-8的,如果源文件的编码不是utf-8,那就会出现乱码现象,但是kafka的官网参数里没有关于编码的参数,不过kafka的源码里面是有的。源码如下:
class String Encoder(props:VerifiableProperties=null) extends Encoder[String]{
val encoding = if (props==null) "UTF8" else props.getString("serializer.encoding","UTF8")
根据上面的源码,设置serializer.encoding就可以了,经过测试,设置serializer.encoding确实是有效的。下面是SparkStream从kafka读取数据代码:
...
HashSet_TopicsSet = newHashSet(Arrays.asList("a1_topic","a2_topic"));
Map_KafkaParams = new HashMap();
_KafkaParams.put("metadata.broker.list","dn1:9092,nn0:9092,dn0:9092");
_KafkaParams.put("group.id", "groupid");
_KafkaParams.put("fetch.message.max.bytes","5120000");
_KafkaParams.put("serializer.encoding", "gb18030");
JavaDStream_MessagesLines = KafkaUtils.createDirectStream(_JavaStreamingContext,String.class, String.class,StringDecoder.class,StringDecoder.class,_KafkaParams, _TopicsSet)
.map(newFunction, String>() {
publicString call(Tuple2 tuple2) {
returntuple2._2();
}
});
...