目的:方便和web项目整合,需要一套java接口至少实现如下功能
1.获取任务列表(正在运行的、已经结束的)
2.获取当前spark版本号
3.根据指定标识,杀死任务
实现思路,通过在浏览器里分析网页,得知网页的链接信息,用java 发起 http请求,获得网页字符串信息,借助一个工具 jsoup (网页解析工具);截取自己需要 的内容。
jsoup 下载地址:https://repo1.maven.org/maven2/org/jsoup/jsoup/1.11.1/jsoup-1.11.1.jar
如下是我自己封装的一个类:
package com.bocom.dm.spark.monitor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import org.apache.http.client.CookieStore;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.spark.deploy.SparkSubmitUtils;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import com.bocom.irascloud.util.DealMethod;
import com.bocom.irascloud.util.HttpTools;
/**
 * Scrapes the Spark Web UI (port 4040 for the first application) to expose
 * the Spark version, the active/completed job lists, and a job-kill action,
 * without depending on Spark's internal APIs.
 *
 * NOTE(review): the shared singleton's state is overwritten on every
 * getInstance() call and is not thread-safe; suitable for single-threaded
 * use only.
 */
public final class SparkJobMonitor {
    /** Host name or IP where the Spark UI is served. */
    private String ip;
    /** Spark UI port (4040 by default for the first application). */
    private int port;
    /** HTTP connect/read timeout in milliseconds. */
    private long timeout;
    private static SparkJobMonitor monitor = new SparkJobMonitor();
    private String sparkVersion;
    private JSONArray activeJobs;
    private JSONArray completedJobs;

    private SparkJobMonitor() {
    }

    /** Manual smoke test against a running Spark UI. */
    public static void main(String[] args) throws Exception {
        SparkJobMonitor sjm = SparkJobMonitor.getInstance("cdh2", 4040, 30000);
        System.out.println(sjm.getSparkVersion());
        System.out.println(sjm.getActiveJobs());
        System.out.println(sjm.getCompletedJobs());
        JSONObject job = (JSONObject) sjm.getActiveJobs().get(0);
        sjm.killJob(job.getInt("Job Id"));
    }

    /**
     * Configures the shared monitor instance and refreshes its state by
     * scraping the UI once.
     *
     * @param ipOrDomain host name or IP where the Spark UI is served
     * @param port       Spark UI port
     * @param timeout    HTTP timeout in milliseconds
     * @return the (singleton) monitor, freshly refreshed
     */
    public static SparkJobMonitor getInstance(String ipOrDomain, int port, long timeout) {
        monitor.ip = ipOrDomain;
        monitor.port = port;
        monitor.timeout = timeout;
        init();
        return monitor;
    }

    /** Fetches the /jobs page and parses the version plus both job tables. */
    private static void init() {
        try {
            initUrls();
            // FIX: the original concatenation was missing the "/" before the
            // path, producing "http://host:4040jobs" and a guaranteed 404.
            String url = "http://" + monitor.ip + ":" + monitor.port + "/jobs";
            // FIX: honour the caller-supplied timeout instead of a
            // hard-coded 3000 ms.
            Document doc = Jsoup.connect(url).timeout((int) monitor.timeout).get();
            // The Spark version is rendered in a <span class="version"> element.
            Element version = doc.select("span.version").first();
            if (version != null) {
                monitor.sparkVersion = version.text();
            } else {
                monitor.sparkVersion = "I'm sorry that parsing is error";
            }
            // The /jobs page renders the active-jobs table first, then the
            // completed-jobs table. Guard against missing tables instead of
            // letting Iterator.next() throw NoSuchElementException.
            Iterator<Element> tables = doc.select("table").iterator();
            JSONArray active = tables.hasNext() ? parseTable(tables.next()) : new JSONArray();
            JSONArray completed = tables.hasNext() ? parseTable(tables.next()) : new JSONArray();
            monitor.setActiveJobs(active);
            monitor.setCompletedJobs(completed);
        } catch (IOException e) {
            // Best-effort scrape: keep the previous state on network failure.
            e.printStackTrace();
        }
    }

    private static void initUrls() {
        // Intentionally empty: placeholder for per-version URL templates
        // (Spark >= 1.6 moved some endpoints; see the monitoring docs).
    }

    /**
     * Converts an HTML job table into a JSONArray, one JSONObject per row,
     * keyed by the table's column headers.
     *
     * @param ele the {@code <table>} element; may be null
     * @return the parsed rows; an empty array when {@code ele} is null
     */
    private static JSONArray parseTable(Element ele) {
        JSONArray array = new JSONArray();
        if (ele == null) {
            // FIX: return an empty array instead of null so callers never NPE.
            return array;
        }
        List<String> headers = ele.select("tr > th").eachText();
        Iterator<Element> rows = ele.select("tr").iterator();
        if (rows.hasNext()) {
            rows.next(); // skip the header row
        }
        while (rows.hasNext()) {
            List<String> cells = rows.next().select("td").eachText();
            JSONObject obj = new JSONObject();
            // FIX: bound the loop by both lists — placeholder rows (e.g.
            // "no data") have fewer cells than headers and previously threw
            // IndexOutOfBoundsException.
            for (int i = 0; i < headers.size() && i < cells.size(); i++) {
                obj.put(headers.get(i), cells.get(i));
            }
            array.add(obj);
        }
        return array;
    }

    public String getSparkVersion() {
        return sparkVersion;
    }

    private void setSparkVersion(String sparkVersion) {
        this.sparkVersion = sparkVersion;
    }

    private void setActiveJobs(JSONArray activeJobs) {
        this.activeJobs = activeJobs;
    }

    public JSONArray getActiveJobs() {
        return activeJobs;
    }

    private void setCompletedJobs(JSONArray completedJobs) {
        this.completedJobs = completedJobs;
    }

    public JSONArray getCompletedJobs() {
        return completedJobs;
    }

    /**
     * Asks the Spark UI to kill work via the stage-kill endpoint.
     *
     * NOTE(review): this endpoint kills by *stage* id, while callers pass a
     * *job* id — these only coincide for single-stage jobs. Spark >= 1.6
     * exposes /jobs/job/kill?id=; confirm against the target Spark version.
     *
     * @param jobId the id taken from the "Job Id" column of the jobs table
     */
    public void killJob(int jobId) {
        String killUrl = "http://" + monitor.ip + ":" + monitor.port
                + "/stages/stage/kill?id=" + jobId + "&terminate=true";
        DealMethod method = new DealMethod() {
            @Override
            public void deal(CloseableHttpResponse response) throws Exception {
                System.out.println("====kill====");
            }
        };
        try {
            // Fire-and-forget: the returned CookieStore is not needed.
            HttpTools.doGet(killUrl, null, method, null);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
使用的Spark Job 模型:
package com.bocom.dm.spark.monitor;
/**
 * Mutable bean describing one row of the Spark UI jobs table.
 *
 * NOTE(review): the surrounding monitor returns raw JSONArray rows and does
 * not currently populate this bean — confirm intended usage with the caller.
 */
public class SparkJobInfo {
    /** Value of the "Job Id" column. */
    private int jobId;
    /** Value of the "Description" column. */
    private String description;
    /** Value of the "Submitted" column (display string, not a parsed date). */
    private String submitted;
    /** Value of the "Duration" column (display string, e.g. "12 s"). */
    private String duration;
    /** Value of the "Stages: Succeeded/Total" column. */
    private String stages;
    /** Value of the "Tasks (for all stages): Succeeded/Total" column. */
    private String tasks;

    public int getJobId() {
        return jobId;
    }

    public void setJobId(int jobId) {
        this.jobId = jobId;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public String getSubmitted() {
        return submitted;
    }

    public void setSubmitted(String submitted) {
        this.submitted = submitted;
    }

    public String getDuration() {
        return duration;
    }

    public void setDuration(String duration) {
        this.duration = duration;
    }

    public String getStages() {
        return stages;
    }

    public void setStages(String stages) {
        this.stages = stages;
    }

    public String getTasks() {
        return tasks;
    }

    public void setTasks(String tasks) {
        this.tasks = tasks;
    }

    /** Added for debuggability: the original bean had no readable form. */
    @Override
    public String toString() {
        return "SparkJobInfo[jobId=" + jobId
                + ", description=" + description
                + ", submitted=" + submitted
                + ", duration=" + duration
                + ", stages=" + stages
                + ", tasks=" + tasks + "]";
    }
}
使用:
/**
 * Thin facade over {@link SparkJobMonitor} for web-layer use.
 *
 * NOTE(review): lazy init below is not thread-safe; fine for a
 * single-threaded caller, otherwise synchronize getMonitor().
 */
public class SparkJobMain {
    /** Lazily created monitor bound to the hard-coded UI address. */
    private SparkJobMonitor sjm = null;

    private SparkJobMonitor getMonitor() {
        if (this.sjm == null) {
            this.sjm = SparkJobMonitor.getInstance("cdh2", 4040, 30000);
        }
        return this.sjm;
    }

    /** @return the Spark version string scraped from the UI. */
    public String getVersion() {
        return getMonitor().getSparkVersion();
    }

    /**
     * Returns the active jobs as a JSON string, or null when the monitor
     * has no data yet.
     *
     * FIX: was an unimplemented TODO stub returning null unconditionally.
     * (Misspelled method name kept for caller compatibility.)
     */
    public String getActiveJos() {
        Object jobs = getMonitor().getActiveJobs();
        return jobs == null ? null : jobs.toString();
    }
}
以上是基于 Spark 1.3.0 的实现;如果是 1.6.0 以后的版本,需要将代码里面的 URL 换掉,URL 地址参考
http://spark.apache.org/docs/latest/monitoring.html