Preface
This article takes a look at SkyWalking's RemoteClientManager.
RemoteClientManager
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
public class RemoteClientManager implements Service {

    private static final Logger logger = LoggerFactory.getLogger(RemoteClientManager.class);

    private final ModuleDefineHolder moduleDefineHolder;
    private ClusterNodesQuery clusterNodesQuery;
    private volatile List<RemoteClient> usingClients;
    private GaugeMetrics gauge;
    private int remoteTimeout;

    /**
     * Initial the manager for all remote communication clients.
     *
     * @param moduleDefineHolder for looking up other modules
     * @param remoteTimeout for cluster internal communication, in second unit.
     */
    public RemoteClientManager(ModuleDefineHolder moduleDefineHolder, int remoteTimeout) {
        this.moduleDefineHolder = moduleDefineHolder;
        this.usingClients = ImmutableList.of();
        this.remoteTimeout = remoteTimeout;
    }

    public void start() {
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this::refresh, 1, 5, TimeUnit.SECONDS);
    }

    /**
     * Query OAP server list from the cluster module and create a new connection for the new node. Make the OAP server
     * orderly because of each of the server will send stream data to each other by hash code.
     */
    void refresh() {
        if (gauge == null) {
            gauge = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class)
                .createGauge("cluster_size", "Cluster size of current oap node",
                    MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
        }
        try {
            if (Objects.isNull(clusterNodesQuery)) {
                synchronized (RemoteClientManager.class) {
                    if (Objects.isNull(clusterNodesQuery)) {
                        this.clusterNodesQuery = moduleDefineHolder.find(ClusterModule.NAME).provider().getService(ClusterNodesQuery.class);
                    }
                }
            }

            if (logger.isDebugEnabled()) {
                logger.debug("Refresh remote nodes collection.");
            }

            List<RemoteInstance> instanceList = clusterNodesQuery.queryRemoteNodes();
            instanceList = distinct(instanceList);
            Collections.sort(instanceList);

            gauge.setValue(instanceList.size());

            if (logger.isDebugEnabled()) {
                instanceList.forEach(instance -> logger.debug("Cluster instance: {}", instance.toString()));
            }

            if (!compare(instanceList)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("ReBuilding remote clients.");
                }
                reBuildRemoteClients(instanceList);
            }

            printRemoteClientList();
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
    }

    //......
}
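- RemoteClientManager exposes a getRemoteClient method that returns usingClients, and a start method that schedules refresh on a single-thread scheduled executor: the first run happens after 1 second, and each later run starts 5 seconds after the previous one finishes. refresh fetches the instance list through clusterNodesQuery.queryRemoteNodes(), de-duplicates it by Address, sorts it, and compares it with the local RemoteClient list; if the two differ it calls reBuildRemoteClients, and finally it calls printRemoteClientList.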
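The refresh cycle described above can be sketched in isolation. This is a minimal illustration, not SkyWalking code: RefreshSketch, its fields, and the use of plain strings in place of RemoteInstance and Address are all assumptions made for the example, and the equality check only stands in for the compare step, whose body is not quoted here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical stand-in for the refresh cycle; strings replace RemoteInstance/Address.
class RefreshSketch {
    // The list in use is replaced as a whole through a volatile reference.
    private volatile List<String> usingAddresses = Collections.emptyList();
    private int rebuildCount = 0;

    // One refresh pass: de-duplicate, sort, compare, rebuild only on change.
    void refresh(List<String> queried) {
        List<String> latest = new ArrayList<>(new LinkedHashSet<>(queried)); // drop duplicates
        Collections.sort(latest);                                            // same order on every node
        if (!latest.equals(usingAddresses)) {                                // stands in for compare(...)
            usingAddresses = Collections.unmodifiableList(latest);
            rebuildCount++;
        }
    }

    List<String> getUsingAddresses() {
        return usingAddresses;
    }

    int getRebuildCount() {
        return rebuildCount;
    }

    public static void main(String[] args) {
        RefreshSketch sketch = new RefreshSketch();
        sketch.refresh(Arrays.asList("10.0.0.2:11800", "10.0.0.1:11800", "10.0.0.2:11800"));
        sketch.refresh(Arrays.asList("10.0.0.1:11800", "10.0.0.2:11800")); // unchanged, no rebuild
        System.out.println(sketch.getUsingAddresses() + " rebuilds=" + sketch.getRebuildCount());
    }
}
```

Sorting before comparing matters because the selectors pick a client by list index, so every node needs to see the cluster members in the same order.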
reBuildRemoteClients
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
public class RemoteClientManager implements Service {

    //......

    private void reBuildRemoteClients(List<RemoteInstance> remoteInstances) {
        final Map<Address, RemoteClientAction> remoteClientCollection = this.usingClients.stream()
            .collect(Collectors.toMap(RemoteClient::getAddress, client -> new RemoteClientAction(client, Action.Close)));

        final Map<Address, RemoteClientAction> latestRemoteClients = remoteInstances.stream()
            .collect(Collectors.toMap(RemoteInstance::getAddress, remote -> new RemoteClientAction(null, Action.Create)));

        final Set<Address> unChangeAddresses = Sets.intersection(remoteClientCollection.keySet(), latestRemoteClients.keySet());

        unChangeAddresses.stream()
            .filter(remoteClientCollection::containsKey)
            .forEach(unChangeAddress -> remoteClientCollection.get(unChangeAddress).setAction(Action.Unchanged));

        // make the latestRemoteClients including the new clients only
        unChangeAddresses.forEach(latestRemoteClients::remove);
        remoteClientCollection.putAll(latestRemoteClients);

        final List<RemoteClient> newRemoteClients = new LinkedList<>();
        remoteClientCollection.forEach((address, clientAction) -> {
            switch (clientAction.getAction()) {
                case Unchanged:
                    newRemoteClients.add(clientAction.getRemoteClient());
                    break;
                case Create:
                    if (address.isSelf()) {
                        RemoteClient client = new SelfRemoteClient(moduleDefineHolder, address);
                        newRemoteClients.add(client);
                    } else {
                        RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, address, 1, 3000, remoteTimeout);
                        client.connect();
                        newRemoteClients.add(client);
                    }
                    break;
            }
        });

        //for stable ordering for rolling selector
        Collections.sort(newRemoteClients);
        this.usingClients = ImmutableList.copyOf(newRemoteClients);

        remoteClientCollection.values()
            .stream()
            .filter(remoteClientAction -> remoteClientAction.getAction().equals(Action.Close))
            .forEach(remoteClientAction -> remoteClientAction.getRemoteClient().close());
    }

    //......
}
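- reBuildRemoteClients first builds remoteClientCollection (every current client, marked Close) and latestRemoteClients (every queried instance, marked Create). It intersects their key sets to get unChangeAddresses, marks those entries in remoteClientCollection as Unchanged, removes them from latestRemoteClients, and then adds what is left of latestRemoteClients to remoteClientCollection. It then iterates over remoteClientCollection: Unchanged clients are reused, and for Create entries it builds either a SelfRemoteClient or a GRPCRemoteClient, calling connect on the latter. Finally it sorts newRemoteClients, assigns the result to usingClients, and calls close on every client whose action is still Close.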
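The Close / Unchanged / Create classification boils down to three set operations. The sketch below is an assumption-laden simplification: RebuildDiffSketch and its fields are hypothetical names, strings stand in for Address, and no real clients are created or closed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the rebuild diff; strings replace Address and RemoteClient.
class RebuildDiffSketch {
    final Set<String> unchanged = new HashSet<>();
    final Set<String> toClose = new HashSet<>();
    final Set<String> toCreate = new HashSet<>();

    RebuildDiffSketch(List<String> current, List<String> latest) {
        unchanged.addAll(current);
        unchanged.retainAll(latest);  // in both lists: reuse the existing client
        toClose.addAll(current);
        toClose.removeAll(latest);    // only in the old list: close after the swap
        toCreate.addAll(latest);
        toCreate.removeAll(current);  // only in the new list: build a new client
    }

    // Addresses that make up the new client list, sorted for a stable order.
    List<String> newClientList() {
        List<String> result = new ArrayList<>(unchanged);
        result.addAll(toCreate);
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        RebuildDiffSketch diff = new RebuildDiffSketch(
            Arrays.asList("node-a", "node-b"), Arrays.asList("node-b", "node-c"));
        System.out.println("close=" + diff.toClose + " keep=" + diff.unchanged
            + " create=" + diff.toCreate + " new=" + diff.newClientList());
    }
}
```

In the quoted method the new list is assigned to usingClients before the stale clients are closed, so a caller that reads usingClients after the assignment never receives a client that is about to be closed.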
RemoteSenderService
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java
public class RemoteSenderService implements Service {

    private static final Logger logger = LoggerFactory.getLogger(RemoteSenderService.class);

    private final ModuleManager moduleManager;
    private final HashCodeSelector hashCodeSelector;
    private final ForeverFirstSelector foreverFirstSelector;
    private final RollingSelector rollingSelector;

    public RemoteSenderService(ModuleManager moduleManager) {
        this.moduleManager = moduleManager;
        this.hashCodeSelector = new HashCodeSelector();
        this.foreverFirstSelector = new ForeverFirstSelector();
        this.rollingSelector = new RollingSelector();
    }

    public void send(String nextWorkName, StreamData streamData, Selector selector) {
        RemoteClientManager clientManager = moduleManager.find(CoreModule.NAME).provider().getService(RemoteClientManager.class);
        RemoteClient remoteClient = null;

        List<RemoteClient> clientList = clientManager.getRemoteClient();
        if (clientList.size() == 0) {
            logger.warn("There is no available remote server for now, ignore the streaming data until the cluster metadata initialized.");
            return;
        }

        switch (selector) {
            case HashCode:
                remoteClient = hashCodeSelector.select(clientList, streamData);
                break;
            case Rolling:
                remoteClient = rollingSelector.select(clientList, streamData);
                break;
            case ForeverFirst:
                remoteClient = foreverFirstSelector.select(clientList, streamData);
                break;
        }
        remoteClient.push(nextWorkName, streamData);
    }
}
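- RemoteSenderService provides a send method. It obtains clientList from clientManager.getRemoteClient(), logs a warning and drops the data if the list is empty, and otherwise picks one remoteClient according to the selector type (HashCode, Rolling, or ForeverFirst) and calls remoteClient.push(nextWorkName, streamData).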
RemoteClientSelector
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RemoteClientSelector.java
public interface RemoteClientSelector {
    RemoteClient select(List<RemoteClient> clients, StreamData streamData);
}
- RemoteClientSelector defines a single select method that picks one RemoteClient from the given list for a StreamData.
HashCodeSelector
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/HashCodeSelector.java
public class HashCodeSelector implements RemoteClientSelector {

    @Override public RemoteClient select(List<RemoteClient> clients, StreamData streamData) {
        int size = clients.size();
        int selectIndex = Math.abs(streamData.remoteHashCode()) % size;
        return clients.get(selectIndex);
    }
}
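- HashCodeSelector implements the RemoteClientSelector interface; it computes selectIndex as Math.abs(streamData.remoteHashCode()) % size, so data with the same remote hash code goes to the same position in the client list.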
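One edge case of the Math.abs(hash) % size pattern is worth knowing: Math.abs(Integer.MIN_VALUE) is still Integer.MIN_VALUE, so the resulting index is negative. Whether remoteHashCode() can ever return that value depends on the StreamData implementations, which are not shown here. The sketch below only demonstrates the arithmetic; HashIndexSketch and both method names are hypothetical, and the Math.floorMod variant is an alternative for comparison, not what SkyWalking does.

```java
// Hypothetical demonstration of index arithmetic; not part of SkyWalking.
class HashIndexSketch {
    // Same arithmetic as the quoted selector.
    static int absIndex(int hash, int size) {
        return Math.abs(hash) % size;
    }

    // Alternative: floorMod always yields a value in [0, size) for size > 0.
    static int floorModIndex(int hash, int size) {
        return Math.floorMod(hash, size);
    }

    public static void main(String[] args) {
        int size = 3;
        System.out.println("abs index for 7:              " + absIndex(7, size));
        System.out.println("abs index for MIN_VALUE:      " + absIndex(Integer.MIN_VALUE, size));
        System.out.println("floorMod index for MIN_VALUE: " + floorModIndex(Integer.MIN_VALUE, size));
    }
}
```

Note that the two functions map other negative hash codes to different indexes, so all nodes in a cluster would have to use the same one for hash routing to stay consistent.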
RollingSelector
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RollingSelector.java
public class RollingSelector implements RemoteClientSelector {

    private int index = 0;

    @Override public RemoteClient select(List<RemoteClient> clients, StreamData streamData) {
        int size = clients.size();
        index++;
        int selectIndex = Math.abs(index) % size;
        if (index == Integer.MAX_VALUE) {
            index = 0;
        }
        return clients.get(selectIndex);
    }
}
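- RollingSelector implements the RemoteClientSelector interface; it increments index on every call and computes selectIndex as Math.abs(index) % size, resetting index to 0 once it reaches Integer.MAX_VALUE, which results in round-robin selection.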
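The index field above is a plain int, so index++ is not atomic; if select were called from several threads at once, some increments could be lost and the rotation would be slightly uneven. Whether that happens in practice depends on how callers use the selector, which is not covered here. As a sketch of one alternative, the hypothetical AtomicRollingSketch below uses an AtomicInteger; it is generic over the element type so it runs without any SkyWalking classes, and unlike the quoted code it starts at index 0 rather than 1.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin selector; not SkyWalking's RollingSelector.
class AtomicRollingSketch {
    private final AtomicInteger counter = new AtomicInteger();

    <T> T select(List<T> clients) {
        // getAndIncrement wraps to negative values on overflow;
        // floorMod keeps the index inside [0, size).
        int index = Math.floorMod(counter.getAndIncrement(), clients.size());
        return clients.get(index);
    }

    public static void main(String[] args) {
        AtomicRollingSketch selector = new AtomicRollingSketch();
        List<String> clients = Arrays.asList("node-a", "node-b", "node-c");
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.select(clients));
        }
    }
}
```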
ForeverFirstSelector
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/ForeverFirstSelector.java
public class ForeverFirstSelector implements RemoteClientSelector {

    private static final Logger logger = LoggerFactory.getLogger(ForeverFirstSelector.class);

    @Override public RemoteClient select(List<RemoteClient> clients, StreamData streamData) {
        if (logger.isDebugEnabled()) {
            logger.debug("clients size: {}", clients.size());
        }
        return clients.get(0);
    }
}
- ForeverFirstSelector implements the RemoteClientSelector interface; it always returns the first client in the list.
Summary
RemoteClientManager exposes a getRemoteClient method that returns usingClients, and a start method that schedules refresh to run 1 second after startup and then again 5 seconds after each run finishes. refresh fetches the instance list through clusterNodesQuery.queryRemoteNodes(), de-duplicates it by Address, sorts it, and compares it with the local RemoteClient list; if the two differ it calls reBuildRemoteClients, and finally it calls printRemoteClientList.