1. 概述
可能你在看到这个标题会小小的吃惊,MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。
吼吼吼,让我们开启这段神奇的“旅途”。
本文主要分成四部分:
总体流程,让你有个整体的认识
查询操作
插入操作
彩蛋,😈彩蛋,🙂彩蛋
建议你看过这两篇文章(非必须):
2. 主流程
MyCAT Server接收MySQL Client基于MySQL协议的请求,翻译SQL成MongoDB操作发送给MongoDB Server。
MyCAT Server接收MongoDB Server返回的MongoDB数据,翻译成MySQL数据结果返回给MySQL Client。
这样一看,MyCAT 连接 MongoDB 是不是就没那么神奇了咧。
Java数据库连接,(Java Database Connectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据的方法。JDBC也是Sun Microsystems的商标。JDBC是面向关系型数据库的。
MyCAT 使用 JDBC 规范,抽象了对 MongoDB 的访问。通过这样的方式,MyCAT 也抽象了 SequoiaDB 的访问。可能这样说法有些抽象,看个类图压压惊。
是不是熟悉的味道。不得不说 JDBC 规范的精妙。
3. 查询操作
SELECT id, name FROM user WHERE name > '' ORDER BY _id DESC;
看顺序图已经能很方便地理解整体逻辑,我就不多废话啦。我们来看几个核心的代码逻辑。
1、查询 MongoDB
// MongoSQLParser.javapublicMongoDataquery()throwsMongoSQLException{if(!(statementinstanceofSQLSelectStatement)) {//return null;thrownewIllegalArgumentException("not a query sql statement"); } MongoData mongo =newMongoData(); DBCursor c =null; SQLSelectStatement selectStmt = (SQLSelectStatement) statement; SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery();inticount =0;if(sqlSelectQueryinstanceofMySqlSelectQueryBlock) { MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock) selectStmt.getSelect().getQuery(); BasicDBObject fields =newBasicDBObject();// 显示(返回)的字段for(SQLSelectItem item : mysqlSelectQuery.getSelectList()) {//System.out.println(item.toString());if(!(item.getExpr()instanceofSQLAllColumnExpr)) {if(item.getExpr()instanceofSQLAggregateExpr) { SQLAggregateExpr expr = (SQLAggregateExpr) item.getExpr();if(expr.getMethodName().equals("COUNT")) {// TODO 待读:count(*)icount =1; mongo.setField(getExprFieldName(expr), Types.BIGINT); } fields.put(getExprFieldName(expr),1); }else{ fields.put(getFieldName(item),1); } } }// 表名SQLTableSource table = mysqlSelectQuery.getFrom(); DBCollection coll =this._db.getCollection(table.toString()); mongo.setTable(table.toString());// WHERESQLExpr expr = mysqlSelectQuery.getWhere(); DBObject query = parserWhere(expr);// GROUP BYSQLSelectGroupByClause groupby = mysqlSelectQuery.getGroupBy(); BasicDBObject gbkey =newBasicDBObject();if(groupby !=null) {for(SQLExpr gbexpr : groupby.getItems()) {if(gbexprinstanceofSQLIdentifierExpr) { String name = ((SQLIdentifierExpr) gbexpr).getName(); gbkey.put(name, Integer.valueOf(1)); } } icount =2; }// SKIP / LIMITintlimitoff =0;intlimitnum =0;if(mysqlSelectQuery.getLimit() !=null) { limitoff = getSQLExprToInt(mysqlSelectQuery.getLimit().getOffset()); limitnum = getSQLExprToInt(mysqlSelectQuery.getLimit().getRowCount()); }if(icount ==1) {// COUNT(*)mongo.setCount(coll.count(query)); }elseif(icount ==2) {// MapReduceBasicDBObject initial =newBasicDBObject(); 
initial.put("num",0); String reduce ="function (obj, prev) { "+" prev.num++}"; mongo.setGrouyBy(coll.group(gbkey, query, initial, reduce)); }else{if((limitoff >0) || (limitnum >0)) { c = coll.find(query, fields).skip(limitoff).limit(limitnum); }else{ c = coll.find(query, fields); }// order bySQLOrderBy orderby = mysqlSelectQuery.getOrderBy();if(orderby !=null) { BasicDBObject order =newBasicDBObject();for(inti =0; i < orderby.getItems().size(); i++) { SQLSelectOrderByItem orderitem = orderby.getItems().get(i); order.put(orderitem.getExpr().toString(), getSQLExprToAsc(orderitem.getType())); } c.sort(order);// System.out.println(order);} } mongo.setCursor(c); }returnmongo;}
2、查询条件
// MongoSQLParser.javaprivatevoidparserWhere(SQLExpr aexpr, BasicDBObject o){if(aexprinstanceofSQLBinaryOpExpr) { SQLBinaryOpExpr expr = (SQLBinaryOpExpr) aexpr; SQLExpr exprL = expr.getLeft();if(!(exprLinstanceofSQLBinaryOpExpr)) {if(expr.getOperator().getName().equals("=")) { o.put(exprL.toString(), getExpValue(expr.getRight())); }else{ String op ="";if(expr.getOperator().getName().equals("<")) { op ="$lt"; }elseif(expr.getOperator().getName().equals("<=")) { op ="$lte"; }elseif(expr.getOperator().getName().equals(">")) { op ="$gt"; }elseif(expr.getOperator().getName().equals(">=")) { op ="$gte"; }elseif(expr.getOperator().getName().equals("!=")) { op ="$ne"; }elseif(expr.getOperator().getName().equals("<>")) { op ="$ne"; } parserDBObject(o, exprL.toString(), op, getExpValue(expr.getRight())); } }else{if(expr.getOperator().getName().equals("AND")) { parserWhere(exprL, o); parserWhere(expr.getRight(), o); }elseif(expr.getOperator().getName().equals("OR")) { orWhere(exprL, expr.getRight(), o); }else{thrownewRuntimeException("Can't identify the operation of of where"); } } }}privatevoidorWhere(SQLExpr exprL, SQLExpr exprR, BasicDBObject ob){ BasicDBObject xo =newBasicDBObject(); BasicDBObject yo =newBasicDBObject(); parserWhere(exprL, xo); parserWhere(exprR, yo); ob.put("$or",newObject[]{xo, yo});}
3、解析 MongoDB 数据
// MongoResultSet.javapublicMongoResultSet(MongoData mongo, String schema)throwsSQLException{this._cursor = mongo.getCursor();this._schema = schema;this._table = mongo.getTable();this.isSum = mongo.getCount() >0;this._sum = mongo.getCount();this.isGroupBy = mongo.getType();if(this.isGroupBy) { dblist = mongo.getGrouyBys();this.isSum =true; }if(this._cursor !=null) { select = _cursor.getKeysWanted().keySet().toArray(newString[0]);// 解析 fieldsif(this._cursor.hasNext()) { _cur = _cursor.next();if(_cur !=null) {if(select.length ==0) { SetFields(_cur.keySet()); } _row =1; } }// 设置 fields 类型if(select.length ==0) { select =newString[]{"_id"}; SetFieldType(true); }else{ SetFieldType(false); } }else{ SetFields(mongo.getFields().keySet());//new String[]{"COUNT(*)"};SetFieldType(mongo.getFields()); }}
当使用 SELECT * 查询字段时,fields 使用第一条数据返回的 fields。即使后面的数据有其他 fields,也不会返回。
4、返回数据给 MySQL Client
// JDBCConnection.javaprivatevoidouputResultSet(ServerConnection sc, String sql)throwsSQLException{ ResultSet rs =null; Statement stmt =null;try{ stmt = con.createStatement(); rs = stmt.executeQuery(sql);// headerList fieldPks =newLinkedList<>(); ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs,this.isSpark);intcolunmCount = fieldPks.size(); ByteBuffer byteBuf = sc.allocate(); ResultSetHeaderPacket headerPkg =newResultSetHeaderPacket(); headerPkg.fieldCount = fieldPks.size(); headerPkg.packetId = ++packetId; byteBuf = headerPkg.write(byteBuf, sc,true); byteBuf.flip();byte[] header =newbyte[byteBuf.limit()]; byteBuf.get(header); byteBuf.clear(); List fields =newArrayList(fieldPks.size());for(FieldPacket curField : fieldPks) { curField.packetId = ++packetId; byteBuf = curField.write(byteBuf, sc,false); byteBuf.flip();byte[] field =newbyte[byteBuf.limit()]; byteBuf.get(field); byteBuf.clear(); fields.add(field); }// header eofEOFPacket eofPckg =newEOFPacket(); eofPckg.packetId = ++packetId; byteBuf = eofPckg.write(byteBuf, sc,false); byteBuf.flip();byte[] eof =newbyte[byteBuf.limit()]; byteBuf.get(eof); byteBuf.clear();this.respHandler.fieldEofResponse(header, fields, eof,this);// rowwhile(rs.next()) { RowDataPacket curRow =newRowDataPacket(colunmCount);for(inti =0; i < colunmCount; i++) {intj = i +1;if(MysqlDefs.isBianry((byte) fieldPks.get(i).type)) { curRow.add(rs.getBytes(j)); }elseif(fieldPks.get(i).type == MysqlDefs.FIELD_TYPE_DECIMAL || fieldPks.get(i).type == (MysqlDefs.FIELD_TYPE_NEW_DECIMAL -256)) {// field type is unsigned byte// ensure that do not use scientific notation formatBigDecimal val = rs.getBigDecimal(j); curRow.add(StringUtil.encode(val !=null? 
val.toPlainString() :null, sc.getCharset())); }else{ curRow.add(StringUtil.encode(rs.getString(j), sc.getCharset())); } } curRow.packetId = ++packetId; byteBuf = curRow.write(byteBuf, sc,false); byteBuf.flip();byte[] row =newbyte[byteBuf.limit()]; byteBuf.get(row); byteBuf.clear();this.respHandler.rowResponse(row,this); } fieldPks.clear();// row eofeofPckg =newEOFPacket(); eofPckg.packetId = ++packetId; byteBuf = eofPckg.write(byteBuf, sc,false); byteBuf.flip(); eof =newbyte[byteBuf.limit()]; byteBuf.get(eof); sc.recycle(byteBuf);this.respHandler.rowEofResponse(eof,this); }finally{if(rs !=null) {try{ rs.close(); }catch(SQLException e) { } }if(stmt !=null) {try{ stmt.close(); }catch(SQLException e) { } } }}// MongoResultSet.java@OverridepublicStringgetString(String columnLabel)throwsSQLException{ Object x = getObject(columnLabel);if(x ==null) {returnnull; }returnx.toString();}
当返回字段值是 Object 时,返回该对象.toString()。例如:
mysql> select * from user order by _id asc;
+--------------------------+------+-------------------------------+
| _id                      | name | profile                       |
+--------------------------+------+-------------------------------+
| 1                        | 123  | {"age":1,"height":100}        |
4. 插入操作
// MongoSQLParser.javapublicintexecuteUpdate()throwsMongoSQLException{if(statementinstanceofSQLInsertStatement) {returnInsertData((SQLInsertStatement) statement); }if(statementinstanceofSQLUpdateStatement) {returnUpData((SQLUpdateStatement) statement); }if(statementinstanceofSQLDropTableStatement) {returndropTable((SQLDropTableStatement) statement); }if(statementinstanceofSQLDeleteStatement) {returnDeleteDate((SQLDeleteStatement) statement); }if(statementinstanceofSQLCreateTableStatement) {return1; }return1;}privateintInsertData(SQLInsertStatement state){if(state.getValues().getValues().size() ==0) {thrownewRuntimeException("number of columns error"); }if(state.getValues().getValues().size() != state.getColumns().size()) {thrownewRuntimeException("number of values and columns have to match"); } SQLTableSource table = state.getTableSource(); BasicDBObject o =newBasicDBObject();inti =0;for(SQLExpr col : state.getColumns()) { o.put(getFieldName2(col), getExpValue(state.getValues().getValues().get(i))); i++; } DBCollection coll =this._db.getCollection(table.toString()); coll.insert(o);return1;}
5. 彩蛋
1、支持多 MongoDB ,并使用 MyCAT 进行分片。
MyCAT 配置:multi_mongodb
2、支持 MongoDB + MySQL 作为同一个 MyCAT Table 的数据节点。查询时,可以合并数据结果。
查询时,返回 MySQL 数据记录字段要比 MongoDB 数据记录字段全,否则,合并结果时会报错。
MyCAT 配置:single_mongodb_mysql
3、MongoDB 作为数据节点时,可以使用 MyCAT 提供的数据库主键字段功能。
MyCAT 配置:single_mongodb