数据库数据迁移过程对于双写和读操作简化的一点思考

数据数据迁移其实主要就是垂直拆分和分库分表 垂直拆分和分库分表过程中主要数据库的操作就是双写和查询 我们会有开关来控制状态的转换,公司业务里orm主要是用mybatis

本文主要目的是为了减少代码的侵入性和迁移过程中数据库读写代码的可复用性,实际项目里单个表涉及的查询多达几十个,并且涉及到几十个文件的修改,为了减少迁移过程中对业务代码的修改,我文章下面会给一个样例(毕竟不能带上公司业务代码)。我目前在酷家乐工作中有遇到以下两个之前处理起来代码比较繁琐的地方

1、一个是大量的业务表从老的数据进行迁移(这里可能容易遇到自增主键切换写顺序一致性问题),

2、还有一个问题就是部分大表的扩容(实际上相当于垂直拆分 然后分库) 本质上都是一回事情

进行迁移的话会需要采用dts或者数据库日志binlog同步存量数据的过程,这里根据自己公司的技术栈来选择 存量数据写入数据库的时候带上主键

INSERT INTO tablename(field1,field2, field3, ...) 
VALUES(value1, value2, value3, ...) 
ON DUPLICATE KEY UPDATE 
field1=value1,field2=value2, field3=value3, ...;

写增量数据

1、通常做法是进行双写 写入老表的时候同时写入新表,如果需要的话可以加上手动事务管理,毕竟是跨库,不过实际应用场景中写入失败的情况很少,根据实际情况来决定。
2、双写逻辑会麻烦点的地方就是插入顺序切换时候的自增主键一致性问题。

自增主键插入的问题

(1) 老表如果是单表自增的,新表是单表自增的话

找流量低的时候切换读写顺序,如果业务需要高度一致性,加分布式锁,需要额外的开关来决定是否走有锁的逻辑,顺序切换以后,关闭那个控制是否走锁的逻辑的开关(如果条件允许 的话,各个服务器之间其实通过本地rocksDb写磁盘 实现一个分布式一致性算法比如于raft 也是可以的 实际上各个服务自己的集群就是一个小型的raft集群了) 其实这里也可以方法 (2)去解决,就是需要个过渡表来管理主键自增

(2) 老表如果是单表自增的,新表是分表的话

老表自增id 增加一个大区间比如原来是 id = 10^7 我们直接增加到 id = 2 * 10 ^ 7 新表设置自增id为 10^7 + delta( delta > 0 && delta < 10 ^ 4) 这个范围自己控制一下就好, 切换插入顺序以后主键不会冲突,也不会阻塞依赖业务方修改sql为rpc 迁移完成以后 新表主键改成 3 * 10 ^ 7, 这里只是大致数量, 实际区间大小由业务来决定
at

然后就是代码逻辑冗余问题了

这里其实稍微涉及到一点mybatis的架构,通常业务里面我们的mybatis的mapper对象本质上是MapperProxy这个类,套了层jdk动态代理而已,对于需要垂直拆分数据迁移表相关的mapper,我这边直接自己实现了一个代理,绕过MapperProxy,直接通过 SqlSession 去执行,但是我还是会实现一个 mapper类的代理对象去替换掉业务代码里面用到的mapper对象,从而实现基本无侵入性, 迁移完了以后代码还是需要改一下包名啥的

改造如下 我这里以一个UserMapper为例

    @Primary
    @Bean
    public UserMapper delegateUserMapper(DbHandler dbHandler) {
        return (UserMapper) Proxy.newProxyInstance(UserMapper.class.getClassLoader(),
                new Class<?>[]{ UserMapper.class }, dbHandler);
    }

MapperHandler

public class MapperHandler {

    private final MapperMethod.SqlCommand command;
    private final MapperMethod.MethodSignature methodSignature;

    public MapperHandler(Class<?> mapperInterface, Method method, Configuration config) {
        this.command = new MapperMethod.SqlCommand(config,
                mapperInterface, method);
        this.methodSignature = new MapperMethod.MethodSignature(config,
                mapperInterface, method);

    }

    public MapperMethod.MethodSignature getMethodSignature() {
        return methodSignature;
    }

    public Object execute(SqlSession sqlSession, Object[] args) {
        return execute(sqlSession, args, methodSignature.convertArgsToSqlCommandParam(args));
    }

   public Object execute(SqlSession sqlSession, Object[] args, Object param) {
        Object result;
        switch (command.getType()) {
        case INSERT: {
            result = rowCountResult(sqlSession.insert(command.getName(), param));
            break;
        }
        case UPDATE: {
            result = rowCountResult(sqlSession.update(command.getName(), param));
            break;
        }
        case DELETE: {
            result = rowCountResult(sqlSession.delete(command.getName(), param));
            break;
        }
        case SELECT:
            if (methodSignature.returnsVoid() && methodSignature.hasResultHandler()) {
                executeWithResultHandler(sqlSession, args, param);
                result = null;
            } else if (methodSignature.returnsMany()) {
                result = executeForMany(sqlSession, args, param);
            } else if (methodSignature.returnsMap()) {
                result = executeForMap(sqlSession, args, param);
            } else if (methodSignature.returnsCursor()) {
                result = executeForCursor(sqlSession, args, param);
            } else {
                result = sqlSession.selectOne(command.getName(), param);
            }
            break;
        case FLUSH:
            result = sqlSession.flushStatements();
            break;
        default:
            throw new BindingException("Unknown execution method for: " + command.getName());
        }
        if (result == null && methodSignature.getReturnType().isPrimitive() &&
                !methodSignature.returnsVoid()) {
            throw new BindingException("Mapper method '" + command.getName()
                    + " attempted to return null from a method with a primitive return type (" +
                    methodSignature.getReturnType() + ").");
        }
        return result;
    }

    private Object rowCountResult(int rowCount) {
        final Object result;
        if (methodSignature.returnsVoid()) {
            result = null;
        } else if (Integer.class.equals(methodSignature.getReturnType()) || Integer.TYPE.equals(
                methodSignature.getReturnType())) {
            result = rowCount;
        } else if (Long.class.equals(methodSignature.getReturnType()) || Long.TYPE.equals(
                methodSignature.getReturnType())) {
            result = (long) rowCount;
        } else if (Boolean.class.equals(methodSignature.getReturnType()) || Boolean.TYPE.equals(
                methodSignature.getReturnType())) {
            result = rowCount > 0;
        } else {
            throw new BindingException(
                    "Mapper method '" + command.getName() + "' has an unsupported return type: " +
                            methodSignature.getReturnType());
        }
        return result;
    }

    private void executeWithResultHandler(SqlSession sqlSession, Object[] args, Object param) {
        MappedStatement ms = sqlSession.getConfiguration().getMappedStatement(command.getName());
        if (void.class.equals(ms.getResultMaps().get(0).getType())) {
            throw new BindingException("method " + command.getName()
                    + " needs either a @ResultMap annotation, a @ResultType annotation,"
                    +
                    " or a resultType attribute in XML so a ResultHandler can be used as a parameter.");
        }
        if (methodSignature.hasRowBounds()) {
            RowBounds rowBounds = methodSignature.extractRowBounds(args);
            sqlSession.select(command.getName(), param, rowBounds,
                    methodSignature.extractResultHandler(args));
        } else {
            sqlSession.select(command.getName(), param, methodSignature.extractResultHandler(args));
        }
    }

    private <E> Object executeForMany(SqlSession sqlSession, Object[] args, Object param) {
        List<E> result;
        if (methodSignature.hasRowBounds()) {
            RowBounds rowBounds = methodSignature.extractRowBounds(args);
            result = sqlSession.<E>selectList(command.getName(), param, rowBounds);
        } else {
            result = sqlSession.<E>selectList(command.getName(), param);
        }
        // issue #510 Collections & arrays support
        if (!methodSignature.getReturnType().isAssignableFrom(result.getClass())) {
            if (methodSignature.getReturnType().isArray()) {
                return convertToArray(result);
            } else {
                return convertToDeclaredCollection(sqlSession.getConfiguration(), result);
            }
        }
        return result;
    }

    private <T> Cursor<T> executeForCursor(SqlSession sqlSession, Object[] args, Object param) {
        Cursor<T> result;
        if (methodSignature.hasRowBounds()) {
            RowBounds rowBounds = methodSignature.extractRowBounds(args);
            result = sqlSession.<T>selectCursor(command.getName(), param, rowBounds);
        } else {
            result = sqlSession.<T>selectCursor(command.getName(), param);
        }
        return result;
    }

    private <E> Object convertToDeclaredCollection(Configuration config, List<E> list) {
        Object collection = config.getObjectFactory().create(methodSignature.getReturnType());
        MetaObject metaObject = config.newMetaObject(collection);
        metaObject.addAll(list);
        return collection;
    }

    @SuppressWarnings("unchecked")
    private <E> Object convertToArray(List<E> list) {
        Class<?> arrayComponentType = methodSignature.getReturnType().getComponentType();
        Object array = Array.newInstance(arrayComponentType, list.size());
        if (arrayComponentType.isPrimitive()) {
            for (int i = 0; i < list.size(); i++) {
                Array.set(array, i, list.get(i));
            }
            return array;
        } else {
            return list.toArray((E[]) array);
        }
    }

    private <K, V> Map<K, V> executeForMap(SqlSession sqlSession, Object[] args, Object param) {
        Map<K, V> result;
        if (methodSignature.hasRowBounds()) {
            RowBounds rowBounds = methodSignature.extractRowBounds(args);
            result = sqlSession.<K, V>selectMap(command.getName(), param,
                    methodSignature.getMapKey(),
                    rowBounds);
        } else {
            result = sqlSession.<K, V>selectMap(command.getName(), param,
                    methodSignature.getMapKey());
        }
        return result;
    }
}

DbHandler 整体架构如下

@Service
public class DbHandler implements InvocationHandler, BeanPostProcessor {

    private SqlSession mSrcSqlSession;

    private SqlSession mDestSqlSession;

    private ConcurrentHashMap<Method, MapperHandler> mMethodCache = new ConcurrentHashMap<>();

    @Autowired
    public void setProperties(
            @Qualifier("srcSqlSessionFactory") SqlSessionFactory srcSqlSessionFactory,
            @Qualifier("destSqlSessionFactory") SqlSessionFactory destSqlSessionFactory) {
    }


    @Override
    public Object invoke(final Object proxy, final Method method, final Object[] args)
            throws Throwable {
        /** 
         * 这里处理sql处理
         * 正常情况下不会有delete 这里进行异常判断 根据业务场景进行处理
         */
        return null;
    }

    /**
     * 这里如果是单表迁移 可以考虑整体加分布式锁,或者把主键自增的任务交给一个中间表
     * 移交完成以后 再由中间表移交给新表
     * @param args
     * @param oldHandler
     * @param newHandler
     * @return
     */
    private Object doInsert(Object[] args, MapperHandler oldHandler, MapperHandler newHandler) {
       
    }

    private Object doUpdate(Object[] args, MapperHandler oldHandler, MapperHandler newHandler) {
      
    }

    private Object doSelect(Object[] args, MapperHandler oldHandler, MapperHandler newHandler) {
        if (readNew()) {
            return newHandler.execute(mDestSqlSession, args);
        }
        return oldHandler.execute(mSrcSqlSession, args);
    }

    /**
     * 开关是否读新表
     * @return
     */
    private boolean readNew() {
        /**
         * TODO
         */
        return false;
    }

    /**
     * 开关写旧表
     * @return
     */
    private boolean writeOld() {
        /**
         * TODO
         */
        return true;
    }

    /**
     * 开关写新表
     * @return
     */
    private boolean writeNew() {
        /**
         * TODO
         */
        return true;
    }

    /**
     * 开关先插入新表
     * @return
     */
    private boolean insertNewFirst() {
        /**
         * TODO
         */
        return false;
    }

    private boolean partitionDb() {
        return false;
    }

    private int getSequenceId() {
        return 10000;
    }

    private String getStatementId(Method method) {
        return method.getDeclaringClass().getName() + "." + method.getName();
    }

    private MappedStatement getMappedStatement(Method method, String statementId,
           
    }


    private MapperHandler cachedMapperMethod(Method method, Class<?> clazz,
            Configuration configuration) {
     }
}

很多细节我这里暂时就先略去了,大家可以自己思考下怎么写,毕竟这个比较偏向业务,我这里 就是直接获取 sqlSession, 然后我们可以借助 MapperSignature这个内部类来完成mybatis的接下来的工作
下面这里给出一个 invoke和insert方法的简要实现 , 细节大家看下注释我用的是java8

   @Override
    public Object invoke(final Object proxy, final Method method, final Object[] args)
            throws Throwable {
        /**
         * {@link Object#hashCode()} {@link #equals(Object)} 这些方法不做处理
         * interface default实现不做处理
         */
        if (Object.class.equals(method.getDeclaringClass()) || method.isDefault()) {
            return method.invoke(this, args);
        }
        Configuration configuration = mSrcSqlSession.getConfiguration();
        String statementId = getStatementId(method);
        MappedStatement mappedStatement = getMappedStatement(method, statementId, configuration);
        SqlCommandType sqlCommandType = mappedStatement.getSqlCommandType();
        MapperHandler oldMapperHandler = cachedMapperMethod(method, OldUserMapper.class,
                mSrcSqlSession.getConfiguration());
        MapperHandler newMapperHandler = cachedMapperMethod(method, NewUserMapper.class,
                mDestSqlSession.getConfiguration());

        if (SqlCommandType.INSERT.equals(sqlCommandType)) {
            return doInsert(args, oldMapperHandler, newMapperHandler);
        } else if (SqlCommandType.UPDATE.equals(sqlCommandType)) {
            return doUpdate(args, oldMapperHandler, newMapperHandler);
        } else if (SqlCommandType.SELECT.equals(sqlCommandType)) {
            return doSelect(args, oldMapperHandler, newMapperHandler);
        }
        /**
         * 正常情况下不会有delete 这里进行异常判断 根据业务场景进行处理
         */
        return null;
    }

    /**
     * 这里如果是单表迁移 可以考虑整体加分布式锁,或者把主键自增的任务交给一个中间表
     * 移交完成以后 再由中间表移交给新表
     * @param args
     * @param oldHandler
     * @param newHandler
     * @return
     */
    private Object doInsert(Object[] args, MapperHandler oldHandler, MapperHandler newHandler) {
        Object newRet, oldRet;
        boolean enableOld = writeOld();
        boolean enableNew = writeNew();
        if (enableOld && enableNew) {
            if (insertNewFirst()) {
                /**
                 * 如果是分库分表 我们从sequence里取出id
                 */
                if (partitionDb()) {
                    if (args[0] instanceof User) {
                        int id = getSequenceId();
                        ((User) args[0]).setUserId(id);
                    }
                }
                newRet = newHandler.execute(mDestSqlSession, args);
                oldRet = oldHandler.execute(mSrcSqlSession, args);
                /**
                 * 这里可以做比较
                 */
                return newRet;
            } else 「
                oldRet = oldHandler.execute(mSrcSqlSession, args);
                newRet = newHandler.execute(mDestSqlSession, args);
                /**
                 * 这里可以做比较
                 */
                return oldRet;
            }
        } else if (enableNew) {
            return newHandler.execute(mDestSqlSession, args);
        }
        return oldHandler.execute(mSrcSqlSession, args);
    }

mybatis xml插入的时候 判断下userId是否为null 是null就自增否则直接插入

<insert id="addUser" parameterType="com.qunhe.instdeco.partition.data.User" useGeneratedKeys="true">
        INSERT INTO user
        <trim prefix="(" suffix=")" suffixOverrides=",">
            <if test="user.userId != null">
                user_id,
            </if>
            <if test="user.name != null">
                username,
            </if>
            <if test="user.age != null">
                age,
            </if>
        </trim>
        VALUES
        <trim prefix="(" suffix=")" suffixOverrides=",">
            <if test="user.userId != null">
                #{user.userId},
            </if>
            <if test="user.name != null">
                #{user.name},
            </if>
            <if test="user.age != null">
                #{user.age},
            </if>
        </trim>
    </insert>

这样一来的话 原有的业务代码里面基本不需要我们自行修改代码,我这里其实还省略了很多的细节,酷家乐业务里面查询的时候如果是分库分表的话,对于分表键批量查询其实多的时候可以采用 ElasticSearch来查,这里就需要判断 分表键的参数的数量 需要 methodSignature去把 Object[] args转换为 ParamMap 其实就是一个hashMap大家自己去看下mybatis这部分源码就知道了.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352

推荐阅读更多精彩内容