当前,lucene-candy实现了三种的数据同步方案
- cn.juque.lucenecandy.core.datasync.DefaultDataSyncServiceImpl :默认的数据同步方式,仅是把数据保存到指定的索引;
- cn.juque.lucenecandy.core.datasync.TccDataSyncServiceImpl :Try-Commit-Cancel 的方案,基于http实现数据在多实例之间的实时同步;
- cn.juque.lucenecandy.core.datasync.MsgDataSyncServiceImpl :基于http实现多实例之间的数据批次同步,这种方案是准实时同步;
实现方案
- DefaultDataSyncServiceImpl
仅是基于LuceneHelper实现的增删改查
public void commit(IndexUpdateParamBO param) {
switch (param.getSyncType()) {
case ADD:
List<Document> docList = this.documentHelper.toDocumentList(param.getContent());
this.luceneHelper.addDocuments(param.getIndexName(), docList);
break;
case UPDATE:
param.getContent().forEach(f -> {
BooleanQuery.Builder builder = new BooleanQuery.Builder();
builder.add(new TermQuery(new Term(StrConstant.D_ID, f.getId())), BooleanClause.Occur.MUST);
this.luceneHelper.updateByQuery(param.getIndexName(), builder.build(), this.documentHelper.toDocument(f));
});
break;
case DEL:
BooleanQuery.Builder builder = new BooleanQuery.Builder();
param.getIdList().forEach(f -> builder.add(new TermQuery(new Term(StrConstant.D_ID, f)), BooleanClause.Occur.SHOULD));
this.luceneHelper.deleteDocuments(param.getIndexName(), builder.build());
break;
default:
break;
}
}
- TccDataSyncServiceImpl
数据同步三步走:
- 预提交(Try):
cn.juque.lucenecandy.core.datasync.tcc.CandyTryCommitServiceImpl 实现数据在实例间的预提交;
@Override
public void add(List<? extends BaseEntity> entityList) {
if (CollUtil.isEmpty(entityList)) {
return;
}
// 保存
IndexBO indexBO = this.documentHelper.index(entityList.get(0).getClass());
this.luceneHelper.addDocuments(indexBO.getIndexName(), this.documentHelper.toDocumentList(entityList));
}
/**
* 更新文档
*
* @param entityList 列表
*/
@Override
public void update(List<? extends BaseEntity> entityList) {
if (CollUtil.isEmpty(entityList)) {
return;
}
entityList.forEach(f->{
Class<? extends BaseEntity> tClass = ClassUtil.getClass(f);
IndexBO indexBO = this.documentHelper.index(tClass);
// 上个版本变更为不可见
BooleanQuery.Builder builder = new BooleanQuery.Builder();
builder.add(new TermQuery(new Term(StrConstant.D_ID, f.getId())), BooleanClause.Occur.MUST);
int version = f.getVersion() - 1;
builder.add(IntPoint.newExactQuery(StrConstant.D_VERSION, version), BooleanClause.Occur.MUST);
List<Document> source = this.luceneHelper.search(indexBO.getIndexName(), builder, Sort.INDEXORDER, null);
this.indexHelper.switchDoc(this.documentHelper.toEntityList(source, tClass), YesOrNoEnum.NO);
// 保存新版本
List<Document> docList = new ArrayList<>(0);
docList.add(this.documentHelper.toDocument(f));
this.luceneHelper.addDocuments(indexBO.getIndexName(), docList);
});
}
/**
* 删除文档
*
* @param className 实体类名
* @param idList 主键列表
*/
@Override
public void del(String className, List<String> idList) {
if (CollUtil.isEmpty(idList)) {
return;
}
Class<? extends BaseEntity> tClass = ClassUtil.loadClass(className);
// 根据id检索出可见文档
BooleanQuery.Builder builder = new BooleanQuery.Builder();
idList.forEach(f -> builder.add(new TermQuery(new Term(StrConstant.D_ID, f)), BooleanClause.Occur.SHOULD));
IdsQueryWrapperBuilder<?> idsQueryWrapperBuilder = new IdsQueryWrapperBuilder<>(tClass, idList);
List<? extends BaseEntity> list = this.indexHelper.searchByIds(idsQueryWrapperBuilder.build());
// 更新为不可见
this.indexHelper.switchDoc(list, YesOrNoEnum.NO);
}
- 提交(Commit):
cn.juque.lucenecandy.core.datasync.tcc.CandyCommitServiceImpl 清除低版本数据,显现最新版本数据;
/**
* 添加文档
* <li>预提交为可见文档</li>
*
* @param entityList 列表
*/
@Override
public void add(List<? extends BaseEntity> entityList) {
if (CollUtil.isEmpty(entityList)) {
return;
}
// 删除所有所有比当前版本低的版本
entityList.forEach(f -> {
IndexBO indexBO = this.documentHelper.index(f.getClass());
int version = f.getVersion() - 1;
BooleanQuery.Builder builder = new BooleanQuery.Builder();
builder.add(new TermQuery(new Term(StrConstant.D_ID, f.getId())), BooleanClause.Occur.MUST);
builder.add(IntPoint.newRangeQuery(StrConstant.D_VERSION, Integer.MIN_VALUE, version), BooleanClause.Occur.MUST);
this.luceneHelper.deleteDocuments(indexBO.getIndexName(), builder.build());
});
}
/**
* 更新文档
*
* @param entityList 列表
*/
@Override
public void update(List<? extends BaseEntity> entityList) {
if (CollUtil.isEmpty(entityList)) {
return;
}
IndexBO indexBO = this.documentHelper.index(entityList.get(0).getClass());
// 移除截至上一个版本的所有版本
BooleanQuery.Builder builder = new BooleanQuery.Builder();
entityList.forEach(f -> {
String id = f.getId();
int version = f.getVersion() - 1;
BooleanQuery.Builder delBuilder = new BooleanQuery.Builder();
delBuilder.add(new TermQuery(new Term(StrConstant.D_ID, id)), BooleanClause.Occur.MUST);
delBuilder.add(IntPoint.newRangeQuery(StrConstant.D_VERSION, Integer.MIN_VALUE, version), BooleanClause.Occur.MUST);
builder.add(delBuilder.build(), BooleanClause.Occur.SHOULD);
});
this.luceneHelper.deleteDocuments(indexBO.getIndexName(), builder.build());
}
/**
* 删除文档
*
* @param className 实体类名
* @param idList 主键列表
*/
@Override
public void del(String className, List<String> idList) {
if (CollUtil.isEmpty(idList)) {
return;
}
// 根据id检索出文档
BooleanQuery.Builder builder = new BooleanQuery.Builder();
idList.forEach(f -> builder.add(new TermQuery(new Term(StrConstant.D_ID, f)), BooleanClause.Occur.SHOULD));
Class<? extends BaseEntity> tClass = ClassUtil.loadClass(className);
IndexBO indexBO = this.documentHelper.index(tClass);
this.luceneHelper.deleteDocuments(indexBO.getIndexName(), builder.build());
}
取消(Cancel):
cn.juque.lucenecandy.core.datasync.tcc.CandyCancelServiceImpl 对预提交到各实例的数据进行版本回滚;
/**
* 添加文档
*
* @param entityList 列表
*/
@Override
public void add(List<? extends BaseEntity> entityList) {
if (CollUtil.isEmpty(entityList)) {
return;
}
// 根据id删除
BooleanQuery.Builder builder = new BooleanQuery.Builder();
entityList.forEach(f-> builder.add(new TermQuery(new Term(StrConstant.D_ID, f.getId())), BooleanClause.Occur.SHOULD));
IndexBO indexBO = this.documentHelper.index(entityList.get(0).getClass());
this.luceneHelper.deleteDocuments(indexBO.getIndexName(), builder.build());
}
/**
* 更新文档
*
* @param entityList 列表
*/
@Override
public void update(List<? extends BaseEntity> entityList) {
if (CollUtil.isEmpty(entityList)) {
return;
}
entityList.forEach(f->{
Class<? extends BaseEntity> tClass = ClassUtil.getClass(f);
IndexBO indexBO = this.documentHelper.index(tClass);
// 删除当前版本
BooleanQuery.Builder builder = new BooleanQuery.Builder();
builder.add(new TermQuery(new Term(StrConstant.D_ID, f.getId())), BooleanClause.Occur.MUST);
builder.add(IntPoint.newExactQuery(StrConstant.D_VERSION, f.getVersion()), BooleanClause.Occur.MUST);
this.luceneHelper.deleteDocuments(indexBO.getIndexName(), builder.build());
// 上一个版本可见
builder = new BooleanQuery.Builder();
builder.add(new TermQuery(new Term(StrConstant.D_ID, f.getId())), BooleanClause.Occur.MUST);
int version = f.getVersion() - 1;
builder.add(IntPoint.newExactQuery(StrConstant.D_VERSION, version), BooleanClause.Occur.MUST);
List<Document> source = this.luceneHelper.search(indexBO.getIndexName(), builder, Sort.INDEXORDER, null);
this.indexHelper.switchDoc(this.documentHelper.toEntityList(source, tClass), YesOrNoEnum.YES);
});
}
/**
* 删除文档
*
* @param className 实体类名
* @param idList 主键列表
*/
@Override
public void del(String className, List<String> idList) {
if (CollUtil.isEmpty(idList)) {
return;
}
// 根据id检索出所有版本
Class<? extends BaseEntity> tClass = ClassUtil.loadClass(className);
IndexBO indexBO = this.documentHelper.index(tClass);
BooleanQuery.Builder builder = new BooleanQuery.Builder();
idList.forEach(f -> builder.add(new TermQuery(new Term(StrConstant.D_ID, f)), BooleanClause.Occur.SHOULD));
List<Document> docList = this.luceneHelper.search(indexBO.getIndexName(), builder, Sort.INDEXORDER, null);
if (CollUtil.isEmpty(docList)) {
return;
}
Map<String, List<Document>> docMap = docList.stream().collect(Collectors.groupingBy(d -> d.get(StrConstant.D_ID)));
// 过滤出最新版本,并更新为可见
docMap.forEach((id, list) -> {
// 根据版本号倒序
list = list.stream().sorted(
Comparator.comparing(c -> Integer.parseInt(c.get(StrConstant.D_VERSION)),
Comparator.reverseOrder())).collect(Collectors.toList());
// 最新版本变更为可见
Document d = list.get(0);
List<Document> dList = new ArrayList<>(1);
dList.add(d);
this.indexHelper.switchDoc(this.documentHelper.toEntityList(dList, tClass), YesOrNoEnum.YES);
// 其他低版本统一删除
int minVersion = Integer.parseInt(d.get(StrConstant.D_VERSION)) - 1;
BooleanQuery.Builder delBuilder = new BooleanQuery.Builder();
delBuilder.add(new TermQuery(new Term(StrConstant.D_ID, id)), BooleanClause.Occur.MUST);
delBuilder.add(IntPoint.newRangeQuery(StrConstant.D_VERSION, Integer.MIN_VALUE, minVersion), BooleanClause.Occur.MUST);
this.luceneHelper.deleteDocuments(indexBO.getIndexName(), delBuilder.build());
});
}
- TccDataSyncServiceImpl
实现父类IDataSyncService,把Tcc集成到IndexHelper。
@Slf4j
@Service("tccDataSyncService")
@DataSync(value = "tcc")
public class TccDataSyncServiceImpl implements IDataSyncService {
...
@Resource(name = "candyTryCommitService")
private ITccService candyTryCommitService;
@Resource(name = "candyCommitService")
private ITccService candyCommitService;
@Resource(name = "candyCancelService")
private ITccService candyCancelService;
/**
* 更新
*
* @param param 数据
*/
@Override
public void commit(@NotNull IndexUpdateParamBO param) {
// 预提交
boolean flag = this.tryOperate(param);
// 实际没有对其他节点执行预提交 或 提交成功
if (flag) {
this.commitOperate(param);
} else {
// 取消
this.cancel(param);
throw new AppException(MessageEnum.SYSTEM_ERROR);
}
}
/**
* 删除
*
* @param param 数据
*/
@Override
public void cancel(@NotNull IndexUpdateParamBO param) {
switch (param.getSyncType()) {
case ADD:
this.candyCancelService.add(param.getContent());
break;
case UPDATE:
this.candyCancelService.update(param.getContent());
break;
case DEL:
this.candyCancelService.del(param.getClassName(), param.getIdList());
break;
default:
break;
}
// cancel
this.batchPost(param, SyncTypeEnum.CANCEL);
}
}
- MsgDataSyncServiceImpl
数据的批次准时同步逻辑:
- 收集数据操作,IndexUpdateParamBO 封装了一次数据操作,并存放到本地队列
public void commit(IndexUpdateParamBO param) {
param.setTimestamp(System.currentTimeMillis());
this.dataSyncService.commit(param);
if(!this.commitListenerRender.before(param)) {
return;
}
this.dataSyncWaitReadMsgCache.put(null, param);
this.commitListenerRender.after(param);
}
- 队列数据写入到本地存储,即:_candy_data_sync_message
private void writeToDisk() {
if (!TAKE_END.compareAndSet(true, false)) {
return;
}
String className = ClassUtil.getClassName(BaseMessageEntity.class, false);
IndexBO indexBO = this.indexInfoCache.get(className, false);
List<IndexUpdateParamBO> list = new ArrayList<>(100);
IndexUpdateParamBO paramBO;
try {
boolean flag = true;
while (flag) {
paramBO = WAIT_READ.poll();
if (Objects.isNull(paramBO)) {
flag = false;
continue;
}
list.add(paramBO);
if (list.size() >= 99) {
this.saveLog(indexBO, list);
list.clear();
}
}
if (CollUtil.isNotEmpty(list)) {
this.saveLog(indexBO, list);
}
} catch (Exception e) {
log.error("msg write to disk error", e);
Thread.currentThread().interrupt();
} finally {
TAKE_END.set(true);
}
}
至此,完成数据操作的本地持久化。接下来,由DataSyncMsgReadThreadPool完成数据操作到实例的同步。该异步线程会进行批量同步,并在同步结束后移除本地持久化。
private void execute() {
if (!IS_END.compareAndSet(true, false)) {
return;
}
String className = ClassUtil.getClassName(BaseMessageEntity.class, false);
IndexBO indexBO = this.indexInfoCache.get(className, false);
String indexName = indexBO.getIndexName();
try {
List<Document> docList = this.luceneHelper.searchByPage(indexName, BOOL_BUILDER, SORT, PAGE_INFO, null);
while (CollUtil.isNotEmpty(docList)) {
List<BaseMessageEntity> entityList = this.documentHelper.toEntityList(docList, BaseMessageEntity.class);
this.batchPost(entityList);
// 删除已同步的文档
BooleanQuery.Builder builder = new BooleanQuery.Builder();
entityList.forEach(
f -> builder.add(new TermQuery(new Term(StrConstant.D_ID, f.getId())), BooleanClause.Occur.SHOULD));
this.luceneHelper.deleteDocuments(indexName, builder.build());
docList = this.luceneHelper.searchByPage(indexName, BOOL_BUILDER, SORT, PAGE_INFO, null);
}
} catch (Exception e) {
log.error("文档同步异常", e);
} finally {
IS_END.set(true);
}
}
实例接收到数据操作报文后,同步到本地索引:_candy_data_sync_message。最后由DataSyncMsgWriteThreadPool 解析并同步到对应的索引。
private void execute() {
if (!IS_END.compareAndSet(true, false)) {
return;
}
String className = ClassUtil.getClassName(BaseMessageEntity.class, false);
IndexBO indexBO = this.indexInfoCache.get(className, false);
String indexName = indexBO.getIndexName();
try {
List<Document> docList = this.luceneHelper.searchByPage(indexName, BOOL_BUILDER, SORT, PAGE_INFO, null);
while (CollUtil.isNotEmpty(docList)) {
List<BaseMessageEntity> entityList = this.documentHelper.toEntityList(docList, BaseMessageEntity.class);
this.write(entityList);
// 删除已同步的文档
BooleanQuery.Builder builder = new BooleanQuery.Builder();
entityList.forEach(f ->
builder.add(
new TermQuery(new Term(StrConstant.D_ID, f.getId())), BooleanClause.Occur.SHOULD));
this.luceneHelper.deleteDocuments(indexName, builder.build());
docList = this.luceneHelper.searchByPage(indexName, BOOL_BUILDER, SORT, PAGE_INFO, null);
}
} catch (Exception e) {
log.error("文档同步异常", e);
} finally {
IS_END.set(true);
}
}
至此,消息同步数据完成。