直接上代码:
import asyncio
import json
import logging
import time
from dingding import DingSend
import websockets
import gzip
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s")
log_console = logging.info
#通过爬虫的方式读取bg的盘口
# 新增:获取当前毫秒级时间戳
def get_current_timestamp_ms():
return int(time.time() * 1000)
# 新增:发送ping心跳消息
async def send_ping_message(ws):
try:
await ws.send("ping")
log_console(f"已发送ping消息")
except Exception as e:
log_console(f"发送ping消息失败:{e}")
class BgWebsocketClient:
def __init__(self, uri):
self.uri = uri
self.websocket = None
self.market_status = False
async def connect(self):
# 配置项
print("建立链接")
headers = {
'Upgrade': 'websocket',
'Origin': 'https://www.bitget.com',
'Cache-Control': 'no-cache',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Pragma': 'no-cache',
'Connection': 'Upgrade',
'Sec-WebSocket-Key': 'WUcBLyC+ry/xLhQ1pJP8bw==',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36',
'Sec-WebSocket-Version': '13',
'Sec-WebSocket-Extensions': 'permessage-deflate; client_max_window_bits',
}
print(type(headers))
self.websocket = await websockets.connect(uri=self.uri,
additional_headers=headers)
print('[ExWebsocketClient]connect ok')
await self.on_open()
async def on_open(self):
global last_ping_ts
last_ping_ts = int(time.time())
print("打开链接")
subscribe_messages = [
{"op": "subscribe",
"args": [{"channel": "depth", "instType": "mc", "instId": "DEEPUSDT", "params": {"scale": "0.01"}}]}
]
await self.websocket.send("ping")
for msg in subscribe_messages:
try:
json_msg = json.dumps(msg)
await self.websocket.send(json_msg)
log_console(f"已发送订阅消息:{json_msg}")
time.sleep(0.1)
except Exception as e:
log_console(f"发送订阅消息失败:{e}")
async def on_message(self, message):
# log_console(f"收到消息:{message}")
decompressed_data = gzip.decompress(message)
data = decompressed_data.decode('utf-8', errors='ignore')
print("文本形式:", data)
# print("返回消息了:",message)
# data = json.loads(message)
# b = data.get('b', 0)
# if b != 0:
# ask = data.get('a', 0)
# exAsk = b
# exBid = ask
# if CheckFunc:
# await CheckFunc()
# if int(time.time()) - last_ping_ts > 30:
# last_ping_ts = int(time.time())
# await send_ping_message(self.websocket)
async def on_error(self, error):
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())), f"Error: {error}")
error_message = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())), f"Error: {error}"
DingSend(f"Ex连接失败 msg: {error_message}", at_all=True)
async def on_close(self, code, reason):
print("ex关闭连接")
async def run(self):
try:
await self.connect()
async for message in self.websocket:
await self.on_message(message)
except Exception as e:
await self.on_error(e)
finally:
await self.on_close(None, None)
await self.websocket.close()
async def ex_main():
while True:
print("进入了while循环")
client = BgWebsocketClient(
"wss://stream.bitget.com/public/v1/stream?compress=true&terminalType=1")
await client.run()
await asyncio.sleep(1)
if __name__ == "__main__":
# asyncio.run(ex_main("mexc_btc.json"))
# coin = "mexc_btc"
# with open(f"{coin}.json", 'r+') as f:
# init = json.loads(f.read())
# print(init)
asyncio.run(ex_main())