From 8ccdc81de165feb9d4a227e5411cd5686eea51c5 Mon Sep 17 00:00:00 2001 From: Tom Butcher Date: Fri, 5 Sep 2025 23:29:47 +0100 Subject: [PATCH] Refactor ActionManager and introduce EventManager for NATS-based event handling - Replaced Etcd with NATS in ActionManager for tracking object actions and broadcasting updates. - Implemented EventManager to handle object events using NATS, including subscription and publishing functionalities. - Enhanced logging for better traceability of actions and events. --- src/actions/actionmanager.js | 60 +++++++++++++++--------------- src/events/eventmanager.js | 71 ++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 29 deletions(-) create mode 100644 src/events/eventmanager.js diff --git a/src/actions/actionmanager.js b/src/actions/actionmanager.js index 895c2cb..5fc88b6 100644 --- a/src/actions/actionmanager.js +++ b/src/actions/actionmanager.js @@ -1,6 +1,6 @@ import log4js from 'log4js'; import { loadConfig } from '../config.js'; -import { etcdServer } from '../database/etcd.js'; +import { natsServer } from '../database/nats.js'; import { generateEtcId } from '../utils.js'; const config = loadConfig(); @@ -10,7 +10,7 @@ const logger = log4js.getLogger('Action Manager'); logger.level = config.server.logLevel; /** - * ActionManager handles tracking object updates using Etcd and broadcasts update events via websockets. + * ActionManager handles tracking object updates using NATS and broadcasts update events via websockets. */ export class ActionManager { constructor(socketClient) { @@ -20,10 +20,12 @@ export class ActionManager { async subscribeToObjectActions(id, objectType) { logger.debug('Subscribing to object actions...', id, objectType); - await etcdServer.onPrefixPutEvent( - `/${objectType}s/${id}/actions`, + const subject = `${objectType}s.${id}.actions`; + + await natsServer.subscribe( + subject, this.socketClient.id, - (key, value) => { + (subject, value) => { if (!value?.result) { logger.trace('Object action:', id); this.socketClient.socket.emit( @@ -34,9 +36,9 @@ export class ActionManager { action: { ...value } }, result => { - logger.trace('Got action result:', key); - const actionId = key.split('/').pop(); - etcdServer.setKey(`/${objectType}s/${id}/actions/${actionId}`, { + logger.trace('Got action result:', subject); + const actionId = value.actionId || generateEtcId(); + natsServer.publish(`${subject}.${actionId}`, { ...value, result: { ...result } }); @@ -49,49 +51,49 @@ export class ActionManager { } async removeObjectActionsListener(id, objectType) { - await etcdServer.removePrefixWatcher( - `/${objectType}s/${id}/actions`, - this.socketClient.id, - 'put' - ); + const subject = `${objectType}s.${id}.actions`; + await natsServer.removeSubscription(subject, this.socketClient.id); return { success: true }; } async sendObjectAction(id, objectType, action, callback) { + const actionId = generateEtcId(); + const subject = `${objectType}s.${id}.actions.${actionId}`; + try { - const actionId = generateEtcId(); this.callbacks.set(actionId, callback); logger.trace( `Calling action id: ${actionId}, object id: ${id}, object type: ${objectType} Action:`, action ); - await etcdServer.onKeyPutEvent( - `/${objectType}s/${id}/actions/${actionId}`, + + // Subscribe to the response subject + await natsServer.subscribe( + subject, this.socketClient.socketId, - async (key, value) => { + async (subject, value) => { if (value.result) { logger.trace('Calling result callback...'); const storedCallback = this.callbacks.get(actionId); - await etcdServer.removeKeyWatcher( - `/${objectType}s/${id}/actions/${actionId}`, - this.socketClient.socketId, - 'put' - ); - await etcdServer.deleteKey( - `/${objectType}s/${id}/actions/${actionId}` + await natsServer.removeSubscription( + subject, + this.socketClient.socketId ); storedCallback(value.result); } } ); - await etcdServer.setKey( - `/${objectType}s/${id}/actions/${actionId}`, - action - ); + + // Publish the action + await natsServer.publish(`${objectType}s.${id}.actions`, { + ...action, + actionId: actionId + }); + return true; } catch (error) { logger.error( - `Failed to set value for /${objectType}s/${id}/object:`, + `Failed to send action for ${objectType}s.${id}.actions.${actionId}:`, error ); return false; diff --git a/src/events/eventmanager.js b/src/events/eventmanager.js new file mode 100644 index 0000000..92601a2 --- /dev/null +++ b/src/events/eventmanager.js @@ -0,0 +1,71 @@ +import log4js from 'log4js'; +import { loadConfig } from '../config.js'; +import { natsServer } from '../database/nats.js'; + +const config = loadConfig(); + +// Setup logger +const logger = log4js.getLogger('Event Manager'); +logger.level = config.server.logLevel; + +/** + * EventManager handles tracking object events using NATS and broadcasts event events via websockets. + */ +export class EventManager { + constructor(socketClient) { + this.socketClient = socketClient; + } + + async subscribeToObjectEvent(id, objectType, eventType) { + logger.debug('Subscribing to object event:', eventType, id, objectType); + await natsServer.subscribe( + `${objectType}s.${id}.events.${eventType}`, + this.socketClient.socketId, + (key, value) => { + if (!value?.result) { + logger.trace('Object event detected:', id); + this.socketClient.socket.emit('objectEvent', { + _id: id, + objectType: objectType, + event: { ...value } + }); + } + } + ); + return { success: true }; + } + + async removeObjectEventsListener(id, objectType, eventType) { + // Remove specific event subscription for this object + await natsServer.removeSubscription( + `${objectType}s.${id}.events.${eventType}`, + this.socketClient.socketId + ); + return { success: true }; + } + + async sendObjectEvent(id, objectType, event) { + const eventType = event?.type || 'unknown'; + try { + logger.trace( + `Calling event: ${eventType}, object id: ${id}, object type: ${objectType} Event:`, + event + ); + await natsServer.publish( + `${objectType}s.${id}.events.${eventType}`, + event + ); + return { success: true }; + } catch (error) { + logger.error( + `Failed to publish event for ${objectType}s.${id}.events.${eventType}:`, + error + ); + return { + error: + error?.message || + `Failed to publish event for ${objectType}s.${id}.events.${eventType}.` + }; + } + } +}