A simple demo of a Python interface that connects to MySQL

While teaching myself Python, I wrote a simple REST interface with Flask. Here is the code:

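ConnectMysql.py, which connects to MySQL (the file name must match the `import ConnectMysql` statement used later, including case):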

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import pymysql, logging

# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

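# Wrapper class for MySQL connections
class MysqlOperate:
    # Connect to MySQL (host and credentials are hard-coded for this demo)
    @staticmethod
    def connect_local_db():
        return pymysql.connect(host='192.168.19.161', port=3306, user='root', password='123456', database='pas',
                               charset='utf8')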

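# Query helper: runs the SQL and returns at most 5 rows
def query_country_name(sql_str):
    logger.info(sql_str)
    con = MysqlOperate.connect_local_db()
    cur = con.cursor()
    try:
        cur.execute(sql_str)
        rows = cur.fetchmany(5)
        return rows
    except Exception:
        # Log the traceback, then let the caller decide how to respond
        logger.exception("query failed: %s", sql_str)
        raise
    finally:
        cur.close()
        con.close()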


if __name__ == '__main__':
    result = query_country_name('select * from demo limit 0,10;')
    for row in result:
        for item in row:
            print(item)
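
The query function above opens a connection and a cursor and closes both in `finally`. As a minimal sketch of the same pattern, the helper below uses `contextlib.closing` and passes values as query parameters instead of embedding them in the SQL string. `fetch_rows` and its `connect` argument are hypothetical names, not part of the original code; `connect` is assumed to be any zero-argument callable that returns a DB-API style connection, such as `MysqlOperate.connect_local_db`.

```python
from contextlib import closing


def fetch_rows(connect, sql, params=None, limit=5):
    """Run `sql` and return at most `limit` rows.

    `connect` is a hypothetical hook: any zero-argument callable that
    returns a DB-API style connection (for example
    MysqlOperate.connect_local_db from ConnectMysql.py).
    """
    # closing() calls .close() on exit, even if execute() raises,
    # which mirrors the try/finally in query_country_name.
    with closing(connect()) as con, closing(con.cursor()) as cur:
        # Values go in `params` so the driver escapes them; with pymysql
        # the placeholder inside `sql` is %s.
        cur.execute(sql, params)
        return cur.fetchmany(limit)
```

A call might look like `fetch_rows(MysqlOperate.connect_local_db, 'select * from demo where id = %s', (1,))`, assuming the `demo` table has an `id` column (the article does not show the table schema).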

Next, define the REST interface in RestServer.py:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import flask, json, logging
import ConnectMysql  # the ConnectMysql.py module shown above
from flask import request

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

server = flask.Flask(__name__)


@server.route('/v1/brd/task', methods=['post'])
def task():
    json_req = request.json
    logger.info("Received request body: %s", json_req)
    task_name = json_req.get('taskName')
    category = json_req.get('Category')
    description = json_req.get('description')
    sql_type = json_req.get('sqlType')
    status = json_req.get('status')
    sql_statement = json_req.get('sqlStatement')
    has_execs = json_req.get('hasExecs')
    in_user = json_req.get('inUser')
    result = {'taskName': task_name,
              'Category': category,
              'description': description,
              'sqlType': sql_type,
              'status': status,
              'sqlStatement': sql_statement,
              'hasExecs': has_execs,
              'inUser': in_user
              }
    logger.info('Parsed parameters: %s', result)
    successed = True
    message=''
    code = 200
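    try:
        data = ConnectMysql.query_country_name(sql_statement)
        logger.info(data)
    except Exception:
        # Log the real cause; the client only receives a generic message
        logger.exception('query failed')
        message = 'this is a exception message!'
        successed = False
        code = 500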

    res = {'successed': successed,
           'message': message,
           'code': code
           }

    return json.dumps(res, ensure_ascii=False)


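if __name__ == '__main__':
    # debug=True enables the Werkzeug debugger and reloader; use it for local development only
    server.run(port=7777, debug=True, host='0.0.0.0')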
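
The view function reads eight fields from the JSON body one at a time and then builds a `successed`/`message`/`code` envelope. As a rough sketch, those two steps can also be written as small pure functions that are easy to test without starting Flask. `TASK_FIELDS`, `extract_task_fields`, and `build_result` are hypothetical helper names, not part of the original code.

```python
import json

# Field names exactly as the /v1/brd/task handler reads them
# (note the capital C in 'Category').
TASK_FIELDS = ('taskName', 'Category', 'description', 'sqlType',
               'status', 'sqlStatement', 'hasExecs', 'inUser')


def extract_task_fields(json_req):
    """Return a dict with every expected field; missing ones become None."""
    json_req = json_req or {}  # tolerate an empty or missing body
    return {name: json_req.get(name) for name in TASK_FIELDS}


def build_result(error=None):
    """Serialize the response envelope in the shape the handler returns."""
    res = {'successed': error is None,
           'message': '' if error is None else str(error),
           'code': 200 if error is None else 500}
    return json.dumps(res, ensure_ascii=False)
```

Inside the handler, `extract_task_fields(request.json)` would replace the eight `json_req.get(...)` lines, and `build_result()` or `build_result(exc)` would replace the manual bookkeeping of `successed`, `message`, and `code`.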

Run RestServer.py. Output like the following indicates that the server started successfully:

2019-05-16 11:21:43,480 - werkzeug - WARNING -  * Debugger is active!
2019-05-16 11:21:43,518 - werkzeug - INFO -  * Debugger PIN: 801-886-480
2019-05-16 11:21:43,540 - werkzeug - INFO -  * Running on http://0.0.0.0:7777/ (Press CTRL+C to quit)
2019-05-16 11:21:51,259 - werkzeug - INFO -  * Detected change in 'E:\\PyWorkspace\\Demo\\src\\main\\demo\\RestServer.py', reloading
2019-05-16 11:21:51,295 - werkzeug - INFO -  * Restarting with stat
pydev debugger: process 10424 is connecting

Call the interface from a client at http://127.0.0.1:7777/v1/brd/task using the POST method.
The request parameters are as follows:

{
  "taskName":"demoTask",
  "Category":"110100",
  "description":"demo description",
  "sqlType":"sql",
  "status":"active",
  "sqlStatement":"select * from demo limit 1,10;",
  "hasExecs":0,
  "inUser":"2"
}
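
For readers who want to send this request from Python rather than from a GUI client, here is a minimal sketch using only the standard library. `build_task_request` and `send` are hypothetical helper names; the URL is the one given in the article. The `Content-Type: application/json` header matters because Flask only parses `request.json` when the request declares a JSON body.

```python
import json
import urllib.request

# URL from the article; the server must be running locally on port 7777.
TASK_URL = 'http://127.0.0.1:7777/v1/brd/task'


def build_task_request(payload, url=TASK_URL):
    """Build (but do not send) a POST request carrying `payload` as JSON."""
    body = json.dumps(payload).encode('utf-8')
    return urllib.request.Request(
        url,
        data=body,
        # Flask only parses request.json when the content type is JSON.
        headers={'Content-Type': 'application/json'},
        method='POST',
    )


def send(req, timeout=5):
    """Send the request and return the decoded response body."""
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.read().decode('utf-8')
```

With the server running, `send(build_task_request({...}))` with the parameters above returns the response body as a string.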

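The interface returns the following. In this run the query raised an exception, so the handler's failure branch produced the response: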

`{"successed": false, "message": "this is a exception message!", "code": 500}`
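
Because the view returns a plain string, Flask sends it with HTTP status 200 by default, and the `code` value exists only inside the JSON body. A client therefore has to inspect the body to detect a failure. A minimal sketch of that check follows; `parse_task_response` and `TaskError` are hypothetical names, not part of the original code.

```python
import json


class TaskError(Exception):
    """Raised when the response body reports a failure."""


def parse_task_response(body_text):
    """Decode the JSON body and raise TaskError if 'successed' is false."""
    res = json.loads(body_text)
    if not res.get('successed'):
        raise TaskError('code %s: %s' % (res.get('code'), res.get('message')))
    return res
```

Applied to the response shown above, this raises `TaskError` instead of silently treating the call as successful.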
