了解SPI机制
SPI 全称为 (Service Provider Interface) ,是JDK内置的一种服务提供发现机制。SPI是一种动态替换发现的机制, 比如有个接口,想运行时动态的给它添加实现,你只需要添加一个实现。我们经常遇到的就是java.sql.Driver接口,其他不同厂商可以针对同一接口做出不同的实现,mysql和postgresql都有不同的实现提供给用户,而Java的SPI机制可以为某个接口寻找服务实现。
建议阅读 https://juejin.im/post/5af952fdf265da0b9e652de3
常用接口介绍(动态字节码增强)
skywalking agent plugin是基于 bytebuddy 来做字节码增强。
经过 skywalking 封装后,我们常用的接口如下:
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine: 定义需要捕捉的类和方法
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor : 在方法前后执行
编写一个agent(oss上传文件的plugin)
oss 需要捕捉的部分代码如下: OSSObjectOperation
public class OSSObjectOperation extends OSSOperation {
/** Creates the operation by forwarding the client and the credentials provider to the superclass constructor. */
public OSSObjectOperation(ServiceClient client, CredentialsProvider credsProvider) {
super(client, credsProvider);
}
/**
* Upload input stream or file to oss.
*
* @param putObjectRequest the upload request; it is asserted to be non-null before anything else happens
* @return the result returned by the internal write call
* @throws OSSException declared by this method's signature
* @throws ClientException declared by this method's signature
*/
public PutObjectResult putObject(PutObjectRequest putObjectRequest) throws OSSException, ClientException {
assertParameterNotNull(putObjectRequest, "putObjectRequest");
PutObjectResult result = null;
// Both branches write with WriteMode.OVERWRITE; they differ only in the response parser passed along.
if (!isNeedReturnResponse(putObjectRequest)) {
result = writeObjectInternal(WriteMode.OVERWRITE, putObjectRequest, putObjectReponseParser);
} else {
result = writeObjectInternal(WriteMode.OVERWRITE, putObjectRequest, putObjectProcessReponseParser);
}
// When CRC checking is enabled, the client CRC, server CRC and request id of the result are handed to the checksum check.
if (isCrcCheckEnabled()) {
OSSUtils.checkChecksum(result.getClientCRC(), result.getServerCRC(), result.getRequestId());
}
return result;
}
/**
* Upload input stream to oss by using url signature.
*
* @param signedUrl the signed URL; asserted to be non-null and set as the request's absolute URL
* @param requestContent the content to upload; asserted to be non-null
* @param contentLength passed, together with the stream and the chunk-encoding flag, to determineInputStreamLength
* @param requestHeaders request headers; a null value is replaced by an empty map
* @param useChunkEncoding forwarded to the request and to determineInputStreamLength
* @return the result returned by doOperation
* @throws OSSException declared by this method's signature
* @throws ClientException declared by this method's signature
*/
public PutObjectResult putObject(URL signedUrl, InputStream requestContent, long contentLength,
Map<String, String> requestHeaders, boolean useChunkEncoding) throws OSSException, ClientException {
assertParameterNotNull(signedUrl, "signedUrl");
assertParameterNotNull(requestContent, "requestContent");
// Normalize a missing header map so the header lookup further down cannot hit null.
if (requestHeaders == null) {
requestHeaders = new HashMap<String, String>();
}
// Build a PUT request that targets the signed URL directly.
RequestMessage request = new RequestMessage(null, null);
request.setMethod(HttpMethod.PUT);
request.setAbsoluteUrl(signedUrl);
request.setUseUrlSignature(true);
request.setContent(requestContent);
request.setContentLength(determineInputStreamLength(requestContent, contentLength, useChunkEncoding));
request.setHeaders(requestHeaders);
request.setUseChunkEncoding(useChunkEncoding);
PutObjectResult result = null;
// The presence of the OSS_HEADER_CALLBACK header selects which response parser is used.
if (requestHeaders.get(OSSHeaders.OSS_HEADER_CALLBACK) == null) {
result = doOperation(request, putObjectReponseParser, null, null, true);
} else {
result = doOperation(request, putObjectProcessReponseParser, null, null, true);
}
// When CRC checking is enabled, the client CRC, server CRC and request id of the result are handed to the checksum check.
if (isCrcCheckEnabled()) {
OSSUtils.checkChecksum(result.getClientCRC(), result.getServerCRC(), result.getRequestId());
}
return result;
}
......
- 先建一个maven项目,修改 pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.skywalkingagent</groupId>
<artifactId>apm-oss-3.x-plugin</artifactId>
<packaging>jar</packaging>
<name>skyagent-oss</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<checkstyle.skip>true</checkstyle.skip><!-- Skip checkstyle during package; otherwise the build reports that checkstyle.xml cannot be found -->
</properties>
<!-- This parent must be inherited; otherwise an error is reported when the plugin is loaded -->
<parent>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-sdk-plugin</artifactId>
<version>6.6.0</version>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-agent-core</artifactId>
<version>6.6.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<version>3.1.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
- 编写定义类与捕捉类
/**
 * Plugin definition that names the class to enhance and the instance methods to intercept.
 */
public class OSSObjectOperationInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    /** Fully qualified name of the class whose methods are enhanced. */
    private static final String TARGET_CLASS = "com.aliyun.oss.internal.OSSObjectOperation";

    /** Fully qualified name of the class that performs the interception. */
    private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.oss.OSSObjectOperationInterceptor";

    /** Matches the target class by its exact name. */
    @Override
    protected ClassMatch enhanceClass() {
        return NameMatch.byName(TARGET_CLASS);
    }

    /** No constructor intercept points are declared. */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return null;
    }

    /** Declares a single intercept point covering every method named {@code putObject}. */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        final InstanceMethodsInterceptPoint putObjectPoint = new InstanceMethodsInterceptPoint() {
            @Override
            public String getMethodsInterceptor() {
                return INTERCEPTOR_CLASS;
            }

            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return named("putObject");
            }

            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        };
        return new InstanceMethodsInterceptPoint[] { putObjectPoint };
    }
}
/**
 * Interceptor that runs before and after {@code OSSObjectOperation.putObject}.
 *
 * <p>An exit span is created in {@link #beforeMethod} and stopped in {@link #afterMethod}. All three
 * callbacks share the same guard ({@link #isTraceable}), so a span is only stopped or marked as failed
 * when this interceptor actually created one.
 */
public class OSSObjectOperationInterceptor implements InstanceMethodsAroundInterceptor {

    /**
     * Creates the exit span for a {@code putObject} call.
     *
     * <p>The span is created with the operation name {@code "oss-putObject"} and
     * {@code bucketName + ":" + size} as the last argument of {@code createExitSpan}.
     * The size is taken from the file length, the declared content length, or
     * {@link InputStream#available()}; the latter is only an estimate of the readable bytes,
     * not necessarily the full stream length.
     *
     * @param objInst the enhanced instance (unused)
     * @param method the intercepted method (unused)
     * @param allArguments arguments of the intercepted call
     * @param argumentsTypes parameter types of the intercepted method (unused)
     * @param result intercept result holder (unused)
     */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
            MethodInterceptResult result) throws Throwable {
        if (!isTraceable(allArguments)) {
            return;
        }
        final ContextCarrier contextCarrier = new ContextCarrier();
        String bucketName = null;
        long size = -1;
        if (allArguments.length == 1 && allArguments[0] instanceof PutObjectRequest) {
            // putObject(PutObjectRequest putObjectRequest)
            PutObjectRequest putObjectRequest = (PutObjectRequest) allArguments[0];
            bucketName = putObjectRequest.getBucketName();
            if (putObjectRequest.getFile() != null) {
                size = putObjectRequest.getFile().length();
            } else {
                InputStream content = putObjectRequest.getInputStream();
                if (content != null) {
                    size = availableOrDefault(content, size);
                }
            }
        } else if (allArguments.length >= 3 && allArguments[0] instanceof URL
                && allArguments[1] instanceof InputStream) {
            // putObject(URL signedUrl, InputStream requestContent, long contentLength,
            //           Map<String, String> requestHeaders, boolean useChunkEncoding)
            bucketName = ((URL) allArguments[0]).getPath();
            if (allArguments[2] instanceof Long) {
                size = (Long) allArguments[2];
            }
            if (size <= 0) {
                size = availableOrDefault((InputStream) allArguments[1], size);
            }
        }
        AbstractSpan span = ContextManager.createExitSpan("oss-putObject", contextCarrier, bucketName + ":" + size);
        span.setComponent(ComponentsDefine.TRANSPORT_CLIENT);
        SpanLayer.asHttp(span);
    }

    /**
     * Stops the span created in {@link #beforeMethod}, if one was created.
     *
     * @return {@code ret}, unchanged
     */
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
            Object ret) throws Throwable {
        if (isTraceable(allArguments)) {
            ContextManager.stopSpan();
        }
        return ret;
    }

    /**
     * Marks the active span as failed and logs the throwable on it.
     *
     * <p>Skipped when {@link #beforeMethod} created no span for this call; otherwise the error would be
     * recorded on a span that does not belong to this interceptor.
     */
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
            Class<?>[] argumentsTypes, Throwable t) {
        if (!isTraceable(allArguments)) {
            return;
        }
        AbstractSpan activeSpan = ContextManager.activeSpan();
        activeSpan.errorOccurred();
        activeSpan.log(t);
    }

    /** Returns true when the call has a non-null first argument, i.e. when a span is created for it. */
    private static boolean isTraceable(Object[] allArguments) {
        return allArguments != null && allArguments.length > 0 && allArguments[0] != null;
    }

    /**
     * Returns {@code in.available()}, or {@code fallback} when the stream refuses to report it.
     * The value is diagnostic only, so a failure here must not prevent the span from being created:
     * that would leave the later {@code stopSpan()} without a matching span.
     */
    private static long availableOrDefault(InputStream in, long fallback) {
        try {
            return in.available();
        } catch (java.io.IOException ignored) {
            // Deliberately ignored: the size is only used to label the span.
            return fallback;
        }
    }
}
- 配置 skywalking-plugin.def 定义启用的PluginDefine
oss-3.x=org.apache.skywalking.apm.plugin.oss.v3.define.OSSObjectOperationInstrumentation
- 代码整体目录
image2019-8-15_14-13-56.png
- 编译执行 mvn package
- 将编译后的jar文件放入业务机器上agent/plugins的目录下(例如 /lib/skywalking/agent/plugins)
实际效果
微信图片_20200304170756.png