python | pymysql模块

1、基本增删改查操作
2、python调用存储过程
3、多线程实现mysql存取操作

一、基础操作

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pymysql 

# 连接数据库
#conn = pymysql.connect('localhost', 'root', 'root')

# 也可以使用关键字参数
#conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root', db='', charset='utf8')

# 也可以使用字典进行连接参数的管理
config = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'passwd': '',
    'charset': 'utf8'
}
conn = pymysql.connect(**config)

# 如果使用事务引擎,可以设置自动提交事务,或者在每次操作完成后手动提交事务conn.commit()
#conn.autocommit(1)    # conn.autocommit(True) 

# 使用cursor()方法获取操作游标
cursor = conn.cursor()
# 因该模块底层其实是调用CAPI的,所以,需要先得到当前指向数据库的指针。

try:
    # 创建数据库
    DB_NAME = 'test'
    cursor.execute('DROP DATABASE IF EXISTS %s' %DB_NAME)
    cursor.execute('CREATE DATABASE IF NOT EXISTS %s' %DB_NAME)
    conn.select_db(DB_NAME)

     #创建表
    TABLE_NAME = 'user'
    cursor.execute('CREATE TABLE %s(id int primary key,name varchar(30))' %TABLE_NAME)

    # 插入单条数据
    sql = 'INSERT INTO user values("%d","%s")' %(1,"jack")
    
    # 批量插入数据
    values = []
    for i in range(3, 20):
        values.append((i,'kk'+str(i)))
    cursor.executemany('INSERT INTO user values(%s,%s)',values)
   
   # 查询数据条目
    count = cursor.execute('SELECT * FROM %s' %TABLE_NAME)
    print('total records: %d' %count)
    print('total records:', cursor.rowcount)

    # 获取表名信息
    desc = cursor.description
    print("%s %3s" % (desc[0][0], desc[1][0]))

    # 查询一条记录
    print 'fetch one record:'
    result = cursor.fetchone()
    print result
    print ('id: %s,name: %s' %(result[0],result[1]))

    # 查询多条记录
    print 'fetch five record:'
    results = cursor.fetchmany(5)
    for r in results:
        print (r)

    print('===============')

     # 查询所有记录
    # 重置游标位置,偏移量:大于0向后移动;小于0向前移动,mode默认是relative
    # relative:表示从当前所在的行开始移动; absolute:表示从第一行开始移动
    cursor.scroll(0,mode='absolute')
    results = cursor.fetchall()
    for r in results:
        print r
    print('--------------')
    cursor.scroll(-2)
    results = cursor.fetchall()
    for r in results:
        print r

    # 更新记录
    cursor.execute('UPDATE %s SET name = "%s" WHERE id = %s' %(TABLE_NAME,'Jack',1))



    # 如果没有设置自动提交事务,则这里需要手动提交一次
    conn.commit()
except:
    import traceback
    traceback.print_exc()
    # 发生错误时会滚
    conn.rollback()
finally:
    # 关闭游标连接
    cursor.close()
    # 关闭数据库连接
    conn.close()

每次都连接关闭很麻烦,使用上下文管理,简化连接过程

import pymysql
import contextlib
#定义上下文管理器,连接后自动关闭连接
@contextlib.contextmanager
def mysql(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1',charset='utf8'):
  conn = pymysql.connect(host=host, port=port, user=user, passwd=passwd, db=db, charset=charset)
  cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
  try:
    yield cursor
  finally:
    conn.commit()
    cursor.close()
    conn.close()
 
# 执行sql
with mysql() as cursor:
  print(cursor)
  row_count = cursor.execute("select * from tb7")
  row_1 = cursor.fetchone()
  print row_count, row_1

二、如何python调用 callproc 进行调用储存过程

1.创建完整的Mysql数据库连接
2.使用cursor()初始化数据库游标
3.使用游标来调用callproc函数 里面添加 需要传入的变量 例如 callproc(name,args) name="proc_user",args=['21',syh];
4.cursor可以传递出一系列的结果集,使用storeresult来获取一系列的iterator指向结果集
5.用fetchall方法获取结果

callproc 无法直接获得out和INOUT变量 ,但是变量存在server中,可以通过@_procname_n 来获取变量值,可以按照传入参数的位置获取,如第1个 SELECT @_procname_0

调用无参存储过程

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"

import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
#游标设置为字典类型
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
#无参数存储过程
cursor.callproc('p2')    #等价于cursor.execute("call p2()")

row_1 = cursor.fetchone()
print row_1

conn.commit()
cursor.close()
conn.close()

调用有参存储过程

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"

import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)

cursor.callproc('p1', args=(1, 22, 3, 4))
#获取执行完存储的参数,参数@开头
cursor.execute("select @p1,@_p1_1,@_p1_2,@_p1_3")   #{u'@_p1_1': 22, u'@p1': None, u'@_p1_2': 103, u'@_p1_3': 24}
row_1 = cursor.fetchone()
print row_1


conn.commit()
cursor.close()
conn.close()

三、多线程、多进程操作mysql

1、多线程连接:

起初使用threading.Thread模块,先建立一个MySQL连接,然后由多个线程来执行具体的SQL。但发现在执行的时候,不是报MySQL连接被关闭,就是出现其他异常错误。上网查询,是因为多个线程无法共享一个数据库连接,会出现不可预测的情况。

官方建议使用连接池模块,参照了他人的做法,创建线程连接池,一次性创建多个连接。

2、创建线程池的三种方法
1)过去:
使用threadpool模块,这是个python的第三方模块,支持python2和python3。threadpool是一个比较老的模块了,现在虽然还有一些人在用,但已经不再是主流了,关于python多线程,现在已经开始步入未来(future模块)了具体使用方式如下:

import threadpool
import time

def sayhello (a):
    print("hello: "+a)
    time.sleep(2)

def main():
    global result
    seed=["a","b","c"]
    start=time.time()
    task_pool=threadpool.ThreadPool(5)
    requests=threadpool.makeRequests(sayhello,seed)
    for req in requests:
        task_pool.putRequest(req)
    task_pool.wait()
    end=time.time()
    time_m = end-start
    print("time: "+str(time_m))
    start1=time.time()
    for each in seed:
        sayhello(each)
    end1=time.time()
    print("time1: "+str(end1-start1))

if __name__ == '__main__':
    main()

运行结果如下:


2)未来:
使用concurrent.futures模块,这个模块是python3中自带的模块,但是,python2.7以上版本也可以安装使用,具体使用方式如下:

from concurrent.futures import ThreadPoolExecutor
import time

def sayhello(a):
    print("hello: "+a)
    time.sleep(2)

def main():
    seed=["a","b","c"]
    start1=time.time()
    for each in seed:
        sayhello(each)
    end1=time.time()
    print("time1: "+str(end1-start1))
    start2=time.time()
    with ThreadPoolExecutor(3) as executor:
        for each in seed:
            executor.submit(sayhello,each)
    end2=time.time()
    print("time2: "+str(end2-start2))
    start3=time.time()
    with ThreadPoolExecutor(3) as executor1:
        executor1.map(sayhello,seed)
    end3=time.time()
    print("time3: "+str(end3-start3))

if __name__ == '__main__':
    main()

运行结果如下:


【注意】
concurrent.futures.ThreadPoolExecutor,在提交任务的时候,有两种方式,一种是submit()函数,另一种是map()函数,两者的主要区别在于:

  • map可以保证输出的顺序, submit输出的顺序是乱的
  • 如果你要提交的任务的函数是一样的,就可以简化成map。但是假如提交的任务函数是不一样的,或者执行的过程之可能出现异常(使用map执行过程中发现问题会直接抛出错误)就要用到submit()
  • submit和map的参数是不同的,submit每次都需要提交一个目标函数和对应的参数,map只需要提交一次目标函数,目标函数的参数放在一个迭代器(列表,字典)里就可以。

3)现在?
这里要考虑一个问题,以上两种线程池的实现都是封装好的,任务只能在线程池初始化的时候添加一次,那么,假设我现在有这样一个需求,需要在线程池运行时,再往里面添加新的任务(注意,是新任务,不是新线程),那么要怎么办?
其实有两种方式:

  • 重写threadpool或者future的函数:这个方法需要阅读源模块的源码,必须搞清楚源模块线程池的实现机制才能正确的根据自己的需要重写其中的方法。

  • 自己构建一个线程池:这个方法就需要对线程池的有一个清晰的了解了,附上构建的一个线程池实例:

#! /usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import Queue
import hashlib
import logging
from utils.progress import PrintProgress
from utils.save import SaveToSqlite


class ThreadPool(object):
    def __init__(self, thread_num, args):

        self.args = args
        self.work_queue = Queue.Queue()
        self.save_queue = Queue.Queue()
        self.threads = []
        self.running = 0
        self.failure = 0
        self.success = 0
        self.tasks = {}
        self.thread_name = threading.current_thread().getName()
        self.__init_thread_pool(thread_num)

    # 线程池初始化
    def __init_thread_pool(self, thread_num):
        # 下载线程
        for i in range(thread_num):
            self.threads.append(WorkThread(self))
        # 打印进度信息线程
        self.threads.append(PrintProgress(self))
        # 保存线程
        self.threads.append(SaveToSqlite(self, self.args.dbfile))

    # 添加下载任务
    def add_task(self, func, url, deep):
        # 记录任务,判断是否已经下载过
        url_hash = hashlib.new('md5', url.encode("utf8")).hexdigest()
        if not url_hash in self.tasks:
            self.tasks[url_hash] = url
            self.work_queue.put((func, url, deep))
            logging.info("{0} add task {1}".format(self.thread_name, url.encode("utf8")))

    # 获取下载任务
    def get_task(self):
        # 从队列里取元素,如果block=True,则一直阻塞到有可用元素为止。
        task = self.work_queue.get(block=False)

        return task

    def task_done(self):
        # 表示队列中的某个元素已经执行完毕。
        self.work_queue.task_done()

    # 开始任务
    def start_task(self):
        for item in self.threads:
            item.start()

        logging.debug("Work start")

    def increase_success(self):
        self.success += 1

    def increase_failure(self):
        self.failure += 1

    def increase_running(self):
        self.running += 1

    def decrease_running(self):
        self.running -= 1

    def get_running(self):
        return self.running

    # 打印执行信息
    def get_progress_info(self):
        progress_info = {}
        progress_info['work_queue_number'] = self.work_queue.qsize()
        progress_info['tasks_number'] = len(self.tasks)
        progress_info['save_queue_number'] = self.save_queue.qsize()
        progress_info['success'] = self.success
        progress_info['failure'] = self.failure

        return progress_info

    def add_save_task(self, url, html):
        self.save_queue.put((url, html))

    def get_save_task(self):
        save_task = self.save_queue.get(block=False)

        return save_task

    def wait_all_complete(self):
        for item in self.threads:
            if item.isAlive():
                # join函数的意义,只有当前执行join函数的线程结束,程序才能接着执行下去
                item.join()

# WorkThread 继承自threading.Thread
class WorkThread(threading.Thread):
    # 这里的thread_pool就是上面的ThreadPool类
    def __init__(self, thread_pool):
        threading.Thread.__init__(self)
        self.thread_pool = thread_pool

    #定义线程功能方法,即,当thread_1,...,thread_n,调用start()之后,执行的操作。
    def run(self):
        print (threading.current_thread().getName())
        while True:
            try:
                # get_task()获取从工作队列里获取当前正在下载的线程,格式为func,url,deep
                do, url, deep = self.thread_pool.get_task()
                self.thread_pool.increase_running()

                # 判断deep,是否获取新的链接
                flag_get_new_link = True
                if deep >= self.thread_pool.args.deep:
                    flag_get_new_link = False

                # 此处do为工作队列传过来的func,返回值为一个页面内容和这个页面上所有的新链接
                html, new_link = do(url, self.thread_pool.args, flag_get_new_link)

                if html == '':
                    self.thread_pool.increase_failure()
                else:
                    self.thread_pool.increase_success()
                    # html添加到待保存队列
                    self.thread_pool.add_save_task(url, html)

                # 添加新任务,即,将新页面上的不重复的链接加入工作队列。
                if new_link:
                    for url in new_link:
                        self.thread_pool.add_task(do, url, deep + 1)

                self.thread_pool.decrease_running()
                # self.thread_pool.task_done()
            except Queue.Empty:
                if self.thread_pool.get_running() <= 0:
                    break
            except Exception, e:
                self.thread_pool.decrease_running()
                # print str(e)
                break

3、多进程连接
多进程(multiprocessing模块),具体使用看这里multiprocessing模块的使用
可以和MySQL建立多个连接并发执行SQL。对比执行耗时,整体性能比单个进程快,但其中单个SQL的执行效率,多进程没有单进程执行的快。
demo:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import MySQLdb
from multiprocessing import Process,Pool

class mysqlopr():
    '''省略'''

pool = Pool(5)      ####设置进程数
for i in range(10):
    pool.apply_async(func=run_sql_func, args=(i,))      ####异步执行
    #pool.apply(func=run_sql_func, args=(arg,))                 ####同步执行,官方不建议使用,python3.+版本已无该方法
pool.close()
pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,496评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,407评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,632评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,180评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,198评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,165评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,052评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,910评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,324评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,542评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,711评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,424评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,017评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,668评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,823评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,722评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,611评论 2 353

推荐阅读更多精彩内容

  • MYSQL 基础知识 1 MySQL数据库概要 2 简单MySQL环境 3 数据的存储和获取 4 MySQL基本操...
    Kingtester阅读 7,811评论 5 116
  • 必备的理论基础 1.操作系统作用: 隐藏丑陋复杂的硬件接口,提供良好的抽象接口。 管理调度进程,并将多个进程对硬件...
    drfung阅读 3,537评论 0 5
  • 一、Python简介和环境搭建以及pip的安装 4课时实验课主要内容 【Python简介】: Python 是一个...
    _小老虎_阅读 5,744评论 0 10
  • ORA-00001: 违反唯一约束条件 (.) 错误说明:当在唯一索引所对应的列上键入重复值时,会触发此异常。 O...
    我想起个好名字阅读 5,307评论 0 9
  • 小时候一起玩的我们 你总照顾着我 有人欺负我 你就赶紧跑去打他 后来你就成了我的守护神 那时的我们什么都不懂 只是...
    Jay1972阅读 485评论 4 3