最近几个星期学习了点后端的东西,现在简单的记录下,先说说公司的的SOA(什么是SOA,google)框架。
服务注册
SOA 服务在启动的时候会向配置中心注册服务信息
CoreBootstrap.getInstance().include(args != null && args.length > 0?args:new String[]{"conf/Configure.json"}).start();
程序启动的时候会从配置文件读取服务的服务名, 集群名,协议, 业务端口,I/O处理线程池大小,业务处理线程池大小,缓冲队列大小 ,提供服务的接口名列表等信息, 将服务名,集群名,机器ip,端口号 注册到配置中心
服务发现
SOA 客户端通过配置文件中的服务名,集群名,协议 等信息创建SOA 客户端
ClientUtil.getContext().initClients("conf/Client.json");
- 负载均衡
SOA 服务的负载均衡是在客户端做的,客户端在初始化的时候从配置中心根据服务名拉取ip 数组, 通过round-robin 做负载均衡
public String getUrl() {
int index = this.counter.getAndIncrement();
index %= this.providerList.size();
index = index >= 0?index:index + this.providerList.size();
return (String)this.providerList.get(index);
}
- 线程池策略
客户端线程池策略支持semaphore(线程池满时抛出异常) 和 queue(线程池满时排队等待执行);
public Object execute(Task task) throws Throwable {
BreakerMetrics metric = metrics.get(task.getMethod());
if (metric.isOpen()) {
metric.increment(BreakerStatus.BREAKER_REJECT);
task.setStatus(CallStatus.sick);
if (task.supportsFallback()) {
return task.callFallback();
} else {
throw new RequestRejectedException(String.format("Service(%s)'s circuit-breaker is open!", property.getService()));
}
}
if (strategy.isBusy()) {
task.setStatus(CallStatus.no_available_thread);
if (task.supportsFallback()) {
return task.callFallback();
} else {
throw new NoAvailableWorkerException(String.format("Service(%s) has no available worker in client!", property.getService()));
}
} else {
try {
return callCommand(task, metric);
} finally {
strategy.release();
}
}
}
...
public boolean isBusy() {
return !semaphore.tryAcquire();
}
...
private void initCalculator() {
this.calculator = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.sleep(property.getCheckWindowInMillis());
} catch (InterruptedException e) {
break;
}
metrics.values().forEach((metric) -> {
metric.calc();
});
}
}
});
this.calculator.start();
}
...
public void calc() {
MetricSnapshot check = array.calcCheck();
if (open && (check.getCircuitBreak() + check.getTotal()) < 1.0f) {
logger.error("Api({}), Action(Close), Info(No request for a while)!", apiName);
open = false;
return;
}
if (open) {
return;
}
if (check.getTotal() >= property.getRequestCountThreshold()) {
float total = check.getTotal();
float error = check.getError();
float errorPercentage = error / total;
if (errorPercentage > property.getErrorPercentageThreshold()) {
lastSingleTestTime = new AtomicLong(System.currentTimeMillis());
logger.error(
"Api({}), Action(Open), Info(Error percentage: {}, Total request: {}, all requests will try to invoke fallback instead, single test will start after {}ms)!",
apiName, errorPercentage, total, property.getSingleTestWindowInMillis());
open = true;
} else {
logger.debug("Api({}), Action(Check), Info(Error percentage: {}, Total request: {})!", apiName, errorPercentage, total);
}
}
}
metric.isOpen() 性能指标的统计通过一个线程定时计算出一个指标快照,判断出是否达到性能瓶颈
strategy.isBusy() 目前只对semaphore 策略有效。
通信协议
目前支持json协议和thrift二进制协议
thrift二进制协议
兼容Apache thrift 二进制协议,不过为了追踪各个SOA服务之间的调用情况,会尝试加入了一些额外的信息。
public Object invoke() throws TUserException, TSystemException, TUnknownException {
Object e;
try {
this.tSocket.open();
if(this.tSocket.isUpgraded() == null && this.tryUpgraded) {
try {
this.writeRequest(UPGRADE_METHODINFO, new Object[]{AgentConfiguration.getServiceName()});
this.readResponse(UPGRADE_METHODINFO, true);
this.tSocket.setUpgraded(true);
} catch (TApplicationException var6) {
this.tSocket.setUpgraded(false);
}
}
this.writeRequest(this.methodInfo, this.args);
e = this.readResponse(this.methodInfo, false);
} catch (TTransportException | TProtocolException var7) {
this.tSocket.closeImpl();
throw var7;
} finally {
this.tSocket.close();
}
return e;
}
调用rpc 接口的时候,会尝试调用UPGRADE_METHODINFO :__thriftpy_tracing_method_name__v2
接口,判断service是否支持额外字段,从而升级。
- thrift二进制协议格式
protected void writeRequest(MethodInfo methodInfo, Object[] args) {
if(Boolean.TRUE.equals(this.tSocket.isUpgraded())) {
TraceHeader header = new TraceHeader(Trace.getCurrentRequestId(), Trace.getCurrentRpcIdWithAppId(), this.context.exportMeta());
this.writeBean(TRACE_HEADER, header);
}
this.writeMessageBegin(new TMessage(methodInfo.getName(), 1, 1));
this.writeArgs(methodInfo, args);
this.writeMessageEnd();
this.flush();
}
bool类型:
一个字节的类型,两个字节的字段编号,一个字节的值(true:1,false:0).
Byte类型:
一个字节的类型,两个字节的字段编号,一个字节的值.
I16类型:
一个字节的类型,两个字节的字段编号,两个字节的值.
I32类型:
一个字节的类型,两个字节的字段编号,四个字节的值.
I64类型和double类型:
一个字节的类型,两个字节的字段编号,八个字节的值.
String类型:
一个字节的类型,两个字节的字段编号,四个字节的负载数据长度,负载数据的值.
Struct类型:
一个字节的类型,两个字节的字段编号,结构体负载数据,一个字节的结束标记.
MAP类型:
一个字节的类型,两个字节的字段编号,一个字节的键类型,一个字节的值类型,四个字节的负载数据长度,负载数据的值.
Set类型:
一个字节的类型,两个字节的字段编号,一个字节的值类型,四个字节的负载数据长度,负载数据的值.
List类型:
一个字节的类型,两个字节的字段编号,一个字节的值类型,四个字节的负载数据长度,负载数据的值.
消息(函数)类型:
四个字节的版本(含调用类型),四个字节的消息名称长度,消息名称,四个字节的流水号,消息负载数据的值,一个字节的结束标记。
- 数据传输
数据传输通过tcp传输结束之后关闭长连接(留点长连接缓冲池是不是会好点),
json协议
发现json协议其实是走的http 协议POST方法传输数据, 此处省略