코인

바이낸스 체결 알림 봇 만들기

stupidsoft 2025. 4. 26. 11:17
반응형

 

바이낸스에서 코인 거래를 할 경우 보통은 레버리지를 사용하기 위함이고 그러려면 Future COIN-M 으로 거래하게 마련이다.

그런데 여기서 거래를 하다보면 체결 알림이 오지 않는다.

물론 내가 지금 보고 있는 경우는 상관이 없는데 매도/매수를 걸어두고 다른일을 하는 경우

체결알림이 없다보니 여간 불편한게 아니다.

그래서 텔레그램으로 체결 알림을 만들어서 오라클 서버에 올려두고 사용중이다.

여기 간단한 코드가 있으니 참고하면 좋겠다.

 

import asyncio
import websockets
import requests
import json
import threading
import time


API_KEY = "키"
API_SECRET = "키"

TELEGRAM_TOKEN = '토큰'
TELEGRAM_CHAT_ID = '챗아이디'

# listenKey 발급 함수
def get_listen_key():
    url = "https://fapi.binance.com/fapi/v1/listenKey"
    headers = {"X-MBX-APIKEY": API_KEY}
    resp = requests.post(url, headers=headers)
    return resp.json()["listenKey"]

# 텔레그램 알림 함수
def send_telegram_alert(message):
    url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
    payload = {
        "chat_id": TELEGRAM_CHAT_ID,
        "text": message
    }
    try:
        requests.post(url, data=payload)
    except Exception as e:
        print("🚫 Telegram error:", e)

# listenKey 갱신 함수 (30분마다)
def keep_alive_listen_key(listen_key):
    url = "https://fapi.binance.com/fapi/v1/listenKey"
    headers = {"X-MBX-APIKEY": API_KEY}
    while True:
        try:
            requests.put(url, headers=headers)
            print("🔄 ListenKey refreshed.")
        except Exception as e:
            print("⚠️ ListenKey refresh error:", e)
        time.sleep(30 * 60)  # 30분마다 갱신

# 웹소켓 리스닝 함수
async def listen_user_stream(listen_key):
    ws_url = f"wss://fstream.binance.com/ws/{listen_key}"
    async with websockets.connect(ws_url) as websocket:
        print("📡 Connected to Binance USDT-M user stream.")
        while True:
            try:
                msg = await websocket.recv()
                data = json.loads(msg)
                event_type = data.get("e", "")

                # 체결 완료 시 알림
                if event_type == "ORDER_TRADE_UPDATE":
                    order = data["o"]
                    if order["X"] == "FILLED":  # 체결 완료
                        symbol = order["s"]
                        side = order["S"]
                        price = order["ap"]
                        qty = order["q"]
                        msg = f"📢 체결 완료!\n종목: {symbol}\n방향: {side}\n가격: {price}\n수량: {qty}"
                        print(msg)
                        send_telegram_alert(msg)

            except Exception as e:
                print("❌ WebSocket Error:", e)
                break

# listenKey 갱신을 위한 백그라운드 쓰레드 시작
def start_keep_alive_thread(listen_key):
    t = threading.Thread(target=keep_alive_listen_key, args=(listen_key,), daemon=True)
    t.start()

# 메인 실행
if __name__ == "__main__":
    listen_key = get_listen_key()  # listenKey 발급
    start_keep_alive_thread(listen_key)  # 갱신 쓰레드 시작
    asyncio.run(listen_user_stream(listen_key))  # 웹소켓 리스닝

 

 

반응형