diff --git a/src/remote-device/remote-channel.ts b/src/remote-device/remote-channel.ts index dc08f743..537a7a10 100644 --- a/src/remote-device/remote-channel.ts +++ b/src/remote-device/remote-channel.ts @@ -17,6 +17,9 @@ interface DeviceData { } const HEARTBEAT_INTERVAL = 15000; +// Cap a single channel recreate so a hung await can't pin the re-entrancy guard +// true (which would silently disable the connection watchdog). +const RECREATE_TIMEOUT_MS = 30000; export class RemoteChannel { private client: SupabaseClient | null = null; @@ -35,6 +38,10 @@ export class RemoteChannel { // Track last channel state for debug logging private lastChannelState: string | null = null; + // Reconnect diagnostics + guard (see connState() / recreateChannel()) + private reconnectAttempt = 0; // recreateChannel() attempts since last success + private isRecreatingChannel = false; // a recreate is in flight (re-entrancy guard) + private _user: User | null = null; get user(): User | null { return this._user; } @@ -166,7 +173,7 @@ export class RemoteChannel { // ! Ignore silently in Initialization to reconnect after await this.createChannel().catch((error) => { - console.debug('[DEBUG] Failed to create channel, will retry after socket reconnect', error); + console.debug(`[DEBUG] Failed to create channel, will retry after socket reconnect: ${error?.message || error} — ${this.connState()}`); }); } else { @@ -206,10 +213,12 @@ export class RemoteChannel { ) .subscribe((status: string, err: any) => { // Debug: Log all subscription status events - console.debug(`[DEBUG] Channel subscription status: ${status}${err ? ' (error: ' + err + ')' : ''}`); + console.debug(`[DEBUG] Channel subscription status: ${status}${err ? ' (error: ' + (err?.message || err) + ')' : ''} — ${this.connState()}`); if (status === 'SUBSCRIBED') { - console.log('✅ Channel subscribed'); + const recovered = this.reconnectAttempt; + this.reconnectAttempt = 0; + console.log(`✅ Channel subscribed${recovered > 0 ? ` (recovered after ${recovered} attempt${recovered === 1 ? '' : 's'})` : ''}`); // Update device status on successful connection if (this.deviceId) { this.setOnlineStatus(this.deviceId, 'online').catch(e => { @@ -218,20 +227,42 @@ export class RemoteChannel { } resolve(); } else if (status === 'CHANNEL_ERROR') { - // console.error('❌ Channel subscription failed:', err); + // CHANNEL_ERROR is the only status carrying a real error message. + console.error(`❌ Channel error: ${err?.message || 'unknown'} — ${this.connState()}`); this.setOnlineStatus(this.deviceId!, 'offline'); - captureRemote('remote_channel_subscription_error', { error: err || 'Channel error' }).catch(() => { }); + captureRemote('remote_channel_subscription_error', { error: err?.message || 'Channel error' }).catch(() => { }); reject(err || new Error('Failed to initialize tool call channel subscription')); } else if (status === 'TIMED_OUT') { - console.error('⏱️ Channel subscription timed out, Reconnecting...'); + console.error(`⏱️ Channel subscription timed out, Reconnecting... — ${this.connState()}`); this.setOnlineStatus(this.deviceId!, 'offline'); - captureRemote('remote_channel_subscription_timeout', {}).catch(() => { }); + captureRemote('remote_channel_subscription_timeout', { attempt: this.reconnectAttempt }).catch(() => { }); reject(new Error('Tool call channel subscription timed out')); + } else if (status === 'CLOSED') { + // Settle the promise so an in-flight recreateChannel() can't await + // forever (which would wedge the re-entrancy guard / watchdog), and + // mark the device offline like the other degraded states. + console.warn(`⚠️ Channel closed — ${this.connState()}`); + this.setOnlineStatus(this.deviceId!, 'offline'); + reject(new Error('Tool call channel closed during subscribe')); } }); }); } + /** + * Compact connection state for logs — e.g. "socket=open(1) ch=errored attempt=3". + * readyState 1=OPEN (a 1 while joins keep failing = a half-open socket being reused), + * 3=CLOSED, '-'=no socket. Reads realtime-js internals defensively; never throws. + */ + private connState(): string { + let socket = '?'; + try { + const rt: any = (this.client as any)?.realtime; + socket = `${rt?.connectionState?.() ?? '?'}(${rt?.conn?.readyState ?? '-'})`; + } catch { /* best effort */ } + return `socket=${socket} ch=${this.channel?.state ?? '-'} attempt=${this.reconnectAttempt}`; + } + /** * Check if channel is connected, recreate if not. */ @@ -244,47 +275,90 @@ export class RemoteChannel { // Debug: Log current channel state (only if changed) if (!this.lastChannelState || this.lastChannelState !== state) { - console.debug(`[DEBUG] channel state: ${state}`); + console.debug(`[DEBUG] channel state: ${state} — ${this.connState()}`); this.lastChannelState = state; } - // Aggressive health check: Only 'joined' is considered healthy - // Any other state (joining, leaving, closed, errored, etc.) triggers recreation - if (state !== 'joined') { - captureRemote('remote_channel_state_health', { state }); + // 'joined' = healthy, 'joining' = transitional — let realtime-js's own rejoin + // backoff converge instead of tearing the channel down mid-join. (FIX: previously + // recreated on every non-joined state, which amputated that backoff.) + if (state === 'joined' || state === 'joining') return; - console.debug(`[DEBUG] ⚠️ Channel in unhealthy state '${state}' - recreating...`); - this.recreateChannel(); + // Unhealthy: closed, errored, leaving — recreate + captureRemote('remote_channel_state_health', { state, attempt: this.reconnectAttempt }); + console.debug(`[DEBUG] ⚠️ Channel in unhealthy state '${state}' - recreating... — ${this.connState()}`); + this.recreateChannel(); + } + + /** + * Run an async op but reject if it doesn't settle within `ms`, so a hung await + * can't leave isRecreatingChannel stuck true and disable the watchdog. Mirrors + * closeWithTimeout() in desktop-commander-integration.ts. + */ + private async withTimeout(op: () => Promise, ms: number, name: string): Promise { + let timer: NodeJS.Timeout | undefined; + try { + return await Promise.race([ + op(), + new Promise((_, reject) => { + timer = setTimeout(() => reject(new Error(`${name} timed out after ${ms}ms`)), ms); + }), + ]); + } finally { + if (timer) clearTimeout(timer); } } /** * Recreate the channel by destroying old one and creating fresh instance. */ - private recreateChannel(): void { + private async recreateChannel(): Promise { if (!this.client || !this.user?.id || !this.onToolCall) { console.warn('Cannot recreate channel - missing parameters'); console.debug('[DEBUG] recreateChannel() aborted - missing prerequisites'); return; } - // Destroy old channel - if (this.channel) { - console.debug('[DEBUG] Destroying old channel'); - this.client.removeChannel(this.channel); - this.channel = null; + // FIX: re-entrancy guard so a 10s health tick can't stack a second recreate + // on top of an in-flight one. + if (this.isRecreatingChannel) { + console.debug('[DEBUG] recreateChannel() skipped - already in progress'); + return; } + this.isRecreatingChannel = true; + this.reconnectAttempt++; // Create fresh channel - console.log('🔄 Recreating channel...'); - console.debug('[DEBUG] Calling createChannel() for recreation'); - this.createChannel().catch(err => { - captureRemote('remote_channel_recreate_error', { err }); - console.debug('[DEBUG] Channel recreation failed:', err.message); - - // TODO: enable only for debug mode - // console.error('Failed to recreate channel:', err); - }); + console.log(`🔄 Recreating channel... (attempt ${this.reconnectAttempt}) — ${this.connState()}`); + + try { + // Cap the whole recreate: a never-settling await (e.g. a subscribe that only + // ever emits CLOSED) must not pin isRecreatingChannel=true and silently disable + // the 10s watchdog. On timeout we reject -> catch -> finally clears the guard. + await this.withTimeout(async () => { + // Destroy old channel — AWAIT it so the channel registry empties before we + // rebuild. (The un-awaited version raced the synchronous new-channel push, so + // realtime-js never tore the socket down and a half-open one got reused.) + if (this.channel) { + console.debug('[DEBUG] Destroying old channel'); + await this.client!.removeChannel(this.channel); + this.channel = null; + } + + // FIX (core): force a brand-new WebSocket. After idle / wifi-loss the socket can + // be HALF-OPEN (readyState OPEN but dead); reusing it made every join TIME_OUT + // forever. disconnect() drops it so the next subscribe() dials a fresh one. + try { await (this.client as any).realtime?.disconnect?.(); } catch { /* best effort */ } + + console.debug('[DEBUG] Calling createChannel() for recreation'); + await this.createChannel(); + }, RECREATE_TIMEOUT_MS, 'recreateChannel'); + } catch (err: any) { + captureRemote('remote_channel_recreate_error', { errMsg: err?.message, attempt: this.reconnectAttempt }); + console.debug(`[DEBUG] Channel recreation failed: ${err?.message} — ${this.connState()}`); + } finally { + this.isRecreatingChannel = false; + } } async markCallExecuting(callId: string) { diff --git a/test/test-remote-channel-reconnect.js b/test/test-remote-channel-reconnect.js new file mode 100644 index 00000000..0d53db35 --- /dev/null +++ b/test/test-remote-channel-reconnect.js @@ -0,0 +1,273 @@ +/** + * Regression test: the device Realtime channel must recover after its underlying + * WebSocket goes half-open. + * + * A half-open socket still reports `conn.readyState` OPEN but the peer is gone + * (e.g. after idle / network loss / sleep), so joins sent over it never get a reply + * and time out. The channel must detect this and re-establish a working subscription + * rather than retrying on the dead socket indefinitely. + * + * The fake SupabaseClient below models the realtime-js semantics this depends on: + * - the socket can be half-open: `conn.readyState` stays 1 but joins TIME_OUT + * - `removeChannel()` removes from the registry on a microtask (deferred), and + * only tears the socket down once the registry is empty + * - `realtime.disconnect()` rebuilds a healthy socket + * + * Three cases: + * - control: when the dead socket is torn down before recreate, it recovers — + * proves the harness can actually observe recovery. + * - recovery: driving the health-check / recreate path after the socket goes + * half-open must end in a working subscription. + * - joining is treated as healthy (no recreate). + * + * Runs as part of `npm test` (needs `npm run build` first, which `npm test` does), + * or standalone: `node test/test-remote-channel-reconnect.js`. + */ +import assert from 'node:assert'; +import { RemoteChannel } from '../dist/remote-device/remote-channel.js'; + +// Keep telemetry from touching the network during the test. +process.env.DESKTOP_COMMANDER_DISABLE_TELEMETRY = '1'; + +// --------------------------------------------------------------------------- +// Fakes that model realtime-js socket/channel semantics relevant to reconnection +// --------------------------------------------------------------------------- + +class FakeChannel { + state = 'joining'; + joinedOnce = false; + rejoinTimer = { tries: 0 }; + constructor(topic, client) { + this.topic = topic; + this.client = client; + } + on() { + return this; + } + subscribe(cb) { + this.joinedOnce = true; + // realtime invokes the subscribe callback asynchronously + Promise.resolve().then(() => { + if (this.client.realtime.socketDead) { + this.state = 'errored'; + this.client.statusLog.push('TIMED_OUT'); + cb('TIMED_OUT'); + } else { + this.state = 'joined'; + this.client.statusLog.push('SUBSCRIBED'); + cb('SUBSCRIBED'); + } + }); + return this; + } + unsubscribe() { + this.state = 'leaving'; + return Promise.resolve({ error: null }); + } +} + +class FakeRealtime { + conn = { readyState: 1 }; // 1 = OPEN; stays OPEN even when half-open/dead + socketDead = false; // true = half-open: reads OPEN but joins TIME_OUT + reconnectTimer = { tries: 0 }; + pendingHeartbeatRef = null; + _heartbeatSentAt = null; + _manuallySetToken = true; + accessTokenValue = null; + rebuilds = 0; + + connectionState() { + return this.conn.readyState === 1 ? 'open' : 'closed'; + } + isConnected() { + return this.conn.readyState === 1; + } + /** Build a fresh, healthy socket (what a real disconnect()+reconnect yields). */ + rebuildSocket() { + this.rebuilds++; + this.conn = { readyState: 1 }; + this.socketDead = false; + } + /** The fix calls this to force a fresh WebSocket before re-subscribing. */ + disconnect() { + this.rebuildSocket(); + return Promise.resolve(); + } +} + +class FakeClient { + realtime = new FakeRealtime(); + channels = []; + statusLog = []; + + channel(topic) { + const ch = new FakeChannel(topic, this); + this.channels.push(ch); + return ch; + } + + /** + * Mirrors realtime-js: removal is DEFERRED (await unsubscribe) and the socket + * is only torn down once the registry is empty. The deferral is what races + * with the synchronous new-channel push in recreateChannel(). + */ + removeChannel(ch) { + return Promise.resolve().then(() => { + const i = this.channels.indexOf(ch); + if (i !== -1) this.channels.splice(i, 1); + ch.state = 'closed'; + if (this.channels.length === 0) this.realtime.rebuildSocket(); + }); + } + + removeAllChannels() { + return Promise.resolve().then(() => { + this.channels.length = 0; + this.realtime.rebuildSocket(); + }); + } + + // setOnlineStatus(): from('mcp_devices').update({...}).eq('id', deviceId) + from() { + const result = Promise.resolve({ error: null }); + const chain = { + update: () => chain, + insert: () => chain, + delete: () => chain, + select: () => chain, + eq: () => result, + }; + return chain; + } +} + +// --------------------------------------------------------------------------- +// Harness +// --------------------------------------------------------------------------- + +const flush = (ms = 0) => new Promise((r) => setTimeout(r, ms)); + +function makeRemoteChannel() { + const rc = new RemoteChannel(); + const client = new FakeClient(); + rc.client = client; // private at TS level; plain property at runtime + rc._user = { id: 'user-1', email: 'tester@example.com' }; + rc.onToolCall = () => {}; + rc.deviceId = 'device-1'; + return { rc, client }; +} + +/** Bring the channel up healthy, then knock the socket into a half-open state. */ +async function goHalfOpen(rc, client) { + await rc.createChannel(); // healthy subscribe -> 'joined' + assert.strictEqual(rc.channel.state, 'joined', 'precondition: channel should be joined'); + client.realtime.socketDead = true; // dead peer, but readyState stays 1 (OPEN) + rc.channel.state = 'errored'; // realtime-js flips the channel to errored on a dead socket + rc.lastChannelState = 'errored'; +} + +/** Simulate the periodic 10s health checks driving recreate, up to N times. */ +async function driveHealthChecks(rc, maxAttempts) { + for (let i = 0; i < maxAttempts; i++) { + if (rc.channel && rc.channel.state === 'joined') return true; + rc.checkConnectionHealth(); // -> recreateChannel() when unhealthy + await flush(); // let deferred removeChannel + subscribe callback run + await flush(); + } + return !!(rc.channel && rc.channel.state === 'joined'); +} + +// Silence the (intentionally verbose) diagnostic logging during the drive so +// the test output stays readable; we summarise via the fake's statusLog instead. +// Must await so console is restored only after the async callbacks have fired. +async function withQuietLogs(fn) { + const { debug, log, warn, error } = console; + console.debug = () => {}; + console.log = () => {}; + console.warn = () => {}; + console.error = () => {}; + try { + return await fn(); + } finally { + console.debug = debug; + console.log = log; + console.warn = warn; + console.error = error; + } +} + +let failures = 0; +async function test(name, fn) { + try { + await fn(); + console.log(`✅ PASS ${name}`); + } catch (e) { + failures++; + console.error(`🔴 FAIL ${name}\n ${e.message}`); + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +async function goHalfOpenThenDrive(rc, client) { + await goHalfOpen(rc, client); + return driveHealthChecks(rc, 8); +} + +async function main() { + // CONTROL: prove the harness CAN observe recovery — when the dead socket is + // actually torn down (disconnect()), the next recreate re-subscribes. + await test('control: recovers when the half-open socket is torn down before recreate', async () => { + const { rc, client } = makeRemoteChannel(); + await withQuietLogs(async () => { + await goHalfOpen(rc, client); + client.realtime.disconnect(); // simulate the fix: force a fresh socket + const recovered = await driveHealthChecks(rc, 6); + assert.strictEqual(recovered, true, 'expected recovery after socket teardown'); + }); + assert.strictEqual(client.realtime.socketDead, false); + }); + + // After the socket goes half-open, driving the health-check / recreate path must + // end in a re-established subscription rather than retrying on the dead socket. + await test('channel recovers after the socket goes half-open', async () => { + const { rc, client } = makeRemoteChannel(); + const recovered = await withQuietLogs(async () => goHalfOpenThenDrive(rc, client)); + + const reusedSocket = client.realtime.rebuilds === 0; + const readyState = client.realtime.conn.readyState; + assert.strictEqual( + recovered, + true, + `channel did not recover after the socket went half-open.\n` + + ` attempts(recreate)=${rc.reconnectAttempt} statuses=[${client.statusLog.join(', ')}]\n` + + ` socketReadyState=${readyState} reused=${reusedSocket} rebuilds=${client.realtime.rebuilds}` + ); + }); + + // 'joining' is transitional — the health check must treat it as healthy and NOT + // tear the channel down mid-join (otherwise it amputates realtime-js's own rejoin). + await test('joining is treated as healthy (no recreate)', async () => { + const { rc, client } = makeRemoteChannel(); + await withQuietLogs(async () => { + await rc.createChannel(); // -> joined + const attemptsBefore = rc.reconnectAttempt; + const rebuildsBefore = client.realtime.rebuilds; + rc.channel.state = 'joining'; // transitional, not yet joined + rc.checkConnectionHealth(); + await flush(); + await flush(); + assert.strictEqual(rc.reconnectAttempt, attemptsBefore, 'joining must not trigger a recreate'); + assert.strictEqual(client.realtime.rebuilds, rebuildsBefore, 'joining must not rebuild the socket'); + }); + }); + + console.log( + `\n${failures ? '🔴' : '✅'} remote-channel reconnect: ${failures} failing test(s).` + ); + process.exit(failures ? 1 : 0); +} + +main();