Python 服务端验证 iOS 内购(消耗型)代码记录

代码如下,坑不少,需要用到苹果官方的 Python 库(star 很少,可见用的人不多)。


import time
import aiohttp
import jwt
import certifi
import requests

from typing_extensions import deprecated

from src.consts import SLOG
from src.errors.if_error import IFError
from src.errors.if_error_code import IFErrorCode
from src.utils import PathUtil
import ssl

# github 地址: https://github.com/apple/app-store-server-library-python
from appstoreserverlibrary.models.JWSTransactionDecodedPayload import JWSTransactionDecodedPayload
from appstoreserverlibrary.signed_data_verifier import SignedDataVerifier, VerificationException
from appstoreserverlibrary.models.Environment import Environment



class ApplePurchaseAgent:
    """Server-side verifier for iOS in-app purchases via the App Store Server API.

    The agent signs requests with an ES256 JWT built from a ``.p8`` private
    key, fetches signed transaction data from Apple, and verifies/decodes it
    with Apple's ``SignedDataVerifier``.

    Every public entry point queries the production endpoint first and, when
    Apple answers 404, retries against the sandbox endpoint.
    """

    # Upper bound (seconds) on the blocking ``requests`` call so that a stalled
    # connection to Apple cannot hang the calling thread forever.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, bundle_id: str, app_apple_id: "int | None" = None):
        """Load the signing key and remember the app identifiers.

        Args:
            bundle_id: Bundle identifier of the app whose purchases are verified.
            app_apple_id: Numeric App Store id of the app. Apple's verifier
                requires it when verifying production data; it may stay
                ``None`` for sandbox-only use.
        """
        super().__init__()
        self.bundle_id = bundle_id
        self.app_apple_id = app_apple_id
        jwt_file = PathUtil.key_file("your  p8 file")
        self.key_id = "p8 key id"
        # Context manager guarantees the key file is closed even if read() fails.
        with open(jwt_file, mode='r', encoding="utf-8") as f:
            self.jwt_key = f.read()
        self.jwt_key_bytes = self.jwt_key.encode("utf-8")
        # Lazily populated by load_root_certificates().
        self.root_certs = None

    def jwt_data(self) -> dict:
        """Return the JWT claims used to authenticate against Apple's API.

        The token is valid for ten minutes starting from "now".
        """
        # Take a single timestamp so `iat` and `exp` are always exactly
        # ten minutes apart.
        now = int(time.time())
        return {
            "iss": self.iss,
            "iat": now,
            "exp": now + 60 * 10,
            "aud": 'appstoreconnect-v1',
            "bid": self.bundle_id,
        }

    def _auth_headers(self) -> dict:
        """Build the HTTP headers (signed bearer token + content type) for one request."""
        jwt_token = jwt.encode(self.jwt_data(), self.jwt_key, algorithm="ES256",
                               headers={"kid": self.key_id, "typ": 'JWT', 'alg': 'ES256'})
        return {
            'Authorization': f"Bearer {jwt_token}",
            'Content-type': 'application/json'
        }

    def _environment_for(self, base_url: str) -> Environment:
        """Map an API base URL to the verifier environment its payloads belong to."""
        if base_url == self.storekit_url:
            return Environment.PRODUCTION
        return Environment.SANDBOX

    def load_root_certificates(self) -> list:
        """Load the Apple root certificates from local files (cached after first success).

        Certificates can be downloaded from
        https://www.apple.com/certificateauthority/

        Returns:
            The raw bytes of each certificate, or an empty list when any file
            could not be read (the failure is logged and nothing is cached, so
            the next call retries).
        """
        if self.root_certs is None:
            files = [
                "AppleComputerRootCertificate.cer",
                "AppleIncRootCertificate.cer",
                "AppleRootCA-G2.cer",
                "AppleRootCA-G3.cer"
            ]
            loaded = []
            for name in files:
                try:
                    cert_path = PathUtil.key_file(name)
                    with open(cert_path, 'rb') as f:
                        loaded.append(f.read())
                except Exception as e:
                    SLOG.error(f"Error loading root certificates: {e}")
                    return []
            self.root_certs = loaded
        return self.root_certs

    def decode_jwt(self, signed_transaction_info: str,
                   environment: Environment = Environment.SANDBOX) -> JWSTransactionDecodedPayload:
        """Verify and decode a ``signedTransactionInfo`` JWS with Apple's library.

        Args:
            signed_transaction_info: Signed transaction string returned by Apple.
            environment: Environment the payload was issued for. The verifier
                rejects payloads whose environment differs, so this must match
                the endpoint the data came from. Defaults to sandbox.

        Returns:
            The decoded transaction payload object.

        Raises:
            IFError: With code ``get_purchase_history_fail`` when verification
                or decoding fails for any reason.
        """
        try:
            signed_data_verifier = SignedDataVerifier(
                root_certificates=self.load_root_certificates(),
                enable_online_checks=True,
                environment=environment,
                bundle_id=self.bundle_id,
                # Required by the library when `environment` is production.
                app_apple_id=self.app_apple_id,
            )

            decoded_payload = signed_data_verifier.verify_and_decode_signed_transaction(signed_transaction_info)

            SLOG.info(f"Decoded transaction info: {decoded_payload}")
            return decoded_payload

        except VerificationException as e:
            SLOG.error(f"Verification failed: {e}")
            raise IFError(code=IFErrorCode.get_purchase_history_fail, message="get purchase history fail",
                          show="get purchase history fail") from e
        except Exception as e:
            raise IFError(code=IFErrorCode.get_purchase_history_fail, message=f"get purchase history fail, e: {e}",
                          show="get purchase history fail") from e

    @property
    def storekit_url(self) -> str:
        """Base URL of the production App Store Server API."""
        return "https://api.storekit.itunes.apple.com"

    @property
    def storekit_url_sandbox(self) -> str:
        """Base URL of the sandbox App Store Server API."""
        return "https://api.storekit-sandbox.itunes.apple.com"

    @property
    def iss(self) -> str:
        """Issuer id placed in the ``iss`` claim of the request JWT."""
        return "your iss"

    @deprecated("没有使用到,异步容易崩溃")
    async def get_transaction_history(self, transaction_id: str) -> JWSTransactionDecodedPayload:
        """Look up a transaction in the purchase history (production first, then sandbox).

        Raises:
            IFError: When the transaction cannot be fetched or verified.
        """
        try:
            return await self._get_transaction_history(transaction_id=transaction_id, base_url=self.storekit_url)
        except IFError as e:
            # A 404 from production may simply mean a sandbox purchase: retry there.
            if e.code == IFErrorCode.purchase_verify_no_found:
                return await self._get_transaction_history(transaction_id=transaction_id,
                                                           base_url=self.storekit_url_sandbox)
            raise

    def sync_verify_transaction(self, transaction_id: str) -> JWSTransactionDecodedPayload:
        """Fetch and verify a single transaction (production first, then sandbox).

        Raises:
            IFError: When the transaction cannot be fetched or verified.
        """
        try:
            return self._sync_verify_transaction(transaction_id=transaction_id, base_url=self.storekit_url)
        except IFError as e:
            # A 404 from production may simply mean a sandbox purchase: retry there.
            if e.code == IFErrorCode.purchase_verify_no_found:
                return self._sync_verify_transaction(transaction_id=transaction_id,
                                                     base_url=self.storekit_url_sandbox)
            raise

    @deprecated("容易出现 python 崩溃,舍弃")
    async def verify_transaction(self, transaction_id: str) -> dict:
        """Fetch the raw transaction response (production first, then sandbox).

        Raises:
            IFError: When the transaction cannot be fetched.
        """
        try:
            return await self._verify_transaction(transaction_id=transaction_id, base_url=self.storekit_url)
        except IFError as e:
            # A 404 from production may simply mean a sandbox purchase: retry there.
            if e.code == IFErrorCode.purchase_verify_no_found:
                return await self._verify_transaction(transaction_id=transaction_id,
                                                      base_url=self.storekit_url_sandbox)
            raise

    async def _get_transaction_history(self, transaction_id: str, base_url: str) -> JWSTransactionDecodedPayload:
        """Query ``/inApps/v2/history`` on ``base_url`` and return the matching transaction.

        Raises:
            IFError: ``purchase_verify_no_found`` on HTTP 404;
                ``purchase_verify_error`` on any other HTTP/transport failure,
                an empty or malformed body, or when no returned transaction
                has the requested id; ``get_purchase_history_fail`` when a
                returned transaction fails verification.
        """
        SLOG.info(f"start reVerify transaction:{transaction_id}")
        headers = self._auth_headers()
        environment = self._environment_for(base_url)

        url = f"{base_url}/inApps/v2/history/{transaction_id}"
        ssl_context = ssl.create_default_context(cafile=certifi.where())
        try:
            conn = aiohttp.TCPConnector(ssl=ssl_context)
            async with aiohttp.ClientSession(connector=conn) as session:
                async with session.get(url, headers=headers, ssl=ssl_context) as response:
                    if response.status == 404:
                        raise IFError(code=IFErrorCode.purchase_verify_no_found,
                                      message=f"verify purchase error, status {response.status}",
                                      show="purchase no found error")
                    if response.status != 200:
                        SLOG.info(f"verify purchase fail {response.reason}")
                        text = await response.text()
                        raise IFError(code=IFErrorCode.purchase_verify_error,
                                      message=f"verify purchase error, status {response.status}, message {text}",
                                      show="verify purchase error")

                    response_json = await response.json()
                    if not response_json:
                        raise IFError(code=IFErrorCode.purchase_verify_error,
                                      message="verify purchase error, json data is None",
                                      show="verify purchase error")

                    signed_transactions = response_json.get("signedTransactions")
                    if signed_transactions is None:
                        raise IFError(code=IFErrorCode.purchase_verify_error,
                                      message="verify purchase error, signedTransactions is None",
                                      show="verify purchase error")

                    # NOTE(review): only the transactions in this single response
                    # are searched; if the endpoint paginates, later pages are
                    # not fetched — confirm against the API documentation.
                    for signed in signed_transactions:
                        data = self.decode_jwt(signed, environment=environment)
                        if transaction_id == data.transactionId:
                            return data
                    raise IFError(code=IFErrorCode.purchase_verify_error,
                                  message="verify purchase error, transaction_id no found",
                                  show="verify purchase failed, transaction id no found")

        except aiohttp.ClientError as e:
            SLOG.info(f"Error in request to Apple Server : {e}")
            raise IFError(code=IFErrorCode.purchase_verify_error,
                          message=f"verify purchase error, request error {e}",
                          show="verify purchase error") from e

    def _sync_verify_transaction(self, transaction_id: str, base_url: str) -> JWSTransactionDecodedPayload:
        """Query ``/inApps/v1/transactions`` on ``base_url`` synchronously and decode the result.

        Raises:
            IFError: ``purchase_verify_no_found`` on HTTP 404;
                ``purchase_verify_error`` on any other HTTP/transport failure
                (including timeouts) or an empty/incomplete body;
                ``get_purchase_history_fail`` when verification fails.
        """
        SLOG.info(f"start verify transaction:{transaction_id}")
        headers = self._auth_headers()

        url = f"{base_url}/inApps/v1/transactions/{transaction_id}"
        try:
            response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT_SECONDS)
        except requests.RequestException as e:
            # Mirror the async variants: transport failures surface as IFError.
            SLOG.info(f"Error in request to Apple Server : {e}")
            raise IFError(code=IFErrorCode.purchase_verify_error,
                          message=f"verify purchase error, request error {e}",
                          show="verify purchase error") from e

        if response.status_code == 404:
            text = response.text
            raise IFError(code=IFErrorCode.purchase_verify_no_found,
                          message=f"verify purchase error, status {response.status_code}, message {text}",
                          show="purchase no found error")
        if response.status_code != 200:
            SLOG.info(f"verify purchase fail {response.reason}")
            text = response.text
            raise IFError(code=IFErrorCode.purchase_verify_error,
                          message=f"verify purchase error, status {response.status_code}, message {text}",
                          show="verify purchase error")

        response_json = response.json()
        if not response_json:
            raise IFError(code=IFErrorCode.purchase_verify_error,
                          message="verify purchase error, json data is None",
                          show="verify purchase error")

        SLOG.debug(f"response : {response_json}")
        info = response_json.get("signedTransactionInfo")
        if info is None:
            # Fail with a precise error instead of handing None to the verifier.
            raise IFError(code=IFErrorCode.purchase_verify_error,
                          message="verify purchase error, signedTransactionInfo is None",
                          show="verify purchase error")
        return self.decode_jwt(info, environment=self._environment_for(base_url))

    async def _verify_transaction(self, transaction_id: str, base_url: str) -> dict:
        """Query ``/inApps/v1/transactions`` on ``base_url`` and return the raw JSON body.

        The signed payload inside the response is NOT verified or decoded here.

        Raises:
            IFError: ``purchase_verify_no_found`` on HTTP 404;
                ``purchase_verify_error`` on any other HTTP/transport failure
                or an empty body.
        """
        SLOG.info(f"start verify transaction:{transaction_id}")
        headers = self._auth_headers()

        url = f"{base_url}/inApps/v1/transactions/{transaction_id}"
        ssl_context = ssl.create_default_context(cafile=certifi.where())
        try:
            conn = aiohttp.TCPConnector(ssl=ssl_context)
            async with aiohttp.ClientSession(connector=conn) as session:
                async with session.get(url, headers=headers, ssl=ssl_context) as response:
                    if response.status == 404:
                        text = await response.text()
                        raise IFError(code=IFErrorCode.purchase_verify_no_found,
                                      message=f"verify purchase error, status {response.status}, message {text}",
                                      show="purchase no found error")
                    if response.status != 200:
                        SLOG.info(f"verify purchase fail {response.reason}")
                        text = await response.text()
                        raise IFError(code=IFErrorCode.purchase_verify_error,
                                      message=f"verify purchase error, status {response.status}, message {text}",
                                      show="verify purchase error")

                    response_json = await response.json()
                    if not response_json:
                        raise IFError(code=IFErrorCode.purchase_verify_error,
                                      message="verify purchase error, json data is None",
                                      show="verify purchase error")

                    SLOG.debug(f"response : {response_json}")
                    return response_json

        except aiohttp.ClientError as e:
            SLOG.info(f"Error in request to Apple Server : {e}")
            raise IFError(code=IFErrorCode.purchase_verify_error,
                          message=f"verify purchase error, request error {e}",
                          show="verify purchase error") from e

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容