Current HA in openLooKeng:
https://openlookeng.io/docs/docs/installation/deployment-ha.html
Active-Active High Availability Cluster
The main purpose of an active-active cluster is to achieve load balancing.
An active-active cluster is typically made up of at least two nodes, both actively running the same kind of service simultaneously.
Active-Passive High Availability Cluster
The passive (a.k.a. failover) server serves as a backup that's ready to take over as soon as the active (a.k.a. primary) server gets disconnected or is unable to serve.
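The difference between the two modes comes down to which nodes are allowed to serve a request at a given moment. The following is a minimal sketch of that idea only; the class `HaEligibility`, its method names, and the health predicate are hypothetical and are not part of openLooKeng.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical illustration of the two HA modes; not openLooKeng code.
final class HaEligibility
{
    private HaEligibility() {}

    // Active-active: every healthy node serves requests at the same time,
    // so a load balancer can spread traffic across the whole result.
    static List<String> activeActive(List<String> nodes, Predicate<String> healthy)
    {
        List<String> serving = new ArrayList<>();
        for (String node : nodes) {
            if (healthy.test(node)) {
                serving.add(node);
            }
        }
        return serving;
    }

    // Active-passive: only the first healthy node in priority order serves;
    // the remaining nodes are standbys that take over when it fails.
    static Optional<String> activePassive(List<String> nodesInPriorityOrder, Predicate<String> healthy)
    {
        for (String node : nodesInPriorityOrder) {
            if (healthy.test(node)) {
                return Optional.of(node);
            }
        }
        return Optional.empty();
    }
}
```

With nodes `cn1` and `cn2` both healthy, `activeActive` returns both while `activePassive` returns only `cn1`; once `cn1` becomes unhealthy, both return `cn2`.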
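PrestoServer starts a server thread that creates a ServerMainModule, which in turn creates either a CoordinatorModule or a WorkerModule depending on the node's configuration. In other words, one Server currently corresponds to exactly one CN (coordinator node) or WN (worker node). Adding a reverse proxy does not need to change this relationship; what needs to change is that a Proxy thread is added on one of the nodes to discover all CNs and load-balance across them.
https://www.bbsmax.com/A/QW5Y0E0GJm/ (deploying service discovery independently to solve the changing Coordinator address problem with Presto on Yarn)
Analysis of PrestoProxy: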
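A minimal sketch of what such a Proxy-side discovery thread might look like, assuming the coordinator addresses can be obtained from some lookup. The class `CoordinatorTracker` and the `Supplier` standing in for that lookup are hypothetical; nothing here is an existing openLooKeng API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper: keeps a periodically refreshed snapshot of coordinator URIs.
final class CoordinatorTracker
{
    // Assumption: some discovery lookup that returns the current coordinator URIs.
    private final Supplier<List<String>> discovery;
    private volatile List<String> coordinators = Collections.emptyList();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
        Thread thread = new Thread(runnable, "proxy-discovery");
        thread.setDaemon(true);
        return thread;
    });

    CoordinatorTracker(Supplier<List<String>> discovery)
    {
        this.discovery = discovery;
    }

    // Replace the snapshot with a fresh copy; keep the previous one if the lookup fails.
    void refresh()
    {
        try {
            coordinators = Collections.unmodifiableList(new ArrayList<>(discovery.get()));
        }
        catch (RuntimeException e) {
            // A failed lookup must not stop the periodic task; the old snapshot stays in use.
        }
    }

    // Run refresh() immediately and then repeatedly with the given delay.
    void start(long periodMillis)
    {
        scheduler.scheduleWithFixedDelay(this::refresh, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    void stop()
    {
        scheduler.shutdownNow();
    }

    List<String> coordinators()
    {
        return coordinators;
    }
}
```

The request path only reads the volatile snapshot, so forwarding never blocks on discovery.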
public final class PrestoProxy
{
    private PrestoProxy() {}

    public static void start(Module... extraModules)
    {
        Bootstrap app = new Bootstrap(ImmutableList.<Module>builder()
                .add(new NodeModule())
                .add(new HttpServerModule())
                .add(new JsonModule())
                .add(new JaxrsModule())
                .add(new MBeanModule())
                .add(new JmxModule())
                .add(new LogJmxModule())
                .add(new TraceTokenModule())
                .add(new EventModule())
                .add(new ProxyModule())
                .add(extraModules)
                .build());

        Logger log = Logger.get(PrestoProxy.class);
        try {
            app.strictConfig().initialize();
            log.info("======== SERVER STARTED ========");
        }
        catch (Throwable t) {
            log.error(t);
            System.exit(1);
        }
    }

    public static void main(String[] args)
    {
        start();
    }
}
NodeModule: registers the current node's information
HttpServerModule: exposes an HTTP server for the CLI to connect to
ProxyModule.java
configBinder(binder).bindConfigDefaults(HttpServerConfig.class, httpServerConfig -> {
    httpServerConfig.setAdminEnabled(false);
});
httpClientBinder(binder).bindHttpClient("proxy", ForProxy.class);
ProxyResource.java
@Inject
public ProxyResource(@ForProxy HttpClient httpClient, JsonWebTokenHandler jwtHandler, ProxyConfig config)
{
    this.httpClient = requireNonNull(httpClient, "httpClient is null");
    this.jwtHandler = requireNonNull(jwtHandler, "jwtHandler is null");
    this.remoteUri = requireNonNull(config.getUri(), "uri is null");
    this.hmac = hmacSha256(loadSharedSecret(config.getSharedSecretFile()));
}
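ProxyModule:
ProxyConfig holds the proxy-related configuration, including:
uri
sharedSecretFile (open question in these notes; judging from the ProxyResource constructor, the file holds the shared secret that keys the HMAC-SHA256 function used to validate URIs)
ProxyResource performs the actual requests and obtains the responses. The class is walked through below: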
private static final String X509_ATTRIBUTE = "javax.servlet.request.X509Certificate";
private static final Duration ASYNC_TIMEOUT = new Duration(2, MINUTES);
private static final JsonFactory JSON_FACTORY = new JsonFactory().disable(CANONICALIZE_FIELD_NAMES);
private final ExecutorService executor = newCachedThreadPool(daemonThreadsNamed("proxy-%s")); // cached pool of daemon threads named "proxy-%s"
private final HttpClient httpClient; // HttpClient the proxy uses for outbound requests; injected (bound as "proxy" in ProxyModule)
private final JsonWebTokenHandler jwtHandler;
private final URI remoteUri; // URI of the remote server that requests are forwarded to (ProxyConfig.getUri())
private final HashFunction hmac;
RESTful API
@GET
@Path("/v1/info") // externally exposed endpoint path
@Produces(APPLICATION_JSON)
public void getInfo(
        @Context HttpServletRequest servletRequest,
        @Suspended AsyncResponse asyncResponse)
{
    Request.Builder request = prepareGet()
            .setUri(uriBuilderFrom(remoteUri).replacePath("/v1/info").build()); // build the request
    performRequest(servletRequest, asyncResponse, request, response ->
            responseWithHeaders(Response.ok(response.getBody()), response)); // execute the request and return the result
}
@POST
@Path("/v1/statement") // externally exposed endpoint path
@Produces(APPLICATION_JSON)
public void postStatement(
        String statement,
        @Context HttpServletRequest servletRequest,
        @Context UriInfo uriInfo,
        @Suspended AsyncResponse asyncResponse)
{
    Request.Builder request = preparePost()
            .setUri(uriBuilderFrom(remoteUri).replacePath("/v1/statement").build())
            .setBodyGenerator(createStaticBodyGenerator(statement, UTF_8));
    performRequest(servletRequest, asyncResponse, request, response -> buildResponse(uriInfo, response));
}
@GET
@Path("/v1/proxy") // externally exposed endpoint path
@Produces(APPLICATION_JSON)
public void getNext(
        @QueryParam("uri") String uri,
        @QueryParam("hmac") String hash,
        @Context HttpServletRequest servletRequest,
        @Context UriInfo uriInfo,
        @Suspended AsyncResponse asyncResponse)
{
    if (!hmac.hashString(uri, UTF_8).equals(HashCode.fromString(hash))) {
        throw badRequest(FORBIDDEN, "Failed to validate HMAC of URI");
    }
    Request.Builder request = prepareGet().setUri(URI.create(uri));
    performRequest(servletRequest, asyncResponse, request, response -> buildResponse(uriInfo, response));
}
@DELETE
@Path("/v1/proxy") // externally exposed endpoint path
@Produces(APPLICATION_JSON)
public void cancelQuery(
        @QueryParam("uri") String uri,
        @QueryParam("hmac") String hash,
        @Context HttpServletRequest servletRequest,
        @Suspended AsyncResponse asyncResponse)
{
    if (!hmac.hashString(uri, UTF_8).equals(HashCode.fromString(hash))) {
        throw badRequest(FORBIDDEN, "Failed to validate HMAC of URI");
    }
    Request.Builder request = prepareDelete().setUri(URI.create(uri));
    performRequest(servletRequest, asyncResponse, request, response -> responseWithHeaders(noContent(), response));
}
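Both /v1/proxy handlers only forward a URI when the `hmac` query parameter matches the HMAC-SHA256 of the `uri` parameter under the shared secret. The sketch below reproduces that check with JDK classes only. It is an illustration under assumptions: `UriSigner` is a hypothetical name, the hex encoding is chosen here, and the original code uses Guava's `HashFunction` and `HashCode` rather than `javax.crypto.Mac`.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical JDK-only stand-in for the Guava-based HMAC check in ProxyResource.
final class UriSigner
{
    private static final String ALGORITHM = "HmacSHA256";
    private final byte[] secret;

    UriSigner(byte[] secret)
    {
        this.secret = secret.clone();
    }

    // Returns the lowercase hex HMAC-SHA256 of the URI under the shared secret.
    String sign(String uri)
    {
        try {
            Mac mac = Mac.getInstance(ALGORITHM);
            mac.init(new SecretKeySpec(secret, ALGORITHM));
            byte[] digest = mac.doFinal(uri.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        }
        catch (GeneralSecurityException e) {
            throw new IllegalStateException("HMAC-SHA256 is unavailable", e);
        }
    }

    // Recomputes the HMAC and compares it with the supplied value in constant time.
    boolean verify(String uri, String hmac)
    {
        return MessageDigest.isEqual(
                sign(uri).getBytes(StandardCharsets.UTF_8),
                hmac.getBytes(StandardCharsets.UTF_8));
    }
}
```

Because only the proxy knows the secret, a client cannot make the proxy fetch an arbitrary URI: any change to the `uri` parameter invalidates the `hmac` value.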
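service-inventory.json is an important file that records information about the whole cluster. The discovery cluster uses this configuration file to learn what is deployed in the cluster and how to communicate with the other nodes. Its content looks like this: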
{
  "environment": "test",
  "services": [
    {
      "id": "C8A9EE64-0476-452C-8638-8E72F3EE3CA6",
      "nodeId": "597A741E-9968-40E2-BB4D-7AF26DE18689",
      "type": "discovery",
      "pool": "general",
      "location": "/172.17.31.245",
      "state": "RUNNING",
      "properties": {
        "http": "http://172.17.31.245:8411"
      }
    },
    {
      "id": "370AF416-5F44-47D3-BFB6-D93A92676D49",
      "nodeId": "0BA42FDB-5DBA-4A2C-BE26-9596B7B4368E",
      "type": "discovery",
      "pool": "general",
      "location": "/172.17.31.246",
      "state": "RUNNING",
      "properties": {
        "http": "http://172.17.31.246:8411"
      }
    }]
}
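To show how a consumer might use these entries, the sketch below models only the fields it needs (`type`, `state`, `properties`) and collects the `http` address of every running service of a given type. `InventoryEntry` is a hypothetical class written for this note; JSON parsing is left out, so the entries are assumed to be already loaded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory view of one element of the "services" array.
final class InventoryEntry
{
    final String type;
    final String state;
    final Map<String, String> properties;

    InventoryEntry(String type, String state, Map<String, String> properties)
    {
        this.type = type;
        this.state = state;
        this.properties = properties;
    }

    // Collects the "http" property of every RUNNING entry of the requested type.
    static List<String> runningHttpUris(List<InventoryEntry> services, String type)
    {
        List<String> uris = new ArrayList<>();
        for (InventoryEntry service : services) {
            String http = service.properties.get("http");
            if (service.type.equals(type) && service.state.equals("RUNNING") && http != null) {
                uris.add(http);
            }
        }
        return uris;
    }
}
```

For the file above, `runningHttpUris(services, "discovery")` would yield the two `http://172.17.31.24x:8411` addresses in file order.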
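HetuDiscoveryModule binds HetuServiceInventory, which reads all discovery-service nodes from the shared StateStore and takes the first one as the node that provides the service. In Proxy mode the exact connection address cannot be obtained from ServiceInventory; instead, the list of all available Coordinators has to be read dynamically from the StateStore, and one Coordinator is chosen from that list according to some LB (load-balancing) policy to execute the current task.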
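The notes leave the LB policy open. As one possible choice, the sketch below uses round-robin and re-reads the Coordinator list on every call, so Coordinators that join or leave are picked up immediately. `RoundRobinCoordinatorSelector` is a hypothetical class, and the `Supplier` is only a stand-in for whatever StateStore read the real implementation would use.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical round-robin policy over a dynamically read Coordinator list.
final class RoundRobinCoordinatorSelector
{
    // Assumption: returns the currently available Coordinator URIs (e.g. read from the StateStore).
    private final Supplier<List<String>> availableCoordinators;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinCoordinatorSelector(Supplier<List<String>> availableCoordinators)
    {
        this.availableCoordinators = availableCoordinators;
    }

    // Picks the next Coordinator; the list is re-read on every call.
    String next()
    {
        List<String> available = availableCoordinators.get();
        if (available.isEmpty()) {
            throw new IllegalStateException("no Coordinator available");
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), available.size());
        return available.get(index);
    }
}
```

Round-robin ignores how busy each Coordinator is; a policy based on running-query counts would need that information to be published alongside the addresses.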