diff --git a/src/actions/actionmanager.js b/src/actions/actionmanager.js index 5fc88b6..588421c 100644 --- a/src/actions/actionmanager.js +++ b/src/actions/actionmanager.js @@ -16,11 +16,13 @@ export class ActionManager { constructor(socketClient) { this.socketClient = socketClient; this.callbacks = new Map(); + this.subscriptions = new Set(); } async subscribeToObjectActions(id, objectType) { logger.debug('Subscribing to object actions...', id, objectType); const subject = `${objectType}s.${id}.actions`; + const subscriptionKey = `${subject}:${this.socketClient.id}`; await natsServer.subscribe( subject, @@ -47,12 +49,18 @@ export class ActionManager { } } ); + + this.subscriptions.add(subscriptionKey); return { success: true }; } async removeObjectActionsListener(id, objectType) { const subject = `${objectType}s.${id}.actions`; + const subscriptionKey = `${subject}:${this.socketClient.id}`; + await natsServer.removeSubscription(subject, this.socketClient.id); + + this.subscriptions.delete(subscriptionKey); return { success: true }; } @@ -68,22 +76,28 @@ export class ActionManager { ); // Subscribe to the response subject + const responseSubscriptionKey = `${subject}:${this.socketClient.socketId}`; await natsServer.subscribe( subject, this.socketClient.socketId, async (subject, value) => { if (value.result) { logger.trace('Calling result callback...'); - const storedCallback = this.callbacks.get(actionId); + const storedCallback = this.callbacks.get(actionId) || undefined; await natsServer.removeSubscription( subject, this.socketClient.socketId ); - storedCallback(value.result); + this.subscriptions.delete(responseSubscriptionKey); + if (storedCallback) { + storedCallback(value.result); + } } } ); + this.subscriptions.add(responseSubscriptionKey); + // Publish the action await natsServer.publish(`${objectType}s.${id}.actions`, { ...action, @@ -99,4 +113,20 @@ export class ActionManager { return false; } } + + async removeAllListeners() { + logger.debug('Removing all action listeners...'); + const removePromises = Array.from(this.subscriptions).map( + subscriptionKey => { + const [subject, socketId] = subscriptionKey.split(':'); + return natsServer.removeSubscription(subject, socketId); + } + ); + + await Promise.all(removePromises); + this.subscriptions.clear(); + this.callbacks.clear(); + logger.debug(`Removed ${removePromises.length} action listener(s)`); + return { success: true }; + } }