简介
该功能分为两个部分,一个部分是根据表中的配置生成出需要执行的update语句的配置,另一个部分就是读取update语句配置来执行update语句,被更新的表对应的值都已经写成了视图,可以直接从视图中读取,除了部分需要特殊处理的字段
由于这个功能是为了基础数据更新服务的,因此它的触发条件是,基础数据更新之后,再去生成对应的update语句更新目标表中的冗余字段
为了方便描述,将第一个部分叫做JOB063,第二个部分叫做JOB064
JOB063
表设计
先从表设计开始说明
字段名 | 说明 |
---|---|
targetTable | 被更新的表 |
targetJson | 为目标表被更新的字段以及更新值的来源 |
executeTimeColumn | 目标表中用于限定更新范围的时间字段 |
searchKey | 目标表中用于限定更新范围的字段,例如store_info表中有area_no,使用area_no作为限定更新范围的字段 |
view_key | 视图中对应的字段,例如store_info的area_no对应视图中的city_no |
source_table_name | 数据源的视图的名字,用表其实也可以只要字段能够对应上就行 |
data_source | 多数据源的情况下设置数据源,用于切换数据源,例如现在有业务库和ETL库,就需要标注目标表所在的库,这里用10,20等值存在数据字典表中 |
special_method | 假如更新的目标表是作为一些报表的数据源,则需要重新执行对应的底表数据方法来保持报表数据与基础数据的一致性,这个字段可以根据逗号将需要执行的方法都分隔开例如JOB039,JOB034这样 |
start_date | 目标表根据时间字段更新的开始时间 |
end_date | 目标表根据时间字段更新的结束时间 |
changed_key | 基础数据表中对应的字段表,例如area_info的area_no |
changed_table | 基础数据表的表名,例如area_info,为了方便描述就叫他触发表 |
changed_time_column | 基础数据表中限定获取数据的范围的时间字段,例如要获取area_info中在昨天更新的数据,就使用upd_date字段 |
changed_view_key | 基础数据表中changedKey与视图数据的对应字段,例如area_info在某些表中需要找到视图数据中的city_no来对应某一条数据 |
table_update_column | 目标表的更新时间字段,例如store_info中的upd_date字段 |
active_flag | 是否要根据此配置生成update语句 |
方法实现
// 存储更新的配置
List<UpdateParamEntity> saveList = new ArrayList<>();
DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
UpdateParamGenerateJOBModel generateModel = new UpdateParamGenerateJOBModel();
BeanMapUtils.mapToBean(params, generateModel);
// 更新的表的视图 area_info wine_no
Map<String,List<String>> tableMap = new HashMap<>();
// 数据视图的map
Map<String,List<Map<String,Object>>> resourceListMap = new HashMap<>();
// 默认同步昨天一整天的更新
LocalDateTime startDate = LocalDateTime.of(LocalDate.now().minusDays(1), LocalTime.MIN);
LocalDateTime endDate = LocalDateTime.of(LocalDate.now(), LocalTime.MIN);
if (StringUtils.isNotBlank(generateModel.getStartDate())) {
startDate = LocalDateTime.parse(generateModel.getStartDate(),df);
}
if (StringUtils.isNotBlank(generateModel.getEndDate())) {
endDate = LocalDateTime.parse(generateModel.getEndDate(),df);
}
默认获取昨天有更新的基础数据来生成update语句
List<String> tableNames = new ArrayList<>();
if (StringUtils.isNotBlank(generateModel.getTableNames())) {
String[] tableNameArr = generateModel.getTableNames().split(",");
tableNames = Arrays.asList(tableNameArr);
}
List<UpdateParamConfigEntity> configList = updateParamConfigService.getListByTableNameList(tableNames);
如果有特地传入表名,则只生成对应表的update语句,没有就全部生成
,之后就对configList里面的每一条config进行更新
// 1. 先把所有涉及的视图先查出来
List<String> sourceTableNames = configList.stream().map(item->item.getSourceTableName()).distinct().collect(Collectors.toList());
for (String sourceTableName : sourceTableNames) {
if (resourceListMap.containsKey(sourceTableName)) {
continue;
}
List<Map<String,Object>> resourceList = updateParamConfigService.getResourceList(sourceTableName);
resourceListMap.put(sourceTableName,resourceList);
}
// 2. 把时间段内更新的表的记录找出来
for (UpdateParamConfigEntity config : configList) {
config.setStartDateStr(df.format(config.getStartDate()));
config.setEndDateStr(df.format(config.getEndDate()));
if (tableMap.containsKey(config.getChangedTable())) {
continue;
}
UpdateParamSearchModel configModel = new UpdateParamSearchModel(config);
configModel.setStartDate(df.format(startDate));
configModel.setEndDate(df.format(endDate));
if (Constant.CalcKey.store_info.equalsIgnoreCase(config.getChangedTable())) {
configModel.setExtraWhere(" AND is_area_change = '1' ");
}
List<String> updateParam = this.getUpdateTablesKeyList(configModel);
tableMap.put(config.getChangedTable(),updateParam);
}
// 3. 生成对应的配置
for (UpdateParamConfigEntity config : configList) {
saveList.addAll(this.generateUpdateParam(config,resourceListMap,tableMap));
}
重点就是这个generateUpdateParam方法
generateUpdateParam
这个方法传入配置的实体,视图的Map,有变动的基础数据的Map
其实这个方法也比较简单总的来说就是干了这几件事
- 先根据配置将视图数据拿出来
- 根据配置中的触发表来获取被更新了的基础数据
- 根据被更新了的基础数据逐条生成对应的更新配置
LocalDateTime now = LocalDateTime.now();
// 根据配置生成的语句的列表
List<UpdateParamEntity> saveList = new ArrayList<>();
// 配置项的List
List<JSONObject> configList = new ArrayList<>();
// 0. 获取视图数据 即 更新的数据的来源
List<Map<String, Object>> resourceList = resourceMap.get(config.getSourceTableName());
// 1. 获取更新的表的主键字段 用来筛选更新数据
List<String> tableKeys = tableMap.get(config.getChangedTable());
if (CollectionUtil.isEmpty(tableKeys)) {
return saveList;
}
Map<String,JSONObject> tableKeyMap = new HashMap<>();
List<JSONObject> filterdResourceList = new ArrayList<>();
// 2. 深拷贝数据,防止当前获取的数据被修改
for (Map<String, Object> resource : resourceList) {
JSONObject tempJson = new JSONObject();
String key = MapUtil.getStr(resource , config.getViewKey());
String value = MapUtil.getStr(resource,config.getChangedViewKey());
if (StringUtils.isNotBlank(value) && tableKeys.contains(value)) {
tempJson = JSONObject.parseObject(JSONObject.toJSONString(resource));
tableKeyMap.put(key,tempJson);
}
}
filterdResourceList = tableKeyMap.values().stream().collect(Collectors.toList());
// 3. 获取配置并转化成对应的json
configList = JSONObject.parseArray(config.getTargetJson(), JSONObject.class);
// 因为filterdResourceList已经根据本次需要更新的数据进行筛选了,因此拿这个list遍历就可以
for (JSONObject resource : filterdResourceList) {
JSONObject setParamJson = new JSONObject();
for (JSONObject targetConfig : configList) {
TargetConfigModel model = new TargetConfigModel(targetConfig);
String value = resource.getString(model.getSourceKey());
setParamJson.put(model.getDestination(),StringUtils.isNotEmpty(value) ? value : model.getSourceKey());
}
// 拼接更新的参数
UpdateParamEntity updateParamEntity = new UpdateParamEntity();
updateParamEntity.setTargetTable(config.getTargetTable());
// 时间区间 + key = wineNo/areaNo
String whereParam = config.getExecuteTimeColumn() + " BETWEEN " + "'"
+ config.getStartDateStr() + "'" + " AND " + "'" + config.getEndDateStr() +
"' AND " + config.getSearchKey() + " = " + "'" + resource.getString(config.getViewKey()) + "'";
updateParamEntity.setStatus(Constants.UPDATE_PARAM_STATUS.INIT);
updateParamEntity.setUpdateParam(setParamJson.toJSONString());
updateParamEntity.setWhereParam(whereParam);
updateParamEntity.setRegPsn(Constants.GENERATE_UPDATE_JOB.JOB_NAME);
updateParamEntity.setUpdPsn(Constants.GENERATE_UPDATE_JOB.JOB_NAME);
updateParamEntity.setRegDate(now);
updateParamEntity.setUpdDate(now);
updateParamEntity.setOriginExecuteTime(config.getStartDateStr());
updateParamEntity.setDataSource(config.getDataSource());
updateParamEntity.setSourceTableName(config.getSourceTableName());
updateParamEntity.setViewKey(config.getViewKey());
updateParamEntity.setViewValue(resource.getString(config.getViewKey()));
updateParamEntity.setTableUpdateColumn(config.getTableUpdateColumn());
updateParamEntity.setConfigId(config.getId());
updateParamEntity.setIsJobFinish(StringUtils.isNotBlank(config.getSpecialMethod()) ? Constants.UPDATE_PARAM_STATUS.INIT : Constants.UPDATE_PARAM_STATUS.FINISHED);
saveList.add(updateParamEntity);
}
return saveList;
然后回到主方法,把生成出来的saveList保存进入数据库中,在JOB064中进行update处理即可
JOB064
表设计
接下来介绍具体用于执行update语句的表
字段名 | 说明 |
---|---|
target_table | 目标表 |
update_param | 在JOB063中生成的update语句的set部分 |
where_param | 在JOB063中生成的update语句的where部分 |
data_source | 数据库的数据源(业务库,ETL库等) |
source_table_name | set部分的数据来源 |
table_update_column | 目标表的更新时间字段,例如upd_date,update_time |
config_id | 用于从config中获取需要更新后执行的方法 |
is_job_finish | 更新后需要执行的方法是否完成 |
status | update 语句是否完成 |
功能实现
Map<String,List<Map<String,Object>>> resourceListMap = new HashMap<>();
Map<String,String> errorMsg = new HashMap<>();
DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
DateTimeFormatter dateDf = DateTimeFormatter.ofPattern("yyyy-MM-dd");
UpdateParamExecuteJobModel executeModel = new UpdateParamExecuteJobModel();
BeanMapUtils.mapToBean(params, executeModel);
LocalDateTime startDate = LocalDateTime.of(LocalDate.now().minusDays(1), LocalTime.MIN);
LocalDateTime endDate = LocalDateTime.now();
if (StringUtils.isNotBlank(executeModel.getStartDate())) {
startDate = LocalDateTime.parse(executeModel.getStartDate(),df);
} else {
executeModel.setStartDate(df.format(startDate));
}
if (StringUtils.isNotBlank(executeModel.getEndDate())) {
endDate = LocalDateTime.parse(executeModel.getEndDate(),df);
} else {
executeModel.setEndDate(df.format(endDate));
}
if (StringUtils.isNotBlank(executeModel.getTableNames())) {
String[] tableNames = executeModel.getTableNames().split(",");
executeModel.setTableNameList(Arrays.asList(tableNames));
}
List<UpdateParamEntity> updateParamList = baseMapper.getListByParam(executeModel);
处理参数并且把需要执行的更新语句查询出来
List<String> sourceTableNames = updateParamList.stream().map(item->item.getSourceTableName()).distinct().collect(Collectors.toList());
for (String sourceTableName : sourceTableNames) {
if (resourceListMap.containsKey(sourceTableName)) {
continue;
}
List<Map<String,Object>> resourceList = updateParamConfigService.getResourceList(sourceTableName);
resourceListMap.put(sourceTableName,resourceList);
}
把涉及到的视图查出来
List<String> tableNameList = updateParamList.stream()
.map(UpdateParamEntity::getTargetTable)
.distinct()
.collect(Collectors.toList());
// 根据表名分割事务
for (String tableName : tableNameList) {
List<UpdateParamEntity> updateParamEntityList = updateParamList.stream()
.filter(item -> item.getTargetTable().equals(tableName))
.collect(Collectors.toList());
if (CollectionUtil.isEmpty(updateParamEntityList)) {
continue;
}
String dataSource = updateParamEntityList.get(0).getDataSource();
// 先把update语句需要的参数处理好
List<UpdateParamConfigEntity> configList = updateParamConfigService.list();
List<UpdateParamExecuteModel> executeModels = this.processUpdateParam(updateParamEntityList,resourceListMap,configList);
try {
if (Constant.DATA_SOURCE.PRC_BI.equals(dataSource)) {
// BI
updateParamTransactionalService.updateTableByModelsInPRCBI(executeModels);
}
// 这里是省略没写的数据源选项
} catch (Exception e) {
log.error("执行更新语句报错,{}",e);
errorMsg.put(tableName,e.getMessage());
continue;
}
// 更新成功之后把更新成功的记录更新状态为成功
//
updateParamTransactionalService.updateStatusToSuccess(executeModels);
这里就是把所有的配置拿出来处理成特定的update语句格式,具体就是看processUpdateParam方法是怎么去处理数据的
LocalDateTime now = LocalDateTime.now();
DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
List<UpdateParamExecuteModel> executeModels = new ArrayList<>();
for (UpdateParamEntity updateParamEntity : updateParamEntityList) {
UpdateParamConfigEntity config = configList.stream().filter(item->item.getId().equals(updateParamEntity.getConfigId())).findFirst().orElse(null);
if ("1".equalsIgnoreCase(updateParamEntity.getStatus())) {
continue;
}
List<Map<String,Object>> resourceList = resourceListMap.get(updateParamEntity.getSourceTableName());
if (CollectionUtil.isEmpty(resourceList)) {
log.error("没有找到对应的视图数据,{}",updateParamEntity);
throw new GlobalException("没有找到对应的视图数据");
}
UpdateParamExecuteModel executeModel = new UpdateParamExecuteModel();
Map<String,Object> setMap = new HashMap<>();
executeModel.setId(updateParamEntity.getId());
// 拼接setMap和whereMap
Map<String,Object> updateParamMap = JSONObject.parseObject(updateParamEntity.getUpdateParam());
Map<String,Object> resource = getResourceByResourceList(resourceList,updateParamEntity.getViewKey(),updateParamEntity.getViewValue());
if (resource == null) {
log.error("没有找到对应的视图数据,{}",updateParamEntity);
throw new GlobalException("没有找到对应的视图数据");
}
// 根据updateParamMap拼接setMap
for (Map.Entry<String, Object> entry : updateParamMap.entrySet()) {
String key = entry.getKey();
String[] keyArr = key.split(",");
String value = entry.getValue().toString();
switch (value) {
// 这里是一些省略掉的
default:
setMap.put(key, "'" + value +"'");
}
}
setMap.put(updateParamEntity.getTableUpdateColumn(),"'"+ df.format(now)+"'");
executeModel.setTableName(updateParamEntity.getTargetTable());
executeModel.setParamsMap(setMap);
executeModel.setWhereParam(updateParamEntity.getWhereParam());
executeModels.add(executeModel);
}
return executeModels;
这里其实就是根据之前在config里面配置的json来找出对应的数据,并且写成set语句的形式,之后再update语句中就可以直接执行了
<update id="updateTableByModelsInPRCETL">
UPDATE ${model.tableName}
<foreach collection="model.paramsMap" item="value" index="key" open="set" close="" separator=",">
${key} = ${value}
</foreach>
<where>
${model.whereParam}
</where>
</update>
执行update语句
List<Long> configIds = updateParamList.stream()
.filter(item -> item.getIsJobFinish().equals(Constants.UPDATE_PARAM_STATUS.INIT))
.map(UpdateParamEntity::getConfigId)
.distinct().collect(Collectors.toList());
// 调用定时任务
List<UpdateParamConfigEntity> specialMethods = updateParamConfigService.getDistinctSpecialMethodsByIds(configIds);
List<UpdateParamConfigEntity> executeMethods = new ArrayList<>();
List<Long> successConfigs = new ArrayList<>();
// 根据specialMethods和startDate和endDate 来执行定时任务
for (UpdateParamConfigEntity specialMethod : specialMethods) {
if (StrUtil.isEmpty(specialMethod.getSpecialMethod())) {
continue;
}
if (CollectionUtil.isNotEmpty(executeMethods)) {
executeMethods.add(specialMethod);
continue;
} else {
Boolean isRepeat = false;
for (UpdateParamConfigEntity executeMethod : executeMethods) {
if (executeMethod.getSpecialMethod().equals(specialMethod.getSpecialMethod())
&& executeMethod.getStartDate().equals(specialMethod.getStartDate())
&& executeMethod.getEndDate().equals(specialMethod.getEndDate())) {
// 同一时间段的job只需要执行一次
isRepeat = true;
break;
}
}
if (!isRepeat) {
executeMethods.add(specialMethod);
}
}
}
if (CollectionUtil.isNotEmpty(executeMethods)) {
for (UpdateParamConfigEntity executeMethod : executeMethods) {
ResultModel resultModel = this.runSpecialMethods(executeMethod);
if (resultModel.isSuccess()) {
// 把成功的保存下来再更新状态
successConfigs.add(executeMethod.getId());
}
}
}
if (CollectionUtil.isNotEmpty(successConfigs)) {
// 获取对应id
List<Long> updateParamIds = updateParamList.stream().filter(item->successConfigs.contains(item.getConfigId())).map(UpdateParamEntity::getId).collect(Collectors.toList());
updateParamTransactionalService.updateIsJobFinish(updateParamIds);
}
在update语句执行完成之后,需要把对应的方法拿出来执行
public ResultModel runSpecialMethods(UpdateParamConfigEntity executeMethod) {
ResultModel resultModel = ResultModel.success();
// 先清空当前线程的数据源
DynamicContextHolder.poll();
DateTimeFormatter dateTimeFormater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
DateTimeFormatter dateFormater = DateTimeFormatter.ofPattern("yyyy-MM-dd");
String jobstartDateTime = dateTimeFormater.format(LocalDateTime.of(LocalDate.of(executeMethod.getStartDate().getYear(),3,3),LocalTime.parse("03:33:33")));
String jobEndDateTime = dateTimeFormater.format(LocalDateTime.of(LocalDate.of(executeMethod.getStartDate().getYear(),3,3),LocalTime.parse("03:33:33")));;
String jobstartDate = dateFormater.format(LocalDate.of(executeMethod.getStartDate().getYear(),3,3));
String jobEndDate = dateFormater.format(LocalDate.of(executeMethod.getStartDate().getYear(),3,3));
Map<String,Object> apiConfig = new HashMap<>();
Map<String,Object> timeConfig = new HashMap<>();
String specialMethods = executeMethod.getSpecialMethod();
if (StringUtils.isNotBlank(specialMethods)) {
List<String> specialMethodList = Arrays.asList(specialMethods.split(","));
for (String specialMethod : specialMethodList) {
apiConfig.put("apiCode",specialMethod);
timeConfig.put("startDate",jobstartDateTime);
timeConfig.put("startTime",jobstartDateTime);
timeConfig.put("startSyncDate",jobstartDateTime);
timeConfig.put("beginDate",jobstartDate);
timeConfig.put("endDate",jobEndDateTime);
timeConfig.put("endTime",jobEndDateTime);
timeConfig.put("endSyncDate",jobEndDateTime);
timeConfig.put("overDate",jobEndDate);
apiConfig.put("params", new JSONObject(timeConfig));
resultModel = prcJobComponent.run(apiConfig);
if (!resultModel.isSuccess()) {
log.error(apiConfig.get("apiCode")+"执行失败!,{}",resultModel);
return resultModel;
}
}
}
return resultModel;
}
这个方法需要注意的是要先清空数据源,避免数据源切换带来的问题,由于接口是编号的形式,因此只需要用这种编号去找对应的方法执行就可以了,如果是需要执行某个类的方法,可以用反射的方式来实现,将对应的Service取出来通过反射调用对应的方法即可
这里就是处理update语句的全部流程了