Skip to content

Python示例代码

1. 核心安全组件 (sud_security_util.py)

python
import base64
import json
import os
import time
from typing import Any, Dict, Tuple

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.x509 import load_pem_x509_certificate


class SudSecurityUtil:
    """SUD OpenPaaS API 安全通信核心工具类"""

    @staticmethod
    def encrypt_request(url_path: str, app_id: str, key_sn: str, sym_key_base64: str, req_data: Dict[str, Any]) -> Tuple[int, Dict[str, str]]:
        """
        1. 请求业务数据加密 (AES-256-GCM)
        """
        timestamp = int(time.time())
        # 生成 16 字节随机数,并使用 URL 安全的 Base64 编码(去除尾部等号)
        nonce = base64.urlsafe_b64encode(os.urandom(16)).decode('utf-8').rstrip('=')

        # 补齐安全校验字段
        req_data['_n'] = nonce
        req_data['_appid'] = app_id
        req_data['_timestamp'] = timestamp

        # Python 的 json.dumps 默认带有空格,必须使用 separators=(',', ':') 压缩成紧凑格式
        plaintext = json.dumps(req_data, separators=(',', ':'), ensure_ascii=False)

        # 拼接 AAD: urlpath|appid|timestamp|sn
        aad = f"{url_path}|{app_id}|{timestamp}|{key_sn}"

        real_key = base64.b64decode(sym_key_base64)
        iv = os.urandom(12)  # GCM 推荐 12 字节 IV
        
        aesgcm = AESGCM(real_key)
        # encrypt 方法内部会自动将 16 字节的 AuthTag 追加在密文切片末尾
        ciphertext_with_tag = aesgcm.encrypt(
            nonce=iv,
            data=plaintext.encode('utf-8'),
            associated_data=aad.encode('utf-8')
        )

        # 分离真实密文与 AuthTag
        ciphertext = ciphertext_with_tag[:-16]
        auth_tag = ciphertext_with_tag[-16:]

        http_body = {
            "iv": base64.b64encode(iv).decode('utf-8'),
            "data": base64.b64encode(ciphertext).decode('utf-8'),
            "authtag": base64.b64encode(auth_tag).decode('utf-8')
        }

        return timestamp, http_body

    @staticmethod
    def sign_request(url_path: str, app_id: str, timestamp: int, req_data_json: str, private_key_pem: str) -> str:
        """
        2. 请求报文签名 (RSA-PSS)
        """
        # 方案 B:严格按顺序拼接,末尾绝不能加 \n
        payload = f"{url_path}\n{app_id}\n{timestamp}\n{req_data_json}"
        
        # 加载 PKCS#8 格式私钥
        private_key = load_pem_private_key(private_key_pem.encode('utf-8'), password=None)

        # 执行 PSS 签名
        signature = private_key.sign(
            payload.encode('utf-8'),
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=32  # 盐长度需严格对齐 SHA256 (32字节)
            ),
            hashes.SHA256()
        )

        return base64.b64encode(signature).decode('utf-8')

    @staticmethod
    def verify_response_signature(url_path: str, expected_app_id: str, local_cert_sn: str, platform_cert_pem: str,
                                  resp_app_id: str, resp_ts: int, resp_sn: str, resp_sig: str,
                                  resp_deprecated_sn: str, resp_deprecated_sig: str, resp_data_json: str) -> bool:
        """
        3. 响应报文验签 (RSA-PSS)
        """
        if expected_app_id != resp_app_id:
            raise ValueError("安全校验失败:响应的 AppId 与本地不匹配")

        # 证书序列号匹配与平滑轮换支持
        signature_base64 = ""
        if local_cert_sn == resp_sn:
            signature_base64 = resp_sig
        elif resp_deprecated_sn and local_cert_sn == resp_deprecated_sn:
            signature_base64 = resp_deprecated_sig if resp_deprecated_sig else resp_sig
        else:
            raise ValueError(f"验签失败:本地证书编号({local_cert_sn})与平台当前编号({resp_sn})不匹配")

        payload = f"{url_path}\n{resp_app_id}\n{resp_ts}\n{resp_data_json}"

        # 加载 X.509 平台公钥证书
        cert = load_pem_x509_certificate(platform_cert_pem.encode('utf-8'))
        public_key = cert.public_key()

        try:
            public_key.verify(
                base64.b64decode(signature_base64),
                payload.encode('utf-8'),
                padding.PSS(
                    mgf=padding.MGF1(hashes.SHA256()),
                    salt_length=32
                ),
                hashes.SHA256()
            )
            return True
        except InvalidSignature:
            return False

    @staticmethod
    def decrypt_response(url_path: str, app_id: str, resp_ts: int, key_sn: str, sym_key_base64: str, resp_data_json: str) -> Dict[str, Any]:
        """
        4. 响应密文解密 (AES-256-GCM)
        """
        enc_body = json.loads(resp_data_json)
        iv = base64.b64decode(enc_body['iv'])
        data = base64.b64decode(enc_body['data'])
        auth_tag = base64.b64decode(enc_body['authtag'])

        aad = f"{url_path}|{app_id}|{resp_ts}|{key_sn}"
        real_key = base64.b64decode(sym_key_base64)

        # Cryptography 库要求密文与 AuthTag 拼接在一起传入
        combined_data = data + auth_tag
        aesgcm = AESGCM(real_key)

        try:
            # decrypt 方法内部会自动验证 AuthTag 的合法性
            plaintext_bytes = aesgcm.decrypt(
                nonce=iv,
                data=combined_data,
                associated_data=aad.encode('utf-8')
            )
        except Exception as e:
            raise ValueError(f"解密失败: AuthTag 验证未通过或密文被篡改 - {str(e)}")

        real_resp = json.loads(plaintext_bytes.decode('utf-8'))

        # 内部安全字段强校验
        if real_resp.get('_appid') != app_id or real_resp.get('_timestamp') != resp_ts:
            raise ValueError("安全字段校验失败:响应明文内部的 _appid 或 _timestamp 被非法篡改")

        # 联调时可解开此注释进行防重放校验
        # current_ts = int(time.time())
        # if abs(current_ts - real_resp.get('_timestamp', 0)) > 300:
        #     raise ValueError("安全校验失败:响应数据距离当前时间误差超过 5 分钟")

        return real_resp

2. 业务调用演示 (demo.py)

python
import json
from sud_security_util import SudSecurityUtil

# 模拟应用全局配置
CONFIG = {
    "app_id": "cp_10010",
    "url_path": "https://api.sud.com/v1/game/login",  # 方案B: 全路径
    "sym_key_sn": "key_v1",
    "sym_key": "otUpngOjU+nVQaWJIC3D/yMLV17RKaP6t4Ot9tbnzLY=",
    "private_key_pem": "-----BEGIN PRIVATE KEY-----\nMIIEvQI... (应用私钥) ...W6w=\n-----END PRIVATE KEY-----",
    "platform_cert_sn": "sud_cert_v1",
    "platform_cert_pem": "-----BEGIN CERTIFICATE-----\nMIID0jC... (SUD平台公钥证书) ...Q==\n-----END CERTIFICATE-----"
}

def main():
    try:
        print("=== 1. 发起请求:业务数据加密与签名 ===")
        
        # 原始业务请求参数
        biz_req = {
            "appid": CONFIG["app_id"],
            "sud_uid": "user_889900",
            "scene": 0,
            "client_ip": "127.0.0.1"
        }

        # 步骤 A:加密
        req_ts, http_body_dict = SudSecurityUtil.encrypt_request(
            CONFIG["url_path"], CONFIG["app_id"], CONFIG["sym_key_sn"], CONFIG["sym_key"], biz_req
        )
        
        # 再次强调:使用 separators 保证 JSON 无多余空格
        req_data_json = json.dumps(http_body_dict, separators=(',', ':'))

        # 步骤 B:签名
        req_sig = SudSecurityUtil.sign_request(
            CONFIG["url_path"], CONFIG["app_id"], req_ts, req_data_json, CONFIG["private_key_pem"]
        )

        print(f"HTTP Header X-Sud-Timestamp: {req_ts}")
        print(f"HTTP Header X-Sud-Signature: {req_sig}")
        print(f"HTTP Body: {req_data_json}")


        print("\n=== 2. 收到响应:签名验证与密文解密 ===")

        # 模拟从 HTTP 响应中获取的数据
        resp_ts = req_ts 
        resp_header_app_id = CONFIG["app_id"]
        resp_header_sn = CONFIG["platform_cert_sn"]
        resp_header_sig = req_sig  # 演示用,实际为接收的签名
        resp_body_json = '{"iv":"r2WDQt56rEAmMuoR","data":"HExs...","authtag":"z2BF..."}'

        # 步骤 C:验签
        is_sig_valid = SudSecurityUtil.verify_response_signature(
            CONFIG["url_path"], CONFIG["app_id"], CONFIG["platform_cert_sn"], CONFIG["platform_cert_pem"],
            resp_header_app_id, resp_ts, resp_header_sn, resp_header_sig, 
            "", "", resp_body_json
        )

        if is_sig_valid:
            print("响应验签成功!")
            
            # 步骤 D:解密
            decrypted_biz_data = SudSecurityUtil.decrypt_response(
                CONFIG["url_path"], CONFIG["app_id"], resp_ts, CONFIG["sym_key_sn"], CONFIG["sym_key"], resp_body_json
            )
            print("响应解密成功,业务数据:")
            print(json.dumps(decrypted_biz_data, indent=4, ensure_ascii=False))

    except ValueError as e:
        print(f"流程中断: {str(e)}")
    except Exception as e:
        print(f"发生未知错误: {str(e)}")

if __name__ == "__main__":
    main()