在上一章《Hadoop RPC(一)》中已经浅显地使用了 Protocol Buffers(protobuf),现在就利用上一篇里生成的类,基于 Hadoop RPC 来实现咱们自己的协议。这样也方便咱们在 IDE 中对 Hadoop RPC 的代码进行调试。
第一步:定义一个接口,继承自 rpc.proto.MyResourceTracker.MyResourceTrackerService.BlockingInterface
rpc.proto.MyResourceTracker.MyResourceTrackerService.BlockingInterface 是 protoc 编译 MyResourceTracker.proto 文件时生成的接口。
这一步的目的是通过 @ProtocolInfo 注解来定义协议的名称和版本。版本是必需的,否则运行时会报错。
/**
 * Application-facing RPC protocol interface.
 *
 * <p>Extends the blocking service interface nested in
 * {@code rpc.proto.MyResourceTracker.MyResourceTrackerService} without adding any
 * methods. It exists to carry the {@code @ProtocolInfo} annotation, which declares
 * the protocol name {@code "rpc.proto.MyResourceTracker"} and protocol version 1.
 */
@ProtocolInfo(protocolName = "rpc.proto.MyResourceTracker", protocolVersion = 1)
public interface MyResourceTracker
        extends rpc.proto.MyResourceTracker.MyResourceTrackerService.BlockingInterface {
}
第二步:实现协议的实现类
public class MyResourceTrackerService implements MyResourceTracker {
@Override
public MyResourceTrackerProtos.MyResourceTrackerResponseProto registerNodeManager(RpcController controller, MyResourceTrackerProtos.MyResourceTrackerRequestProto req) throws ServiceException {
int cpu = req.getCpu();
int memory = req.getMemory();
String hostId = req.getHostId();
System.out.println(String.format("cpu: %d, memory: %d, hostId: %s", cpu, memory, hostId));
return MyResourceTrackerProtos.MyResourceTrackerResponseProto.newBuilder().setFlag(true).build();
}
}
第三步:生成server并启动
需要注意的地方已在代码注释中说明
/**
 * Demo Hadoop RPC server for the {@link MyResourceTracker} protocol.
 *
 * <p>Binds to 127.0.0.1:2222 with 10 handlers, starts serving, and then blocks the
 * main thread until the server is stopped.
 */
public class Server {

    /**
     * Builds, starts and waits on the RPC server.
     *
     * @param args ignored
     * @throws IOException if the server cannot be built (propagated from the RPC builder)
     */
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Without this call the deprecated WritableRpcEngine is used by default.
        // The protocol class passed here must be the same class given to setProtocol below.
        RPC.setProtocolEngine(conf, MyResourceTracker.class, ProtobufRpcEngine.class);

        // Required: this creates the server-side stub. The BlockingService is only a
        // wrapper; the real work is done by the MyResourceTrackerService implementation.
        BlockingService blockingService =
                rpc.proto.MyResourceTracker.MyResourceTrackerService
                        .newReflectiveBlockingService(new MyResourceTrackerService());

        RPC.Server server = new RPC.Builder(conf)
                .setProtocol(MyResourceTracker.class)
                .setInstance(blockingService)
                .setBindAddress("127.0.0.1")
                .setPort(2222)
                .setNumHandlers(10)
                .build();

        server.start();
        try {
            // Block until the server is stopped, so that the process lifetime does not
            // depend on how the server's internal threads are configured.
            server.join();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for any outer code and fall through to shutdown.
            Thread.currentThread().interrupt();
        } finally {
            server.stop();
        }
    }
}
第四步:生成client
需要注意的地方已在代码注释中说明
/**
 * Demo Hadoop RPC client for the {@link MyResourceTracker} protocol.
 *
 * <p>Connects to 127.0.0.1:2222, sends one registerNodeManager request and prints the
 * flag of the response.
 */
public class Client {

    /**
     * Sends a single registration request and prints the returned flag.
     *
     * @param args ignored
     * @throws IOException      if the proxy cannot be created
     * @throws ServiceException if the remote call fails
     */
    public static void main(String[] args) throws IOException, ServiceException {
        Configuration conf = new Configuration();
        // Must be set; otherwise WritableRpcEngine is used.
        RPC.setProtocolEngine(conf, MyResourceTracker.class, ProtobufRpcEngine.class);

        // Use the same address as the server; the client version must match the
        // protocolVersion declared on MyResourceTracker.
        MyResourceTracker proxy = RPC.getProxy(
                MyResourceTracker.class, 1, new InetSocketAddress("127.0.0.1", 2222), conf);
        try {
            MyResourceTrackerProtos.MyResourceTrackerRequestProto req =
                    MyResourceTrackerProtos.MyResourceTrackerRequestProto.newBuilder()
                            .setCpu(1)
                            .setMemory(2)
                            .setHostId("localhost:5006")
                            .build();

            // The controller argument is not needed for this call, so null is passed.
            MyResourceTrackerProtos.MyResourceTrackerResponseProto response =
                    proxy.registerNodeManager(null, req);
            System.out.println(response.getFlag());
        } finally {
            // Release the underlying RPC connection even when the call fails.
            RPC.stopProxy(proxy);
        }
    }
}
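运行
服务端:先启动 Server,收到客户端请求后(除 Hadoop 自身的日志外)会打印:
cpu: 1, memory: 2, hostId: localhost:5006
客户端:再启动 Client,会打印服务端返回的 flag:
true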
POM文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Parent (aggregator) POM for the hadoop-learn modules. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>hadoop-learn</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <modules>
        <module>learn-hadoop-yarn</module>
        <module>java-analysis</module>
        <module>learn-antlr4</module>
        <module>learn-zookeeper</module>
    </modules>

    <properties>
        <!--
          Single source of truth for the Java language level. These were previously 11
          while maven-compiler-plugin below was hard-coded to 1.8; the plugin setting
          took precedence, so 1.8 is kept here as the effective level.
        -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <hadoop.version>3.2.2</hadoop.version>
        <antlr4.version>4.8</antlr4.version>
        <junit.version>4.12</junit.version>
        <protobuf.version>2.5.0</protobuf.version>
        <slf4j.version>1.7.25</slf4j.version>
        <guava.version>27.0-jre</guava.version>
        <commons-collections.version>3.2.2</commons-collections.version>
        <commons-lang3.version>3.7</commons-lang3.version>
        <htrace3.version>3.1.0-incubating</htrace3.version>
        <htrace4.version>4.1.0-incubating</htrace4.version>
    </properties>

    <dependencies>
        <!-- Hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <scope>compile</scope>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <scope>compile</scope>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- Testing -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- Protocol Buffers runtime used by the generated RPC classes -->
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>${protobuf.version}</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.woodstox</groupId>
            <artifactId>woodstox-core</artifactId>
            <version>5.0.3</version>
        </dependency>

        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jul-to-slf4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

        <!-- General-purpose libraries -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
        </dependency>
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>${commons-collections.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-configuration2</artifactId>
            <version>2.1.1</version>
            <exclusions>
                <!-- Excluded here; commons-lang3 is declared explicitly below. -->
                <exclusion>
                    <groupId>org.apache.commons</groupId>
                    <artifactId>commons-lang3</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${commons-lang3.version}</version>
        </dependency>

        <!-- HTrace -->
        <dependency>
            <groupId>org.apache.htrace</groupId>
            <artifactId>htrace-core</artifactId>
            <version>${htrace3.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.htrace</groupId>
            <artifactId>htrace-core4</artifactId>
            <version>${htrace4.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                    <!-- Reference the properties so the language level is defined once. -->
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>