如何复制Pinpoint中一条调用链的完整数据
如果你熟悉Pinpoint的话,你应该知道一条调用链包含哪些数据
在这里我指的是com.navercorp.pinpoint.web.controller.BusinessTransactionController#transactionInfo方法中查询会涉及到的HBase数据。
为什么会产生这个需求呢,HBase中的数据都是配置了TTL的,过一段时间会被清理,你可能就和要这条骨骼惊奇的调用链说拜拜了。
如果能够离线或者保存另外的HBase里,可以更快的复现场景进行调试和排查。
先看一下到底会查哪些表吧。
- TraceV2
- ApiMetaData
- StringMetaData
- SqlMetaData_Ver2
Pinpoint在构造调用链界面需要的信息的时候,TraceV2是一行数据,用事务号就能查询出来。
然后遍历这行数据里的Span和SpanEvent,使用包含的apiId,stringId,sqlId去后三个表对应查询所关联的数据。
后面三个,一个复杂的调用链会查询很多次。怎么才能知道呢?
下面只是记录一下本地试验的方法,仅在测试环境中使用。
利用Spring AOP 将hbase查询结果 插入到另外的hbase中
我想了下,如果在hbase查询的时候进行AOP拦截,并且把数据发送到另外一个hbase的话,这样不就能把一条调用链的数据给剥离出来了么?
我把将查询出来的数据插入到别的hbase的过程叫做逆转。
我们要寻找一些合适的spring bean,因为spring的aop只能作用在spring创建的对象上。
首先我注意到 com.navercorp.pinpoint.common.hbase.RowMapper#mapRow
public interface RowMapper<T> {
T mapRow(Result result, int rowNum) throws Exception;
}
第一个参数org.apache.hadoop.hbase.client.Result包含了查询出来的一行数据,现在就差个表名了。
仔细看pinpoint的代码,每次执行hbase操作前都会调用com.navercorp.pinpoint.common.hbase.TableDescriptor#getTableName
获取表名。
这样设置一个线程上下文(https://github.com/apache/shiro/blob/master/core/src/main/java/org/apache/shiro/util/ThreadContext.java),就能将表名和多行数据完整联系在一起了。
最后线程上下文里面还需要设置一个是否逆转查询数据的标志。
对于查询单条调用链来说就是com.navercorp.pinpoint.web.service.SpanService#selectSpan方法进入的时候开启标志。
实现
首先仿照shiro弄个线程上下文。
package com.navercorp.pinpoint.web.dao;
import java.util.HashMap;
import java.util.Map;
/**
* @author tankilo
* https://github.com/apache/shiro/blob/master/core/src/main/java/org/apache/shiro/util/ThreadContext.java
*/
public final class ThreadContext {
private ThreadContext() {
}
public static final String REVERSE = "REVERSE";
public static final String TABLE_NAME = "TABLE_NAME";
private static ThreadLocal<Map<String, Object>> resources = new InheritableThreadLocal<Map<String, Object>>() {
@Override
protected Map<String, Object> initialValue() {
return new HashMap<>(8);
}
};
public static void put(String key, Object value) {
if (key == null) {
throw new IllegalArgumentException("key cannot be null");
}
if (value == null) {
remove(key);
return;
}
ensureResourcesInitialized();
resources.get().put(key, value);
}
private static void ensureResourcesInitialized() {
if (resources.get() == null) {
resources.set(new HashMap<>(8));
}
}
public static void remove(String key) {
Map<String, Object> map = resources.get();
if (map != null) {
map.remove(key);
}
}
public static Object get(String key) {
Map<String, Object> map = resources.get();
if (map != null) {
return map.get(key);
} else {
return null;
}
}
private static Object getValue(Object key) {
Map<String, Object> perThreadResources = resources.get();
return perThreadResources != null ? perThreadResources.get(key) : null;
}
private static Boolean getBoolean(String key, Boolean defaultValue) {
Object value = getValue(key);
if (null != value) {
return Boolean.valueOf(value.toString());
}
return defaultValue;
}
private static String getString(String key) {
Object value = getValue(key);
if (null != value) {
return value.toString();
} else {
return null;
}
}
public static void setReverse(boolean reverse) {
put(REVERSE, reverse);
}
public static boolean isReverse() {
return getBoolean(REVERSE, false);
}
public static void setTableName(String tableName) {
put(TABLE_NAME, tableName);
}
public static String getTableName() {
return getString(TABLE_NAME);
}
public static void remove() {
resources.remove();
}
}
切面
@Aspect
public class HbaseTemplateReverseAspect {
@Autowired
@Qualifier("hbaseTemplateReverse")
private HbaseOperations2 template2;
@Autowired
private HbaseTableNameProvider tableNameProvider;
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Pointcut("execution(public * com.navercorp.pinpoint.common.hbase.RowMapper.mapRow(..))")
public void pointCut() {
}
@After("pointCut() && args(result,rowNum)")
public void doBefore(Result result, int rowNum) {
if (ThreadContext.isReverse()) {
String tableNameStr = ThreadContext.getTableName();
TableName tableName = tableNameProvider.getTableName(tableNameStr);
Put put = new Put(result.getRow());
Cell[] rawCells = result.rawCells();
for (Cell cell : rawCells) {
put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), cell.getTimestamp(), CellUtil.cloneValue(cell));
}
template2.asyncPut(tableName, put);
}
}
@Before("execution(public * com.navercorp.pinpoint.web.dao.hbase.HbaseTraceDaoV2.selectSpan(..))")
public void selectSpan() {
ThreadContext.setTableName(HbaseTable.TRACE_V2.getName());
}
@AfterReturning(returning = "tableName", pointcut = "execution(public * com.navercorp.pinpoint.common.hbase.TableDescriptor.getTableName())")
public void getTableName(TableName tableName) {
ThreadContext.setTableName(tableName.getNameAsString());
}
@Around("execution(public * com.navercorp.pinpoint.web.service.SpanService.selectSpan(..))")
public Object dealContext(ProceedingJoinPoint jp) throws Throwable {
Object result;
try {
ThreadContext.setReverse(true);
result = jp.proceed();
} finally {
ThreadContext.remove();
}
return result;
}
}
遗憾
里面有些曲折,看上面代码也知道。
com.navercorp.pinpoint.web.dao.hbase.HbaseTraceDaoV2#spanMapperV2 不是spring bean,而是手动构造的。
这里我就没有继续弄下去了,因为感觉spring aop的局限性很大,计划用java instrumentation api,和pinpoint的agent一样重新弄下。
代码提交备份在我的github上 https://github.com/tankilo/pinpoint/tree/spring-aop-hbase-reverse