Python 服务端集成 腾讯云 IM 服务

最近做的项目,需要接入腾讯云 IM,翻看了一下文档,iOS、Android 以及 Web 端基本上都有 SDK 可以集成。我使用的服务端是用 Python 写的,腾讯 IM 暂时还没有 Python 的官方文档。但是在腾讯云的官方论坛上找到了解决方法。
服务端最基本的一个需求是:使用接入用户的 identifier 和应用申请的腾讯云 appid、私钥等信息,通过指定算法,生成用户用来登录腾讯云 IM 的 usersig。

源代码:

#! /usr/bin/python
# coding:utf-8

import OpenSSL, base64, zlib, json, time

ecdsa_pri_key = """
自己的私钥
"""

def list_all_curves():
    list = OpenSSL.crypto.get_elliptic_curves()
    for element in list:
        print element

def get_secp256k1():
    print OpenSSL.crypto.get_elliptic_curve('secp256k1');


def base64_encode_url(data):
    base64_data = base64.b64encode(data)
    base64_data = base64_data.replace('+', '*')
    base64_data = base64_data.replace('/', '-')
    base64_data = base64_data.replace('=', '_')
    return base64_data

def base64_decode_url(base64_data):
    base64_data = base64_data.replace('*', '+')
    base64_data = base64_data.replace('-', '/')
    base64_data = base64_data.replace('_', '=')
    raw_data = base64.b64decode(base64_data)
    return raw_data

class TLSSigAPI:
    """"""    
    __acctype = 0
    __identifier = ""
    __appid3rd = ""
    __sdkappid = 0
    __version = 20151204
    __expire = 3600*24*30       # 默认一个月,需要调整请自行修改
    __pri_key = ""
    __pub_key = ""
    _err_msg = "ok"

    def __get_pri_key(self):
        return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, self.__pri_key);

    def __init__(self, sdkappid, pri_key):
        self.__sdkappid = sdkappid
        self.__pri_key = pri_key

    def __create_dict(self):
        m = {}
        m["TLS.account_type"] = "%d" % self.__acctype
        m["TLS.identifier"] = "%s" % self.__identifier
        m["TLS.appid_at_3rd"] = "%s" % self.__appid3rd
        m["TLS.sdk_appid"] = "%d" % self.__sdkappid
        m["TLS.expire_after"] = "%d" % self.__expire
        m["TLS.version"] = "%d" % self.__version
        m["TLS.time"] = "%d" % time.time()
        return m

    def __encode_to_fix_str(self, m):
        fix_str = "TLS.appid_at_3rd:"+m["TLS.appid_at_3rd"]+"\n" \
                  +"TLS.account_type:"+m["TLS.account_type"]+"\n" \
                  +"TLS.identifier:"+m["TLS.identifier"]+"\n" \
                  +"TLS.sdk_appid:"+m["TLS.sdk_appid"]+"\n" \
                  +"TLS.time:"+m["TLS.time"]+"\n" \
                  +"TLS.expire_after:"+m["TLS.expire_after"]+"\n"
        return fix_str

    def tls_gen_sig(self, identifier):
        self.__identifier = identifier

        m = self.__create_dict()
        fix_str = self.__encode_to_fix_str(m)
        pk_loaded = self.__get_pri_key()
        sig_field = OpenSSL.crypto.sign(pk_loaded, fix_str, "sha256");
        sig_field_base64 = base64.b64encode(sig_field)
        m["TLS.sig"] = sig_field_base64
        json_str = json.dumps(m)
        sig_cmpressed = zlib.compress(json_str)
        base64_sig = base64_encode_url(sig_cmpressed)
        return base64_sig 

def main():
    api = TLSSigAPI(1400001052, ecdsa_pri_key)
    sig = api.tls_gen_sig("xiaojun")
    print sig

if __name__ == "__main__":
    main()

过程:

  1. 将用户的信息组装成一个字符串(json格式的,是直接拼装的,因为顺序不能乱),是哪些信息,可以看 __encode_to_fix_str;
  2. 使用 sha256 将字符串 hash,然后再用私钥签名,一般加密接口都会一把搞定,加密曲线使用的是 secp256k1
  3. 把第2步得到的缓冲区进行 base64;
  4. 将所有用户的信息以及第3步得到签名写进一个 json 串,此时可以不论顺序;
  5. 将 json 进行序列化,再 zlib 压缩,最后 base64(替换了某些字符,具体哪些看代码)。

需要注意的是,加密曲线使用的是 secp256k1, 有些系统的 openssl 是不支持这个曲线的。查看系统所支持的所有曲线,可参考 list_all_curves, 这个方法会打印出所有支持的曲线。如果你的服务器恰好支持这个曲线,那经过上面的过程就可以正常使用了。

如果你的服务器不支持 secp256k1,可以使用 python ecdsa 。

  1. python ecdsa 开发库 下载地址
  2. 使用 pip 安装:pip install ecdsa

由于 python ecdsa 这个开发库仅支持 ec 格式的私钥,从腾讯云下载的私钥格式是 pk #8 的格式,需要使用 openssl 命令进行转换。转换命令如下:

openssl ec -outform PEM -inform PEM -in private.pem -out private_ec.pem

-in 后面的传入下载的私钥 -out 后面是转换后的私钥文件

最终实现代码:

#! /usr/bin/python
# coding:utf-8

import OpenSSL, base64, zlib, json, time, hashlib
from ecdsa import SigningKey,util

# 这里请填写应用自己的私钥
ecdsa_pri_key = """
请填上应用自己的私钥
"""

def base64_encode_url(data):
    base64_data = base64.b64encode(data)
    base64_data = base64_data.replace('+', '*')
    base64_data = base64_data.replace('/', '-')
    base64_data = base64_data.replace('=', '_')
    return base64_data

def base64_decode_url(base64_data):
    base64_data = base64_data.replace('*', '+')
    base64_data = base64_data.replace('-', '/')
    base64_data = base64_data.replace('_', '=')
    raw_data = base64.b64decode(base64_data)
    return raw_data

class TLSSigAPI:
    """"""    
    __acctype = 0
    __identifier = ""
    __appid3rd = ""
    __sdkappid = 0
    __version = 20151204
    __expire = 3600*24*30       # 默认一个月,需要调整请自行修改
    __pri_key = ""
    __pub_key = ""
    _err_msg = "ok"
    

    def __get_pri_key(self):
        return self.__pri_key_loaded

    def __init__(self, sdkappid, pri_key):
        self.__sdkappid = sdkappid
        self.__pri_key = pri_key
        self.__pri_key_loaded = SigningKey.from_pem(self.__pri_key)

    def __create_dict(self):
        m = {}
        m["TLS.account_type"] = "%d" % self.__acctype
        m["TLS.identifier"] = "%s" % self.__identifier
        m["TLS.appid_at_3rd"] = "%s" % self.__appid3rd
        m["TLS.sdk_appid"] = "%d" % self.__sdkappid
        m["TLS.expire_after"] = "%d" % self.__expire
        m["TLS.version"] = "%d" % self.__version
        m["TLS.time"] = "%d" % time.time()
        return m

    def __encode_to_fix_str(self, m):
        fix_str = "TLS.appid_at_3rd:"+m["TLS.appid_at_3rd"]+"\n" \
                  +"TLS.account_type:"+m["TLS.account_type"]+"\n" \
                  +"TLS.identifier:"+m["TLS.identifier"]+"\n" \
                  +"TLS.sdk_appid:"+m["TLS.sdk_appid"]+"\n" \
                  +"TLS.time:"+m["TLS.time"]+"\n" \
                  +"TLS.expire_after:"+m["TLS.expire_after"]+"\n"
        return fix_str

    def tls_gen_sig(self, identifier):
        self.__identifier = identifier
        m = self.__create_dict()
        fix_str = self.__encode_to_fix_str(m)
        pk_loaded = self.__get_pri_key()
        sig_field = pk_loaded.sign(fix_str, hashfunc=hashlib.sha256, sigencode=util.sigencode_der)
        sig_field_base64 = base64.b64encode(sig_field)
        m["TLS.sig"] = sig_field_base64
        json_str = json.dumps(m)
        sig_cmpressed = zlib.compress(json_str)
        base64_sig = base64_encode_url(sig_cmpressed)
        return base64_sig 

def main():
    api = TLSSigAPI(1400001052, ecdsa_pri_key)
    sig = api.tls_gen_sig("xiaojun")
    print sig

if __name__ == "__main__":
    main()

参考链接,腾讯云官方论坛:
http://bbs.qcloud.com/thread-14366-1-1.html
http://bbs.qcloud.com/thread-23280-1-1.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 175,831评论 25 709
  • 发现 关注 消息 iOS 第三方库、插件、知名博客总结 作者大灰狼的小绵羊哥哥关注 2017.06.26 09:4...
    肇东周阅读 14,293评论 4 61
  • 留下参考吧,这个是使用windows下的MarkdownPad2,github离线风格写的,放在这里这个文章内部链...
    莫名其妙的一生阅读 13,376评论 3 5
  • 近期项目中可能会用到framework的封装,所以抽时间学习了一下。网上有许多关于这方面的文章,这里介绍下我自己的...
    天明天阅读 4,734评论 2 0