[toc]
一、如何增加新表
- 检查当前数据库脚本版本,在cinder\db\sqlalchemy\migrate_repo\versions目录下最后一个文件096_placeholder.py,版本是96。数据库有表migrate_version记录当前版本。
MariaDB [cinder]> select * from migrate_version;
+---------------+-----------------------------------------------------+---------+
| repository_id | repository_path | version |
+---------------+-----------------------------------------------------+---------+
| cinder | /opt/stack/cinder/cinder/db/sqlalchemy/migrate_repo | 96 |
+---------------+-----------------------------------------------------+---------+
- 在cinder\db\sqlalchemy\migrate_repo\versions目录下添加版本脚本文件,文件名为 版本编号任意名,如 097_add_wyue_test_table.py,内容如下:
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer
from sqlalchemy import MetaData, String, Table
def upgrade(migrate_engine):
"""Add attachment_specs table."""
import pdb
pdb.set_trace()
meta = MetaData()
meta.bind = migrate_engine
wyue_test = Table(
'wyue_test', meta,
Column('user_name', String(255)),
mysql_engine='InnoDB',
mysql_charset='utf8'
)
# We use checkfirst argument because this table may already exist if the
# migration is performed on a system that was on a migration earlier than
# 063 when performing the upgrade.
# checkfirst=True 创建表前先检查表是否已经存在,存在则不创建s
wyue_test.create(checkfirst=True)
# 增加表字段
password = Column('password', String(length=36))
wyue_test.create_column(password)
注意:
- 不可设置相同版本号的文件,如092_placeholder.py和
092_ruijie_add_storage_tables.py,否则会提示:
ScriptError: You can only have one Python script per version, but you have: /opt/stack/cinder/cinder/db/sqlalchemy/migrate_repo/versions/092_placeholder.py and /opt/stack/cinder/cinder/db/sqlalchemy/migrate_repo/versions/092_ruijie_add_storage_tables.py - 版本号必须连续,不可跳跃。如 versions\096_placeholder.py
之后便是versions\196_placeholder.py,会提示TRACE cinder KeyError: <VerNum(97)>
- 命令行执行cinder-manage db sync
[ubuntu@localhost devstack]$ /usr/bin/cinder-manage db sync
****省略****
2017-06-16 15:57:05.399 INFO migrate.versioning.api [-] 96 -> 97...
2017-06-16 15:57:05.447 INFO migrate.versioning.api [-] done
可以看到版本从96升级到97。
检查数据库表 migrate_version 字段version 值自动变成了97
MariaDB [cinder]> select * from migrate_version;
+---------------+-----------------------------------------------------+---------+
| repository_id | repository_path | version |
+---------------+-----------------------------------------------------+---------+
| cinder | /opt/stack/cinder/cinder/db/sqlalchemy/migrate_repo | 97 |
+---------------+-----------------------------------------------------+---------+
二、cinder-manage db sync源码分析
源码
- cinder.cmd.manage.DbCommands.sync
# 指令可携带version参数,指定升级到什么版本。如:cinder-manage db sync 97
@args('version', nargs='?', default=None, help='Database version')
def sync(self, version=None):
"""Sync the database up to the most recent version."""
return db_migration.db_sync(version)
- cinder.db.migration.db_sync
def db_sync(version=None, init_version=INIT_VERSION, engine=None):
"""Migrate the database to `version` or the most recent version."""
# 得到数据库引擎,这里不讨论怎么get的
if engine is None:
engine = db_api.get_engine()
# engine = Engine(mysql+pymysql://root:***@127.0.0.1/cinder?charset=utf8)
# 获取当前数据库脚本版本号(分析见下文)
current_db_version = get_backend().db_version(engine,
MIGRATE_REPO_PATH,
init_version)
# TODO(e0ne): drop version validation when new oslo.db will be released
if version and int(version) < current_db_version:
msg = _('Database schema downgrade is not allowed.')
raise exception.InvalidInput(reason=msg)
# 同步数据表
return get_backend().db_sync(engine=engine,
abs_path=MIGRATE_REPO_PATH,
version=version,
init_version=init_version)
- 获取当前数据库脚本版本号, get_backend().db_version 调用顺序:
- oslo_db.sqlalchemy.migration.db_version:
def db_version(engine, abs_path, init_version):
repository = _find_migrate_repo(abs_path)
try:
return versioning_api.db_version(engine, repository)
- migrate.versioning.api.db_version:
@with_engine
def db_version(url, repository, **opts):
engine = opts.pop('engine')
schema = ControlledSchema(engine, repository)
return schema.version
- migrate.versioning.schema.ControlledSchema
def __init__(self, engine, repository):
***省略***
self.load()
def load(self):
"""Load controlled schema version info from DB"""
# cinder\db\sqlalchemy\migrate_repo\migrate.cfg里定义的version_table=migrate_version表
tname = self.repository.version_table
try:
if not hasattr(self, 'table') or self.table is None:
self.table = Table(tname, self.meta, autoload=True)
# 查询migrate_version表
result = self.engine.execute(self.table.select(
self.table.c.repository_id == str(self.repository.id)))
data = list(result)[0]
except:
cls, exc, tb = sys.exc_info()
six.reraise(exceptions.DatabaseNotControlledError,
exceptions.DatabaseNotControlledError(str(exc)), tb)
self.version = data['version']
return data
- 同步数据表get_backend().db_sync调用顺序:
- oslo_db.sqlalchemy.migration.db_sync
if version is None or version > current_version:
migration = versioning_api.upgrade(engine, repository, version)
- migrate.versioning.api.upgrade
return _migrate(url, repository, version, upgrade=True, err=err, **opts)
- migrate.versioning.api._migrate
@with_engine
def _migrate(url, repository, version, upgrade, err, **opts):
engine = opts.pop('engine') # Engine(mysql+pymysql://root:***@127.0.0.1/cinder?charset=utf8)
url = str(engine.url) # 'mysql+pymysql://root:secret@127.0.0.1/cinder?charset=utf8'
schema = ControlledSchema(engine, repository) # <class 'migrate.versioning.schema.ControlledSchema'>
version = _migrate_version(schema, version, upgrade, err) # None
changeset = schema.changeset(version)
for ver, change in changeset:
nextver = ver + changeset.step
log.info('%s -> %s... ', ver, nextver)
if opts.get('preview_sql'):
if isinstance(change, PythonScript):
log.info(change.preview_sql(url, changeset.step, **opts))
elif isinstance(change, SqlScript):
log.info(change.source())
elif opts.get('preview_py'):
if not isinstance(change, PythonScript):
raise exceptions.UsageError("Python source can be only displayed"
" for python migration files")
source_ver = max(ver, nextver)
module = schema.repository.version(source_ver).script().module
funcname = upgrade and "upgrade" or "downgrade"
func = getattr(module, funcname)
log.info(inspect.getsource(func))
else:
# 执行change包含的表结构变化
schema.runchange(ver, change, changeset.step)
log.info('done')
- migrate.versioning.schema.ControlledSchema#changeset
def changeset(self, version=None):
"""API to Changeset creation.
Uses self.version for start version and engine.name
to get database name.
"""
database = self.engine.name
start_ver = self.version
changeset = self.repository.changeset(database, start_ver, version)
return changeset
- migrate.versioning.repository.Repository#changeset
def changeset(self, database, start, end=None):
#database, start, end = ('mysql', <VerNum(97)>, None)
start = version.VerNum(start) # <VerNum(97)>
if end is None:
end = self.latest # 通过正则表达式re.compile(r'^(\d{3,}).*')过滤repository文件得到
else:
end = version.VerNum(end)
if start <= end:
step = 1
range_mod = 1
op = 'upgrade'
else:
step = -1
range_mod = 0
op = 'downgrade'
versions = range(int(start) + range_mod, int(end) + range_mod, step) # [97]
changes = [self.version(v).script(database, op) for v in versions] # [<migrate.versioning.script.py.PythonScript object at 0x52d9410>]
ret = Changeset(start, step=step, *changes)
return ret
- 执行change包含的表结构变化,schema.runchange(ver, change, changeset.step)
- migrate.versioning.schema.ControlledSchema#runchange
def runchange(self, ver, change, step):
# Run the change
change.run(self.engine, step)
# Update/refresh database version
self.update_repository_table(startver, endver)
self.load()
- migrate.versioning.script.py.py
def run(self, engine, step):
***省略***
funcname = base.operations[op] # 'upgrade'
script_func = self._func(funcname) # 给script_func赋值upgrade方法对象
# check for old way of using engine 检查script_func是否是方法类型
if not inspect.getargspec(script_func)[0]:
raise TypeError("upgrade/downgrade functions must accept engine"
" parameter (since version 0.5.4)")
script_func(engine) # 调用097_add_wyue_test_table.py的upgrade方法
总结
逻辑大概是:
- 得到当前的版本号,如95
- 计算当前版本号距离Repository下最新版本如97之间的差,即2
- 获取这两个版本脚本文件封装成change对象,组成集合changeset
- 遍历changeset,执行change里的upgrae方法
三、自定义版本控制脚本
为了和cinder的db版本控制分离开,我们可以自己定义一个。
(1)在自定义文件夹里,按照下图结构添加文件夹。migrate_repo可以直接复制 cinder\db\sqlalchemy\migrate_repo 。
versions用于存放不同版本的脚本文件。
(2)修改 storages\db\migrate_repo\migrate.cfg 配置文件
[db_settings]
# Used to identify which repository this database is versioned under.
# You can use the name of your project.
repository_id=storage_manage
# The name of the database table used to track the schema version.
# This name shouldn't already be used by your project.
# If this is changed once a database is under version control, you'll need to
# change the table name in each database too.
version_table=migrate_version
# When committing a change script, Migrate will attempt to generate the
# sql for all supported databases; normally, if one of them fails - probably
# because you don't have that database installed - it is ignored and the
# commit continues, perhaps ending successfully.
# Databases in this list MUST compile successfully during a commit, or the
# entire commit will fail. List the databases your application will actually
# be using to ensure your updates to that database work properly.
# This must be a list; example: ['postgres','sqlite']
required_dbs=[]
- version_table=migrate_version 指的是数据库里存放版本记录的表。不必修改
- repository_id=storage_manage 对应于migrate_version表里的repository_id字段,是版本控制的主要标识。修改成你指定的名字。
- required_dbs 可不填。
(3)修改 cinder\storages\db\migration.py
因为migrate_version表里必须要有一条repository记录才能执行同步脚本,且migrate_version.repository_path 对应migrate_repo 的目录,比如 /opt/stack/cinder/cinder/storages/db/migrate_repo。
这里我们增加一个函数add_migrate_version(),用来添加这个repository记录。然后再cinder.storages.db.migration.db_sync这个入口函数里调用。
def add_migrate_version():
"""
Check the table migrate_version whether has record about id 'storage_manage',
if has not it , insert a record in that table.
:return:
"""
session = db_api.get_session()
session.begin()
# 拿到migrate_repo的位置。由于migration.py和migrate_repo在同一级目录,所以我们直接通过os.path.realpath(__file__) 是获取当前文件(migration.py)所在的目录地址即可。
cwd_path = os.path.split(os.path.realpath(__file__))[0] + '/migrate_repo'
# 以repository_id和repository_path一起做条件查询MigrateVersion表是否有记录
count = len(session.query(MigrateVersion). \
filter(MigrateVersion.repository_id == 'storage_manage').
filter(MigrateVersion.repository_path == cwd_path).all())
# 没有记录,则插入记录
if count == 0:
version = MigrateVersion(repository_id='storage_manage',
repository_path=cwd_path, version=0)
session.add(version)
session.commit()
session.close()
(4)在 cinder\cmd\manage.py
里增加一个指令函数,调用我们自定义的 migration.db_sync
@args('version', nargs='?', default=None,
help='Database version')
def storage_db_sync(self, version=None):
"""Sync the database up to the most recent version. Only for cinder storage manage. """
from cinder.storages.db import migration
return migration.db_sync(version)
执行指令是: [ubuntu@localhost ~]$ /usr/bin/cinder-manage db storage_db_sync
运行结果:
<!--省略-->
INFO migrate.versioning.api [-] 1 -> 2...
INFO migrate.versioning.api [-] done
注意:
如果在docker环境,需要进入cinder容器里执行指令。指令不会打印任何消息,但是会输出日志到/var/log/kolla/cinder/cinder-manage.log。
# 进入cinder_api容器
docker exec -it -u root cinder_api bash
# 进入容器后,执行
cinder-manage db storage_db_sync
四、同步脚本常用方法
(1) 创建表
# -*- coding:utf-8 -*-
# 数据表脚本
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, UniqueConstraint
from sqlalchemy import MetaData, String, Table
def upgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
# 类型表
test_class = Table(
'test_class', meta,
Column('created_at', DateTime),
Column('updated_at', DateTime),
Column('deleted_at', DateTime),
Column('deleted', Boolean),
Column('id', Integer, primary_key=True, nullable=False),
Column('classname', String(36)),
# 定义mysql引擎、字符集
mysql_engine='InnoDB',
mysql_charset='utf8'
)
# 用户表,关联test_class表
test_user = Table(
'test_user', meta,
Column('created_at', DateTime),
Column('updated_at', DateTime),
Column('deleted_at', DateTime),
Column('deleted', Boolean),
Column('id', Integer, primary_key=True, nullable=False),
Column('username', String(36)),
Column('password', String(36)),
# 定义外键
Column('class', Integer, ForeignKey('test_class.id')),
# 联合索引
UniqueConstraint('username', 'password'),
mysql_engine='InnoDB',
mysql_charset='utf8'
)
test_class.create(checkfirst=True)
test_user.create(checkfirst=True)
(2) 定义外键
定义外键有两种方法:
Column('class', Integer, ForeignKey('test_class.id')),
# 或者:
Column('class', Integer, ForeignKey(test_class.c.id)),
注意:
- 定义外键之前,一定要实例化关联的表对象,比如我们这里用表test_user关联'test_class.id',在定义test_user结构之前必须要先实例化test_class,否则会报错:
CRITICAL cinder [-] Unhandled error: DbMigrationError: Foreign key associated with column 'test_user.class' could not find table 'test_class' with which to generate a foreign key to target column 'id'
实例化test_class,因为这个表已经创建过了,我们不必再定义一遍表字段,用autoload=True自动导入即可:
test_class = Table('test_class', meta, autoload=True)
(3) 增加多字段联合索引
UniqueConstraint('username', 'password'),
(4) 修改表结构
test_user = Table('test_user', meta, autoload=True)
# 修改表字段类型、是否可空、默认值
test_user.c.password.alter(String(8), nullable=False, server_default='888888')
# 增加表字段
age = Column('age', Integer, nullable=True)
test_user.create_column(age)
# 移除表字段,Column 只要定义一个名字即可
age = Column('age')
test_user.drop_column(age)
# 修改表名
test_user.rename('test_user22222')
注意:
- sqlalchemy 里,给column定义默认值,不用'default=xx',而是用'server_default=xxx'
- create_column、drop_column、rename 这三个方法定义在 migrate.changeset.schema.ChangesetTable 类里。
- 如果migrate_engine用的是postgresql,会有些语法上的差别,比如不支持修改表字段类型从boolean到int型,就需要写成sql来执行。如下:
# 给表volume_storages字段usage增加默认值
volume_storages = Table('volume_storages', meta, autoload=True)
if migrate_engine.name == 'postgresql':
# NOTE: PostgreSQL can't cast Boolean to int automatically
sql = 'ALTER TABLE volume_storages ALTER COLUMN usage ' + \
'SET DEFAULT \'data\''
migrate_engine.execute(sql)
else:
# 注意sqlalchemy 里,给column定义默认值,不用'default=xx',而是用'server_default=xxx'
volume_storages.c.usage.alter(server_default='data')