Refactor ActionManager and introduce EventManager for NATS-based event handling

- Replaced Etcd with NATS in ActionManager for tracking object actions and broadcasting updates.
- Implemented EventManager to handle object events using NATS, including subscription and publishing functionalities.
- Enhanced logging for better traceability of actions and events.
This commit is contained in:
Tom Butcher 2025-09-05 23:29:47 +01:00
parent ca78fd6e62
commit 8ccdc81de1
2 changed files with 102 additions and 29 deletions

View File

@ -1,6 +1,6 @@
import log4js from 'log4js'; import log4js from 'log4js';
import { loadConfig } from '../config.js'; import { loadConfig } from '../config.js';
import { etcdServer } from '../database/etcd.js'; import { natsServer } from '../database/nats.js';
import { generateEtcId } from '../utils.js'; import { generateEtcId } from '../utils.js';
const config = loadConfig(); const config = loadConfig();
@ -10,7 +10,7 @@ const logger = log4js.getLogger('Action Manager');
logger.level = config.server.logLevel; logger.level = config.server.logLevel;
/** /**
* ActionManager handles tracking object updates using Etcd and broadcasts update events via websockets. * ActionManager handles tracking object updates using NATS and broadcasts update events via websockets.
*/ */
export class ActionManager { export class ActionManager {
constructor(socketClient) { constructor(socketClient) {
@ -20,10 +20,12 @@ export class ActionManager {
async subscribeToObjectActions(id, objectType) { async subscribeToObjectActions(id, objectType) {
logger.debug('Subscribing to object actions...', id, objectType); logger.debug('Subscribing to object actions...', id, objectType);
await etcdServer.onPrefixPutEvent( const subject = `${objectType}s.${id}.actions`;
`/${objectType}s/${id}/actions`,
await natsServer.subscribe(
subject,
this.socketClient.id, this.socketClient.id,
(key, value) => { (subject, value) => {
if (!value?.result) { if (!value?.result) {
logger.trace('Object action:', id); logger.trace('Object action:', id);
this.socketClient.socket.emit( this.socketClient.socket.emit(
@ -34,9 +36,9 @@ export class ActionManager {
action: { ...value } action: { ...value }
}, },
result => { result => {
logger.trace('Got action result:', key); logger.trace('Got action result:', subject);
const actionId = key.split('/').pop(); const actionId = value.actionId || generateEtcId();
etcdServer.setKey(`/${objectType}s/${id}/actions/${actionId}`, { natsServer.publish(`${subject}.${actionId}`, {
...value, ...value,
result: { ...result } result: { ...result }
}); });
@ -49,49 +51,49 @@ export class ActionManager {
} }
async removeObjectActionsListener(id, objectType) { async removeObjectActionsListener(id, objectType) {
await etcdServer.removePrefixWatcher( const subject = `${objectType}s.${id}.actions`;
`/${objectType}s/${id}/actions`, await natsServer.removeSubscription(subject, this.socketClient.id);
this.socketClient.id,
'put'
);
return { success: true }; return { success: true };
} }
async sendObjectAction(id, objectType, action, callback) { async sendObjectAction(id, objectType, action, callback) {
const actionId = generateEtcId();
const subject = `${objectType}s.${id}.actions.${actionId}`;
try { try {
const actionId = generateEtcId();
this.callbacks.set(actionId, callback); this.callbacks.set(actionId, callback);
logger.trace( logger.trace(
`Calling action id: ${actionId}, object id: ${id}, object type: ${objectType} Action:`, `Calling action id: ${actionId}, object id: ${id}, object type: ${objectType} Action:`,
action action
); );
await etcdServer.onKeyPutEvent(
`/${objectType}s/${id}/actions/${actionId}`, // Subscribe to the response subject
await natsServer.subscribe(
subject,
this.socketClient.socketId, this.socketClient.socketId,
async (key, value) => { async (subject, value) => {
if (value.result) { if (value.result) {
logger.trace('Calling result callback...'); logger.trace('Calling result callback...');
const storedCallback = this.callbacks.get(actionId); const storedCallback = this.callbacks.get(actionId);
await etcdServer.removeKeyWatcher( await natsServer.removeSubscription(
`/${objectType}s/${id}/actions/${actionId}`, subject,
this.socketClient.socketId, this.socketClient.socketId
'put'
);
await etcdServer.deleteKey(
`/${objectType}s/${id}/actions/${actionId}`
); );
storedCallback(value.result); storedCallback(value.result);
} }
} }
); );
await etcdServer.setKey(
`/${objectType}s/${id}/actions/${actionId}`, // Publish the action
action await natsServer.publish(`${objectType}s.${id}.actions`, {
); ...action,
actionId: actionId
});
return true; return true;
} catch (error) { } catch (error) {
logger.error( logger.error(
`Failed to set value for /${objectType}s/${id}/object:`, `Failed to send action for ${objectType}s.${id}.actions.${actionId}:`,
error error
); );
return false; return false;

View File

@ -0,0 +1,71 @@
import log4js from 'log4js';
import { loadConfig } from '../config.js';
import { natsServer } from '../database/nats.js';
const config = loadConfig();
// Setup logger
const logger = log4js.getLogger('Event Manager');
logger.level = config.server.logLevel;
/**
* EventManager handles tracking object events using NATS and broadcasts event events via websockets.
*/
export class EventManager {
constructor(socketClient) {
this.socketClient = socketClient;
}
async subscribeToObjectEvent(id, objectType, eventType) {
logger.debug('Subscribing to object event:', eventType, id, objectType);
await natsServer.subscribe(
`${objectType}s.${id}.events.${eventType}`,
this.socketClient.socketId,
(key, value) => {
if (!value?.result) {
logger.trace('Object event detected:', id);
this.socketClient.socket.emit('objectEvent', {
_id: id,
objectType: objectType,
event: { ...value }
});
}
}
);
return { success: true };
}
async removeObjectEventsListener(id, objectType, eventType) {
// Remove specific event subscription for this object
await natsServer.removeSubscription(
`${objectType}s.${id}.events.${eventType}`,
this.socketClient.socketId
);
return { success: true };
}
async sendObjectEvent(id, objectType, event) {
const eventType = event?.type || 'unknown';
try {
logger.trace(
`Calling event: ${eventType}, object id: ${id}, object type: ${objectType} Event:`,
event
);
await natsServer.publish(
`${objectType}s.${id}.events.${eventType}`,
event
);
return { success: true };
} catch (error) {
logger.error(
`Failed to publish event for ${objectType}s.${id}.events.${eventType}:`,
error
);
return {
error:
error?.message ||
`Failed to publish event for ${objectType}s.${id}.events.${eventType}.`
};
}
}
}