Flume接入log4j/log4cplus时SocketAppender的那些事?

先说两句:

公司最近需要做一个数据分析平台,主要功能就是收集app中用户的行为日志,然后通过日志分析出一些通用的报表数据、用户行为预测等等,便于运营。

日志怎么接入:

接入的日志服务器端开发用的C++,日志打印用的log4cplus,使用java的同学可能比较熟悉log4j,其实都一样。Just so so。直接上配置文件:

#flume_log

log4cplus.appender.R6=log4cplus::SocketAppender

log4cplus.appender.R6.host=10.132.34.12

log4cplus.appender.R6.port=44444

log4cplus.appender.R6.layout=log4cplus::PatternLayout

log4cplus.appender.R6.layout.ConversionPattern=%m

直接采用SocketAppender发送日志到远程服务器,从上面的配置可以得到,10.132.34.12就是Flume接受日志的服务器,44444就是Flume的端口号。

Flume配置:

由于采用的是SocketAppender,Flume这边可以采用SyslogTcpSource,直接上配置文件:

a1.sources = s1

a1.sinks = k1

a1.channels = c1


a1.sources.s1.type = syslogtcp

a1.sources.s1.host = 10.132.34.12

a1.sources.s1.port = 44444

a1.sources.s1.channels = c1


a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.transactionCapacity = 100


a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

开始配置的时候感觉一切怎么都这么顺利呢,So Easy有没有。后来测试的时候才发现现在才刚刚开始。

问题1:消息怎么没有换行呢?

消息接入的时候log4cplus里面的配置为:log4cplus.appender.R6.layout.ConversionPattern=%m,是不是加上'%n'就ok了,加上'%n'后测试发现问题没有解决,尝试手动在日志后面加上'\n'呢,还是不行。但是消息打印到本地磁盘,一些都那么美好,通过SocketAppender就是没有换行,崩溃中...

问题2:为什么Flume接收的日志总有乱码?

Flume通过logger Sink打印的日志中总有乱码,通过tcpdump发现tcp包中确实包含一些无关信息。难道是发送的时候数据就有问题,反复检查C++代码发现没有问题。

咋办呢?

本猿是一名java程序员,对C++不太了解。查看了下org.apache.log4j.net.SocketAppender的源码。发现了一些端倪。

首先在SocketAppender中声明了一个ObjectOutputStream  oos;的变量,

在connect方法内部对oos进行了实例化,oos=new ObjectOutputStream(newSocket(address,port).getOutputStream());

在append方法内部调用的是:oos.writeObject(event);其中event类型是org.apache.log4j.spi.LoggingEvent。

到此为什么出现乱码,为什么消息里的换行符不起作用看起来就明了了。

原来SocketAppender发送的是一个序列化后的对象,而Flume的SyslogTcpSource接收到tcp包后没有进行反序列化,而是直接将收到的消息作为日志内容进行解析,出现乱码就不奇怪了。

随后网上download了一份log4cplus的源码,发现里面的实现基本和java一致,SocketAppender发送的消息也是序列化后的对象。具体代码如下:

其中convertToBuffer的代码如下:


解决:

1、客服端修改logcplus源码重新编译安装,在append方法内部修改发送内容,直接将消息内容发送到Flume(需要修改代码bool ret = socket.write(msgBuffer);),联系客户端 同学,说难度比较大,扩展性不好,放弃了。

2、重写Flume的SyslogTcpSouce源码,在Flume端解析对象内容。C++端解析的源码如下:



通过上面的代码,可以很容易的写出java的实现版本。

注意:java解析到消息内容后,如果消息不是'\n'结尾,需要手动添加'\n',否则Flume无法正常解析日志内容。由于SyslogTcpSource的消息默认长度为2500Byte,所以当日志达到最大值的时候会切断消息内容。

由此所有的问题看似都完美解决了。附修改后的完整代码一份,如下:

packagecom.mirror.game.flume.source;

importorg.apache.flume.ChannelException;

importorg.apache.flume.*;

importorg.apache.flume.conf.Configurable;

importorg.apache.flume.conf.Configurables;

importorg.apache.flume.source.AbstractSource;

importorg.apache.flume.source.SyslogSourceConfigurationConstants;

importorg.apache.flume.source.SyslogUtils;

importorg.jboss.netty.bootstrap.ServerBootstrap;

importorg.jboss.netty.buffer.ChannelBuffer;

importorg.jboss.netty.buffer.ChannelBufferFactory;

importorg.jboss.netty.buffer.ChannelBuffers;

importorg.jboss.netty.channel.Channel;

importorg.jboss.netty.channel.ChannelFactory;

importorg.jboss.netty.channel.*;

importorg.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

importorg.slf4j.Logger;

importorg.slf4j.LoggerFactory;

importjava.net.InetSocketAddress;

importjava.nio.ByteOrder;

importjava.util.Map;

importjava.util.Set;

importjava.util.concurrent.Executors;

importjava.util.concurrent.TimeUnit;

/**

*@Author: 小胖墩

*@Description:

*      接受log4cplus序列化后的对象,并对tcp包反序列化得到日志内容。log4cplus序列化和反序列化的过程参考socketappender.cxx的源码。

*

*      序列化过程如下:

*      void convertToBuffer(SocketBuffer & buffer, const spi::InternalLoggingEvent& event, const tstring& serverName)

*      {

*

*          buffer.appendByte(LOG4CPLUS_MESSAGE_VERSION);

*          #ifndef UNICODE

*              buffer.appendByte(1);

*          #else

*              buffer.appendByte(2);

*          #endif

*

*          buffer.appendString(serverName);

*          buffer.appendString(event.getLoggerName());

*          buffer.appendInt(event.getLogLevel());

*          buffer.appendString(event.getNDC());

*          buffer.appendString(event.getMessage());

*          buffer.appendString(event.getThread());

*          buffer.appendInt( static_cast(event.getTimestamp().sec()) );

*          buffer.appendInt( static_cast(event.getTimestamp().usec()) );

*          buffer.appendString(event.getFile());

*          buffer.appendInt(event.getLine());

*          buffer.appendString(event.getFunction());

*      }

*

*      反序列化过程如下:

*      spi::InternalLoggingEvent readFromBuffer(SocketBuffer& buffer)

*      {

*          unsigned char msgVersion = buffer.readByte();

*          if(msgVersion != LOG4CPLUS_MESSAGE_VERSION) {

*              LogLog * loglog = LogLog::getLogLog();

*              loglog->warn(LOG4CPLUS_TEXT("readFromBuffer() received socket message with an invalid version"));

*          }

*

*          unsigned char sizeOfChar = buffer.readByte();

*

*          tstring serverName = buffer.readString(sizeOfChar);

*          tstring loggerName = buffer.readString(sizeOfChar);

*          LogLevel ll = buffer.readInt();

*          tstring ndc = buffer.readString(sizeOfChar);

*          if(! serverName.empty ()) {

*              if(ndc.empty ()) {

*                  ndc = serverName;

*              }

*              else {

*                  ndc = serverName + LOG4CPLUS_TEXT(" - ") + ndc;

*              }

*          }

*          tstring message = buffer.readString(sizeOfChar);

*          tstring thread = buffer.readString(sizeOfChar);

*          long sec = buffer.readInt();

*          long usec = buffer.readInt();

*          tstring file = buffer.readString(sizeOfChar);

*          int line = buffer.readInt();

*          tstring function = buffer.readString(sizeOfChar);

*

*          spi::InternalLoggingEvent ev (loggerName, ll, ndc,

*              MappedDiagnosticContextMap (), message, thread, internal::empty_str,

*              Time(sec, usec), file, line, function);

*          return ev;

*      }

*

*@Date: 2017/10/21 16:53

*@ModifiedBy :

*/

public classMirrorSyslogTcpSourceextendsAbstractSourceimplementsEventDrivenSource,Configurable {

private static finalLoggerlogger= LoggerFactory.getLogger(MirrorSyslogTcpSource.class);

private intport;

privateStringhost=null;

privateChannelnettyChannel;

privateIntegereventSize;

privateMapformaterProp;

privateCounterGroupcounterGroup=newCounterGroup();

privateSetkeepFields;

public classsyslogTcpHandlerextendsSimpleChannelHandler {

privateSyslogUtilssyslogUtils=newSyslogUtils();

public voidsetEventSize(inteventSize){

syslogUtils.setEventSize(eventSize);

}

public voidsetKeepFields(Set keepFields) {

syslogUtils.setKeepFields(keepFields);

}

public voidsetFormater(Map prop) {

syslogUtils.addFormats(prop);

}

@Override

public voidmessageReceived(ChannelHandlerContext ctx,MessageEvent mEvent) {

ChannelBuffer buff = (ChannelBuffer) mEvent.getMessage();

while(buff.readable()) {

try{

intlength = buff.readInt();//消息总长度

ChannelBuffer eventBuffer = buff.readBytes(length);//消息你内容,log4cplus将InternalLoggingEvent封装序列化后的结果

intmessageVersion = eventBuffer.readByte();//消息版本号,log4cplus默认消息版本号为3

intsizeOfChar = eventBuffer.readByte();//char的字节长度,unicode=1,否则=2

intserverNameLength = eventBuffer.readInt();//获取server name

ChannelBuffer serverNameBuffer =null;

if(serverNameLength >0){

serverNameBuffer = eventBuffer.readBytes(serverNameLength * sizeOfChar);//serverName

}

intloggerNameLength  = eventBuffer.readInt();

ChannelBuffer loggerNameBuffer =null;

if(loggerNameLength >0){

loggerNameBuffer = eventBuffer.readBytes(loggerNameLength * sizeOfChar);//loggerName

}

intlogLevel = eventBuffer.readInt();//日志级别

intndcLength = eventBuffer.readInt();// ndc

ChannelBuffer ndcBuffer =null;

if(ndcLength >0){

ndcBuffer = eventBuffer.readBytes(ndcLength * sizeOfChar);

}

intmessageLength = eventBuffer.readInt();//消息内容

ChannelBuffer messageBuffer =null;

if(messageLength >0){

intlen = messageLength * sizeOfChar;

messageBuffer = eventBuffer.readBytes(len);

/**

* 必须在消息末尾添加‘\n’,否则消息解析失败.

*/

byte[] messageArray = messageBuffer.array();

if(messageArray[len-1] !='\n'){

byte[] newArray =new byte[len+1];

System.arraycopy(messageArray,0,newArray,0,len);

newArray[len] ='\n';

messageBuffer = ChannelBuffers.buffer(ByteOrder.LITTLE_ENDIAN,newArray.length);

messageBuffer.writeBytes(newArray);

}

}

if(logger.isDebugEnabled()){

intthreadLength = eventBuffer.readInt();//线程名字

ChannelBuffer threadBuffer =null;

if(threadLength >0){

threadBuffer = eventBuffer.readBytes(threadLength * sizeOfChar);

}

inttimeStampSec = eventBuffer.readInt();//时间戳

inttimeStampUsec = eventBuffer.readInt();

intfileLength =  eventBuffer.readInt();//打印日志的文件

ChannelBuffer fileBuffer =null;

if(fileLength >0){

fileBuffer = eventBuffer.readBytes(fileLength * sizeOfChar);

}

intline = eventBuffer.readInt();//代码中的行数

intfuncLength  = eventBuffer.readInt();//打印日志的方法名

ChannelBuffer functionBuffer =null;

if(funcLength >0){

functionBuffer = eventBuffer.readBytes(funcLength * sizeOfChar);

}

StringBuilder sb =newStringBuilder("{");

sb.append("length=").append(length).append(",")

.append("messageVersion=").append(messageVersion).append(",")

.append("serverNameLength=").append(serverNameLength).append(",")

.append("serverName=").append(serverNameBuffer==null?"null":newString(serverNameBuffer.array(),"utf-8")).append(",")

.append("loggerNameLength=").append(loggerNameLength).append(",")

.append("loggerName=").append(loggerNameBuffer==null?"null":newString(loggerNameBuffer.array(),"utf-8")).append(",")

.append("logLevel=").append(logLevel).append(",")

.append("ndcLength=").append(ndcLength).append(",")

.append("ndc=").append(ndcBuffer ==null?"null":newString(ndcBuffer.array(),"utf-8")).append(",")

.append("messageLength=").append(messageLength).append(",")

.append("message=").append(messageBuffer ==null?"null":newString(messageBuffer.array(),"utf-8")).append(",")

.append("threadLength=").append(threadLength).append(",")

.append("thread=").append(threadBuffer==null?"null":newString(threadBuffer.array(),"utf-8")).append(",")

.append("timeStampSec=").append(timeStampSec).append(",")

.append("timeStampUsec=").append(timeStampUsec).append(",")

.append("fileLength=").append(fileLength).append(",")

.append("file=").append(fileBuffer==null?"null":newString(fileBuffer.array(),"utf-8")).append(",")

.append("line=").append(line).append(",")

.append("funcLength=").append(funcLength).append(",")

.append("func=").append(functionBuffer==null?"null":newString(functionBuffer.array(),"utf-8")).append(",")

.append("}");

logger.debug(sb.toString());

}

Event e =syslogUtils.extractEvent(messageBuffer);

if(e ==null) {

logger.warn("Event is null, Parsed partial event, event will be generated when rest of the event is received.");

continue;

}

try{

getChannelProcessor().processEvent(e);

counterGroup.incrementAndGet("events.success");

}catch(ChannelException ex) {

counterGroup.incrementAndGet("events.dropped");

logger.error("Error writting to channel, event dropped",ex);

}

}catch(Exception e){

logger.error("read message error,",e);

}

}

}

}

@Override

public voidstart() {

ChannelFactory factory =newNioServerSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool());

ServerBootstrap serverBootstrap =newServerBootstrap(factory);

serverBootstrap.setPipelineFactory(() -> {

syslogTcpHandler handler =newsyslogTcpHandler();

handler.setEventSize(eventSize);

handler.setFormater(formaterProp);

handler.setKeepFields(keepFields);

returnChannels.pipeline(handler);

});

logger.info("Mirror Syslog TCP Source starting...");

if(host==null) {

nettyChannel= serverBootstrap.bind(newInetSocketAddress(port));

}else{

nettyChannel= serverBootstrap.bind(newInetSocketAddress(host,port));

}

super.start();

}

@Override

public voidstop() {

logger.info("Mirror Syslog TCP Source stopping...");

logger.info("Metrics:{}",counterGroup);

if(nettyChannel!=null) {

nettyChannel.close();

try{

nettyChannel.getCloseFuture().await(60,TimeUnit.SECONDS);

}catch(InterruptedException e) {

logger.warn("netty server stop interrupted",e);

}finally{

nettyChannel=null;

}

}

super.stop();

}

@Override

public voidconfigure(Context context) {

Configurables.ensureRequiredNonNull(context,SyslogSourceConfigurationConstants.CONFIG_PORT);

port= context.getInteger(SyslogSourceConfigurationConstants.CONFIG_PORT);

host= context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST);

/**

* 默认每条日志记录的大小,默认2500字节。由于序列化的过程中会占用大量的空间,此处将默认大小设置为10*DEFAULT_SIZE

*/

eventSize= context.getInteger("eventSize",SyslogUtils.DEFAULT_SIZE*10);

formaterProp= context.getSubProperties(SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX);

keepFields= SyslogUtils.chooseFieldsToKeep(

context.getString(

SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS,

SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS));

}

}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,204评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,091评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,548评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,657评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,689评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,554评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,302评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,216评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,661评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,851评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,977评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,697评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,306评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,898评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,019评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,138评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,927评论 2 355

推荐阅读更多精彩内容