前言
因为新公司用了这个框架做任务分发,所以我才有机会知道并学习!那么Gearman到底是个什么样子的框架或系统呢?它有哪些作用、优缺点呢?
Gearman是一个用来把工作委派给其他机器、分布式的调用更适合做某项工作的机器、并发的做某项工作在多个调用间做负载均衡、或用来在调用其它语言的函数的系统。可用于sso 分发连接,但有弊端就是占用系统资源较多,例如CPU、内存。(来源于百度百科)
简单来说,Gearman就是一个分布式计算调度框架或系统,它的主要职责就是做工作分发,所以既然是用来做分布式计算,那自然资源占用会多一些,毕竟不是用来搞小事情的~
正题
如下图,Gearman的工作方式基本可以这样理解,当然既然是计算,那肯定会有结果返回,Client端在任务提交完成之后,那么就可以等待计算结果返回,然后进行汇总。为了简单便于理解,我这里只画出了一个Client,实际生产中Client和Worker肯定都是很多个。
我们大致了解了Gearman的工作方式,那现在着手编程,试试效果。
环境准备
Linux:Centos 6.10
Gearman:1.1.18
Language:Java
Jar包:gearman-java 0.6
安装好Gearman后,我们用命令启动它,进入Gearman的sbin目录下,启动命令:./gearmand --pid-file=/tmp/gearman.pid --daemon --log-file=/home/log/gearman/gearman.log。(Linux安装源码软件,先configure、再make、最后make install,关于一些安装参数,可以查一下configure的参数,基本上就没啥问题了,缺库的话对应着安装即可)
如下图,服务已经正常启动,默认端口是4730。
第一个程序
public class EchoApplication {
private GearmanClient client;
public EchoApplication() {
GearmanJobServerConnection connection = new GearmanNIOJobServerConnection(Constant.GEARMAN_HOST, Constant.GEARMAN_PORT); // 创建Gearman服务connection
client = new GearmanClientImpl();
client.addJobServer(connection);
}
public String echo(String input) throws IOException {
byte[] data = input.getBytes(); // 发送数据
byte[] reply = ((GearmanClientImpl) client).echo(data); // 调用Gearman
return new String(reply, "UTF-8");
}
public static void main(String... args) {
EchoApplication app = new EchoApplication();
String reply = null;
try {
reply = app.echo("hello");
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("reply is ===> " + reply);
}
}
很简单的一个使用方法,首先在构造函数中初始化连接,然后主函数中调用echo。
如果运行后能够得到如下结果,那么表示Gearman调用成功!
Client、Worker实现
首先,我们清楚Client的职责,就是做工作拆分(根据实际业务逻辑)以及提交,在Gearman系统中,我们只关注工作提交即可。由于Gearman系统中,不要求Client和Worker的实现语言一直,毕竟分布式计算,所以Client和Worker可以使用不同语言实现,为了方便,我这里统一使用Java。
既然是提交工作,那么定义一个工作是必不可少的部分,如下,我就定义了一个很简答的工作(INFO日志输出内容太多,不方便看结果)。
public class ReadDataJob extends AbstractGearmanFunction {
private Logger logger = LoggerFactory.getLogger(ReadDataJob.class);
@Override
public GearmanJobResult executeFunction() {
DataInfo data = SerializationUtils.deserialize((byte[]) this.data);
logger.warn("this thread ID is [{}], i will do [{}]", Thread.currentThread().getId(), data.toString());
return new GearmanJobResultImpl(this.jobHandle, true, SerializationUtils.serialize(data.toString()), new byte[0], new byte[0], 0, 0);
}
}
定义工作需要继承Gearman-Java中的AbstractGearmanFunction,实现executeFunction()方法,然后客户端提交的工作的参数,存放在data字段中。我这里就是简单的打印了提交过来的数据,然后作为返回数据返回。
有了工作,我们实现Client,那Client的做提交动作,在实际生产中,肯定是不断会提交工作,所以我们就简单的实现一个循环来模拟。
public class ClientRunner implements Runnable {
private Logger logger = LoggerFactory.getLogger(ClientRunner.class);
private GearmanClient client;
public ClientRunner() {
GearmanJobServerConnection connection = new GearmanNIOJobServerConnection(Constant.GEARMAN_HOST, Constant.GEARMAN_PORT);
this.client = new GearmanClientImpl();
client.addJobServer(connection);
}
public void run() {
this.start();
}
private void start() {
while (true) {
try {
String function = ReadDataJob.class.getCanonicalName();
DataInfo data = new DataInfo(UUID.randomUUID().toString(), "log", "/home/log/");
// 创建一个工作,包含名字、数据(参数)、工作ID
GearmanJob job = GearmanJobImpl.createJob(function, SerializationUtils.serialize(data), data.getDataId());
this.client.submit(job);
GearmanJobResult result = job.get();
byte[] resData = result.getResults();
String res = SerializationUtils.deserialize(resData);
logger.warn("reply is [{}]", res);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
}
以上就是一个简单的Client,首先在构造函数中初始化Gearman连接,然后在循环中一直提交工作。GearmanJob需要有functionName(工作名称),数据(参数),以及ID。submit工作之后,GearmanJob对象可以等待其返回,这个实现模式是不是很像线程池提交Task得到Future对象,然后用Future对象等待获取执行结果呢~。我在这里也仅仅是简单打印返回结果。
最后我们要实现Worker了,也就是实际帮我们完成工作的地方。
public class WorkerRunner implements Runnable {
// 所有定义好的工作
private List<Class<? extends GearmanFunction>> functions;
private GearmanWorker worker;
public WorkerRunner(List<Class<? extends GearmanFunction>> functions) {
// Gearman connection
GearmanJobServerConnection connection = new GearmanNIOJobServerConnection(Constant.GEARMAN_HOST, Constant.GEARMAN_PORT);
// 初始化worker
this.worker = new GearmanWorkerImpl();
this.worker.addServer(connection);
this.functions = new ArrayList<Class<? extends GearmanFunction>>();
this.functions.addAll(functions);
}
public void run() {
this.start();
}
private void start() {
// 将所有定义好的工作,注册到worker中
for (Class<? extends GearmanFunction> function : functions) {
worker.registerFunction(function);
}
// 启动worker
worker.work();
}
}
以上就是Worker的简单实现,首先在构造函数中初始化好Gearman连接,以及Worker对象,然后我们在start函数中把所有定义的工作都注册到Worker中,这里需要注意,查看Worker的源码可以发现,Worker中是把工作的名字和工作一一对应起来的;最后启动Worker,等待接受工作。
OK,我们实现了所有必要组件,那么试试运行我们的代码。为了方便观察其工作结果,所以我针对Client和Worker分别写了启动类。
Client启动
public class ClientApplication {
private ExecutorService client = Executors.newSingleThreadExecutor();
public static void main(String... args) {
ClientApplication app = new ClientApplication();
ClientRunner clientRunner = new ClientRunner();
app.client.submit(clientRunner);
}
}
Worker启动
public class WorkerApplication {
private ExecutorService workers = Executors.newFixedThreadPool(2);
public static void main(String... args) {
WorkerApplication app = new WorkerApplication();
List<Class<? extends GearmanFunction>> functions = new ArrayList<>();
functions.add(ReadDataJob.class);
WorkerRunner worker1 = new WorkerRunner(functions);
WorkerRunner worker2 = new WorkerRunner(functions);
app.workers.submit(worker1);
app.workers.submit(worker2);
}
}
针对worker,我启动了两个,我们可以看看是不是随机选取的Worker。
Worker输出
Client输出
可见两个Worker都有被分派到工作。
OK,到此我们基本对Gearman的使用入门了,后面如果要继续研究Gearman,可能需要关注它的一些关键参数、特性,以及使用场景,毕竟工具会用和拥得正确、拥得好还是有一定差距的~。
最后再插一句,关于Gearman的数据传输协议,它直接使用了字节流传输,那么业界序列化的框架很多,所以我们在开发的时候,很有必要选择一种通用的(多种语言都支持的),千万不要被数据的序列化方式所局限;所以在生产中,我上面的序列化方式并不可取,毕竟这仅仅适用于Java;然后推荐一下序列化方式,比如protocol buffer,Thrift这些,当然这些使用需要一点点学习成本,如果不愿意耗费,而且系统对于序列化性能要求不高,那么最土的方式,转成Json,然后json字符串序列化成byte数组。
如果有不正确的地方,请帮忙指正,谢谢!
github: https://github.com/zhangveiqiang/gearman-learn