kafka connect 常见问题

ERROR WorkerSourceTask{id=mysql-source-binlog-336-jobId-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)

org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback

        at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:252)

        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)

        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)

        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)

        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

        at java.util.concurrent.FutureTask.run(FutureTask.java:266)

        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

        at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 3900896 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

查看 kafka connect的日志 INFO ProducerConfig values中max.request.size 的值

mysql-source-binlog-336-jobId-dbhistory  的max.request.siz改掉了。但是数据的max.request.siz没改依然报错

connector-producer-mysql-source-binlog-336-jobId-0 

参考

https://github.com/confluentinc/cp-docker-images/issues/445

加入参数

"database.history.producer.max.request.size": "157286400",

"max.request.size": "157286400"

到source中解决

---------------------------------------------------------------------------------------------------------------------

{"code":500,"data":null,"msg":"java.lang.IllegalArgumentException: URI is not absolute"}

等待下次再次出现通过 http://elk-ops.iqdnet.cn/ 查看日志分析,当前日志只保留4天,无法查看

---------------------------------------------------------------------------------------------------------------------

[2019-11-14 14:10:10,064] ERROR WorkerSourceTask{id=mysql-source-binlog-336-jobId-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler

        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)

        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)

        at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:281)

        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:309)

        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:234)

        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)

        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)

        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

        at java.util.concurrent.FutureTask.run(FutureTask.java:266)

        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

        at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic mysql-source-binlog-336-jobId.slow_log.slow_log :

        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:83)

        at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:281)

        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)

        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)

        ... 11 more

Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"Key","namespace":"mysql_source_binlog_336_jobId.slow_log.slow_log","fields":[{"name":"id","type":"string"}],"connect.name":"mysql_source_binlog_336_jobId.slow_log.slow_log.Key"}

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Error while forwarding register schema request to the master; error code: 50003

        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230)

        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)

        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:356)

        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:348)

        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:334)

        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168)

        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222)

        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198)

        at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70)

        at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:131)

        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)

        at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:281)

        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)

        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)

        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)

        at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:281)

        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:309)

        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:234)

        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)

        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)

        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

        at java.util.concurrent.FutureTask.run(FutureTask.java:266)

        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

        at java.lang.Thread.run(Thread.java:748)

查看SchemaRegister日志没有发现明显错误,看到hostname不通

---------------------------------------------------------------------------------------------------------------------

[2019-11-21 16:32:16,681] ERROR Error during binlog processing. Last offset stored = {ts_sec=1574325000, file=mysqld-bin.008559, pos=419190460, row=1, server_

id=463306, event=3}, binlog reader near position = mysqld-bin.008559/430041542 (io.debezium.connector.mysql.BinlogReader:1054)

[2019-11-21 16:32:16,681] ERROR Failed due to error: Error processing binlog event (io.debezium.connector.mysql.BinlogReader:209)

org.apache.kafka.connect.errors.ConnectException: Error recording the DDL statement(s) in the database history Kafka topic dbhistory.mysql-source-binlog-333-jobId:0 using brokers at null: truncate table mysql.slow_log

        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)

        at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:208)

        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:508)

        at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1095)

        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:943)

        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:580)

        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:825)

        at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.kafka.connect.errors.ConnectException: Error recording the DDL statement(s) in the database history Kafka topic dbhistory.mysql-source-binlog-333-jobId:0 using brokers at null: truncate table mysql.slow_log

        at io.debezium.connector.mysql.MySqlSchema.applyDdl(MySqlSchema.java:361)

        at io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:694)

        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:492)

        ... 5 more

Caused by: io.debezium.relational.history.DatabaseHistoryException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for dbhistory.mysql-source-binlog-333-jobId-0:120001 ms has passed since batch creation

        at io.debezium.relational.history.KafkaDatabaseHistory.storeRecord(KafkaDatabaseHistory.java:198)

        at io.debezium.relational.history.AbstractDatabaseHistory.record(AbstractDatabaseHistory.java:66)

        at io.debezium.relational.history.AbstractDatabaseHistory.record(AbstractDatabaseHistory.java:60)

        at io.debezium.connector.mysql.MySqlSchema.applyDdl(MySqlSchema.java:356)

        ... 7 more

Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for dbhistory.mysql-source-binlog-333-jobId-0:120001 ms has passed since batch creation

        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)

        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)

        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)

        at io.debezium.relational.history.KafkaDatabaseHistory.storeRecord(KafkaDatabaseHistory.java:188)

        ... 10 more

Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for dbhistory.mysql-source-binlog-333-jobId-0:120001 ms has passed since batch creation             

---------------------------------------------------------------------------------------------------------------------

ERROR WorkerSourceTask{id=mysql-source-binlog-358-jobId-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.conn

ect.runtime.WorkerTask:179)

org.apache.kafka.connect.errors.ConnectException: log event entry exceeded max_allowed_packet; Increase max_allowed_packet on master; the first event 'mysqld-

bin.000060' at 910723425, the last event read from '/data/binlog/qdpp/mysqld-bin.000060' at 123, the last byte read from '/data/binlog/qdpp/mysqld-bin.000060'

at 910723444. Error code: 1236; SQLSTATE: HY000.

        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)

        at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:197)

        at io.debezium.connector.mysql.BinlogReader$ReaderThreadLifecycleListener.onCommunicationFailure(BinlogReader.java:1041)

        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:950)

        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:580)

        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:825)

        at java.lang.Thread.run(Thread.java:748)

Caused by: com.github.shyiko.mysql.binlog.network.ServerException: log event entry exceeded max_allowed_packet; Increase max_allowed_packet on master; the fir

st event 'mysqld-bin.000060' at 910723425, the last event read from '/data/binlog/qdpp/mysqld-bin.000060' at 123, the last byte read from '/data/binlog/qdpp/m

ysqld-bin.000060' at 910723444.

        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:914)

        ... 3 more 

单个事物日志超过max_allowed_packet配置的限制大小

同时Error code: 1236代表BINLOG不存在

---------------------------------------------------------------------------------------------------------------------

ERROR WorkerSinkTask{id=elasticsearch-sink-binlog-364-taskId-0} Task threw an uncaught and unrecoverable exception. Task is being ki

lled and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:558)

org.apache.kafka.connect.errors.ConnectException: java.net.SocketTimeoutException: Read timed out

        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.indexExists(JestElasticsearchClient.java:284)

        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createIndices(JestElasticsearchClient.java:290)

        at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:255)

        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:169)

        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)

        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)

        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)

        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)

        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)

        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)

        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

        at java.util.concurrent.FutureTask.run(FutureTask.java:266)

        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

        at java.lang.Thread.run(Thread.java:748)

可能是ES重启导致,需要手动重启任务

参考,说的是长时间不使用断开链接

https://github.com/confluentinc/kafka-connect-elasticsearch/pull/349

添加参数

max.connection.idle.time.ms

---------------------------------------------------------------------------------------------------------------------

[2019-11-25 20:27:55,399] ERROR WorkerSourceTask{id=mysql-source-binlog-333-jobId-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.conn

ect.runtime.WorkerTask:179)

org.apache.kafka.connect.errors.ConnectException: Error recording the DDL statement(s) in the database history Kafka topic dbhistory.mysql-source-binlog-333-j

obId:0 using brokers at null: SAVEPOINT `SAVEPOINT_1`

        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)

        at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:208)

        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:508)

        at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1095)

        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:943)

        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:580)

        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:825)

        at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.kafka.connect.errors.ConnectException: Error recording the DDL statement(s) in the database history Kafka topic dbhistory.mysql-source-b

inlog-333-jobId:0 using brokers at null: SAVEPOINT `SAVEPOINT_1`

        at io.debezium.connector.mysql.MySqlSchema.applyDdl(MySqlSchema.java:361)

        at io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:694)

        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:492)

        ... 5 more

Caused by: io.debezium.relational.history.DatabaseHistoryException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotLeaderForPartit

ionException: This server is not the leader for that topic-partition.

        at io.debezium.relational.history.KafkaDatabaseHistory.storeRecord(KafkaDatabaseHistory.java:198)

        at io.debezium.relational.history.AbstractDatabaseHistory.record(AbstractDatabaseHistory.java:66)

        at io.debezium.relational.history.AbstractDatabaseHistory.record(AbstractDatabaseHistory.java:60)

        at io.debezium.connector.mysql.MySqlSchema.applyDdl(MySqlSchema.java:356)

        ... 7 more

Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topi

c-partition.

        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)

        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)

        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)

        at io.debezium.relational.history.KafkaDatabaseHistory.storeRecord(KafkaDatabaseHistory.java:188)

        ... 10 more

Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

由于kafka调整参数重启,leader切换导致的问题。

---------------------------------------------------------------------------------------------------------------------

查看partition情况

./bin/kafka-topics  --bootstrap-server  10.37.251.101:9092 --topic mysql-source-binlog-324-jobId.longfor_mdm.cp_role_user_relation --describe

./bin/kafka-topics  --bootstrap-server  kafka-platform-01.qiandingyun.com:9092 --topic mysql-source-binlog-337-jobId.databus.ads_fenxiao_hehuoren_detail --describe

发现分区数为5

查看offest

./bin/kafka-consumer-groups --bootstrap-server 10.37.251.101:9092 --describe --group mysql-source-binlog-324-jobId.longfor_mdm.cp_role_user_relation

---------------------------------------------------------------------------------------------------------------------

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409; error code: 409   

存在相同版本的schema,直接删除旧的

http://10.50.6.54:8081/subjects/mysql-source-binlog-336-jobId.slow_log.slow_log-value/versions/1

---------------------------------------------------------------------------------------------------------------------

[2019-12-04 17:05:03,719] INFO [ReplicaFetcher replicaId=1001, leaderId=0, fetcherId=0] Retrying leaderEpoch request for partition _schemas-0 as the leader reported an error: UNKNOWN_SERVER_ERROR (kafka.server.ReplicaFetcherThread)

[2019-12-04 17:05:04,721] WARN [ReplicaFetcher replicaId=1001, leaderId=0, fetcherId=0] Error when sending leader epoch request for Map(_schemas-0 -> (currentLeaderEpoch=Optional[1], leaderEpoch=0)) (kafka.server.ReplicaFetcherThread)

java.io.IOException: Connection to 0 was disconnected before the response was read

        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)

        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:107)

        at kafka.server.ReplicaFetcherThread.fetchEpochEndOffsets(ReplicaFetcherThread.scala:310)

        at kafka.server.AbstractFetcherThread.truncateToEpochEndOffsets(AbstractFetcherThread.scala:208)

        at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:173)

        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)

        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:89)

[2019-12-04 17:05:04,722] INFO [ReplicaFetcher replicaId=1001, leaderId=0, fetcherId=0] Retrying leaderEpoch request for partition _schemas-0 as the leader reported an error: UNKNOWN_SERVER_ERROR (kafka.server.ReplicaFetcherThread)

---------------------------------------------------------------------------------------------------------------------

JDBC source mysql source不加任务时区参数,time类型小于8点,可能导致了-8小时为负数导致报错

Caused by: org.apache.kafka.connect.errors.DataException: Kafka Connect Time type should not have any date fields set to non-zero values.

        at org.apache.kafka.connect.data.Time.fromLogical(Time.java:64)

        at io.confluent.connect.avro.AvroData$7.convert(AvroData.java:287)

        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:420)

        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:607)

        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:366)

        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)

        at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:284)

        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)

        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)

        ... 11 more 

source参数加上时区参数

?serverTimezone=Asia/Shanghai

"db.timezone":"Asia/Shanghai"

后貌似Date类型-8小时导致时分秒不为零报错

Caused by: org.apache.kafka.connect.errors.DataException: Kafka Connect Date type should not have any time fields set to non-zero values.

        at org.apache.kafka.connect.data.Date.fromLogical(Date.java:64)

        at io.confluent.connect.avro.AvroData$6.convert(AvroData.java:276)

        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:420)

        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:607)

        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:366)

        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)

        at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:284)

        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)

        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)

        ... 11 more   

---------------------------------------------------------------------------------------------------------------------

1.

kafka在重启后,server.log 一直报WARN

INFO [ReplicaFetcher replicaId=1001, leaderId=0, fetcherId=0] Retrying leaderEpoch request for partition _schemas2-0 as the leader reported an error: UNKNOWN_SERVER_ERROR (kafka.server.ReplicaFetcherThread)

[2019-12-23 16:10:08,438] INFO [ReplicaFetcher replicaId=1001, leaderId=0, fetcherId=0] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 0: java.io.IOException: Connection to 0 was disconnected before the response was read. (org.apache.kafka.clients.FetchSessionHandler)

WARN [ReplicaFetcher replicaId=1001, leaderId=0, fetcherId=0] Error when sending leader epoch request for Map(_schemas2-0 -> (currentLeaderEpoch=Optional[1], leaderEpoch=0)) (kafka.server.ReplicaFetcherThread)

java.io.IOException: Connection to 0 was disconnected before the response was read

        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)

        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:107)

        at kafka.server.ReplicaFetcherThread.fetchEpochEndOffsets(ReplicaFetcherThread.scala:310)

        at kafka.server.AbstractFetcherThread.truncateToEpochEndOffsets(AbstractFetcherThread.scala:208)

        at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:173)

        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)

        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:89) 

通过命令

./bin/kafka-topics --describe --zookeeper 10.37.251.222:2181 --topic _schemas2

Topic:_schemas2 PartitionCount:1        ReplicationFactor:2    Configs:cleanup.policy=compact

        Topic: _schemas2        Partition: 0    Leader: 0      Replicas: 1001,0        Isr: 0

指定优先选择副本作为leader

指定partions的leader

{

"partitions":

  [

    {"topic":"_schemas2","partition":0}

  ]

}

./bin/kafka-preferred-replica-election.sh --bootstrap-server 10.37.251.101:9092 --path-to-json-file /tmp/kafka-preferred-replica-election.json

指定replicas级别leader指定

执行

{

"version":1,

"partitions":

  [

    {"topic":"_schemas2","partition":0,"replicas":[1001]}

  ]

}

./bin/kafka-reassign-partitions.sh --zookeeper 10.37.251.222:2181  --reassignment-json-file /tmp/reassign-plan.json --execute

验证

./bin/kafka-reassign-partitions.sh --zookeeper 10.37.251.222:2181 --reassignment-json-file /tmp/reassign-plan.json --verify

再次验证

./bin/kafka-topics.sh --zookeeper 10.37.251.222:2181 --describe --topic _schemas2

查询zk信息

./bin/zkCli.sh -server 10.37.251.222:2181

修改了brlkers/ids下的[1001,0]为[1001]

修改后部分topic的leader变为-1,手动修改指定leader

产生0这个broker原因是中途打开broker.id的配置属性配置为0导致的,虽然新配置还原重启但是造成了新broker存在的假象。最终建议加一个0节点和1节点应该也能用了

重启kafka WARN不再报,但是kfaka的数据丢失同步任务目的地少数据。

set /brokers/topics/_schemas2/partitions/0/state {"controller_epoch":2,"leader":1001,"version":1,"leader_epoch":6,"isr":[1001]}

set /brokers/topics/mysql-source-binlog-318-jobId.qdp_rosetta.rst_task_problem/partitions/1/state {"controller_epoch":2,"leader":1001,"version":1,"leader_epoch":1,"isr":[1001]}

多个副本进入Isr只有leader可被读取

通过对比qa环境的schema的topic状态发现 dev的Replicas为2

./bin/kafka-topics --describe --zookeeper 10.37.253.31:2181 --topic _schemas

Topic:_schemas  PartitionCount:1        ReplicationFactor:3    Configs:cleanup.policy=compact

        Topic: _schemas Partition: 0    Leader: 0      Replicas: 0,1,2 Isr: 0,1 

_schemas2数据出现丢失,leader不可用

2.

产生的关联问题

重启kafka后发现任务报错。_schema中存在mysql-source-binlog-318-jobId.qdp_rosetta.rst_task_item-value 但是ID是421,也就是根据ID取出schema没拿到。但是为什么会取错ID为403

Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic mysql-source-binlog-318-jobId.qdp_rosetta.rst_task to Avro:

        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)

        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:484)

        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)

        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)

        ... 13 more                     

Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 403

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403

        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230)

        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)   

---------------------------------------------------------------------------------------------------------------------

poll()方法return后没有返回,kafkaconnect的日志报WARN

[2019-12-20 09:54:20,153] WARN [Producer clientId=connector-producer-mysql-source-jdbc-333-jobId-0] Error while fetching metadata with correlation id 3 : {mysql-source-jdbc-333-jobId.devds.=LEADER_NOT_AVAILABLE}

(org.apache.kafka.clients.NetworkClient:1051)

---------------------------------------------------------------------------------------------------------------------

jdbc sink自动建表问题

无法识别mysql字段类型的长度值,始终使用默认值设定长度。并且像year(2018)类型最终映射成为date(2018-01-01)落地不符合要求

---------------------------------------------------------------------------------------------------------------------

org.apache.kafka.connect.errors.ConnectException: query may not be combined with whole-table copying settings.

jdbc source 中的sql 存在 join 语句不允许使用"table.whitelist" 属性,因为使用了"table.whitelist"导致对应的topic 的 schema没有创建成功,导致数据无法进入kafka而且不报错

必须topic.prefix属性指定完整的命名 mysql-source-jdbc-443-jobId.devds.user

---------------------------------------------------------------------------------------------------------------------

数据同步少数,停止同步不报错

[2020-01-08 15:43:11,124] ERROR WorkerSourceTask{id=mysql-source-binlog-353-jobId-0} Failed to flush, timed out while waiting for producer to flush outstanding 3 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:431)

[2020-01-08 15:43:11,124] ERROR WorkerSourceTask{id=mysql-source-binlog-353-jobId-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:114)

---------------------------------------------------------------------------------------------------------------------

[2020-01-09 14:11:12,933] ERROR WorkerSourceTask{id=mysql-source-binlog-360-jobId-0} Failed to flush, timed out while waiting for producer to flush outstanding 3050 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:431)

[2020-01-09 14:11:12,949] ERROR WorkerSourceTask{id=mysql-source-binlog-360-jobId-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:114)

---------------------------------------------------------------------------------------------------------------------

[2020-01-10 16:12:17,935] ERROR WorkerSinkTask{id=mysql-sink-binlog-485-taskId-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)

org.apache.kafka.common.errors.WakeupException

        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:490)

        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275)

        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)

        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)

        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:693)

        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1454)

        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1412)

        at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync(WorkerSinkTask.java:332)

        at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:360)

        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:431)

        at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:590)

        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)

        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)

        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)

        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

        at java.util.concurrent.FutureTask.run(FutureTask.java:266)

        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

        at java.lang.Thread.run(Thread.java:748)

---------------------------------------------------------------------------------------------------------------------

org.apache.kafka.connect.errors.ConnectException: PK mode for table 'users' is RECORD_KEY, but record key schema is missing

配置错误

"pk.mode": "record_key",改为"pk.mode": "record_value"

---------------------------------------------------------------------------------------------------------------------

Failed due to error: Aborting snapshot due to error when last running 'SELECT * FROM `qding_brick`.`bd_person_addr`': Can''t call rollback when autocommit=true (io.debezium.connector.mysql.SnapshotReader:20

9)

---------------------------------------------------------------------------------------------------------------------

[2020-01-14 01:56:33,803] ERROR WorkerSinkTask{id=mysql-sink-binlog-501-taskId-0} Commit of offsets threw an unexpected exception for sequence number 4899: {mysql-source-binlog-371-jobId.qdp_rosetta.rst_task_item-0=OffsetAndMetadata{offse

t=17870259, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:259)

org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.

Caused by: org.apache.kafka.common.errors.TimeoutException: The request timed out.

[2020-01-14 01:56:34,053] INFO [Consumer clientId=connector-consumer-mysql-sink-binlog-501-taskId-0, groupId=connect-mysql-sink-binlog-501-taskId] Discovered group coordinator 10.50.6.53:9092 (id: 2147483644 rack: null) (org.apache.kafka.

clients.consumer.internals.AbstractCoordinator:728)

---------------------------------------------------------------------------------------------------------------------

debezium做snapshot时使用jdbc查询tinyint(1)将大与0的数变为了1返回

设置这个参数解决

"database.tinyInt1isBit":"false"

---------------------------------------------------------------------------------------------------------------------

A slave with the same server_uuid/server_id as this slave has connected to the master

不明原因,task重启解决了,google查询怀疑和mysql服务有关系

---------------------------------------------------------------------------------------------------------------------

Caused by: io.debezium.text.ParsingException: no viable alternative at input

直接抛弃解析不了的sql "database.history.skip.unparseable.ddl": "true"

搜索可以升级到1.1.0.Beta1解决该bug

---------------------------------------------------------------------------------------------------------------------

[2020-03-12 12:42:28,414] ERROR WorkerSinkTask{id=mysql-sink-binlog-536-taskId-0} Commit of offsets threw an unexpected exception for sequence number 56: {mysql-source-binlog-378-jobId.qdp_rosetta.rst_task_item-0=OffsetAndMetadata{offset=19974922, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:259)

max.poll.records

max.poll.interval.ms

offset.flush.timeout.ms=10000

---------------------------------------------------------------------------------------------------------------------

No producer is available. Ensure that 'start()' is called before storing database history records.

---------------------------------------------------------------------------------------------------------------------

Failed to flush, timed out while waiting for producer to flush outstanding 14581 messages

---------------------------------------------------------------------------------------------------------------------

  ERROR WorkerSinkTask{id=mysql-sink-binlog-498-taskId-0} Commit of offsets threw an unexpected exception for sequence number 604: {mysql-source-binlog-366-jobId.qdp_rosetta.rst_task-0=OffsetAndMetadata{offset=264911, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:259)

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容