index.ts 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. import { Logs, PublicKey } from '@solana/web3.js'
  2. import { getConnection } from '../solana/connection'
  3. import { config } from '../config'
  4. import { getEnabledWatchedAddresses, isTxProcessed } from '../db/queries'
  5. import { parseTransaction } from './parser'
  6. import type { ParsedOperation } from './types'
  7. import { CopyEngine } from '../copier/index'
  8. const BYREAL_PROGRAM_ID = new PublicKey('REALQqNEomY6cQGZJUGwywTBD2UmDT32rZcNnfxQ5N2')
  9. /** How often to reconcile watched addresses with DB (picks up add/remove/toggle) */
  10. const RECONCILE_INTERVAL_MS = 10_000
  11. /** How long to keep a signature in the dedup set (prevents re-processing) */
  12. const DEDUP_TTL_MS = 120_000
  13. let instance: MonitorService | null = null
  14. export class MonitorService {
  15. private running = false
  16. private subscriptionId: number | null = null
  17. private pollInterval: NodeJS.Timeout | null = null
  18. private reconcileInterval: NodeJS.Timeout | null = null
  19. private lastSignatures: Map<string, string> = new Map()
  20. private watchedAddresses: Set<string> = new Set()
  21. private copyEngine: CopyEngine
  22. /** Time-based dedup: signatures auto-expire after DEDUP_TTL_MS */
  23. private recentSigs: Set<string> = new Set()
  24. /** In-flight lock: prevents concurrent processing of same signature */
  25. private processingSigs: Set<string> = new Set()
  26. /** Error log with timestamps */
  27. private errors: string[] = []
  28. private constructor() {
  29. this.copyEngine = new CopyEngine()
  30. }
  31. static getInstance(): MonitorService {
  32. if (!instance) {
  33. instance = new MonitorService()
  34. }
  35. return instance
  36. }
  37. async start(): Promise<void> {
  38. if (this.running) return
  39. // Load watched addresses
  40. const addresses = getEnabledWatchedAddresses()
  41. this.watchedAddresses = new Set(addresses.map((a) => a.address))
  42. if (this.watchedAddresses.size === 0) {
  43. console.log('[Monitor] No watched addresses configured')
  44. }
  45. this.running = true
  46. this.errors = []
  47. console.log(`[Monitor] Starting with ${this.watchedAddresses.size} watched addresses`)
  48. // Snapshot current latest signature for each address so polling
  49. // only picks up transactions that occur AFTER monitor start.
  50. await this.initLastSignatures()
  51. // Start WebSocket subscription to program logs
  52. this.subscribeToLogs()
  53. // Start polling fallback
  54. this.startPolling()
  55. // Start periodic reconciliation to pick up DB changes
  56. this.startReconciliation()
  57. }
  58. async stop(): Promise<void> {
  59. if (!this.running) return
  60. this.running = false
  61. // Unsubscribe WebSocket
  62. if (this.subscriptionId !== null) {
  63. try {
  64. const connection = getConnection()
  65. await connection.removeOnLogsListener(this.subscriptionId)
  66. } catch (e) {
  67. console.error('[Monitor] Error unsubscribing:', e)
  68. }
  69. this.subscriptionId = null
  70. }
  71. // Stop polling
  72. if (this.pollInterval) {
  73. clearInterval(this.pollInterval)
  74. this.pollInterval = null
  75. }
  76. // Stop reconciliation
  77. if (this.reconcileInterval) {
  78. clearInterval(this.reconcileInterval)
  79. this.reconcileInterval = null
  80. }
  81. console.log('[Monitor] Stopped')
  82. }
  83. getStatus() {
  84. const dbAddresses = getEnabledWatchedAddresses()
  85. return {
  86. running: this.running,
  87. watchedCount: this.running ? this.watchedAddresses.size : dbAddresses.length,
  88. watchedAddresses: this.running
  89. ? Array.from(this.watchedAddresses)
  90. : dbAddresses.map((a) => a.address),
  91. errors: [...this.errors].slice(-10),
  92. }
  93. }
  94. reloadAddresses() {
  95. this.reconcileAddresses()
  96. }
  97. private recordError(msg: string) {
  98. const entry = `${new Date().toISOString()}: ${msg}`
  99. console.error(entry)
  100. this.errors.push(entry)
  101. if (this.errors.length > 50) {
  102. this.errors.splice(0, this.errors.length - 50)
  103. }
  104. }
  105. /**
  106. * Add signature to dedup set with automatic TTL expiry
  107. * @returns true if signature was new (not already in set)
  108. */
  109. private markSeen(sig: string): boolean {
  110. if (this.recentSigs.has(sig)) return false
  111. this.recentSigs.add(sig)
  112. setTimeout(() => this.recentSigs.delete(sig), DEDUP_TTL_MS)
  113. return true
  114. }
  115. /**
  116. * Reconcile watched addresses with DB (picks up add/remove/toggle changes)
  117. */
  118. private async reconcileAddresses() {
  119. const addresses = getEnabledWatchedAddresses()
  120. const newSet = new Set(addresses.map((a) => a.address))
  121. // Init last signature for newly added addresses so we don't pull history
  122. const newAddrs: string[] = []
  123. for (const addr of newSet) {
  124. if (!this.watchedAddresses.has(addr)) {
  125. console.log(`[Monitor] New address detected: ${addr.slice(0, 12)}...`)
  126. newAddrs.push(addr)
  127. }
  128. }
  129. for (const addr of this.watchedAddresses) {
  130. if (!newSet.has(addr)) {
  131. console.log(`[Monitor] Address removed/disabled: ${addr.slice(0, 12)}...`)
  132. }
  133. }
  134. // Snapshot latest sig for new addresses before they enter the poll loop
  135. if (newAddrs.length > 0) {
  136. const connection = getConnection()
  137. for (const addr of newAddrs) {
  138. try {
  139. const sigs = await connection.getSignaturesForAddress(
  140. new PublicKey(addr),
  141. { limit: 1 },
  142. 'confirmed',
  143. )
  144. if (sigs.length > 0) {
  145. this.lastSignatures.set(addr, sigs[0].signature)
  146. }
  147. } catch (e) {
  148. this.recordError(`[Monitor] Failed to init last signature for new address ${addr.slice(0, 12)}...: ${e instanceof Error ? e.message : String(e)}`)
  149. }
  150. }
  151. }
  152. this.watchedAddresses = newSet
  153. }
  154. private startReconciliation() {
  155. this.reconcileInterval = setInterval(() => {
  156. if (!this.running) return
  157. this.reconcileAddresses()
  158. }, RECONCILE_INTERVAL_MS)
  159. }
  160. /**
  161. * Fetch the latest signature for each watched address so the first poll
  162. * only processes transactions that happen after this point.
  163. */
  164. private async initLastSignatures() {
  165. const connection = getConnection()
  166. for (const address of this.watchedAddresses) {
  167. try {
  168. const sigs = await connection.getSignaturesForAddress(
  169. new PublicKey(address),
  170. { limit: 1 },
  171. 'confirmed',
  172. )
  173. if (sigs.length > 0) {
  174. this.lastSignatures.set(address, sigs[0].signature)
  175. }
  176. } catch (e) {
  177. this.recordError(`[Monitor] Failed to init last signature for ${address.slice(0, 12)}...: ${e instanceof Error ? e.message : String(e)}`)
  178. }
  179. }
  180. console.log(`[Monitor] Initialized last signatures for ${this.lastSignatures.size} addresses`)
  181. }
  182. private subscribeToLogs() {
  183. try {
  184. const connection = getConnection()
  185. this.subscriptionId = connection.onLogs(
  186. BYREAL_PROGRAM_ID,
  187. (logs: Logs) => {
  188. this.handleLog(logs)
  189. },
  190. 'confirmed',
  191. )
  192. console.log('[Monitor] WebSocket subscription active')
  193. } catch (e) {
  194. this.recordError(`[Monitor] WebSocket subscription failed: ${e instanceof Error ? e.message : String(e)}`)
  195. }
  196. }
  197. private async handleLog(logs: Logs) {
  198. if (logs.err) return
  199. const sig = logs.signature
  200. if (!this.markSeen(sig)) return // Already seen within TTL window
  201. try {
  202. await this.processTransaction(sig)
  203. } catch (e) {
  204. this.recordError(`[Monitor] Error processing tx ${sig}: ${e instanceof Error ? e.message : String(e)}`)
  205. }
  206. }
  207. private startPolling() {
  208. this.pollInterval = setInterval(async () => {
  209. if (!this.running) return
  210. for (const address of this.watchedAddresses) {
  211. try {
  212. await this.pollAddress(address)
  213. } catch (e) {
  214. this.recordError(`[Monitor] Polling error for ${address.slice(0, 12)}...: ${e instanceof Error ? e.message : String(e)}`)
  215. }
  216. }
  217. }, config.monitorPollInterval)
  218. }
  219. private async pollAddress(address: string) {
  220. const connection = getConnection()
  221. const lastSig = this.lastSignatures.get(address)
  222. const signatures = await connection.getSignaturesForAddress(
  223. new PublicKey(address),
  224. {
  225. limit: 10,
  226. until: lastSig,
  227. },
  228. 'confirmed',
  229. )
  230. if (signatures.length === 0) return
  231. // Update last seen signature
  232. this.lastSignatures.set(address, signatures[0].signature)
  233. // Process in chronological order (oldest first)
  234. for (const sigInfo of signatures.reverse()) {
  235. if (sigInfo.err) continue
  236. if (!this.markSeen(sigInfo.signature)) continue // Already seen within TTL window
  237. try {
  238. await this.processTransaction(sigInfo.signature)
  239. } catch (e) {
  240. this.recordError(`[Monitor] Error processing tx ${sigInfo.signature}: ${e instanceof Error ? e.message : String(e)}`)
  241. }
  242. }
  243. }
  244. private async processTransaction(signature: string) {
  245. // Skip if already processing (in-flight lock)
  246. if (this.processingSigs.has(signature)) return
  247. this.processingSigs.add(signature)
  248. try {
  249. if (isTxProcessed(signature)) {
  250. return
  251. }
  252. const connection = getConnection()
  253. const tx = await connection.getParsedTransaction(signature, {
  254. commitment: 'confirmed',
  255. maxSupportedTransactionVersion: 0,
  256. })
  257. if (!tx) return
  258. const parsed = parseTransaction(tx, this.watchedAddresses)
  259. if (!parsed) return
  260. console.log(`[Monitor] Detected ${parsed.type} from ${parsed.signer} (tx: ${signature})`)
  261. // Execute copy trade
  262. await this.copyEngine.executeCopy(parsed)
  263. } catch (e) {
  264. this.recordError(`[Monitor] Copy execution failed for ${signature}: ${e instanceof Error ? e.message : String(e)}`)
  265. } finally {
  266. this.processingSigs.delete(signature)
  267. }
  268. }
  269. }