| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319 |
- import { Logs, PublicKey } from '@solana/web3.js'
- import { getConnection } from '../solana/connection'
- import { config } from '../config'
- import { getEnabledWatchedAddresses, isTxProcessed } from '../db/queries'
- import { parseTransaction } from './parser'
- import type { ParsedOperation } from './types'
- import { CopyEngine } from '../copier/index'
- const BYREAL_PROGRAM_ID = new PublicKey('REALQqNEomY6cQGZJUGwywTBD2UmDT32rZcNnfxQ5N2')
- /** How often to reconcile watched addresses with DB (picks up add/remove/toggle) */
- const RECONCILE_INTERVAL_MS = 10_000
- /** How long to keep a signature in the dedup set (prevents re-processing) */
- const DEDUP_TTL_MS = 120_000
- let instance: MonitorService | null = null
- export class MonitorService {
- private running = false
- private subscriptionId: number | null = null
- private pollInterval: NodeJS.Timeout | null = null
- private reconcileInterval: NodeJS.Timeout | null = null
- private lastSignatures: Map<string, string> = new Map()
- private watchedAddresses: Set<string> = new Set()
- private copyEngine: CopyEngine
- /** Time-based dedup: signatures auto-expire after DEDUP_TTL_MS */
- private recentSigs: Set<string> = new Set()
- /** In-flight lock: prevents concurrent processing of same signature */
- private processingSigs: Set<string> = new Set()
- /** Error log with timestamps */
- private errors: string[] = []
- private constructor() {
- this.copyEngine = new CopyEngine()
- }
- static getInstance(): MonitorService {
- if (!instance) {
- instance = new MonitorService()
- }
- return instance
- }
- async start(): Promise<void> {
- if (this.running) return
- // Load watched addresses
- const addresses = getEnabledWatchedAddresses()
- this.watchedAddresses = new Set(addresses.map((a) => a.address))
- if (this.watchedAddresses.size === 0) {
- console.log('[Monitor] No watched addresses configured')
- }
- this.running = true
- this.errors = []
- console.log(`[Monitor] Starting with ${this.watchedAddresses.size} watched addresses`)
- // Snapshot current latest signature for each address so polling
- // only picks up transactions that occur AFTER monitor start.
- await this.initLastSignatures()
- // Start WebSocket subscription to program logs
- this.subscribeToLogs()
- // Start polling fallback
- this.startPolling()
- // Start periodic reconciliation to pick up DB changes
- this.startReconciliation()
- }
- async stop(): Promise<void> {
- if (!this.running) return
- this.running = false
- // Unsubscribe WebSocket
- if (this.subscriptionId !== null) {
- try {
- const connection = getConnection()
- await connection.removeOnLogsListener(this.subscriptionId)
- } catch (e) {
- console.error('[Monitor] Error unsubscribing:', e)
- }
- this.subscriptionId = null
- }
- // Stop polling
- if (this.pollInterval) {
- clearInterval(this.pollInterval)
- this.pollInterval = null
- }
- // Stop reconciliation
- if (this.reconcileInterval) {
- clearInterval(this.reconcileInterval)
- this.reconcileInterval = null
- }
- console.log('[Monitor] Stopped')
- }
- getStatus() {
- const dbAddresses = getEnabledWatchedAddresses()
- return {
- running: this.running,
- watchedCount: this.running ? this.watchedAddresses.size : dbAddresses.length,
- watchedAddresses: this.running
- ? Array.from(this.watchedAddresses)
- : dbAddresses.map((a) => a.address),
- errors: [...this.errors].slice(-10),
- }
- }
- reloadAddresses() {
- this.reconcileAddresses()
- }
- private recordError(msg: string) {
- const entry = `${new Date().toISOString()}: ${msg}`
- console.error(entry)
- this.errors.push(entry)
- if (this.errors.length > 50) {
- this.errors.splice(0, this.errors.length - 50)
- }
- }
- /**
- * Add signature to dedup set with automatic TTL expiry
- * @returns true if signature was new (not already in set)
- */
- private markSeen(sig: string): boolean {
- if (this.recentSigs.has(sig)) return false
- this.recentSigs.add(sig)
- setTimeout(() => this.recentSigs.delete(sig), DEDUP_TTL_MS)
- return true
- }
- /**
- * Reconcile watched addresses with DB (picks up add/remove/toggle changes)
- */
- private async reconcileAddresses() {
- const addresses = getEnabledWatchedAddresses()
- const newSet = new Set(addresses.map((a) => a.address))
- // Init last signature for newly added addresses so we don't pull history
- const newAddrs: string[] = []
- for (const addr of newSet) {
- if (!this.watchedAddresses.has(addr)) {
- console.log(`[Monitor] New address detected: ${addr.slice(0, 12)}...`)
- newAddrs.push(addr)
- }
- }
- for (const addr of this.watchedAddresses) {
- if (!newSet.has(addr)) {
- console.log(`[Monitor] Address removed/disabled: ${addr.slice(0, 12)}...`)
- }
- }
- // Snapshot latest sig for new addresses before they enter the poll loop
- if (newAddrs.length > 0) {
- const connection = getConnection()
- for (const addr of newAddrs) {
- try {
- const sigs = await connection.getSignaturesForAddress(
- new PublicKey(addr),
- { limit: 1 },
- 'confirmed',
- )
- if (sigs.length > 0) {
- this.lastSignatures.set(addr, sigs[0].signature)
- }
- } catch (e) {
- this.recordError(`[Monitor] Failed to init last signature for new address ${addr.slice(0, 12)}...: ${e instanceof Error ? e.message : String(e)}`)
- }
- }
- }
- this.watchedAddresses = newSet
- }
- private startReconciliation() {
- this.reconcileInterval = setInterval(() => {
- if (!this.running) return
- this.reconcileAddresses()
- }, RECONCILE_INTERVAL_MS)
- }
- /**
- * Fetch the latest signature for each watched address so the first poll
- * only processes transactions that happen after this point.
- */
- private async initLastSignatures() {
- const connection = getConnection()
- for (const address of this.watchedAddresses) {
- try {
- const sigs = await connection.getSignaturesForAddress(
- new PublicKey(address),
- { limit: 1 },
- 'confirmed',
- )
- if (sigs.length > 0) {
- this.lastSignatures.set(address, sigs[0].signature)
- }
- } catch (e) {
- this.recordError(`[Monitor] Failed to init last signature for ${address.slice(0, 12)}...: ${e instanceof Error ? e.message : String(e)}`)
- }
- }
- console.log(`[Monitor] Initialized last signatures for ${this.lastSignatures.size} addresses`)
- }
- private subscribeToLogs() {
- try {
- const connection = getConnection()
- this.subscriptionId = connection.onLogs(
- BYREAL_PROGRAM_ID,
- (logs: Logs) => {
- this.handleLog(logs)
- },
- 'confirmed',
- )
- console.log('[Monitor] WebSocket subscription active')
- } catch (e) {
- this.recordError(`[Monitor] WebSocket subscription failed: ${e instanceof Error ? e.message : String(e)}`)
- }
- }
- private async handleLog(logs: Logs) {
- if (logs.err) return
- const sig = logs.signature
- if (!this.markSeen(sig)) return // Already seen within TTL window
- try {
- await this.processTransaction(sig)
- } catch (e) {
- this.recordError(`[Monitor] Error processing tx ${sig}: ${e instanceof Error ? e.message : String(e)}`)
- }
- }
- private startPolling() {
- this.pollInterval = setInterval(async () => {
- if (!this.running) return
- for (const address of this.watchedAddresses) {
- try {
- await this.pollAddress(address)
- } catch (e) {
- this.recordError(`[Monitor] Polling error for ${address.slice(0, 12)}...: ${e instanceof Error ? e.message : String(e)}`)
- }
- }
- }, config.monitorPollInterval)
- }
- private async pollAddress(address: string) {
- const connection = getConnection()
- const lastSig = this.lastSignatures.get(address)
- const signatures = await connection.getSignaturesForAddress(
- new PublicKey(address),
- {
- limit: 10,
- until: lastSig,
- },
- 'confirmed',
- )
- if (signatures.length === 0) return
- // Update last seen signature
- this.lastSignatures.set(address, signatures[0].signature)
- // Process in chronological order (oldest first)
- for (const sigInfo of signatures.reverse()) {
- if (sigInfo.err) continue
- if (!this.markSeen(sigInfo.signature)) continue // Already seen within TTL window
- try {
- await this.processTransaction(sigInfo.signature)
- } catch (e) {
- this.recordError(`[Monitor] Error processing tx ${sigInfo.signature}: ${e instanceof Error ? e.message : String(e)}`)
- }
- }
- }
- private async processTransaction(signature: string) {
- // Skip if already processing (in-flight lock)
- if (this.processingSigs.has(signature)) return
- this.processingSigs.add(signature)
- try {
- if (isTxProcessed(signature)) {
- return
- }
- const connection = getConnection()
- const tx = await connection.getParsedTransaction(signature, {
- commitment: 'confirmed',
- maxSupportedTransactionVersion: 0,
- })
- if (!tx) return
- const parsed = parseTransaction(tx, this.watchedAddresses)
- if (!parsed) return
- console.log(`[Monitor] Detected ${parsed.type} from ${parsed.signer} (tx: ${signature})`)
- // Execute copy trade
- await this.copyEngine.executeCopy(parsed)
- } catch (e) {
- this.recordError(`[Monitor] Copy execution failed for ${signature}: ${e instanceof Error ? e.message : String(e)}`)
- } finally {
- this.processingSigs.delete(signature)
- }
- }
- }
|