有些场景下,我们可能想做一些自动发现的一些功能,调用指定接口的所有group实现,并将所有结果追加输出。
如:一种基于Dubbo订阅模式,我们提供了一个订阅者接口。附上一段伪代码(全文提供的都是伪代码,主要提供一种设计思路):
public interface Subscriber{
/**
** 获取当前订阅者信息,比如订阅的事件类型,订阅的服务id等
**/
List<SubsriberInfo> getSubsriberInfos();
/**
** 发送消息
**/
void sendMessage(Message message);
}
使用Dubbo的merge方式,组合查询出所有的订阅者信息
public class SubsribeService{
@DubboReference(group="*",merger="true")
private Subscriber subsriber;
public List<SubsriberInfo> getSubsriberInfos(){
// 注意不要加缓存,要每次获取,因为注册的服务可能有变化,Dubbo会自动获取所有注册的服务并合并结果集
return subsriber.getSubsriberInfos();
}
}
有时候我们可能希望返回的每一个SubsriberInfo对象中,能带上group字段,但如果让服务发布方去set这个字段感觉会很另类,这时可以借助dubbo的filter机制实现。
为了让Filter能够统一拦截,我们为SubsriberInfo提取出了一个基接口IProviderEntity,用于提供setGroup方法。当发现dubbp返回值实现了这个接口,就为结果集setGroup。
public class SetGroupFilter implements Filter{
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
Result result = invoker.invoke(invocation);
if(result instanceof AsyncRpcResult){
return (Result) Proxy.newProxyInstance(Result.class.getClassLoader(), new Class[]{Result.class}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (Object.class.equals(method.getDeclaringClass())) {
return method.invoke(this, args);
}
if("get".equals(method.getName())){
Result getResultVal = (Result) method.invoke(result, args);
Object retVal = getResultVal.getValue();
setProviderId(retVal,invoker);
return getResultVal;
}else if("whenCompleteWithContext".equals(method.getName())){
method.invoke(result,args);
return proxy;
}
return method.invoke(result,args);
}
});
}else{
Object retVal = result.getValue();
setGroup(retVal,invoker);
return result;
}
}
private void setGroup(Object retVal,Invoker invoker){
if(retVal instanceof IProviderEntity){
((IProviderEntity) retVal).setGroup(StringUtils.parseQueryString(invoker.getUrl().toString()).get("group"));
}else if(retVal instanceof Collection){
Iterator iterator = ((Collection) retVal).iterator();
while(iterator.hasNext()){
Object next = iterator.next();
setGroup(next, invoker);
}
}
}
}
我们获取到了所有Group注册的订阅者之后,我们可能就需要调用指定的Group,来实现发送消息。
如:group1系统订阅了订单变更通知。
此时就要调用group1的订阅者实现,来执行发送消息。
那么如何动态的去调用group呢?Dubbo为我们提供了api的方式,可通过这种方式拿到指定group的代理接口实现。再附上一段伪代码,如执行group1。
ReferenceConfig referenceConfig = new ReferenceConfig();
referenceConfig.setInterface(Subscriber.class);
referenceConfig.setGroup("group1");
Subscriber subscriber = referenceConfig.get();
subscriber.sendMessage(message);
当然了,这种设计架构也有着他致命的缺陷,那就是如果被订阅者的服务某一时刻挂掉了,那么此时对应的服务可能就会丢消息。因此,此设计的架构适用于实时消息订阅的一种情况,只需要实时消息,对历史的消息没有什么价值的场景,如多节点的Websocket事件推送。
至此,基于dubbo的发布订阅模式完成,此文仅仅阐述了一个设计思路,在具体应用中可能还需要结合实际的业务场景做一些修改。