Преглед изворни кода

feat: WebSocket 自动重连机制

- 每 30 秒检查 WebSocket 健康状态
- 如果 120 秒内没有收到任何 WS 消息,自动重新订阅
- 重连前清理旧的订阅
- status API 增加 wsReconnects 和 wsLastMessageAgo 字段
- 减少不必要的 dedup 日志噪音

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
zhangchunrui пре 1 недеља
родитељ
комит
883965a9e7
1 измењених фајлова са 58 додато и 1 уклоњено
  1. 58 1
      src/lib/monitor/index.ts

+ 58 - 1
src/lib/monitor/index.ts

@@ -14,6 +14,12 @@ const RECONCILE_INTERVAL_MS = 10_000
 /** How long to keep a signature in the dedup set (prevents re-processing) */
 const DEDUP_TTL_MS = 120_000
 
+/** How often to check WebSocket health and reconnect if needed */
+const WS_HEALTH_CHECK_INTERVAL_MS = 30_000
+
+/** How long without a WS message before considering the connection dead */
+const WS_DEAD_THRESHOLD_MS = 120_000
+
 let instance: MonitorService | null = null
 
 export class MonitorService {
@@ -21,6 +27,7 @@ export class MonitorService {
   private subscriptionId: number | null = null
   private pollInterval: NodeJS.Timeout | null = null
   private reconcileInterval: NodeJS.Timeout | null = null
+  private wsHealthInterval: NodeJS.Timeout | null = null
   private lastSignatures: Map<string, string> = new Map()
   private watchedAddresses: Set<string> = new Set()
   private copyEngine: CopyEngine
@@ -31,6 +38,10 @@ export class MonitorService {
   private processingSigs: Set<string> = new Set()
   /** Error log with timestamps */
   private errors: string[] = []
+  /** Track last WS message time for health check */
+  private lastWsMessageAt: number = 0
+  /** Count of WS reconnections */
+  private wsReconnectCount: number = 0
 
   private constructor() {
     this.copyEngine = new CopyEngine()
@@ -65,6 +76,9 @@ export class MonitorService {
     // Start WebSocket subscription to program logs
     this.subscribeToLogs()
 
+    // Start WebSocket health check & auto-reconnect
+    this.startWsHealthCheck()
+
     // Start polling fallback
     this.startPolling()
 
@@ -100,6 +114,12 @@ export class MonitorService {
       this.reconcileInterval = null
     }
 
+    // Stop WS health check
+    if (this.wsHealthInterval) {
+      clearInterval(this.wsHealthInterval)
+      this.wsHealthInterval = null
+    }
+
     console.log('[Monitor] Stopped')
   }
 
@@ -112,6 +132,10 @@ export class MonitorService {
         ? Array.from(this.watchedAddresses)
         : dbAddresses.map((a) => a.address),
       errors: [...this.errors].slice(-10),
+      wsReconnects: this.wsReconnectCount,
+      wsLastMessageAgo: this.lastWsMessageAt
+        ? Math.round((Date.now() - this.lastWsMessageAt) / 1000)
+        : null,
     }
   }
 
@@ -213,6 +237,17 @@ export class MonitorService {
   }
 
   private subscribeToLogs() {
+    // Clean up existing subscription before creating a new one
+    if (this.subscriptionId !== null) {
+      try {
+        const connection = getConnection()
+        connection.removeOnLogsListener(this.subscriptionId).catch(() => {})
+      } catch {
+        // Ignore cleanup errors
+      }
+      this.subscriptionId = null
+    }
+
     try {
       const connection = getConnection()
       this.subscriptionId = connection.onLogs(
@@ -222,18 +257,40 @@ export class MonitorService {
         },
         'confirmed',
       )
+      this.lastWsMessageAt = Date.now() // Reset timer on new subscription
       console.log('[Monitor] WebSocket subscription active')
     } catch (e) {
       this.recordError(`[Monitor] WebSocket subscription failed: ${e instanceof Error ? e.message : String(e)}`)
     }
   }
 
+  /**
+   * Periodically check if WebSocket is alive.
+   * If no message received within WS_DEAD_THRESHOLD_MS, reconnect.
+   */
+  private startWsHealthCheck() {
+    this.wsHealthInterval = setInterval(() => {
+      if (!this.running) return
+
+      const elapsed = Date.now() - this.lastWsMessageAt
+      if (this.lastWsMessageAt > 0 && elapsed > WS_DEAD_THRESHOLD_MS) {
+        this.wsReconnectCount++
+        console.warn(
+          `[Monitor] WebSocket appears dead (no message for ${Math.round(elapsed / 1000)}s), reconnecting... (attempt #${this.wsReconnectCount})`,
+        )
+        this.recordError(`[Monitor] WebSocket reconnecting (no message for ${Math.round(elapsed / 1000)}s)`)
+        this.subscribeToLogs()
+      }
+    }, WS_HEALTH_CHECK_INTERVAL_MS)
+  }
+
   private async handleLog(logs: Logs) {
+    this.lastWsMessageAt = Date.now()
+
     if (logs.err) return
 
     const sig = logs.signature
     if (!this.markSeen(sig)) {
-      console.log(`[Monitor] WS dedup skip: ${sig.slice(0, 12)}...`)
       return
     }