最近做的项目,需要接入腾讯云 IM,翻看了一下文档,iOS、Android 以及 Web 端基本上都有 SDK 可以集成。我使用的服务端是用 Python 写的,腾讯 IM 暂时还没有 Python 的官方文档。但是在腾讯云的官方论坛上找到了解决方法。
服务端最基本的一个需求是:使用接入用户的 identifier 和应用申请的腾讯云 appid、私钥等信息,通过指定算法,生成用户用来登录腾讯云 IM 的 usersig。
源代码:
#! /usr/bin/python
# coding:utf-8
import OpenSSL, base64, zlib, json, time
ecdsa_pri_key = """
自己的私钥
"""
def list_all_curves():
list = OpenSSL.crypto.get_elliptic_curves()
for element in list:
print element
def get_secp256k1():
print OpenSSL.crypto.get_elliptic_curve('secp256k1');
def base64_encode_url(data):
base64_data = base64.b64encode(data)
base64_data = base64_data.replace('+', '*')
base64_data = base64_data.replace('/', '-')
base64_data = base64_data.replace('=', '_')
return base64_data
def base64_decode_url(base64_data):
base64_data = base64_data.replace('*', '+')
base64_data = base64_data.replace('-', '/')
base64_data = base64_data.replace('_', '=')
raw_data = base64.b64decode(base64_data)
return raw_data
class TLSSigAPI:
""""""
__acctype = 0
__identifier = ""
__appid3rd = ""
__sdkappid = 0
__version = 20151204
__expire = 3600*24*30 # 默认一个月,需要调整请自行修改
__pri_key = ""
__pub_key = ""
_err_msg = "ok"
def __get_pri_key(self):
return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, self.__pri_key);
def __init__(self, sdkappid, pri_key):
self.__sdkappid = sdkappid
self.__pri_key = pri_key
def __create_dict(self):
m = {}
m["TLS.account_type"] = "%d" % self.__acctype
m["TLS.identifier"] = "%s" % self.__identifier
m["TLS.appid_at_3rd"] = "%s" % self.__appid3rd
m["TLS.sdk_appid"] = "%d" % self.__sdkappid
m["TLS.expire_after"] = "%d" % self.__expire
m["TLS.version"] = "%d" % self.__version
m["TLS.time"] = "%d" % time.time()
return m
def __encode_to_fix_str(self, m):
fix_str = "TLS.appid_at_3rd:"+m["TLS.appid_at_3rd"]+"\n" \
+"TLS.account_type:"+m["TLS.account_type"]+"\n" \
+"TLS.identifier:"+m["TLS.identifier"]+"\n" \
+"TLS.sdk_appid:"+m["TLS.sdk_appid"]+"\n" \
+"TLS.time:"+m["TLS.time"]+"\n" \
+"TLS.expire_after:"+m["TLS.expire_after"]+"\n"
return fix_str
def tls_gen_sig(self, identifier):
self.__identifier = identifier
m = self.__create_dict()
fix_str = self.__encode_to_fix_str(m)
pk_loaded = self.__get_pri_key()
sig_field = OpenSSL.crypto.sign(pk_loaded, fix_str, "sha256");
sig_field_base64 = base64.b64encode(sig_field)
m["TLS.sig"] = sig_field_base64
json_str = json.dumps(m)
sig_cmpressed = zlib.compress(json_str)
base64_sig = base64_encode_url(sig_cmpressed)
return base64_sig
def main():
api = TLSSigAPI(1400001052, ecdsa_pri_key)
sig = api.tls_gen_sig("xiaojun")
print sig
if __name__ == "__main__":
main()
过程:
- 将用户的信息组装成一个字符串(json格式的,是直接拼装的,因为顺序不能乱),是哪些信息,可以看 __encode_to_fix_str;
- 使用 sha256 将字符串 hash,然后再用私钥签名,一般加密接口都会一把搞定,加密曲线使用的是 secp256k1;
- 把第2步得到的缓冲区进行 base64;
- 将所有用户的信息以及第3步得到签名写进一个 json 串,此时可以不论顺序;
- 将 json 进行序列化,再 zlib 压缩,最后 base64(替换了某些字符,具体哪些看代码)。
需要注意的是,加密曲线使用的是 secp256k1, 有些系统的 openssl 是不支持这个曲线的。查看系统所支持的所有曲线,可参考 list_all_curves
, 这个方法会打印出所有支持的曲线。如果你的服务器恰好支持这个曲线,那经过上面的过程就可以正常使用了。
如果你的服务器不支持 secp256k1,可以使用 python ecdsa 。
- python ecdsa 开发库 下载地址
- 使用 pip 安装:
pip install ecdsa
由于 python ecdsa 这个开发库仅支持 ec 格式的私钥,从腾讯云下载的私钥格式是 pk #8 的格式,需要使用 openssl 命令进行转换。转换命令如下:
openssl ec -outform PEM -inform PEM -in private.pem -out private_ec.pem
-in 后面的传入下载的私钥 -out 后面是转换后的私钥文件
最终实现代码:
#! /usr/bin/python
# coding:utf-8
import OpenSSL, base64, zlib, json, time, hashlib
from ecdsa import SigningKey,util
# 这里请填写应用自己的私钥
ecdsa_pri_key = """
请填上应用自己的私钥
"""
def base64_encode_url(data):
base64_data = base64.b64encode(data)
base64_data = base64_data.replace('+', '*')
base64_data = base64_data.replace('/', '-')
base64_data = base64_data.replace('=', '_')
return base64_data
def base64_decode_url(base64_data):
base64_data = base64_data.replace('*', '+')
base64_data = base64_data.replace('-', '/')
base64_data = base64_data.replace('_', '=')
raw_data = base64.b64decode(base64_data)
return raw_data
class TLSSigAPI:
""""""
__acctype = 0
__identifier = ""
__appid3rd = ""
__sdkappid = 0
__version = 20151204
__expire = 3600*24*30 # 默认一个月,需要调整请自行修改
__pri_key = ""
__pub_key = ""
_err_msg = "ok"
def __get_pri_key(self):
return self.__pri_key_loaded
def __init__(self, sdkappid, pri_key):
self.__sdkappid = sdkappid
self.__pri_key = pri_key
self.__pri_key_loaded = SigningKey.from_pem(self.__pri_key)
def __create_dict(self):
m = {}
m["TLS.account_type"] = "%d" % self.__acctype
m["TLS.identifier"] = "%s" % self.__identifier
m["TLS.appid_at_3rd"] = "%s" % self.__appid3rd
m["TLS.sdk_appid"] = "%d" % self.__sdkappid
m["TLS.expire_after"] = "%d" % self.__expire
m["TLS.version"] = "%d" % self.__version
m["TLS.time"] = "%d" % time.time()
return m
def __encode_to_fix_str(self, m):
fix_str = "TLS.appid_at_3rd:"+m["TLS.appid_at_3rd"]+"\n" \
+"TLS.account_type:"+m["TLS.account_type"]+"\n" \
+"TLS.identifier:"+m["TLS.identifier"]+"\n" \
+"TLS.sdk_appid:"+m["TLS.sdk_appid"]+"\n" \
+"TLS.time:"+m["TLS.time"]+"\n" \
+"TLS.expire_after:"+m["TLS.expire_after"]+"\n"
return fix_str
def tls_gen_sig(self, identifier):
self.__identifier = identifier
m = self.__create_dict()
fix_str = self.__encode_to_fix_str(m)
pk_loaded = self.__get_pri_key()
sig_field = pk_loaded.sign(fix_str, hashfunc=hashlib.sha256, sigencode=util.sigencode_der)
sig_field_base64 = base64.b64encode(sig_field)
m["TLS.sig"] = sig_field_base64
json_str = json.dumps(m)
sig_cmpressed = zlib.compress(json_str)
base64_sig = base64_encode_url(sig_cmpressed)
return base64_sig
def main():
api = TLSSigAPI(1400001052, ecdsa_pri_key)
sig = api.tls_gen_sig("xiaojun")
print sig
if __name__ == "__main__":
main()
参考链接,腾讯云官方论坛:
http://bbs.qcloud.com/thread-14366-1-1.html
http://bbs.qcloud.com/thread-23280-1-1.html