代码如下,坑不少,需要用到苹果的 python 库( start 好少,可见用的人不多)
import time
import aiohttp
import jwt
import certifi
import requests
from typing_extensions import deprecated
from src.consts import SLOG
from src.errors.if_error import IFError
from src.errors.if_error_code import IFErrorCode
from src.utils import PathUtil
import ssl
# github 地址: https://github.com/apple/app-store-server-library-python
from appstoreserverlibrary.models.JWSTransactionDecodedPayload import JWSTransactionDecodedPayload
from appstoreserverlibrary.signed_data_verifier import SignedDataVerifier, VerificationException
from appstoreserverlibrary.models.Environment import Environment
class ApplePurchaseAgent:
def __init__(self, bundle_id: str):
super(ApplePurchaseAgent, self).__init__()
self.bundle_id = bundle_id
jwt_file = PathUtil.key_file("your p8 file")
self.key_id = "p8 key id"
f = open(jwt_file, mode='r')
self.jwt_key = f.read()
f.close()
self.jwt_key_bytes = self.jwt_key.encode("utf-8")
self.root_certs = None
def jwt_data(self) -> dict:
data = {
"iss": self.iss,
"iat": int(time.time()),
"exp": int(time.time()) + 60 * 10,
"aud": 'appstoreconnect-v1',
"bid": self.bundle_id
}
return data
def load_root_certificates(self) -> list:
"""
从本地文件加载 Apple 根证书
证书下载地址:https://www.apple.com/certificateauthority/
"""
if self.root_certs is None:
# 使用 Apple 根证书路径
files = [
"AppleComputerRootCertificate.cer",
"AppleIncRootCertificate.cer",
"AppleRootCA-G2.cer",
"AppleRootCA-G3.cer"
]
temp = []
for name in files:
try:
cert_path = PathUtil.key_file(name)
with open(cert_path, 'rb') as f:
temp.append(f.read())
except Exception as e:
SLOG.error(f"Error loading root certificates: {e}")
return []
self.root_certs = temp
return self.root_certs
def decode_jwt(self, signed_transaction_info: str) -> JWSTransactionDecodedPayload:
"""
使用 Apple 官方库解析 signedTransactionInfo
Args:
signed_transaction_info: 从 Apple 返回的签名交易信息
Returns:
解析后的交易信息字典
"""
try:
# 创建验证器
# 注意:在生产环境中需要提供 root_certificates
root_certifications = self.load_root_certificates()
signed_data_verifier = SignedDataVerifier(
root_certificates=root_certifications, # 在生产环境需要提供 Apple 根证书
enable_online_checks=True,
environment=Environment.SANDBOX, # 根据环境修改
bundle_id=self.bundle_id,
app_apple_id=None # 生产环境需要提供
)
# 验证并解码数据
decoded_payload = signed_data_verifier.verify_and_decode_signed_transaction(signed_transaction_info)
SLOG.info(f"Decoded transaction info: {decoded_payload}")
return decoded_payload
except VerificationException as e:
SLOG.error(f"Verification failed: {e}")
raise IFError(code=IFErrorCode.get_purchase_history_fail, message="get purchase history fail",
show="get purchase history fail")
except Exception as e:
raise IFError(code=IFErrorCode.get_purchase_history_fail, message=f"get purchase history fail, e: {e}",
show="get purchase history fail")
@property
def storekit_url(self) -> str:
return "https://api.storekit.itunes.apple.com"
@property
def storekit_url_sandbox(self) -> str:
return "https://api.storekit-sandbox.itunes.apple.com"
@property
def iss(self) -> str:
return "your iss"
@deprecated("没有使用到,异步容易崩溃")
async def get_transaction_history(self, transaction_id: str) -> JWSTransactionDecodedPayload:
try:
return await self._get_transaction_history(transaction_id=transaction_id, base_url=self.storekit_url)
except IFError as e:
if e.code == IFErrorCode.purchase_verify_no_found: # 可能是沙盒环境,用沙盒环境再验证一次
return await self._get_transaction_history(transaction_id=transaction_id,
base_url=self.storekit_url_sandbox)
else:
raise e
def sync_verify_transaction(self, transaction_id: str) -> JWSTransactionDecodedPayload:
try:
return self._sync_verify_transaction(transaction_id=transaction_id, base_url=self.storekit_url)
except IFError as e:
if e.code == IFErrorCode.purchase_verify_no_found: # 可能是沙盒环境,用沙盒环境再验证一次
return self._sync_verify_transaction(transaction_id=transaction_id, base_url=self.storekit_url_sandbox)
else:
raise e
@deprecated("容易出现 python 崩溃,舍弃")
async def verify_transaction(self, transaction_id: str) -> dict:
try:
return await self._verify_transaction(transaction_id=transaction_id, base_url=self.storekit_url)
except IFError as e:
if e.code == IFErrorCode.purchase_verify_no_found: # 可能是沙盒环境,用沙盒环境再验证一次
return await self._verify_transaction(transaction_id=transaction_id, base_url=self.storekit_url_sandbox)
else:
raise e
async def _get_transaction_history(self, transaction_id: str, base_url: str) -> JWSTransactionDecodedPayload:
SLOG.info(f"start reVerify transaction:{transaction_id}")
jwt_data = self.jwt_data()
private_key = self.jwt_key
key_id = self.key_id
jwt_token = jwt.encode(jwt_data, private_key, algorithm="ES256",
headers={"kid": key_id, "typ": 'JWT', 'alg': 'ES256'})
url = f"{base_url}/inApps/v2/history/{transaction_id}"
headers = {
'Authorization': f"Bearer {jwt_token}",
'Content-type': 'application/json'
}
ssl_context = ssl.create_default_context(cafile=certifi.where())
try:
conn = aiohttp.TCPConnector(ssl=ssl_context)
async with aiohttp.ClientSession(connector=conn) as session:
async with session.get(url, headers=headers, ssl=ssl_context) as response:
if response.status == 404:
raise IFError(code=IFErrorCode.purchase_verify_no_found,
message=f"verify purchase error, status {response.status}",
show="purchase no found error")
if response.status != 200:
SLOG.info(f"verify purchase fail {response.reason}")
text = await response.text()
raise IFError(code=IFErrorCode.purchase_verify_error,
message=f"verify purchase error, status {response.status}, message {text}",
show="verify purchase error")
response_json = await response.json()
if response_json:
signedTransactions = response_json.get("signedTransactions")
if signedTransactions is None:
raise IFError(code=IFErrorCode.purchase_verify_error,
message=f"verify purchase error, signedTransactions is None",
show="verify purchase error")
for trans in signedTransactions:
data = self.decode_jwt(trans)
transaction_id_ = data.transactionId
if transaction_id == transaction_id_:
return data
raise IFError(code=IFErrorCode.purchase_verify_error,
message=f"verify purchase error, transaction_id no found",
show="verify purchase failed, transaction id no found")
else:
raise IFError(code=IFErrorCode.purchase_verify_error,
message=f"verify purchase error, json data is None",
show="verify purchase error")
except aiohttp.ClientError as e:
SLOG.info(f"Error in request to Apple Server : {e}")
raise IFError(code=IFErrorCode.purchase_verify_error,
message=f"verify purchase error, request error {e}",
show="verify purchase error")
def _sync_verify_transaction(self, transaction_id: str, base_url: str) -> JWSTransactionDecodedPayload:
SLOG.info(f"start verify transaction:{transaction_id}")
jwt_data = self.jwt_data()
private_key = self.jwt_key
key_id = self.key_id
jwt_token = jwt.encode(jwt_data, private_key, algorithm="ES256",
headers={"kid": key_id, "typ": 'JWT', 'alg': 'ES256'})
url = f"{base_url}/inApps/v1/transactions/{transaction_id}"
headers = {
'Authorization': f"Bearer {jwt_token}",
'Content-type': 'application/json'
}
response = requests.get(url, headers=headers)
if response.status_code == 404:
text = response.text
raise IFError(code=IFErrorCode.purchase_verify_no_found,
message=f"verify purchase error, status {response.status_code}, message {text}",
show="purchase no found error")
if response.status_code != 200:
SLOG.info(f"verify purchase fail {response.reason}")
text = response.text
raise IFError(code=IFErrorCode.purchase_verify_error,
message=f"verify purchase error, status {response.status_code}, message {text}",
show="verify purchase error")
response_json = response.json()
if response_json:
# 处理响应
SLOG.debug(f"response : {response_json}")
info = response_json.get("signedTransactionInfo")
decode_data = self.decode_jwt(info)
return decode_data
else:
raise IFError(code=IFErrorCode.purchase_verify_error,
message=f"verify purchase error, json data is None",
show="verify purchase error")
async def _verify_transaction(self, transaction_id: str, base_url: str) -> dict:
SLOG.info(f"start verify transaction:{transaction_id}")
jwt_data = self.jwt_data()
private_key = self.jwt_key
key_id = self.key_id
jwt_token = jwt.encode(jwt_data, private_key, algorithm="ES256",
headers={"kid": key_id, "typ": 'JWT', 'alg': 'ES256'})
url = f"{base_url}/inApps/v1/transactions/{transaction_id}"
headers = {
'Authorization': f"Bearer {jwt_token}",
'Content-type': 'application/json'
}
ssl_context = ssl.create_default_context(cafile=certifi.where())
try:
conn = aiohttp.TCPConnector(ssl=ssl_context)
async with aiohttp.ClientSession(connector=conn) as session:
async with session.get(url, headers=headers, ssl=ssl_context) as response:
if response.status == 404:
text = await response.text()
raise IFError(code=IFErrorCode.purchase_verify_no_found,
message=f"verify purchase error, status {response.status}, message {text}",
show="purchase no found error")
if response.status != 200:
SLOG.info(f"verify purchase fail {response.reason}")
text = await response.text()
raise IFError(code=IFErrorCode.purchase_verify_error,
message=f"verify purchase error, status {response.status}, message {text}",
show="verify purchase error")
response_json = await response.json()
if response_json:
# 处理响应
SLOG.debug(f"response : {response_json}")
return response_json
else:
raise IFError(code=IFErrorCode.purchase_verify_error,
message=f"verify purchase error, json data is None",
show="verify purchase error")
except aiohttp.ClientError as e:
SLOG.info(f"Error in request to Apple Server : {e}")
raise IFError(code=IFErrorCode.purchase_verify_error,
message=f"verify purchase error, request error {e}",
show="verify purchase error")