import { Logs, PublicKey } from '@solana/web3.js' import { getConnection } from '../solana/connection' import { config } from '../config' import { getEnabledWatchedAddresses, isTxProcessed } from '../db/queries' import { parseTransaction } from './parser' import type { ParsedOperation } from './types' import { CopyEngine } from '../copier/index' const BYREAL_PROGRAM_ID = new PublicKey('REALQqNEomY6cQGZJUGwywTBD2UmDT32rZcNnfxQ5N2') /** How often to reconcile watched addresses with DB (picks up add/remove/toggle) */ const RECONCILE_INTERVAL_MS = 10_000 /** How long to keep a signature in the dedup set (prevents re-processing) */ const DEDUP_TTL_MS = 120_000 let instance: MonitorService | null = null export class MonitorService { private running = false private subscriptionId: number | null = null private pollInterval: NodeJS.Timeout | null = null private reconcileInterval: NodeJS.Timeout | null = null private lastSignatures: Map = new Map() private watchedAddresses: Set = new Set() private copyEngine: CopyEngine /** Time-based dedup: signatures auto-expire after DEDUP_TTL_MS */ private recentSigs: Set = new Set() /** In-flight lock: prevents concurrent processing of same signature */ private processingSigs: Set = new Set() /** Error log with timestamps */ private errors: string[] = [] private constructor() { this.copyEngine = new CopyEngine() } static getInstance(): MonitorService { if (!instance) { instance = new MonitorService() } return instance } async start(): Promise { if (this.running) return // Load watched addresses const addresses = getEnabledWatchedAddresses() this.watchedAddresses = new Set(addresses.map((a) => a.address)) if (this.watchedAddresses.size === 0) { console.log('[Monitor] No watched addresses configured') } this.running = true this.errors = [] console.log(`[Monitor] Starting with ${this.watchedAddresses.size} watched addresses`) // Snapshot current latest signature for each address so polling // only picks up transactions that occur AFTER monitor start. await this.initLastSignatures() // Start WebSocket subscription to program logs this.subscribeToLogs() // Start polling fallback this.startPolling() // Start periodic reconciliation to pick up DB changes this.startReconciliation() } async stop(): Promise { if (!this.running) return this.running = false // Unsubscribe WebSocket if (this.subscriptionId !== null) { try { const connection = getConnection() await connection.removeOnLogsListener(this.subscriptionId) } catch (e) { console.error('[Monitor] Error unsubscribing:', e) } this.subscriptionId = null } // Stop polling if (this.pollInterval) { clearInterval(this.pollInterval) this.pollInterval = null } // Stop reconciliation if (this.reconcileInterval) { clearInterval(this.reconcileInterval) this.reconcileInterval = null } console.log('[Monitor] Stopped') } getStatus() { const dbAddresses = getEnabledWatchedAddresses() return { running: this.running, watchedCount: this.running ? this.watchedAddresses.size : dbAddresses.length, watchedAddresses: this.running ? Array.from(this.watchedAddresses) : dbAddresses.map((a) => a.address), errors: [...this.errors].slice(-10), } } reloadAddresses() { this.reconcileAddresses() } private recordError(msg: string) { const entry = `${new Date().toISOString()}: ${msg}` console.error(entry) this.errors.push(entry) if (this.errors.length > 50) { this.errors.splice(0, this.errors.length - 50) } } /** * Add signature to dedup set with automatic TTL expiry * @returns true if signature was new (not already in set) */ private markSeen(sig: string): boolean { if (this.recentSigs.has(sig)) return false this.recentSigs.add(sig) setTimeout(() => this.recentSigs.delete(sig), DEDUP_TTL_MS) return true } /** * Reconcile watched addresses with DB (picks up add/remove/toggle changes) */ private async reconcileAddresses() { const addresses = getEnabledWatchedAddresses() const newSet = new Set(addresses.map((a) => a.address)) // Init last signature for newly added addresses so we don't pull history const newAddrs: string[] = [] for (const addr of newSet) { if (!this.watchedAddresses.has(addr)) { console.log(`[Monitor] New address detected: ${addr.slice(0, 12)}...`) newAddrs.push(addr) } } for (const addr of this.watchedAddresses) { if (!newSet.has(addr)) { console.log(`[Monitor] Address removed/disabled: ${addr.slice(0, 12)}...`) } } // Snapshot latest sig for new addresses before they enter the poll loop if (newAddrs.length > 0) { const connection = getConnection() for (const addr of newAddrs) { try { const sigs = await connection.getSignaturesForAddress( new PublicKey(addr), { limit: 1 }, 'confirmed', ) if (sigs.length > 0) { this.lastSignatures.set(addr, sigs[0].signature) } } catch (e) { this.recordError(`[Monitor] Failed to init last signature for new address ${addr.slice(0, 12)}...: ${e instanceof Error ? e.message : String(e)}`) } } } this.watchedAddresses = newSet } private startReconciliation() { this.reconcileInterval = setInterval(() => { if (!this.running) return this.reconcileAddresses() }, RECONCILE_INTERVAL_MS) } /** * Fetch the latest signature for each watched address so the first poll * only processes transactions that happen after this point. */ private async initLastSignatures() { const connection = getConnection() for (const address of this.watchedAddresses) { try { const sigs = await connection.getSignaturesForAddress( new PublicKey(address), { limit: 1 }, 'confirmed', ) if (sigs.length > 0) { this.lastSignatures.set(address, sigs[0].signature) } } catch (e) { this.recordError(`[Monitor] Failed to init last signature for ${address.slice(0, 12)}...: ${e instanceof Error ? e.message : String(e)}`) } } console.log(`[Monitor] Initialized last signatures for ${this.lastSignatures.size} addresses`) } private subscribeToLogs() { try { const connection = getConnection() this.subscriptionId = connection.onLogs( BYREAL_PROGRAM_ID, (logs: Logs) => { this.handleLog(logs) }, 'confirmed', ) console.log('[Monitor] WebSocket subscription active') } catch (e) { this.recordError(`[Monitor] WebSocket subscription failed: ${e instanceof Error ? e.message : String(e)}`) } } private async handleLog(logs: Logs) { if (logs.err) return const sig = logs.signature if (!this.markSeen(sig)) return // Already seen within TTL window try { await this.processTransaction(sig) } catch (e) { this.recordError(`[Monitor] Error processing tx ${sig}: ${e instanceof Error ? e.message : String(e)}`) } } private startPolling() { this.pollInterval = setInterval(async () => { if (!this.running) return for (const address of this.watchedAddresses) { try { await this.pollAddress(address) } catch (e) { this.recordError(`[Monitor] Polling error for ${address.slice(0, 12)}...: ${e instanceof Error ? e.message : String(e)}`) } } }, config.monitorPollInterval) } private async pollAddress(address: string) { const connection = getConnection() const lastSig = this.lastSignatures.get(address) const signatures = await connection.getSignaturesForAddress( new PublicKey(address), { limit: 10, until: lastSig, }, 'confirmed', ) if (signatures.length === 0) return // Update last seen signature this.lastSignatures.set(address, signatures[0].signature) // Process in chronological order (oldest first) for (const sigInfo of signatures.reverse()) { if (sigInfo.err) continue if (!this.markSeen(sigInfo.signature)) continue // Already seen within TTL window try { await this.processTransaction(sigInfo.signature) } catch (e) { this.recordError(`[Monitor] Error processing tx ${sigInfo.signature}: ${e instanceof Error ? e.message : String(e)}`) } } } private async processTransaction(signature: string) { // Skip if already processing (in-flight lock) if (this.processingSigs.has(signature)) return this.processingSigs.add(signature) try { if (isTxProcessed(signature)) { return } const connection = getConnection() const tx = await connection.getParsedTransaction(signature, { commitment: 'confirmed', maxSupportedTransactionVersion: 0, }) if (!tx) return const parsed = parseTransaction(tx, this.watchedAddresses) if (!parsed) return console.log(`[Monitor] Detected ${parsed.type} from ${parsed.signer} (tx: ${signature})`) // Execute copy trade await this.copyEngine.executeCopy(parsed) } catch (e) { this.recordError(`[Monitor] Copy execution failed for ${signature}: ${e instanceof Error ? e.message : String(e)}`) } finally { this.processingSigs.delete(signature) } } }