ChunJun使用教程
目录
ChunJun简介
ChunJun(原名FlinkX)是一个基于Flink的批流统一的数据同步工具,实现了多种异构数据源之间高效的数据同步功能。它支持多种数据源,包括MySQL、PostgreSQL、Oracle、SQLServer、Hive、HBase、MongoDB、Kafka、Elasticsearch等。
主要特点:
- 批流统一:同一套代码实现离线批量和实时流式数据同步
- 高性能:基于Flink的分布式处理能力,支持大规模数据同步
- 扩展性强:插件化设计,支持自定义开发新的数据源
- 易用性高:采用JSON配置文件定义任务,无需编码
Job配置
在ChunJun中,一个Job代表一个完整的数据同步任务。Job通过JSON格式的配置文件来定义。
主要结构:
{
"job": {
"content": [
{
"reader": {},
"writer": {},
"transformer": []
}
],
"setting": {}
}
}
主要参数:
-
content: 定义具体的数据处理流程 -
setting: 定义Job的全局设置,如并发数、错误处理等
Reader介绍
Reader负责从数据源读取数据。ChunJun支持多种类型的Reader。
常见Reader类别:
- 关系型数据库:MySQLReader, OracleReader, SQLServerReader等
- 大数据存储:HiveReader, HBaseReader, HDFSReader等
- NoSQL数据库:MongoDBReader, RedisReader等
- 消息队列:KafkaReader, PulsarReader等
通用参数:
-
name: Reader的名称,如"mysqlreader" -
parameter: 具体的配置参数,因Reader类型而异
使用示例(KafkaReader):
"reader": {
"name": "kafkareader",
"parameter": {
"topic": "user_activities",
"bootstrapServers": "localhost:9092",
"groupId": "etl_group",
"codec": "json"
}
}
Writer介绍
Writer负责将数据写入目标存储系统。
常见Writer类别:
- 关系型数据库:MySQLWriter, OracleWriter, SQLServerWriter等
- 大数据存储:HiveWriter, HBaseWriter, HDFSWriter等
- NoSQL数据库:MongoDBWriter, RedisWriter等
- 搜索引擎:ElasticsearchWriter
通用参数:
-
name: Writer的名称,如"elasticsearchwriter" -
parameter: 具体的配置参数,因Writer类型而异
使用示例(ElasticsearchWriter):
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"hosts": ["http://localhost:9200"],
"index": "user_activities",
"type": "_doc",
"bulkAction": 1000
}
}
Transformer介绍
Transformer用于在数据从Reader到Writer的过程中进行转换。它允许用户对数据进行清洗、转换、过滤等操作,以满足目标系统的数据要求。
Transformer的主要功能:
- 字段映射和重命名
- 数据过滤
- 字段值转换
- 字段增加或删除
- 复杂的数据处理逻辑
Transformer类别及说明
ChunJun提供了多种Transformer,每种都有其特定的用途:
-
GroovyTransformer
- 说明:使用Groovy脚本进行灵活的数据转换
- 适用场景:需要复杂的转换逻辑,如多字段组合、条件处理等
- 使用示例:
{ "name": "groovy", "parameter": { "code": " def map = [:]; map['full_name'] = record.first_name + ' ' + record.last_name; map['age'] = record.age.toInteger() + 1; return map; " } }
-
FilterTransformer
- 说明:根据条件过滤数据记录
- 适用场景:需要去除不符合特定条件的数据
- 使用示例:
{ "name": "FilterTransformer", "parameter": { "columnName": "age", "columnValue": "18", "compareMode": "GT" } }
-
FieldMapper
- 说明:进行字段映射和重命名
- 适用场景:源数据和目标数据的字段名不一致时
- 使用示例:
{ "name": "FieldMapper", "parameter": { "mappings":[ { "srcField":"source_name", "destField":"target_name" } ] } }
-
StringReplaceTransformer
- 说明:替换字符串中的指定内容
- 适用场景:需要清理或标准化字符串数据
- 使用示例:
{ "name": "StringReplaceTransformer", "parameter": { "columnName": "address", "replaceParams":[ { "regex": "St\\.", "replace": "Street" } ] } }
-
PadTransformer
- 说明:对字段进行填充操作
- 适用场景:需要将字段填充到特定长度
- 使用示例:
{ "name": "PadTransformer", "parameter": { "columnName": "id", "padType": "left", "padChar": "0", "length": 10 } }
-
ReplaceNullTransformer
- 说明:将null值替换为指定的值
- 适用场景:处理缺失数据
- 使用示例:
{ "name": "ReplaceNullTransformer", "parameter": { "columnName": "salary", "replaceValue": "0" } }
-
CaseTransformer
- 说明:转换字段的大小写
- 适用场景:标准化文本数据
- 使用示例:
{ "name": "CaseTransformer", "parameter": { "columnName": "name", "caseType": "UPPER" } }
使用多个Transformer
您可以在一个任务中使用多个Transformer,它们将按照配置的顺序依次执行:
"transformer": [
{
"name": "FilterTransformer",
"parameter": {
"columnName": "age",
"columnValue": "18",
"compareMode": "GT"
}
},
{
"name": "FieldMapper",
"parameter": {
"mappings":[
{
"srcField":"name",
"destField":"full_name"
}
]
}
},
{
"name": "groovy",
"parameter": {
"code": "
def map = record;
map['processed_at'] = new Date().format(\"yyyy-MM-dd'T'HH:mm:ss.SSSZ\");
return map;
"
}
}
]
在这个例子中,数据首先经过过滤,然后进行字段映射,最后通过Groovy脚本添加处理时间戳。
注意事项
- Transformer的执行顺序很重要,它们会按照配置中的顺序依次处理数据。
- 使用GroovyTransformer时,确保脚本的性能,因为它会对每条记录执行。
- 某些Transformer(如FilterTransformer)可能会减少数据量,这可能会影响性能统计。
- 在使用Transformer时,要注意数据类型的一致性,确保转换后的数据类型与Writer期望的类型相匹配。
通过合理使用这些Transformer,您可以在数据同步过程中实现复杂的数据处理逻辑,确保数据质量和一致性。
案例:从Kafka到Elasticsearch的数据同步
以下是一个完整的Job配置示例,展示如何从Kafka读取数据并写入Elasticsearch:
{
"job": {
"content": [
{
"reader": {
"name": "kafkareader",
"parameter": {
"topic": "user_activities",
"bootstrapServers": "localhost:9092",
"groupId": "etl_group",
"codec": "json",
"consumerSettings": {
"auto.offset.reset": "latest"
}
}
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"hosts": ["http://localhost:9200"],
"index": "user_activities",
"type": "_doc",
"bulkAction": 1000,
"timeout": 300
}
},
"transformer": [
{
"name": "groovy",
"parameter": {
"code": "
def map = [:];
map['user_id'] = record.user_id;
map['activity_type'] = record.type;
map['timestamp'] = record.timestamp;
map['details'] = record.details.toString();
map['processed_at'] = new Date().format(\"yyyy-MM-dd'T'HH:mm:ss.SSSZ\");
return map;
"
}
}
]
}
],
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
}
}
}
运行Job
保存上述配置为 kafka_to_es_job.json,然后使用以下命令运行:
./chunjun -job path/to/kafka_to_es_job.json
这个案例展示了如何使用ChunJun创建一个从Kafka读取用户活动数据,转换数据格式,然后将其写入Elasticsearch的作业。通过调整JSON配置文件,您可以轻松地修改数据源、目标和转换逻辑,而无需更改代码。