Client-side basic logic
Core class: ConfigurationDiscoveryService
public class ConfigurationDiscoveryService implements BootService, GRPCChannelListener
// Initialization: call the gRPC server every 20 seconds to pull the configuration
getDynamicConfigurationFuture = Executors.newSingleThreadScheduledExecutor(
    new DefaultNamedThreadFactory("ConfigurationDiscoveryService")
).scheduleAtFixedRate(
    new RunnableWithExceptionProtection(
        this::getAgentDynamicConfig,
        t -> LOGGER.error("Sync config from OAP error.", t)
    ),
    Config.Collector.GET_AGENT_DYNAMIC_CONFIG_INTERVAL,
    Config.Collector.GET_AGENT_DYNAMIC_CONFIG_INTERVAL,
    TimeUnit.SECONDS
);
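The task is wrapped because `ScheduledExecutorService.scheduleAtFixedRate` suppresses all subsequent executions once a run throws. The following is a minimal sketch of that idea, not the agent's actual code: `SafeRunnable` and `PollingDemo` are hypothetical names, and a 100 ms interval stands in for the 20-second one.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for RunnableWithExceptionProtection: it catches whatever
// the task throws and hands it to a callback instead of letting it escape.
class SafeRunnable implements Runnable {
    private final Runnable task;
    private final Consumer<Throwable> onError;

    SafeRunnable(Runnable task, Consumer<Throwable> onError) {
        this.task = task;
        this.onError = onError;
    }

    @Override
    public void run() {
        try {
            task.run();
        } catch (Throwable t) {
            onError.accept(t);
        }
    }
}

class PollingDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger runs = new AtomicInteger();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
            new SafeRunnable(
                () -> {
                    runs.incrementAndGet();
                    // Every run fails on purpose to show that polling continues.
                    throw new IllegalStateException("simulated fetch failure");
                },
                t -> System.err.println("Sync config error: " + t.getMessage())
            ),
            0, 100, TimeUnit.MILLISECONDS);

        Thread.sleep(450);
        scheduler.shutdownNow();
        // The task kept running after the first failure.
        System.out.println("runs = " + runs.get());
    }
}
```

If the lambda were scheduled directly, without the wrapper, the counter would stop at 1, because the first exception cancels the schedule.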
Fetch logic
try {
    ...
    if (configurationDiscoveryServiceBlockingStub != null) {
        final Commands commands = configurationDiscoveryServiceBlockingStub.withDeadlineAfter(
            GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
        ).fetchConfigurations(builder.build());
        ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
    }
} catch (Throwable t) {
    ...
}
Executing the Commands: ConfigurationDiscoveryCommandExecutor
ConfigurationDiscoveryCommand agentDynamicConfigurationCommand = (ConfigurationDiscoveryCommand) command;
ServiceManager.INSTANCE.findService(ConfigurationDiscoveryService.class).handleConfigurationDiscoveryCommand(agentDynamicConfigurationCommand);
This ends up calling ConfigurationDiscoveryService#handleConfigurationDiscoveryCommand, whose logic is as follows:
// 1. Read the configuration
List<KeyStringValuePair> config = readConfig(configurationDiscoveryCommand);
config.forEach(property -> {
    // Look up the WatcherHolder registered for this property
    String propertyKey = property.getKey();
    WatcherHolder holder = register.get(propertyKey);
    if (holder != null) {
        AgentConfigChangeWatcher watcher = holder.getWatcher();
        String newPropertyValue = property.getValue();
        // New value is blank and the current value is not null: send a DELETE event
        if (StringUtil.isBlank(newPropertyValue)) {
            if (watcher.value() != null) {
                // Notify watcher, the new value is null with delete event type.
                watcher.notify(
                    new AgentConfigChangeWatcher.ConfigChangeEvent(
                        null, AgentConfigChangeWatcher.EventType.DELETE
                    ));
            } else {
                // Don't need to notify, stay in null.
            }
        } else {
            // New value is non-blank and differs from the current value: send a MODIFY event
            if (!newPropertyValue.equals(watcher.value())) {
                watcher.notify(new AgentConfigChangeWatcher.ConfigChangeEvent(
                    newPropertyValue, AgentConfigChangeWatcher.EventType.MODIFY
                ));
            } else {
                // Don't need to notify, stay in the same config value.
            }
        }
    } else {
        LOGGER.warn("Config {} from OAP, doesn't match any watcher, ignore.", propertyKey);
    }
});
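The branching above reduces to a small decision table over the current value and the incoming one. The sketch below isolates that decision as a pure function. `ConfigDiff`, `Action` and `decide` are hypothetical names for illustration only, and the blank check is a simplified stand-in for `StringUtil.isBlank`.

```java
// Hypothetical, simplified model of the decision made for each incoming key.
class ConfigDiff {
    enum Action { DELETE, MODIFY, NONE }

    // currentValue is what the watcher holds now; newValue is what the server sent.
    static Action decide(String currentValue, String newValue) {
        boolean blank = newValue == null || newValue.trim().isEmpty();
        if (blank) {
            // Blank new value: notify only if there is something to delete.
            return currentValue != null ? Action.DELETE : Action.NONE;
        }
        // Non-blank new value: notify only when it differs from the current one.
        return newValue.equals(currentValue) ? Action.NONE : Action.MODIFY;
    }

    public static void main(String[] args) {
        System.out.println(decide("300", ""));    // DELETE
        System.out.println(decide(null, ""));     // NONE
        System.out.println(decide("300", "500")); // MODIFY
        System.out.println(decide(null, "500"));  // MODIFY
        System.out.println(decide("300", "300")); // NONE
    }
}
```

Note the fourth case: a non-blank value arriving while the watcher holds null is also reported as MODIFY, since there is no separate "add" event in this flow.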
Registering an AgentConfigChangeWatcher
Watchers are registered through ConfigurationDiscoveryService#registerAgentConfigChangeWatcher. The following are currently available:
- SpanLimitWatcher("agent.span_limit_per_segment")
- IgnoreSuffixPatternsWatcher("agent.ignore_suffix", this)
- SamplingRateWatcher("agent.sample_n_per_3_secs", this)
- TraceIgnorePatternWatcher("agent.trace.ignore_path", this)
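Event handling: a DELETE event restores the default value, and a MODIFY event applies the new value. activeSetting replaces the value and also performs related follow-up work, such as updating the corresponding service's properties.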
public void notify(final ConfigChangeEvent value) {
    if (EventType.DELETE.equals(value.getEventType())) {
        activeSetting(getDefaultValue());
    } else {
        activeSetting(value.getNewValue());
    }
}
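To make the fallback to the default concrete, here is a minimal sketch of a watcher for an integer setting. `SpanLimitSketch` and its methods are hypothetical and not the real SpanLimitWatcher. Keeping the previous value on unparsable input is an assumption of this sketch.

```java
// Hypothetical watcher for an integer setting; names are illustrative only.
class SpanLimitSketch {
    enum EventType { MODIFY, DELETE }

    private final int defaultValue;
    private volatile int active;

    SpanLimitSketch(int defaultValue) {
        this.defaultValue = defaultValue;
        this.active = defaultValue;
    }

    // Mirrors the notify() shape above: DELETE restores the default,
    // anything else applies the new value.
    void notifyChange(String newValue, EventType type) {
        if (type == EventType.DELETE) {
            active = defaultValue;
        } else {
            activate(newValue);
        }
    }

    private void activate(String raw) {
        try {
            active = Integer.parseInt(raw);
        } catch (NumberFormatException e) {
            // Assumption of this sketch: keep the previous value on bad input.
        }
    }

    int value() {
        return active;
    }

    public static void main(String[] args) {
        SpanLimitSketch watcher = new SpanLimitSketch(300);
        watcher.notifyChange("500", EventType.MODIFY);
        System.out.println(watcher.value()); // 500
        watcher.notifyChange(null, EventType.DELETE);
        System.out.println(watcher.value()); // 300
    }
}
```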
Server-side gRPC service
gRPC implementation class: ConfigurationDiscoveryServiceHandler, in the configuration-discovery-receiver-plugin package
It corresponds to the following section of the SkyWalking server's application.yml:
configuration-discovery:
  selector: ${SW_CONFIGURATION_DISCOVERY:default}
  default:
    disableMessageDigest: ${SW_DISABLE_MESSAGE_DIGEST:false}
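This module relies on the Configuration module's event update mechanism, ConfigChangeWatcher.
The corresponding key in the configuration center is configuration-discovery.default.agentConfigurations
Related documentation: Dynamic Configuration
The configuration content looks like this: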
configurations:
  # service name
  serviceA:
    # Configurations of service A
    # Key and Value are determined by the agent side.
    # Check the agent setup doc for all available configurations.
    key1: value1
    key2: value2
    ...
  serviceB:
    ...
Config Key | Value Description | Value Format Example | Required Plugin(s) |
---|---|---|---|
agent.sample_n_per_3_secs | The number of sampled traces per 3 seconds | -1 | - |
agent.ignore_suffix | If the operation name of the first span is included in this set, this segment should be ignored. Multiple values should be separated by `,` | `.txt,.log` | - |
agent.trace.ignore_path | The value is the path that you need to ignore, multiple paths should be separated by `,` | `/your/path/1/**,/your/path/2/**` | apm-trace-ignore-plugin |
agent.span_limit_per_segment | The max number of spans per segment. | 300 | - |
Main logic
- After AgentConfigurationsWatcher receives an update notification, it rebuilds its AgentConfigurationsTable field using the core parsing logic, AgentConfigurationsReader#readAgentConfigurationsTable:
// Configuration update event
@Override
public void notify(ConfigChangeEvent value) {
    if (value.getEventType().equals(EventType.DELETE)) {
        settingsString = Const.EMPTY_STRING;
        this.agentConfigurationsTable = new AgentConfigurationsTable();
    } else {
        settingsString = value.getNewValue();
        AgentConfigurationsReader agentConfigurationsReader =
            new AgentConfigurationsReader(new StringReader(value.getNewValue()));
        this.agentConfigurationsTable = agentConfigurationsReader.readAgentConfigurationsTable();
    }
}
// Parse the configuration
public AgentConfigurationsTable readAgentConfigurationsTable() {
    AgentConfigurationsTable agentConfigurationsTable = new AgentConfigurationsTable();
    try {
        if (Objects.nonNull(yamlData)) {
            Map configurationsData = (Map) yamlData.get("configurations");
            if (configurationsData != null) {
                configurationsData.forEach((k, v) -> {
                    Map map = (Map) v;
                    StringBuilder serviceConfigStr = new StringBuilder();
                    Map<String, String> config = new HashMap<>(map.size());
                    map.forEach((key, value) -> {
                        config.put(key.toString(), value.toString());
                        serviceConfigStr.append(key.toString()).append(":").append(value.toString());
                    });
                    AgentConfigurations agentConfigurations = new AgentConfigurations(
                        k.toString(), config, DigestUtils.sha512Hex(serviceConfigStr.toString()));
                    agentConfigurationsTable.getAgentConfigurationsCache()
                                            .put(agentConfigurations.getService(), agentConfigurations);
                });
            }
        }
    } catch (Exception e) {
        log.error("Read ConfigurationDiscovery configurations error.", e);
    }
    return agentConfigurationsTable;
}
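The parser above attaches a SHA-512 hex digest of the concatenated `key:value` pairs to each service's configuration, presumably so that a changed configuration can be detected by comparing digests. The sketch below reproduces that computation with the JDK's `MessageDigest` instead of commons-codec `DigestUtils`. `ConfigDigestSketch` is a hypothetical name, and sorting the keys with a `TreeMap` is an assumption of this sketch (the original iterates the parsed map in its own order).

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

class ConfigDigestSketch {
    // Concatenates "key:value" pairs and returns the SHA-512 digest as lowercase hex.
    static String digest(Map<String, String> config) {
        StringBuilder joined = new StringBuilder();
        // Assumption of this sketch: sort keys so the digest does not depend on map order.
        new TreeMap<>(config).forEach((k, v) -> joined.append(k).append(":").append(v));
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-512");
            byte[] hash = md.digest(joined.toString().getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Not expected on common JDKs; treated as fatal in this sketch.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("agent.span_limit_per_segment", "300");
        String before = digest(config);
        config.put("agent.span_limit_per_segment", "500");
        String after = digest(config);
        // Any change to a key or value yields a different digest.
        System.out.println(before.equals(after)); // false
    }
}
```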
- The gRPC service looks up the configuration of the requesting service and returns it, via AgentConfigurationsWatcher#getAgentConfigurations:
public AgentConfigurations getAgentConfigurations(String service) {
    AgentConfigurations agentConfigurations = agentConfigurationsTable.getAgentConfigurationsCache().get(service);
    if (null == agentConfigurations) {
        return emptyAgentConfigurations;
    } else {
        return agentConfigurations;
    }
}