import base64
import hashlib
import hmac
import json
import random
import re
import string
import time
from hashlibimport sha256
from Crypto.Cipherimport AES,PKCS1_v1_5
from Crypto.PublicKeyimport RSA
import binascii
from Crypto.Util.Paddingimport unpad
from cryptography.hazmat.primitives.asymmetricimport rsa
from cryptography.hazmat.primitivesimport serialization
from cryptography.hazmat.primitives.serializationimport Encoding
import os
# jarpath = os.path.join(os.path.abspath('.') + '\\aes.jar') # 第二个参数是jar包的路径
# # jarpath = os.path.join(os.path.abspath('.') + '\\utils.jar')
# opensslpath = os.path.join(os.path.abspath('.') + '\\openssl-0.9.8k_X64\\bin\\openssl.exe ')
#
# starkey = 'a2164ada0026ccf7'.encode('utf-8')
# stariv = '82c91325e74bef0f'.encode('utf-8')
# portalkey = 't5SJPflE47xsysmJ42tGqtHG+oKaDg41'.encode('utf-8')
# portaliv = '5UnBXl4uSVtweMyX'.encode('utf-8')
# stagekey = 't5SJPflE47xsysmJ42tGqtHG+oKaDg41'.encode('utf-8')
# stageiv = 'oCDjWzAFbqBriHEF'.encode('utf-8')
# starsigsecret = '9af2e7b2d7562ad5'.encode('utf-8')
# jvslkey = 'suWD3qOZr01FNDIESjSMeMO7HF/5zVMw'.encode('utf-8')
# jarpath = os.path.join(
# os.path.abspath('') +
# '\\aes.jar')
# opensslpath = os.path.join(os.path.abspath('') + '\\openssl-0.9.8k_X64\\bin\\openssl.exe ')
from Crypto.Utilimport Padding
def IV32(IV='oCDjWzAFbqBriHEF'):
    """Return the lowercase hexadecimal encoding of ``IV``.

    ``IV`` is encoded to bytes with the default (UTF-8) codec and each byte
    is rendered as two hex digits, so a 16-character ASCII value yields a
    32-character string.

    The default is a fixed 16-character alphanumeric value; per the original
    author's note, hard-coding it has no effect and it normally corresponds
    to the ``xjv`` field of the API.
    """
    return IV.encode().hex()
def IV16(iv):
    """Decode a hexadecimal string back into UTF-8 text.

    Args:
        iv: text made of hex digit pairs (for example the output of a
            hex-encoding step).

    Returns:
        str: the bytes described by ``iv``, decoded as UTF-8.
    """
    raw_bytes = binascii.unhexlify(iv.encode())
    return raw_bytes.decode("utf-8")
#
# def Generate_key_pkcs8():
# """生成pkcs8公钥"""
# # lujin = "C:/Python36/Lib/site-packages/AesLibrary/openssl-0.9.8k_X64/bin/openssl.exe "
# # 生成pkcs1私钥
# os.system(
# opensslpath + 'genrsa -out pkcs1_private.pem 1024')
# # 转成pkcs8私钥
# os.system(
#
# opensslpath +
# 'pkcs8 -topk8 -inform PEM -in pkcs1_private.pem -outform pem -nocrypt -out pkcs8_private.pem')
# # 转成pkcs8公钥
# os.system(
#
# opensslpath +
# 'rsa -in pkcs8_private.pem -pubout -out pkcs8_public.pem')
# with open('C:\\Users\\LCHEN205\\Desktop\\Wallboxapitest\\tests\\Business_scene\\Portal\\pkcs8_public.pem') as fp:
# publickey_pkcs8 = fp.read()
# return publickey_pkcs8.replace("\n","")
def RSA_keypair():
    """Generate a fresh RSA key pair.

    Returns:
        tuple: ``(private_key, public_key)`` as produced by
        ``rsa.generate_private_key`` and its ``public_key()`` method.
    """
    # Public exponent 65537, 1024-bit modulus.
    # NOTE(review): 1024-bit RSA is below current size recommendations;
    # presumably kept because the peer system expects it -- confirm.
    generated = rsa.generate_private_key(public_exponent=65537, key_size=1024)
    return generated, generated.public_key()
def RSA_privatekeyPkcs8():
    """Return a newly generated RSA private key as unencrypted PKCS#8 PEM text.

    Every call invokes ``RSA_keypair()``, so the key returned here belongs
    to a brand-new key pair each time.
    """
    private_key, _ = RSA_keypair()
    pem_bytes = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,  # PKCS#8 private key layout
        encryption_algorithm=serialization.NoEncryption(),  # no passphrase
    )
    return pem_bytes.decode('utf_8')
def RSA_publickeyPkcs8():
    """Return a newly generated RSA public key as SubjectPublicKeyInfo PEM text.

    Every call invokes ``RSA_keypair()``, so the key returned here belongs
    to a brand-new key pair each time.
    """
    _, public_key = RSA_keypair()
    pem_bytes = public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,  # public key layout
    )
    return pem_bytes.decode('utf_8')
# 序列化私钥为PEM PKCS8公钥
def Generate_key_pkcs8():
    """Return a newly generated RSA public key as single-line PEM text.

    The key is serialised as SubjectPublicKeyInfo PEM and every newline is
    removed, so the BEGIN/END markers and the Base64 body are joined into
    one line. Each call generates a new key pair via ``RSA_keypair()``.
    """
    _, public_key = RSA_keypair()
    pem_text = public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,  # public key layout
    ).decode('utf-8')
    single_line = pem_text.replace("\n", "")
    return single_line
# cur_path = os.path.abspath(os.path.dirname(__file__))
# opensslpath = cur_path[
# :cur_path.find(
# 'wallboxapitest')] + 'wallboxapitest/utils/openssl-0.9.8k_X64/bin/openssl.exe '
#
# root_path = cur_path[
# :cur_path.find(
# 'wallboxapitest')] + 'wallboxapitest/tests/Business_scene/Portal/pkcs8_public.pem'
# os.system(opensslpath + 'genrsa -out pkcs1_private.pem 1024')
# os.system(
# opensslpath + 'pkcs8 -topk8 -inform PEM -in pkcs1_private.pem -outform pem -nocrypt -out pkcs8_private.pem')
# os.system(opensslpath + 'rsa -in pkcs8_private.pem -pubout -out pkcs8_public.pem')
# with open(root_path) as fp:
# publickey_pkcs8 = fp.read()
# return publickey_pkcs8.replace("\n","")
def RSA_privatekeyPkcs1():
    """Return a newly generated RSA private key as unencrypted PKCS#1 PEM text.

    Each call generates a new key pair via ``RSA_keypair()``.
    """
    private_key, _ = RSA_keypair()
    pem_bytes = private_key.private_bytes(
        encoding=Encoding.PEM,
        # TraditionalOpenSSL is the PKCS#1 layout for RSA private keys.
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),  # no passphrase
    )
    return pem_bytes.decode('utf_8')
def RSA_publickeyPkcs1():
    """Return a newly generated RSA public key as PKCS#1 PEM text.

    Each call generates a new key pair via ``RSA_keypair()``.
    """
    _, public_key = RSA_keypair()
    pem_bytes = public_key.public_bytes(
        encoding=Encoding.PEM,
        format=serialization.PublicFormat.PKCS1,  # PKCS#1 public key layout
    )
    return pem_bytes.decode('utf_8')
# def pkcs7padding(text):
# """
# 明文使用PKCS7填充
# """
# padded_data = Padding.pad(str(text).encode('utf-8'), AES.block_size, style='pkcs7')
# return padded_data
# bs = 16
# length = len(text)
# bytes_length = len(str(text).encode('utf-8'))
# padding_size = length if (bytes_length == length) else bytes_length
# padding = bs - padding_size % bs
# padding_text = chr(padding) * padding
#
# coding = chr(padding)
# return str(text) + padding_text
def aes_encrypt(content, key, iv):
    """Encrypt ``content`` with AES in CBC mode and return Base64 text.

    Args:
        content: plaintext. A ``str`` (or ``str`` subclass) is encrypted as
            is; any other object is first serialised with ``json.dumps``.
        key: AES key as text; its UTF-8 bytes are used as the key.
        iv: initialisation vector as text; its UTF-8 bytes are used as the IV.

    Returns:
        str: the ciphertext, Base64-encoded with the standard alphabet.
    """
    cipher = AES.new(key.encode('utf-8'), AES.MODE_CBC, iv.encode('utf-8'))
    # isinstance (rather than an exact type comparison) keeps str subclasses
    # from being JSON-quoted before encryption.
    if not isinstance(content, str):
        content = json.dumps(content)
    # PKCS7-pad the UTF-8 plaintext up to a whole number of AES blocks.
    padded = Padding.pad(content.encode('utf-8'), AES.block_size, style='pkcs7')
    encrypted = cipher.encrypt(padded)
    return base64.b64encode(encrypted).decode('utf-8')
# def aes_jvsl_encrypt(content):
# """
# AES加密
# applogout
# """
# cipher = AES.new(
# jvslkey,AES.MODE_CBC,
# stageiv)
# # 处理明文
# content_padding = \
# pkcs7padding(content)
# # 加密
# encrypt_bytes = cipher.encrypt(content_padding.encode('utf-8'))
# # 重新编码
# result = str(base64.b64encode(encrypt_bytes),encoding='utf-8')
# return result
def aes_Decrypt(content, key, iv):
    """Decrypt Base64-encoded AES-CBC ciphertext and return the plaintext.

    Args:
        content: Base64 text of the ciphertext.
        key: AES key as text; its UTF-8 bytes are used as the key.
        iv: initialisation vector as text; its UTF-8 bytes are used as the IV.

    Returns:
        str: the decrypted data with its padding removed, decoded as UTF-8.
    """
    decryptor = AES.new(key.encode('utf-8'), AES.MODE_CBC, iv.encode('utf-8'))
    ciphertext = base64.b64decode(content.encode('utf-8'))
    padded_plaintext = decryptor.decrypt(ciphertext)
    plaintext = unpad(padded_plaintext, AES.block_size)
    return plaintext.decode('utf-8')
# def aes_jvsl_Decrypt(content):
# """
# AES解密
# """
# cipher = AES.new(
# jvslkey,AES.MODE_CBC,
# stageiv)
# base64_decrypted = base64.b64decode(content.encode('utf-8'))
# DataStr = cipher.decrypt(base64_decrypted)
# # print(DataStr)
# try:
# result = re.compile(
# '[\\x00-\\x08\\x0b-\\x0c\\x0e-\\x1f\n\r\t]').sub('',DataStr.decode())
# except BaseException:
# result = '转码失败'
# return result
def notifications_body(params, key, iv, starsigsecret, OperatorID="313744932"):
    """Encrypt and sign ``params``, returning the notifications request body.

    Described by the original author as the local Python equivalent of
    ``requestEncodeLocal``.

    Args:
        params: request payload; passed to ``aes_encrypt`` unchanged.
        key: AES key (text) forwarded to ``aes_encrypt``.
        iv: AES IV (text) forwarded to ``aes_encrypt``.
        starsigsecret: text secret used as the HMAC-SHA256 key.
        OperatorID: operator identifier. The original note lists
            ``313744932`` for "727" (the default) and ``MA21D2RF2`` for
            "821".

    Returns:
        dict: ``Sig`` (upper-case hex HMAC-SHA256), ``Data`` (encrypted
        payload), ``OperatorID``, ``TimeStamp`` (``YYYYMMDDHHMMSS`` local
        time) and ``Seq`` (four random digits).
    """
    timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
    # Four independent random digits. random.choices samples with
    # replacement, so sequences with repeated digits (e.g. "1123") are
    # possible; random.sample could never produce them.
    requestSeq = "".join(random.choices(string.digits, k=4))
    # Encrypt the payload.
    aesdata = aes_encrypt(params, key, iv)
    # String to sign: operator id + encrypted body + timestamp + sequence.
    data = OperatorID + aesdata + timestamp + requestSeq
    signdata = hmac.new(
        starsigsecret.encode('utf-8'),
        data.encode("utf8"),
        digestmod=sha256,
    ).hexdigest().upper()
    # Assemble the encrypted and signed body.
    return {
        "Sig": signdata,
        "Data": aesdata,
        "OperatorID": OperatorID,
        "TimeStamp": timestamp,
        "Seq": requestSeq,
    }
def RSA_pkcs8encrypt(content):
    """Encrypt ``content`` with the embedded portal public key (``PUBLIC KEY`` PEM).

    Args:
        content: text to encrypt (the original note mentions a phone
            number); it is UTF-8 encoded before encryption.

    Returns:
        str: the PKCS#1 v1.5 ciphertext, URL-safe Base64 encoded.
    """
    public_pem = '''-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAsPALr6qX1CKpDYhuvm7FoHr4nTlARjVqpsotXPm639ZMlvDR3rQ8/k+AZKRHxJkHjgdV2Pm6RQonIONYXTv9FRLctxCJAsOHIig0b9yEoKh3hUCxocoP7mAItLcMwXCYhKilEeoEPSWvQTXf6inNrpM/GDcHrgoghU5OT8YZ5l8qlvwGkZZWrXklchMt84ezv9Gb2akFnj/A39wtKI33Jvhk3zbCC/jh9S7XWuLgty1RakmdI64PFGH9j/+OSE9t1ETNOgvju8VxuNfxqf8eh2IP5sYXV8GRmMeyuFNt5GmFZkHKeds15pfhfz9nb8B67wgS6dWi4ZdE6iM5qJ/4XwIDAQAB
-----END PUBLIC KEY-----'''
    encryptor = PKCS1_v1_5.new(RSA.importKey(public_pem))
    ciphertext = encryptor.encrypt(content.encode('utf-8'))
    return base64.urlsafe_b64encode(ciphertext).decode('utf-8')
def RSA_pkcs1encrypt(content):
    """Encrypt ``content`` with the embedded public key (``RSA PUBLIC KEY`` PEM).

    Args:
        content: text to encrypt (the original note mentions a phone
            number); it is UTF-8 encoded before encryption.

    Returns:
        str: the PKCS#1 v1.5 ciphertext, URL-safe Base64 encoded.
    """
    public_pem = '''-----BEGIN RSA PUBLIC KEY-----\nMIIBCgKCAQEAsPALr6qX1CKpDYhuvm7FoHr4nTlARjVqpsotXPm639ZMlvDR3rQ8\n/k+AZKRHxJkHjgdV2Pm6RQonIONYXTv9FRLctxCJAsOHIig0b9yEoKh3hUCxocoP\n7mAItLcMwXCYhKilEeoEPSWvQTXf6inNrpM/GDcHrgoghU5OT8YZ5l8qlvwGkZZW\nrXklchMt84ezv9Gb2akFnj/A39wtKI33Jvhk3zbCC/jh9S7XWuLgty1RakmdI64P\nFGH9j/+OSE9t1ETNOgvju8VxuNfxqf8eh2IP5sYXV8GRmMeyuFNt5GmFZkHKeds1\n5pfhfz9nb8B67wgS6dWi4ZdE6iM5qJ/4XwIDAQAB\n-----END RSA PUBLIC KEY-----\n'''
    encryptor = PKCS1_v1_5.new(RSA.importKey(public_pem))
    ciphertext = encryptor.encrypt(content.encode('utf-8'))
    return base64.urlsafe_b64encode(ciphertext).decode('utf-8')
def R2_sign(param, payloadkey, secretKey):
    """Sort the request body and compute its SHA-256 signature (new interface).

    Steps:
      1. SHA-256 of today's local date formatted ``YYYY-MM-DD``.
      2. ``secretKey2`` = SHA-256 of that digest concatenated with
         ``payloadkey``.
      3. The body is round-tripped through JSON with sorted keys, then
         rendered as ``&key=value`` pairs in key order. Dict and list values
         are rendered as JSON with every space removed; other values use
         ``str()``.
      4. The result is SHA-256 of
         ``"secretKey2=" + secretKey2 + pairs + "&secretKey=" + secretKey``.

    Args:
        param: mapping of body fields, e.g.
            ``{"pageIndex": "1", "pageSize": "20", "xjv": "...", "timestamp": "..."}``.
            Entries whose value is the empty string are left out of the
            signed text.
        payloadkey: text mixed into ``secretKey2``.
        secretKey: text appended as the final ``secretKey`` field.

    Returns:
        str: lowercase hex SHA-256 digest.
    """
    date_digest = sha256(time.strftime('%Y-%m-%d').encode('utf-8')).hexdigest()
    secretKey2 = sha256((date_digest + payloadkey).encode('utf-8')).hexdigest()

    # The JSON round trip sorts keys (including nested ones) and normalises
    # the values to plain JSON types.
    sorted_body = json.loads(json.dumps(param, sort_keys=True, ensure_ascii=False))

    pairs = []
    for field, value in sorted_body.items():
        if value == "":
            # Empty values are not part of the signed text.
            continue
        if isinstance(value, (dict, list)):
            # NOTE: replace() also strips spaces inside nested string values.
            value = json.dumps(value, ensure_ascii=False).replace(" ", "")
        pairs.append("&" + str(field) + "=" + str(value))

    signed_text = "secretKey2=" + secretKey2 + ''.join(pairs) + "&secretKey=" + secretKey
    return hashlib.sha256(signed_text.encode('utf-8')).hexdigest()
def portal_signtimestap(params):
    """Sign ``params`` for the portal and return the signature and timestamp.

    Per the original note this is the signing scheme of the old interface,
    also used by the share and dkey interfaces.

    The dynamic salt is a 10-character slice of the SHA-256 hex digest of
    the current local ``YYYYMM``, starting at index ``month * 3``. The
    signature is SHA-256 of ``params + timestamp + salt``.

    Args:
        params: text to sign.

    Returns:
        list: ``[sign, timestamp]`` where ``sign`` is a lowercase hex digest
        and ``timestamp`` is the current epoch time in milliseconds as text.
    """
    # Take one clock reading so the timestamp, the year-month string and the
    # month number can never disagree across a month rollover.
    now = time.time()
    local_now = time.localtime(now)
    timestap = str(round(now * 1000))

    year_month = time.strftime("%Y%m", local_now)
    month_digest = hashlib.sha256(year_month.encode('utf-8')).hexdigest()

    # Salt window: [month * 3, month * 3 + 10).
    start = local_now.tm_mon * 3
    dynamic_salt = month_digest[start:start + 10]

    sign = hashlib.sha256((params + timestap + dynamic_salt).encode('utf-8')).hexdigest()
    return [sign, timestap]
def ms_timestap():
    """Return the current epoch time in milliseconds, rounded, as a string."""
    millis = round(time.time() * 1000)
    return str(millis)
def sha256guid(guid):
    """Return the lowercase hex SHA-256 digest of ``guid`` (UTF-8 encoded)."""
    digest = sha256(guid.encode('utf-8'))
    return digest.hexdigest()
def R1_sign(param, secretKey):
    """Sort the request body and compute its SHA-256 signature.

    The body is round-tripped through JSON with sorted keys and rendered as
    ``key=value`` pairs joined by ``&`` in key order. Dict and list values
    are rendered as JSON with every space removed; other values use
    ``str()``. ``&secretKey=<secretKey>`` is appended, leading ``&``
    characters are stripped, and the text is hashed.

    Args:
        param: the body, either as a dict or as a string holding a Python
            dict expression. Entries whose value is the empty string are
            left out of the signed text.
        secretKey: text appended as the final ``secretKey`` field.

    Returns:
        str: lowercase hex SHA-256 digest.
    """
    # SECURITY NOTE(review): eval() executes arbitrary code contained in
    # ``param``. Only pass trusted strings, or pass a dict to avoid eval.
    body = param if isinstance(param, dict) else eval(param)

    # The JSON round trip sorts keys (including nested ones) and normalises
    # the values to plain JSON types.
    sorted_body = json.loads(json.dumps(body, sort_keys=True, ensure_ascii=False))

    pairs = []
    for field, value in sorted_body.items():
        if value == "":
            # Empty values are not part of the signed text.
            continue
        if isinstance(value, (dict, list)):
            # NOTE: replace() also strips spaces inside nested string values.
            value = json.dumps(value, ensure_ascii=False).replace(" ", "")
        pairs.append("&" + str(field) + "=" + str(value))

    signed_text = (''.join(pairs) + "&secretKey=" + secretKey).lstrip("&")
    return hashlib.sha256(signed_text.encode('utf-8')).hexdigest()