2026-01-24通过爬虫的方式读取bg的盘口

直接上代码:

import asyncio
import json
import logging
import time
from dingding import DingSend
import websockets
import gzip
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s")
log_console = logging.info
#通过爬虫的方式读取bg的盘口


# 新增:获取当前毫秒级时间戳
def get_current_timestamp_ms():
    return int(time.time() * 1000)


# 新增:发送ping心跳消息
async def send_ping_message(ws):
    try:
        await ws.send("ping")
        log_console(f"已发送ping消息")
    except Exception as e:
        log_console(f"发送ping消息失败:{e}")


class BgWebsocketClient:

    def __init__(self, uri):
        self.uri = uri
        self.websocket = None
        self.market_status = False

    async def connect(self):
        # 配置项

        print("建立链接")

        headers = {
            'Upgrade': 'websocket',
            'Origin': 'https://www.bitget.com',
            'Cache-Control': 'no-cache',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Pragma': 'no-cache',
            'Connection': 'Upgrade',
            'Sec-WebSocket-Key': 'WUcBLyC+ry/xLhQ1pJP8bw==',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36',
            'Sec-WebSocket-Version': '13',
            'Sec-WebSocket-Extensions': 'permessage-deflate; client_max_window_bits',
        }

        print(type(headers))
        self.websocket = await websockets.connect(uri=self.uri,
                                                  additional_headers=headers)
        print('[ExWebsocketClient]connect ok')
        await self.on_open()

    async def on_open(self):
        global last_ping_ts
        last_ping_ts = int(time.time())
        print("打开链接")
        subscribe_messages = [
            {"op": "subscribe",
             "args": [{"channel": "depth", "instType": "mc", "instId": "DEEPUSDT", "params": {"scale": "0.01"}}]}
        ]
        await self.websocket.send("ping")
        for msg in subscribe_messages:
            try:
                json_msg = json.dumps(msg)
                await self.websocket.send(json_msg)
                log_console(f"已发送订阅消息:{json_msg}")
                time.sleep(0.1)

            except Exception as e:
                log_console(f"发送订阅消息失败:{e}")

    async def on_message(self, message):
        # log_console(f"收到消息:{message}")
        decompressed_data = gzip.decompress(message)
        data = decompressed_data.decode('utf-8', errors='ignore')
        print("文本形式:", data)
        # print("返回消息了:",message)

        # data = json.loads(message)
        # b = data.get('b', 0)
        # if b != 0:
        #     ask = data.get('a', 0)
        #     exAsk = b
        #     exBid = ask
        # if CheckFunc:
        #     await CheckFunc()
        # if int(time.time()) - last_ping_ts > 30:
        #     last_ping_ts = int(time.time())
        #     await send_ping_message(self.websocket)

    async def on_error(self, error):

        print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())), f"Error: {error}")
        error_message = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())), f"Error: {error}"
        DingSend(f"Ex连接失败 msg: {error_message}", at_all=True)

    async def on_close(self, code, reason):
        print("ex关闭连接")

    async def run(self):
        try:
            await self.connect()
            async for message in self.websocket:
                await self.on_message(message)
        except Exception as e:
            await self.on_error(e)
        finally:
            await self.on_close(None, None)
            await self.websocket.close()


async def ex_main():
    while True:
        print("进入了while循环")

        client = BgWebsocketClient(
            "wss://stream.bitget.com/public/v1/stream?compress=true&terminalType=1")
        await client.run()
        await asyncio.sleep(1)


if __name__ == "__main__":
    # asyncio.run(ex_main("mexc_btc.json"))
    # coin = "mexc_btc"
    # with open(f"{coin}.json", 'r+') as f:
    #     init = json.loads(f.read())

    # print(init)
    asyncio.run(ex_main())
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容