ORM项目

ORM项目

复习

字符串.join方法

print(','.join(['dddsd','sddsfdsf','sdfddsdsfdfsdf'])) # dddsd,sddsfdsf,sdfddsdsfdfsdf
print(','.join('sddsffffffffffff')) # s,d,d,s,f,f,f,f,f,f,f,f,f,f,f,f

%s 扩展用法

print('%s is %s'%('lyy','sb'))
print('%(name)s is %(feature)s'%({'name':'lyy','feature':'SB'}))

_getattr_ 和 _setattr_

# 关于__getattr__ 和 __setattr__

1.__getattr__   >>> 比如说user.name,user是由一个类实例化的对象

# 当一个对象.name的时候,就会往user对象里找name这个属性;
# 如果有name这个属性,则直接返回在name这个对象下返回的name的值
# 如果没有name这个属性,则就会调用__getattr__执行下面的代码块

2. __setattr__   >>> 比如说 user.name='lyysb' ,user是由一个类实例化的对象
# 当一个对象  user.name='lyysb'的时候, 会调用这个方法
# __init__ 下可以通过对象直接进行.语法赋值就是在底层调用了__setattr__方法



class Foo(dict):
    def __init__(self,**kwargs):
        print(kwargs)
        super().__init__(**kwargs)

    def __setattr__(self, key, value):  # 当碰到Foo().a=1,就会触发这个方法
        print('run')
        self[key]=value

    def __getattr__(self, item):  
        print(666)
        try:
            return self[item]
        except TypeError:
            raise ('没有该属性')

            
>>> 定义 __setattr__方法和 __getattr__方法的目的
name=Foo()
name.aaa='123'  # 走的是__setattr__方法
print(name.aaa) # 走的时候__getattr__方法

################################################################################################
**kwargs的作用

class User(Foo):
    pass

# 添加数据
a=User(name='kkk',age=34,password=2134)  
print(a.name)

# 修改数据
a.name='sb'
print(a.name)

函数setattr和getattr

# hasattr(object, name)
判断一个对象里面是否有name属性或者name方法,返回BOOL值,有name特性返回True, 否则返回False。
需要注意的是name要用括号括起来

class test():
    name='lyysb'
    def run(self):
        return 'lyy is a super SB'

t=test()

print(hasattr(t,'name')) # True


# getattr(object, name[,default])
获取对象object的属性或者方法,如果存在打印出来;
如果不存在,打印出默认值,默认值可选。
需要注意的是,如果是返回的对象的方法,返回的是方法的内存地址,如果需要运行这个方法,
可以在后面添加一对括号。

class test():
    name='lyysb'
    def run(self):
        return 'lyy is a super SB'

t=test()

1.获取name属性,存在就打印出来。
print(getattr(t, "name")) # lyysb

2.获取run方法,存在就打印出方法的内存地址。
print(getattr(t,'run')) # <bound method test.run of <__main__.test object at 0x7fc729c5b7b8>>


3.获取run方法,后面加括号可以将这个方法运行。
print(getattr(t,'run')()) # lyy is a super SB

4.获取一个不存在的属性
# print(getattr(t,'sddsds'))  # AttributeError: 'test' object has no attribute 'sddsds'


5.设置默认值 若属性不存在,返回一个默认值
print(getattr(t,'sdsdds','lyysb')) # 返回默认值lyysb


# setattr(object, name, values)
给对象的属性赋值,若属性不存在,先创建再赋值。

>>> 为属相赋值,并没有返回值

1. 类的名称空间
print(test.__dict__)
#{'__module__': '__main__', 'name': 'lyysb', 'run': <function test.run at 0x7f97378b1d90>, '__dict__': <attribute '__dict__' of 'test' objects>, '__weakref__': <attribute '__weakref__' of 'test' objects>, '__doc__': None}

2. 对象的名称空间
print(t.__dict__) #{}

3. 添加属性
print(setattr(t, "age", "18")) #None

4. 添加属性后对象的名称空间
{'age': '18'}

# 总结
对象刚创建,名称空间是空的,但是对象可以调用类的名称空间
当执行setattr 时候,属性会存入对象的名称空间,类的名称空间不变

打散机制

# 字典打散通过**,打散后的数据是key=value的形式,所以只能作为参数传给函数

attrs=[{'id': 1, 'name': 'lxx', 'password': '123'}]
res=attrs[0]

def f(id,name,password):
    print(id)
    print(name)
    print(password)


f(id=1,name='lxx',password='123')

print(dict(**res))

元类

# 定义元类时候一定要返回 type.__new__(cls, name,bases,attrs)

class ModelsMetaclass(type):
    def __new__(cls, name,bases,attrs): # 实例化成类对象下面所具备的属性
        print(attrs)
        print(attrs['TABLE_NAME'])
        return type.__new__(cls, name,bases,attrs)
 
class User(metaclass=ModelsMetaclass):
    TABLE_NAME='USER'
    a=1
    b=2



print(type(User))  # b<class '__main__.ModelsMetaclass'>

# 如果不返回 type.__new__(cls, name,bases,attrs),则生成的类对象就是nonetype

class ModelsMetaclass(type):
    def __new__(cls, name,bases,attrs): # 实例化成类对象下面所具备的属性
        pass
    
class User(metaclass=ModelsMetaclass):
    TABLE_NAME='USER'
    a=1
    b=2



print(type(User))  # <class 'NoneType'>

元类的应用场景

取出由元类生成的类中的属性
class ModelsMetaclass(type):
    def __new__(cls, name, bases, attrs):  # 实例化成类对象下面所具备的属性
        print(attrs) # {'__module__': '__main__', '__qualname__': 'User', 'TABLE_NAME': 'USER', 'a': 1, 'b': 2}
        print(attrs['TABLE_NAME']) # USER
        return type.__new__(cls, name, bases, attrs)


class User(metaclass=ModelsMetaclass):
    TABLE_NAME = 'USER'
    a = 1
    b = 2
限定由元类产生的类对象的规则
class ModelsMetaclass(type):
    def __new__(cls, name,bases,attrs): # 实例化成类对象下面所具备的属性
        # print(name[0])
        if name[0].islower():
            raise Exception('类名称首字母应该大写')

        print('造类成功')
        # return type.__new__(cls, name,bases,attrs)

class ser(metaclass=ModelsMetaclass):
    TABLE_NAME='USER'
    a=1
    b=2

常用报错

TypeError: NoneType takes no arguments
class User(None):
    pass
    
# 任何类继承None 都会报错
# 如果不返回type.__new__(cls,name, bases, attrs),则就会造类失败

class ModelsMetaclass(type):
    def __new__(cls, name, bases, attrs):  # 实例化成类对象下面所具备的属性
        print(attrs)

class Model(metaclass=ModelsMetaclass):# 类型是None
    pass


class User(Model):  # 这个类继承了None
    TABLE_NAME = 'USER'
    a = 1
    b = 2

# 正确写法
class ModelsMetaclass(type):
    def __new__(cls, name, bases, attrs):  # 实例化成类对象下面所具备的属性
        print(attrs)
        return type.__new__(cls,name, bases, attrs)


class Model(metaclass=ModelsMetaclass):# 类型是None
    pass


class User(Model):  # 这个类继承了None
    TABLE_NAME = 'USER'
    a = 1
    b = 2

复习pymysql

# 表结构
mysql> select * from user;
+----+------+----------+
| id | name | password |
+----+------+----------+
|  1 | lxx  | 123      |
+----+------+----------+
1 row in set (0.00 sec)

# 当接收不到服务器的值的时候,最后返回空元祖
import pymysql


conn=pymysql.connect(
            host='database',
            port=3306,
            user='java',
            password='1234',
            charset='utf8',
            database='youku',
            autocommit=True
        )

cursor=conn.cursor(cursor=pymysql.cursors.DictCursor)

cursor.execute("select * from user where id=132")

print(cursor.fetchall())  # ()


# 防止SQL注入(不要做字符串拼接,在cursor提交命令时候再传入参数)
import pymysql


conn = pymysql.connect(
            host='database',
            port=3306,
            user='java',
            password='1234',
            charset='utf8',
            database='youku',
            autocommit=True
        )

sql='select * from user where id=%s'

cursor=conn.cursor(pymysql.cursors.DictCursor)

cursor.execute(sql,args=1)

print(cursor.fetchall())

单例模式

# 每次只能生成一个对象

import pymysql

class Mysql:
    __instense=None
    def __init__(self):
        self.conn=pymysql.connect(
            host='database',
            port=3306,
            user='java',
            password='1234',
            charset='utf8',
            database='youku',
            autocommit=True
        )

        self.cursor=self.conn.cursor(cursor=pymysql.cursors.DictCursor)


    def close_db(self):
        self.cursor.close()
        self.conn.close()

    def select(self,sql,args):
        self.cursor.execute(sql,args)
        #(selct * from user where id=%s,1)
        '''
        sql=select * from user where name=%s and password=%s
        cursor.execute(sql,(lxx,123))
        
        '''
        rs=self.cursor.fetchall()
        print(rs,'sssssssssssssssssss')
        return rs

    def execute(self,sql,args):
        try:
            self.cursor.execute(sql,args)
            affected=self.cursor.rowcount
        except BaseException as e:
            print(e)
        return affected

    @classmethod
    def singleton(cls):
        if not cls.__instense:
            cls.__instense=cls() #对象不存在,生成一个对象

        return cls.__instense  

if __name__ == '__main__':
    ms=Mysql()
    # re=ms.select('select * from user where id =%s',1)
    # print(re)
    print(ms.select('select * from user where id=%s', 1))

数据库连接池

import pymysql

from DBUtils.PooledDB import PooledDB
POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
    maxshared=3,  # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。
    ping=0,
    # ping MySQL服务端,检查是否服务可用。
    host='database',
    port=3306,
    user='java',
    password='1234',
    database='youku',
    charset='utf8'
)


def func():
    # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
    # 否则
    # 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
    # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
    # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
    # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
    conn = POOL.connection()

    print( '链接被拿走了', conn._con)
    print( '池子里目前有', POOL._idle_cache, '\r\n')

    cursor = conn.cursor()
    cursor.execute('select * from user')
    result = cursor.fetchall()
    print(result)
    conn.close()

if __name__ == '__main__':

    func()

对象关系映射

单例模式版本

  • fuckorm.py
from ORM项目 import Mysql_singleton


# 父类
class Field:
    def __init__(self, name, column_type, primary_key, default):
        self.name = name
        self.column_type = column_type
        self.primary_key = primary_key
        self.default = default

# # 定义varchar类
class StringField(Field):
    def __init__(self, name=None, column_type='varchar(200)', primary_key=False, default=None):
        super().__init__(name, column_type, primary_key, default)


# 定义int类
class IntegerField(Field):
    def __init__(self, name=None, column_type='int', primary_key=False, default=None):
        super().__init__(name, column_type, primary_key, default)


# 定义元类
# 让一个表对应到一个类里面,类里面需要有表名 哪个字段是主键
class ModelsMetaclass(type):
    def __new__(cls, name, bases, attrs):  # 实例化成类对象下面所具备的属性

        if name == 'Models':
            return type.__new__(cls, name, bases, attrs)

        print(name)
        print(attrs)
        table_name = attrs.get('table_name', None)  # 将表名存入类的名称空间里
        # print(attrs)

        if not table_name:
            table_name = name

        primary_key = None
        mappings = dict()  # 定义空字典,存的是列对象

        for k, v in attrs.items():
            if isinstance(v, Field):
                mappings[k] = v  # 将需要的字段都放进mappings这个字典里
                if v.primary_key:

                    # 找到主键
                    if primary_key:
                        raise TypeError('主键重复:%s' % k)
                    primary_key = v.name

        for k in mappings.keys():
            attrs.pop(k)  # 将需要的字段从attr这个名称空间中删除

        if not primary_key:
            raise TypeError('没有主键')

        attrs['table_name'] = table_name
        attrs['primary_key'] = primary_key
        attrs['mappings'] = mappings
        return type.__new__(cls, name, bases, attrs)


# 使得对象有. 语法
class Models(dict, metaclass=ModelsMetaclass):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def __setattr__(self, key, value):  # 字典对象
        self[key] = value

    def __getattr__(self, item):  # 如果.语法后面的名字不是类中的属性(不在kwargs这个名称空间里),则就会报错,所以需要异常处理
        try:
            return self[item]
        except TypeError:
            raise ('没有该属性')

    @classmethod
    def select_one(cls, **kwargs):
        # 只查一条
        print(kwargs)  # {'id': 1}
        key = list(kwargs.keys())[0]  # ['id']
        value = kwargs[key]
        sql = 'select * from %s where %s=?' % (cls.table_name, key)  # select * from user where id=?
        print(sql, '=========')
        sql = sql.replace('?', '%s')  # select * from user where id=%s
        print(sql, '+++++++++++++++++++')
        ms = Mysql_singleton.Mysql()
        re = ms.select(sql, value)  # 如果收不到就是空的元组
        print(re, '&&&&&&&&&&&&&&&&&&&&&&&&&&&')
        if re:  # [{'id': 1, 'name': 'lxx', 'password': '123'}]

            return cls(**re[0])  # User(id=1,name='lxx',password='123')
        else:  # user=User.select_one(id=2)
            return

    @classmethod
    def select_many(cls, **kwargs):
        ms = Mysql_singleton.Mysql()
        if kwargs:
            key = list(kwargs.keys())[0]
            value = kwargs[key]
            sql = 'select * from %s where %s=?' % (cls.table_name, key)
            sql = sql.replace('?', '%s')
            re = ms.select(sql, value)


        else:
            sql='select * from %s'(cls.table_name)
            re=ms.select(sql,None)

        if re:
            return cls(**re[0])
        else:
            return

    def update(self):
        ms=Mysql_singleton.Mysql()
        # update user set name='lyysb' and password='ssb'
        fields=[] # 存所有非主键字段
        args=[] # 存传入pysql的参数
        pr=None # 存主键
        for k,v in self.mappings.items():
            if v.primary_key:
                pr=getattr(self,v.name,None)
            else:
                fields.append(v.name+'=?')
                args.append(getattr(self,v.name,None))

        sql="update %s set %s where %s =%s"%(self.table_name,','.join(fields),self.primary_key,pr)

        sql=sql.replace('?','%s')
        print(sql)
        ms.execute(sql,args)


    def save(self):
        ms=Mysql_singleton.Mysql()
        # insert into user (name,password) values (?,?)
        field=[]
        values=[]
        args=[]
        for k,v in self.mappings.items():
           if not v.primary_key:
                field.append(v.name)
                values.append('?')
                args.append(getattr(self,v.name,None))
        sql='insert into %s(%s) values (%s)'%(self.table_name,','.join(field),','.join(values))
        sql=sql.replace('?','%s')
        ms.execute(sql,args)

class User(Models):
    table_name = 'user'
    id = IntegerField(name='id', primary_key=True, default=0)
    password = StringField('password')


if __name__ == '__main__':
    user = User.select_one(id=1)
    print(user.name)

# a = User(name='we', password='s231')
  • Mysql_singleton

    import pymysql
    
    
    class Mysql:
        __instense = None
    
        def __init__(self):
            self.conn = pymysql.connect(
                host='database',
                port=3306,
                user='java',
                password='1234',
                charset='utf8',
                database='youku',
                autocommit=True
            )
    
            self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
    
        def close_db(self):
            self.cursor.close()
            self.conn.close()
    
        def select(self, sql, args):
            self.cursor.execute(sql, args)
            # (selct * from user where id=%s,1)
            '''
            sql=select * from user where name=%s and password=%s
            cursor.execute(sql,(lxx,123))
            
            '''
            rs = self.cursor.fetchall()
            print(rs, 'sssssssssssssssssss')
            return rs
    
        def execute(self, sql, args):
            try:
                self.cursor.execute(sql, args)
                affected = self.cursor.rowcount
            except BaseException as e:
                print(e)
            return affected
    
        @classmethod
        def singleton(cls):
            if not cls.__instense:
                cls.__instense = cls()
    
            return cls.__instense
    
    
    if __name__ == '__main__':
        ms = Mysql()
        # re=ms.select('select * from user where id =%s',1)
        # print(re)
        print(ms.select('select * from user where id=%s', 1))
    

连接池版本

  • fuckorm.py
from ORM项目.ORM_POOL import Mysql_p


# 父类
class Field:
    def __init__(self, name, column_type, primary_key, default):
        self.name = name
        self.column_type = column_type
        self.primary_key = primary_key
        self.default = default


#
# # 定义varchar类
class StringField(Field):
    def __init__(self, name=None, column_type='varchar(200)', primary_key=False, default=None):
        super().__init__(name, column_type, primary_key, default)


# 定义int类
class IntegerField(Field):
    def __init__(self, name=None, column_type='int', primary_key=False, default=None):
        super().__init__(name, column_type, primary_key, default)


# 定义元类
# 让一个表对应到一个类里面,类里面需要有表名 哪个字段是主键
class ModelsMetaclass(type):
    def __new__(cls, name, bases, attrs):  # 实例化成类对象下面所具备的属性

        if name == 'Models':
            return type.__new__(cls, name, bases, attrs)

        print(name)
        print(attrs)
        table_name = attrs.get('table_name', None)  # 将表名存入类的名称空间里
        # print(attrs)

        if not table_name:
            table_name = name

        primary_key = None
        mappings = dict()  # 定义空字典,存的是列对象

        for k, v in attrs.items():
            if isinstance(v, Field):
                mappings[k] = v  # 将需要的字段都放进mappings这个字典里
                if v.primary_key:

                    # 找到主键
                    if primary_key:
                        raise TypeError('主键重复:%s' % k)
                    primary_key = v.name

        for k in mappings.keys():
            attrs.pop(k)  # 将需要的字段从attr这个名称空间中删除

        if not primary_key:
            raise TypeError('没有主键')

        attrs['table_name'] = table_name
        attrs['primary_key'] = primary_key
        attrs['mappings'] = mappings
        return type.__new__(cls, name, bases, attrs)


# 使得对象有. 语法
class Models(dict, metaclass=ModelsMetaclass):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def __setattr__(self, key, value):  # 字典对象
        self[key] = value

    def __getattr__(self, item):  # 如果.语法后面的名字不是类中的属性(不在kwargs这个名称空间里),则就会报错,所以需要异常处理
        try:
            return self[item]
        except TypeError:
            raise ('没有该属性')

    @classmethod
    def select_one(cls, **kwargs):
        # 只查一条
        print(kwargs)  # {'id': 1}
        key = list(kwargs.keys())[0]  # ['id']
        value = kwargs[key]
        sql = 'select * from %s where %s=?' % (cls.table_name, key)  # select * from user where id=?
        print(sql, '=========')
        sql = sql.replace('?', '%s')  # select * from user where id=%s
        print(sql, '+++++++++++++++++++')
        ms = Mysql_p.Mysql()
        re = ms.select(sql, value)  # 如果收不到就是空的元组
        print(re, '&&&&&&&&&&&&&&&&&&&&&&&&&&&')
        if re:  # [{'id': 1, 'name': 'lxx', 'password': '123'}]

            return cls(**re[0])  # User(id=1,name='lxx',password='123')
        else:  # user=User.select_one(id=2)
            return

    @classmethod
    def select_many(cls, **kwargs):
        ms = Mysql_p.Mysql()
        if kwargs:
            key = list(kwargs.keys())[0]
            value = kwargs[key]
            sql = 'select * from %s where %s=?' % (cls.table_name, key)
            sql = sql.replace('?', '%s')
            re = ms.select(sql, value)


        else:
            sql='select * from %s'%(cls.table_name)
            re=ms.select(sql,None)

        if re:
            lis_obj = [cls(**r) for r in re]
            return lis_obj
        else:
            return

    def update(self):
        ms= Mysql_p.Mysql()
        # update user set name='lyysb' and password='ssb'
        fields=[] # 存所有非主键字段
        args=[] # 存传入pysql的参数
        pr=None # 存主键
        for k,v in self.mappings.items():
            if v.primary_key:
                pr=getattr(self,v.name,None)
            else:
                fields.append(v.name+'=?')
                args.append(getattr(self,v.name,None))

        sql="update %s set %s where %s =%s"%(self.table_name,','.join(fields),self.primary_key,pr)

        sql=sql.replace('?','%s')
        print(sql)
        ms.execute(sql,args)


    def save(self):
        ms= Mysql_p.Mysql()
        # insert into user (name,password) values (?,?)
        field=[]
        values=[]
        args=[]
        for k,v in self.mappings.items():
            if not v.primary_key:
                field.append(v.name)
                values.append('?')
                args.append(getattr(self,v.name,None))
        sql='insert into %s(%s) values (%s)'%(self.table_name,','.join(field),','.join(values))
        sql=sql.replace('?','%s')
        ms.execute(sql,args)


class User(Models):
    table_name = 'user'
    id = IntegerField(name='id', primary_key=True, default=0)
    password = StringField('password')

class Notice(Models):
    table_name='notice'
    id=IntegerField(name='id',primary_key=True)
    name=StringField('name')
    content=StringField('content')
    user_id=IntegerField('user_id')


if __name__ == '__main__':
    user = User.select_one(id=1)
    print(user.name)

# a = User(name='we', password='s231')

# 测试(select)

    # notice=Notice.select_one(id=1)
    # print(notice) #{'id': 1, 'name': '测试1', 'content': '内容1', 'user_id': 1, 'create_time': datetime.datetime(2019, 6, 16, 21, 29, 55)}
    # print(notice.content) # 内容1

# select_many
#     notice_list=Notice.select_many()
#     print(notice_list) #[{'id': 1, 'name': '测试1', 'content': '内容1', 'user_id': 1, 'create_time': datetime.datetime(2019, 6, 16, 21, 29, 55)}, {'id': 2, 'name': '测试2', 'content': '内容2', 'user_id': 1, 'create_time': datetime.datetime(2019, 6, 16, 21, 30, 4)}] sssssssssssssssssss

# update
# notice.name='我改了'
# notice.update()

# 插入数据
notice=Notice(name='123',content='.....',user_id=1)
notice.save()


  • Mysql_p

    import pymysql
    from ORM项目.ORM_POOL import mysql_pool
    
    
    class Mysql:
    
        def __init__(self):
            self.conn= mysql_pool.POOL.connection()
            self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
    
        def close_db(self):
            self.cursor.close()
            self.conn.close()
    
        def select(self, sql, args):
            self.cursor.execute(sql, args)
            # (selct * from user where id=%s,1)
            '''
            sql=select * from user where name=%s and password=%s
            cursor.execute(sql,(lxx,123))
            
            '''
            rs = self.cursor.fetchall()
            print(rs, 'sssssssssssssssssss')
            return rs
    
        def execute(self, sql, args):
            try:
                self.cursor.execute(sql, args)
                affected = self.cursor.rowcount
            except BaseException as e:
                print(e)
            return affected
    
    
    
    if __name__ == '__main__':
        ms = Mysql()
        # re=ms.select('select * from user where id =%s',1)
        # print(re)
        print(ms.select('select * from user where id=%s', 1))
    
    • mysql_pool
    import pymysql
    
    from DBUtils.PooledDB import PooledDB
    POOL = PooledDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
        mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
        maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
        maxshared=3,  # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
        blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
        maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
        setsession=[],  # 开始会话前执行的命令列表。
        ping=0,
        # ping MySQL服务端,检查是否服务可用。
        host='database',
        port=3306,
        user='java',
        password='1234',
        database='youku',
        charset='utf8',
        autocommit=True
    )
    
    
    def func():
        # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
        # 否则
        # 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
        # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
        # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
        # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
        conn = POOL.connection()
    
        print( '链接被拿走了', conn._con)
        print( '池子里目前有', POOL._idle_cache, '\r\n')
    
        cursor = conn.cursor()
        cursor.execute('select * from user')
        result = cursor.fetchall()
        print(result)
        conn.close()
    
    if __name__ == '__main__':
    
        func()
    

sqlalchemy 模块实现对象关系映射

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint

engine = create_engine("mysql+pymysql://root:123@localhost/test", encoding='utf8')

Base = declarative_base()


class UserType(Base):
    __tablename__ = 'Usertype'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(32), nullable=False, server_default='')


class User(Base):
    __tablename__ = 'user'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(32), nullable=True, server_default='')
    extra = Column(String(32), nullable=True, server_default='')
    type_id = Column(Integer, ForeignKey(UserType.id))


Session = sessionmaker(engine)
session = Session()


def create_tabler():
    Base.metadata.create_all(engine)


def drop_table():
    Base.metadata.drop_all(engine)


def insert_user():
    data = [
        User(name='lyy', extra='SB', type_id=5),
        User(name='lyy2', extra='SB2', type_id=3),
        User(name='lyy3', extra='SB3', type_id=1),
        User(name='lyy4', extra='SB4', type_id=3),
        User(name='lyy5', extra='SB5', type_id=4)

    ]

    session.add_all(data)
    session.commit()


def insert_usertype_one():
    '''
    增加一条数据
    :return:
    '''
    data = UserType(name='胡宏鹏SB')
    session.add(data)


def insert_usertype_many():
    '''
    增加多条数据
    :return:
    '''
    data = [
        UserType(name='胡大鹏SB'),
        UserType(name='胡二鹏SB'),
        UserType(name='胡三鹏SB'),
        UserType(name='胡四鹏SB'),
        UserType(name='胡五鹏SB')

    ]

    session.add_all(data)
    session.commit()


def select_user_many():
    res=session.query(User).all()
    for row in res:
        print(row.id,row.name,row.extra,row.type_id)

def select_all():
    '''
    查询所有记录
    :return:
    '''

    res = session.query(UserType).all()  # 列表中存放着对象,每一个对象指向表中的一条记录
    for row in res:
        print(row.id, row.name)


def select_first():
    '''
    查询第一条记录
    :return:
    '''
    res = session.query(UserType).first()
    print(res.id, res.name)


def select_choice():
    '''
    查询指定字段
    :return:
    '''
    res = session.query(UserType, UserType.name, UserType.id).all()
    '''
    UserType 返回一个对象指向一条记录
    UserType.name  取出name字段的值
    UserType.id  取出id字段的值
    最后每一条记录的这三个元素包含在一个元组里,所有的记录包含在一个大列表
    [(<__main__.UserType object at 0x7fb41933ef28>, '胡宏鹏SB', 1), 
    (<__main__.UserType object at 0x7fb41933ef98>, '胡大鹏SB', 2),
    (<__main__.UserType object at 0x7fb419361048>, '胡二鹏SB', 3), 
    (<__main__.UserType object at 0x7fb4193610b8>, '胡三鹏SB', 4), 
    (<__main__.UserType object at 0x7fb419361128>, '胡四鹏SB', 5), 
    (<__main__.UserType object at 0x7fb419361198>, '胡五鹏SB', 6)]
    
    '''

    for row in res:
        print(row[2], row[1], row[0])
    '''
    1 胡宏鹏SB <__main__.UserType object at 0x7f7ecc640e80>
    2 胡大鹏SB <__main__.UserType object at 0x7f7ecc640ef0>
    3 胡二鹏SB <__main__.UserType object at 0x7f7ecc640f60>
    4 胡三鹏SB <__main__.UserType object at 0x7f7ecc640fd0>
    5 胡四鹏SB <__main__.UserType object at 0x7f7ecc661080>
    6 胡五鹏SB <__main__.UserType object at 0x7f7ecc6610f0>
    '''


def select_get():
    '''
    查询指定id的数据
    :return:
    '''
    res = session.query(UserType).get(3)
    print(res.id, res.name, res)  # 3 胡二鹏SB <__main__.UserType object at 0x7fe336c34da0>


def select_where():
    # res1=session.query(UserType).filter(UserType.name=='胡宏鹏SB').all() # [<__main__.UserType object at 0x7fa60fa32da0>]
    # print(res1[0].id,res1[0].name,res1) # 1 胡宏鹏SB [<__main__.UserType object at 0x7f1a7c17ee10>]
    #
    # res2=session.query(UserType).filter(UserType.id>3).all()
    # for row in res2:
    #     print(row.id,row.name,row)

    '''
    /usr/bin/python3.6 /home/java/Desktop/pycharm_project/ORM_POOL/SQLAlchemy的操作/第一遍/execute.py
    4 胡三鹏SB <__main__.UserType object at 0x7f5ff32c0d30>
    5 胡四鹏SB <__main__.UserType object at 0x7f5ff32c0e10>
    6 胡五鹏SB <__main__.UserType object at 0x7f5ff32c0e80>
    4 胡三鹏SB <__main__.UserType object at 0x7f5ff32c03c8>
    5 胡四鹏SB <__main__.UserType object at 0x7f5ff32c0f60>
    6 胡五鹏SB <__main__.UserType object at 0x7f5ff32c0fd0>
    '''

    test = session.query(UserType).filter(UserType.id > 1, UserType.name == '胡大鹏SB')
    # SELECT `Usertype`.id AS `Usertype_id`, `Usertype`.name AS `Usertype_name` FROM `Usertype`
    # WHERE `Usertype`.id > %(id_1)s AND `Usertype`.name = %(name_1)s
    res3 = test.all()

    print(res3[0].id, res3[0].name, res3)  # 2 胡大鹏SB [<__main__.UserType object at 0x7fae5ca7bf60>]


def select_between():
    '''
    查询某一条件区间的记录,一般是id
    :return:
    '''
    res = session.query(UserType).filter(UserType.id.between(1, 4)).all()
    for row in res:
        print(row.id, row.name, row)

    '''
    1 胡宏鹏SB <__main__.UserType object at 0x7ff2d4f82dd8>
    2 胡大鹏SB <__main__.UserType object at 0x7ff2d4f82ef0>
    3 胡二鹏SB <__main__.UserType object at 0x7ff2d4f82f60>
    4 胡三鹏SB <__main__.UserType object at 0x7ff2d4f82fd0>
    '''


def select_in():
    res = session.query(UserType).filter(UserType.name.in_(['胡二鹏SB', '胡大鹏SB', '胡三鹏SB'])).all()
    for row in res:
        print(row.id, row.name, row)

    '''
    2 胡大鹏SB <__main__.UserType object at 0x7f292e0abf28>
    3 胡二鹏SB <__main__.UserType object at 0x7f292e0d6048>
    4 胡三鹏SB <__main__.UserType object at 0x7f292e0d60b8>
    '''


def select_not_in():
    res = session.query(UserType).filter(UserType.name.notin_(['胡二鹏SB', '胡大鹏SB', '胡三鹏SB'])).all()
    for row in res:
        print(row.id, row.name, row)

    '''
    1 胡宏鹏SB <__main__.UserType object at 0x7f0500aecf60>
    5 胡四鹏SB <__main__.UserType object at 0x7f0500b16080>
    6 胡五鹏SB <__main__.UserType object at 0x7f0500b160f0>
    '''


def select_sym():  # 通配符查询
    ret = session.query(UserType).filter(UserType.name.like('%二%')).all()
    for row in ret:
        print(row.name)  # 胡二鹏SB


def select_limit():
    ret = session.query(UserType)[0:2]  # 切片操作
    for row in ret:
        print(row.name)


def select_sort():
    # ret=session.query(User).order_by(User.name.desc()).all()
    # for row in ret:
    #     print(row.name)
    '''
    lyy5
    lyy4
    lyy3
    lyy2
    lyy
    '''

    ret2=session.query(User).order_by(User.name.desc(),User.id.asc()).all()
    for row in ret2:
        print(row.name)

    '''
    lyy5
    lyy4
    lyy3
    lyy2
    lyy
    '''


def select_group():
    from sqlalchemy.sql import func
    res=session.query(User.type_id,func.max(User.id),func.min(User.id)).group_by(User.type_id).all()
    print(res) # [(1, 3, 3), (3, 4, 2), (4, 5, 5), (5, 1, 1)]

    res2=session.query(User.name,func.max(User.id),func.min(User.id)).group_by(User.type_id).having(func.min(User.id>2)).all()
    print(res2) # [('lyy3', 3, 3), ('lyy5', 5, 5)]


session.close()
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,904评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,581评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,527评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,463评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,546评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,572评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,582评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,330评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,776评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,087评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,257评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,923评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,571评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,192评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,436评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,145评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容