Kaynağa Gözat

fix: skip historical txs by snapshotting latest sig on monitor start

pollAddress with undefined `until` fetches the 10 most recent
historical transactions. Now we fetch each address's latest signature
before starting the poll loop, and do the same for newly added
addresses during reconciliation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
zhangchunrui 2 hafta önce
ebeveyn
işleme
8015558677
1 değiştirilmiş dosya ile 50 ekleme ve 2 silme
  1. 50 2
      src/lib/monitor/index.ts

+ 50 - 2
src/lib/monitor/index.ts

@@ -58,6 +58,10 @@ export class MonitorService {
     this.errors = []
     console.log(`[Monitor] Starting with ${this.watchedAddresses.size} watched addresses`)
 
+    // Snapshot current latest signature for each address so polling
+    // only picks up transactions that occur AFTER monitor start.
+    await this.initLastSignatures()
+
     // Start WebSocket subscription to program logs
     this.subscribeToLogs()
 
@@ -138,14 +142,16 @@ export class MonitorService {
   /**
    * Reconcile watched addresses with DB (picks up add/remove/toggle changes)
    */
-  private reconcileAddresses() {
+  private async reconcileAddresses() {
     const addresses = getEnabledWatchedAddresses()
     const newSet = new Set(addresses.map((a) => a.address))
 
-    // Log changes
+    // Init last signature for newly added addresses so we don't pull history
+    const newAddrs: string[] = []
     for (const addr of newSet) {
       if (!this.watchedAddresses.has(addr)) {
         console.log(`[Monitor] New address detected: ${addr.slice(0, 12)}...`)
+        newAddrs.push(addr)
       }
     }
     for (const addr of this.watchedAddresses) {
@@ -154,6 +160,25 @@ export class MonitorService {
       }
     }
 
+    // Snapshot latest sig for new addresses before they enter the poll loop
+    if (newAddrs.length > 0) {
+      const connection = getConnection()
+      for (const addr of newAddrs) {
+        try {
+          const sigs = await connection.getSignaturesForAddress(
+            new PublicKey(addr),
+            { limit: 1 },
+            'confirmed',
+          )
+          if (sigs.length > 0) {
+            this.lastSignatures.set(addr, sigs[0].signature)
+          }
+        } catch (e) {
+          this.recordError(`[Monitor] Failed to init last signature for new address ${addr.slice(0, 12)}...: ${e instanceof Error ? e.message : String(e)}`)
+        }
+      }
+    }
+
     this.watchedAddresses = newSet
   }
 
@@ -164,6 +189,29 @@ export class MonitorService {
     }, RECONCILE_INTERVAL_MS)
   }
 
+  /**
+   * Fetch the latest signature for each watched address so the first poll
+   * only processes transactions that happen after this point.
+   */
+  private async initLastSignatures() {
+    const connection = getConnection()
+    for (const address of this.watchedAddresses) {
+      try {
+        const sigs = await connection.getSignaturesForAddress(
+          new PublicKey(address),
+          { limit: 1 },
+          'confirmed',
+        )
+        if (sigs.length > 0) {
+          this.lastSignatures.set(address, sigs[0].signature)
+        }
+      } catch (e) {
+        this.recordError(`[Monitor] Failed to init last signature for ${address.slice(0, 12)}...: ${e instanceof Error ? e.message : String(e)}`)
+      }
+    }
+    console.log(`[Monitor] Initialized last signatures for ${this.lastSignatures.size} addresses`)
+  }
+
   private subscribeToLogs() {
     try {
       const connection = getConnection()