init.sql
-- Department list table (InnoDB engine, utf8mb4 character set).
-- Keep comments on their own `--` lines: the accompanying Python loader
-- (extract_pure_field) only discards lines that start with `--`.
CREATE TABLE department
(
-- Primary key id, auto-incremented.
id int NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT '主键id',
-- Department name, at most 32 characters, defaults to the empty string.
name varchar(32) NOT NULL DEFAULT '' COMMENT '部门名字',
-- Parent node of the department, defaults to 0.
-- NOTE(review): declared bigint while id is int. If this is meant to hold a
-- department.id value the two widths differ - confirm which one is intended.
parent_id bigint NOT NULL DEFAULT 0 COMMENT '部门父节点',
-- Deletion flag, defaults to 0 (presumably 0 means "not deleted" - TODO confirm).
delete_flag tinyint NOT NULL DEFAULT 0 COMMENT '删除标识',
-- Modification time as an unsigned int, defaults to 0
-- (presumably a Unix timestamp in seconds - TODO confirm the unit).
modify_time int UNSIGNED NOT NULL DEFAULT 0 COMMENT '修改时间',
-- Creation time as an unsigned int, defaults to 0
-- (presumably a Unix timestamp in seconds - TODO confirm the unit).
create_time int UNSIGNED NOT NULL DEFAULT 0 COMMENT '创建时间'
)
ENGINE innodb
CHARSET utf8mb4 COMMENT '部门列表';
Python代码实现
"""
Map CREATE TABLE SQL statements to SQLAlchemy Table objects.
"""
import re
import sqlalchemy as sa
# Maps a lower-cased MySQL column type name (with any "(length)" suffix already
# removed) to the SQLAlchemy type used when building the Column.
# Types missing from this table make the lookup raise KeyError, so the common
# MySQL types are listed explicitly.
TYPE_RELATION = {
    # character types
    'char': sa.CHAR,
    'varchar': sa.String,
    'tinytext': sa.TEXT,
    'text': sa.TEXT,
    'mediumtext': sa.TEXT,
    'longtext': sa.TEXT,
    # integer types
    'tinyint': sa.SmallInteger,
    'smallint': sa.SmallInteger,
    'mediumint': sa.Integer,
    'int': sa.Integer,
    'integer': sa.Integer,
    'bigint': sa.BigInteger,
    # fractional numeric types
    'float': sa.Float,
    'double': sa.Float,
    'decimal': sa.Numeric,
    'numeric': sa.Numeric,
    # date and time types
    'date': sa.Date,
    'time': sa.Time,
    'datetime': sa.DateTime,
    'timestamp': sa.TIMESTAMP,
}
def extract_pure_field(sql_file):
    """Read a SQL script and return its meaningful text as a single string.

    Every line is stripped of surrounding whitespace. Lines that start with
    ``--`` (comments) or with a lower-case ``set `` (session settings) are
    dropped, and runs of spaces are collapsed to one space. The surviving
    lines are concatenated WITHOUT any separator, so blank lines vanish and
    tokens that are only separated by a line break end up adjacent.

    Args:
        sql_file: Path of the SQL script to read.

    Returns:
        The cleaned, concatenated script text.
    """
    try:
        # Decode explicitly instead of relying on the platform locale, which
        # cannot be trusted for non-ASCII COMMENT text. 'utf-8-sig' also drops
        # a leading byte-order mark that would otherwise hide the first line's
        # '--' / 'CREATE' prefix.
        with open(sql_file, mode='r', encoding='utf-8-sig') as file_obj:
            raw_lines = file_obj.readlines()
    except UnicodeDecodeError:
        # Legacy scripts saved in the locale encoding keep working as before.
        with open(sql_file, mode='r') as file_obj:
            raw_lines = file_obj.readlines()

    kept_lines = []
    for raw_line in raw_lines:
        line_text = raw_line.strip()
        if re.match('^--.*|^set .*', line_text) is None:
            # Collapse consecutive spaces into a single space.
            kept_lines.append(re.sub(' +', ' ', line_text))
    return ''.join(kept_lines)
# Characters that open/close a quoted string or a quoted identifier.
_QUOTE_CHARS = '\'"`'

# First words of table-level clauses that are not column definitions.
_NON_COLUMN_KEYWORDS = frozenset((
    'primary', 'unique', 'key', 'index', 'constraint',
    'foreign', 'fulltext', 'spatial', 'check',
))

# "CREATE [TEMPORARY] TABLE [IF NOT EXISTS] name (" - whitespace is optional
# because the cleaned script text has its lines joined without separators.
_CREATE_TABLE_RE = re.compile(
    r'^\s*create\s*(?:temporary\s*)?table\s*(?:if\s*not\s*exists\s*)?'
    r'(`[^`]+`|[^\s(]+)\s*\(',
    re.IGNORECASE,
)

# COMMENT keyword followed by a single- or double-quoted literal.
_COMMENT_LITERAL_RE = re.compile(
    r'''comment\s*(?:'((?:[^'\\]|\\.|'')*)'|"((?:[^"\\]|\\.|"")*)")''',
    re.IGNORECASE | re.DOTALL,
)

# Table-level "[CONSTRAINT [symbol]] PRIMARY KEY [...] (col, ...)".
_TABLE_PRIMARY_KEY_RE = re.compile(
    r'^\s*(?:constraint\s+(?:\S+\s+)?)?primary\s+key\b[^(]*\(([^)]*)\)',
    re.IGNORECASE,
)


def _iter_unquoted(text):
    """Yield (index, char) for each character outside quotes/backticks."""
    quote = None
    pos, size = 0, len(text)
    while pos < size:
        char = text[pos]
        if quote is None:
            if char in _QUOTE_CHARS:
                quote = char
            else:
                yield pos, char
        elif char == '\\' and quote != '`':
            pos += 1  # skip the backslash-escaped character
        elif char == quote:
            if pos + 1 < size and text[pos + 1] == quote:
                pos += 1  # a doubled quote is an escaped quote
            else:
                quote = None
        pos += 1


def _split_top_level(text, sep):
    """Split text on sep, ignoring separators inside quotes or parentheses."""
    parts = []
    depth = 0
    start = 0
    for pos, char in _iter_unquoted(text):
        if char == '(':
            depth += 1
        elif char == ')':
            depth = max(depth - 1, 0)
        elif char == sep and depth == 0:
            parts.append(text[start:pos])
            start = pos + 1
    parts.append(text[start:])
    return parts


def _column_section(table_sql, open_idx):
    """Return the text between the '(' at open_idx and its matching ')'."""
    depth = 0
    for pos, char in _iter_unquoted(table_sql):
        if pos < open_idx:
            continue
        if char == '(':
            depth += 1
        elif char == ')':
            depth -= 1
            if depth == 0:
                return table_sql[open_idx + 1:pos]
    # Unbalanced statement: fall back to everything after the opening paren.
    return table_sql[open_idx + 1:]


def _mask_quoted(text):
    """Return text with every quoted region blanked out (same length)."""
    masked = [' '] * len(text)
    for pos, char in _iter_unquoted(text):
        masked[pos] = char
    return ''.join(masked)


def _constraint_columns(column_list):
    """Return the lower-cased column names listed in a key definition."""
    names = set()
    for part in column_list.split(','):
        # Drop an index prefix length "(10)" and any ASC/DESC suffix.
        head = part.split('(')[0].split()
        if head:
            names.add(head[0].strip('`"').lower())
    return names


def _parse_column(field):
    """Parse one column definition; return None for non-column clauses."""
    tokens = field.split()
    if len(tokens) < 2:
        return None
    if tokens[0].lower() in _NON_COLUMN_KEYWORDS:
        return None
    # Keywords are searched in the masked text so that words inside a quoted
    # comment or default value are never mistaken for PRIMARY / COMMENT.
    masked = _mask_quoted(field)
    comment = ''
    for keyword in re.finditer(r'\bcomment\b', masked, re.IGNORECASE):
        literal = _COMMENT_LITERAL_RE.match(field, keyword.start())
        if literal is None:
            continue  # e.g. an unquoted column that is itself named comment
        if literal.group(1) is not None:
            comment = literal.group(1).replace("''", "'")
        else:
            comment = literal.group(2).replace('""', '"')
        break
    return {
        'name': tokens[0].strip('`"'),
        'type': tokens[1].split('(')[0].strip().lower(),
        'comment': comment,
        'primary_key': re.search(r'\bprimary\b', masked, re.IGNORECASE) is not None,
    }


def parse_table_sql(sql_file):
    """Parse the CREATE TABLE statements of a SQL script.

    The script is read through ``extract_pure_field`` and split into
    statements on ``;``. Statements that are not ``CREATE TABLE name (...)``
    are ignored. Column definitions are split on top-level commas only, so
    ``decimal(10,2)``, ``enum('a','b')`` and comments containing commas stay
    intact. Table-level clauses (keys, indexes, constraints) are not returned
    as columns, but a table-level ``PRIMARY KEY (...)`` marks the listed
    columns as primary keys.

    Args:
        sql_file: Path of the SQL script.

    Returns:
        A dict keyed by lower-cased table name (backticks removed). Each value
        is ``{'fields': [...], 'table_sql': <statement text>}`` where every
        field is a dict with ``name`` (identifier quotes removed), ``type``
        (lower-cased, without length), ``comment`` (text without the
        surrounding quotes) and ``primary_key`` (bool).
    """
    tables_dict = {}
    content = extract_pure_field(sql_file)
    content = content.replace('\r', '').replace('\n', '')
    for table_sql in _split_top_level(content, ';'):
        header = _CREATE_TABLE_RE.match(table_sql)
        if header is None:
            continue
        # Table names are lower-cased, as callers look tables up that way.
        table_name = header.group(1).replace('`', '').strip().lower()
        if not table_name:
            continue
        body = _column_section(table_sql, header.end() - 1)
        fields_list = []
        primary_key_names = set()
        for field in _split_top_level(body, ','):
            table_key = _TABLE_PRIMARY_KEY_RE.match(field)
            if table_key is not None:
                primary_key_names.update(_constraint_columns(table_key.group(1)))
                continue
            column = _parse_column(field)
            if column is not None:
                fields_list.append(column)
        for column in fields_list:
            if column['name'].lower() in primary_key_names:
                column['primary_key'] = True
        tables_dict[table_name] = {
            'fields': fields_list,
            'table_sql': table_sql
        }
    return tables_dict
class ModelMetaclass(type):
    """Metaclass that controls attribute reads and writes on Model classes.

    Reads of a missing class attribute whose name ends with ``sa`` are
    resolved against the class's ``__tables__`` mapping; every attribute
    assignment on the class is rejected.
    """

    def __getattr__(cls, item):
        """Resolve ``item`` from ``cls.__tables__`` after normal lookup failed.

        Args:
            item: Name of the attribute being read.

        Returns:
            The object stored under ``item`` in ``__tables__``.

        Raises:
            AttributeError: If ``item`` does not end with ``sa``, the class
                has no ``__tables__``, or ``item`` is not registered. Raising
                (rather than returning None or leaking KeyError) keeps
                ``hasattr`` and ``getattr(..., default)`` working.
        """
        if item.endswith('sa'):
            try:
                # Bypass this hook while looking up the registry itself.
                tables = type.__getattribute__(cls, '__tables__')
            except AttributeError:
                tables = None
            if tables is not None and item in tables:
                return tables[item]
        raise AttributeError(
            "type object %r has no attribute %r" % (cls.__name__, item)
        )

    def __setattr__(self, key, value):
        """Reject every attribute assignment on the class.

        Raises:
            AttributeError: Always. It is a subclass of Exception, so callers
                that catch Exception are unaffected.
        """
        raise AttributeError('class Model not allow to be set attr')
class Model(metaclass=ModelMetaclass):
    """Class-level registry of SQLAlchemy tables.

    Tables are stored in ``__tables__`` under the key ``<table name>_sa`` and
    all share the single ``__metadata__`` MetaData instance.
    """

    # Registry of tables, keyed by "<table name>_sa".
    __tables__ = {}
    # MetaData object shared by the tables built for this registry.
    __metadata__ = sa.MetaData()

    @classmethod
    def append(cls, table_sa: sa.Table):
        """Register ``table_sa`` under the key ``<table name>_sa``.

        The registry dict is mutated in place; the class attribute itself is
        never rebound.
        """
        registry_key = f'{table_sa.name}_sa'
        cls.__tables__.update({registry_key: table_sa})
if __name__ == '__main__':
    # Build one SQLAlchemy Table per CREATE TABLE statement in the script and
    # register each of them on Model.
    sql_path = './init.sql'
    for table_name, info in parse_table_sql(sql_path).items():
        table_columns = [
            sa.Column(
                spec['name'],
                TYPE_RELATION[spec['type']],
                primary_key=spec['primary_key'],
                comment=spec['comment'],
            )
            for spec in info['fields']
        ]
        sa_table = sa.Table(table_name, Model.__metadata__, *table_columns)
        # Keep the original DDL text next to the Table object.
        sa_table.sql = info['table_sql']
        Model.append(sa_table)
    # Demonstrate attribute-style access to a registered table.
    print(type(Model.department_sa))
    print(Model.department_sa.sql)
out:
<class 'sqlalchemy.sql.schema.Table'>
CREATE TABLE department(id int NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT '主键id',name varchar(32) NOT NULL DEFAULT '' COMMENT '部门名字',parent_id bigint NOT NULL DEFAULT 0 COMMENT '部门父节点',delete_flag tinyint NOT NULL DEFAULT 0 COMMENT '删除标识',modify_time int UNSIGNED NOT NULL DEFAULT 0 COMMENT '修改时间',create_time int UNSIGNED NOT NULL DEFAULT 0 COMMENT '创建时间')ENGINE innodbCHARSET utf8mb4 COMMENT '部门列表'