In production, a single log line larger than about 7 MB caused Flume to fail with the exception below and stop delivering data normally:
2017-07-10 14:55:10,649 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:240)] Failed to publish events
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1088783 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:212)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1088783 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
2017-07-10 14:55:10,649 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1088783 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:212)
... 3 more
The Kafka producer rejects any record whose serialized size exceeds max.request.size, which defaults to 1048576 bytes (1 MB). The fix is to raise that limit on the sink: in log2kafka.conf under Flume's conf directory, add
a1.sinks.k1.kafka.producer.max.request.size = 5271988
That resolves the error on the Flume side. The Kafka brokers' own configuration may also need to allow messages of this size (see the sketch below).
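The broker rejects records larger than message.max.bytes regardless of what the producer allows, and follower replicas fetch with replica.fetch.max.bytes, so both should be at least as large as the biggest record. A minimal sketch of the broker-side settings in server.properties, with illustrative values rather than the production values used here:
# server.properties -- largest record the broker will accept
message.max.bytes = 10485760
# should be >= message.max.bytes so followers can still replicate large records
replica.fetch.max.bytes = 10485760
A per-topic override of max.message.bytes can be used instead of a cluster-wide change if only the CallInfo topic carries oversized lines.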
Complete configuration after the change
-
Flume 1.7.0:
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#taildir sources
a1.sources.r1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /data1/koala/logs/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /data1/koala/logs/crock.log
a1.sources.r1.headers.f1.topic = CallInfo
a1.sources.r1.fileHeader = true
##kafka sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#a1.sinks.k1.kafka.producer.max.request.size = 7271988
a1.sinks.k1.kafka.producer.max.request.size = 10485760
a1.sinks.k1.kafka.producer.compression.type = gzip
#a1.sinks.k1.kafka.topic = CallInfo
##test
a1.sinks.k1.kafka.bootstrap.servers = kafka-01:9092,kafka-02:9092
##channels
##a1.channels.c1.type=memory
##a1.channels.c1.capacity=10000
##a1.channels.c1.transactionCapacity=100
a1.channels.c1.type=file
a1.channels.c1.checkpointDir = /data2/koala/flume/checkpoint
a1.channels.c1.dataDirs = /data2/koala/flume/data
##a1.channels.c1.checkpointInterval = 1000
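To apply the new sink setting, restart the agent with the updated file. Assuming the standard Flume directory layout and the agent name a1 used above, the command is something like:
bin/flume-ng agent --conf conf --conf-file conf/log2kafka.conf --name a1 -Dflume.root.logger=INFO,console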
-
Kafka 0.10.0:
Parameter | Value | Valid range |
---|---|---|
log.retention.bytes | 9663676416 | 1024-1073741824000 |
log.retention.hours | 168 | 1-8760 |
auto.create.topics.enable | true | true,false |
default.replication.factor | 1 | 1-10 |
delete.topic.enable | true | true,false |
log.cleaner.enable | false | true,false |
log.message.format.version | 0.10.0 | 0.10.0,0.9.0,0.8.2 |
message.max.bytes | 100000000 | 1024-1073741823 |
num.io.threads | 8 | 1-20 |
num.partitions | 1 | 1-100 |
num.replica.fetchers | 1 | 1-10 |
queued.max.requests | 500 | 1-5000 |
socket.receive.buffer.bytes | 102400 | 1024-102400000 |
socket.send.buffer.bytes | 102400 | 1024-102400000 |
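Consumers of the CallInfo topic must also be able to fetch records this large; the client-side fetch limits default to roughly 1 MB. A hedged sketch of the relevant consumer properties for the 0.10.x clients (values illustrative):
# new (Java) consumer -- max bytes fetched per partition per request
max.partition.fetch.bytes = 10485760
# old (Scala) consumer, still shipped with 0.10 -- equivalent limit
fetch.message.max.bytes = 10485760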