当前lucene-candy提供三种监听器,开发者只需要实现监听器,组件即能自动加载生效。
- cn.juque.lucenecandy.core.datasync.listener.ICommitListener:tcc同步数据的监听器;
- cn.juque.lucenecandy.core.datasync.listener.ITccControllerListener:tcc模式接收到数据报文后的执行的监听器;
- cn.juque.lucenecandy.core.datasync.listener.IMsgWriteListener:msg模式持久化数据操作时执行的监听器;
- cn.juque.lucenecandy.core.datasync.listener.ICommitListener:
public interface ICommitListener {
/**
* 事前操作
*
* @param param 参数
* @return boolean,false将会取消提交
*/
Boolean before(IndexUpdateParamBO param);
/**
* 事后操作
*
* @param param 参数
*/
void after(IndexUpdateParamBO param);
}
调度方式:
@Slf4j
@Service("tccDataSyncService")
@DataSync(value = "tcc")
public class TccDataSyncServiceImpl implements IDataSyncService {
@Resource
private IpCache ipCache;
@Resource
private CommitListenerRender commitListenerRender;
@Resource(name = "candyTryCommitService")
private ITccService candyTryCommitService;
@Resource(name = "candyCommitService")
private ITccService candyCommitService;
@Resource(name = "candyCancelService")
private ITccService candyCancelService;
...
private boolean batchPost(IndexUpdateParamBO paramBO, SyncTypeEnum syncType) {
// 事前操作
if (!this.commitListenerRender.before(paramBO)) {
return true;
}
String urlPath = null;
switch (syncType) {
case PRE:
urlPath = UrlConstant.PRE_COMMIT_DOC;
break;
case COMMIT:
urlPath = UrlConstant.COMMIT_DOC;
break;
case CANCEL:
urlPath = UrlConstant.CANCEL_DOC;
break;
default:
break;
}
...
// 事后
this.commitListenerRender.after(paramBO);
return flag.get();
} catch (Exception e) {
log.error("batch post error", e);
return false;
}
}
}
- cn.juque.lucenecandy.core.datasync.listener.ITccControllerListener:
public interface ITccControllerListener {
/**
* 事前操作
*
* @param syncTypeEnum 同步方式
* @param param 参数
* @return boolean,false将会取消提交
*/
Boolean before(IDataSyncService.SyncTypeEnum syncTypeEnum, IndexUpdateParamVO param);
/**
* 事后操作
*
* @param syncTypeEnum 同步方式
* @param param 参数
*/
void after(IDataSyncService.SyncTypeEnum syncTypeEnum, IndexUpdateParamVO param);
}
调用方式
@Slf4j
@Controller
@RequestMapping("/lucd")
public class CandyController {
@Resource(name = "candyTryCommitService")
private ITccService candyTryCommitService;
@Resource(name = "candyCommitService")
private ITccService candyCommitService;
@Resource(name = "candyCancelService")
private ITccService candyCancelService;
@Resource
private DataSyncWaitWriteMsgCache dataSyncWaitWriteMsgCache;
@Resource
private TccControllerListenerRender tccControllerListenerRender;
/**
* 预提交文档
*
* @return BaseResponseVO
*/
@PostMapping(value = "/tryCommit")
@ResponseBody
public BaseResponseVO<Boolean> tryCommit(@RequestBody BaseRequestVO<IndexUpdateParamVO> request) {
IndexUpdateParamVO paramVO = request.getParam();
if (Boolean.FALSE.equals(this.checkIndexParam(paramVO))) {
return new BaseResponseVO<>(MessageEnum.OK, false);
}
// 事前
this.tccControllerListenerRender.before(IDataSyncService.SyncTypeEnum.PRE, paramVO);
...
// 事后
this.tccControllerListenerRender.after(IDataSyncService.SyncTypeEnum.PRE, paramVO);
return new BaseResponseVO<>(MessageEnum.OK, true);
}
/**
* 生效文档
*
* @return BaseResponseVO
*/
@PostMapping(value = "/commit")
@ResponseBody
public BaseResponseVO<Boolean> commit(@RequestBody BaseRequestVO<IndexUpdateParamVO> request) {
IndexUpdateParamVO paramVO = request.getParam();
if (Boolean.FALSE.equals(this.checkIndexParam(paramVO))) {
return new BaseResponseVO<>(MessageEnum.OK, false);
}
// 事前
this.tccControllerListenerRender.before(IDataSyncService.SyncTypeEnum.COMMIT, paramVO);
...
// 事后
this.tccControllerListenerRender.after(IDataSyncService.SyncTypeEnum.COMMIT, paramVO);
return new BaseResponseVO<>(MessageEnum.OK, true);
}
/**
* 取消操作
*
* @return BaseResponseVO
*/
@PostMapping(value = "/cancel")
@ResponseBody
public BaseResponseVO<Boolean> cancel(@RequestBody BaseRequestVO<IndexUpdateParamVO> request) {
IndexUpdateParamVO paramVO = request.getParam();
if (Boolean.FALSE.equals(this.checkIndexParam(paramVO))) {
return new BaseResponseVO<>(MessageEnum.OK, false);
}
// 事前
this.tccControllerListenerRender.before(IDataSyncService.SyncTypeEnum.CANCEL, paramVO);
...
// 事后
this.tccControllerListenerRender.after(IDataSyncService.SyncTypeEnum.CANCEL, paramVO);
return new BaseResponseVO<>(MessageEnum.OK, true);
}
}
- cn.juque.lucenecandy.core.datasync.listener.IMsgWriteListener:
public interface IMsgWriteListener {
/**
* 事前操作
* <li>可实现监听器,根据需要变更、读取报文</li>
*
* @param messageEntity 消息实体
* @return boolean: false 取消写入索引
*/
Boolean before(BaseMessageEntity messageEntity);
/**
* 事后操作
*<li>可实现监听器,根据需要变更、读取报文</li>
* @param messageEntity 消息实体
*/
void after(BaseMessageEntity messageEntity);
}
调用方式
@Slf4j
@Component("dataSyncMsgWriteThreadPool")
public class DataSyncMsgWriteThreadPool {
private static final ThreadFactory THREAD_FACTORY = ThreadUtil.newNamedThreadFactory("index-msg-read-", true);
private static final ScheduledThreadPoolExecutor EXECUTOR = new ScheduledThreadPoolExecutor(1, THREAD_FACTORY);
private static final BooleanQuery.Builder BOOL_BUILDER = new BooleanQuery.Builder();
private static final PageInfo PAGE_INFO = new PageInfo();
private static final Sort SORT = new Sort(new SortField("timestamp", SortField.Type.LONG));
private static final AtomicBoolean IS_END = new AtomicBoolean(true);
@Resource
private DocumentHelper documentHelper;
@Resource
private LuceneHelper luceneHelper;
@Resource
private IndexInfoCache indexInfoCache;
@Resource
private MsgWriteListenerRender msgWriteListenerRender;
...
private void write(List<BaseMessageEntity> list) {
if (CollUtil.isEmpty(list)) {
return;
}
list.forEach(f -> {
if(!this.msgWriteListenerRender.before(f)) {
return;
}
DocSyncTypeEnum syncType = DocSyncTypeEnum.valueOf(f.getSyncType());
switch (syncType) {
case ADD:
this.writeForAdd(f);
break;
case UPDATE:
this.writeForUpdate(f);
break;
case DEL:
this.writeForDel(f);
break;
default:
break;
}
this.msgWriteListenerRender.after(f);
});
}
}