起因
项目中使用python 接入支付宝支付,一番寻找发现有python sdk,遂直接使用。
然而,需要使用到转账api,转账需要把支付宝设置为证书模式,但python sdk 没有支持证书模式。
四处搜寻支付宝证书模式相关资料,感觉资料很少?对于我CV工程师很不友好,连理解起来都困难。
于是乎一边参照文档,一边参照别人blog,一边debug对照java sdk和python sdk,总算有点成绩,整出来跟大家分享。
讲解
配置证书模式
因为我的业务是要用到转账,在 支付宝资金产品 -> 商家转账 -> 接入准备 中的描述,若产品涉及 资金支出的接口 必须 设置证书加签方式。
在接口加签方式中有更详细的说明:
我们使用了资金支出类场景,所以我们必须使用证书模式。
那么配置证书后,我们会得到几个证书文件,上面名词解释中也有,我们再梳理一下:
- 如果使用的是秘钥模式,会用到
- 应用私钥:用来给应用消息进行签名。
- 支付宝公钥:应用收到支付宝发送的同步、异步消息时,使用支付宝公钥验证签名信息。
- 如果使用的是证书模式,会用到
- 应用私钥:用来给应用消息进行签名。
- 应用公钥证书:在开放平台上传 CSR 文件后可以获取 CA 机构颁发的应用证书文件(.crt),其中包含了组织/公司名称、应用公钥、证书有效期等内容,一般有效期为 5 年。
- 支付宝公钥证书:用来验证支付宝消息,包含了支付宝公钥、支付宝公司名称、证书有效期等内容,一般有效期为 5 年。
- 支付宝根证书:用来验证支付宝消息,包含了根 CA 名称、根 CA 的公钥、证书有效期等内容。
加签 用到的都是应用私钥,但是 证书模式 的加签需要拼接 应用公钥证书 和 支付宝根证书 的SN值。
秘钥模式的验签 用到的是支付宝公钥,证书模式的验签 用的是 支付宝公钥证书 的公钥。
沙箱环境
证书模式参考文档配置即可,开发环境可以参照沙箱环境文档,使用沙箱环境默认分配的商家或者买家账号登录沙箱支付宝APP,请前往 沙箱账号 获取。
python sdk
参照官方文档python sdk查看相关介绍,github alipay-sdk-python-all 有更详细的信息。
转折
如何使用证书模式进行加签和验签,参照 如何使用证书,好家伙,python 直接跳过。。。经过确认,python sdk 未支持证书模式。
开干
好嘛,先总结一下,我们调用支付宝的过程是怎样的:
- 发起请求
- 解析响应
Over。
哈哈哈哈开个玩笑。
认真点。
发起请求的过程(加签)
发起请求需要实现签名,自行签名的过程如下:
第一步:筛选并排序
获取所有请求参数(包括公共请求参数),不包括字节类型参数,如文件、字节流,剔除 sign 字段,剔除值为空的参数,并按照第一个字符的键值 ASCII 码递增排序(字母升序排序),如果遇到相同字符则按照第二个字符的键值 ASCII 码递增排序,以此类推。
第二步:拼接
将排序后的参数与其对应值,组合成 参数=参数值 的格式,并且把这些参数用 & 字符连接起来,此时生成的字符串为待签名字符串。
第三步:调用签名函数
使用各自语言对应的 SHA256WithRSA(对应 sign_type 为 RSA2)或 SHA1WithRSA(对应 sign_type 为 RSA)签名函数利用商家私钥对签名字符串进行签名,并进行 Base64 编码。把生成的签名 encode 后赋值给 sign 参数,拼接到请求参数中。
第四步:拼接完整请求
拼接完整请求有几个小步骤
- 拼接原始数据
完成 第三步:调用签名函数 后需将生成的签名作为 sign 的 value 拼接到请求数据中。 - Encode 请求数据
将所有一级 key 的 value 值进行 encode 操作。
说明:将接口业务参数 bizContent 的 value 值作为一个整体进行 encode 操作。 - 拼接请求
将编码后的请求数据发送至支付宝网关地址。
说明:支付宝网关地址固定为 https://openapi.alipay.com/gateway.do。
沙箱环境地址可以从 沙箱账号 中获取。
重点在拼接参数的过程,文档中有说明:
差别在哪?就是拼接参数的过程中,如果使用了证书模式,需要在待签名的字符串前面增加
alipay_root_cert_sn=6bc29aa3b4d406c43483ffea81e08d22&app_cert_sn=50fa7bc5dc305a4fbdbe166689ddc827&
,就这么点区别。
cert_sn
好,重点来了,cert_sn 要拿出来单独说。
证书sn这个东西,它不会变化,也就是说解出来了,直接拿着用就行了,文档也说了参照AlipaySignature.getCertSN 实现 app_cert_sn 的提取,这个在java sdk 中可以直接使用。如果图方便,直接用java 把证书的 sn 值解出来拿着用即可,就不用考虑python解析了。
还有 AntCertificationUtil.getRootCertSN 的 alipay_root_cert_sn方法,是用来解析根证书的,根证书有什么不一样?根证书文件打开后实际是4个证书,长这个样子:
看看java 中怎么解析根证书的:
它是解析
signAlgOID
以 1.2.840.113549.1.1
开头的证书的SN值,并把SN值用_
拼起来。
参照了两篇文章python接入支付宝现金红包(公钥证书模式) 和 python获取支付宝公钥证书SN, 对于解析 sn 值有一定帮助。
好,基于这些,我们来解析证书sn和根证书sn:
import hashlib
import OpenSSL
def get_root_cert_sn(root_cert_path):
root_cert_sn = None
with open(root_cert_path, 'r') as file:
certificates = file.read()
# 分割证书内容
cert_list = certificates.strip().split("-----END CERTIFICATE-----")
cert_list = [cert.strip() + "\n-----END CERTIFICATE-----" for cert in cert_list if cert.strip()]
for i, cert_str in enumerate(cert_list):
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_str)
a = cert.to_cryptography()
# 提取证书中以1.2.840.113549.1.1开头的oid进行处理
if a.signature_algorithm_oid.dotted_string.startswith("1.2.840.113549.1.1"):
cert_issue = cert.get_issuer()
sn_string = 'CN=' + cert_issue.CN + ',' + 'OU=' + cert_issue.OU + ',' + 'O=' + cert_issue.O + ',' + 'C=' + cert_issue.C + str(cert.get_serial_number())
cert_sn = hashlib.md5(sn_string.encode('utf-8')).hexdigest()
if not root_cert_sn:
root_cert_sn = cert_sn
else:
root_cert_sn = root_cert_sn + "_" + cert_sn
return root_cert_sn
def get_cert_sn(cert_path):
with open(cert_path, 'r') as file:
cert_str = file.read()
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_str)
cert_issue = cert.get_issuer()
sn_string = 'CN=' + cert_issue.CN + ',' + 'OU=' + cert_issue.OU + ',' + 'O=' + cert_issue.O + ',' + 'C=' + cert_issue.C + str(cert.get_serial_number())
cert_sn = hashlib.md5(sn_string.encode('utf-8')).hexdigest()
return cert_sn
以上,解析出来的证书sn值大概长这个样子:
cert_sn=d41c6a23f088d7db370bca7340db24c0
root_cert_sn=687b59193f3f462dd5336e5abf83c5d8_02941eef3187dddf3d3b83462e1dfcf6
除了发起请求的拼接参数过程,其他步骤秘钥模式与公钥证书模式完全一样,所以在拼接参数的时候把这两个sn值拼到前面即可。
解析响应(验签)
举个栗子🌰,响应内容如下:
其中 xxx_response 里面就是最终的响应信息,sign 是用来验签的。可以看出来响应的内容其实不验签也能用,直接取其中的 xxx_response 就好了(可别被我带偏了)。
如果使用了证书模式相比于秘钥模式会多返回一个 alipay_cert_sn
,这个 alipay_cert_sn 是支付宝公钥证书解析出来的sn值(解析方式同上一节),意思是要使用证书sn值与它相同的证书的公钥去进行验签。
如果设置了接口内容加密方式,需要先解密才能得出这样的内容。如果设置加密,只能选择AES加密方式,更多信息请参见支付宝文档 AES加密说明。
这里,公钥证书中的内容不能直接作为公钥public_key去进行验签,是要通过 支付宝公钥证书 提取到 公钥,用公钥进行验签。
再去翻一眼java 里面是怎么做的:
第一步,是判断支付宝公钥内容,如果无内容,从证书文件地址中读取并转为证书,如果有内容,把内容转为证书。
第二步,从证书中获取公钥,但是这个publickey是不能直接使用的。
第三步,通过base64编码得到公钥,就是最后用于验签的公钥字符串。
那么现在要用 python 把这个过程复刻一遍,就能用支付宝公钥文件验签啦,哐哐一顿造:
import OpenSSL
import base64
def get_cert_pubkey(cert_path):
with open(cert_path, 'r') as file:
cert_str = file.read()
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_str)
pubkey = cert.get_pubkey()
# 将公钥转换为字节
pubkey_bytes = OpenSSL.crypto.dump_publickey(OpenSSL.crypto.FILETYPE_ASN1, pubkey)
# 进行 Base64 编码
return base64.b64encode(pubkey_bytes).decode('utf-8')
支付宝公钥文件:
解析后得到一个字符串:
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAmTgvGxDPMy9P6rnX0jN0bVopzjPlKzMbAJBL/4Ynzylvz3Z8OECadVHQ/HlUtOCVzk3Fqg6j9pL6ZvuXZkmcyfHhr0uB/wpegZgsOH+/loVL5CsrHvk7ZLojUSYuv4Od13NNzSleYk6kBZDLVVOOcyfUlEJtR8ft/nGCYpqWzfWdAsS7AvAiUBTweJ/LN0ENu0BNqkWf32+jFRmkIe+cUZ+yDTq/78IK1Z1okfahuQxCzd1YVBV7XdvaAgADt/m82f97c+UVthx2v8r8mfbo5yXdv1vYZpdVXRL9fLApKMKybGRNOiGadFyaa4V4XcS+MniSdfRpd4M59Sa7StLcWwIDAQAB
然后用这个字符串作为 public_key 去验证响应中 xxx_response 的内容即可。可参考如下:
截图来自于 python sdk 中的 SingatureUtils.py,使用 verify_with_rsa 方法可以用于验签,参数
- public_key:就是这个解析证书得到的字符串
- message:是响应内容中 xxx_response 的部分
- sign:是响应内容中的 sign
汇总
以上,支付宝支付过程中的难点应该已经全部打通,没写的支付宝文档里面基本也都说明白了,多看看就好。
至此,嗯,还不能结束。
这 python sdk 里面已经封装了这么多东西,用起来那么方便,就这么丢了不用多可惜,那得想个办法都用起来,不然自己去拼参数,还排序,还拼字符串,还解析响应,一个搞不好弄字段没对上怎么办。
那么我们造一个可以支持支付宝证书模式的 Client 可好,简单点,就参照 DefaultAlipayClient 进行一些改造,把以上这些参数筛选排序、加签、验签、响应封装都利用一下吧,来写个 CertAlipayClient 用于证书模式:
import datetime
import uuid
import hashlib
from OpenSSL import crypto
from alipay.aop.api.constant.ParamConstants import *
from alipay.aop.api.util.WebUtils import *
from alipay.aop.api.util.SignatureUtils import *
from alipay.aop.api.util.CommonUtils import *
from alipay.aop.api.util.EncryptUtils import *
class CertAlipayClient(object):
"""
alipay_client_config:客户端配置,包含app_id、应用私钥、支付宝公钥等
app_cert_path:应用公钥证书路径
alipay_public_cert_path: 支付宝公钥证书路径
alipay_root_cert_path:支付宝根证书路径
logger:日志对象,客户端执行信息会通过此日志对象输出
"""
def __init__(self, alipay_client_config, app_cert_path, alipay_public_cert_path, alipay_root_cert_path, logger=None):
self.__config = alipay_client_config
self.__logger = logger
self.app_cert_sn = self.get_cert_sn(app_cert_path)
self.alipay_public_key = self.get_cert_pubkey(alipay_public_cert_path)
self.alipay_root_cert_sn = self.get_root_cert_sn(alipay_root_cert_path)
"""
从支付宝根证书中获取SN值
支付宝根证书中有4个证书内容,需要判断 signature_algorithm_oid 以 1.2.840.113549.1.1 开头的证书文件,并把SN值用下划线拼起来作为root_cert_sn
"""
def get_root_cert_sn(self, root_cert_path):
root_cert_sn = None
# 使用 with 语句确保文件正确关闭
with open(root_cert_path, 'r') as file:
certificates = file.read()
# 分割证书内容
cert_list = certificates.strip().split("-----END CERTIFICATE-----")
cert_list = [cert.strip() + "\n-----END CERTIFICATE-----" for cert in cert_list if cert.strip()]
for i, cert_str in enumerate(cert_list):
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str)
cryptography = cert.to_cryptography()
# 提取证书中以1.2.840.113549.1.1开头的oid进行处理
if cryptography.signature_algorithm_oid.dotted_string.startswith("1.2.840.113549.1.1"):
cert_sn = self.__get_cert_sn(cert)
if not root_cert_sn:
root_cert_sn = cert_sn
else:
root_cert_sn = root_cert_sn + "_" + cert_sn
return root_cert_sn
"""
从证书中获取SN值
"""
def get_cert_sn(self, cert_path):
with open(cert_path, 'r') as file:
cert_str = file.read()
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str)
return self.__get_cert_sn(cert)
def __get_cert_sn(self, cert):
cert_issue = cert.get_issuer()
sn_string = 'CN=' + cert_issue.CN + ',' + 'OU=' + cert_issue.OU + ',' + 'O=' + cert_issue.O + ',' + 'C=' + cert_issue.C + str(
cert.get_serial_number())
cert_sn = hashlib.md5(sn_string.encode('utf-8')).hexdigest()
return cert_sn
"""
从指定证书文件中获取公钥
用于从支付宝公钥证书中提取公钥
"""
def get_cert_pubkey(self, cert_path):
with open(cert_path, 'r') as file:
cert_str = file.read()
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str)
pubkey = cert.get_pubkey()
pubkey_bytes = crypto.dump_publickey(crypto.FILETYPE_ASN1, pubkey)
return base64.b64encode(pubkey_bytes).decode('utf-8')
"""
内部方法,从params中抽取公共参数
"""
def __get_common_params(self, params):
common_params = dict()
common_params[P_TIMESTAMP] = params[P_TIMESTAMP]
common_params[P_APP_ID] = self.__config.app_id
common_params[P_METHOD] = params[P_METHOD]
common_params[P_CHARSET] = self.__config.charset
common_params[P_FORMAT] = self.__config.format
common_params[P_VERSION] = params[P_VERSION]
common_params[P_SIGN_TYPE] = self.__config.sign_type
common_params['app_cert_sn'] = self.app_cert_sn
common_params['alipay_root_cert_sn'] = self.alipay_root_cert_sn
if self.__config.encrypt_type:
common_params[P_ENCRYPT_TYPE] = self.__config.encrypt_type
if has_value(params, P_APP_AUTH_TOKEN):
common_params[P_APP_AUTH_TOKEN] = params[P_APP_AUTH_TOKEN]
if has_value(params, P_AUTH_TOKEN):
common_params[P_AUTH_TOKEN] = params[P_AUTH_TOKEN]
if has_value(params, P_NOTIFY_URL):
common_params[P_NOTIFY_URL] = params[P_NOTIFY_URL]
if has_value(params, P_RETURN_URL):
common_params[P_RETURN_URL] = params[P_RETURN_URL]
return common_params
"""
内部方法,从params中移除公共参数
"""
def __remove_common_params(self, params):
if not params:
return
for k in COMMON_PARAM_KEYS:
if k in params:
params.pop(k)
"""
内部方法,构造form表单输出结果
"""
def __build_form(self, url, params):
form = "<form name=\"punchout_form\" method=\"post\" action=\""
form += url
form += "\">\n"
if params:
for k, v in params.items():
if not v:
continue
form += "<input type=\"hidden\" name=\""
form += k
form += "\" value=\""
form += v.replace("\"", """)
form += "\">\n"
form += "<input type=\"submit\" value=\"立即支付\" style=\"display:none\" >\n"
form += "</form>\n"
form += "<script>document.forms[0].submit();</script>"
return form
"""
内部方法,通过请求request对象构造请求查询字符串和业务参数
"""
def __prepare_request(self, request):
common_params, params = self.__prepare_request_params(request)
query_string = url_encode(common_params, self.__config.charset)
return query_string, params
"""
内部方法,通过请求request对象构造SDK请求查询字符串
"""
def __prepare_sdk_request(self, request):
common_params, params = self.__prepare_request_params(request)
allParams = dict()
allParams.update(common_params)
allParams.update(params)
query_string = url_encode(allParams, self.__config.charset)
return query_string
"""
内部方法,通过请求request对象构造请求参数
"""
def __prepare_request_params(self, request):
THREAD_LOCAL.logger = self.__logger
params = request.get_params()
if P_BIZ_CONTENT in params:
if self.__config.encrypt_type and self.__config.encrypt_key:
params[P_BIZ_CONTENT] = encrypt_content(params[P_BIZ_CONTENT], self.__config.encrypt_type,
self.__config.encrypt_key, self.__config.charset)
else:
if request.need_encrypt:
raise RequestException("接口" + params[P_METHOD] + "必须使用encrypt_type、encrypt_key加密")
params[P_TIMESTAMP] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
common_params = self.__get_common_params(params)
all_params = dict()
all_params.update(params)
all_params.update(common_params)
sign_content = get_sign_content(all_params)
sign = ''
if not self.__config.skip_sign:
try:
if self.__config.sign_type and self.__config.sign_type == 'RSA2':
sign = sign_with_rsa2(self.__config.app_private_key, sign_content, self.__config.charset)
else:
sign = sign_with_rsa(self.__config.app_private_key, sign_content, self.__config.charset)
except Exception as e:
raise RequestException('[' + THREAD_LOCAL.uuid + ']request sign failed. ' + str(e))
common_params[P_SIGN] = sign
self.__remove_common_params(params)
log_url = self.__config.server_url + '?' + sign_content + "&sign=" + sign
if THREAD_LOCAL.logger:
THREAD_LOCAL.logger.info('[' + THREAD_LOCAL.uuid + ']request:' + log_url)
return common_params, params
"""
内部方法,解析请求返回结果并做验签
"""
def __parse_response(self, response_str):
if PYTHON_VERSION_3:
response_str = response_str.decode(self.__config.charset)
if THREAD_LOCAL.logger:
THREAD_LOCAL.logger.info('[' + THREAD_LOCAL.uuid + ']response:' + response_str)
if self.__config.skip_sign:
m1 = PATTERN_RESPONSE_BEGIN.search(response_str)
em1 = PATTERN_RESPONSE_ENCRYPT_BEGIN.search(response_str)
if not m1 and not em1:
raise ResponseException('[' + THREAD_LOCAL.uuid + ']response shape maybe illegal. ' + response_str)
begin_index = 0
end_index = 0
has_encrypted = False
if m1:
begin_index = m1.end() - 1
end_index = self.__extract_json_object_end_position(response_str, begin_index)
elif em1:
begin_index = em1.end() - 1
end_index = self.__extract_json_base64_value_end_position(response_str, begin_index)
has_encrypted = True
if begin_index >= end_index:
return response_str
response_content = response_str[begin_index:end_index]
if PYTHON_VERSION_3:
response_content = response_content.encode(self.__config.charset)
response_content = response_content.decode(self.__config.charset)
if has_encrypted and self.__config.encrypt_type and self.__config.encrypt_key:
response_content = decrypt_content(response_content[1:-1], self.__config.encrypt_type,
self.__config.encrypt_key, self.__config.charset)
return response_content
response_content = None
sign = None
em1 = None
em2 = None
has_encrypted = False
if self.__config.encrypt_type and self.__config.encrypt_key:
em1 = PATTERN_RESPONSE_ENCRYPT_BEGIN.search(response_str)
em2 = PATTERN_RESPONSE_SIGN_ENCRYPT_BEGIN.search(response_str)
if em1 and em2:
has_encrypted = True
sign_start_index = em2.start()
sign_end_index = em2.end()
while em2:
em2 = PATTERN_RESPONSE_SIGN_BEGIN.search(response_str, pos=em2.end())
if em2:
sign_start_index = em2.start()
sign_end_index = em2.end()
response_content = response_str[em1.end() - 1:sign_start_index + 1]
if PYTHON_VERSION_3:
response_content = response_content.encode(self.__config.charset)
sign = response_str[sign_end_index:response_str.find("\"", sign_end_index)]
if not response_content:
m1 = PATTERN_RESPONSE_BEGIN.search(response_str)
m2 = PATTERN_RESPONSE_SIGN_BEGIN.search(response_str)
# 证书模式返回的内容有 alipay_cert_sn 和 sign,所以取内容可能要截止到 alipay_cert_sn
if not m2:
m2 = re.compile('(\\}[ \\t\\n]*,[ \\t\\n]*\\"alipay_cert_sn\\"[ \\t\\n]*:[ \\t\\n]*\\")').search(response_str)
if not m1 or not m2:
raise ResponseException('[' + THREAD_LOCAL.uuid + ']response shape maybe illegal. ' + response_str)
sign_start_index = m2.start()
while m2:
m2 = PATTERN_RESPONSE_SIGN_BEGIN.search(response_str, pos=m2.end())
if m2:
sign_start_index = m2.start()
response_content = response_str[m1.end() - 1:sign_start_index + 1]
if PYTHON_VERSION_3:
response_content = response_content.encode(self.__config.charset)
# 解析为 JSON 字符串, 提取 sign 值
response_data = json.loads(response_str)
sign = response_data.get('sign')
try:
verify_res = verify_with_rsa(self.alipay_public_key, response_content, sign)
except Exception as e:
raise ResponseException('[' + THREAD_LOCAL.uuid + ']response sign verify failed. ' + str(e) + \
' ' + response_str)
if not verify_res:
raise ResponseException('[' + THREAD_LOCAL.uuid + ']response sign verify failed. ' + response_str)
response_content = response_content.decode(self.__config.charset)
if has_encrypted:
response_content = decrypt_content(response_content[1:-1], self.__config.encrypt_type,
self.__config.encrypt_key, self.__config.charset)
return response_content
'''
提取密文验签内容终点
'''
def __extract_json_base64_value_end_position(self, response_string, begin_position):
for index in range(begin_position, len(response_string)):
# 找到第2个双引号作为终点,由于中间全部是Base64编码的密文,所以不会有干扰的特殊字符
if response_string[index] == '"' and index != begin_position:
return index + 1
# 如果没有找到第2个双引号,说明验签内容片段提取失败,直接尝试选取剩余整个响应字符串进行验签
return len(response_string)
'''
提取明文验签内容终点
'''
def __extract_json_object_end_position(self, response_string, begin_position):
# 记录当前尚未发现配对闭合的大括号
braces = []
# 记录当前字符是否在双引号中
in_quotes = False
# 记录当前字符前面连续的转义字符个数
consecutive_escape_count = 0
# 从待验签字符的起点开始遍历后续字符串,找出待验签字符串的终止点,终点即是与起点{配对的}
for index in range(begin_position, len(response_string)):
# 提取当前字符
current_char = response_string[index]
# 如果当前字符是"且前面有偶数个转义标记(0也是偶数)
if current_char == '"' and consecutive_escape_count % 2 == 0:
# 是否在引号中的状态取反
in_quotes = not in_quotes
# 如果当前字符是{且不在引号中
elif current_char == '{' and not in_quotes:
# 将该{加入未闭合括号中
braces.append("{")
# 如果当前字符是}且不在引号中
elif current_char == '}' and not in_quotes:
# 弹出一个未闭合括号
braces.pop()
# 如果弹出后,未闭合括号为空,说明已经找到终点
if len(braces) == 0:
return index + 1
# 如果当前字符是转义字符
if current_char == '\\':
# 连续转义字符个数+1
consecutive_escape_count += 1
else:
# 连续转义字符个数置0
consecutive_escape_count = 0
# 如果没有找到配对的闭合括号,说明验签内容片段提取失败,直接尝试选取剩余整个响应字符串进行验签
return len(response_string)
"""
执行接口请求
"""
def execute(self, request):
THREAD_LOCAL.uuid = str(uuid.uuid1())
headers = {
'Content-type': 'application/x-www-form-urlencoded;charset=' + self.__config.charset,
"Cache-Control": "no-cache",
"Connection": "Keep-Alive",
"User-Agent": ALIPAY_SDK_PYTHON_VERSION,
"log-uuid": THREAD_LOCAL.uuid,
}
query_string, params = self.__prepare_request(request)
multipart_params = request.get_multipart_params()
if multipart_params and len(multipart_params) > 0:
response = do_multipart_post(self.__config.server_url, query_string, headers, params, multipart_params,
self.__config.charset, self.__config.timeout)
else:
response = do_post(self.__config.server_url, query_string, headers, params, self.__config.charset,
self.__config.timeout)
return self.__parse_response(response)
'''
得到页面跳转接口的url或表单html
'''
def page_execute(self, request, http_method="POST"):
THREAD_LOCAL.uuid = str(uuid.uuid1())
url = self.__config.server_url
pos = url.find("?")
if pos >= 0:
url = url[0:pos]
query_string, params = self.__prepare_request(request)
if http_method == "GET":
return url + "?" + query_string + "&" + url_encode(params, self.__config.charset)
else:
return self.__build_form(url + "?" + query_string, params)
'''
得到sdk调用的参数
'''
def sdk_execute(self, request):
THREAD_LOCAL.uuid = str(uuid.uuid1())
return self.__prepare_sdk_request(request)
在初始化的时候,把 应用公钥证书路径、支付宝公钥证书路径、支付宝根证书路径 给出来,请求和响应跟原 sdk 一样使用,不用关注参数排序、加签、验签。
以下是一些简单的说明,初始化时参考:
alipay_client_config = AlipayClientConfig(sandbox_debug=ALIPAY_SANDBOX_DEBUG)
alipay_client_config.app_id = ALIPAY_APP_ID
alipay_client_config.server_url = ALIPAY_SERVER_URL
alipay_client_config.app_private_key = ALIPAY_APP_PRIVATE_KEY
client = CertAlipayClient(alipay_client_config, ALIPAY_APP_CERT_PATH, ALIPAY_PUBLIC_CERT_PATH, ALIPAY_ROOT_CERT_PATH, logger)
在初始化时,把应用公钥证书读取到并解析得到 app_cert_sn,把支付宝根证书解析得到 alipay_root_cert_sn,在加签时用于拼接参数;把支付宝公钥证书解析得到alipay_public_key,用于验签。
加签,在 __get_common_params
方法中增加了 app_cert_sn
和 alipay_root_cert_sn
参数,加签时会自己拼上去。
common_params['app_cert_sn'] = self.app_cert_sn
common_params['alipay_root_cert_sn'] = self.alipay_root_cert_sn
验签,在 __parse_response
方法中做了一些修改,让截取到正确的响应内容,并使用 alipay_public_key 进行验签:
if not response_content:
m1 = PATTERN_RESPONSE_BEGIN.search(response_str)
m2 = PATTERN_RESPONSE_SIGN_BEGIN.search(response_str)
# 证书模式返回的内容有 alipay_cert_sn 和 sign,所以取内容可能要截止到 alipay_cert_sn
if not m2:
m2 = re.compile('(\\}[ \\t\\n]*,[ \\t\\n]*\\"alipay_cert_sn\\"[ \\t\\n]*:[ \\t\\n]*\\")').search(response_str)
if not m1 or not m2:
raise ResponseException('[' + THREAD_LOCAL.uuid + ']response shape maybe illegal. ' + response_str)
sign_start_index = m2.start()
while m2:
m2 = PATTERN_RESPONSE_SIGN_BEGIN.search(response_str, pos=m2.end())
if m2:
sign_start_index = m2.start()
response_content = response_str[m1.end() - 1:sign_start_index + 1]
if PYTHON_VERSION_3:
response_content = response_content.encode(self.__config.charset)
# 解析为 JSON 字符串, 提取 sign 值
response_data = json.loads(response_str)
sign = response_data.get('sign')
try:
verify_res = verify_with_rsa(self.alipay_public_key, response_content, sign)
except Exception as e:
raise ResponseException('[' + THREAD_LOCAL.uuid + ']response sign verify failed. ' + str(e) + \
' ' + response_str)
验签这里没有做 支付宝证书sn值的验证,如果有需要可以补充进去,可以参照文档自动更新支付宝公钥证书。文档中是这么说的:
支付宝公钥证书由于证书到期等原因,会重新签发新的证书(证书中密钥内容不变),开发者在自行实现的验签逻辑中需要判断当前使用的支付宝公钥证书 SN 与网关响应报文中的 SN 是否一致。若不一致,开发者需先调用 支付宝公钥证书下载接口 下载对应的支付宝公钥证书,再做验签。
致谢
非常感谢您阅读这篇博客,希望它能为您提供有价值的见解和帮助,您的支持是我持续创作的动力。
如果您对本文有任何意见或建议,欢迎在评论区留言,我们可以一起讨论。
文章系原创,整理不易,请勿全篇复制抄袭,转载请注明出处。