基础依赖
mybatisplus、springboot
需求
如果需要从API获取数据并存入数据库中,可以尝试以下方法。
必要条件
首先需要创建对应实体类、并且实现KeyInterface
其次实现它的对应Service
tableEntity实体类
@Data
@TableName("tableName")
public class tableEntity implements KeyInterface {
@Override
public String getKey() {
return getPrefix()+getUserId();
}
public String getPrefix() {
String tableName = this.getClass().getAnnotation(TableName.class).value();
String camel = ToCamelUtils.toCamel(tableName);
return "Sync"+camel+":";
}
/**
* 用户id
*/
@TableField(value = "user_id")
@SerializedName("Userid")
private String userId;
/**
* 名称
*/
@TableField(value = "user_name")
@SerializedName("UserName")
private String userName;
}
keyinterface
public interface KeyInterface {
String getKey();
String getPrefix();
}
这里需要将每个实体类对应的数据库字段名使用@TableField注解将对应JSON数据取出,因此在获取到JSON数据后,建表的字段需要与JSON字段对应,避免在处理时因为找不到对应字段造成的数据丢失。
在实现了对应的实体类后还需要实现对应实体类的Service
Service
@Service
public class tableEntityService extends ServiceImpl<tableEntityDao, tableEntity> implements BatchInterface {
@Override
public boolean batchInsert(List list) {
return baseMapper.batchInsert(list);
}
@Override
public boolean batchUpdate(List list) {
return baseMapper.batchUpdate(list);
}
}
BatchInterface
public interface BatchInterface<T extends BaseETLEntity> {
public boolean batchInsert(List<T> list);
public boolean batchUpdate(List<T> list);
}
service需要实现BatchInterface方法,用于实现批量保存和批量更新方法,在写这个功能时需要根据json数据将库中数据进行更新或插入因此需要使用实现这两个方法以提高效率。
在实现对应Service和Entity后就可以开始着手实现通用的处理方法了
SyncService
@Slf4j
public class ETLSyncService<ServiceImpl extends com.baomidou.mybatisplus.extension.service.impl.ServiceImpl & BatchInterface, Target extends SyncBaseETLEntity & KeyInterface,Mapper extends BaseMapper<Target> >{
private String tableName;
// private List<String> keys;
@Autowired
private ServiceImpl targetService;
@Autowired
private RedisUtils redisUtils;
private static final String INSERT = "INSERT";
private static final String UPDATE = "UPDATE";
private static final boolean PASS = true;
private static final Long DEFAULT_EXPIRED= Long.valueOf(60 * 30) ;
@Transactional(rollbackFor = Exception.class)
public ResultModel executeSync(String tableName ,JSONArray jsonArray) throws IllegalAccessException, InstantiationException {
beforeSync();
Service serviceAnnotion = this.getClass().getAnnotation(Service.class);
if (serviceAnnotion == null) {
log.error("对接接口错误:"+ this.getClass().getName()+"@Service注解为空,无法获取表名");
throw new GlobalException("对接接口错误:"+ this.getClass().getName()+" @Service注解为空,无法获取表名");
}
Set<Target> updateList = new HashSet<>();
Set<Target> saveList = new HashSet<>();
LocalDateTime now = LocalDateTime.now();
cleanRedis(tableName);
boolean isEmpty = getAndSaveInRedis();
this.tableName = tableName;
try {
if (!isEmpty) {
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonObject = jsonArray.getJSONObject(i);
Target target = convert(jsonObject,now);
// 如果target返回null就下一个
if (target == null) {
continue;
}
Target origin = (Target) redisUtils.get(target.getKey(),target.getClass());
if (origin == null) {
saveList.add(target);
continue;
}
if (!isSame(target,origin,saveList,updateList,now)) {
target.setRegDate(origin.getRegDate());
target.setRegPsn(origin.getRegPsn());
updateList.add(target);
}
}
if (!saveList.isEmpty()) {
this.saveOrUpdateList(new ArrayList<>(saveList),INSERT);
}
if (!updateList.isEmpty()) {
this.saveOrUpdateList(new ArrayList<>(updateList),UPDATE);
}
} else {
// 需要同步的表为空则直接把获取到的数据全部插入
for (int j = 0 ; j < jsonArray.size();j++) {
Target target = convert(jsonArray.getJSONObject(j),now);
if (target == null) {
continue;
}
saveList.add(target);
}
saveOrUpdateList(new ArrayList<>(saveList),INSERT);
}
}
catch (IllegalAccessException illegalAccessException) {
log.error("ETL对接接口错误:"+ this.getClass().getName()+"异常信息:"+illegalAccessException);
return ResultModel.error(illegalAccessException.getMessage());
}
catch (InstantiationException instantiationException) {
log.error("ETL对接接口错误:"+ this.getClass().getName()+"异常信息:"+instantiationException);
return ResultModel.error(instantiationException.getMessage());
}
catch (GlobalException globalException) {
globalException.printStackTrace();
log.error("ETL对接接口错误:"+globalException.getMsg());
return ResultModel.error(globalException.getMsg());
}
catch (Exception exception) {
exception.printStackTrace();
log.error("ETL对接接口错误:"+exception);
return ResultModel.error(exception.getMessage());
}
return ResultModel.success();
}
public void beforeSync() {}
// 批量删除相关前缀的key
public boolean cleanRedis(String camelTableName) {
String prefix = "Sync"+camelTableName+":*";
Set<String> keysList = redisUtils.getKeys(prefix);
redisUtils.batchDelete(new ArrayList<>(keysList));
return true;
}
public boolean getAndSaveInRedis() {
List<Target> allList = targetService.list();
if (allList.isEmpty()) {
return true;
}
// for (Target target : allList) {
// if (StringUtils.isBlank(target.getKey())) {
// throw new GlobalException("ETL对接接口错误: 主键为空");
// }
// redisUtils.set(target.getKey(),target,3600);
// }
redisUtils.batchSet(allList.stream().collect(Collectors.toConcurrentMap(target->target.getKey(),target->target)),DEFAULT_EXPIRED);
return false;
}
private void saveOrUpdateList(List<Target> list,String operationType) {
if(!checkPoint(list)) {
log.error("检查点不通过");
return;
}
switch (operationType) {
case INSERT:
this.batchInsert(list);
break;
case UPDATE:
this.batchUpdate(list);
break;
default:
log.error("操作类型错误:"+operationType);
break;
}
}
// 可以重写该方法来检验检查点
public boolean checkPoint(List<Target> target) {
return PASS;
}
public boolean isPropertiesLegel(JSONObject jsonObject) {return true;}
// 将JSONObject转换成目标类型
public Target convert(JSONObject jsonObject,LocalDateTime now) throws IllegalAccessException, InstantiationException {
if (jsonObject == null) {
throw new GlobalException("ETLSync同步失败: 数据为空");
}
// 可以重写该方法来校验JSONObject的数据是否合格,返回空的数据就不插入表内
if (!isPropertiesLegel(jsonObject)) {
return null;
}
Class<?> targetClass = GenericsUtils.getSuperClassGenricType(this.getClass(),1);
Target target = (Target) targetClass.newInstance();
Field[] fields = targetClass.getDeclaredFields();
for (Field field : fields) {
TableField tableField = field.getAnnotation(TableField.class);
if (tableField == null) {
log.error("ETL对接接口错误:"+ field.getName()+"@TableField注解为空,无法获取字段名");
throw new GlobalException("ETL对接接口错误:"+ field.getName()+" @TableField注解为空,无法获取字段名");
}
if (tableField.exist() == false) {
continue;
}
// 根据数据库中的字段名与Json的key做匹配
String tableName = tableField.value();
field.setAccessible(true);
// 先直接根据注解的获取的字段名与json的key对比
Object obj = jsonObject.get(tableName);
if (obj == null) {
// 找不到就根据驼峰命名去找
String toCamel = ToCamelUtils.toCamel(tableName);
for (Map.Entry<String, Object> entry : jsonObject.entrySet()) {
if(entry.getKey().equalsIgnoreCase(toCamel)) {
obj = entry.getValue();
break;
}
}
}
// 还找不到就忽略大小写去匹配字段名与json的key
if (obj == null) {
for (Map.Entry<String, Object> entry : jsonObject.entrySet()) {
if(entry.getKey().equalsIgnoreCase(tableName)) {
obj = entry.getValue();
break;
}
}
}
// 判断是否能转换成日期
try {
// 判断是否为字符串
if (obj instanceof String) {
LocalDateTime localDateTime = LocalDateTime.parse((String)obj);
field.set(target,localDateTime);
} else {
field.set(target,obj);
}
} catch (DateTimeParseException e) {
field.set(target,obj);
}
}
target.setUpdDate(now);
target.setRegDate(now);
target.setUpdPsn("Sync"+tableName);
target.setRegPsn("Sync"+tableName);
return target;
}
// 判断两者是否相同
public boolean isSame(Target target,Target origin,Set<Target> saveList,Set<Target> updateTarget,LocalDateTime now) throws IllegalAccessException, InstantiationException {
Field[] fields = GenericsUtils.getSuperClassGenricType(this.getClass(),1).getDeclaredFields();
for (Field field : fields) {
field.setAccessible(true);
if ("regPsn".equalsIgnoreCase(field.getName()) || "regDate".equalsIgnoreCase(field.getName()) ||
"updPsn".equalsIgnoreCase(field.getName()) || "updDate".equalsIgnoreCase(field.getName())) {
continue;
}
if (field.get(target) == null && field.get(origin) != null) {
return false;
} else if (field.get(target) != null && field.get(origin) == null) {
return false;
} else if (field.get(target) == null && field.get(origin) == null){
continue;
}
if (!field.get(target).equals(field.get(origin))) {
return false;
}
}
for (Target temp : saveList) {
if (temp.equals(target)) {
return true;
}
}
for (Target temp : updateTarget) {
if (temp.equals(target)) {
return true;
}
}
return true;
}
// 批量保存方法,如果字段超过21个则需要重写该方法
public boolean batchInsert(List<Target> targetList) {
Consumer<List<Target>> saveConsumer = (batch) -> {
targetService.batchInsert(batch);
};
SqlserverBatchOperationUtil.processInBatches(targetList,100,saveConsumer);
return true;
}
// 批量更新方法,如果字段超过21个则需要重写该方法
public boolean batchUpdate(List<Target> targetList) {
Consumer<List<Target>> updateConsumer = (batch) -> {
targetService.batchUpdate(batch);
};
SqlserverBatchOperationUtil.processInBatches(targetList,100,updateConsumer);
return true;
}
// 如果需要在全量同步之前先清空数据要重写该方法
public boolean deleteAll () {
return true;
}
}
接下来将分析这个类中实现的方法
// 批量保存方法,如果字段超过21个则需要重写该方法
public boolean batchInsert(List<Target> targetList) {
Consumer<List<Target>> saveConsumer = (batch) -> {
targetService.batchInsert(batch);
};
SqlserverBatchOperationUtil.processInBatches(targetList,100,saveConsumer);
return true;
}
// 批量更新方法,如果字段超过21个则需要重写该方法
public boolean batchUpdate(List<Target> targetList) {
Consumer<List<Target>> updateConsumer = (batch) -> {
targetService.batchUpdate(batch);
};
SqlserverBatchOperationUtil.processInBatches(targetList,100,updateConsumer);
return true;
}
// 如果需要在全量同步之前先清空数据要重写该方法
public boolean deleteAll () {
return true;
}
上列三个方法分别用于批量保存、批量更新、以及全量删除
如果有其他需求,例如在保存api中的json数据时需要先全量删除时就在继承SyncService的类中重写这个deleteAll方法,并且如同注释所示
sqlserver在执行sql语句时超过2100个参数则会报错,因此在单条json数据超过21个字段时就必须要重写batchUpdate方法和batchInsert方法避免报错
// 可以重写该方法来检验检查点
public boolean checkPoint(List<Target> target) {
return PASS;
}
public boolean isPropertiesLegel(JSONObject jsonObject) {return true;}
// 将JSONObject转换成目标类型
public Target convert(JSONObject jsonObject,LocalDateTime now) throws IllegalAccessException, InstantiationException {
if (jsonObject == null) {
throw new GlobalException("ETLSync同步失败: 数据为空");
}
// 可以重写该方法来校验JSONObject的数据是否合格,返回空的数据就不插入表内
if (!isPropertiesLegel(jsonObject)) {
return null;
}
Class<?> targetClass = GenericsUtils.getSuperClassGenricType(this.getClass(),1);
Target target = (Target) targetClass.newInstance();
Field[] fields = targetClass.getDeclaredFields();
for (Field field : fields) {
TableField tableField = field.getAnnotation(TableField.class);
if (tableField == null) {
log.error("对接接口错误:"+ field.getName()+"@TableField注解为空,无法获取字段名");
throw new GlobalException("对接接口错误:"+ field.getName()+" @TableField注解为空,无法获取字段名");
}
if (tableField.exist() == false) {
continue;
}
// 根据数据库中的字段名与Json的key做匹配
String tableName = tableField.value();
field.setAccessible(true);
// 先直接根据注解的获取的字段名与json的key对比
Object obj = jsonObject.get(tableName);
if (obj == null) {
// 找不到就根据驼峰命名去找
String toCamel = ToCamelUtils.toCamel(tableName);
for (Map.Entry<String, Object> entry : jsonObject.entrySet()) {
if(entry.getKey().equalsIgnoreCase(toCamel)) {
obj = entry.getValue();
break;
}
}
}
// 还找不到就忽略大小写去匹配字段名与json的key
if (obj == null) {
for (Map.Entry<String, Object> entry : jsonObject.entrySet()) {
if(entry.getKey().equalsIgnoreCase(tableName)) {
obj = entry.getValue();
break;
}
}
}
// 判断是否能转换成日期
try {
// 判断是否为字符串
if (obj instanceof String) {
LocalDateTime localDateTime = LocalDateTime.parse((String)obj);
field.set(target,localDateTime);
} else {
field.set(target,obj);
}
} catch (DateTimeParseException e) {
field.set(target,obj);
}
}
return target;
}
convert方法用于把json中的数据转换成对应的实体类,再存入list中保存至数据库中
isPropertiesLegel用于检验对应的json是否符合某些规则,可以在继承了SyncService的类中重写该方法来达成校验的目的,这样就能提前排除某些不需要的数据
checkPoint也是在执行更新或插入之前通过重写该方法来决定是否需要将数据更新或插入到数据库中
public ResultModel executeSync(String tableName ,JSONArray jsonArray) throws IllegalAccessException, InstantiationException {
beforeSync();
Service serviceAnnotion = this.getClass().getAnnotation(Service.class);
if (serviceAnnotion == null) {
log.error("对接接口错误:"+ this.getClass().getName()+"@Service注解为空,无法获取表名");
throw new GlobalException("对接接口错误:"+ this.getClass().getName()+" @Service注解为空,无法获取表名");
}
Set<Target> updateList = new HashSet<>();
Set<Target> saveList = new HashSet<>();
LocalDateTime now = LocalDateTime.now();
cleanRedis(tableName);
boolean isEmpty = getAndSaveInRedis();
this.tableName = tableName;
try {
if (!isEmpty) {
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonObject = jsonArray.getJSONObject(i);
Target target = convert(jsonObject,now);
// 如果target返回null就下一个
if (target == null) {
continue;
}
Target origin = (Target) redisUtils.get(target.getKey(),target.getClass());
if (origin == null) {
saveList.add(target);
continue;
}
if (!isSame(target,origin,saveList,updateList,now)) {
target.setRegDate(origin.getRegDate());
target.setRegPsn(origin.getRegPsn());
updateList.add(target);
}
}
if (!saveList.isEmpty()) {
this.saveOrUpdateList(new ArrayList<>(saveList),INSERT);
}
if (!updateList.isEmpty()) {
this.saveOrUpdateList(new ArrayList<>(updateList),UPDATE);
}
} else {
// 需要同步的表为空则直接把获取到的数据全部插入
for (int j = 0 ; j < jsonArray.size();j++) {
Target target = convert(jsonArray.getJSONObject(j),now);
if (target == null) {
continue;
}
saveList.add(target);
}
saveOrUpdateList(new ArrayList<>(saveList),INSERT);
}
}
catch (IllegalAccessException illegalAccessException) {
log.error("ETL对接接口错误:"+ this.getClass().getName()+"异常信息:"+illegalAccessException);
return ResultModel.error(illegalAccessException.getMessage());
}
catch (InstantiationException instantiationException) {
log.error("ETL对接接口错误:"+ this.getClass().getName()+"异常信息:"+instantiationException);
return ResultModel.error(instantiationException.getMessage());
}
catch (GlobalException globalException) {
globalException.printStackTrace();
log.error("ETL对接接口错误:"+globalException.getMsg());
return ResultModel.error(globalException.getMsg());
}
catch (Exception exception) {
exception.printStackTrace();
log.error("ETL对接接口错误:"+exception);
return ResultModel.error(exception.getMessage());
}
return ResultModel.success();
}
public void beforeSync() {}
// 批量删除相关前缀的key
public boolean cleanRedis(String camelTableName) {
String prefix = "Sync"+camelTableName+":*";
Set<String> keysList = redisUtils.getKeys(prefix);
redisUtils.batchDelete(new ArrayList<>(keysList));
return true;
}
public boolean getAndSaveInRedis() {
List<Target> allList = targetService.list();
if (allList.isEmpty()) {
return true;
}
redisUtils.batchSet(allList.stream().collect(Collectors.toConcurrentMap(target->target.getKey(),target->target)),DEFAULT_EXPIRED);
return false;
}
cleanRedis 在同步之前需要先清空Redis的缓存,避免因为缓存导致数据没有及时更新
getAndSaveInRedis 在更新前将数据库中的数据存入缓存避免持续对数据库进行访问
beforeSync 通过重写该方法,实现执行同步之前的方法
executeSync
这个方法是整个同步方法的执行方法,首先通过Service注解获取到具体实现类,在保存至redis的方法中判断数据库是否为空,如果为空则跳过更新判断直接把所有数据存入数据库中,不为空则判断是否在数据库中已存在并且是否与json数据不一致,如果不一致则更新。
整个模板方法到此就完成了
接下来只需要写一个Service来继承这个SyncService即可同步任意字段的json
@Service(value = "SyncPmsUserInfoService")
public class SyncUserInfoService extends SyncService<UserInfoService, UserInfoEntity,UserInfoDao>{
@Autowired
private PmsUserInfoService pmsUserInfoService;
@Override
public boolean deleteAll() {
QueryWrapper wrapper = new QueryWrapper();
return pmsUserInfoService.remove(wrapper);
}
}
接口
private ResultModel syncToDB(Map<String, Object> params, JobLogEntity jobLogEntity) {
ResultModel resultModel = new ResultModel();
try {
String tableName = MapUtil.getStr(params,"tableName");
String isDeleteAll = MapUtil.getStr(params,"isDeleteAll");
if (StringUtils.isBlank(tableName)) {
return ResultModel.error("同步至ETL失败:tableName不能为空");
}
tableName = ToCamelUtils.toCamel(tableName);
String serviceName = "Sync"+tableName+"Service";
Object service = SpringContextUtils.getBean(serviceName);
if (service instanceof SyncService) {
ETLSyncService syncService = (ETLSyncService) service;
if (StringUtils.isNotBlank(isDeleteAll)) {
if ("true".equalsIgnoreCase(isDeleteAll)) {
syncService.deleteAll();
}
}
JSONArray jsonArray = syncApiUtil.getApiAsJSONArray(tableName);
if (jsonArray == null) {
throw new GlobalException("同步失败:Api数据获取失败");
}
resultModel = syncService.executeSync(tableName,jsonArray);
}
if (resultModel.isSuccess()) {
updateJob(bizPrcJobLogEntity, "success", "");
} else {
updateJob(jobLogEntity, "error", (String)resultModel.get("msg"));
}
return resultModel;
}
catch (GlobalException e) {
e.printStackTrace();
resultModel.put("code", -1);
resultModel.put("msg", e.getMsg());
return resultModel;
}
catch (BeansException e) {
e.printStackTrace();
resultModel.put("code", -1);
resultModel.put("msg", "获取对应Service失败");
return resultModel;
}
catch (Exception e) {
e.printStackTrace();
resultModel.put("code", -1);
return resultModel;
}
}
可以写一个这样的接口,根据传入的tableName参数获取具体需要同步数据的表,这样就不需要为每一个同步数据的功能去创建功能类似的接口了