#!/usr/bin/env python3
"""
K1RL QUANT — V50_1S Signal Flip Notifier v2.1
Uses EXACT same Redis subscription as Rewards.py for guaranteed compatibility
Enhanced with state persistence, better error handling, and detailed notifications
v2.1 FIXES (aligned with Rewards.py):
✅ FIX #1: Store ably_realtime + signal_channel as INSTANCE vars (prevent GC)
✅ FIX #2: Capture asyncio event loop ref (Rewards.py FIX v5.2.1 pattern)
✅ FIX #3: Run send_telegram in thread executor (don't block Redis listener thread)
✅ FIX #4: Track _last_signal_time for health monitoring
✅ FIX #5: Ensure signal_keys is list (Rewards.py line 648)
✅ FIX #6: Add connection health monitoring like Rewards.py _health_monitor_loop
"""
import asyncio
import html
import json
import logging
import os
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional

import requests

# EXACT same imports as Rewards.py
from redis_config_v50_1s import (
    CHANNEL_PREFIX,
    REDIS_DB_FEATURES,
    REDIS_PASSWORD,
    REDIS_URL,
    prefixed_channel,
)
from redis_connection_manager import RedisAblyClient, RedisMessage
# Enhanced logging with file output
# NOTE: everything in this section runs at import time — the log directory is
# created, root logging is configured, and the Telegram thread pool is started
# as soon as the module is imported.
LOG_DIR = Path('/home/user/app/logs')
LOG_DIR.mkdir(parents=True, exist_ok=True)
# Log to both the console (StreamHandler) and a UTF-8 file under LOG_DIR.
# NOTE(review): the file name still says "v75" although the log prefix and the
# rest of this module are V50_1S — presumably a leftover from a cloned script;
# confirm nothing tails this file name before renaming it.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [V50_1S-TG] %(levelname)s: %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler(LOG_DIR / 'telegram_notifier_v75.log', encoding='utf-8')
]
)
logger = logging.getLogger(__name__)
# Telegram config
# Each value is read from the environment, falling back to the literal default.
# SECURITY NOTE(review): the fallback for TELEGRAM_BOT_TOKEN embeds what looks
# like a real bot token in source control (and the chat id / relay URL are
# hard-coded too). Rotate the token and make the environment variable
# mandatory; it is left unchanged here so existing deployments keep working.
BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "8473902481:AAE0wep-lSXJ9yamU0Sj2KyNTQj9MImA3fk")
CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "6224721972")
GAS_URL = os.environ.get("GAS_RELAY_URL", "https://script.google.com/macros/s/AKfycbzvhXbTq3Pc5-429QIn-wK6uqBH7Bm-2fLJLsi_uGuD5Htn_e1ca4xCSRskKRD-tVtWaA/exec")
# EXACT same channel as Rewards.py uses
ABLY_SIGNAL_CHANNEL = prefixed_channel("final_signals") # → "V50_1S:final_signals"
# State persistence
STATE_FILE = Path('/home/user/app/data/v50_1s_flip_state.json')
# Timezone (Harare, Zimbabwe - UTC+2)
CAT = timezone(timedelta(hours=2))
# ✅ FIX #3: Thread pool for non-blocking Telegram sends
# _on_signal runs in the Redis listener THREAD — blocking it with requests.get()
# starves the listener and causes missed signals. Offload sends to a pool.
_tg_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="tg-send")
# Seconds without any signal before the health monitor logs a warning.
SIGNAL_TIMEOUT = 300 # 5 min without signals = problem
def now_cat():
    """Return the current timezone-aware datetime expressed in the CAT zone."""
    return datetime.now(tz=CAT)
def format_time():
    """Render the current CAT wall-clock time as ``HH:MM:SS CAT``."""
    stamp = now_cat()
    return stamp.strftime("%H:%M:%S CAT")
class V50_1SFlipNotifier:
    """Detect BUY↔SELL flips on the V50_1S signal channel and report them via Telegram.

    ``run`` subscribes ``_on_signal`` to ``ABLY_SIGNAL_CHANNEL`` and starts two
    background tasks (a health monitor and a 10-minute price report).  The
    flip/signal counters and the last seen direction are persisted to
    ``STATE_FILE`` so that a restart within 24 hours resumes from the saved
    values instead of starting from zero.
    """

    def __init__(self):
        """Initialise counters and connection refs, then restore saved state."""
        self.last_signal = None
        self.flip_count = 0
        self.total_signals = 0
        self.session_start = time.time()
        self.last_flip_time = 0
        self.cooldown_seconds = 3  # Prevent spam
        # ✅ FIX #1: Instance-level refs (same as Rewards.py lines 541-542)
        self.ably_realtime: Optional[RedisAblyClient] = None
        self.signal_channel = None
        # ✅ FIX #2: Event loop ref (same as Rewards.py FIX v5.2.1 line 919)
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        # ✅ FIX #4: Signal health tracking (same as Rewards.py)
        self._last_signal_time: float = 0
        self._connection_healthy: bool = False
        # Price reporter — tracks latest price from published signals
        # Updated in _on_signal on every valid signal received.
        self.last_price: float = 0.0
        # Load persistent state
        self._load_state()

    # ───────────────────────── state persistence ──────────────────────────
    def _save_state(self):
        """Persist counters and the last direction to ``STATE_FILE``.

        The JSON is written to a sibling ``.tmp`` file and moved into place
        with ``os.replace`` so a crash mid-write cannot leave a truncated
        state file behind.  Failures are logged, never raised.
        """
        try:
            STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
            # NOTE(review): the symbol tag is 'V75' although this notifier is
            # for V50_1S; _load_state checks the same literal, so it is kept
            # to stay compatible with state files already on disk.
            state = {
                'last_signal': self.last_signal,
                'flip_count': self.flip_count,
                'total_signals': self.total_signals,
                'last_saved': time.time(),
                'symbol': 'V75'
            }
            tmp_path = STATE_FILE.with_name(STATE_FILE.name + '.tmp')
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(state, f, indent=2)
            os.replace(tmp_path, STATE_FILE)
            logger.debug(f"💾 State saved: {self.flip_count} flips, last signal: {self.last_signal}")
        except Exception as e:
            logger.error(f"❌ State save failed: {e}")

    def _load_state(self):
        """Restore counters from ``STATE_FILE`` when it exists and is under 24h old.

        A missing file, a different symbol tag, a stale timestamp or any read
        error leaves the freshly initialised values in place.
        """
        try:
            if not STATE_FILE.exists():
                return
            with open(STATE_FILE, 'r', encoding='utf-8') as f:
                state = json.load(f)
            if state.get('symbol') != 'V75':
                logger.info("🔄 Different symbol state detected — starting fresh")
                return
            age_hours = (time.time() - state.get('last_saved', 0)) / 3600
            if age_hours >= 24:
                logger.info(f"🗑️ Stale state ({age_hours:.1f}h old) — starting fresh")
                return
            # Only a real direction may seed flip detection; anything else in
            # the file is treated as "no previous signal".
            restored = state.get('last_signal')
            self.last_signal = restored if restored in ('BUY', 'SELL') else None
            self.flip_count = state.get('flip_count', 0)
            self.total_signals = state.get('total_signals', 0)
            logger.info(f"♻️ State restored: {self.flip_count} flips, last: {self.last_signal}, age: {age_hours:.1f}h")
        except Exception as e:
            logger.warning(f"⚠️ State load failed (starting fresh): {e}")

    # ───────────────────────── telegram ────────────────────────────────
    def _send_telegram_sync(self, text: str) -> bool:
        """Send ``text`` through the GAS relay; blocking, up to 10 s.

        Returns True when the relay's JSON reply has a truthy ``ok`` field,
        False on any HTTP, network or decoding error (which is logged).
        The message is sent with ``parse_mode=HTML``, so callers passing
        arbitrary text should escape it first.
        """
        try:
            branded_text = f"📊 V50_1S 〰️ {text}"
            params = {
                'bot_token': BOT_TOKEN,
                'chat_id': CHAT_ID,
                'text': branded_text,
                'parse_mode': 'HTML'
            }
            response = requests.get(GAS_URL, params=params, timeout=10)
            response.raise_for_status()
            result = response.json()
            if result.get('ok'):
                logger.info(f"✅ Telegram sent: {text[:50]}...")
                return True
            logger.error(f"❌ Telegram API error: {result}")
            return False
        except requests.exceptions.RequestException as e:
            logger.error(f"❌ Telegram network error: {e}")
            return False
        except Exception as e:
            logger.error(f"❌ Telegram error: {e}")
            return False

    def send_telegram(self, text: str):
        """
        ✅ FIX #3: Non-blocking send.
        If called from the Redis listener thread (inside _on_signal), we offload
        to the thread pool so the listener can keep consuming messages.
        """
        try:
            _tg_executor.submit(self._send_telegram_sync, text)
        except RuntimeError as e:
            # submit() raises once the pool has been shut down (see run()'s
            # finally block); a late signal must not crash its caller.
            logger.warning(f"⚠️ Telegram send skipped (executor shut down): {e}")

    # ───────────────────────── signal processing ──────────────────────
    def _extract_agents(self, signal_keys):
        """Return the distinct timeframe tags found in ``signal_keys``.

        Each key contributes at most one tag (the first match in the list
        below, which puts longer tags such as '15m' ahead of '5m').  The
        result keeps first-seen order so notifications are deterministic.
        """
        timeframes = ('15m', '10m', '5m', '2m', '1m', '30s', '15s', '5s')
        found = {}
        for key in signal_keys:
            key_str = str(key).lower()
            match = next((tf for tf in timeframes if tf in key_str), None)
            if match is not None:
                found.setdefault(match, None)
        return list(found)

    @staticmethod
    def _coerce_price(value):
        """Return ``value`` as a float, or None when it is missing, zero, NaN or non-numeric."""
        try:
            price = float(value)
        except (TypeError, ValueError):
            return None
        if price != price or price == 0.0:  # NaN never equals itself
            return None
        return price

    def _format_flip_message(self, previous, action, price, signal_keys):
        """Build the Telegram text announcing a ``previous`` → ``action`` flip."""
        agents = self._extract_agents(signal_keys)
        agents_str = ', '.join(a.upper() for a in agents) if agents else 'UNKNOWN'
        session_minutes = (time.time() - self.session_start) / 60
        # V50_1S directional indicators
        old_color = "🟢" if previous == "BUY" else "🔴"
        old_arrow = "📈" if previous == "BUY" else "📉"
        new_color = "🟢" if action == "BUY" else "🔴"
        new_arrow = "📈" if action == "BUY" else "📉"
        return (
            f"🚨 VOLATILITY 50_1S SIGNAL FLIP\n\n"
            f"{old_color} {previous} {old_arrow} ➤ "
            f"{new_color} {action} {new_arrow}\n\n"
            f"💰 Price: {price:.5f}\n"
            f"⚡ Agent: {agents_str}\n"
            f"🎯 Flip #: {self.flip_count}\n"
            f"📊 Tick Rate: 1 tick per second\n\n"
            f"📈 SESSION STATS\n"
            f"• Total Signals: {self.total_signals}\n"
            f"• Session: {session_minutes:.0f} minutes\n"
            f"• Keys: {len(signal_keys)}\n\n"
            f"🕐 TIMING\n"
            f"{format_time()}\n\n"
            f"Volatility 50_1S — continuous random-walk, 75% annualised vol 〰️"
        )

    def _on_signal(self, message: "RedisMessage"):
        """
        Process signal — EXACT same extraction as Rewards.py _on_signal (lines 616-689).
        ✅ This callback runs in the Redis listener THREAD (RedisAblyClient V10.1).
        All work here must be synchronous. Telegram sends are offloaded to _tg_executor.

        Every message bumps ``total_signals`` and the freshness timestamp.
        Messages whose action is not BUY/SELL or whose price is missing,
        zero or non-numeric are dropped.  A direction change outside the
        cooldown window counts as a flip: it is announced and persisted.
        Exceptions are logged and never propagate to the listener.
        """
        try:
            self.total_signals += 1
            self._last_signal_time = time.time()  # ✅ FIX #4
            # ── data extraction (Rewards.py lines 621-629) ──
            data = message.data if isinstance(message, RedisMessage) else message
            # Handle nested envelope: {"event": "message", "data": {...}}
            if isinstance(data, dict) and 'data' in data:
                data = data['data']
            # Parse if string
            if isinstance(data, str):
                data = json.loads(data)
            if not isinstance(data, dict):
                logger.warning(f"⚠️ Unexpected signal payload type: {type(data).__name__}")
                return
            # ── field extraction (Rewards.py lines 632-634) ──
            raw_action = data.get('final_action', data.get('action', ''))
            action = str(raw_action or '').upper()  # tolerate an explicit null
            raw_price = data.get('price', 0.0)
            signal_keys = data.get('signal_keys', [])
            # ✅ FIX #5: Ensure signal_keys is list (Rewards.py line 648)
            if signal_keys is None:
                signal_keys = []
            elif not isinstance(signal_keys, list):
                signal_keys = [str(signal_keys)]
            # ── validation (Rewards.py lines 636-642) ──
            if action not in ('BUY', 'SELL'):
                logger.debug(f"⚠️ Invalid action: {action}")
                return
            # Coerce once so the ':.5f' formatting below cannot fail on a
            # price that arrives as a string.
            price = self._coerce_price(raw_price)
            if price is None:
                logger.warning(f"⚠️ Invalid price: {raw_price}")
                return
            # Track the latest price on every valid signal, including the
            # ones that return early below because of the flip cooldown.
            self.last_price = price
            logger.info(
                f"🔔 Signal #{self.total_signals}: {action} @ {price:.5f} "
                f"| Keys: {len(signal_keys)} "
                f"| Loop: {'✅' if self._loop and self._loop.is_running() else '❌'}"
            )
            # ── flip detection ──
            previous = self.last_signal
            if previous and previous != action:
                # Cooldown check
                time_since_flip = time.time() - self.last_flip_time
                if time_since_flip < self.cooldown_seconds:
                    logger.info(f"⏳ Flip cooldown: {self.cooldown_seconds - time_since_flip:.1f}s remaining")
                    self.last_signal = action
                    return
                self.flip_count += 1
                self.last_flip_time = time.time()
                logger.warning(f"🚨 FLIP #{self.flip_count}: {previous} → {action} @ {price:.5f}")
                # ✅ FIX #3: Non-blocking — offloaded to thread pool
                self.send_telegram(
                    self._format_flip_message(previous, action, price, signal_keys)
                )
                # Record the NEW direction before saving; persisting the old
                # one would make the first signal after a restart look like
                # the same flip again and trigger a duplicate alert.
                self.last_signal = action
                self._save_state()
            # Update last signal
            self.last_signal = action
        except Exception as e:
            logger.error(f"❌ Signal processing error: {e}", exc_info=True)

    # ───────────────────────── health monitor ─────────────────────────
    async def _health_monitor_loop(self):
        """
        ✅ FIX #6: Connection health monitoring (same pattern as Rewards.py).
        Detects stale connections and logs warnings.

        Once a minute: refresh ``_connection_healthy`` from the client's
        ``connection.state`` and warn when the last signal is older than
        ``SIGNAL_TIMEOUT`` seconds.  Exits on cancellation.
        """
        while True:
            try:
                await asyncio.sleep(60)
                # Check Redis connection state
                if self.ably_realtime:
                    try:
                        redis_state = self.ably_realtime.connection.state
                        self._connection_healthy = redis_state in ('connected', 'ready')
                    except Exception:
                        self._connection_healthy = False
                else:
                    self._connection_healthy = False
                # Check signal freshness (only meaningful once a signal has arrived)
                signal_age = time.time() - self._last_signal_time if self._last_signal_time else float('inf')
                if self._last_signal_time and signal_age > SIGNAL_TIMEOUT:
                    logger.warning(
                        f"⚠️ No signals for {signal_age:.0f}s "
                        f"(threshold: {SIGNAL_TIMEOUT}s) — "
                        f"Healthy={'✅' if self._connection_healthy else '❌'}"
                    )
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"❌ Health monitor error: {e}")

    # ───────────────────────── 10-min price reporter ──────────────────────────
    async def _price_report_loop(self):
        """
        Every 10 minutes send a Telegram message showing the current price
        from the latest published signal, plus a brief session snapshot.
        Fires on the 10-minute mark regardless of flip activity — gives the
        trader a heartbeat even during quiet periods.
        """
        await asyncio.sleep(600)  # wait 10 min before first report
        while True:
            try:
                signal_age = time.time() - self._last_signal_time if self._last_signal_time else -1
                session_minutes = (time.time() - self.session_start) / 60
                direction_color = "🟢" if self.last_signal == "BUY" else (
                    "🔴" if self.last_signal == "SELL" else "⚪"
                )
                direction_arrow = "📈" if self.last_signal == "BUY" else (
                    "📉" if self.last_signal == "SELL" else "➖"
                )
                if self.last_price and self.last_price > 0:
                    price_line = f"💰 Price: {self.last_price:.5f}"
                else:
                    price_line = "💰 Price: —"
                age_str = (
                    f"{signal_age:.0f}s ago" if signal_age >= 0
                    else "no signal yet"
                )
                report_msg = (
                    f"🕐 VOLATILITY 50_1S — 10-MIN UPDATE\n\n"
                    f"📊 CURRENT SIGNAL\n"
                    f"{direction_color} Direction: {self.last_signal or '—'} {direction_arrow}\n"
                    f"{price_line}\n"
                    f"🕰 Signal Age: {age_str}\n\n"
                    f"📈 SESSION\n"
                    f"• Signals: {self.total_signals}\n"
                    f"• Flips: {self.flip_count}\n"
                    f"• Runtime: {session_minutes:.0f} minutes\n\n"
                    f"🩺 CONNECTION\n"
                    f"• Redis: {'✅ Active' if self._connection_healthy else '❌ Down'}\n\n"
                    f"🕐 TIME\n"
                    f"{format_time()}\n\n"
                    f"Next update in 10 minutes ⏱"
                )
                self.send_telegram(report_msg)
                logger.info(
                    f"📢 [10-min report] {self.last_signal or '—'} @ "
                    f"{self.last_price:.5f} | "
                    f"Signals: {self.total_signals} | "
                    f"Flips: {self.flip_count}"
                )
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"❌ Price report error: {e}")
            await asyncio.sleep(600)  # 10 minutes

    # ───────────────────────── main loop ──────────────────────────────
    async def run(self):
        """Subscribe to the signal channel and monitor until stopped.

        Sends a startup notification, starts the health-monitor and
        price-report tasks, then logs a status line every minute and sends a
        Telegram status report every 30 minutes.  On shutdown (Ctrl-C /
        task cancellation) or a fatal error a final notification is sent
        synchronously; state is always saved and the client closed.
        Cancellation is re-raised after cleanup.
        """
        # ✅ FIX #2: Capture event loop (Rewards.py line 919)
        self._loop = asyncio.get_running_loop()
        logger.info("=" * 60)
        logger.info("🚀 K1RL QUANT — V50_1S Signal Flip Notifier v2.1")
        logger.info(f" Channel: {ABLY_SIGNAL_CHANNEL}")
        logger.info(f" Database: {REDIS_DB_FEATURES}")
        logger.info(f" Method: Same as Rewards.py (RedisAblyClient V10.1)")
        logger.info(f" Started: {format_time()}")
        logger.info(f" State: {self.flip_count} flips, last: {self.last_signal or '—'}")
        logger.info(f" Loop: captured={'✅' if self._loop else '❌'}")
        logger.info("=" * 60)
        # Collected so the finally block can cancel whatever was started,
        # even when setup fails part-way through.
        background_tasks = []
        try:
            # ✅ FIX #1: INSTANCE-level refs (Rewards.py lines 557-565)
            # Local vars could be GC'd if the reference is lost.
            self.ably_realtime = RedisAblyClient(
                redis_url=REDIS_URL,
                password=REDIS_PASSWORD,
                use_streams=True,
                database=REDIS_DB_FEATURES  # DB 0 (V50_1S features)
            )
            self.signal_channel = self.ably_realtime.channels.get(ABLY_SIGNAL_CHANNEL)
            logger.info(f"✅ Redis channels initialized (V50_1S — prefix='{CHANNEL_PREFIX}', DB={REDIS_DB_FEATURES})")
            logger.info(f" Signal: {ABLY_SIGNAL_CHANNEL}")
            # ✅ Subscribe (Rewards.py line 941)
            await self.signal_channel.subscribe("message", self._on_signal)
            logger.info(f"✅ Subscribed to {ABLY_SIGNAL_CHANNEL}:message (robust V10.1 listener)")
            logger.info(f" Event loop captured: {self._loop is not None} | Running: {self._loop.is_running() if self._loop else False}")
            # Send startup notification
            startup_msg = (
                f"🚀 VOLATILITY 50_1S MONITOR ONLINE\n\n"
                f"📊 CONFIGURATION\n"
                f"• Symbol: Volatility 50 Index (1s)\n"
                f"• Tick Rate: 1 tick per second\n"
                f"• Volatility Type: Continuous random-walk volatility\n"
                f"• Target Agent: 10M\n"
                f"• Window: 5 minutes\n"
                f"• Cache: 2000 slots\n\n"
                f"🎯 MISSION\n"
                f"Monitoring V50_1S BUY↔SELL flips\n"
                f"Real-time momentum signal detection active\n\n"
                f"🕐 STATUS\n"
                f"Started: {format_time()}\n"
                f"Mode: Volatility 50_1S Signal Detection\n\n"
                f"Watching for the next directional flip ⚡"
            )
            self.send_telegram(startup_msg)
            # ✅ FIX #6: Start health monitor
            background_tasks.append(asyncio.create_task(self._health_monitor_loop()))
            # 10-minute price reporter
            background_tasks.append(asyncio.create_task(self._price_report_loop()))
            # Main loop with periodic status
            logger.info("✅ Monitoring for V50_1S signal flips...")
            status_counter = 0
            while True:
                await asyncio.sleep(60)
                status_counter += 1
                session_minutes = (time.time() - self.session_start) / 60
                signal_age = time.time() - self._last_signal_time if self._last_signal_time else -1
                logger.info(
                    f"📊 [{format_time()}] Status: {self.total_signals} signals | "
                    f"{self.flip_count} flips | Last: {self.last_signal or '—'} | "
                    f"SignalAge: {signal_age:.0f}s | "
                    f"Healthy: {'✅' if self._connection_healthy else '❌'} | "
                    f"Runtime: {session_minutes:.0f}m"
                )
                # Detailed status every 30 minutes
                if status_counter % 30 == 0:
                    status_msg = (
                        f"📊 VOLATILITY 50_1S STATUS REPORT\n\n"
                        f"📈 SESSION\n"
                        f"• Signals: {self.total_signals} received\n"
                        f"• Flips: {self.flip_count} detected\n"
                        f"• Last Signal: {self.last_signal or '—'}\n"
                        f"• Runtime: {session_minutes:.0f} minutes\n\n"
                        f"🩺 HEALTH\n"
                        f"• Connection: {'✅ Active' if self._connection_healthy else '❌ Down'}\n\n"
                        f"🕐 TIMING\n"
                        f"{format_time()}\n\n"
                        f"V50_1S signal monitoring continues..."
                    )
                    self.send_telegram(status_msg)
        except (KeyboardInterrupt, asyncio.CancelledError) as stop:
            # Under asyncio.run() a Ctrl-C reaches this coroutine as task
            # cancellation rather than KeyboardInterrupt, so both are
            # treated as a user-requested stop.
            logger.info("🛑 Stopped by user")
            self._send_telegram_sync(
                f"🛑 VOLATILITY 50_1S MONITOR OFFLINE\n\n"
                f"📊 SESSION SUMMARY\n"
                f"• Signals Processed: {self.total_signals}\n"
                f"• Flips Detected: {self.flip_count}\n"
                f"• Last Signal: {self.last_signal or '—'}\n\n"
                f"🕐 SHUTDOWN\n"
                f"{format_time()}\n\n"
                f"V50_1S signal monitoring paused"
            )
            if isinstance(stop, asyncio.CancelledError):
                # Let the event loop see the cancellation after cleanup.
                raise
        except Exception as e:
            logger.error(f"❌ Fatal error: {e}", exc_info=True)
            # Messages go out with parse_mode=HTML; escape the exception text
            # so stray '<', '>' or '&' cannot make the relay reject it.
            self._send_telegram_sync(f"❌ Error: {html.escape(str(e)[:200])}...")
        finally:
            for task in background_tasks:
                task.cancel()
            self._save_state()
            if self.ably_realtime:
                try:
                    self.ably_realtime.close()
                except Exception as e:
                    # Best-effort close: record the failure, keep shutting down.
                    logger.debug(f"Redis client close failed: {e}")
            _tg_executor.shutdown(wait=False)
            logger.info(f"✅ Shutdown complete. Final state: {self.flip_count} flips, {self.total_signals} signals")
if __name__ == "__main__":
    notifier = V50_1SFlipNotifier()
    try:
        asyncio.run(notifier.run())
    except KeyboardInterrupt:
        # Ctrl-C surfaces from asyncio.run() as KeyboardInterrupt once the
        # main task has been cancelled; run()'s finally block has already
        # saved state by then, so exit quietly instead of with a traceback.
        logger.info("🛑 Interrupted — exiting")