前言:
最近工作中需要和第三方进行数据对接,对方的服务端是webservice接口,因此需要使用python模拟webservice客户端,这里简要记录一下模拟过程!
相关说明:
具体开发流程是先在本地开发自测完成,然后在线上调试。所以服务端也需要我们自己模拟!
- 客户端编写:使用python的requests模块;
- 服务端模拟:使用SoapUI工具
SoapUI模拟服务端:
1.加载wsdl文件
2.创建服务端
3.开启服务器
4.服务器开启动标志
使用requests模块编写客户端:
1.说明:客户端编写整体思路如下
- 构造请求头headers;
- 构造请求体数据:请求体数数据是发送给服务器的,这里是根据wsdl文件中的请求体数据而来的;
- 解析响应体,获取服务器端的响应数据;
2.代码示例(以请求登录接口为例):
# coding:utf-8
import xml.dom.minidom as XD
from xml.sax.saxutils import escape, unescape
import requests
import traceback
# 发送给webservice的请求体数据
send_webservice_data = '''<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://service.inf.oa.ztesoft.com">
<soapenv:Header/>
<soapenv:Body>
<ser:Login>
<ser:UserName>username</ser:UserName>
<ser:Password>password</ser:Password>
</ser:Login>
</soapenv:Body>
</soapenv:Envelope>'''
class WebService(object):
# 响应节点名称(通过该名称拿到响应数据)
rtn_node_name = "LoginReturn"
def __init__(self):
# 请求url
self.url = "http://127.0.0.1:8088/mockOaOrderServicePortSoapBinding"
# 构造响应头
self.headers = {
"User-Agent": "Python Post",
"Content-type": "text/xml; charset=UTF-8",
"Content-length": "%d" % len(send_webservice_data),
"SOAPAction": ""
}
def send_request(self, send_data):
"""发送webservice请求,获取响应信息"""
response = requests.post(self.url, headers=self.headers, data=send_data, verify=False)
xml_data = response.content
return xml_data
def parse_data(self, xml_data):
"""
1.解析响应数据:响应数据是xml格式的,这里需要解析xml获取响应数据
2.响应数据样例:
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://service.inf.oa.ztesoft.com">
<soapenv:Header/>
<soapenv:Body>
<ser:LoginResponse>
<LoginReturn>{"success":true, "msg":"登录成功"}</LoginReturn>
</ser:LoginResponse>
</soapenv:Body>
</soapenv:Envelope>
"""
dom = XD.parseString(xml_data)
# 通过节点名称,获取响应数据
rtn_data = dom.getElementsByTagName(self.__class__.rtn_node_name)[0].childNodes[0].nodeValue
return rtn_data
def main(self):
try:
# 获取响应的xml数据
xml_data = self.send_request(send_webservice_data)
# 解析响应的xml数据:获取响应结果
rtn_data = self.parse_data(xml_data)
print rtn_data
except Exception as e:
logger.error(e.message)
logger.error(traceback.format_exc())
if __name__ == "__main__":
WebService().main()