通过前几章的学习,我们已经基本了解了dubbo远程调用和服务治理的实现原理。为了加深理解,下面我们将亲手实现一个简单的远程调用框架。首先画张类图来总体看下结构:
基本实现依据我们分析的dubbo原理,分为引用端、发布端和注册中心。
首先服务发布端使用ServiceBean的方式,以api的形式进行服务的export发布
/**
 * Publishes one service implementation: builds its URL, registers a server-side
 * invoker for the interface, writes the provider node through {@code ZkUtils}
 * and starts a {@link NinaServer} on the configured port.
 *
 * @param <T> type of the service implementation being exported
 */
public class ServiceBean<T> implements DisposableBean {
    private static final int DEFAULT_PORT = 52066;
    private static final String PROVIDER_PATH = "providers";
    /** Interface name -> server-side invoker; static, so shared by every ServiceBean in this JVM. */
    private static final Map<String, NiniServerInvoker> invokersMap = new ConcurrentHashMap<>();

    private T ref;
    private Class type;
    private int port = DEFAULT_PORT;
    private Url url;
    private NinaServer nettyServer;

    /**
     * @return the shared interface-name to invoker map
     */
    public static Map<String, NiniServerInvoker> getInvokersMap() {
        return invokersMap;
    }

    /**
     * @param ref  service implementation
     * @param type service interface
     * @param port port the server binds to
     */
    public ServiceBean(T ref, Class type, int port) {
        this.ref = ref;
        this.type = type;
        this.port = port;
    }

    /**
     * Same as the three-argument constructor, using the default port.
     *
     * @param ref  service implementation
     * @param type service interface
     */
    public ServiceBean(T ref, Class type) {
        this.ref = ref;
        this.type = type;
    }

    /**
     * Publishes the service.
     *
     * @throws Exception if the invoker cannot be generated or registration fails
     */
    public void export() throws Exception {
        // Resolve the local IP and build the URL to publish.
        String hostIp = findHostIp();
        Url url = new Url(hostIp, String.valueOf(port), type);
        this.url = url;

        // Create the invoker only when none is registered yet: building it generates a
        // "<interface>$Wrapper" class, which must not be generated a second time.
        String serviceName = type.getName();
        if (!invokersMap.containsKey(serviceName)) {
            invokersMap.putIfAbsent(serviceName, new NiniServerInvoker(ref, type));
        }

        // Register <interface>/providers/<url>.
        // NOTE(review): these paths have no leading "/" while destroy() uses one - confirm how ZkUtils normalizes paths.
        ZkUtils.createNode(serviceName);
        ZkUtils.createNode(serviceName + "/" + PROVIDER_PATH);
        ZkUtils.createNode(serviceName + "/" + PROVIDER_PATH + "/" + url.toString());

        // Start serving.
        nettyServer = new NinaServer(url).init().start();
    }

    /**
     * Looks up the local host address.
     *
     * @return the address reported by {@link InetAddress#getLocalHost()}
     * @throws RuntimeException if the local host cannot be resolved
     */
    public static String findHostIp() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            throw new RuntimeException("获取本地ip失败", e);
        }
    }

    /**
     * Unpublishes the service: drops the invoker, removes the provider node and closes
     * the server. Safe to call even when {@link #export()} never ran.
     */
    @Override
    public void destroy() throws Exception {
        // The invoker is stored under the interface name (see export), not under the URL.
        invokersMap.remove(type.getName());
        if (url != null) {
            ZkUtils.revomeNode("/" + type.getName() + "/providers/" + url.toString());
        }
        if (nettyServer != null) {
            nettyServer.close();
        }
    }
}
核心逻辑在生成调用invoker,注册zk服务,和开启远程服务(省去了服务端订阅事件)
生成调用invoker一样通过一个动态生成wrapper来实现依据方法名调用的形式
package kai.test.nina.service;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import javassist.*;
import kai.test.nina.NinaInovker;
import kai.test.nina.Request;
import kai.test.nina.Response;
import kai.test.nina.test.TestApi;
import kai.test.nina.test.TestApiImpl;
/**
* Created by kai.yang on 2018/11/2.
*/
/**
 * Server-side invoker. At construction time it generates, with Javassist, a
 * {@link Wrapper} class whose {@code invoke} dispatches on the method name and
 * calls the target object directly.
 *
 * <p>Dispatch is by name only, so for overloaded methods the first one returned by
 * {@link Class#getDeclaredMethods()} wins.
 *
 * @param <T> service interface type
 */
public class NiniServerInvoker<T> implements NinaInovker {
    T ref;
    Class<T> type;
    final Wrapper wrapper;

    /**
     * @param ref  service implementation the wrapper delegates to
     * @param type service interface whose declared methods are exposed
     * @throws Exception if the wrapper class cannot be generated or instantiated
     */
    public NiniServerInvoker(T ref, Class<T> type) throws Exception {
        this.ref = ref;
        this.type = type;
        wrapper = createWrapper();
        wrapper.setType(type);
        wrapper.setRef(ref);
    }

    /**
     * Invokes the requested method through the wrapper.
     *
     * @param request carries the id, method name and arguments
     * @return a response holding either the result or the caught exception
     */
    @Override
    public Response invoke(Request request) throws Throwable {
        Response response = new Response(request.getId());
        try {
            response.setResult(wrapper.invoke(request.getMethod(), request.getParams()));
        } catch (Exception e) {
            response.setE(e);
        }
        return response;
    }

    @Override
    public boolean destory() throws Throwable {
        // TODO: 2018/11/2
        return false;
    }

    /** Generates and instantiates the "<interface>$Wrapper" class. */
    private Wrapper createWrapper() throws Exception {
        ClassPool pool = ClassPool.getDefault();
        CtClass ctClass = pool.makeClass(type.getName() + "$Wrapper");

        CtField refField = new CtField(pool.getCtClass("java.lang.Object"), "ref", ctClass);
        CtField typeField = new CtField(pool.getCtClass("java.lang.Class"), "type", ctClass);
        refField.setModifiers(Modifier.PRIVATE);
        typeField.setModifiers(Modifier.PRIVATE);
        ctClass.addField(refField);
        ctClass.addField(typeField);
        ctClass.addMethod(CtNewMethod.setter("setRef", refField));
        ctClass.addMethod(CtNewMethod.setter("setType", typeField));
        ctClass.addConstructor(CtNewConstructor.defaultConstructor(ctClass));
        ctClass.addInterface(pool.get(Wrapper.class.getName()));

        CtMethod m = CtNewMethod.make(buildInvokeSource(), ctClass);
        m.setModifiers(Modifier.PUBLIC);
        ctClass.addMethod(m);

        Class<?> aClass = ctClass.toClass();
        return (Wrapper) aClass.getDeclaredConstructor().newInstance();
    }

    /**
     * Builds the source of {@code invoke(String, Object[])}: one {@code if} block per
     * declared method, followed by a throw for unknown method names.
     */
    private String buildInvokeSource() {
        StringBuilder sb = new StringBuilder();
        sb.append(" public Object invoke(String method,Object[] params){\n ")
            .append(type.getName())
            .append(" o=(").append(type.getName()).append(")ref;\n");

        for (Method m : type.getDeclaredMethods()) {
            sb.append(" if(method.equals(\"").append(m.getName()).append("\")){\n ");
            String call = "o." + m.getName() + "(" + buildArguments(m.getParameterTypes()) + ");\n";
            Class<?> returnType = m.getReturnType();
            if (returnType == void.class) {
                sb.append(" ").append(call);
                sb.append("return null;}\n");
            } else if (returnType.isPrimitive()) {
                // The generated source boxes explicitly instead of relying on auto-boxing.
                String boxed = boxedName(returnType);
                sb.append(returnType.getName()).append(" b=").append(call);
                sb.append(" ").append(boxed).append(" result=").append(boxed).append(".valueOf(b);\n");
                sb.append("return result;}\n");
            } else {
                sb.append("Object result=").append(call);
                sb.append("return result;}\n");
            }
        }
        sb.append("throw new RuntimeException(\"远程调用失败,没有此方法!\");}");
        return sb.toString();
    }

    /** Builds the comma-separated argument list that reads each argument from {@code params[i]}. */
    private static String buildArguments(Class<?>[] parameterTypes) {
        StringBuilder args = new StringBuilder();
        for (int i = 0; i < parameterTypes.length; i++) {
            if (i > 0) {
                args.append(",");
            }
            Wrapper.ParamDefine define = describeParam(parameterTypes[i], i);
            args.append(define.type).append(define.parms);
        }
        return args.toString();
    }

    /** Describes how argument {@code index} is converted from {@code Object} to the declared type. */
    private static Wrapper.ParamDefine describeParam(Class<?> paramType, int index) {
        String slot = "params[" + index + "]";
        if (paramType.isPrimitive()) {
            // e.g. ((Integer)params[0]).intValue()
            String unboxed = "((" + boxedName(paramType) + ")" + slot + ")." + paramType.getName() + "Value()";
            return new Wrapper.ParamDefine("", unboxed, true);
        }
        return new Wrapper.ParamDefine("(" + sourceName(paramType) + ")", slot, false);
    }

    /** Simple name of the java.lang wrapper class for a primitive type. */
    private static String boxedName(Class<?> primitive) {
        if (primitive == boolean.class) {
            return "Boolean";
        }
        if (primitive == byte.class) {
            return "Byte";
        }
        if (primitive == char.class) {
            return "Character";
        }
        if (primitive == short.class) {
            return "Short";
        }
        if (primitive == int.class) {
            return "Integer";
        }
        if (primitive == long.class) {
            return "Long";
        }
        if (primitive == float.class) {
            return "Float";
        }
        if (primitive == double.class) {
            return "Double";
        }
        throw new IllegalArgumentException("not a boxable primitive: " + primitive);
    }

    /** Type name usable in source code; arrays become "component[]" rather than a JVM descriptor. */
    private static String sourceName(Class<?> c) {
        return c.isArray() ? sourceName(c.getComponentType()) + "[]" : c.getName();
    }

    /** Contract implemented by the generated class. */
    public interface Wrapper {
        Object invoke(String method, Object[] params);

        void setRef(Object ref);

        void setType(Class type);

        /**
         * Describes one argument expression in the generated wrapper source.
         */
        class ParamDefine {
            /** Cast prefix such as "(java.lang.String)"; empty for primitives. */
            String type;
            /** Expression that reads the argument from the params array. */
            String parms;
            boolean isPrimitive;

            public ParamDefine(String type, String parms, boolean isPrimitive) {
                this.type = type;
                this.parms = parms;
                this.isPrimitive = isPrimitive;
            }
        }
    }
}
简单的zkc操作生成对应的服务目录
生成注册服务后,开启netty服务
package kai.test.nina.service;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import kai.test.nina.DefaultNettyHandler;
import kai.test.nina.Request;
import kai.test.nina.Url;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Created by kai.yang on 2018/11/1.
*/
/**
 * Netty server that listens on the port of the given {@link Url} and hands every
 * inbound message to a {@link DefaultNettyHandler} configured for {@link Request}.
 */
public class NinaServer {
    private Logger logger = LoggerFactory.getLogger(getClass());
    private Url url;
    ServerBootstrap serverBootstrap = null;
    volatile boolean isClosed;
    EventLoopGroup bossGroup = null;
    EventLoopGroup workerGroup = null;
    /** Listening channel, set once the bind succeeds. */
    private volatile Channel serverChannel;
    /** Guards against binding twice; init() already calls start(). */
    private boolean bindRequested;

    public NinaServer(Url url) {
        this.url = url;
    }

    /**
     * @return true once the bind has succeeded and the listening channel is still active
     */
    public boolean isBound() {
        Channel ch = serverChannel;
        return ch != null && ch.isActive();
    }

    /**
     * Configures the bootstrap and starts binding.
     *
     * @return this server
     */
    public NinaServer init() {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(65535))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new ByteArrayDecoder());
                    socketChannel.pipeline().addLast(new ByteArrayEncoder());
                    socketChannel.pipeline().addLast(new DefaultNettyHandler(Request.class));
                }
            });
        start();
        return this;
    }

    /**
     * Shuts down both event loop groups. Repeated calls are no-ops.
     *
     * @return always true
     */
    public boolean close() {
        if (isClosed) {
            return true;
        }
        isClosed = true;
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
        return true;
    }

    /**
     * Starts binding the port. Only the first call binds, so {@code init().start()}
     * does not try to bind the same port twice.
     *
     * @return this server
     */
    public synchronized NinaServer start() {
        if (bindRequested || isClosed) {
            return this;
        }
        bindRequested = true;
        bind();
        return this;
    }

    /** Binds the port and retries one second later on failure until the server is closed. */
    private void bind() {
        serverBootstrap.bind(Integer.valueOf(url.getPort())).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    serverChannel = future.channel();
                    logger.info("服务可用了");
                } else if (!isClosed) {
                    future.channel().eventLoop().schedule(() -> bind(), 1, TimeUnit.SECONDS);
                }
            }
        });
    }
}
下面是服务的消息处理类,消费端和服务端我采用的是同一个类
package kai.test.nina;
import java.util.Map;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import kai.test.nina.service.NiniServerInvoker;
import kai.test.nina.service.ServiceBean;
/**
* Created by kai.yang on 2018/11/1.
*/
@io.netty.channel.ChannelHandler.Sharable
public class DefaultNettyHandler<T> extends ChannelDuplexHandler {
KryoSerializer recivedSerializer;
KryoSerializer respSer=new KryoSerializer(Response.class);
Class<T> serializerType;
public DefaultNettyHandler(Class serializerType) {
this.serializerType = serializerType;
recivedSerializer=new KryoSerializer(serializerType);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("接收消息");
Object deserialize = null;
try {
deserialize = recivedSerializer.deserialize((byte[]) msg);
} catch (Exception e) {
e.printStackTrace();
}
if(deserialize instanceof Response){
Response.received((Response) deserialize);
}else if(deserialize instanceof Request){
Request request=(Request)deserialize;
Map<String, NiniServerInvoker> invokersMap = ServiceBean.getInvokersMap();
NiniServerInvoker niniServerInvoker = invokersMap.get(request.getClassType());
Response response=new Response(request.getId());
try {
response = niniServerInvoker.invoke(request);
} catch (Throwable throwable) {
response.setE(throwable);
}
byte[] serialize = respSer.serialize(response);
ctx.writeAndFlush(serialize);
}else{
throw new RuntimeException("非法请求参数");
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
重点在于构造此ChannelHandler时传入的serializerType:传入Request表示服务端,传入Response表示消费端。
下面在消费端,一样先通过RefeneceBean生成代理类
package kai.test.nina.reference;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import kai.test.nina.NinaInovker;
import kai.test.nina.Request;
/**
* Created by kai.yang on 2018/11/1.
*/
/**
 * Consumer-side entry point: creates (once) a JDK dynamic proxy for the service
 * interface whose calls are turned into {@link Request}s and sent through a
 * {@link NinaDirectory}.
 *
 * @param <T> service interface type
 */
public class RefeneceBean<T> {
    private Class<T> interfaceType;
    private T proxy;

    /**
     * @param interfaceType service interface to proxy
     */
    public RefeneceBean(Class<T> interfaceType) {
        this.interfaceType = interfaceType;
    }

    /**
     * @return the proxy, created on first use
     * @throws RuntimeException if the directory or the proxy cannot be created
     */
    public T get() {
        return createProxy();
    }

    /**
     * Creates the proxy if it does not exist yet.
     *
     * @return the cached proxy
     */
    @SuppressWarnings("unchecked")
    private T createProxy() {
        try {
            if (proxy == null) {
                // The directory holds the invokers for this interface.
                NinaDirectory ninaDirectory = new NinaDirectory(interfaceType);
                // Wrap it in a proxy implementing the interface.
                proxy = (T) Proxy.newProxyInstance(
                    this.getClass().getClassLoader(),
                    new Class[]{interfaceType},
                    new referenceInvocationHandler(interfaceType, ninaDirectory));
            }
        } catch (Exception e) {
            throw new RuntimeException("调用服务初始化失败", e);
        }
        return proxy;
    }

    /** Turns interface calls into remote requests; Object methods are answered locally. */
    private static class referenceInvocationHandler implements InvocationHandler {
        private Class interfaceType;
        private NinaInovker ninaInovker;

        public referenceInvocationHandler(Class interfaceType, NinaInovker ninaInovker) {
            this.interfaceType = interfaceType;
            this.ninaInovker = ninaInovker;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // hashCode/equals/toString reach the handler declared on Object. Answer them here:
            // calling them on the proxy would re-enter this handler and recurse forever.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "toString":
                        return interfaceType.getName() + "@" + Integer.toHexString(System.identityHashCode(proxy));
                    case "equals":
                        return proxy == args[0];
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    default:
                        break;
                }
            }
            Request request = new Request();
            request.setClassType(interfaceType.getName());
            request.setMethod(method.getName());
            request.setParams(args);
            return ninaInovker.invoke(request).get();
        }
    }
}
核心地方在于
NinaDirectory ninaDirectory = new NinaDirectory(interfaceType);
此处创建了一个目录引用多个invoker类
package kai.test.nina.reference;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import kai.test.nina.*;
/**
* Created by kai.yang on 2018/11/2.
*/
/**
 * Consumer-side directory for one service interface. It keeps one invoker per
 * provider URL, refreshes them when notified through {@link NinaListener}, and
 * picks an invoker per call with plain round-robin.
 */
public class NinaDirectory implements NinaInovker, NinaListener {
    private final String PROVIDER_PATH;
    Class type;
    // url string ---> invoker
    Map<String, NinaInovker> inovkerMap = new ConcurrentHashMap<>();
    // Copy-on-write: the listener callback mutates this list while calling threads read it.
    List<NinaInovker> invokers = new java.util.concurrent.CopyOnWriteArrayList<>();
    AtomicInteger count = new AtomicInteger(0);

    /**
     * @param type service interface; its name determines the provider path
     * @throws Exception if the initial load or the subscription fails
     */
    public NinaDirectory(Class type) throws Exception {
        this.type = type;
        PROVIDER_PATH = "/" + this.type.getName() + "/providers";
        subscribe();
    }

    /**
     * Picks the next invoker round-robin and delegates the call to it.
     *
     * @throws RuntimeException if no provider is currently known
     */
    @Override
    public Response invoke(Request request) throws Throwable {
        // Work on a snapshot so a concurrent refresh cannot change the size between check and lookup.
        NinaInovker[] candidates = invokers.toArray(new NinaInovker[0]);
        if (candidates.length == 0) {
            throw new RuntimeException("远程调用失败,无可用服务" + request.getClassType());
        }
        // floorMod keeps the index non-negative after the counter overflows.
        int postion = Math.floorMod(count.getAndIncrement(), candidates.length);
        return candidates[postion].invoke(request);
    }

    /**
     * Loads the providers currently registered, then subscribes to later changes.
     *
     * @return this directory
     */
    public NinaDirectory subscribe() throws Exception {
        // Initial load: read every node under the provider path.
        List<String> nodes = ZkUtils.getNodes(PROVIDER_PATH);
        if (nodes != null) {
            for (String url : nodes) {
                // NOTE(review): 1 is presumably NinaListener.ADD_OPTION - confirm.
                refreshInvokers(url, 1);
            }
        }
        // Subscribe so later changes arrive through doNotify.
        ZkUtils.subscribe(PROVIDER_PATH, this);
        return this;
    }

    /**
     * Applies one provider change.
     *
     * @param urlStr provider url string
     * @param option one of the {@link NinaListener} option constants
     */
    public synchronized void refreshInvokers(String urlStr, int option) {
        switch (option) {
            case NinaListener.ADD_OPTION: {
                if (inovkerMap.get(urlStr) == null) {
                    Url url = new Url(urlStr);
                    NinaClientInvoker ninaClientInvoker = new NinaClientInvoker(new NinaClient(url), url);
                    inovkerMap.put(urlStr, ninaClientInvoker);
                    invokers.add(ninaClientInvoker);
                }
                break;
            }
            case NinaListener.REMOVE_OPTION: {
                // The map is keyed by url string, so remove by url, not by invoker.
                NinaInovker removed = inovkerMap.remove(urlStr);
                if (removed != null) {
                    invokers.remove(removed);
                    try {
                        removed.destory();
                    } catch (Throwable throwable) {
                        throwable.printStackTrace();
                    }
                }
                break;
            }
            case NinaListener.UPDATE_OPTION:
                // TODO: 2018/11/2
                break;
            default:
                break;
        }
    }

    /**
     * Destroys every invoker and clears both caches.
     *
     * @return always true
     */
    @Override
    public boolean destory() throws Throwable {
        for (NinaInovker invoker : invokers) {
            invoker.destory();
        }
        inovkerMap.clear();
        invokers.clear();
        return true;
    }

    /**
     * Listener callback: logs the change and refreshes the invokers for the known options.
     */
    @Override
    public void doNotify(int option, String url) throws Exception {
        System.out.println("监听到数据变化" + url + ",type=" + option);
        switch (option) {
            case NinaListener.ADD_OPTION:
            case NinaListener.REMOVE_OPTION:
            case NinaListener.UPDATE_OPTION:
                refreshInvokers(url, option);
                break;
            default:
                break;
        }
    }
}
启动此类时,会调用subscribe方法开始刷新服务
/**
 * Loads the providers currently registered under the provider path, then
 * subscribes this directory to later changes.
 *
 * @return this directory
 */
public NinaDirectory subscribe() throws Exception {
    // Initial load: every node found is applied as option 1.
    List<String> providerUrls = ZkUtils.getNodes(PROVIDER_PATH);
    if (providerUrls != null) {
        providerUrls.forEach(providerUrl -> refreshInvokers(providerUrl, 1));
    }
    // Subscribe so later changes are delivered to this object.
    ZkUtils.subscribe(PROVIDER_PATH, this);
    return this;
}
第一次启动时,我是直接从zk中获取目标接口目录下的服务url来刷新invoker,然后向zk添加监听器;之后有变动时,zk会回调本地的doNotify方法来刷新服务。
调用刷新方法 refreshInvokers(url,1)会生成NinaClientInvoker来封装本地调用
package kai.test.nina.reference;
import kai.test.nina.NinaInovker;
import kai.test.nina.Request;
import kai.test.nina.Response;
import kai.test.nina.Url;
/**
* Created by kai.yang on 2018/11/2.
*/
/**
 * Consumer-side invoker bound to one provider URL; sends requests through a
 * {@link NinaClient} and returns the pending {@link Response} to wait on.
 */
public class NinaClientInvoker implements NinaInovker {
    NinaClient ninaClient;
    Url url;

    /**
     * @param ninaClient client connected to the provider
     * @param url        provider url, used for logging
     */
    public NinaClientInvoker(NinaClient ninaClient, Url url) {
        this.ninaClient = ninaClient;
        this.url = url;
    }

    /**
     * Sends the request and returns the pending response for it.
     *
     * @return a response the caller can block on with {@code get()}
     * @throws RuntimeException if the request could not be sent
     */
    @Override
    public Response invoke(Request request) throws Throwable {
        System.out.println("调用服务" + url);
        // Register the pending response BEFORE sending, so a reply that arrives
        // immediately always finds it.
        // NOTE(review): if send() fails the registration stays behind - Response exposes no way to drop it from here.
        Response pending = new Response(30000, request.getId());
        boolean send = ninaClient.send(request);
        if (send) {
            return pending;
        }
        throw new RuntimeException("远程调用失败,发送消息异常");
    }

    /**
     * Closes the underlying client.
     *
     * @return true once the client has been asked to close
     */
    @Override
    public boolean destory() throws Throwable {
        ninaClient.destory();
        return true;
    }
}
里边包装的当然是一个nettyClient
package kai.test.nina.reference;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.util.concurrent.DefaultThreadFactory;
import kai.test.nina.*;
/**
* Created by kai.yang on 2018/11/1.
*/
public class NinaClient {
private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Math.min(Runtime.getRuntime().availableProcessors() + 1, 32), new DefaultThreadFactory("NettyClientWorker", true));
private Bootstrap bootstrap;
private volatile Url url;
private volatile Channel channel;
KryoSerializer<Request> kryoSerializer=new KryoSerializer<>(Request.class);
public NinaClient(Url url) {
this.url = url;
init();
open();
}
public void init(){
final DefaultNettyHandler nettyClientHandler = new DefaultNettyHandler(Response.class);
bootstrap = new Bootstrap();
bootstrap.group(nioEventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(65535));
bootstrap.handler(new ChannelInitializer() {
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ByteArrayDecoder());
ch.pipeline().addLast(new ByteArrayEncoder());
ch.pipeline().addLast(nettyClientHandler);
}
});
}
public Channel open(){
ChannelFuture future = bootstrap.connect(url.getIp(), Integer.valueOf(url.getPort()));
boolean ret = future.awaitUninterruptibly(3000, TimeUnit.MILLISECONDS);
if(ret&&future.isSuccess()){
if(channel!=null){
channel.close();
}
channel=future.channel();
return channel;
}
throw new RuntimeException("开启通道信息异常");
}
public void destory(){
if(channel!=null){
channel.close();
}
}
public boolean send(Object o){
if(channel==null||!channel.isActive()) {
try {
if(channel!=null) {
channel.close();
}
} catch (Exception e) {
}
open();
}
byte[] serialize = kryoSerializer.serialize(o);
ChannelFuture future = channel.writeAndFlush(serialize);
Throwable cause = future.cause();
if(cause!=null){
cause.printStackTrace();
throw new RuntimeException("发送消息失败");
}
return true;
}
}
此client一样是通过DefaultNettyHandler来实现消息接收。
在NinaDirectory中通过invokers列表缓存所有可用的invoker,inovkerMap则缓存url与invoker之间的对应关系,用于刷新服务。
在消费端发起调用时
// Selection strategy: only plain round-robin is implemented here.
int size = invokers.size();
if (size == 0) {
    throw new RuntimeException("远程调用失败,无可用服务" + request.getClassType());
}
// floorMod keeps the index non-negative after the counter overflows past Integer.MAX_VALUE.
int postion = Math.floorMod(count.getAndIncrement(), size);
NinaInovker ninaInovker = invokers.get(postion);
// Delegate the call to the selected invoker.
return ninaInovker.invoke(request);
模拟了个简单轮询操作进行选择服务器。
好了,上面就是主要的实现类。还有一些异步阻塞我是直接放到了Response类中进行
package kai.test.nina;
import java.io.Serializable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by kai.yang on 2018/11/1.
*/
/**
 * Result of one remote call. On the consumer side an instance created with a
 * timeout registers itself under the request id and acts as a blocking future:
 * {@link #get()} waits until {@link #received(Response)} delivers the reply with
 * the same id.
 */
public class Response implements Serializable {
    private static final long serialVersionUID = -4355285085441000005L;
    String id;
    Object result;
    Throwable e;
    long timeout;
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    /** Set once a reply was delivered; needed because a valid reply may carry a null result. */
    private transient boolean completed;
    /** Pending consumer-side responses by request id. */
    private static ConcurrentHashMap<String, Response> RESULTS = new ConcurrentHashMap<>();

    /**
     * Creates a pending response and registers it under {@code id}.
     *
     * @param timeout how long {@link #get()} waits, in milliseconds
     * @param id      request id
     */
    public Response(long timeout, String id) {
        this.timeout = timeout;
        this.id = id;
        RESULTS.putIfAbsent(id, this);
    }

    /**
     * Creates an unregistered response carrying only the request id.
     *
     * @param id request id
     */
    public Response(String id) {
        this.id = id;
    }

    /**
     * Delivers a reply to the pending response with the same id. Replies for an id
     * nobody is waiting on (unknown, already answered, or timed out) are ignored.
     *
     * @param response the reply that arrived
     */
    public static void received(Response response) {
        if (response == null || response.getId() == null) {
            return;
        }
        // remove() rather than get(): a delivered entry must not stay in the map.
        Response pending = RESULTS.remove(response.getId());
        if (pending != null) {
            pending.doReceived(response);
        }
    }

    /**
     * Copies result and exception from the reply and wakes up waiting threads.
     *
     * @param response the reply; null is ignored
     */
    public void doReceived(Response response) {
        lock.lock();
        try {
            if (response != null) {
                this.setResult(response.getResult());
                this.setE(response.getE());
                completed = true;
                this.done.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Waits for the reply and returns its result.
     *
     * @return the result, or null when the reply carried none
     * @throws RuntimeException on timeout, on interruption, when the remote side failed
     *                          with a runtime exception, or wrapping a remote checked exception
     * @throws Throwable        any other throwable reported by the remote side
     */
    public Object get() throws Throwable {
        lock.lock();
        try {
            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeout);
            // Loop so spurious wakeups do not end the wait early.
            while (!completed && result == null && e == null) {
                if (remainingNanos <= 0L) {
                    throw new RuntimeException("远程调用超时");
                }
                remainingNanos = done.awaitNanos(remainingNanos);
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("远程调用异常", interrupted);
        } finally {
            lock.unlock();
            // Drop the registration on every exit path (notably timeout) so entries cannot pile up.
            if (id != null) {
                RESULTS.remove(id, this);
            }
        }
        if (result != null) {
            return result;
        }
        if (e instanceof Exception && !(e instanceof RuntimeException)) {
            // Checked remote failures are surfaced as a RuntimeException with the cause attached.
            throw new RuntimeException("远程调用异常", e);
        }
        if (e != null) {
            throw e;
        }
        return null;
    }

    public String getId() {
        return id;
    }

    public Response setId(String id) {
        this.id = id;
        return this;
    }

    public Object getResult() {
        return result;
    }

    public Response setResult(Object result) {
        this.result = result;
        return this;
    }

    public Throwable getE() {
        return e;
    }

    public Response setE(Throwable e) {
        this.e = e;
        return this;
    }

    public long getTimeout() {
        return timeout;
    }

    public Response setTimeout(long timeout) {
        this.timeout = timeout;
        return this;
    }
}
通过Response的get方法来进行阻塞,等待服务端返回结果。还有在消息处理类DefaultNettyHandler中
if(deserialize instanceof Response){
Response.received((Response) deserialize);
}else if(deserialize instanceof Request){
Request request=(Request)deserialize;
Map<String, NiniServerInvoker> invokersMap = ServiceBean.getInvokersMap();
NiniServerInvoker niniServerInvoker = invokersMap.get(request.getClassType());
Response response=new Response(request.getId());
try {
response = niniServerInvoker.invoke(request);
} catch (Throwable throwable) {
response.setE(throwable);
}
byte[] serialize = respSer.serialize(response);
ctx.writeAndFlush(serialize);
我们可以看到,
- 当服务端接收到消息后,反序列化成Request类,找到对应接口的invoker进行调用,并返回调用结果
- 当消费端接收到消息后,通过Response.received方法找到原有的Response对象,唤醒原线程
/**
 * Delivers a reply to the pending response registered under the same id.
 * A reply whose id has no pending response is ignored.
 *
 * @param response the reply that arrived
 */
public static void received(Response response){
    if (response == null || response.getId() == null) {
        return;
    }
    // remove() rather than get(): a delivered entry must not stay in the map.
    Response responseResult = RESULTS.remove(response.getId());
    if (responseResult != null) {
        responseResult.doReceived(response);
    }
}
/**
 * Copies result and exception from the reply into this response and wakes up
 * the threads waiting on {@code done}.
 *
 * @param response the reply; null is ignored
 */
public void doReceived(Response response){
    // Acquire the lock before entering try, so finally only unlocks a lock that is actually held.
    lock.lock();
    try {
        if (response != null) {
            this.setResult(response.getResult());
            this.setE(response.getE());
            // Wake every waiter, not just one.
            this.done.signalAll();
        }
    } finally {
        lock.unlock();
    }
}
好了,主要逻辑就这些。希望有助于帮助大家学习dubbo源码。看下测试结果:
有三个测试类
//服务一
/** Provider #1: exports TestApi on the default port and blocks until input arrives on stdin. */
public class TestMain {
    public static void main(String[] args) throws Exception {
        // This demo never calls destroy(), so clear the registrations left by earlier runs first.
        ZkUtils.revomeAllChildNode("/kai.test.nina.test.TestApi");
        TestApi service = new TestApiImpl();
        new ServiceBean<TestApi>(service, TestApi.class).export();
        System.in.read();
    }
}
//服务二
/** Provider #2: exports TestApi on port 9000 and blocks until input arrives on stdin. */
public class TestMain2 {
    public static void main(String[] args) throws Exception {
        TestApi service = new TestApiImpl();
        new ServiceBean<TestApi>(service, TestApi.class, 9000).export();
        System.in.read();
    }
}
//客户端
/** Consumer: obtains a TestApi proxy and prints the result of four setInt(90) calls. */
public class TestClientMain {
    public static void main(String[] args) throws IOException {
        RefeneceBean<TestApi> reference = new RefeneceBean(TestApi.class);
        TestApi remoteApi = reference.get();
        int remainingCalls = 4;
        while (remainingCalls > 0) {
            System.out.println(remoteApi.setInt(90));
            remainingCalls--;
        }
    }
}
调用结果
调用服务nina:192.168.199.130:52066?interface=kai.test.nina.test.TestApi&methods=setInt,goBack,sayVoid,sayHello,sayboolean
监听到数据变化nina:192.168.199.130:52066?interface=kai.test.nina.test.TestApi&methods=setInt,goBack,sayVoid,sayHello,sayboolean,type=1
监听到数据变化nina:192.168.199.130:9000?interface=kai.test.nina.test.TestApi&methods=setInt,sayHello,goBack,sayVoid,sayboolean,type=1
接收消息
90
调用服务nina:192.168.199.130:9000?interface=kai.test.nina.test.TestApi&methods=setInt,sayHello,goBack,sayVoid,sayboolean
接收消息
90
调用服务nina:192.168.199.130:52066?interface=kai.test.nina.test.TestApi&methods=setInt,goBack,sayVoid,sayHello,sayboolean
接收消息
90
调用服务nina:192.168.199.130:9000?interface=kai.test.nina.test.TestApi&methods=setInt,sayHello,goBack,sayVoid,sayboolean
接收消息
90
最后附上源码 https://gitee.com/kaiyang_taichi/Nina/blob/master/nina.zip
里边有测试类,我的zookeeper是用的本地配置,地址写在utils中,可自行扩展。
首页 dubbo源码欣赏简介