Java SOA 框架

最近几个星期学习了点后端的东西,现在简单的记录下,先说说公司的的SOA(什么是SOA,google)框架。

服务注册

SOA 服务在启动的时候会向配置中心注册服务信息

CoreBootstrap.getInstance().include(args != null && args.length > 0?args:new String[]{"conf/Configure.json"}).start();

程序启动的时候会从配置文件读取服务的服务名, 集群名,协议, 业务端口,I/O处理线程池大小,业务处理线程池大小,缓冲队列大小 ,提供服务的接口名列表等信息, 将服务名,集群名,机器ip,端口号 注册到配置中心

服务发现

SOA 客户端通过配置文件中的服务名,集群名,协议 等信息创建SOA 客户端

ClientUtil.getContext().initClients("conf/Client.json");
  • 负载均衡
    SOA 服务的负载均衡是在客户端做的,客户端在初始化的时候从配置中心根据服务名拉取ip 数组, 通过round-robin 做负载均衡
public String getUrl() {
        int index = this.counter.getAndIncrement();
        index %= this.providerList.size();
        index = index >= 0?index:index + this.providerList.size();
        return (String)this.providerList.get(index);
    }
  • 线程池策略

客户端线程池策略支持semaphore(线程池满时抛出异常) 和 queue(线程池满时排队等待执行);


public Object execute(Task task) throws Throwable {
        BreakerMetrics metric = metrics.get(task.getMethod());
        if (metric.isOpen()) {
            metric.increment(BreakerStatus.BREAKER_REJECT);
            task.setStatus(CallStatus.sick);
            if (task.supportsFallback()) {
                return task.callFallback();
            } else {
                throw new RequestRejectedException(String.format("Service(%s)'s circuit-breaker is open!", property.getService()));
            }
        }

        if (strategy.isBusy()) {
            task.setStatus(CallStatus.no_available_thread);
            if (task.supportsFallback()) {
                return task.callFallback();
            } else {
                throw new NoAvailableWorkerException(String.format("Service(%s) has no available worker in client!", property.getService()));
            }
        } else {
            try {
                return callCommand(task, metric);
            } finally {
                strategy.release();
            }
        }
    }

...

public boolean isBusy() {
        return !semaphore.tryAcquire();
}

...

private void initCalculator() {
        this.calculator = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        Thread.sleep(property.getCheckWindowInMillis());
                    } catch (InterruptedException e) {
                        break;
                    }

                    metrics.values().forEach((metric) -> {
                        metric.calc();
                    });
                }
            }
        });
        this.calculator.start();
    }

...

public void calc() {
        MetricSnapshot check = array.calcCheck();
        if (open && (check.getCircuitBreak() + check.getTotal()) < 1.0f) {
            logger.error("Api({}), Action(Close), Info(No request for a while)!", apiName);
            open = false;
            return;
        }

        if (open) {
            return;
        }

        if (check.getTotal() >= property.getRequestCountThreshold()) {
            float total = check.getTotal();
            float error = check.getError();
            float errorPercentage = error / total;
            if (errorPercentage > property.getErrorPercentageThreshold()) {
                lastSingleTestTime = new AtomicLong(System.currentTimeMillis());
                logger.error(
                        "Api({}), Action(Open), Info(Error percentage: {}, Total request: {}, all requests will try to invoke fallback instead, single test will start after {}ms)!",
                        apiName, errorPercentage, total, property.getSingleTestWindowInMillis());
                open = true;
            } else {
                logger.debug("Api({}), Action(Check), Info(Error percentage: {}, Total request: {})!", apiName, errorPercentage, total);
            }
        }
    }

metric.isOpen() 性能指标的统计通过一个线程定时计算出一个指标快照,判断出是否达到性能瓶颈
strategy.isBusy() 目前只对semaphore 策略有效。

通信协议

目前支持json协议和thrift二进制协议

thrift二进制协议

兼容Apache thrift 二进制协议,不过为了追踪各个SOA服务之间的调用情况,会尝试加入了一些额外的信息。


public Object invoke() throws TUserException, TSystemException, TUnknownException {
        Object e;
        try {
            this.tSocket.open();
            if(this.tSocket.isUpgraded() == null && this.tryUpgraded) {
                try {
                    this.writeRequest(UPGRADE_METHODINFO, new Object[]{AgentConfiguration.getServiceName()});
                    this.readResponse(UPGRADE_METHODINFO, true);
                    this.tSocket.setUpgraded(true);
                } catch (TApplicationException var6) {
                    this.tSocket.setUpgraded(false);
                }
            }

            this.writeRequest(this.methodInfo, this.args);
            e = this.readResponse(this.methodInfo, false);
        } catch (TTransportException | TProtocolException var7) {
            this.tSocket.closeImpl();
            throw var7;
        } finally {
            this.tSocket.close();
        }

        return e;
    }

调用rpc 接口的时候,会尝试调用UPGRADE_METHODINFO :__thriftpy_tracing_method_name__v2
接口,判断service是否支持额外字段,从而升级。

  • thrift二进制协议格式
 protected void writeRequest(MethodInfo methodInfo, Object[] args) {
        if(Boolean.TRUE.equals(this.tSocket.isUpgraded())) {
            TraceHeader header = new TraceHeader(Trace.getCurrentRequestId(), Trace.getCurrentRpcIdWithAppId(), this.context.exportMeta());
            this.writeBean(TRACE_HEADER, header);
        }

        this.writeMessageBegin(new TMessage(methodInfo.getName(), 1, 1));
        this.writeArgs(methodInfo, args);
        this.writeMessageEnd();
        this.flush();
    }

协议格式

bool类型:

一个字节的类型,两个字节的字段编号,一个字节的值(true:1,false:0).

Byte类型:

一个字节的类型,两个字节的字段编号,一个字节的值.

I16类型:

一个字节的类型,两个字节的字段编号,两个字节的值.

I32类型:

一个字节的类型,两个字节的字段编号,四个字节的值.

I64类型和double类型:

一个字节的类型,两个字节的字段编号,八个字节的值.

String类型:

一个字节的类型,两个字节的字段编号,四个字节的负载数据长度,负载数据的值.

Struct类型:

一个字节的类型,两个字节的字段编号,结构体负载数据,一个字节的结束标记.

MAP类型:

一个字节的类型,两个字节的字段编号,一个字节的键类型,一个字节的值类型,四个字节的负载数据长度,负载数据的值.

Set类型:

一个字节的类型,两个字节的字段编号,一个字节的值类型,四个字节的负载数据长度,负载数据的值.

List类型:

一个字节的类型,两个字节的字段编号,一个字节的值类型,四个字节的负载数据长度,负载数据的值.

消息(函数)类型:
四个字节的版本(含调用类型),四个字节的消息名称长度,消息名称,四个字节的流水号,消息负载数据的值,一个字节的结束标记。

  • 数据传输

数据传输通过tcp传输结束之后关闭长连接(留点长连接缓冲池是不是会好点),

json协议

发现json协议其实是走的http 协议POST方法传输数据, 此处省略

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,067评论 19 139
  • 从三月份找实习到现在,面了一些公司,挂了不少,但最终还是拿到小米、百度、阿里、京东、新浪、CVTE、乐视家的研发岗...
    时芥蓝阅读 42,385评论 11 349
  • 简介 用简单的话来定义tcpdump,就是:dump the traffic on a network,根据使用者...
    保川阅读 5,990评论 1 13
  • 每个人,都应该学会相应的生存能力,每一个单身人,则更应该多培养一点生存能力,多学一些东西,记得多学会照顾自己,比如...
    遗弃小屋阅读 592评论 0 4
  • 加到大了不起的微信,撒花!
    李葳Taiwan阅读 130评论 0 0