Python通过ssh链接Mysql定时发送邮件V2

不积跬步,无以至千里;不积小流,无以成江海

之前写过一篇python自动发邮件的文章,这段时间又进行了改进,来更新一下。
主要更新内容有:
1)通过pandas连接MySQL数据库
2)多sql语句查询
3)不同收件人,发送不同内容
4)pandas生成日期列表
5)增加错误日志信息

接下来一一介绍,实际业务当中,很少有一个sql语句可以得出结果的,一般都需要一些临时表,那么我们怎么来实现呢,在python中利用pandas就比较容易了。之前利用了mysqldb的cursor,这次用一个更擅长处理数据的pandas,用这个工具实际处理起来更容易、方便一些。

# 我们使用pandas的read_sql函数,
# 将具体的sql语句传入,就可以将结果保存为DataFrame
def dbconnect_ssh_dq(ssh_host,ssh_port,ssh_password,ssh_user,db_host,db_name,db_port,db_user,db_passwd):
    with SSHTunnelForwarder(
            (ssh_host, ssh_port),
            ssh_password = ssh_password,
            #ssh_pkey=keyfile,
            ssh_username = ssh_user,
            remote_bind_address = (db_host, db_port)
    ) as server:
        db = MySQLdb.connect(
            host='127.0.0.1',
            port=server.local_bind_port,
            user=db_user,
            passwd=db_passwd,
            db=db_name,
            charset="utf8")

        df = pd.read_sql('''SELECT
date_y,
uid,
sum(money) amt
FROM
table
GROUP BY date_y DESC,uid;''', con = db)

        db.close()
        return df

2)对于多sql语句查询,也可以利用类似的方法,我这里由于将脚本都放入了txt中,就通过一个循环,将语句依次读入,生成临时变量,然后再返回

# 这段也动态生成了十几个变量
def dbconnect_ssh(ssh_host,ssh_port,ssh_password,ssh_user,db_host,db_name,db_port,db_user,db_passwd,province):
    with SSHTunnelForwarder(
            (ssh_host, ssh_port),
            ssh_password = ssh_password,
            #ssh_pkey=keyfile,
            ssh_username = ssh_user,
            remote_bind_address = (db_host, db_port)
    ) as server:
        db = MySQLdb.connect(
            host='127.0.0.1',
            port=server.local_bind_port,
            user=db_user,
            passwd=db_passwd,
            db=db_name,
            charset="utf8")

        for i in range(1, 13):
            sql = ''
            fd = open('/home/data/datacenter/sql_' + str(i) + '.txt')
            lines = fd.readlines()
            for line in lines:
                line = line.strip('\n')
                sql += line + ' '
            sql = sql.replace(':prov', province)
            print '1st',i,sql
            #locals()['sql_' + str(i)] = sql
            #print 'var sql',i,locals()['sql_' + str(i)]
            locals()['df' + str(i)] = pd.read_sql(sql, con = db)
            #print locals()['df1']
            #print 'df done',locals()['df' + str(i)]
            #df += locals()['df' + str(i)]
        db.close()

        return locals()['df1'],locals()['df2'],locals()['df3'],locals()['df4'],locals()['df5'],locals()['df6'],locals()['df7'],locals()['df8'],locals()['df9'],locals()['df10'],locals()['df11'],locals()['df12']

3)不同的收件人,发送不同的内容,我这边是通过一个Excel文件来记录不同的收件人,发送什么内容的,其实发送的内容是有规律的,就是某个人获取某个省份的数据,我只要在excel里面记录好收件人和省份的对应关系就好了,接下来就是读取excel的列作为参数传入发送邮件的函数就好了。

# 我这里写了一个函数用来返回省份、地址、收件人、抄送人等
# 接下来就可以作为参数出入发送邮件的函数
def email_to_person(filename):
    wb = xlrd.open_workbook(filename)
    ws = wb.sheet_by_name('table_message')
    prov = []
    email_addr = []
    prov_name = []
    prov_no = []
    email_cc = []
    for rownum in range(ws.nrows):
        for colnum in range(ws.ncols):
            if colnum == 0:
                prov_name.append(ws.cell(rownum,colnum).value)
            elif colnum == 1:
                email_addr.append(ws.cell(rownum,colnum).value)
            elif colnum == 2:
                prov.append(ws.cell(rownum,colnum).value)
            elif colnum == 3:
                prov_no.append(ws.cell(rownum,colnum).value)
            else:
                email_cc.append(ws.cell(rownum,colnum).value)
    return prov,email_addr,prov_no,email_cc

4)pandas生成日期列表,当某一天没有数据的时候,我们不希望这一天的数据缺失,我们想这一天的数据都是为0.为了解决这个问题,我想到了弄一个日历表,这样数据和日历表关联,每日的数据都有了,如果这天没有交易,就用0去填补。其实pandas在处理时间、日期方面是很强大的,我看了一下文档,着实震惊了,连自定义的日历、工作日函数都有,太强大了。言归正传,这里弄一个七天的日期列表就好了:

# 是不是很强大啊
def datelist(beginDate, endDate):
    # '20160601' or datetime
    date_l=[x.strftime('%Y%m%d') for x in list(pd.date_range(start=beginDate, end=endDate))]
    return pd.DataFrame(date_l, columns = ['date_y'])

5)增加错误日志信息,这个需求logging和traceback模块

# 其中level用来定义日志的级别,我这边跟踪错误日志信息,就设置了error
logging.basicConfig(level = logging.ERROR,
                format = '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                datefmt = '%a, %d %b %Y %H:%M:%S',
                filename = '/home/data/datacenter/mylog.log',
                filemode = 'w')

    #定义一个StreamHandler,将INFO级别或更高的日志信息打印到标准错误,并将其添加到当前的日志处理对象#
    console = logging.StreamHandler()
    console.setLevel(logging.ERROR)
    formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
    console.setFormatter(formatter)
    logging.getLogger('').addHandler(console)

# 然后通过try except将错误信息记录在mylog.log文件
try:
    pass
except:
        s = traceback.format_exc()
        logging.error(s)

下面是完整代码:

#!/usr/bin/python
# -*- coding: utf-8 -*-

import MySQLdb
from sshtunnel import SSHTunnelForwarder
import MySQLdb.cursors
import xlwt
import os

import sys
import time,datetime
from smtplib import SMTP
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
import datetime
import os.path
import mimetypes
from email.MIMEBase import MIMEBase
from email import Encoders
import pandas as pd 
import numpy as np
import pickle
import xlrd,xlwt
import logging,traceback

reload(sys)
sys.setdefaultencoding('utf8')


def dbconnect_ssh(ssh_host,ssh_port,ssh_password,ssh_user,db_host,db_name,db_port,db_user,db_passwd,province):
    with SSHTunnelForwarder(
            (ssh_host, ssh_port),
            ssh_password = ssh_password,
            #ssh_pkey=keyfile,
            ssh_username = ssh_user,
            remote_bind_address = (db_host, db_port)
    ) as server:
        db = MySQLdb.connect(
            host='127.0.0.1',
            port=server.local_bind_port,
            user=db_user,
            passwd=db_passwd,
            db=db_name,
            charset="utf8")

        for i in range(1, 13):
            sql = ''
            fd = open('/home/data/datacenter/sql_' + str(i) + '.txt')
            lines = fd.readlines()
            for line in lines:
                line = line.strip('\n')
                sql += line + ' '
            sql = sql.replace(':prov', province)
            print '1st',i,sql
            #locals()['sql_' + str(i)] = sql
            #print 'var sql',i,locals()['sql_' + str(i)]
            locals()['df' + str(i)] = pd.read_sql(sql, con = db)
            #print locals()['df1']
            #print 'df done',locals()['df' + str(i)]
            #df += locals()['df' + str(i)]
        db.close()

        return locals()['df1'],locals()['df2'],locals()['df3'],locals()['df4'],locals()['df5'],locals()['df6'],locals()['df7'],locals()['df8'],locals()['df9'],locals()['df10'],locals()['df11'],locals()['df12']

def dbconnect_ssh_dq(ssh_host,ssh_port,ssh_password,ssh_user,db_host,db_name,db_port,db_user,db_passwd):
    with SSHTunnelForwarder(
            (ssh_host, ssh_port),
            ssh_password = ssh_password,
            #ssh_pkey=keyfile,
            ssh_username = ssh_user,
            remote_bind_address = (db_host, db_port)
    ) as server:
        db = MySQLdb.connect(
            host='127.0.0.1',
            port=server.local_bind_port,
            user=db_user,
            passwd=db_passwd,
            db=db_name,
            charset="utf8")

        df = pd.read_sql('''SELECT
date_y,
uid,
sum(money) amt
FROM
table
GROUP BY date_y DESC,uid;''', con = db)

        db.close()
        return df

def send_mail(config):
    print 'Sending Mail...'
    message = MIMEMultipart()
    message["Accept-Charset"] = "ISO-8859-1,utf-8"
    message['From'] = 'name@great.com'
    #print 'to',config['to']
    message['To'] =','.join(config['to'])
    #print 'message_to',message['To']
    message['CC'] = ','.join(config['cc'])
    message['Subject'] = config['subject']
    message['Date'] = time.ctime(time.time())
    #message['Reply-To'] = 'name@great.com'
    message['X-Priority'] = '3'
    message['X-MSMail-Priority'] = 'Normal'
    if config['text']:
        text = config['text']
        message.attach(text)
    smtp = SMTP(config['server'], config['port'])

    username = 'name@great.com'
    smtp.login(username, 'password')

    smtp.sendmail(username, config['to'] + config['cc'], message.as_string())
    print 'Send Mail OK'
    print time.strftime("%Y%m%d %H:%M:%S", time.localtime())
    #print 'to',message['To']
    #print 'cc',message['CC']
    smtp.close()
    time.sleep(1)

def send_mail_to_test(context,curr_time,email,prov_n,email_c):
    #print '$$$$$',email
    send_mail({
        #'to': ["name@great.com"],
        #'cc': ['name@great.com'],
        'to': email.encode('utf8').split(','),
        'cc': email_c.encode('utf8').split(','),
        'server': 'smtp.qiye.163.com',
        'port': 25,
        'subject': '【上海数据部】' + prov_n + '日交易汇报'+curr_time,
        'username': 'name@great.com',
        'password': 'password',
        'text': context}
    )

def message_from_excel(filename,email,prov_n,email_c):
    #print 'html', email
    wb = xlrd.open_workbook(filename)
    #ws = wb.sheet_by_name('table_message')
    title = [u'日交易数据',u'交易明细']
    html_content = "<html>" + "<style>table {border-collapse: collapse;}table, td, th {border: 1px solid #B0E2FF;}body {background-color:#FFFFFF;}</style>" + "<meta charset='utf-8'>" + "<p>Dear all,</p><p>邮件为" + prov_n + "日交易数据,已更新至" + datetime.datetime.now().strftime('%Y-%m-%d') +",请查阅,谢谢!</p>"
    content = ""
    for s in range(2):
        content += "<table border='1' cellspacing='0' cellpadding='10'> <caption>" + prov_n + title[s] + "</caption><tbody>"
        ws = wb.sheets()[s]
        for rownum in range(ws.nrows):
            content += "<tr>"
            for colnum in range(ws.ncols):
                #print int(ws.cell(rownum,colnum).value)
                content += "<td> {0} </td>".format(ws.cell(rownum,colnum).value)
            content += "</tr>"
        content += "</tbody></table>"
        content += "<br>"
        
    html_body = "<body>" + content + '<p>说明:</p><p>1、<font color="red">say something! </font></p><p>2、如有需求变更,请联系name@great.com</p>' + "</body><br></br>"
    html_content += html_body
    #html_content += '<br><p>以上数据为系统自动发送,如有疑问请联系name@great.com</p></br>'
    html_content += "</html>"
    context = MIMEText(html_content,_subtype='html',_charset='utf-8')
    #print html_content
    send_mail_to_test(context,curr_time,email,prov_n,email_c)

def email_to_person(filename):
    wb = xlrd.open_workbook(filename)
    ws = wb.sheet_by_name('table_message')
    prov = []
    email_addr = []
    prov_name = []
    prov_no = []
    email_cc = []
    for rownum in range(ws.nrows):
        for colnum in range(ws.ncols):
            if colnum == 0:
                prov_name.append(ws.cell(rownum,colnum).value)
            elif colnum == 1:
                email_addr.append(ws.cell(rownum,colnum).value)
            elif colnum == 2:
                prov.append(ws.cell(rownum,colnum).value)
            elif colnum == 3:
                prov_no.append(ws.cell(rownum,colnum).value)
            else:
                email_cc.append(ws.cell(rownum,colnum).value)
    return prov,email_addr,prov_no,email_cc

def datelist(beginDate, endDate):
    # '20160601' or datetime
    date_l=[x.strftime('%Y%m%d') for x in list(pd.date_range(start=beginDate, end=endDate))]
    return pd.DataFrame(date_l, columns = ['date_y'])
    
if __name__ == '__main__':
    curr_time = time.strftime("%Y%m%d %H:%M:%S", time.localtime())
    print curr_time
    ssh_host = "2.2.22.2"
    ssh_port = 22222
    ssh_password = "1111111111111111"
    ssh_user = "name"
    db_host = "5.5.5.55"
    db_name = "name"
    db_port = 3306
    db_user = "user_name"
    db_passwd = "333333333333333333"

    db_host_dq = "7.7.7.7"
    db_name_dq = "db_name"
    db_port_dq = 3306
    db_user_dq = "user_name"
    db_passwd_dq = "0000000000000000000"

    endDate = datetime.datetime.now()
    delta = datetime.timedelta(days = 1)
    endDate = endDate - delta
    delta = datetime.timedelta(days = 6)
    beginDate = endDate - delta
    endDate = endDate.strftime('%Y%m%d')
    beginDate = beginDate.strftime('%Y%m%d')
    print beginDate,endDate

    df_date = datelist(beginDate, endDate)
    print df_date

    Excelfile = '/home/data/datacenter/r.xls'
    email_prov_file = '/home/data/datacenter/email_prov.xls'

    logging.basicConfig(level = logging.ERROR,
                format = '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                datefmt = '%a, %d %b %Y %H:%M:%S',
                filename = '/home/data/datacenter/mylog.log',
                filemode = 'w')

    #定义一个StreamHandler,将INFO级别或更高的日志信息打印到标准错误,并将其添加到当前的日志处理对象#
    console = logging.StreamHandler()
    console.setLevel(logging.ERROR)
    formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
    console.setFormatter(formatter)
    logging.getLogger('').addHandler(console)
    
    prov = []
    email_addr = []
    prov_n = []
    email_cc = []
    prov, email_addr, prov_n,email_cc = email_to_person(email_prov_file)
    print prov_n
    try:
        for j in range(int(sys.argv[1]), int(sys.argv[2])):
            df1, df2, df3, df4, df5, df_user, df7, df8, df9, df10, df11, df12 = dbconnect_ssh(ssh_host,ssh_port,ssh_password,ssh_user,db_host,db_name,db_port,db_user,db_passwd,prov[j])
            print df1
            df_dq = dbconnect_ssh_dq(ssh_host,ssh_port,ssh_password,ssh_user,db_host_dq,db_name_dq,db_port_dq,db_user_dq,db_passwd_dq)
            #print df_dq
            df_dq = pd.merge(df_dq, df_user, how = 'inner', on = 'uid')
            df_dq = df_dq['amt'].groupby(df_dq['date_y']).sum().reset_index()
            print df_dq
            df_dq.columns = ['date_y', 'amt']
            print df_dq.columns
            df1 = pd.concat([df1, df_dq])
            print df1
            df1 = df1['amt'].groupby(df1['date_y']).sum().reset_index()
            df1 = df1.sort_values(by = ['date_y'], ascending = False)
            print df1
            df = pd.merge(df1, df2, how = 'outer', on = 'date_y')
            df = pd.merge(df, df3, how = 'outer', on = 'date_y')
            #print df.head()
            #df['total_1'] = df.apply(lambda x: x.sum(), axis=1)
            #df.head()
            df = pd.merge(df, df4, how = 'outer', on = 'date_y')
            df = pd.merge(df, df5, how = 'outer', on = 'date_y')
            df = df.fillna(0)
            print df
            #df.iloc[5,3] = 0
            #print df
          
            cols = list(df)
            #print cols
            
            df = df.ix[:, cols]
            df = df.sort_index(by = u'日期', ascending = False)
            df = df.set_index(u'日期')
            #print df.head()
            #print df[df.index.duplicated()]
            df_detail = pd.merge(df7, df8, how = 'outer', on = 'date_y')
            df_detail = pd.merge(df_detail, df_dq, how = 'outer', on = 'date_y')
            df_detail = pd.merge(df_detail, df9, how = 'outer', on = 'date_y')
            df_detail = pd.merge(df_detail, df10, how = 'outer', on = 'date_y')
            df_detail = pd.merge(df_detail, df11, how = 'outer', on = 'date_y')
            df_detail = pd.merge(df_detail, df12, how = 'outer', on = 'date_y')
            df_detail = pd.merge(df_date, df_detail, how = 'outer', on = 'date_y')
            df_detail = df_detail.fillna(0)       
            df_detail = df_detail.sort_index(by='date_y', ascending = False)
            
            out = pd.ExcelWriter(Excelfile)
            #df3.to_excel(out,sheet_name = 'df3')
            df = df[df.columns].astype('int')
            df.to_excel(out,sheet_name = 'df')
            df_detail.to_excel(out, sheet_name = 'df_detail')
            out.save()
            #print 'prov_n ',j,prov_n[j]
            message_from_excel(Excelfile,email_addr[j],prov_n[j],email_cc[j])
    except:
        s = traceback.format_exc()
        logging.error(s)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容