diff --git a/src/actions/actionmanager.js b/src/actions/actionmanager.js new file mode 100644 index 0000000..895c2cb --- /dev/null +++ b/src/actions/actionmanager.js @@ -0,0 +1,100 @@ +import log4js from 'log4js'; +import { loadConfig } from '../config.js'; +import { etcdServer } from '../database/etcd.js'; +import { generateEtcId } from '../utils.js'; + +const config = loadConfig(); + +// Setup logger +const logger = log4js.getLogger('Action Manager'); +logger.level = config.server.logLevel; + +/** + * ActionManager handles tracking object updates using Etcd and broadcasts update events via websockets. + */ +export class ActionManager { + constructor(socketClient) { + this.socketClient = socketClient; + this.callbacks = new Map(); + } + + async subscribeToObjectActions(id, objectType) { + logger.debug('Subscribing to object actions...', id, objectType); + await etcdServer.onPrefixPutEvent( + `/${objectType}s/${id}/actions`, + this.socketClient.id, + (key, value) => { + if (!value?.result) { + logger.trace('Object action:', id); + this.socketClient.socket.emit( + 'objectAction', + { + _id: id, + objectType: objectType, + action: { ...value } + }, + result => { + logger.trace('Got action result:', key); + const actionId = key.split('/').pop(); + etcdServer.setKey(`/${objectType}s/${id}/actions/${actionId}`, { + ...value, + result: { ...result } + }); + } + ); + } + } + ); + return { success: true }; + } + + async removeObjectActionsListener(id, objectType) { + await etcdServer.removePrefixWatcher( + `/${objectType}s/${id}/actions`, + this.socketClient.id, + 'put' + ); + return { success: true }; + } + + async sendObjectAction(id, objectType, action, callback) { + try { + const actionId = generateEtcId(); + this.callbacks.set(actionId, callback); + logger.trace( + `Calling action id: ${actionId}, object id: ${id}, object type: ${objectType} Action:`, + action + ); + await etcdServer.onKeyPutEvent( + `/${objectType}s/${id}/actions/${actionId}`, + this.socketClient.socketId, + async (key, value) => { + if (value.result) { + logger.trace('Calling result callback...'); + const storedCallback = this.callbacks.get(actionId); + await etcdServer.removeKeyWatcher( + `/${objectType}s/${id}/actions/${actionId}`, + this.socketClient.socketId, + 'put' + ); + await etcdServer.deleteKey( + `/${objectType}s/${id}/actions/${actionId}` + ); + storedCallback(value.result); + } + } + ); + await etcdServer.setKey( + `/${objectType}s/${id}/actions/${actionId}`, + action + ); + return true; + } catch (error) { + logger.error( + `Failed to set value for /${objectType}s/${id}/object:`, + error + ); + return false; + } + } +}