tags: springbatch mongodb
1.引言
之前对Spring Batch的通过实例的方式进行了介绍,有兴趣的可见以下文章:
- 数据批处理神器-Spring Batch(1)简介及使用场景
- 快速了解组件-spring batch(2)之helloworld
- 快速使用组件-spring batch(3)读文件数据到数据库
- 决战数据库-spring batch(4)数据库到数据库
- 便捷的数据读写-spring batch(5)结合beetlSql进行数据读写
- 增量同步-spring batch(6)动态参数绑定与增量同步
- 调度与监控-spring batch(7)结合xxl-job进行批处理
除了文件及关系型数据库的数据同步,Spring Batch的读组件(ItemReader
),处理组件(ItemProcessor
),写组件(ItemWriter
)支持丰富的数据类型,其中MongoItemReader
及MongoItemWriter
是针对mongo的读写组件,用户可以直接使用,进行Mongodb
的数据读写操作。一种比较常用的情景是从关系型数据库(如mysql
)把数据同步到mongodb
中,下面通过实例对mysql
到mongodb
的数据同步进行讲解。本文主要讲解有关Mongodb
的操作,对于Spring Batch
使用beetlsql
进行关系数据库数据读取的操作请见文章《便捷的数据读写-spring batch(5)结合beetlSql进行数据读写》。本文的示例代码见github示例仓库。
2.开发环境
- JDK: jdk1.8
- Spring Boot: 2.1.4.RELEASE
- Spring Batch:4.1.2.RELEASE
- 开发IDE: IDEA
- 构建工具Maven: 3.3.9
- 日志组件logback:1.2.3
- lombok:1.18.6
- MySQL: 5.6.26
- Mongodb:4.0.10
3.开发流程
3.1 示例数据库及目标数据库
本示例的流程如下所示:
示例工程中的sql
目录有相应的关系数据库脚本,mytest.sql
脚本创建一个test_user
表,并有相应的测试数据。mongodb
的安装可见官方文档,建立相应的存放数据的Collection
,本示例为mytest
。
3.2 添加maven
依赖及配置mongodb
连接地址
由于需要使用mongodb
的操作,因此需要添加它的依赖。如下所示:
<!-- mongodb -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
添加依赖后,mongodb
的连接地址需配置在配置文件中,若有用户名密码,则同样需要配置。如下:
spring.data.mongodb.uri=mongodb://192.168.222.10/mytest
# spring.data.mongodb.username=
# spring.data.mongodb.password=
3.3 编写mongodb
的读写组件
按示例,共三个组件,需要的是一个读mysql
数据库的组件,一个mysql
数据库实体转化为mongodb
的处理组件,一个写入mongodb
的写组件,代码结构如下图所示:
其中ItemReader
组件和ItemProcessor
组件无须多讲,可参考之前的文章,这里主要讲一下mongodb
的ItemWriter
,此写入组件通过继承MongoItemWriter
,编写自己的逻辑即可,而Spring Batch
提供的mongodb
写操作,是在初始化ItemWriter
时,通过MongoOperations
引入的,因此,MongoBatchConfig
文件中,添加以下代码:
@Bean
public ItemWriter mongoWriter(MongoOperations mongoTemplate) {
UserItemWriter userItemWriter = new UserItemWriter();
userItemWriter.setTemplate(mongoTemplate);
userItemWriter.setCollection("user");
return userItemWriter;
}
其中,MongoOperations
是在初始化时注入,在自定义的UserItemWriter
中,设置template
及collection
即可。若逻辑简单,不写自定义的ItemWriter,也可以直接使用MongoItemWriterBuilder
,直接构建MongoItemWriter
,如下所示:
return new MongoItemWriterBuilder<MongoUser>()
.collection("user")
.template(mongoTemplate)
.build();
以上是写组件的构建,同理,对于mongodb
的读组件,构建方式类似,只是需要注意一下动态参数的配置,如下示例代码是查询数据,并返回map
,参数是在构建任务时动态传入的。
@Bean
@StepScope
public MongoItemReader<Map> tweetsItemReader(MongoOperations mongoTemplate,@Value("#{jobParameters['hashTag']}") String hashtag) {
return new MongoItemReaderBuilder<Map>()
.name("tweetsItemReader")
.targetType(Map.class)
.jsonQuery("{ \"entities.hashtags.text\": { $eq: ?0 }}")
.collection("tweets_collection")
.parameterValues(Collections.singletonList(hashtag))
.pageSize(10)
.sorts(Collections.singletonMap("created_at", Sort.Direction.ASC))
.template(mongoTemplate)
.build();
}
4.执行结果
编写单元测试或者在Controller
编写启动任务,即可进行数据同步测试,执行结果如下所示:
5.总结
本文基于Spring Batch
对数据从mysql
到mongodb
进行数据同步,通过结合示例代码,实现mongodb
的读写组件进行编写及配置,希望需要使用Spring Batch
进行关系数据库和mongodb
进行批处理任务开发的人员有帮助。