Java 利用 Jsoup 获取 spark 运行任务

目的:方便和web项目整合,需要一套java接口至少实现如下功能

1.使用获取任务列表 (正在运行的,已经结束的)

2.获取当前spark版本号

3.根据指定标示,杀死任务

实现思路,通过在浏览器里分析网页,得知网页的链接信息,用java 发起 http请求,获得网页字符串信息,借助一个工具 jsoup (网页解析工具);截取自己需要 的内容。

jsoup 下载地址:http://central.maven.org/maven2/org/jsoup/jsoup/1.11.1/jsoup-1.11.1.jar

如下是我自己封装的一个类:

package com.bocom.dm.spark.monitor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import org.apache.http.client.CookieStore;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.spark.deploy.SparkSubmitUtils;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import com.bocom.irascloud.util.DealMethod;
import com.bocom.irascloud.util.HttpTools;

public final  class SparkJobMonitor {

    private String ip;
    private int port;
    private long timeout;
    private static SparkJobMonitor monitor = new SparkJobMonitor();
    private String sparkVersion;
    private JSONArray activeJobs;
    private JSONArray completedJobs;
    
    private SparkJobMonitor() {
    }
    
    public static void main(String[] args) throws Exception {
        SparkJobMonitor sjm = SparkJobMonitor.getInstance("cdh2", 4040, 30000); 
        
        System.out.println(sjm.getSparkVersion());
        System.out.println(sjm.getActiveJobs());
        System.out.println(sjm.getCompletedJobs());
        
        JSONObject job = (JSONObject)sjm.getActiveJobs().get(0);
        sjm.killJob(job.getInt("Job Id"));
    }
    
    public static SparkJobMonitor getInstance(String ipOrDomain,int port,long timeout){
        monitor.ip = ipOrDomain;
        monitor.port = port;
        monitor.timeout = timeout;
        
        init();
        return monitor;
    }
    
    private static void init() {
        try {
            
            initUrls();
            
            String url = "http://"+SparkJobMonitor.monitor.ip+":"+SparkJobMonitor.monitor.port+"jobs";
            
            Document doc = Jsoup.connect(url).timeout(3000).get();
            
            //获取版本
            Element version = doc.select("span.version").first();
            if(version!=null){
                SparkJobMonitor.monitor.sparkVersion = version.text();
            }else{
                SparkJobMonitor.monitor.sparkVersion = "I'm sorry that parsing is error";
            }
            
            
            Iterator<Element> tables = doc.select("table").iterator();
            
            //正在运行的任务
            JSONArray active = parseTable(tables.next());
            //完成的任务
            JSONArray completed = parseTable(tables.next());
            
            SparkJobMonitor.monitor.setActiveJobs(active);
            SparkJobMonitor.monitor.setCompletedJobs(completed);
            
        } catch (IOException e) {
            e.printStackTrace();
        }
        
    }
    
    private static void initUrls() {
        // TODO Auto-generated method stub
        
    }

    private static JSONArray parseTable(Element ele){
        if(ele!=null){
            
            Elements ths = ele.select("tr > th");
            List<String> thsText = ths.eachText();
            
            Elements trs = ele.select("tr");
            Iterator<Element> ite = trs.iterator();
            ite.next();//跳过表头
            
            JSONArray array = new JSONArray();
            while(ite.hasNext()){
                
                Element tds = ite.next();
                List<String> tdsText = tds.select("td").eachText();
                
                JSONObject obj  = new JSONObject();
                
                for(int i=0;i<thsText.size();i++){
                    obj.put(thsText.get(i), tdsText.get(i));
                }
                array.add(obj);
            }
            
            return array;
        }
        return null;
    }

    public String getSparkVersion(){
        return sparkVersion;
    }
    private void setSparkVersion(String sparkVersion) {
        this.sparkVersion = sparkVersion;
    }
    
    private void setActiveJobs(JSONArray activeJobs) {
        this.activeJobs = activeJobs;
    }
    public JSONArray getActiveJobs() {
        return activeJobs;
    }
    private void setCompletedJobs(JSONArray completedJobs) {
        this.completedJobs = completedJobs;
    }
    public JSONArray getCompletedJobs() {
        return completedJobs;
    }
    
    public void killJob(int jobId){
        /*String url = "http://"+SparkJobMonitor.monitor.ip+":"+SparkJobMonitor.monitor.port+"/jobs/job/?id="+jobId;*/
        String killUrl = "http://"+SparkJobMonitor.monitor.ip+":"+SparkJobMonitor.monitor.port+"/stages/stage/kill?id="+jobId+"&terminate=true";
        
        DealMethod method = new DealMethod() {
            
            @Override
            public void deal(CloseableHttpResponse response) throws Exception {
                System.out.println("====kill====");
            }
        };
        
        try {
            CookieStore cookie = HttpTools.doGet(killUrl, null, method, null);
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

使用的Spark Job 模型:

package com.bocom.dm.spark.monitor;

public class SparkJobInfo {

    private int jobId;
    private String description;
    private String submitted;
    private String duration;
    private String stages;
    private String tasks;
    public int getJobId() {
        return jobId;
    }
    public void setJobId(int jobId) {
        this.jobId = jobId;
    }
    public String getDescription() {
        return description;
    }
    public void setDescription(String description) {
        this.description = description;
    }
    public String getSubmitted() {
        return submitted;
    }
    public void setSubmitted(String submitted) {
        this.submitted = submitted;
    }
    public String getDuration() {
        return duration;
    }
    public void setDuration(String duration) {
        this.duration = duration;
    }
    public String getStages() {
        return stages;
    }
    public void setStages(String stages) {
        this.stages = stages;
    }
    public String getTasks() {
        return tasks;
    }
    public void setTasks(String tasks) {
        this.tasks = tasks;
    }
    
}

使用:

public class SparkJobMain {
    
    private SparkJobMonitor sjm = null;
    
    private SparkJobMonitor getMonitor(){
        if(this.sjm==null){
            this.sjm = SparkJobMonitor.getInstance("cdh2", 4040, 30000); 
        }
        
        return this.sjm;
    }

    public String getVersion() {
        return getMonitor().getSparkVersion();
    }

    public String getActiveJos() {
        // TODO Auto-generated method stub
        return null;
    }
    
}

以上是基于spark 1.3.0,如果是1.6.0以后的,需要将代码 里面的 url 换掉 ,url 地址参考
http://spark.apache.org/docs/latest/monitoring.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容