ws 订单交易账户

import websocket, json, hmac, hashlib, base64, time, threading, sys, os

API_KEY = os.getenv("API_KEY", "PNZDM8CB-KC6")
SECRET_KEY = os.getenv("SECRET_KEY", "b16258356a825")
WS_URL = "wss://ws.ws.com/ws/private"

start_ts = None
received = False

def sign_ws(ts):
    text = f"GET\n/ws\nsignTimestamp={ts}"
    mac = hmac.new(SECRET_KEY.encode(), text.encode(), hashlib.sha256).digest()
    return base64.b64encode(mac).decode()

def assert_order(data):
    for f in ["orderId", "symbol", "side", "eventType"]:
        if f not in data:
            raise AssertionError(f"missing field {f}")

def run_ping(ws):
    while True:
        time.sleep(2)
        try:
            ws.send(json.dumps({"event": "ping"}))
            # print("[ping] sent")
        except Exception as e:
            print("[ping] 发送失败", e)
            break

def on_open(ws):
    ts = int(time.time() * 1000)
    sign = sign_ws(ts)
    auth = {
        "event": "subscribe",
        "channel": ["auth"],
        "params": {
            "key": API_KEY,
            "signTimestamp": ts,
            "signature": sign
        }
    }
    ws.send(json.dumps(auth))
    print("[认证] 已发送 auth 请求")

    # 启动 ping 保活线程
    threading.Thread(target=run_ping, args=(ws,), daemon=True).start()

import json

def on_message(ws, msg):
    global start_ts, received
    d = json.loads(msg)
    print("dddddddddddddd")
    print(json.dumps(d, indent=4, ensure_ascii=False))
    if d.get("event") == "subscribe":
        print(f"[订阅确认✅] 已成功订阅频道: {d.get('channel')},详情: {d}")
        return

    if d.get("channel") == "auth":
        ok = d.get("data", {}).get("success")
        if ok:
            # print("[认证✅] 成功,订阅 orders")
            # # order_sub = {"event": "subscribe", "channel": ["orders"], "symbols": ["all"]} #订单
            # order_sub = {"event": "subscribe", "channel": ["orders", "balances"]} #Balances
            # start_ts = time.time()
            # ws.send(json.dumps(order_sub))

            #交易相关

            print("[认证✅] 成功,订阅 orders")
            # 创建订单
            # order_sub = {"id": "123456789","event": "createOrder", "params": {
            #                                                                   "symbol": "SHIB_USDT",
            #                                                                   "type": "LIMIT",
            #                                                                   "quantity": "206813",
            #                                                                   "side": "BUY",
            #                                                                   "price": "0.000010000",
            #                                                                   "timeInForce": "GTC",
            #                                                                   "clientOrderId": "SHIB_USDT1234Abc"
            #                                                                }} # createOrder

            # # 取消订单
            # order_sub = {
            #                "id": "1234567",
            #                "event": "cancelOrders",
            #                "params": {
            #                   "orderIds": ["458201096570052608", "170904091512410112"],
            #                   "clientOrderIds": ["45662xyz", "xasd2343"]
            #                }
            #             } #cancelOrders

            #取消所有订单
            order_sub = {
                       "id": "1234567",
                       "event": "cancelAllOrders",
                       "params": {
                       }
                    }

            start_ts = time.time()
            ws.send(json.dumps(order_sub))


        else:
            print("[认证❌]", d)
            ws.close()
            sys.exit(1)
        return

    elif d.get("channel") == "orders":

        # 详细打印订单数据,格式化显示
        orders_data = d.get("data")
        print("[订单数据] 详细内容:")
        print(json.dumps(orders_data, indent=4, ensure_ascii=False))

    elif d.get("channel") == "balances":

        # 详细打印订单数据,格式化显示
        orders_data = d.get("data")
        print("[balances账户数据] 详细内容:")
        print(json.dumps(orders_data, indent=4, ensure_ascii=False))

def on_error(ws, e): print("[错误]", e)
def on_close(ws, code, reason): print(f"[关闭] {code} {reason}")

def main():
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp(WS_URL,
                                on_open=on_open,
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close)
    ws.run_forever(ping_interval=20, ping_timeout=10)

if __name__ == "__main__":
    main()
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容