Python采用并发查询mysql以及调用API灌数据 (二) - PyMysql操作数据库基本类封装

前情回顾

上一篇文章已经写好了查询数据库以及post请求API的实例,那么本章节我们来继续。

实战任务

本次因为服务架构重构,表优化、重构,带来的任务就是需要从原来的mysql数据库中,读取原表数据(部分存在多张关联查询)然后通过调用API的服务方式灌入新的数据库表中(包含mysql、mongodb)。

执行流程如下

那么根据流程所需要的功能,需要以下的实例进行支撑:
1.并发实例
2.查询数据实例
3.执行post请求实例

目标:构建实际数据场景 --> 抽象编写查询以及post的类方法 --> 编写整合处理方法

构建实际数据场景

可以看出,整个流程中对于mysql的操作是很重要的,为了方便行事。下面我对PyMysql操作数据库的基本类进行了封装处理。

编写数据库查询的工具类方法

实现代码如下:

# -*- coding: utf-8 -*-
import pymysql
import re

class MysqldbHelper(object): # 继承object类所有方法

    '''
    构造方法:
    config = {
        'host': '127.0.0.1',
        'port': 3306,
        'user': 'root',
        'passwd': 'root',
        'charset':'utf8',
        'cursorclass':pymysql.cursors.DictCursor
        }
    conn = pymysql.connect(**config)
    conn.autocommit(1)
    cursor = conn.cursor()
    '''
    def __init__(self , config):

        self.host = config['host']
        self.username = config['user']
        self.password = config['passwd']
        self.port = config['port']
        self.con = None
        self.cur = None

        try:
            self.con = pymysql.connect(**config)
            self.con.autocommit(1)
            # 所有的查询,都在连接 con 的一个模块 cursor 上面运行的
            self.cur = self.con.cursor()
        except:
            print "DataBase connect error,please check the db config."

    # 关闭数据库连接
    def close(self):
        if not  self.con:
            self.con.close()
        else:
            print "DataBase doesn't connect,close connectiong error;please check the db config."

    # 创建数据库
    def createDataBase(self,DB_NAME):
        # 创建数据库
        self.cur.execute('CREATE DATABASE IF NOT EXISTS %s DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci' % DB_NAME)
        self.con.select_db(DB_NAME)
        print 'creatDatabase:' + DB_NAME

    # 选择数据库
    def selectDataBase(self,DB_NAME):
        self.con.select_db(DB_NAME)

    # 获取数据库版本号
    def getVersion(self):
        self.cur.execute("SELECT VERSION()")
        return self.getOneData()
    
    # 获取上个查询的结果
    def getOneData(self):
        # 取得上个查询的结果,是单个结果
        data = self.cur.fetchone()
        return data

    # 创建数据库表
    def creatTable(self, tablename, attrdict, constraint):
        """创建数据库表

            args:
                tablename  :表名字
                attrdict   :属性键值对,{'book_name':'varchar(200) NOT NULL'...}
                constraint :主外键约束,PRIMARY KEY(`id`)
        """
        if self.isExistTable(tablename):
            print "%s is exit" % tablename
            return
        sql = ''
        sql_mid = '`id` bigint(11) NOT NULL AUTO_INCREMENT,'
        for attr,value in attrdict.items():
            sql_mid = sql_mid + '`'+attr + '`'+' '+ value+','
        sql = sql + 'CREATE TABLE IF NOT EXISTS %s ('%tablename
        sql = sql + sql_mid
        sql = sql + constraint
        sql = sql + ') ENGINE=InnoDB DEFAULT CHARSET=utf8'
        print 'creatTable:'+sql
        self.executeCommit(sql)

    def executeSql(self,sql=''):
        """执行sql语句,针对读操作返回结果集

            args:
                sql  :sql语句
        """
        try:
            self.cur.execute(sql)
            records = self.cur.fetchall()
            return records
        except pymysql.Error,e:
            error = 'MySQL execute failed! ERROR (%s): %s' %(e.args[0],e.args[1])
            print error

    def executeCommit(self,sql=''):
        """执行数据库sql语句,针对更新,删除,事务等操作失败时回滚

        """
        try:
            self.cur.execute(sql)
            self.con.commit()
        except pymysql.Error, e:
            self.con.rollback()
            error = 'MySQL execute failed! ERROR (%s): %s' %(e.args[0],e.args[1])
            print "error:", error
            return error

    def insert(self, tablename, params):
        """创建数据库表

            args:
                tablename  :表名字
                key        :属性键
                value      :属性值
        """
        key = []
        value = []
        for tmpkey, tmpvalue in params.items():
            key.append(tmpkey)
            if isinstance(tmpvalue, str):
                value.append("\'" + tmpvalue + "\'")
            else:
                value.append(tmpvalue)
        attrs_sql = '('+','.join(key)+')'
        values_sql = ' values('+','.join(value)+')'
        sql = 'insert into %s'%tablename
        sql = sql + attrs_sql + values_sql
        print '_insert:'+sql
        self.executeCommit(sql)

    def select(self, tablename, cond_dict='', order='', fields='*'):
        """查询数据

            args:
                tablename  :表名字
                cond_dict  :查询条件
                order      :排序条件

            example:
                print mydb.select(table)
                print mydb.select(table, fields=["name"])
                print mydb.select(table, fields=["name", "age"])
                print mydb.select(table, fields=["age", "name"])
        """
        consql = ' '
        if cond_dict!='':
            for k, v in cond_dict.items():
                consql = consql+'`'+k +'`'+ '=' + '"'+v + '"' + ' and'
        consql = consql + ' 1=1 '
        if fields == "*":
            sql = 'select * from %s where ' % tablename
        else:
            if isinstance(fields, list):
                fields = ",".join(fields)
                sql = 'select %s from %s where ' % (fields, tablename)
            else:
                print "fields input error, please input list fields."
        sql = sql + consql + order
        print 'select:' + sql
        return self.executeSql(sql)

    def insertMany(self,table, attrs, values):
        """插入多条数据

            args:
                tablename  :表名字
                attrs        :属性键
                values      :属性值

            example:
                table='test_mysqldb'
                key = ["id" ,"name", "age"]
                value = [[101, "liuqiao", "25"], [102,"liuqiao1", "26"], [103 ,"liuqiao2", "27"], [104 ,"liuqiao3", "28"]]
                mydb.insertMany(table, key, value)
        """
        values_sql = ['%s' for v in attrs]
        attrs_sql = '('+','.join(attrs)+')'
        values_sql = ' values('+','.join(values_sql)+')'
        sql = 'insert into %s'% table
        sql = sql + attrs_sql + values_sql
        print 'insertMany:'+sql
        try:
            print sql
            for i in range(0,len(values),20000):
                    self.cur.executemany(sql,values[i:i+20000])
                    self.con.commit()
        except pymysql.Error,e:
            self.con.rollback()
            error = 'insertMany executemany failed! ERROR (%s): %s' %(e.args[0],e.args[1])
            print error

    def delete(self, tablename, cond_dict):
        """删除数据

            args:
                tablename  :表名字
                cond_dict  :删除条件字典

            example:
                params = {"name" : "caixinglong", "age" : "38"}
                mydb.delete(table, params)

        """
        consql = ' '
        if cond_dict!='':
            for k, v in cond_dict.items():
                if isinstance(v, str):
                    v = "\'" + v + "\'"
                consql = consql + tablename + "." + k + '=' + v + ' and '
        consql = consql + ' 1=1 '
        sql = "DELETE FROM %s where%s" % (tablename, consql)
        print sql
        return self.executeCommit(sql)

    def update(self, tablename, attrs_dict, cond_dict):
        """更新数据

            args:
                tablename  :表名字
                attrs_dict  :更新属性键值对字典
                cond_dict  :更新条件字典

            example:
                params = {"name" : "caixinglong", "age" : "38"}
                cond_dict = {"name" : "liuqiao", "age" : "18"}
                mydb.update(table, params, cond_dict)

        """
        attrs_list = []
        consql = ' '
        for tmpkey, tmpvalue in attrs_dict.items():
            attrs_list.append("`" + tmpkey + "`" + "=" +"\'" + tmpvalue + "\'")
        attrs_sql = ",".join(attrs_list)
        print "attrs_sql:", attrs_sql
        if cond_dict!='':
            for k, v in cond_dict.items():
                if isinstance(v, str):
                    v = "\'" + v + "\'"
                consql = consql + "`" + tablename +"`." + "`" + k + "`" + '=' + v + ' and '
        consql = consql + ' 1=1 '
        sql = "UPDATE %s SET %s where%s" % (tablename, attrs_sql, consql)
        print sql
        return self.executeCommit(sql)

    def dropTable(self, tablename):
        """删除数据库表

            args:
                tablename  :表名字
        """
        sql = "DROP TABLE  %s" % tablename
        self.executeCommit(sql)

    def deleteTable(self, tablename):
        """清空数据库表

            args:
                tablename  :表名字
        """
        sql = "DELETE FROM %s" % tablename
        print "sql=",sql
        self.executeCommit(sql)

    def isExistTable(self, tablename):
        """判断数据表是否存在

            args:
                tablename  :表名字

            Return:
                存在返回True,不存在返回False
        """
        sql = "select * from %s" % tablename
        result = self.executeCommit(sql)
        if result is None:
            return True
        else:
            if re.search("doesn't exist", result):
                return False
            else:
                return True

if __name__ == "__main__":

    # 定义数据库访问参数
    config = {
        'host': '你的mysql服务器IP地址',
        'port': 3361,
        'user': 'root',
        'passwd': '你的mysql服务器root密码',
        'charset': 'utf8',
        'cursorclass': pymysql.cursors.DictCursor
    }

    # 初始化打开数据库连接
    mydb = MysqldbHelper(config)

    # 打印数据库版本
    print mydb.getVersion()

    # 创建数据库
    DB_NAME = 'test_db'
    # mydb.createDataBase(DB_NAME)

    # 选择数据库
    print "========= 选择数据库%s ===========" % DB_NAME
    mydb.selectDataBase(DB_NAME)

    #创建表
    TABLE_NAME = 'test_user'
    print "========= 选择数据表%s ===========" % TABLE_NAME
    # CREATE TABLE %s(id int(11) primary key,name varchar(30))' %TABLE_NAME
    attrdict = {'name':'varchar(30) NOT NULL'}
    constraint = "PRIMARY KEY(`id`)"
    mydb.creatTable(TABLE_NAME,attrdict,constraint)

    # 插入纪录
    print "========= 单条数据插入 ==========="
    params = {}
    for i in range(5):
        params.update({"name":"testuser"+str(i)}) # 生成字典数据,循环插入
        print params
        mydb.insert(TABLE_NAME, params)
        print

    # 批量插入数据
    print "========= 多条数据同时插入 ==========="
    insert_values = []
    for i in range(5):
        # values.append((i,"testuser"+str(i)))
        insert_values.append([u"测试用户"+str(i)]) # 插入中文数据
    print insert_values
    insert_attrs = ["name"]
    mydb.insertMany(TABLE_NAME,insert_attrs, insert_values)

    # 数据查询
    print "========= 数据查询 ==========="
    print mydb.select(TABLE_NAME, fields=["id", "name"])
    print mydb.select(TABLE_NAME, cond_dict = {'name':'测试用户2'},fields=["id", "name"])
    print mydb.select(TABLE_NAME, cond_dict = {'name':'测试用户2'},fields=["id", "name"],order="order by id desc")

    # 删除数据
    print "========= 删除数据 ==========="
    delete_params = {"name": "测试用户2"}
    mydb.delete(TABLE_NAME, delete_params)

    # 更新数据
    print "========= 更新数据 ==========="
    update_params = {"name": "测试用户99"}   # 需要更新为什么值
    update_cond_dict = {"name": "测试用户3"}  # 更新执行的查询条件
    mydb.update(TABLE_NAME, update_params, update_cond_dict)

    # 删除表数据
    print "========= 删除表数据 ==========="
    mydb.deleteTable(TABLE_NAME)

    # 删除表
    print "========= 删除表     ==========="
    mydb.dropTable(TABLE_NAME)

测试执行结果如下:

D:\Python27\python.exe E:/PycharmProjects/DataProject/tools/MysqlTools.py
{u'VERSION()': u'5.7.9-log'}
========= 选择数据库test_db ===========
========= 选择数据表test_user ===========
test_user is exit
========= 单条数据插入 ===========
{'name': 'testuser0'}
_insert:insert into test_user(name) values('testuser0')

{'name': 'testuser1'}
_insert:insert into test_user(name) values('testuser1')

{'name': 'testuser2'}
_insert:insert into test_user(name) values('testuser2')

{'name': 'testuser3'}
_insert:insert into test_user(name) values('testuser3')

{'name': 'testuser4'}
_insert:insert into test_user(name) values('testuser4')

========= 多条数据同时插入 ===========
[[u'\u6d4b\u8bd5\u7528\u62370'], [u'\u6d4b\u8bd5\u7528\u62371'], [u'\u6d4b\u8bd5\u7528\u62372'], [u'\u6d4b\u8bd5\u7528\u62373'], [u'\u6d4b\u8bd5\u7528\u62374']]
insertMany:insert into test_user(name) values(%s)
insert into test_user(name) values(%s)
========= 数据查询 ===========
select:select id,name from test_user where   1=1 
[{u'id': 361, u'name': u'testuser0'}, {u'id': 362, u'name': u'testuser1'}, {u'id': 363, u'name': u'testuser2'}, {u'id': 364, u'name': u'testuser3'}, {u'id': 365, u'name': u'testuser4'}, {u'id': 366, u'name': u'\u6d4b\u8bd5\u7528\u62370'}, {u'id': 367, u'name': u'\u6d4b\u8bd5\u7528\u62371'}, {u'id': 368, u'name': u'\u6d4b\u8bd5\u7528\u62372'}, {u'id': 369, u'name': u'\u6d4b\u8bd5\u7528\u62373'}, {u'id': 370, u'name': u'\u6d4b\u8bd5\u7528\u62374'}]
select:select id,name from test_user where  `name`="测试用户2" and 1=1 
[{u'id': 368, u'name': u'\u6d4b\u8bd5\u7528\u62372'}]
select:select id,name from test_user where  `name`="测试用户2" and 1=1 order by id desc
[{u'id': 368, u'name': u'\u6d4b\u8bd5\u7528\u62372'}]
========= 删除数据 ===========
DELETE FROM test_user where test_user.name='测试用户2' and  1=1 
========= 更新数据 ===========
attrs_sql: `name`='测试用户99'
UPDATE test_user SET `name`='测试用户99' where `test_user`.`name`='测试用户3' and  1=1 
========= 删除表数据 ===========
sql= DELETE FROM test_user
========= 删除表     ===========

Process finished with exit code 0

好了,写完了基本操作类之后该怎么引用呢?
我们下一章节再见。


关注微信公众号,回复【资料】、Python、PHP、JAVA、web,则可获得Python、PHP、JAVA、前端等视频资料。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352

推荐阅读更多精彩内容

  • 关于Mongodb的全面总结 MongoDB的内部构造《MongoDB The Definitive Guide》...
    中v中阅读 31,922评论 2 89
  • MYSQL 基础知识 1 MySQL数据库概要 2 简单MySQL环境 3 数据的存储和获取 4 MySQL基本操...
    Kingtester阅读 7,809评论 5 116
  • 观其大纲 page 01 基础知识 1 MySQL数据库概要 2 简单MySQL环境 3 数据的存储和获取 4 M...
    周少言阅读 3,156评论 0 33
  • 分享一个我自己改变前的故事,每个女人都喜欢能够在家里有一个自己的地位,有一个爱自己的老公,能和婆婆处的来,但是往往...
    元小英阅读 190评论 17 0
  • 婆,身形削瘦,背微弯,脸上全是皱纹,小小的眼睛,无牙,真的是一颗牙都没有,头发总是盘成一个发髻。 婆总是穿着一件边...
    国宴阅读 264评论 11 8