From f3a1115a0937214d4a6baa5463354c94e6337e3f Mon Sep 17 00:00:00 2001 From: Tom Butcher Date: Sat, 29 Nov 2025 01:26:55 +0000 Subject: [PATCH] Enhance ActionManager with subscription management and listener removal functionality - Introduced a Set to manage subscriptions for object actions, allowing for better tracking and cleanup of active listeners. - Added a method to remove all action listeners, improving resource management and preventing memory leaks. - Enhanced existing subscription and removal methods to ensure proper handling of callbacks and subscriptions. - Improved logging for subscription actions to aid in debugging and traceability. --- src/actions/actionmanager.js | 34 ++++++++++++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/src/actions/actionmanager.js b/src/actions/actionmanager.js index 5fc88b6..588421c 100644 --- a/src/actions/actionmanager.js +++ b/src/actions/actionmanager.js @@ -16,11 +16,13 @@ export class ActionManager { constructor(socketClient) { this.socketClient = socketClient; this.callbacks = new Map(); + this.subscriptions = new Set(); } async subscribeToObjectActions(id, objectType) { logger.debug('Subscribing to object actions...', id, objectType); const subject = `${objectType}s.${id}.actions`; + const subscriptionKey = `${subject}:${this.socketClient.id}`; await natsServer.subscribe( subject, @@ -47,12 +49,18 @@ export class ActionManager { } } ); + + this.subscriptions.add(subscriptionKey); return { success: true }; } async removeObjectActionsListener(id, objectType) { const subject = `${objectType}s.${id}.actions`; + const subscriptionKey = `${subject}:${this.socketClient.id}`; + await natsServer.removeSubscription(subject, this.socketClient.id); + + this.subscriptions.delete(subscriptionKey); return { success: true }; } @@ -68,22 +76,28 @@ export class ActionManager { ); // Subscribe to the response subject + const responseSubscriptionKey = `${subject}:${this.socketClient.socketId}`; await natsServer.subscribe( subject, this.socketClient.socketId, async (subject, value) => { if (value.result) { logger.trace('Calling result callback...'); - const storedCallback = this.callbacks.get(actionId); + const storedCallback = this.callbacks.get(actionId) || undefined; await natsServer.removeSubscription( subject, this.socketClient.socketId ); - storedCallback(value.result); + this.subscriptions.delete(responseSubscriptionKey); + if (storedCallback) { + storedCallback(value.result); + } } } ); + this.subscriptions.add(responseSubscriptionKey); + // Publish the action await natsServer.publish(`${objectType}s.${id}.actions`, { ...action, @@ -99,4 +113,20 @@ export class ActionManager { return false; } } + + async removeAllListeners() { + logger.debug('Removing all action listeners...'); + const removePromises = Array.from(this.subscriptions).map( + subscriptionKey => { + const [subject, socketId] = subscriptionKey.split(':'); + return natsServer.removeSubscription(subject, socketId); + } + ); + + await Promise.all(removePromises); + this.subscriptions.clear(); + this.callbacks.clear(); + logger.debug(`Removed ${removePromises.length} action listener(s)`); + return { success: true }; + } }