链路追踪方案
背景
目前日志使用aop+logback生成。
存在问题
1.高并发下日志大量生成以至于无法确认哪条日志对应哪个用户的操作;
2.日志生成过于零散缺乏统一的生成格式;
所以目前尝试引入链路追踪的方式来定位异常。
方案选择
服务分类 | cat | zipkin | pinpoint | skywalking | aws cloudwatch |
---|---|---|---|---|---|
依赖 | ·Java 6,7,8 ·Maven 3.2.3 ·mysql5.6 ·Linux 2.6以及之上(2.6内核才可以支持epoll) |
·Java 6,7,8 ·Maven3.2+ ·rabbitMQ |
·Java 6,7,8 ·maven3+ ·Hbase0.94+ |
·Java 6,7,8 ·maven3.0+ ·nodejs ·zookeeper ·elasticsearch |
·Java 6,7,8 ·maven3.0+ ·aws |
实现方式 | 代码埋点(拦截器,注解,过滤器等) | 拦截请求,发送(HTTP,mq)数据至zipkin服务 | java探针,字节码增强 | java探针,字节码增强 | java aop,fluent-bit传输 |
存储选择 | mysql , hdfs | in-memory , mysql , Cassandra , Elasticsearch | HBase | elasticsearch , H2 | cloudwatch |
通信方式 | http , MQ | thrift | GRPC | ||
MQ监控 | 不支持 | 不支持 | 不支持 | 支持 | 不支持 |
全局调用统计 | 支持 | 不支持 | 支持 | 支持 | 支持 |
trace查询 | 不支持 | 支持 | 不支持 | 支持 | 支持 |
报警 | 支持 | 不支持 | 支持 | 支持 | 支持 |
JVM监控 | 不支持 | 不支持 | 支持 | 支持 | 不支持 |
优点 | 功能完善 | spring-cloud-sleuth可以很好的集成zipkin , 代码无侵入,集成非常简单 , 社区更加活跃。对外提供有query接口,更加容易二次开发 | 完全无侵入, 仅需修改启动方式,界面完善,功能细致。 | 全无侵入,界面完善,支持应用拓扑图及单个调用链查询。功能比较完善(zipkin + pinpoint) | 可定制性强,可根据需求自定义任何功能 |
缺点 | ·代码侵入性较强,需要埋点 ·文档比较混乱,文档与发布版本的符合性较低,需要依赖点评私服 (或者需要把他私服上的jar手动下载下来,然后上传到我们的私服上去)。 |
·默认使用的是http请求向zipkin上报信息,耗性能。 ·跟sleuth结合可以使用rabbitMQ的方式异步来做,增加了复杂度,需要引入rabbitMQ 。·数据分析比较简单。 |
·不支持查询单个调用链, 对外表现的是整个应用的调用生态。 ·二次开发难度较高 |
·版本之前BUG较多 ,网上反映兼容性较差 ·3.2新版本的反映情况较少依赖较多。 |
·依赖于aws ·有一定学习成本 |
文档 | 网上资料较少,仅官网提供的文档,比较乱 | 文档完善 | 文档完善 | 文档完善 | 文档完善,但过于老旧,查询不方便 |
使用公司 | 大众点评, 携程, 陆金所,同程旅游,猎聘网 | naver | 华为软件开发云、天源迪科、当当网、京东金融 |
我们当前服务是单体服务,为此引入第三方过于复杂。
其次当前服务的监控依赖于aws的cloudwatch,目前通过fluent-bit将pods中的日志上传处理,暂时不希望引入额外的监控系统
最终决定通过拦截器+修改日志生成方案来解决。
详细设计
- aop日志生成
之前的日志生成过于零散,无法通过cloudwatch有效监控,报警,所以本次引入request 与 error request 类,通过 aop环绕方法统一打印日志
aop环绕方法,异常方法实现@Data public class RequestInfo { private String ip; private String url; private String httpMethod; private String classMethod; private Object requestParams; private Object result; private Long timeCost; } @Data public class RequestErrorInfo { private String ip; private String url; private String httpMethod; private String classMethod; private Object requestParams; private RuntimeException exception; }
@Around("webLogPointcut()") public Object doAround(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { long start = System.currentTimeMillis(); ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); HttpServletRequest request = attributes.getRequest(); Object result = proceedingJoinPoint.proceed(); RequestInfo requestInfo = new RequestInfo(); requestInfo.setIp(request.getRemoteAddr()); requestInfo.setUrl(request.getRequestURL().toString()); requestInfo.setHttpMethod(request.getMethod()); requestInfo.setClassMethod(String.format("%s.%s", proceedingJoinPoint.getSignature().getDeclaringTypeName(), proceedingJoinPoint.getSignature().getName())); requestInfo.setRequestParams(getRequestParamsByProceedingJoinPoint(proceedingJoinPoint)); requestInfo.setResult(result); requestInfo.setTimeCost(System.currentTimeMillis() - start); log.info("Request Info: {}", JSON.toJSONString(requestInfo)); return result; } /** * 异常通知: * 1. 在目标方法非正常结束,发生异常或者抛出异常时执行 * 1. 在异常通知中设置异常信息,并将其保存 * * @param throwable */ @AfterThrowing(value = "webLogPointcut()", throwing = "throwable") public void doAfterThrowing(JoinPoint joinPoint, RuntimeException throwable) { // 保存异常日志记录 ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); HttpServletRequest request = attributes.getRequest(); RequestErrorInfo requestErrorInfo = new RequestErrorInfo(); requestErrorInfo.setIp(request.getRemoteAddr()); requestErrorInfo.setUrl(request.getRequestURL().toString()); requestErrorInfo.setHttpMethod(request.getMethod()); requestErrorInfo.setClassMethod(String.format("%s.%s", joinPoint.getSignature().getDeclaringTypeName(), joinPoint.getSignature().getName())); requestErrorInfo.setRequestParams(getRequestParamsByJoinPoint(joinPoint)); requestErrorInfo.setException(throwable); log.error("Error Request Info: {}", JSON.toJSONString(requestErrorInfo)); } private Map<String, Object> getRequestParamsByProceedingJoinPoint(ProceedingJoinPoint proceedingJoinPoint) { //参数名 String[] paramNames = ((MethodSignature) proceedingJoinPoint.getSignature()).getParameterNames(); //参数值 Object[] paramValues = proceedingJoinPoint.getArgs(); return buildRequestParam(paramNames, paramValues); } private Map<String, Object> getRequestParamsByJoinPoint(JoinPoint joinPoint) { //参数名 String[] paramNames = ((MethodSignature) joinPoint.getSignature()).getParameterNames(); //参数值 Object[] paramValues = joinPoint.getArgs(); return buildRequestParam(paramNames, paramValues); } private Map<String, Object> buildRequestParam(String[] paramNames, Object[] paramValues) { Map<String, Object> requestParams = new HashMap<>(); for (int i = 0; i < paramNames.length; i++) { Object value = paramValues[i]; //如果是文件对象 if (value instanceof MultipartFile) { MultipartFile file = (MultipartFile) value; value = file.getOriginalFilename(); //获取文件名 } requestParams.put(paramNames[i], value); } return requestParams; }
- 添加traceId (存在问题 已在下方修复)
通过ThreadContext存储traceId,traceId来源可以是前端,如果前端不包含则通过雪花算法生成一个id 2.1 添加traceId拦截器
2.2 项目中引入该拦截器@Component public class LogInterceptor implements HandlerInterceptor { private final static String TRACE_ID = "traceId"; @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { String traceId = request.getHeader("traceId"); if (StringUtils.isEmpty(traceId)) { //traceId = java.util.UUID.randomUUID().toString().replaceAll("-", "").toUpperCase(); traceId = SnowflakeIdUtils.next().toString(); } ThreadContext.put("traceId", traceId); return true; } @Override public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception { } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { ThreadContext.remove(TRACE_ID); } }
@Configuration public class WebConfig implements WebMvcConfigurer { @Autowired LogInterceptor logInterceptor; @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(logInterceptor); } }
- 编辑logback 日志生成方式
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符--> <pattern>[TRACEID:%X{traceId}] %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> </encoder> </appender>
- aws eks fluent-bit cloudwatch 搭建监控,报警平台
通过cloudwatch 平台统一业务日志,并删选异常日志,日志提供所属容器及traceId等内容filter @logStream like 'test' //test 表示模糊查询项目 | fields kubernetes.pod_name //显示日志所属容器 | parse log "[TRACEID:*] * [*] * * - *" as @traceId,@time, @thread,@level,@source,@msg //解析logback日志 | filter @msg like "Request" //筛选request and error request 日志 | parse @msg "Request Info: {\"classMethod\":\"*\",\"httpMethod\":\"*\",\"ip\":\"*\",\"requestParams\":*,\"result\":*,\"timeCost\":*,\"url\":\"*\"" as classMethod,httpMethod,ip,requestParams,result,timeCost,url //解析request log | parse result "{\"code\":\"*\",*}" as resultCode ,other //解析result code 查询业务异常
备注
一.存在问题
使用该方案生成日志对controller层的参数有限制,不能引入request,response等参数,如果一定需要使用,可以通过spring容器内读取。
二.在使用这套日志体系后发现新的问题,由于项目中引入了基于netty的websocket,不受aop的日志记录影响,所以参考以上方案简单实现了类似的功能。 如:
- 在@BeforeHandshake中引入traceId;
- 创建日志类
@Data
public class SocketRequestInfo {
private String uid;
private String roomId;
private Integer code;
private Long timeCost;
}
@Data
public class SocketRequestErrorInfo {
private String uid;
private String roomId;
private RuntimeException exception;
}
- 在@OnBinary,@OnError 中分别打印日志
参考
- 分布式链路追踪技术对比
- 用好 Spring AOP,天降大锅从容应对!
- Set up Fluent Bit as a DaemonSet to send logs to CloudWatch Logs PDF
- CloudWatch Logs Insights query syntax
2022-04-14 更新
tarceId: 当traceId使用ThreadLocal时,在rpc调用出现一个问题。
环境:k8s springboot okhttp retrofit2
当使用retrofit2 调用下游业务时发现thraceId丢失。
之前因为业务紧,选择了一个非常low的方案,在所有api接口上添加@Header Sring traceId
今天在在学习新知识时偶然发现更优雅的解决方案,特此记录。
新增traceUtils
核心: 用MDC替换ThreadLcoal
public class TraceUtils {
public final static String TRACE_ID = "traceId";
public static String getTraceId() {
return MDC.get(TRACE_ID);
}
public static void addTraceId(String traceId) {
if (StringUtils.isEmpty(traceId)) {
MDC.put(TRACE_ID, SnowflakeIdUtils.next().toString());
} else {
MDC.put(TRACE_ID, traceId);
}
}
public static void clear() {
MDC.clear();
}
}
修改LogInterceptor
@Component
public class LogInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
String traceId = request.getHeader(TraceUtils.TRACE_ID);
TraceUtils.addTraceId(traceId);
return true;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView)
throws Exception {
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex)
throws Exception {
TraceUtils.clear();
}
}
新增OkHttpInterceptor
public class OkHttpTraceIdInterceptor implements Interceptor {
@Override
public Response intercept(Chain chain) throws IOException {
String traceId = TraceUtils.getTraceId();
Request request = null;
if (traceId != null) {
//添加请求体
request = chain.request().newBuilder().addHeader(TraceUtils.TRACE_ID, traceId).build();
}
Response originResponse = chain.proceed(request);
return originResponse;
}
}
修改okHttpUtils
public class OkHttpUtils {
private final static OkHttpClient okHttpClient;
static {
okHttpClient = new OkHttpClient.Builder()
//.sslSocketFactory(sslSocketFactory(), x509TrustManager())
.retryOnConnectionFailure(true)
.connectionPool(pool())
.connectTimeout(60, TimeUnit.SECONDS)
.readTimeout(60, TimeUnit.SECONDS)
.writeTimeout(60, TimeUnit.SECONDS)
.addInterceptor(new OkHttpTraceIdInterceptor())
.build();
}
public static OkHttpClient getOkHttpClient() {
return okHttpClient;
}
public static ConnectionPool pool() {
return new ConnectionPool(200, 5, TimeUnit.MINUTES);
}
}
当使用MDH代替ThreadLocal ,终于将大面积接口调用的TraceId参数去掉了。
cloudwatch数据查询转opensearch
随着日志增加,需要监控及时报警,而cloudwatch的成本是根据查询数据量来计算的;再加上一些用于分析的日志,成本直接爆炸;为了解决该问题,引入了opensearch来存储日志数据;
关键要解决的几个问题:
opensearch没有类型cloudwatch中的parse关键词,所以一些需要解析的文本数据需要放在fluent-bit上传日志时完成
opensearch需要关心下机器的性能瓶颈
opensearch的权限系统和aws自身的是2套
grafana中只能用es lucene来查询,每个索引是单独的数据源
好处就是opensearch只需要支付节点的费用,不用再因为查询过于频繁烦恼