spark数据监控实战

版权申明:转载请注明出处。
文章来源:http://bigdataer.net

1.概述

数据准确性,稳定性,时效性是数据开发中需要重点关注的,一般称之为数据质量。保证数据质量往往会占用数据开发工程师的很多精力,所以一个好的数据监控系统或者一个合理的数据监控方案对于数据质量的保证至关重要。本文将展示一种实际生产中使用过的数据监控方案,并给出相关的代码。
数据计算采用spark,报警形式采用邮件报警。涉及到的内容有使用springMVC构建一个支持发送html和文件的邮件接口;在spark计算任意过程中调用邮件接口;在spark中通过邮件接口发送hdfs上的结果数据。

2.架构图

架构图

说明:通常情况下公司内部的hadoop/spark集群和外网隔离,直接在spark作业里发送邮件显然不现实。所以需要构建一个邮件发送服务,暴露内网接口给spark作业调用,同时也能访问外网,把邮件发送到用户邮箱中。

3.基于springMVC构建的邮件服务

3.1 设计目标

(1)支持自定义邮件发件人昵称,主题,内容等
(2)支持发送html以及文件

3.2技术方案

springMVC,JavaMail

3.3核心代码

邮件发送工具类EmailSendUtil

import java.io.File;
import java.util.Date;
import java.util.Properties;

import javax.activation.CommandMap;
import javax.activation.DataHandler;
import javax.activation.FileDataSource;
import javax.activation.MailcapCommandMap;
import javax.mail.Authenticator;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.PasswordAuthentication;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeBodyPart;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeMultipart;

import org.springframework.stereotype.Service;

@Service
public class EmailSendUtil {

public void sendEmail(String nick,String subject,String content,String receivers,File file) throws Exception {
        
        Properties proper = new Properties();
        proper.setProperty("mail.transport.protocol", "smtp");
        proper.setProperty("mail.stmp.auth", "true");
        
        //账号密码认证
        Session session = Session.getInstance(proper);
        MimeMessage msg = new MimeMessage(session);
        
        try {
            
            MailcapCommandMap mc = (MailcapCommandMap) CommandMap.getDefaultCommandMap();
            mc.addMailcap("text/html;; x-Java-content-handler=com.sun.mail.handlers.text_html");
            mc.addMailcap("text/xml;; x-java-content-handler=com.sun.mail.handlers.text_xml");
            mc.addMailcap("text/plain;; x-java-content-handler=com.sun.mail.handlers.text_plain");
            mc.addMailcap("multipart/*;; x-java-content-handler=com.sun.mail.handlers.multipart_mixed");
            mc.addMailcap("message/rfc822;; x-java-content-handler=com.sun.mail.handlers.message_rfc822");
            CommandMap.setDefaultCommandMap(mc);
            //设置发件人
            String nickname=javax.mail.internet.MimeUtility.encodeText(nick); 
            msg.setFrom(new InternetAddress(nickname+"发件人邮箱地址"));
            //设置收件人
            msg.setRecipients(Message.RecipientType.TO, InternetAddress.parse(receivers));
            //设置邮件主题
            msg.setSubject(subject);
            MimeMultipart  msgMimePart = new MimeMultipart ("mixed");
            //正文内容
            MimeBodyPart contents = getBodyPart(content);
            msgMimePart.addBodyPart(contents);
            //附件
            if(file!=null){
                MimeBodyPart attachment = getAttachPart(file);
                msgMimePart.addBodyPart(attachment);
            }
            
            //设置邮件消息体
            msg.setContent(msgMimePart);
            //设置发送时间
            msg.setSentDate(new Date());
            msg.saveChanges();
            
            Transport trans=session.getTransport();
            trans.connect("smtp.exmail.qq.com", "发件人邮箱地址", "密码");
            trans.sendMessage(msg, msg.getRecipients(Message.RecipientType.TO));
            trans.close();
        } catch (Exception e) {
            throw new Exception("email send error:"+e.getMessage());
        }finally{
            if(file!=null&&file.exists()){
                file.delete();
            }
        }
    }

    private static MimeBodyPart getBodyPart(String content) throws MessagingException{
        MimeBodyPart body = new MimeBodyPart();
        MimeMultipart mmp = new MimeMultipart("related");
        MimeBodyPart contents = new MimeBodyPart();
        contents.setContent(content, "text/html;charset=utf-8");
        mmp.addBodyPart(contents);
        body.setContent(mmp);
        return body;
    }
    
    private static MimeBodyPart getAttachPart(File file) throws MessagingException{
        MimeBodyPart attach = new MimeBodyPart();
        FileDataSource fds = new FileDataSource(file);
        attach.setDataHandler(new DataHandler(fds));
        attach.setFileName(file.getName());
        return attach;
    }
}

controller类,写的比较粗糙,提供了两个接口,一个发纯html,一个可以发送混合格式的邮件。

import java.io.File;

import net.bigdataer.api.weixin.utils.EmailSendUtil;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.multipart.MultipartFile;

@Controller
@RequestMapping("/email_api")
public class EmailSendController extends DmBaseController{

    @Autowired
    private EmailSendUtil est;
    
    //只发送html
    @RequestMapping("/send")
    public @ResponseBody String sendEmail(@RequestParam("nickname") String nickname,@RequestParam("subject") String subject,@RequestParam("content") String content,@RequestParam("receivers") String receivers){
        String result = "{\"status\":\"0\",\"msg\":\"success\"}";
        try{
            est.sendEmail(nickname,subject, content, receivers,null);
        }catch(Exception e){
            result = "{\"status\":\"-1\",\"msg\":\""+e.getMessage()+"\"}";
        }
        return result;
    }
    
    //发送混合格式的邮件
    @RequestMapping("/sendattachment")
    public @ResponseBody String sendAttachment(@RequestParam("nickname") String nickname,@RequestParam("subject") String subject,@RequestParam("content") String content,@RequestParam("receivers") String receivers,@RequestParam("attachment") MultipartFile attachment){
        String result = "{\"status\":\"0\",\"msg\":\"success\"}";
        File file = new File("/opt/soft/tomcat/temp/"+attachment.getOriginalFilename());
        try {
            attachment.transferTo(file); 
            est.sendEmail(nickname,subject, content, receivers,file);
        } catch (Exception e) { 
            result = "{\"status\":\"-1\",\"msg\":\""+e.getMessage()+"\"}";
        }
        
        return result;
    }
    
}

4.spark作业调用邮件接口

4.1封装一个接口调用工具类

这个类提供了对https及http协议的访问,同时支持get和post请求。在本例中没有使用到get请求。
另外这个类依赖于httpclient的相关jar包。我这里使用的jarmaven依赖如下:

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpmime</artifactId>
            <version>4.5.2</version>
        </dependency>

注意:因为spark源码中也使用了http-core的包,你的引用可能会和spark集群中本身的包冲突导致抛找不到某个类或者没有某个方法的异常,需要根据实际情况调整。不过代码大体上都一样,下面的代码可以参考

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.StatusLine;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpResponseException;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.List;

/**
 * 服务请求类
 * 封装了post,get请求
 * 同时支持http和https方式
 * @author liuxuecheng
 *
 */
public class HttpClientUtil { 

    private static final Log logger = LogFactory.getLog(HttpClientUtil.class);
    //设置超时(单位ms)
    private static int TIME_OUT = 3000;
    private static CloseableHttpClient client = null;
    
    private static TrustManager  trustManager = new X509TrustManager() { 
        @Override 
        public X509Certificate[] getAcceptedIssuers() { 
            return null; 
        } 
        @Override
        public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
        }
        @Override
        public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
        }
    };
    
    static{
        
        try{
        //请求配置
        RequestConfig config = RequestConfig.custom()
                .setConnectTimeout(TIME_OUT)
                .setConnectionRequestTimeout(TIME_OUT)
                .setSocketTimeout(TIME_OUT)
                .build();
        
        //访问https站点相关
        SSLContext context = SSLContext.getInstance("TLS");
        context.init(null, new TrustManager[]{trustManager}, null);
        SSLConnectionSocketFactory scsf = new SSLConnectionSocketFactory(context, NoopHostnameVerifier.INSTANCE);
        //注册
        Registry<ConnectionSocketFactory> registry = RegistryBuilder
                .<ConnectionSocketFactory>create()
                .register("http", PlainConnectionSocketFactory.INSTANCE)
                .register("https", scsf)
                .build();
        
        //连接池
        PoolingHttpClientConnectionManager manager = new PoolingHttpClientConnectionManager(registry);
        
        //构造请求client
        client = HttpClients.custom()
                .setConnectionManager(manager)
                .setDefaultRequestConfig(config)
                .build();
        
        }catch(Exception e){
            e.printStackTrace();
        }
    }
    
    /**
     * post方法
     * post请求涉及到不同contentType,对应不同的HttpEntity
     * 这里定义HttpEntity接口,所有实现了这个接口的实体均可传入
     * @param token
     * @param url
     * @param entity
     * @return
     * @throws ClientProtocolException
     * @throws IOException
     */
    public static JSONObject post(String token,String url,HttpEntity entity) throws ClientProtocolException, IOException{
        //UrlEncodedFormEntity stringEntity = new UrlEncodedFormEntity(,"UTF-8");
        HttpPost post = new HttpPost(url);

        if(token !=null){
            post.setHeader("Authorization", "Bearer "+token);
        }
        post.setHeader("Accept-Charset", "UTF-8");
        post.setEntity(entity);
        
        return client.execute(post, handler);
    }
    
    /**
     * get请求
     * @param token
     * @param url
     * @param content_type
     * @param params
     * @return
     * @throws ClientProtocolException
     * @throws IOException
     * @throws URISyntaxException 
     */
    public static JSONObject get(String token,String url,String content_type,List<NameValuePair> params) throws ClientProtocolException, IOException, URISyntaxException{
        UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params,"UTF-8");
        entity.setContentType(content_type);
        
        String para = EntityUtils.toString(entity);
        
        HttpGet get = new HttpGet(url);
        if(token !=null){
            get.setHeader("Authorization", "Bearer "+token);
        }
        get.setHeader("Accept-Charset", "UTF-8");

        //get请求将参数拼接在参数中
        get.setURI(new URI(get.getURI().toString()+"?"+para));
        return client.execute(get, handler);
    }
    
    //请求返回格式
    public static ResponseHandler<JSONObject> handler = new ResponseHandler<JSONObject>(){

        @Override
        public JSONObject handleResponse(HttpResponse res) throws ClientProtocolException, IOException {
            StatusLine status = res.getStatusLine();
            HttpEntity entity = res.getEntity();
            
            if(status.getStatusCode()>300){
                throw new HttpResponseException(status.getStatusCode(),
                        status.getReasonPhrase());
            } 
            
            if(entity==null){
                throw new ClientProtocolException("respones has no content");
            }
            
            String res_str = EntityUtils.toString(entity);
            
            return JSONObject.parseObject(res_str);
        }
        
    };
}

4.2进一步对发送邮件的接口封装

以下为scala代码

import java.io.File
import java.util

import org.apache.http.NameValuePair
import org.apache.http.client.entity.UrlEncodedFormEntity
import org.apache.http.entity.ContentType
import org.apache.http.entity.mime.MultipartEntityBuilder
import org.apache.http.entity.mime.content.{FileBody, StringBody}
import org.apache.http.message.BasicNameValuePair
import org.apache.http.util.CharsetUtils

/**
  * Created by liuxuecheng on 2017/1/4.
  */
object EmailSendUtil {

  def sendEmail(nickname: String,subject:String,content:String,receivers:String):Unit={
    val url = "邮件发送接口地址"
    val content_type = "application/x-www-form-urlencoded"

    val params = new util.ArrayList[NameValuePair]()
    params.add(new BasicNameValuePair ("nickname",nickname))
    params.add(new BasicNameValuePair ("subject",subject))
    params.add(new BasicNameValuePair ("content",content))
    params.add(new BasicNameValuePair ("receivers",receivers))
    try{
      val entity = new UrlEncodedFormEntity(params,"UTF-8")
      entity.setContentType(content_type)
      HttpClientUtil.post(null,url,entity)
    }catch{
      case e:Throwable=>e.printStackTrace()
    }
  }

  def sendAttachment(nickname: String,subject:String,content:String,receivers:String,file:File):Unit={
    val url = "邮件发送接口地址"
    val body = new FileBody(file)
    val entity = MultipartEntityBuilder.create()
      .setCharset(CharsetUtils.get("UTF-8"))
      .addPart("attachment",body)
      .addPart("nickname",new StringBody(nickname,ContentType.create("text/plain",CharsetUtils.get("UTF-8"))))
      .addPart("subject",new StringBody(subject,ContentType.create("text/plain",CharsetUtils.get("UTF-8"))))
      .addPart("content",new StringBody(content,ContentType.create("text/plain",CharsetUtils.get("UTF-8"))))
      .addTextBody("receivers",receivers)
      .setContentType(ContentType.MULTIPART_FORM_DATA)
      .build()

    HttpClientUtil.post(null,url,entity)
  }
}

4.3spark读取hdfs文件并创建File对象

以下截取代码片段

 def getHdfsFile(sc:SparkContext,path:String):File = {
 //需要hdfs的全路径,开头一般为hdfs://或者viewfs:// 并且具体到文件名
    val filePath = "viewfs://xx-cluster"+path+"part-00000"
    sc.addFile(filePath)
    new File(SparkFiles.get(new File(filePath).getName))
  }

既然都拿到文件了,发送邮件不就很简单了,调用上面封装好的接口就行。

4.4spark发送html

有时候没必要生成hdfs,计算结果适合报表展示的时候可以直接collect到内存中,然后构建一段html发送,上代码。

    val rdd = group_rdd.groupByKey(200)
      .map(x=>{
        val raw_uv = x._2.flatMap(e=>e._1).toSeq.distinct.size
        val raw_pv = x._2.map(e=>e._2).reduce(_+_)
        val cm_uv = x._2.flatMap(e=>e._3).toSeq.distinct.size
        val cm_pv = x._2.map(e=>e._4).reduce(_+_)
        IndexEntity(x._1._1,x._1._2,x._1._3,raw_uv,raw_pv,cm_uv,cm_pv)
      }).collect().sortBy(_.search_flag).sortBy(_.platform).sortBy(_.bd)

    //模板拼接
    val tbody:StringBuffer = new StringBuffer()
    rdd.foreach(entity=>{
      tbody.append(s"<tr><td>${entity.bd}</td><td>${entity.platform}</td><td>${entity.search_flag}</td>" +
        s"<td>${entity.raw_uv}</td><td>${entity.cm_uv}</td><td>${new DecimalFormat(".00").format((entity.cm_uv.toDouble/entity.raw_uv)*100)}%</td>" +
        s"<td>${entity.raw_pv}</td><td>${entity.cm_pv}</td><td>${new DecimalFormat(".00").format((entity.cm_pv.toDouble/entity.raw_pv)*100)}%</td></tr>")
    })
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容