数据库中间件 MyCAT 源码分析 —— SQL ON MongoDB

1. 概述

2. 主流程

3. 查询操作

4. 插入操作

5. 彩蛋

1. 概述

可能你在看到这个标题会小小的吃惊,MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。

吼吼吼,让我们开启这段神奇的“旅途”。

本文主要分成四部分:

总体流程,让你有个整体的认识

查询操作

插入操作

彩蛋,😈彩蛋,🙂彩蛋

建议你看过这两篇文章(非必须):

《MyCAT 源码分析 —— 【单库单表】插入》

《MyCAT 源码分析 —— 【单库单表】查询》

2. 主流程

MyCAT Server接收MySQL Client基于MySQL协议的请求,翻译SQLMongoDB操作发送给MongoDB Server。

MyCAT Server接收MongoDB Server返回的MongoDB数据,翻译成MySQL数据结果返回给MySQL Client。

这样一看,MyCAT 连接 MongoDB 是不是少神奇一点列。

Java数据库连接,(Java Database Connectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据的方法。JDBC也是Sun Microsystems的商标。JDBC是面向关系型数据库的。

MyCAT 使用 JDBC 规范,抽象了对 MongoDB 的访问。通过这样的方式,MyCAT 也抽象了 SequoiaDB 的访问。可能这样说法有些抽象,看个类图压压惊。

是不是熟悉的味道。不得不说 JDBC 规范的精妙。

3. 查询操作

SELECTid,nameFROMuserWHEREname>''ORDERBY_idDESC;

看顺序图已经很方便的理解整体逻辑,我就不多废话啦。我们来看几个核心的代码逻辑。

1、查询 MongoDB

// MongoSQLParser.javapublicMongoDataquery()throwsMongoSQLException{if(!(statementinstanceofSQLSelectStatement)) {//return null;thrownewIllegalArgumentException("not a query sql statement");  }  MongoData mongo =newMongoData();  DBCursor c =null;  SQLSelectStatement selectStmt = (SQLSelectStatement) statement;  SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery();inticount =0;if(sqlSelectQueryinstanceofMySqlSelectQueryBlock) {      MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock) selectStmt.getSelect().getQuery();      BasicDBObject fields =newBasicDBObject();// 显示(返回)的字段for(SQLSelectItem item : mysqlSelectQuery.getSelectList()) {//System.out.println(item.toString());if(!(item.getExpr()instanceofSQLAllColumnExpr)) {if(item.getExpr()instanceofSQLAggregateExpr) {                  SQLAggregateExpr expr = (SQLAggregateExpr) item.getExpr();if(expr.getMethodName().equals("COUNT")) {// TODO 待读:count(*)icount =1;                      mongo.setField(getExprFieldName(expr), Types.BIGINT);                  }                  fields.put(getExprFieldName(expr),1);              }else{                  fields.put(getFieldName(item),1);              }          }      }// 表名SQLTableSource table = mysqlSelectQuery.getFrom();      DBCollection coll =this._db.getCollection(table.toString());      mongo.setTable(table.toString());// WHERESQLExpr expr = mysqlSelectQuery.getWhere();      DBObject query = parserWhere(expr);// GROUP BYSQLSelectGroupByClause groupby = mysqlSelectQuery.getGroupBy();      BasicDBObject gbkey =newBasicDBObject();if(groupby !=null) {for(SQLExpr gbexpr : groupby.getItems()) {if(gbexprinstanceofSQLIdentifierExpr) {                  String name = ((SQLIdentifierExpr) gbexpr).getName();                  gbkey.put(name, Integer.valueOf(1));              }          }          icount =2;      }// SKIP / LIMITintlimitoff =0;intlimitnum =0;if(mysqlSelectQuery.getLimit() !=null) {          limitoff = getSQLExprToInt(mysqlSelectQuery.getLimit().getOffset());          limitnum = getSQLExprToInt(mysqlSelectQuery.getLimit().getRowCount());      }if(icount ==1) {// COUNT(*)mongo.setCount(coll.count(query));      }elseif(icount ==2) {// MapReduceBasicDBObject initial =newBasicDBObject();          initial.put("num",0);          String reduce ="function (obj, prev) { "+"  prev.num++}";          mongo.setGrouyBy(coll.group(gbkey, query, initial, reduce));      }else{if((limitoff >0) || (limitnum >0)) {              c = coll.find(query, fields).skip(limitoff).limit(limitnum);          }else{              c = coll.find(query, fields);          }// order bySQLOrderBy orderby = mysqlSelectQuery.getOrderBy();if(orderby !=null) {              BasicDBObject order =newBasicDBObject();for(inti =0; i < orderby.getItems().size(); i++) {                  SQLSelectOrderByItem orderitem = orderby.getItems().get(i);                  order.put(orderitem.getExpr().toString(), getSQLExprToAsc(orderitem.getType()));              }              c.sort(order);// System.out.println(order);}      }      mongo.setCursor(c);  }returnmongo;}

2、查询条件

// MongoSQLParser.javaprivatevoidparserWhere(SQLExpr aexpr, BasicDBObject o){if(aexprinstanceofSQLBinaryOpExpr) {      SQLBinaryOpExpr expr = (SQLBinaryOpExpr) aexpr;      SQLExpr exprL = expr.getLeft();if(!(exprLinstanceofSQLBinaryOpExpr)) {if(expr.getOperator().getName().equals("=")) {              o.put(exprL.toString(), getExpValue(expr.getRight()));          }else{              String op ="";if(expr.getOperator().getName().equals("<")) {                  op ="$lt";              }elseif(expr.getOperator().getName().equals("<=")) {                  op ="$lte";              }elseif(expr.getOperator().getName().equals(">")) {                  op ="$gt";              }elseif(expr.getOperator().getName().equals(">=")) {                  op ="$gte";              }elseif(expr.getOperator().getName().equals("!=")) {                  op ="$ne";              }elseif(expr.getOperator().getName().equals("<>")) {                  op ="$ne";              }              parserDBObject(o, exprL.toString(), op, getExpValue(expr.getRight()));          }      }else{if(expr.getOperator().getName().equals("AND")) {              parserWhere(exprL, o);              parserWhere(expr.getRight(), o);          }elseif(expr.getOperator().getName().equals("OR")) {              orWhere(exprL, expr.getRight(), o);          }else{thrownewRuntimeException("Can't identify the operation of  of where");          }      }  }}privatevoidorWhere(SQLExpr exprL, SQLExpr exprR, BasicDBObject ob){  BasicDBObject xo =newBasicDBObject();  BasicDBObject yo =newBasicDBObject();  parserWhere(exprL, xo);  parserWhere(exprR, yo);  ob.put("$or",newObject[]{xo, yo});}

3、解析 MongoDB 数据

// MongoResultSet.javapublicMongoResultSet(MongoData mongo, String schema)throwsSQLException{this._cursor = mongo.getCursor();this._schema = schema;this._table = mongo.getTable();this.isSum = mongo.getCount() >0;this._sum = mongo.getCount();this.isGroupBy = mongo.getType();if(this.isGroupBy) {      dblist = mongo.getGrouyBys();this.isSum =true;  }if(this._cursor !=null) {      select = _cursor.getKeysWanted().keySet().toArray(newString[0]);// 解析 fieldsif(this._cursor.hasNext()) {          _cur = _cursor.next();if(_cur !=null) {if(select.length ==0) {                  SetFields(_cur.keySet());              }              _row =1;          }      }// 设置 fields 类型if(select.length ==0) {          select =newString[]{"_id"};          SetFieldType(true);      }else{          SetFieldType(false);      }  }else{      SetFields(mongo.getFields().keySet());//new String[]{"COUNT(*)"};SetFieldType(mongo.getFields());  }}

当使用SELECT *查询字段时,fields 使用第一条数据返回的 fields。即使,后面的数据有其他 fields,也不返回。

4、返回数据给 MySQL Client

// JDBCConnection.javaprivatevoidouputResultSet(ServerConnection sc, String sql)throwsSQLException{  ResultSet rs =null;  Statement stmt =null;try{      stmt = con.createStatement();      rs = stmt.executeQuery(sql);// headerList fieldPks =newLinkedList<>();      ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs,this.isSpark);intcolunmCount = fieldPks.size();      ByteBuffer byteBuf = sc.allocate();      ResultSetHeaderPacket headerPkg =newResultSetHeaderPacket();      headerPkg.fieldCount = fieldPks.size();      headerPkg.packetId = ++packetId;      byteBuf = headerPkg.write(byteBuf, sc,true);      byteBuf.flip();byte[] header =newbyte[byteBuf.limit()];      byteBuf.get(header);      byteBuf.clear();      List fields =newArrayList(fieldPks.size());for(FieldPacket curField : fieldPks) {          curField.packetId = ++packetId;          byteBuf = curField.write(byteBuf, sc,false);          byteBuf.flip();byte[] field =newbyte[byteBuf.limit()];          byteBuf.get(field);          byteBuf.clear();          fields.add(field);      }// header eofEOFPacket eofPckg =newEOFPacket();      eofPckg.packetId = ++packetId;      byteBuf = eofPckg.write(byteBuf, sc,false);      byteBuf.flip();byte[] eof =newbyte[byteBuf.limit()];      byteBuf.get(eof);      byteBuf.clear();this.respHandler.fieldEofResponse(header, fields, eof,this);// rowwhile(rs.next()) {          RowDataPacket curRow =newRowDataPacket(colunmCount);for(inti =0; i < colunmCount; i++) {intj = i +1;if(MysqlDefs.isBianry((byte) fieldPks.get(i).type)) {                  curRow.add(rs.getBytes(j));              }elseif(fieldPks.get(i).type == MysqlDefs.FIELD_TYPE_DECIMAL ||                      fieldPks.get(i).type == (MysqlDefs.FIELD_TYPE_NEW_DECIMAL -256)) {// field type is unsigned byte// ensure that do not use scientific notation formatBigDecimal val = rs.getBigDecimal(j);                  curRow.add(StringUtil.encode(val !=null? val.toPlainString() :null, sc.getCharset()));              }else{                  curRow.add(StringUtil.encode(rs.getString(j), sc.getCharset()));              }          }          curRow.packetId = ++packetId;          byteBuf = curRow.write(byteBuf, sc,false);          byteBuf.flip();byte[] row =newbyte[byteBuf.limit()];          byteBuf.get(row);          byteBuf.clear();this.respHandler.rowResponse(row,this);      }      fieldPks.clear();// row eofeofPckg =newEOFPacket();      eofPckg.packetId = ++packetId;      byteBuf = eofPckg.write(byteBuf, sc,false);      byteBuf.flip();      eof =newbyte[byteBuf.limit()];      byteBuf.get(eof);      sc.recycle(byteBuf);this.respHandler.rowEofResponse(eof,this);  }finally{if(rs !=null) {try{              rs.close();          }catch(SQLException e) {          }      }if(stmt !=null) {try{              stmt.close();          }catch(SQLException e) {          }      }  }}// MongoResultSet.java@OverridepublicStringgetString(String columnLabel)throwsSQLException{  Object x = getObject(columnLabel);if(x ==null) {returnnull;  }returnx.toString();}

当返回字段值是 Object 时,返回该对象.toString()。例如:

mysql>select*fromuserorderby_idasc;+--------------------------+------+-------------------------------+| _id                      | name | profile                      |+--------------------------+------+-------------------------------+|1|123| {"age":1,"height":100} |

4. 插入操作

// MongoSQLParser.javapublicintexecuteUpdate()throwsMongoSQLException{if(statementinstanceofSQLInsertStatement) {returnInsertData((SQLInsertStatement) statement);  }if(statementinstanceofSQLUpdateStatement) {returnUpData((SQLUpdateStatement) statement);  }if(statementinstanceofSQLDropTableStatement) {returndropTable((SQLDropTableStatement) statement);  }if(statementinstanceofSQLDeleteStatement) {returnDeleteDate((SQLDeleteStatement) statement);  }if(statementinstanceofSQLCreateTableStatement) {return1;  }return1;}privateintInsertData(SQLInsertStatement state){if(state.getValues().getValues().size() ==0) {thrownewRuntimeException("number of  columns error");  }if(state.getValues().getValues().size() != state.getColumns().size()) {thrownewRuntimeException("number of values and columns have to match");  }  SQLTableSource table = state.getTableSource();  BasicDBObject o =newBasicDBObject();inti =0;for(SQLExpr col : state.getColumns()) {      o.put(getFieldName2(col), getExpValue(state.getValues().getValues().get(i)));      i++;  }  DBCollection coll =this._db.getCollection(table.toString());  coll.insert(o);return1;}

5. 彩蛋

1、支持多 MongoDB ,并使用 MyCAT 进行分片。

MyCAT 配置:multi_mongodb

2、支持 MongoDB + MySQL 作为同一个 MyCAT Table 的数据节点。查询时,可以合并数据结果。

查询时,返回 MySQL 数据记录字段要比 MongoDB 数据记录字段全,否则,合并结果时会报错。

MyCAT 配置:single_mongodb_mysql

3、MongoDB 作为数据节点时,可以使用 MyCAT 提供的数据库主键字段功能。

MyCAT 配置:single_mongodb

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,904评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,581评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,527评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,463评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,546评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,572评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,582评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,330评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,776评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,087评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,257评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,923评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,571评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,192评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,436评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,145评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容