2
这是把在线自适应(自动寻找最优开/平仓阈值)完整合并后的单文件脚本。直接保存为 okx_bitget_arb_adaptive.py 运行即可(先小仓或先用 DRY_RUN=True 验证)。
你要把 6 个占位符密钥换成你自己的(我保留了硬编码位点,便于你直接填)。
# -*- coding: utf-8 -*-
"""OKX <-> Bitget perpetual-swap arbitrage with online-adaptive thresholds.

- Subscribes to the mark price / last price of both venues in real time.
- Opens a hedged ("locked") position when the spread is close to zero and
  closes it once the spread reaches the close threshold.
- Continuously logs the spread to CSV; at a fixed interval a grid search over
  the most recent window updates the open/close thresholds.
- Every update is limited to a maximum step so the parameters do not jitter.
"""
import os
import hmac
import base64
import hashlib
import json
import time
import asyncio
import datetime as dt
import csv
import math
import statistics
import pathlib
from collections import deque

import requests
import websockets

########################
# ==== Config: hard-coded credentials (replace the placeholders locally) ==== #
########################
# NOTE(review): never commit real keys; consider loading them from the environment.
OKX_API_KEY = "REPLACE_WITH_YOUR_OKX_API_KEY"
OKX_API_SECRET = "REPLACE_WITH_YOUR_OKX_API_SECRET"
OKX_API_PASSPHRASE = "REPLACE_WITH_YOUR_OKX_PASSPHRASE"
BG_API_KEY = "REPLACE_WITH_YOUR_BITGET_API_KEY"
BG_API_SECRET = "REPLACE_WITH_YOUR_BITGET_API_SECRET"
BG_PASSPHRASE = "REPLACE_WITH_YOUR_BITGET_PASSPHRASE"

########################
# ==== Trading / strategy parameters ==== #
########################
# Instruments
OKX_INST_ID = "BTC-USDT-SWAP"  # OKX perpetual swap
BG_SYMBOL = "BTCUSDT"          # Bitget USDT-margined perpetual
BG_PRODUCT = "USDT-FUTURES"    # Bitget product line

# Initial thresholds (updated at runtime by the online optimiser)
OPEN_EQUALITY_USD = 2.0   # open the hedged position when |spread| <= 2
CLOSE_SPREAD_USD = 18.0   # close when |spread| >= 18
USE_MARK_PRICE = True     # True = mark price; False = last traded price

# Order sizes (start small).
# NOTE(review): OKX `sz` is presumably a contract count while Bitget `size` is
# presumably in base coin -- confirm that "10" on each venue is the same notional.
OKX_SZ = "10"
BG_SIZE = "10"

# Opening direction (default: long on OKX / short on Bitget; adjustable)
OPEN_OKX_SIDE = "buy"   # buy/sell
OPEN_BG_SIDE = "sell"   # buy/sell
OKX_TD_MODE = "cross"   # cross/isolated
OKX_POS_SIDE = "long" if OPEN_OKX_SIDE == "buy" else "short"
BG_MARGIN_COIN = "USDT"

# REST & WebSocket endpoints
OKX_BASE = "https://www.okx.com"
BG_BASE = "https://api.bitget.com"
OKX_WS_PUBLIC = "wss://ws.okx.com:8443/ws/v5/public"
BG_WS_PUBLIC = "wss://ws.bitget.com/v2/ws/public"

# Run control
DRY_RUN = True          # True = print only; False = place real orders (verify with True first)
REQUEST_TIMEOUT = 10
WS_RETRY_DELAY = 2

########################
# ==== Online-adaptation settings ==== #
########################
CSV_LOG_PATH = "spread_log.csv"  # spread log
FEE_USD_PER_SIDE = 2.0     # estimated all-in cost per side (fees + slippage + amortised funding); calibrate yourself
SLIPPAGE_USD = 0.5         # extra slippage budget charged per trigger (back-test only)
USE_SHARPE = False         # objective: False = maximise total PnL; True = maximise Sharpe
ANNUALIZATION_K = 365.0    # Sharpe annualisation base (daily returns)
ROLLING_HOURS = 24         # optimisation window (last N hours of data)
OPT_INTERVAL_MIN = 10      # re-optimise every N minutes
MAX_STEP_USD = 2.0         # maximum change per parameter update, to damp jitter
# Grid-search range (widen or narrow to suit your style).
OPEN_GRID = [round(x * 0.5, 2) for x in range(1, 11)]  # 0.5, 1.0, ..., 5.0
CLOSE_GRID = list(range(8, 41, 2))                     # 8, 10, ..., 40

# In-memory rolling cache of spread samples (avoids re-reading the CSV).
ROLLING_CACHE_MAX = 200000
rolling_buf = deque(maxlen=ROLLING_CACHE_MAX)
last_opt_ts = 0  # epoch seconds of the last applied optimisation


########################
# ==== Helpers ==== #
########################
def iso_ts_ms_str():
    """Return the current Unix time in milliseconds as a decimal string."""
    return str(int(time.time() * 1000))


def okx_headers(method, path, body):
    """Build the signed OKX REST headers for one request.

    The signature is base64(HMAC-SHA256(secret, ts + METHOD + path + body)),
    where ``ts`` is an ISO-8601 UTC timestamp with millisecond precision.
    """
    # datetime.utcnow() is deprecated; build the same naive-UTC string explicitly.
    ts = (
        dt.datetime.now(dt.timezone.utc)
        .replace(tzinfo=None)
        .isoformat(timespec="milliseconds")
        + "Z"
    )
    prehash = f"{ts}{method.upper()}{path}{body}"
    digest = hmac.new(OKX_API_SECRET.encode(), prehash.encode(), hashlib.sha256).digest()
    return {
        "OK-ACCESS-KEY": OKX_API_KEY,
        "OK-ACCESS-SIGN": base64.b64encode(digest).decode(),
        "OK-ACCESS-TIMESTAMP": ts,
        "OK-ACCESS-PASSPHRASE": OKX_API_PASSPHRASE,
        "Content-Type": "application/json",
    }


def bg_sign(ts_ms: str, method: str, path: str, body: str, secret: str) -> str:
    """Return base64(HMAC-SHA256(secret, ts_ms + METHOD + path + body))."""
    msg = f"{ts_ms}{method.upper()}{path}{body}".encode()
    digest = hmac.new(secret.encode(), msg, hashlib.sha256).digest()
    return base64.b64encode(digest).decode()


def bg_headers(method, path, body):
    """Build the signed Bitget REST headers for one request."""
    ts = iso_ts_ms_str()
    return {
        "ACCESS-KEY": BG_API_KEY,
        "ACCESS-SIGN": bg_sign(ts, method, path, body, BG_API_SECRET),
        "ACCESS-PASSPHRASE": BG_PASSPHRASE,
        "ACCESS-TIMESTAMP": ts,
        "Content-Type": "application/json",
    }


def _require_ok(venue, payload, ok_code):
    """Return *payload* if its ``code`` equals *ok_code*, else raise RuntimeError.

    Both venues can answer HTTP 200 with an error code in the JSON body, which
    ``raise_for_status()`` does not detect.
    """
    code = payload.get("code") if isinstance(payload, dict) else None
    if str(code) != ok_code:
        raise RuntimeError(f"[{venue}] order rejected: {payload}")
    return payload


########################
# ==== Open / close orders (OKX) ==== #
########################
def okx_place_market(instId, tdMode, side, posSide, sz):
    """Send a market order to OKX and return the decoded JSON response.

    In DRY_RUN mode the request is only printed and a stub dict is returned.

    Raises:
        requests.HTTPError: on a non-2xx HTTP status.
        RuntimeError: when the response body reports a non-"0" ``code``.
    """
    path = "/api/v5/trade/order"
    body = {
        "instId": instId,
        "tdMode": tdMode,
        "side": side,        # buy/sell
        "posSide": posSide,  # long/short (hedge mode)
        "ordType": "market",
        "sz": str(sz),
    }
    data = json.dumps(body, separators=(",", ":"))
    if DRY_RUN:
        print("[DRYRUN][OKX] place", data)
        return {"dryrun": True, "req": body}
    r = requests.post(
        OKX_BASE + path,
        headers=okx_headers("POST", path, data),
        data=data,
        timeout=REQUEST_TIMEOUT,
    )
    r.raise_for_status()
    return _require_ok("OKX", r.json(), "0")


def okx_close_market(instId, tdMode, posSide, sz):
    """Close an OKX hedge-mode position: sell to close a long, buy to close a short."""
    side = "sell" if posSide == "long" else "buy"
    return okx_place_market(instId, tdMode, side, posSide, sz)


########################
# ==== Open / close orders (Bitget v2) ==== #
########################
def bg_place_market(symbol, productType, marginCoin, side, tradeSide, size):
    """Send a market order to Bitget (mix v2) and return the decoded JSON response.

    In DRY_RUN mode the request is only printed and a stub dict is returned.

    Raises:
        requests.HTTPError: on a non-2xx HTTP status.
        RuntimeError: when the response body reports a non-"00000" ``code``.
    """
    path = "/api/v2/mix/order/place-order"
    body = {
        "symbol": symbol,
        "productType": productType,  # USDT-FUTURES
        "marginCoin": marginCoin,    # USDT
        "side": side,                # buy/sell
        "tradeSide": tradeSide,      # open/close
        "orderType": "market",
        "size": str(size),
        "clientOid": f"arb_{int(time.time()*1000)}",
    }
    data = json.dumps(body, separators=(",", ":"))
    if DRY_RUN:
        print("[DRYRUN][Bitget] place", data)
        return {"dryrun": True, "req": body}
    r = requests.post(
        BG_BASE + path,
        headers=bg_headers("POST", path, data),
        data=data,
        timeout=REQUEST_TIMEOUT,
    )
    r.raise_for_status()
    return _require_ok("Bitget", r.json(), "00000")


def bg_open(symbol, productType, marginCoin, side, size):
    """Open a Bitget position in the direction given by *side*."""
    return bg_place_market(symbol, productType, marginCoin, side, "open", size)


def bg_close(symbol, productType, marginCoin, side, size):
    """Close the Bitget position that was opened with *side*.

    In Bitget v2 hedge mode ``side`` names the position direction (buy = long,
    sell = short) for both opening and closing, so a position opened with
    ``sell`` is closed with ``side="sell", tradeSide="close"``.  Flipping the
    side here would target the opposite (non-existent) position.
    NOTE(review): confirm against the Bitget place-order documentation.
    """
    return bg_place_market(symbol, productType, marginCoin, side, "close", size)


########################
# ==== Price feed (WebSocket) ==== #
########################
class PriceFeed:
    """Holds the latest OKX and Bitget prices, fed by two reconnecting WS loops.

    A price is reset to ``None`` whenever its connection drops, so consumers
    never compute a spread from a stale quote.
    """

    def __init__(self):
        self.okx_price = None
        self.bg_price = None

    @staticmethod
    def _first_price(record, keys):
        """Return the first non-empty field among *keys* as float, else None."""
        for key in keys:
            raw = record.get(key)
            if raw not in (None, ""):
                return float(raw)
        return None

    def okx_channel(self):
        """Return the OKX subscribe message for the configured price type."""
        channel = "mark-price" if USE_MARK_PRICE else "tickers"
        return {"op": "subscribe", "args": [{"channel": channel, "instId": OKX_INST_ID}]}

    def bg_channel(self):
        """Return the Bitget subscribe message.

        The ``ticker`` channel is used for both price types because its push
        carries ``markPrice`` as well as ``lastPr``.
        NOTE(review): v2 appears to have no dedicated mark-price channel -- confirm.
        """
        return {"op": "subscribe",
                "args": [{"instType": "USDT-FUTURES", "channel": "ticker", "instId": BG_SYMBOL}]}

    async def okx_loop(self):
        """Stream OKX prices forever, reconnecting after WS_RETRY_DELAY seconds."""
        keys = ("markPx",) if USE_MARK_PRICE else ("last",)
        while True:
            try:
                async with websockets.connect(OKX_WS_PUBLIC, ping_interval=20) as ws:
                    await ws.send(json.dumps(self.okx_channel()))
                    async for msg in ws:
                        data = json.loads(msg)
                        event = data.get("event")
                        if event == "subscribe":
                            print("[OKX] subscribed")
                        elif event == "error":
                            print("[OKX WS] error:", data)
                        elif data.get("data"):
                            price = self._first_price(data["data"][0], keys)
                            if price is not None:
                                self.okx_price = price
            except Exception as e:
                print("[OKX WS] error:", e)
            finally:
                self.okx_price = None  # never trade on a quote from a dead connection
            await asyncio.sleep(WS_RETRY_DELAY)

    async def bg_loop(self):
        """Stream Bitget prices forever, reconnecting after WS_RETRY_DELAY seconds."""
        if USE_MARK_PRICE:
            keys = ("markPrice", "price")
        else:
            keys = ("lastPr", "last", "price")
        while True:
            try:
                async with websockets.connect(BG_WS_PUBLIC, ping_interval=20) as ws:
                    await ws.send(json.dumps(self.bg_channel()))
                    async for msg in ws:
                        data = json.loads(msg)
                        event = data.get("event")
                        if event == "subscribe":
                            print("[Bitget] subscribed")
                        elif event == "error":
                            print("[Bitget WS] error:", data)
                        elif "arg" in data and data.get("data"):
                            price = self._first_price(data["data"][0], keys)
                            if price is not None:
                                self.bg_price = price
            except Exception as e:
                print("[Bitget WS] error:", e)
            finally:
                self.bg_price = None  # never trade on a quote from a dead connection
            await asyncio.sleep(WS_RETRY_DELAY)


########################
# ==== Spread log & back-test optimisation ==== #
########################
def ensure_csv_header(path: str):
    """Create *path* with the CSV header row if the file does not exist yet."""
    if not pathlib.Path(path).exists():
        with open(path, "w", newline="") as f:
            csv.writer(f).writerow(["ts_ms", "okx", "bg", "spread"])


def log_spread(ts_ms: int, okx: float, bg: float):
    """Append one sample to the CSV log and to the in-memory rolling buffer."""
    ensure_csv_header(CSV_LOG_PATH)
    with open(CSV_LOG_PATH, "a", newline="") as f:
        csv.writer(f).writerow([ts_ms, f"{okx:.4f}", f"{bg:.4f}", f"{okx-bg:.4f}"])
    rolling_buf.append((ts_ms, okx, bg, okx - bg))


def load_recent_spreads(hours: int):
    """Return ``(ts_ms, okx, bg, spread)`` tuples from the last *hours* hours.

    The in-memory buffer is preferred; the CSV log is read only when the
    buffer holds nothing inside the window.  Malformed CSV rows are skipped.
    """
    cutoff = int(time.time() * 1000) - hours * 3600 * 1000
    recent = [row for row in rolling_buf if row[0] >= cutoff]
    if recent:
        return recent
    out = []
    try:
        with open(CSV_LOG_PATH, "r", newline="") as f:
            for row in csv.DictReader(f):
                try:
                    ts = int(row["ts_ms"])
                    if ts >= cutoff:
                        out.append((ts, float(row["okx"]), float(row["bg"]), float(row["spread"])))
                except (KeyError, TypeError, ValueError):
                    continue  # truncated / corrupt line, e.g. after a crash mid-write
    except FileNotFoundError:
        return []
    return out


def simulate_pnl(spreads, open_eq: float, close_sp: float,
                 fee_per_side: float = None,
                 slip: float = None,
                 direction: int = None):
    """Back-test "open near zero, close at threshold" on historical spreads.

    Args:
        spreads: iterable of ``(ts_ms, okx, bg, spread)`` with spread = OKX - Bitget.
        open_eq: open when ``|spread| <= open_eq``.
        close_sp: close when ``|spread| >= close_sp``.
        fee_per_side: cost per side; defaults to the current FEE_USD_PER_SIDE.
        slip: slippage charged per trigger; defaults to the current SLIPPAGE_USD.
        direction: +1 for long OKX / short Bitget, -1 for the reverse; defaults
            to the direction implied by OPEN_OKX_SIDE.

    Returns:
        ``(total_pnl, daily_pnls)`` where ``daily_pnls`` are per-UTC-day increments.

    The realised PnL of a fixed-direction position is
    ``direction * (exit_spread - entry_spread)``.  Using ``|exit| - |entry|``
    would book a loss-making move against the position as a profit.
    """
    # Resolve defaults at call time so recalibrated module constants take effect.
    if fee_per_side is None:
        fee_per_side = FEE_USD_PER_SIDE
    if slip is None:
        slip = SLIPPAGE_USD
    if direction is None:
        direction = 1 if OPEN_OKX_SIDE == "buy" else -1

    in_position = False
    entry = None
    pnl = 0.0
    cum_by_day = {}
    for ts, _, _, sp in spreads:
        if not in_position:
            if abs(sp) <= open_eq:
                in_position = True
                entry = sp
                pnl -= fee_per_side * 2  # both legs opened
                pnl -= slip
        elif abs(sp) >= close_sp:
            pnl += direction * (sp - entry)
            pnl -= fee_per_side * 2      # both legs closed
            pnl -= slip
            in_position = False
            entry = None
        # UTC day index; cheaper than building a datetime for every sample.
        cum_by_day[int(ts // 86_400_000)] = pnl

    # Cumulative -> per-day increments.
    daily = []
    prev = 0.0
    for day in sorted(cum_by_day):
        daily.append(cum_by_day[day] - prev)
        prev = cum_by_day[day]
    return pnl, daily


def score_pnl(total_pnl: float, daily_pnls):
    """Return the objective value: total PnL, or annualised Sharpe if USE_SHARPE."""
    if not USE_SHARPE:
        return total_pnl
    if len(daily_pnls) < 2:
        return -1e9
    mu = statistics.mean(daily_pnls)
    sd = statistics.pstdev(daily_pnls) or 1e-9
    return (mu / sd) * math.sqrt(ANNUALIZATION_K)


def grid_search_opt(spreads):
    """Return ``(best_open, best_close, best_score, best_total_pnl)`` over the grids."""
    best = (None, None, -1e18, 0.0)
    for o in OPEN_GRID:
        for c in CLOSE_GRID:
            if c <= o + 2.0:  # keep thresholds at least 2 USD apart to avoid churn
                continue
            total, daily = simulate_pnl(spreads, o, c)
            s = score_pnl(total, daily)
            if s > best[2]:
                best = (o, c, s, total)
    return best


def clamp_step(current: float, target: float, step: float):
    """Move *current* towards *target* by at most *step*; jump if *current* is None."""
    if current is None:
        return target
    if target > current:
        return min(target, current + step)
    if target < current:
        return max(target, current - step)
    return current


def maybe_reoptimize(state):
    """Periodically recompute the thresholds from the recent window (flat only).

    NOTE(review): the grid search runs synchronously; with a full buffer it can
    occupy the event loop for a noticeable time.
    """
    global OPEN_EQUALITY_USD, CLOSE_SPREAD_USD, last_opt_ts
    if state.has_position:
        return
    now = time.time()
    if now - last_opt_ts < OPT_INTERVAL_MIN * 60:
        return
    spreads = load_recent_spreads(ROLLING_HOURS)
    if len(spreads) < 500:  # too little data to optimise
        return
    o, c, s, total = grid_search_opt(spreads)
    if o is None:
        return
    new_open = clamp_step(OPEN_EQUALITY_USD, o, MAX_STEP_USD)
    new_close = clamp_step(CLOSE_SPREAD_USD, c, MAX_STEP_USD)
    print(f"\n[OPT] {ROLLING_HOURS}h best_open={o:.2f}, best_close={c:.2f}, "
          f"score={s:.4f}, backtest_total={total:.2f} -> APPLY open={new_open:.2f}, close={new_close:.2f}")
    OPEN_EQUALITY_USD = round(new_open, 2)
    CLOSE_SPREAD_USD = round(new_close, 2)
    last_opt_ts = now


########################
# ==== Strategy execution ==== #
########################
class ArbState:
    """Mutable strategy state: position flag, entry spread and opening sides."""

    def __init__(self):
        self.has_position = False
        self.entry_spread = None
        self.okx_side_open = OPEN_OKX_SIDE
        self.bg_side_open = OPEN_BG_SIDE


async def main():
    """Run both price feeds and the open/close/re-optimise loop until stopped.

    Raises:
        Exception: if unwinding a half-opened position fails; the loop stops
            deliberately so it cannot keep stacking unhedged exposure.
    """
    feed = PriceFeed()
    state = ArbState()
    loop = asyncio.get_running_loop()

    async def _blocking(fn, *args):
        # REST calls are synchronous; run them off-loop so the feeds keep updating.
        return await loop.run_in_executor(None, fn, *args)

    # Seed the rolling window from the CSV so history survives a restart
    # (once the first live sample is buffered the CSV is no longer consulted).
    if not rolling_buf:
        rolling_buf.extend(load_recent_spreads(ROLLING_HOURS))

    # +1: long OKX / short Bitget profits when the spread rises; -1: the reverse.
    direction = 1 if state.okx_side_open == "buy" else -1
    okx_leg_open = False
    bg_leg_open = False

    tasks = [asyncio.create_task(feed.okx_loop()),
             asyncio.create_task(feed.bg_loop())]
    try:
        print(f"[INIT] USE_MARK_PRICE={USE_MARK_PRICE} | DRY_RUN={DRY_RUN}")
        print(f"[INIT] INIT OPEN_EQUALITY_USD={OPEN_EQUALITY_USD} | CLOSE_SPREAD_USD={CLOSE_SPREAD_USD}")
        while True:
            await asyncio.sleep(0.2)
            okx_px, bg_px = feed.okx_price, feed.bg_price
            if okx_px is None or bg_px is None:
                continue
            spread = okx_px - bg_px  # OKX - Bitget
            now = dt.datetime.now().strftime("%H:%M:%S")
            print(f"{now} P_okx={okx_px:.2f} P_bg={bg_px:.2f} "
                  f"spread={spread:.2f} | open={OPEN_EQUALITY_USD:.2f} close={CLOSE_SPREAD_USD:.2f}", end="\r")

            # Record the spread for the back-test optimiser.
            log_spread(int(time.time() * 1000), okx_px, bg_px)

            # Flat -> open the hedged position.
            if not state.has_position and abs(spread) <= OPEN_EQUALITY_USD:
                print(f"\n[OPEN] |spread|<=OPEN({OPEN_EQUALITY_USD:.2f}),尝试锁价开仓 spread={spread:.2f}")
                try:
                    okx_res = await _blocking(okx_place_market, OKX_INST_ID, OKX_TD_MODE,
                                              state.okx_side_open, OKX_POS_SIDE, OKX_SZ)
                    okx_leg_open = True
                    bg_res = await _blocking(bg_open, BG_SYMBOL, BG_PRODUCT, BG_MARGIN_COIN,
                                             state.bg_side_open, BG_SIZE)
                    bg_leg_open = True
                    print("[OKX OPEN RES]", okx_res)
                    print("[BG OPEN RES]", bg_res)
                    state.has_position = True
                    state.entry_spread = spread
                except Exception as e:
                    print("[OPEN ERROR]", e)
                    if okx_leg_open and not bg_leg_open:
                        # The OKX leg filled but Bitget did not: flatten OKX right
                        # away.  If this raises too, let it propagate and stop.
                        unwind_res = await _blocking(okx_close_market, OKX_INST_ID,
                                                     OKX_TD_MODE, OKX_POS_SIDE, OKX_SZ)
                        okx_leg_open = False
                        print("[OKX UNWIND RES]", unwind_res)

            # In position -> close on threshold.
            # NOTE(review): the trigger is symmetric in |spread| while the position
            # is directional, so a move against the position closes at a loss.
            if state.has_position and abs(spread) >= CLOSE_SPREAD_USD:
                print(f"\n[CLOSE] |spread|>=CLOSE({CLOSE_SPREAD_USD:.2f}),同时平仓 spread={spread:.2f}")
                try:
                    # Per-leg flags make a retry close only the leg still open.
                    if okx_leg_open:
                        okx_res = await _blocking(okx_close_market, OKX_INST_ID,
                                                  OKX_TD_MODE, OKX_POS_SIDE, OKX_SZ)
                        okx_leg_open = False
                        print("[OKX CLOSE RES]", okx_res)
                    if bg_leg_open:
                        bg_res = await _blocking(bg_close, BG_SYMBOL, BG_PRODUCT, BG_MARGIN_COIN,
                                                 state.bg_side_open, BG_SIZE)
                        bg_leg_open = False
                        print("[BG CLOSE RES]", bg_res)
                    pnl = direction * (spread - (state.entry_spread or 0.0))
                    print(f"[PNL est] Δspread = {pnl:.2f} USD(未含费率/资金费/滑点)")
                    state.has_position = False
                    state.entry_spread = None
                except Exception as e:
                    print("[CLOSE ERROR]", e)

            # While flat, try to re-optimise the thresholds.
            maybe_reoptimize(state)
    finally:
        for t in tasks:
            t.cancel()


if __name__ == "__main__":
    asyncio.run(main())
运行提示
pip install websockets requests(这两个第三方库是必需的,其余只用标准库;python-dotenv 已不再需要)。
先 DRY_RUN=True 跑通数据和下单请求格式;再改 False 小仓验证。
想要更激进/保守,调整 OPEN_GRID/CLOSE_GRID 搜索范围、FEE_USD_PER_SIDE 成本估计、ROLLING_HOURS/OPT_INTERVAL_MIN 优化频率。
要不要我再给一个**“σ倍数阈值(OPEN=k1·σ,CLOSE=k2·σ)”**的自适应版本,或把**双向套利**(当 OKX 更贵时自动反向)也一并集成进去?你更偏好哪一种优化目标:**最大收益**还是**夏普比率**?
1
# -*- coding: utf-8 -*-
import hmac, base64, hashlib, json, time, asyncio, datetime as dt
import requests, websockets
########################
# ==== Config: hard-coded credentials (replace the placeholders locally) ==== #
########################
# NOTE(review): secrets are hard-coded; do not commit real keys to version control.
OKX_API_KEY = "REPLACE_WITH_YOUR_OKX_API_KEY"
OKX_API_SECRET = "REPLACE_WITH_YOUR_OKX_API_SECRET"
OKX_API_PASSPHRASE = "REPLACE_WITH_YOUR_OKX_API_PASSPHRASE"
BG_API_KEY = "REPLACE_WITH_YOUR_BITGET_API_KEY"
BG_API_SECRET = "REPLACE_WITH_YOUR_BITGET_API_SECRET"
BG_PASSPHRASE = "REPLACE_WITH_YOUR_BITGET_PASSPHRASE"
########################
# ==== Trading / strategy parameters ==== #
########################
# Instruments
OKX_INST_ID = "BTC-USDT-SWAP" # OKX perpetual swap
BG_SYMBOL = "BTCUSDT" # Bitget USDT-margined perpetual
BG_PRODUCT = "USDT-FUTURES" # Bitget product line
# Thresholds
OPEN_EQUALITY_USD = 2.0 # open the hedged position when |spread| <= 2
CLOSE_SPREAD_USD = 18.0 # close when |spread| >= 18
USE_MARK_PRICE = True # True = mark price; False = last traded price
# Order sizes (start small)
# NOTE(review): OKX `sz` is presumably a contract count while Bitget `size` is
# presumably in base coin -- confirm that "10" on each venue is the same notional.
OKX_SZ = "10"
BG_SIZE = "10"
# Opening direction (default: long on OKX / short on Bitget; adjustable)
OPEN_OKX_SIDE = "buy" # buy/sell
OPEN_BG_SIDE = "sell" # buy/sell
OKX_TD_MODE = "cross" # cross/isolated
OKX_POS_SIDE = "long" if OPEN_OKX_SIDE=="buy" else "short"
BG_MARGIN_COIN = "USDT"
# REST & WebSocket endpoints
OKX_BASE = "https://www.okx.com"
BG_BASE = "https://api.bitget.com"
OKX_WS_PUBLIC = "wss://ws.okx.com:8443/ws/v5/public"
BG_WS_PUBLIC = "wss://ws.bitget.com/v2/ws/public"
# Run control
# NOTE(review): live trading is the default here; set DRY_RUN = True for a first run.
DRY_RUN = False # False = place real orders (validate with a small size first)
REQUEST_TIMEOUT = 10
WS_RETRY_DELAY = 2
########################
# ==== 工具函数 ==== #
########################
def iso_ts_ms_str():
    """Return the current Unix time in milliseconds as a decimal string."""
    return str(int(time.time() * 1000))
def okx_headers(method, path, body):
    """Build the signed OKX REST headers for one request.

    The signature is base64(HMAC-SHA256(secret, ts + METHOD + path + body)),
    where ``ts`` is an ISO-8601 UTC timestamp with millisecond precision.

    Args:
        method: HTTP verb; upper-cased before signing.
        path: request path including any query string.
        body: the exact JSON string that will be sent ("" for no body).
    """
    # datetime.utcnow() is deprecated; build the same naive-UTC string explicitly.
    ts = (
        dt.datetime.now(dt.timezone.utc)
        .replace(tzinfo=None)
        .isoformat(timespec="milliseconds")
        + "Z"
    )
    prehash = f"{ts}{method.upper()}{path}{body}"
    digest = hmac.new(OKX_API_SECRET.encode(), prehash.encode(), hashlib.sha256).digest()
    return {
        "OK-ACCESS-KEY": OKX_API_KEY,
        "OK-ACCESS-SIGN": base64.b64encode(digest).decode(),
        "OK-ACCESS-TIMESTAMP": ts,
        "OK-ACCESS-PASSPHRASE": OKX_API_PASSPHRASE,
        "Content-Type": "application/json",
    }
def bg_sign(ts_ms: str, method: str, path: str, body: str, secret: str) -> str:
    """Return base64(HMAC-SHA256(secret, ts_ms + METHOD + path + body)).

    Args:
        ts_ms: request timestamp in milliseconds, as a string.
        method: HTTP verb; upper-cased before signing.
        path: request path.
        body: the exact request body string ("" for no body).
        secret: API secret used as the HMAC key.
    """
    msg = f"{ts_ms}{method.upper()}{path}{body}".encode()
    digest = hmac.new(secret.encode(), msg, hashlib.sha256).digest()
    return base64.b64encode(digest).decode()
def bg_headers(method, path, body):
    """Build the signed Bitget REST headers for one request.

    The same millisecond timestamp is used for the signature and for the
    ``ACCESS-TIMESTAMP`` header.
    """
    ts = iso_ts_ms_str()
    return {
        "ACCESS-KEY": BG_API_KEY,
        "ACCESS-SIGN": bg_sign(ts, method, path, body, BG_API_SECRET),
        "ACCESS-PASSPHRASE": BG_PASSPHRASE,
        "ACCESS-TIMESTAMP": ts,
        "Content-Type": "application/json",
    }
########################
# ==== 下单/平仓(OKX) ==== #
########################
def okx_place_market(instId, tdMode, side, posSide, sz):
    """Send a market order to OKX and return the decoded JSON response.

    In DRY_RUN mode the request is only printed and a stub dict is returned.

    Raises:
        requests.HTTPError: on a non-2xx HTTP status.
        RuntimeError: when the response body reports a non-"0" ``code``
            (OKX can answer HTTP 200 for a rejected order).
    """
    path = "/api/v5/trade/order"
    body = {
        "instId": instId,
        "tdMode": tdMode,
        "side": side,        # buy/sell
        "posSide": posSide,  # long/short (hedge mode)
        "ordType": "market",
        "sz": str(sz),
    }
    data = json.dumps(body, separators=(",", ":"))
    if DRY_RUN:
        print("[DRYRUN][OKX] place", data)
        return {"dryrun": True, "req": body}
    r = requests.post(
        OKX_BASE + path,
        headers=okx_headers("POST", path, data),
        data=data,
        timeout=REQUEST_TIMEOUT,
    )
    r.raise_for_status()
    payload = r.json()
    # raise_for_status() only covers transport errors; check the API-level code too.
    if not isinstance(payload, dict) or str(payload.get("code")) != "0":
        raise RuntimeError(f"[OKX] order rejected: {payload}")
    return payload
def okx_close_market(instId, tdMode, posSide, sz):
    """Close an OKX hedge-mode position with a market order.

    A long position is closed by selling, a short position by buying; the
    ``posSide`` passed through identifies which position is reduced.
    """
    side = "sell" if posSide == "long" else "buy"
    return okx_place_market(instId, tdMode, side, posSide, sz)
########################
# ==== 下单/平仓(Bitget v2) ==== #
########################
def bg_place_market(symbol, productType, marginCoin, side, tradeSide, size):
    """Send a market order to Bitget (mix v2) and return the decoded JSON response.

    In DRY_RUN mode the request is only printed and a stub dict is returned.

    Raises:
        requests.HTTPError: on a non-2xx HTTP status.
        RuntimeError: when the response body reports a non-"00000" ``code``.
    """
    path = "/api/v2/mix/order/place-order"
    body = {
        "symbol": symbol,
        "productType": productType,  # USDT-FUTURES
        "marginCoin": marginCoin,    # USDT
        "side": side,                # buy/sell
        "tradeSide": tradeSide,      # open/close
        "orderType": "market",
        "size": str(size),
        "clientOid": f"arb_{int(time.time()*1000)}",
    }
    data = json.dumps(body, separators=(",", ":"))
    if DRY_RUN:
        print("[DRYRUN][Bitget] place", data)
        return {"dryrun": True, "req": body}
    r = requests.post(
        BG_BASE + path,
        headers=bg_headers("POST", path, data),
        data=data,
        timeout=REQUEST_TIMEOUT,
    )
    r.raise_for_status()
    payload = r.json()
    # raise_for_status() only covers transport errors; check the API-level code too.
    if not isinstance(payload, dict) or str(payload.get("code")) != "00000":
        raise RuntimeError(f"[Bitget] order rejected: {payload}")
    return payload
def bg_open(symbol, productType, marginCoin, side, size):
    """Open a Bitget position in the direction given by *side* (market order)."""
    return bg_place_market(symbol, productType, marginCoin, side, "open", size)
def bg_close(symbol, productType, marginCoin, side, size):
    """Close the Bitget position that was opened with *side* (market order).

    In Bitget v2 hedge mode ``side`` names the position direction (buy = long,
    sell = short) for both opening and closing, so a position opened with
    ``sell`` is closed with ``side="sell", tradeSide="close"``.  Flipping the
    side here would target the opposite (non-existent) position.
    NOTE(review): confirm against the Bitget place-order documentation.
    """
    return bg_place_market(symbol, productType, marginCoin, side, "close", size)
########################
# ==== 价格源(WS) ==== #
########################
class PriceFeed:
    """Holds the latest OKX and Bitget prices, fed by two reconnecting WS loops.

    A price is reset to ``None`` whenever its connection drops, so consumers
    never compute a spread from a stale quote.
    """

    def __init__(self):
        self.okx_price = None
        self.bg_price = None

    @staticmethod
    def _first_price(record, keys):
        """Return the first non-empty field among *keys* as float, else None."""
        for key in keys:
            raw = record.get(key)
            if raw not in (None, ""):
                return float(raw)
        return None

    def okx_channel(self):
        """Return the OKX subscribe message for the configured price type."""
        channel = "mark-price" if USE_MARK_PRICE else "tickers"
        return {"op": "subscribe", "args": [{"channel": channel, "instId": OKX_INST_ID}]}

    def bg_channel(self):
        """Return the Bitget subscribe message.

        The ``ticker`` channel is used for both price types because its push
        carries ``markPrice`` as well as ``lastPr``.
        NOTE(review): v2 appears to have no dedicated mark-price channel -- confirm.
        """
        return {"op": "subscribe",
                "args": [{"instType": "USDT-FUTURES", "channel": "ticker", "instId": BG_SYMBOL}]}

    async def okx_loop(self):
        """Stream OKX prices forever, reconnecting after WS_RETRY_DELAY seconds."""
        keys = ("markPx",) if USE_MARK_PRICE else ("last",)
        while True:
            try:
                async with websockets.connect(OKX_WS_PUBLIC, ping_interval=20) as ws:
                    await ws.send(json.dumps(self.okx_channel()))
                    async for msg in ws:
                        data = json.loads(msg)
                        event = data.get("event")
                        if event == "subscribe":
                            print("[OKX] subscribed")
                        elif event == "error":
                            print("[OKX WS] error:", data)
                        elif data.get("data"):
                            price = self._first_price(data["data"][0], keys)
                            if price is not None:
                                self.okx_price = price
            except Exception as e:
                print("[OKX WS] error:", e)
            finally:
                self.okx_price = None  # never trade on a quote from a dead connection
            await asyncio.sleep(WS_RETRY_DELAY)

    async def bg_loop(self):
        """Stream Bitget prices forever, reconnecting after WS_RETRY_DELAY seconds."""
        if USE_MARK_PRICE:
            keys = ("markPrice", "price")
        else:
            keys = ("lastPr", "last", "price")
        while True:
            try:
                async with websockets.connect(BG_WS_PUBLIC, ping_interval=20) as ws:
                    await ws.send(json.dumps(self.bg_channel()))
                    async for msg in ws:
                        data = json.loads(msg)
                        event = data.get("event")
                        if event == "subscribe":
                            print("[Bitget] subscribed")
                        elif event == "error":
                            print("[Bitget WS] error:", data)
                        elif "arg" in data and data.get("data"):
                            price = self._first_price(data["data"][0], keys)
                            if price is not None:
                                self.bg_price = price
            except Exception as e:
                print("[Bitget WS] error:", e)
            finally:
                self.bg_price = None  # never trade on a quote from a dead connection
            await asyncio.sleep(WS_RETRY_DELAY)
########################
# ==== 策略执行 ==== #
########################
class ArbState:
    """Mutable strategy state shared across iterations of the trading loop."""

    def __init__(self):
        self.has_position = False            # True while both legs are open
        self.entry_spread = None             # spread (OKX - Bitget) at entry
        self.okx_side_open = OPEN_OKX_SIDE   # order side used to open on OKX
        self.bg_side_open = OPEN_BG_SIDE     # order side used to open on Bitget
async def main():
    """Run both price feeds and the open/close loop until stopped.

    Raises:
        Exception: if unwinding a half-opened position fails; the loop stops
            deliberately so it cannot keep stacking unhedged exposure.
    """
    feed = PriceFeed()
    state = ArbState()
    loop = asyncio.get_running_loop()

    async def _blocking(fn, *args):
        # REST calls are synchronous; run them off-loop so the feeds keep updating.
        return await loop.run_in_executor(None, fn, *args)

    # +1: long OKX / short Bitget profits when the spread rises; -1: the reverse.
    direction = 1 if state.okx_side_open == "buy" else -1
    okx_leg_open = False
    bg_leg_open = False

    tasks = [asyncio.create_task(feed.okx_loop()),
             asyncio.create_task(feed.bg_loop())]
    try:
        while True:
            await asyncio.sleep(0.2)
            okx_px, bg_px = feed.okx_price, feed.bg_price
            if okx_px is None or bg_px is None:
                continue
            spread = okx_px - bg_px  # OKX - Bitget
            now = dt.datetime.now().strftime("%H:%M:%S")
            print(f"{now} P_okx={okx_px:.2f} P_bg={bg_px:.2f} spread={spread:.2f}", end="\r")

            # Flat -> open the hedged position.
            if not state.has_position and abs(spread) <= OPEN_EQUALITY_USD:
                print("\n[OPEN] |spread|<=OPEN_EQUALITY_USD,尝试锁价开仓")
                try:
                    okx_res = await _blocking(okx_place_market, OKX_INST_ID, OKX_TD_MODE,
                                              state.okx_side_open, OKX_POS_SIDE, OKX_SZ)
                    okx_leg_open = True
                    bg_res = await _blocking(bg_open, BG_SYMBOL, BG_PRODUCT, BG_MARGIN_COIN,
                                             state.bg_side_open, BG_SIZE)
                    bg_leg_open = True
                    print("[OKX OPEN RES]", okx_res)
                    print("[BG OPEN RES]", bg_res)
                    state.has_position = True
                    state.entry_spread = spread
                except Exception as e:
                    print("[OPEN ERROR]", e)
                    if okx_leg_open and not bg_leg_open:
                        # The OKX leg filled but Bitget did not: flatten OKX right
                        # away.  If this raises too, let it propagate and stop.
                        unwind_res = await _blocking(okx_close_market, OKX_INST_ID,
                                                     OKX_TD_MODE, OKX_POS_SIDE, OKX_SZ)
                        okx_leg_open = False
                        print("[OKX UNWIND RES]", unwind_res)

            # In position -> close on threshold.
            # NOTE(review): the trigger is symmetric in |spread| while the position
            # is directional, so a move against the position closes at a loss.
            if state.has_position and abs(spread) >= CLOSE_SPREAD_USD:
                print(f"\n[CLOSE] |spread|>=CLOSE_SPREAD_USD,开始同时平仓 | spread={spread:.2f}")
                try:
                    # Per-leg flags make a retry close only the leg still open.
                    if okx_leg_open:
                        okx_res = await _blocking(okx_close_market, OKX_INST_ID,
                                                  OKX_TD_MODE, OKX_POS_SIDE, OKX_SZ)
                        okx_leg_open = False
                        print("[OKX CLOSE RES]", okx_res)
                    if bg_leg_open:
                        bg_res = await _blocking(bg_close, BG_SYMBOL, BG_PRODUCT, BG_MARGIN_COIN,
                                                 state.bg_side_open, BG_SIZE)
                        bg_leg_open = False
                        print("[BG CLOSE RES]", bg_res)
                    # Signed by direction so a short-OKX configuration is reported correctly.
                    pnl = direction * (spread - (state.entry_spread or 0.0))
                    print(f"[PNL est] Δspread = {pnl:.2f} USD(未含费率/资金费/滑点)")
                    state.has_position = False
                    state.entry_spread = None
                except Exception as e:
                    print("[CLOSE ERROR]", e)
    finally:
        for t in tasks:
            t.cancel()


if __name__ == "__main__":
    asyncio.run(main())