Enhance ActionManager with subscription management and listener removal functionality

- Introduced a Set to manage subscriptions for object actions, allowing for better tracking and cleanup of active listeners.
- Added a method to remove all action listeners, improving resource management and preventing memory leaks.
- Enhanced existing subscription and removal methods to ensure proper handling of callbacks and subscriptions.
- Improved logging for subscription actions to aid in debugging and traceability.
This commit is contained in:
Tom Butcher 2025-11-29 01:26:55 +00:00
parent 362265da72
commit f3a1115a09

View File

@ -16,11 +16,13 @@ export class ActionManager {
constructor(socketClient) {
this.socketClient = socketClient;
this.callbacks = new Map();
this.subscriptions = new Set();
}
async subscribeToObjectActions(id, objectType) {
logger.debug('Subscribing to object actions...', id, objectType);
const subject = `${objectType}s.${id}.actions`;
const subscriptionKey = `${subject}:${this.socketClient.id}`;
await natsServer.subscribe(
subject,
@ -47,12 +49,18 @@ export class ActionManager {
}
}
);
this.subscriptions.add(subscriptionKey);
return { success: true };
}
async removeObjectActionsListener(id, objectType) {
const subject = `${objectType}s.${id}.actions`;
const subscriptionKey = `${subject}:${this.socketClient.id}`;
await natsServer.removeSubscription(subject, this.socketClient.id);
this.subscriptions.delete(subscriptionKey);
return { success: true };
}
@ -68,22 +76,28 @@ export class ActionManager {
);
// Subscribe to the response subject
const responseSubscriptionKey = `${subject}:${this.socketClient.socketId}`;
await natsServer.subscribe(
subject,
this.socketClient.socketId,
async (subject, value) => {
if (value.result) {
logger.trace('Calling result callback...');
const storedCallback = this.callbacks.get(actionId);
const storedCallback = this.callbacks.get(actionId) || undefined;
await natsServer.removeSubscription(
subject,
this.socketClient.socketId
);
storedCallback(value.result);
this.subscriptions.delete(responseSubscriptionKey);
if (storedCallback) {
storedCallback(value.result);
}
}
}
);
this.subscriptions.add(responseSubscriptionKey);
// Publish the action
await natsServer.publish(`${objectType}s.${id}.actions`, {
...action,
@ -99,4 +113,20 @@ export class ActionManager {
return false;
}
}
async removeAllListeners() {
logger.debug('Removing all action listeners...');
const removePromises = Array.from(this.subscriptions).map(
subscriptionKey => {
const [subject, socketId] = subscriptionKey.split(':');
return natsServer.removeSubscription(subject, socketId);
}
);
await Promise.all(removePromises);
this.subscriptions.clear();
this.callbacks.clear();
logger.debug(`Removed ${removePromises.length} action listener(s)`);
return { success: true };
}
}