#!/usr/bin/env python3
"""
K1RL QUANT — V50_1S Signal Flip Notifier v2.1

Uses EXACT same Redis subscription as Rewards.py for guaranteed compatibility.
Enhanced with state persistence, better error handling, and detailed
notifications.

v2.1 FIXES (aligned with Rewards.py):
  ✅ FIX #1: Store ably_realtime + signal_channel as INSTANCE vars (prevent GC)
  ✅ FIX #2: Capture asyncio event loop ref (Rewards.py FIX v5.2.1 pattern)
  ✅ FIX #3: Run send_telegram in thread executor (don't block Redis listener thread)
  ✅ FIX #4: Track _last_signal_time for health monitoring
  ✅ FIX #5: Ensure signal_keys is list (Rewards.py line 648)
  ✅ FIX #6: Add connection health monitoring like Rewards.py _health_monitor_loop

Review hardening (behaviour otherwise unchanged):
  • State is persisted AFTER last_signal is updated, so a restart no longer
    re-reports the most recent flip.
  • State file is written atomically (temp file + os.replace).
  • Shutdown notification is also sent when the main task is cancelled, which
    is how asyncio.run() delivers Ctrl+C to the coroutine.
  • Payload fields (action / price) are validated instead of raising.
"""

import asyncio
import html
import json
import logging
import math
import os
import time
import traceback
import requests
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from datetime import datetime, timezone, timedelta
from typing import Optional

# EXACT same imports as Rewards.py
from redis_config_v50_1s import (
    REDIS_URL,
    REDIS_PASSWORD,
    REDIS_DB_FEATURES,
    CHANNEL_PREFIX,
    prefixed_channel
)
from redis_connection_manager import RedisAblyClient, RedisMessage

# Enhanced logging with file output
LOG_DIR = Path('/home/user/app/logs')
LOG_DIR.mkdir(parents=True, exist_ok=True)

# NOTE(review): the log file name still says "v75" although this notifier is
# for V50_1S; kept unchanged so existing log shipping/rotation keeps working.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [V50_1S-TG] %(levelname)s: %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(LOG_DIR / 'telegram_notifier_v75.log', encoding='utf-8')
    ]
)
logger = logging.getLogger(__name__)

# Telegram config
# SECURITY NOTE(review): the fallback values below embed a live-looking bot
# token and chat id in source control. They are kept so existing deployments
# that rely on the defaults keep working, but the token should be rotated and
# supplied exclusively through the environment.
BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "8473902481:AAE0wep-lSXJ9yamU0Sj2KyNTQj9MImA3fk")
CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "6224721972")
GAS_URL = os.environ.get("GAS_RELAY_URL", "https://script.google.com/macros/s/AKfycbzvhXbTq3Pc5-429QIn-wK6uqBH7Bm-2fLJLsi_uGuD5Htn_e1ca4xCSRskKRD-tVtWaA/exec")

# EXACT same channel as Rewards.py uses
ABLY_SIGNAL_CHANNEL = prefixed_channel("final_signals")  # → "V50_1S:final_signals"

# State persistence
STATE_FILE = Path('/home/user/app/data/v50_1s_flip_state.json')

# Timezone (Harare, Zimbabwe - UTC+2)
CAT = timezone(timedelta(hours=2))

# ✅ FIX #3: Thread pool for non-blocking Telegram sends
# _on_signal runs in the Redis listener THREAD — blocking it with requests.get()
# starves the listener and causes missed signals. Offload sends to a pool.
_tg_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="tg-send")

SIGNAL_TIMEOUT = 300  # 5 min without signals = problem


def now_cat():
    """Current time in CAT timezone"""
    return datetime.now(CAT)


def format_time():
    """Format current time as HH:MM:SS CAT"""
    return now_cat().strftime("%H:%M:%S CAT")


class V50_1SFlipNotifier:
    """Watch the V50_1S final-signal channel and notify Telegram on BUY↔SELL flips.

    Signals arrive through ``_on_signal`` (registered on the Redis channel in
    ``run``). A flip is a valid signal whose action differs from the previous
    one; flips closer together than ``cooldown_seconds`` are tracked but not
    announced. Counters and the last action are persisted to ``STATE_FILE``.
    """

    # Tag written into / expected from the state file. NOTE(review): the value
    # is a leftover from the V75 notifier; it is kept so state files already on
    # disk are still accepted.
    _STATE_SYMBOL = 'V75'

    # Timeframe tokens searched for inside signal keys. Longer tokens come
    # first so '15m' is not mistaken for '5m' (nor '15s' for '5s').
    _TIMEFRAMES = ('15m', '10m', '5m', '2m', '1m', '30s', '15s', '5s')

    _VALID_ACTIONS = ('BUY', 'SELL')

    def __init__(self):
        self.last_signal = None
        self.flip_count = 0
        self.total_signals = 0
        self.session_start = time.time()
        self.last_flip_time = 0
        self.cooldown_seconds = 3  # Prevent spam

        # ✅ FIX #1: Instance-level refs (same as Rewards.py lines 541-542)
        self.ably_realtime: Optional[RedisAblyClient] = None
        self.signal_channel = None

        # ✅ FIX #2: Event loop ref (same as Rewards.py FIX v5.2.1 line 919)
        self._loop: Optional[asyncio.AbstractEventLoop] = None

        # ✅ FIX #4: Signal health tracking (same as Rewards.py)
        self._last_signal_time: float = 0
        self._connection_healthy: bool = False

        # Price reporter — tracks latest price from published signals
        # Updated in _on_signal on every valid signal received.
        self.last_price: float = 0.0

        # Load persistent state
        self._load_state()

    # ───────────────────────── state persistence ──────────────────────────

    def _save_state(self):
        """Persist counters and the last action so they survive restarts.

        The JSON is written to a sibling temp file and moved into place with
        ``os.replace`` so a crash mid-write cannot leave a truncated state
        file. Failures are logged, never raised.
        """
        try:
            STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
            state = {
                'last_signal': self.last_signal,
                'flip_count': self.flip_count,
                'total_signals': self.total_signals,
                'last_saved': time.time(),
                'symbol': self._STATE_SYMBOL
            }
            tmp_path = STATE_FILE.with_name(STATE_FILE.name + '.tmp')
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(state, f, indent=2)
            os.replace(tmp_path, STATE_FILE)
            logger.debug(f"💾 State saved: {self.flip_count} flips, last signal: {self.last_signal}")
        except Exception as e:
            logger.error(f"❌ State save failed: {e}")

    def _load_state(self):
        """Restore state written by ``_save_state`` if it is under 24h old.

        State tagged with a different symbol, older than 24 hours, or
        unreadable is ignored and the notifier starts fresh.
        """
        try:
            if not STATE_FILE.exists():
                return

            with open(STATE_FILE, 'r', encoding='utf-8') as f:
                state = json.load(f)

            if state.get('symbol') != self._STATE_SYMBOL:
                logger.info("🔄 Different symbol state detected — starting fresh")
                return

            age_hours = (time.time() - state.get('last_saved', 0)) / 3600
            if age_hours >= 24:
                logger.info(f"🗑️ Stale state ({age_hours:.1f}h old) — starting fresh")
                return

            # Convert everything first so a malformed file cannot leave the
            # instance half-restored.
            saved_signal = state.get('last_signal')
            if saved_signal not in self._VALID_ACTIONS:
                saved_signal = None
            flip_count = int(state.get('flip_count', 0) or 0)
            total_signals = int(state.get('total_signals', 0) or 0)

            self.last_signal = saved_signal
            self.flip_count = flip_count
            self.total_signals = total_signals
            logger.info(f"♻️ State restored: {self.flip_count} flips, last: {self.last_signal}, age: {age_hours:.1f}h")
        except Exception as e:
            logger.warning(f"⚠️ State load failed (starting fresh): {e}")

    # ───────────────────────── telegram ────────────────────────────────

    def _send_telegram_sync(self, text: str) -> bool:
        """Send message to Telegram — runs in thread pool, never blocks listener.

        Blocking call (up to the 10 s request timeout). Returns True when the
        relay answers with ``ok``; every failure is logged and reported as
        False rather than raised.
        """
        try:
            branded_text = f"📊 V50_1S 〰️ {text}"

            params = {
                'bot_token': BOT_TOKEN,
                'chat_id': CHAT_ID,
                'text': branded_text,
                'parse_mode': 'HTML'
            }

            response = requests.get(GAS_URL, params=params, timeout=10)
            response.raise_for_status()

            result = response.json()
            if result.get('ok'):
                logger.info(f"✅ Telegram sent: {text[:50]}...")
                return True
            else:
                logger.error(f"❌ Telegram API error: {result}")
                return False

        except requests.exceptions.RequestException as e:
            logger.error(f"❌ Telegram network error: {e}")
            return False
        except Exception as e:
            logger.error(f"❌ Telegram error: {e}")
            return False

    def send_telegram(self, text: str):
        """
        ✅ FIX #3: Non-blocking send.
        If called from the Redis listener thread (inside _on_signal),
        we offload to the thread pool so the listener can keep consuming messages.

        If the pool has already been shut down (a signal arriving during
        shutdown) the message is dropped with a warning instead of raising.
        """
        try:
            _tg_executor.submit(self._send_telegram_sync, text)
        except RuntimeError as e:
            # submit() raises RuntimeError once shutdown() has been called.
            logger.warning(f"⚠️ Telegram send skipped (executor unavailable): {e}")

    # ───────────────────────── signal processing ──────────────────────

    def _extract_agents(self, signal_keys):
        """Extract agent timeframes from signal keys.

        Each key contributes at most one timeframe (the first token of
        ``_TIMEFRAMES`` found in it, case-insensitive). The result is
        de-duplicated in first-seen order so notifications are deterministic.
        """
        agents = []
        for key in signal_keys:
            key_str = str(key).lower()
            match = next((tf for tf in self._TIMEFRAMES if tf in key_str), None)
            if match is not None:
                agents.append(match)
        return list(dict.fromkeys(agents))

    @staticmethod
    def _coerce_price(raw) -> float:
        """Return ``raw`` as a positive finite float, or 0.0 when unusable."""
        try:
            price = float(raw)
        except (TypeError, ValueError):
            return 0.0
        if not math.isfinite(price) or price <= 0:
            return 0.0
        return price

    def _build_flip_message(self, previous: str, action: str, price: float, signal_keys: list) -> str:
        """Render the Telegram text announcing a ``previous`` → ``action`` flip."""
        agents = self._extract_agents(signal_keys)
        agents_str = ', '.join(a.upper() for a in agents) if agents else 'UNKNOWN'
        session_minutes = (time.time() - self.session_start) / 60

        # V50_1S directional indicators
        old_color = "🟢" if previous == "BUY" else "🔴"
        old_arrow = "📈" if previous == "BUY" else "📉"
        new_color = "🟢" if action == "BUY" else "🔴"
        new_arrow = "📈" if action == "BUY" else "📉"

        return (
            f"🚨 VOLATILITY 50_1S SIGNAL FLIP\n\n"
            f"{old_color} {previous} {old_arrow} ➤ "
            f"{new_color} {action} {new_arrow}\n\n"
            f"💰 Price: {price:.5f}\n"
            f"⚡ Agent: {agents_str}\n"
            f"🎯 Flip #: {self.flip_count}\n"
            f"📊 Tick Rate: 1 tick per second\n\n"
            f"📈 SESSION STATS\n"
            f"• Total Signals: {self.total_signals}\n"
            f"• Session: {session_minutes:.0f} minutes\n"
            f"• Keys: {len(signal_keys)}\n\n"
            f"🕐 TIMING\n"
            f"{format_time()}\n\n"
            # Footer previously said 75% (copied from the V75 notifier); the
            # Volatility 50 index is the 50% one.
            f"Volatility 50_1S — continuous random-walk, 50% annualised vol 〰️"
        )

    def _on_signal(self, message: RedisMessage):
        """
        Process signal — EXACT same extraction as Rewards.py _on_signal (lines 616-689).

        ✅ This callback runs in the Redis listener THREAD (RedisAblyClient V10.1).
        All work here must be synchronous. Telegram sends are offloaded to _tg_executor.

        Every received message bumps ``total_signals`` and the freshness
        timestamp, even if it is later rejected by validation. Exceptions are
        logged and never propagate to the listener.
        """
        try:
            self.total_signals += 1
            self._last_signal_time = time.time()  # ✅ FIX #4

            # ── data extraction (Rewards.py lines 621-629) ──
            data = message.data if isinstance(message, RedisMessage) else message

            # Handle nested envelope: {"event": "message", "data": {...}}
            if isinstance(data, dict) and 'data' in data:
                data = data['data']

            # Parse if string
            if isinstance(data, str):
                data = json.loads(data)

            if not isinstance(data, dict):
                logger.warning(f"⚠️ Unexpected payload type: {type(data).__name__}")
                return

            # ── field extraction (Rewards.py lines 632-634) ──
            raw_action = data.get('final_action', data.get('action', ''))
            action = str(raw_action or '').strip().upper()
            raw_price = data.get('price', 0.0)
            signal_keys = data.get('signal_keys', [])

            # ✅ FIX #5: Ensure signal_keys is list (Rewards.py line 648)
            if not isinstance(signal_keys, list):
                signal_keys = [str(signal_keys)]

            # ── validation (Rewards.py lines 636-642) ──
            if action not in self._VALID_ACTIONS:
                logger.debug(f"⚠️ Invalid action: {action}")
                return

            # Accept numeric strings too; reject zero / negative / NaN / junk.
            price = self._coerce_price(raw_price)
            if not price:
                logger.warning(f"⚠️ Invalid price: {raw_price}")
                return

            logger.info(
                f"🔔 Signal #{self.total_signals}: {action} @ {price:.5f} "
                f"| Keys: {len(signal_keys)} "
                f"| Loop: {'✅' if self._loop and self._loop.is_running() else '❌'}"
            )

            # Record the new direction and price up front so that EVERY exit
            # path below (no flip, cooldown, flip) leaves them current, and so
            # the state saved after a flip contains the post-flip direction.
            previous = self.last_signal
            self.last_signal = action
            self.last_price = price  # always track latest price

            # ── flip detection ──
            if not previous or previous == action:
                return

            # Cooldown check
            time_since_flip = time.time() - self.last_flip_time
            if time_since_flip < self.cooldown_seconds:
                logger.info(f"⏳ Flip cooldown: {self.cooldown_seconds - time_since_flip:.1f}s remaining")
                return

            self.flip_count += 1
            self.last_flip_time = time.time()

            logger.warning(f"🚨 FLIP #{self.flip_count}: {previous} → {action} @ {price:.5f}")

            flip_msg = self._build_flip_message(previous, action, price, signal_keys)

            # ✅ FIX #3: Non-blocking — offloaded to thread pool
            self.send_telegram(flip_msg)

            # Save state after flip
            self._save_state()

        except Exception as e:
            # exc_info routes the traceback to the log file as well as stderr.
            logger.error(f"❌ Signal processing error: {e}", exc_info=True)

    # ───────────────────────── health monitor ─────────────────────────

    def _refresh_connection_health(self) -> bool:
        """Update and return ``_connection_healthy`` from the client's state."""
        healthy = False
        if self.ably_realtime:
            try:
                redis_state = self.ably_realtime.connection.state
                healthy = redis_state in ('connected', 'ready')
            except Exception:
                healthy = False
        self._connection_healthy = healthy
        return healthy

    async def _health_monitor_loop(self):
        """
        ✅ FIX #6: Connection health monitoring (same pattern as Rewards.py).
        Detects stale connections and logs warnings.

        Runs every 60 s until cancelled. Signal age is measured from the last
        signal, or from session start when none has arrived yet, so a feed
        that never delivers anything is reported as well.
        """
        while True:
            # Outside the try block so task cancellation propagates normally.
            await asyncio.sleep(60)
            try:
                # Check Redis connection state
                self._refresh_connection_health()

                # Check signal freshness
                baseline = self._last_signal_time or self.session_start
                signal_age = time.time() - baseline
                if signal_age > SIGNAL_TIMEOUT:
                    logger.warning(
                        f"⚠️ No signals for {signal_age:.0f}s "
                        f"(threshold: {SIGNAL_TIMEOUT}s) — "
                        f"Healthy={'✅' if self._connection_healthy else '❌'}"
                    )
            except Exception as e:
                logger.error(f"❌ Health monitor error: {e}")

    # ───────────────────────── 10-min price reporter ──────────────────────────

    async def _price_report_loop(self):
        """
        Every 10 minutes send a Telegram message showing the current price
        from the latest published signal, plus a brief session snapshot.

        Fires every 10 minutes regardless of flip activity — gives the
        trader a heartbeat even during quiet periods. The first report is sent
        10 minutes after start. Runs until cancelled.
        """
        while True:
            await asyncio.sleep(600)  # 10 minutes (also delays the first report)
            try:
                signal_age = time.time() - self._last_signal_time if self._last_signal_time else -1
                session_minutes = (time.time() - self.session_start) / 60

                direction_color = "🟢" if self.last_signal == "BUY" else (
                    "🔴" if self.last_signal == "SELL" else "⚪"
                )
                direction_arrow = "📈" if self.last_signal == "BUY" else (
                    "📉" if self.last_signal == "SELL" else "➖"
                )

                if self.last_price and self.last_price > 0:
                    price_line = f"💰 Price: {self.last_price:.5f}"
                else:
                    price_line = "💰 Price: —"

                age_str = (
                    f"{signal_age:.0f}s ago" if signal_age >= 0 else "no signal yet"
                )

                report_msg = (
                    f"🕐 VOLATILITY 50_1S — 10-MIN UPDATE\n\n"
                    f"📊 CURRENT SIGNAL\n"
                    f"{direction_color} Direction: {self.last_signal or '—'} {direction_arrow}\n"
                    f"{price_line}\n"
                    f"🕰 Signal Age: {age_str}\n\n"
                    f"📈 SESSION\n"
                    f"• Signals: {self.total_signals}\n"
                    f"• Flips: {self.flip_count}\n"
                    f"• Runtime: {session_minutes:.0f} minutes\n\n"
                    f"🩺 CONNECTION\n"
                    f"• Redis: {'✅ Active' if self._connection_healthy else '❌ Down'}\n\n"
                    f"🕐 TIME\n"
                    f"{format_time()}\n\n"
                    f"Next update in 10 minutes ⏱"
                )

                self.send_telegram(report_msg)
                logger.info(
                    f"📢 [10-min report] {self.last_signal or '—'} @ "
                    f"{self.last_price:.5f} | "
                    f"Signals: {self.total_signals} | "
                    f"Flips: {self.flip_count}"
                )

            except Exception as e:
                logger.error(f"❌ Price report error: {e}")

    # ───────────────────────── main loop ──────────────────────────────

    def _offline_message(self) -> str:
        """Render the shutdown summary sent when the monitor stops."""
        return (
            f"🛑 VOLATILITY 50_1S MONITOR OFFLINE\n\n"
            f"📊 SESSION SUMMARY\n"
            f"• Signals Processed: {self.total_signals}\n"
            f"• Flips Detected: {self.flip_count}\n"
            f"• Last Signal: {self.last_signal or '—'}\n\n"
            f"🕐 SHUTDOWN\n"
            f"{format_time()}\n\n"
            f"V50_1S signal monitoring paused"
        )

    async def run(self):
        """Run the notifier with enhanced monitoring.

        Connects to Redis, subscribes ``_on_signal`` to the signal channel,
        starts the health and price-report tasks, then logs status every
        minute (and sends a Telegram status every 30 minutes) until stopped.

        On Ctrl+C / task cancellation an offline summary is sent; on any other
        error an error message is sent. In every case state is saved, the
        background tasks are cancelled and the Redis client is closed.
        Cancellation is re-raised after cleanup.
        """
        # ✅ FIX #2: Capture event loop (Rewards.py line 919)
        self._loop = asyncio.get_running_loop()

        logger.info("=" * 60)
        logger.info("🚀 K1RL QUANT — V50_1S Signal Flip Notifier v2.1")
        logger.info(f" Channel: {ABLY_SIGNAL_CHANNEL}")
        logger.info(f" Database: {REDIS_DB_FEATURES}")
        logger.info(f" Method: Same as Rewards.py (RedisAblyClient V10.1)")
        logger.info(f" Started: {format_time()}")
        logger.info(f" State: {self.flip_count} flips, last: {self.last_signal or '—'}")
        logger.info(f" Loop: captured={'✅' if self._loop else '❌'}")
        logger.info("=" * 60)

        # Collected here (not inside the try) so the finally block can always
        # cancel whatever was started.
        background_tasks = []

        try:
            # ✅ FIX #1: INSTANCE-level refs (Rewards.py lines 557-565)
            # Local vars could be GC'd if the reference is lost.
            self.ably_realtime = RedisAblyClient(
                redis_url=REDIS_URL,
                password=REDIS_PASSWORD,
                use_streams=True,
                database=REDIS_DB_FEATURES  # DB 0 (V50_1S features)
            )

            self.signal_channel = self.ably_realtime.channels.get(ABLY_SIGNAL_CHANNEL)

            logger.info(f"✅ Redis channels initialized (V50_1S — prefix='{CHANNEL_PREFIX}', DB={REDIS_DB_FEATURES})")
            logger.info(f" Signal: {ABLY_SIGNAL_CHANNEL}")

            # ✅ Subscribe (Rewards.py line 941)
            await self.signal_channel.subscribe("message", self._on_signal)

            logger.info(f"✅ Subscribed to {ABLY_SIGNAL_CHANNEL}:message (robust V10.1 listener)")
            logger.info(f" Event loop captured: {self._loop is not None} | Running: {self._loop.is_running() if self._loop else False}")

            # Probe once now; otherwise the first status line always reports
            # "Down" because the monitor's first check is 60 s away.
            self._refresh_connection_health()

            # Send startup notification
            startup_msg = (
                f"🚀 VOLATILITY 50_1S MONITOR ONLINE\n\n"
                f"📊 CONFIGURATION\n"
                f"• Symbol: Volatility 50 Index (1s)\n"
                f"• Tick Rate: 1 tick per second\n"
                f"• Volatility Type: Continuous random-walk volatility\n"
                f"• Target Agent: 10M\n"
                f"• Window: 5 minutes\n"
                f"• Cache: 2000 slots\n\n"
                f"🎯 MISSION\n"
                f"Monitoring V50_1S BUY↔SELL flips\n"
                f"Real-time momentum signal detection active\n\n"
                f"🕐 STATUS\n"
                f"Started: {format_time()}\n"
                f"Mode: Volatility 50_1S Signal Detection\n\n"
                f"Watching for the next directional flip ⚡"
            )
            self.send_telegram(startup_msg)

            # ✅ FIX #6: Start health monitor
            background_tasks.append(asyncio.create_task(self._health_monitor_loop()))

            # 10-minute price reporter
            background_tasks.append(asyncio.create_task(self._price_report_loop()))

            # Main loop with periodic status
            logger.info("✅ Monitoring for V50_1S signal flips...")
            status_counter = 0

            while True:
                await asyncio.sleep(60)
                status_counter += 1

                session_minutes = (time.time() - self.session_start) / 60
                signal_age = time.time() - self._last_signal_time if self._last_signal_time else -1

                logger.info(
                    f"📊 [{format_time()}] Status: {self.total_signals} signals | "
                    f"{self.flip_count} flips | Last: {self.last_signal or '—'} | "
                    f"SignalAge: {signal_age:.0f}s | "
                    f"Healthy: {'✅' if self._connection_healthy else '❌'} | "
                    f"Runtime: {session_minutes:.0f}m"
                )

                # Detailed status every 30 minutes
                if status_counter % 30 == 0:
                    status_msg = (
                        f"📊 VOLATILITY 50_1S STATUS REPORT\n\n"
                        f"📈 SESSION\n"
                        f"• Signals: {self.total_signals} received\n"
                        f"• Flips: {self.flip_count} detected\n"
                        f"• Last Signal: {self.last_signal or '—'}\n"
                        f"• Runtime: {session_minutes:.0f} minutes\n\n"
                        f"🩺 HEALTH\n"
                        f"• Connection: {'✅ Active' if self._connection_healthy else '❌ Down'}\n\n"
                        f"🕐 TIMING\n"
                        f"{format_time()}\n\n"
                        f"V50_1S signal monitoring continues..."
                    )
                    self.send_telegram(status_msg)

        except KeyboardInterrupt:
            logger.info("🛑 Stopped by user")
            self._send_telegram_sync(self._offline_message())

        except asyncio.CancelledError:
            # asyncio.run() turns Ctrl+C into cancellation of this task, so
            # this — not KeyboardInterrupt — is the usual shutdown path.
            logger.info("🛑 Stopped (task cancelled)")
            self._send_telegram_sync(self._offline_message())
            raise

        except Exception as e:
            logger.error(f"❌ Fatal error: {e}", exc_info=True)
            # Sent with parse_mode=HTML, so escape the exception text.
            self._send_telegram_sync(f"❌ Error: {html.escape(str(e)[:200])}...")

        finally:
            self._save_state()

            for task in background_tasks:
                task.cancel()
            if background_tasks:
                await asyncio.gather(*background_tasks, return_exceptions=True)

            if self.ably_realtime:
                try:
                    closing = self.ably_realtime.close()
                    # NOTE(review): close() may or may not be a coroutine in
                    # RedisAblyClient — await it only if it is.
                    if asyncio.iscoroutine(closing):
                        await closing
                except Exception as e:
                    logger.debug(f"Redis client close failed: {e}")

            _tg_executor.shutdown(wait=False)
            logger.info(f"✅ Shutdown complete. Final state: {self.flip_count} flips, {self.total_signals} signals")


if __name__ == "__main__":
    notifier = V50_1SFlipNotifier()
    try:
        asyncio.run(notifier.run())
    except KeyboardInterrupt:
        # run() has already logged, notified and cleaned up.
        pass