Compare commits

..

No commits in common. "e1ba1f78715710b8bd1956af940e907eeec1cf71" and "b9c2e959b9a501a1ee09a827938edb71eb9b6dd9" have entirely different histories.

16 changed files with 255 additions and 810 deletions

4
.gitignore vendored
View File

@ -127,6 +127,4 @@ dist
.yarn/unplugged .yarn/unplugged
.yarn/build-state.yml .yarn/build-state.yml
.yarn/install-state.gz .yarn/install-state.gz
.pnp.* .pnp.*
*.DS_STORE

63
package-lock.json generated
View File

@ -9,10 +9,7 @@
"version": "1.0.0", "version": "1.0.0",
"license": "ISC", "license": "ISC",
"dependencies": { "dependencies": {
"@nats-io/nats-core": "^3.1.0",
"@nats-io/transport-node": "^3.1.0",
"axios": "^1.11.0", "axios": "^1.11.0",
"canonical-json": "^0.2.0",
"date-fns": "^4.1.0", "date-fns": "^4.1.0",
"dayjs": "^1.11.13", "dayjs": "^1.11.13",
"dotenv": "^17.2.1", "dotenv": "^17.2.1",
@ -381,51 +378,6 @@
"sparse-bitfield": "^3.0.3" "sparse-bitfield": "^3.0.3"
} }
}, },
"node_modules/@nats-io/nats-core": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/@nats-io/nats-core/-/nats-core-3.1.0.tgz",
"integrity": "sha512-xsSkLEGGcqNF+Ru8dMjPmKtfbBeq/U4meuJJX4Zi+5TBHpjpjNjs4YkCBC/pGYWnEum1/vdNPizjE1RdNHCyBg==",
"license": "Apache-2.0",
"dependencies": {
"@nats-io/nkeys": "2.0.3",
"@nats-io/nuid": "2.0.3"
}
},
"node_modules/@nats-io/nkeys": {
"version": "2.0.3",
"resolved": "https://registry.npmjs.org/@nats-io/nkeys/-/nkeys-2.0.3.tgz",
"integrity": "sha512-JVt56GuE6Z89KUkI4TXUbSI9fmIfAmk6PMPknijmuL72GcD+UgIomTcRWiNvvJKxA01sBbmIPStqJs5cMRBC3A==",
"license": "Apache-2.0",
"dependencies": {
"tweetnacl": "^1.0.3"
},
"engines": {
"node": ">=18.0.0"
}
},
"node_modules/@nats-io/nuid": {
"version": "2.0.3",
"resolved": "https://registry.npmjs.org/@nats-io/nuid/-/nuid-2.0.3.tgz",
"integrity": "sha512-TpA3HEBna/qMVudy+3HZr5M3mo/L1JPofpVT4t0HkFGkz2Cn9wrlrQC8tvR8Md5Oa9//GtGG26eN0qEWF5Vqew==",
"license": "Apache-2.0",
"engines": {
"node": ">= 18.x"
}
},
"node_modules/@nats-io/transport-node": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/@nats-io/transport-node/-/transport-node-3.1.0.tgz",
"integrity": "sha512-k5pH7IOKUetwXOMraVgcB5zG0wibcHOwJJuyuY1/5Q4K0XfBJDnb/IbczP5/JJWwMYfxSL9O+46ojtdBHvHRSw==",
"license": "Apache-2.0",
"dependencies": {
"@nats-io/nats-core": "3.1.0",
"@nats-io/nkeys": "2.0.3",
"@nats-io/nuid": "2.0.3"
},
"engines": {
"node": ">= 18.0.0"
}
},
"node_modules/@nodelib/fs.scandir": { "node_modules/@nodelib/fs.scandir": {
"version": "2.1.5", "version": "2.1.5",
"resolved": "https://registry.npmjs.org/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz", "resolved": "https://registry.npmjs.org/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz",
@ -1102,15 +1054,6 @@
"node": ">=6" "node": ">=6"
} }
}, },
"node_modules/canonical-json": {
"version": "0.2.0",
"resolved": "https://registry.npmjs.org/canonical-json/-/canonical-json-0.2.0.tgz",
"integrity": "sha512-xeH/NgtNA7kIuKSxopJVdXqCKWyDB79aqxQRQ9FV02fvmqW7DSnjoFyzAUrBpfbwjU6lwTYOLc+HM6KupbsfVQ==",
"license": "MIT",
"bin": {
"canonical-json": "canonical-json.js"
}
},
"node_modules/chalk": { "node_modules/chalk": {
"version": "4.1.2", "version": "4.1.2",
"resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz",
@ -6458,12 +6401,6 @@
"strip-bom": "^3.0.0" "strip-bom": "^3.0.0"
} }
}, },
"node_modules/tweetnacl": {
"version": "1.0.3",
"resolved": "https://registry.npmjs.org/tweetnacl/-/tweetnacl-1.0.3.tgz",
"integrity": "sha512-6rt+RN7aOi1nGMyC4Xa5DdYiukl2UWCbcJft7YhxReBGQD7OAM8Pbxw6YMo4r2diNEA8FEmu32YOn9rhaiE5yw==",
"license": "Unlicense"
},
"node_modules/type-check": { "node_modules/type-check": {
"version": "0.4.0", "version": "0.4.0",
"resolved": "https://registry.npmjs.org/type-check/-/type-check-0.4.0.tgz", "resolved": "https://registry.npmjs.org/type-check/-/type-check-0.4.0.tgz",

View File

@ -17,10 +17,7 @@
"author": "Tom Butcher", "author": "Tom Butcher",
"license": "ISC", "license": "ISC",
"dependencies": { "dependencies": {
"@nats-io/nats-core": "^3.1.0",
"@nats-io/transport-node": "^3.1.0",
"axios": "^1.11.0", "axios": "^1.11.0",
"canonical-json": "^0.2.0",
"date-fns": "^4.1.0", "date-fns": "^4.1.0",
"dayjs": "^1.11.13", "dayjs": "^1.11.13",
"dotenv": "^17.2.1", "dotenv": "^17.2.1",

View File

@ -1,6 +1,6 @@
import log4js from 'log4js'; import log4js from 'log4js';
import { loadConfig } from '../config.js'; import { loadConfig } from '../config.js';
import { natsServer } from '../database/nats.js'; import { etcdServer } from '../database/etcd.js';
import { generateEtcId } from '../utils.js'; import { generateEtcId } from '../utils.js';
const config = loadConfig(); const config = loadConfig();
@ -10,7 +10,7 @@ const logger = log4js.getLogger('Action Manager');
logger.level = config.server.logLevel; logger.level = config.server.logLevel;
/** /**
* ActionManager handles tracking object updates using NATS and broadcasts update events via websockets. * ActionManager handles tracking object updates using Etcd and broadcasts update events via websockets.
*/ */
export class ActionManager { export class ActionManager {
constructor(socketClient) { constructor(socketClient) {
@ -20,12 +20,10 @@ export class ActionManager {
async subscribeToObjectActions(id, objectType) { async subscribeToObjectActions(id, objectType) {
logger.debug('Subscribing to object actions...', id, objectType); logger.debug('Subscribing to object actions...', id, objectType);
const subject = `${objectType}s.${id}.actions`; await etcdServer.onPrefixPutEvent(
`/${objectType}s/${id}/actions`,
await natsServer.subscribe(
subject,
this.socketClient.id, this.socketClient.id,
(subject, value) => { (key, value) => {
if (!value?.result) { if (!value?.result) {
logger.trace('Object action:', id); logger.trace('Object action:', id);
this.socketClient.socket.emit( this.socketClient.socket.emit(
@ -36,9 +34,9 @@ export class ActionManager {
action: { ...value } action: { ...value }
}, },
result => { result => {
logger.trace('Got action result:', subject); logger.trace('Got action result:', key);
const actionId = value.actionId || generateEtcId(); const actionId = key.split('/').pop();
natsServer.publish(`${subject}.${actionId}`, { etcdServer.setKey(`/${objectType}s/${id}/actions/${actionId}`, {
...value, ...value,
result: { ...result } result: { ...result }
}); });
@ -51,49 +49,49 @@ export class ActionManager {
} }
async removeObjectActionsListener(id, objectType) { async removeObjectActionsListener(id, objectType) {
const subject = `${objectType}s.${id}.actions`; await etcdServer.removePrefixWatcher(
await natsServer.removeSubscription(subject, this.socketClient.id); `/${objectType}s/${id}/actions`,
this.socketClient.id,
'put'
);
return { success: true }; return { success: true };
} }
async sendObjectAction(id, objectType, action, callback) { async sendObjectAction(id, objectType, action, callback) {
const actionId = generateEtcId();
const subject = `${objectType}s.${id}.actions.${actionId}`;
try { try {
const actionId = generateEtcId();
this.callbacks.set(actionId, callback); this.callbacks.set(actionId, callback);
logger.trace( logger.trace(
`Calling action id: ${actionId}, object id: ${id}, object type: ${objectType} Action:`, `Calling action id: ${actionId}, object id: ${id}, object type: ${objectType} Action:`,
action action
); );
await etcdServer.onKeyPutEvent(
// Subscribe to the response subject `/${objectType}s/${id}/actions/${actionId}`,
await natsServer.subscribe(
subject,
this.socketClient.socketId, this.socketClient.socketId,
async (subject, value) => { async (key, value) => {
if (value.result) { if (value.result) {
logger.trace('Calling result callback...'); logger.trace('Calling result callback...');
const storedCallback = this.callbacks.get(actionId); const storedCallback = this.callbacks.get(actionId);
await natsServer.removeSubscription( await etcdServer.removeKeyWatcher(
subject, `/${objectType}s/${id}/actions/${actionId}`,
this.socketClient.socketId this.socketClient.socketId,
'put'
);
await etcdServer.deleteKey(
`/${objectType}s/${id}/actions/${actionId}`
); );
storedCallback(value.result); storedCallback(value.result);
} }
} }
); );
await etcdServer.setKey(
// Publish the action `/${objectType}s/${id}/actions/${actionId}`,
await natsServer.publish(`${objectType}s.${id}.actions`, { action
...action, );
actionId: actionId
});
return true; return true;
} catch (error) { } catch (error) {
logger.error( logger.error(
`Failed to send action for ${objectType}s.${id}.actions.${actionId}:`, `Failed to set value for /${objectType}s/${id}/object:`,
error error
); );
return false; return false;

View File

@ -4,7 +4,11 @@ import jwt from 'jsonwebtoken';
import log4js from 'log4js'; import log4js from 'log4js';
// Load configuration // Load configuration
import { loadConfig } from '../config.js'; import { loadConfig } from '../config.js';
import { editObject, getObject, listObjects } from '../database/database.js'; import {
editObject,
getObject,
getObjectByFilter
} from '../database/database.js';
import { hostModel } from '../database/schemas/management/host.schema.js'; import { hostModel } from '../database/schemas/management/host.schema.js';
import { userModel } from '../database/schemas/management/user.schema.js'; import { userModel } from '../database/schemas/management/user.schema.js';
import { generateAuthCode } from '../utils.js'; import { generateAuthCode } from '../utils.js';
@ -78,16 +82,16 @@ export class KeycloakAuth {
roles: this.extractRoles(decodedToken) roles: this.extractRoles(decodedToken)
}; };
const user = await listObjects({ const user = await getObjectByFilter({
model: userModel, model: userModel,
filter: { username: decodedUser.username } filter: { username: decodedUser.username }
}); });
// Cache the verified token // Cache the verified token
const expiresAt = introspection.exp * 1000; // Convert to milliseconds const expiresAt = introspection.exp * 1000; // Convert to milliseconds
this.tokenCache.set(token, { expiresAt, user: user[0] }); this.tokenCache.set(token, { expiresAt, user });
return { valid: true, user: user[0] }; return { valid: true, user };
} catch (error) { } catch (error) {
logger.error('Token verification error:', error.message); logger.error('Token verification error:', error.message);
return { valid: false }; return { valid: false };
@ -163,14 +167,13 @@ export class CodeAuth {
async verifyOtp(otp) { async verifyOtp(otp) {
try { try {
const hosts = await listObjects({ const host = await getObjectByFilter({
model: hostModel, model: hostModel,
filter: { otp: otp }, filter: { otp: otp },
cached: false cached: false
}); });
const host = hosts[0];
if (host == undefined) { if (host == undefined) {
const error = `No host found with OTP: ${otp}`; const error = 'No host found with OTP.';
logger.warn(error); logger.warn(error);
return { valid: false, error: error }; return { valid: false, error: error };
} }
@ -200,10 +203,9 @@ export class CodeAuth {
id: id, id: id,
updateData: { authCode: generateAuthCode() } updateData: { authCode: generateAuthCode() }
}); });
logger.info('Host found with OTP:', otp);
return { valid: true, host: authCodeHost }; return { valid: true, host: authCodeHost };
} catch (error) { } catch (error) {
logger.error('OTP verification error:', error.message); logger.error('Code verification error:', error.message);
return { valid: false, error: error.message }; return { valid: false, error: error.message };
} }
} }
@ -216,6 +218,7 @@ export function createAuthMiddleware(socketUser) {
// Allow the 'authenticate' event through without checks // Allow the 'authenticate' event through without checks
logger.trace('Event:', event);
if (event === 'authenticate') { if (event === 'authenticate') {
next(); next();
return; return;

View File

@ -11,7 +11,6 @@ import {
import log4js from 'log4js'; import log4js from 'log4js';
import { loadConfig } from '../config.js'; import { loadConfig } from '../config.js';
import { userModel } from './schemas/management/user.schema.js'; import { userModel } from './schemas/management/user.schema.js';
import { jsonToCacheKey } from '../utils.js';
const config = loadConfig(); const config = loadConfig();
@ -20,31 +19,44 @@ const cacheLogger = log4js.getLogger('Local Cache');
logger.level = config.server.logLevel; logger.level = config.server.logLevel;
cacheLogger.level = config.server.logLevel; cacheLogger.level = config.server.logLevel;
const objectCache = new NodeCache({ const modelCaches = new Map();
stdTTL: 30, // 30 sec expiration
checkperiod: 600, // 30 sec periodic cleanup
useClones: false // Don't clone objects for better performance
});
const listCache = new NodeCache({ const listCache = new NodeCache({
stdTTL: 30, // 30 sec expiration stdTTL: 30, // 30 sec expiration
checkperiod: 600, // 30 sec periodic cleanup checkperiod: 600, // 30 sec periodic cleanup
useClones: false // Don't clone objects for better performance useClones: false // Don't clone objects for better performance
}); });
export const retrieveObjectCache = ({ model, id, populate = [] }) => { function getModelCache(model) {
const cacheKeyObject = { const modelName = model.modelName;
const modelCache = modelCaches.get(modelName);
if (modelCache == undefined) {
logger.trace('Creating new model cache...');
const newModelCache = new NodeCache({
stdTTL: 30, // 30 sec expiration
checkperiod: 30, // 30 sec periodic cleanup
useClones: false // Don't clone objects for better performance
});
modelCaches.set(modelName, newModelCache);
return newModelCache;
}
logger.trace('Getting model cache...');
return modelCache;
}
export const retrieveObjectCache = ({ model, id }) => {
cacheLogger.trace('Retrieving:', {
model: model.modelName, model: model.modelName,
id, id
populate });
}; const modelCache = getModelCache(model);
const cacheKey = jsonToCacheKey(cacheKeyObject); const cachedObject = modelCache.get(id);
cacheLogger.trace('Retrieving:');
const cachedObject = objectCache.get(cacheKey);
if (cachedObject == undefined) { if (cachedObject == undefined) {
cacheLogger.trace('Miss:', cacheKeyObject); cacheLogger.trace('Miss:', {
model: model.modelName,
id
});
return undefined; return undefined;
} }
@ -56,36 +68,25 @@ export const retrieveObjectCache = ({ model, id, populate = [] }) => {
return cachedObject; return cachedObject;
}; };
export const retrieveListCache = ({ export const retrieveObjectsCache = ({ model }) => {
model, cacheLogger.trace('Retrieving:', {
populate = [], model: model.modelName
filter = {}, });
sort = '', const modelCache = getModelCache(model);
order = 'ascend',
project = {}
}) => {
const cacheKeyObject = {
model: model.modelName,
id,
populate,
filter,
sort,
project,
order
};
cacheLogger.trace('Retrieving:', cacheKeyObject); const modelCacheKeys = modelCache.keys();
const cacheKey = jsonToCacheKey(cacheKeyObject); const cachedList = listCache.get(model.modelName);
const cachedList = listCache.get(cacheKey); if (cachedList == true) {
const cachedObjects = modelCacheKeys.map(key => modelCache.get(key));
if (cachedList != undefined) {
cacheLogger.trace('Hit:', { cacheLogger.trace('Hit:', {
...cacheKeyObject, model: model.modelName,
length: cachedList.length length: cachedObjects.length
}); });
return cachedList;
return cachedObjects;
} }
cacheLogger.trace('Miss:', { cacheLogger.trace('Miss:', {
@ -94,23 +95,21 @@ export const retrieveListCache = ({
return undefined; return undefined;
}; };
export const updateObjectCache = ({ model, id, object, populate = [] }) => { export const updateObjectCache = ({ model, id, object }) => {
const cacheKeyObject = { cacheLogger.trace('Updating:', {
model: model.modelName, model: model.modelName,
id, id
populate });
}; const modelCache = getModelCache(model);
const cachedObject = modelCache.get(id) || {};
const cacheKey = jsonToCacheKey(cacheKeyObject);
cacheLogger.trace('Updating:', cacheKeyObject);
const cachedObject = objectCache.get(cacheKey) || {};
const mergedObject = _.merge(cachedObject, object); const mergedObject = _.merge(cachedObject, object);
objectCache.set(cacheKey, mergedObject); modelCache.set(id, mergedObject);
cacheLogger.trace('Updated:', { ...cacheKeyObject }); cacheLogger.trace('Updated:', {
model: model.modelName,
id
});
return mergedObject; return mergedObject;
}; };
@ -131,39 +130,29 @@ export const deleteObjectCache = ({ model, id }) => {
return mergedObject; return mergedObject;
}; };
export const updateListCache = ({ export const updateObjectsCache = ({ model, objects }) => {
model,
objects,
populate = [],
filter = {},
sort = '',
order = 'ascend',
project = {}
}) => {
const cacheKeyObject = {
model: model.modelName,
populate,
filter,
sort,
project,
order
};
cacheLogger.trace('Updating:', { cacheLogger.trace('Updating:', {
...cacheKeyObject, model: model.modelName,
length: objects.length length: objects.length
}); });
const modelCache = getModelCache(model);
const cacheKey = jsonToCacheKey(cacheKeyObject); objects.forEach(object => {
const cachedObject = modelCache.get(object._id) || {};
listCache.set(cacheKey, objects); const mergedObject = _.merge(cachedObject, object);
modelCache.set(object._id, mergedObject);
});
listCache.set(model.modelName, true);
cacheLogger.trace('Updated:', { cacheLogger.trace('Updated:', {
...cacheKeyObject, model: model.modelName,
length: objects.length length: objects.length
}); });
return objects; return mergedObject;
}; };
// Reusable function to list objects with aggregation, filtering, search, sorting, and pagination // Reusable function to list objects with aggregation, filtering, search, sorting, and pagination
@ -173,29 +162,27 @@ export const listObjects = async ({
filter = {}, filter = {},
sort = '', sort = '',
order = 'ascend', order = 'ascend',
project = {}, // optional: override default projection project, // optional: override default projection
cached = false cached = false
}) => { }) => {
try { try {
logger.trace('Listing objects:', { logger.trace('Listing objects:', {
model, model,
populate, populate,
page,
limit,
filter, filter,
sort, sort,
order, order,
project, project,
cached cache
}); });
var cacheKey = undefined;
var modelCache = getModelCache(model);
if (cached == true) { if (cached == true) {
const objectsCache = retrieveObjectsCache({ const objectsCache = retrieveObjectsCache({ model });
model,
populate,
filter,
sort,
order,
project
});
if (objectsCache != undefined) { if (objectsCache != undefined) {
return objectsCache; return objectsCache;
} }
@ -223,7 +210,7 @@ export const listObjects = async ({
let query = model.find(filter).sort({ [sort]: sortOrder }); let query = model.find(filter).sort({ [sort]: sortOrder });
// Handle populate (array or single value) // Handle populate (array or single value)
if (populate.length > 0) { if (populate) {
if (Array.isArray(populate)) { if (Array.isArray(populate)) {
for (const pop of populate) { for (const pop of populate) {
query = query.populate(pop); query = query.populate(pop);
@ -234,7 +221,7 @@ export const listObjects = async ({
} }
// Handle select (projection) // Handle select (projection)
if (project != {}) { if (project) {
query = query.select(project); query = query.select(project);
} }
@ -244,25 +231,18 @@ export const listObjects = async ({
const finalResult = expandObjectIds(queryResult); const finalResult = expandObjectIds(queryResult);
updateListCache({ updateObjectsCache({ model, objects });
model,
objects: finalResult,
populate,
filter,
sort,
order,
project
});
logger.trace('Retreived from database:', { logger.trace('Retreived from database:', {
model, model,
populate, populate,
page,
limit,
filter, filter,
sort, sort,
order, order,
project, project,
cached, cache
length: finalResult.length
}); });
return finalResult; return finalResult;
} catch (error) { } catch (error) {
@ -272,12 +252,7 @@ export const listObjects = async ({
}; };
// Reusable function to get a single object by ID // Reusable function to get a single object by ID
export const getObject = async ({ export const getObject = async ({ model, id, populate, cached = false }) => {
model,
id,
populate = [],
cached = false
}) => {
try { try {
logger.trace('Getting object:', { logger.trace('Getting object:', {
model, model,
@ -286,7 +261,7 @@ export const getObject = async ({
}); });
if (cached == true) { if (cached == true) {
const cachedObject = retrieveObjectCache({ model, id, populate }); const cachedObject = retrieveObjectCache({ model, id });
if (cachedObject != undefined) { if (cachedObject != undefined) {
return cachedObject; return cachedObject;
} }
@ -324,14 +299,63 @@ export const getObject = async ({
updateObjectCache({ updateObjectCache({
model: model, model: model,
id: finalResult._id.toString(), id: finalResult._id.toString(),
populate,
object: finalResult object: finalResult
}); });
return finalResult; return finalResult;
} catch (error) { } catch (error) {
logger.error('An error retreiving object:', error.message); logger.error('An error retreiving object:', error.message);
throw error; return undefined;
}
};
// Reusable function to get a single object by ID
export const getObjectByFilter = async ({ model, filter, populate }) => {
try {
logger.trace('Getting object:', {
model,
filter,
populate
});
let query = model.findOne(filter).lean();
// Handle populate (array or single value)
if (populate) {
if (Array.isArray(populate)) {
for (const pop of populate) {
query = query.populate(pop);
}
} else if (typeof populate === 'string' || typeof populate === 'object') {
query = query.populate(populate);
}
}
const finalResult = await query;
if (!finalResult) {
logger.warn('Object not found in database:', {
model,
filter,
populate
});
return undefined;
}
logger.trace('Retreived object from database:', {
model,
filter,
populate
});
updateObjectCache({
model: model,
id: finalResult._id.toString(),
object: finalResult
});
return finalResult;
} catch (error) {
logger.error('An error retreiving object:', error.message);
return undefined; return undefined;
} }
}; };
@ -343,7 +367,7 @@ export const editObject = async ({
updateData, updateData,
owner = undefined, owner = undefined,
ownerType = undefined, ownerType = undefined,
populate = [] populate
}) => { }) => {
try { try {
// Determine parentType from model name // Determine parentType from model name

View File

@ -1,307 +0,0 @@
import { connect } from '@nats-io/transport-node';
import log4js from 'log4js';
import { loadConfig } from '../config.js';
const config = loadConfig();
const logger = log4js.getLogger('Nats');
logger.level = config.server.logLevel;
class NatsServer {
constructor() {
this.client = null;
this.subscriptions = new Map(); // subject → { subscription, callbacks }
this.requestHandlers = new Map(); // subject → { handler, callbacks }
this.queuedSubscriptions = new Map(); // subject → { subscription, callbacks, queue }
const natsConfig = config.database?.nats || config.database; // fallback for production config
const host = natsConfig.host || 'localhost';
const port = natsConfig.port || 4222;
this.servers = [`nats://${host}:${port}`];
this.textEncoder = new TextEncoder();
this.textDecoder = new TextDecoder();
logger.trace(`NatsServer: servers set to ${JSON.stringify(this.servers)}`);
}
async connect() {
if (!this.client) {
logger.info('Connecting to NATS...');
logger.trace(
`Creating NATS client with servers ${JSON.stringify(this.servers)}`
);
try {
this.client = await connect({
servers: this.servers,
reconnect: true,
maxReconnectAttempts: -1, // unlimited reconnects
reconnectTimeWait: 1000,
timeout: 20000
});
// Test connection by checking if client is connected
try {
if (this.client.isClosed()) {
throw new Error('NATS client connection failed');
}
logger.trace('NATS client connected successfully.');
} catch (error) {
throw error;
}
} catch (error) {
logger.error('Failed to connect to NATS:', error);
throw error;
}
} else {
logger.trace('NATS client already exists, skipping connection.');
}
return this.client;
}
async getClient() {
if (!this.client) {
logger.trace('No client found, calling connect().');
await this.connect();
}
return this.client;
}
async publish(subject, data) {
const client = await this.getClient();
const payload = typeof data === 'string' ? data : JSON.stringify(data);
try {
client.publish(subject, this.textEncoder.encode(payload));
logger.trace(`Published to subject: ${subject}, data: ${payload}`);
return { success: true };
} catch (error) {
logger.error(`Failed to publish to subject ${subject}:`, error);
throw error;
}
}
async request(subject, data, timeout = 30000) {
const client = await this.getClient();
const payload = typeof data === 'string' ? data : JSON.stringify(data);
try {
const response = await client.request(
subject,
this.textEncoder.encode(payload),
{
timeout: timeout
}
);
const responseData = this.textDecoder.decode(response.data);
logger.trace(`Request to subject: ${subject}, response: ${responseData}`);
// Try to parse as JSON, fallback to string
try {
return JSON.parse(responseData);
} catch {
return responseData;
}
} catch (error) {
if (error.code === 'TIMEOUT') {
logger.trace(`Request timeout for subject: ${subject}`);
return null;
}
throw error;
}
}
async subscribe(subject, owner, callback) {
const client = await this.getClient();
const subscriptionKey = subject;
if (this.subscriptions.has(subscriptionKey)) {
this.subscriptions.get(subscriptionKey).callbacks.set(owner, callback);
logger.trace(
`Added subscription callback for owner=${owner} on subject=${subject}`
);
return { success: true };
}
logger.trace(`Creating new subscription for subject: ${subject}`);
const subscription = client.subscribe(subject);
const callbacks = new Map();
callbacks.set(owner, callback);
(async () => {
for await (const msg of subscription) {
logger.trace(`Message received on subject: ${subject}`);
const data = this.textDecoder.decode(msg.data);
let parsedData;
try {
parsedData = JSON.parse(data);
} catch {
parsedData = data;
}
for (const [ownerId, cb] of callbacks) {
try {
cb(subject, parsedData, msg);
} catch (err) {
logger.error(
`Error in subscription callback for owner=${ownerId}, subject=${subject}:`,
err
);
}
}
}
})().catch(err => {
logger.error(`Subscription error for subject ${subject}:`, err);
});
this.subscriptions.set(subscriptionKey, { subscription, callbacks });
return { success: true };
}
async setRequestHandler(subject, owner, handler) {
const client = await this.getClient();
const handlerKey = subject;
if (this.requestHandlers.has(handlerKey)) {
this.requestHandlers.get(handlerKey).callbacks.set(owner, handler);
logger.trace(
`Added request handler for owner=${owner} on subject=${subject}`
);
return { success: true };
}
logger.trace(`Creating new request handler for subject: ${subject}`);
const subscription = client.subscribe(subject);
const callbacks = new Map();
callbacks.set(owner, handler);
(async () => {
for await (const msg of subscription) {
logger.trace(`Request received on subject: ${subject}`);
const data = this.textDecoder.decode(msg.data);
let parsedData;
try {
parsedData = JSON.parse(data);
} catch {
parsedData = data;
}
for (const [ownerId, cb] of callbacks) {
try {
const response = await cb(subject, parsedData, msg);
const responsePayload =
typeof response === 'string'
? response
: JSON.stringify(response);
msg.respond(this.textEncoder.encode(responsePayload));
} catch (err) {
logger.error(
`Error in request handler for owner=${ownerId}, subject=${subject}:`,
err
);
// Send error response
msg.respond(
this.textEncoder.encode(JSON.stringify({ error: err.message }))
);
}
}
}
})().catch(err => {
logger.error(`Request handler error for subject ${subject}:`, err);
});
this.requestHandlers.set(handlerKey, { subscription, callbacks });
return { success: true };
}
async removeSubscription(subject, owner) {
const entry = this.subscriptions.get(subject);
if (!entry) {
logger.trace(`Subscription not found for subject: ${subject}`);
return false;
}
if (entry.callbacks.delete(owner)) {
logger.trace(
`Removed subscription callback for owner: ${owner} on subject: ${subject}`
);
} else {
logger.trace(
`No subscription callback found for owner: ${owner} on subject: ${subject}`
);
}
if (entry.callbacks.size === 0) {
logger.trace(`No callbacks left, stopping subscription for ${subject}`);
entry.subscription.unsubscribe();
this.subscriptions.delete(subject);
}
return true;
}
async removeRequestHandler(subject, owner) {
const entry = this.requestHandlers.get(subject);
if (!entry) {
logger.trace(`Request handler not found for subject: ${subject}`);
return false;
}
if (entry.callbacks.delete(owner)) {
logger.trace(
`Removed request handler for owner: ${owner} on subject: ${subject}`
);
} else {
logger.trace(
`No request handler found for owner: ${owner} on subject: ${subject}`
);
}
if (entry.callbacks.size === 0) {
logger.trace(`No handlers left, stopping request handler for ${subject}`);
entry.subscription.unsubscribe();
this.requestHandlers.delete(subject);
}
return true;
}
async disconnect() {
logger.info('Disconnecting from NATS...');
// Stop all subscriptions
for (const [subject, entry] of this.subscriptions) {
logger.trace(`Stopping subscription: ${subject}`);
entry.subscription.unsubscribe();
}
this.subscriptions.clear();
// Stop all queued subscriptions
for (const [key, entry] of this.queuedSubscriptions) {
logger.trace(`Stopping queued subscription: ${key}`);
entry.subscription.unsubscribe();
}
this.queuedSubscriptions.clear();
// Stop all request handlers
for (const [subject, entry] of this.requestHandlers) {
logger.trace(`Stopping request handler: ${subject}`);
entry.subscription.unsubscribe();
}
this.requestHandlers.clear();
if (this.client) {
await this.client.close();
this.client = null;
logger.info('Disconnected from NATS');
}
}
}
const natsServer = new NatsServer();
export { NatsServer, natsServer };

View File

@ -16,8 +16,7 @@ const moonrakerSchema = new Schema(
const alertSchema = new Schema( const alertSchema = new Schema(
{ {
priority: { type: String, required: true }, // order to show priority: { type: String, required: true }, // order to show
type: { type: String, required: true }, // selectFilament, error, info, message type: { type: String, required: true } // selectFilament, error, info, message,
message: { type: String, required: false }
}, },
{ timestamps: true, _id: false } { timestamps: true, _id: false }
); );

View File

@ -1,6 +1,6 @@
import { ObjectId } from 'mongodb'; import { ObjectId } from 'mongodb';
import { auditLogModel } from './schemas/management/auditlog.schema.js'; import { auditLogModel } from './schemas/management/auditlog.schema.js';
import { natsServer } from './nats.js'; import { etcdServer } from './etcd.js';
function parseFilter(property, value) { function parseFilter(property, value) {
if (typeof value === 'string') { if (typeof value === 'string') {
@ -411,11 +411,11 @@ async function getAuditLogs(idOrIds) {
} }
async function distributeUpdate(value, id, type) { async function distributeUpdate(value, id, type) {
await natsServer.publish(`${type}s.${id}.object`, value); await etcdServer.setKey(`/${type}s/${id}/object`, value);
} }
async function distributeNew(id, type) { async function distributeNew(id, type) {
await natsServer.publish(`${type}s.new`, id); await etcdServer.setKey(`/${type}s/new`, id);
} }
function flatternObjectIds(object) { function flatternObjectIds(object) {

View File

@ -1,71 +0,0 @@
import log4js from 'log4js';
import { loadConfig } from '../config.js';
import { natsServer } from '../database/nats.js';
const config = loadConfig();
// Setup logger
const logger = log4js.getLogger('Event Manager');
logger.level = config.server.logLevel;
/**
* EventManager handles tracking object events using NATS and broadcasts event events via websockets.
*/
export class EventManager {
constructor(socketClient) {
this.socketClient = socketClient;
}
async subscribeToObjectEvent(id, objectType, eventType) {
logger.debug('Subscribing to object event:', eventType, id, objectType);
await natsServer.subscribe(
`${objectType}s.${id}.events.${eventType}`,
this.socketClient.socketId,
(key, value) => {
if (!value?.result) {
logger.trace('Object event detected:', id);
this.socketClient.socket.emit('objectEvent', {
_id: id,
objectType: objectType,
event: { ...value }
});
}
}
);
return { success: true };
}
async removeObjectEventsListener(id, objectType, eventType) {
// Remove specific event subscription for this object
await natsServer.removeSubscription(
`${objectType}s.${id}.events.${eventType}`,
this.socketClient.socketId
);
return { success: true };
}
async sendObjectEvent(id, objectType, event) {
const eventType = event?.type || 'unknown';
try {
logger.trace(
`Calling event: ${eventType}, object id: ${id}, object type: ${objectType} Event:`,
event
);
await natsServer.publish(
`${objectType}s.${id}.events.${eventType}`,
event
);
return { success: true };
} catch (error) {
logger.error(
`Failed to publish event for ${objectType}s.${id}.events.${eventType}:`,
error
);
return {
error:
error?.message ||
`Failed to publish event for ${objectType}s.${id}.events.${eventType}.`
};
}
}
}

View File

@ -1,7 +1,6 @@
import { loadConfig } from './config.js'; import { loadConfig } from './config.js';
import { SocketManager } from './socket/socketmanager.js'; import { SocketManager } from './socket/socketmanager.js';
import { etcdServer } from './database/etcd.js'; import { etcdServer } from './database/etcd.js';
import { natsServer } from './database/nats.js';
import express from 'express'; import express from 'express';
import log4js from 'log4js'; import log4js from 'log4js';
import http from 'http'; import http from 'http';
@ -30,15 +29,6 @@ import { mongoServer } from './database/mongo.js';
throw err; throw err;
} }
// Connect to NATS (await)
try {
await natsServer.connect();
logger.info('Connected to NATS');
} catch (err) {
logger.error('Failed to connect to NATS:', err);
throw err;
}
// Connect to Mongo DB (await) // Connect to Mongo DB (await)
try { try {
await mongoServer.connect(); await mongoServer.connect();

View File

@ -2,12 +2,11 @@ import log4js from 'log4js';
// Load configuration // Load configuration
import { loadConfig } from '../config.js'; import { loadConfig } from '../config.js';
import { CodeAuth, createAuthMiddleware } from '../auth/auth.js'; import { CodeAuth, createAuthMiddleware } from '../auth/auth.js';
import { editObject, getObject, listObjects } from '../database/database.js'; import { editObject, getObject } from '../database/database.js';
import { hostModel } from '../database/schemas/management/host.schema.js'; import { hostModel } from '../database/schemas/management/host.schema.js';
import { UpdateManager } from '../updates/updatemanager.js'; import { UpdateManager } from '../updates/updatemanager.js';
import { ActionManager } from '../actions/actionmanager.js'; import { ActionManager } from '../actions/actionmanager.js';
import { getModelByName } from '../utils.js'; import { getModelByName } from '../utils.js';
import { EventManager } from '../events/eventmanager.js';
const config = loadConfig(); const config = loadConfig();
@ -24,7 +23,6 @@ export class SocketHost {
this.socketManager = socketManager; this.socketManager = socketManager;
this.updateManager = new UpdateManager(this); this.updateManager = new UpdateManager(this);
this.actionManager = new ActionManager(this); this.actionManager = new ActionManager(this);
this.eventManager = new EventManager(this);
this.codeAuth = new CodeAuth(); this.codeAuth = new CodeAuth();
this.setupSocketEventHandlers(); this.setupSocketEventHandlers();
} }
@ -34,13 +32,6 @@ export class SocketHost {
this.socket.on('authenticate', this.handleAuthenticate.bind(this)); this.socket.on('authenticate', this.handleAuthenticate.bind(this));
this.socket.on('updateHost', this.handleUpdateHost.bind(this)); this.socket.on('updateHost', this.handleUpdateHost.bind(this));
this.socket.on('getObject', this.handleGetObject.bind(this)); this.socket.on('getObject', this.handleGetObject.bind(this));
this.socket.on('editObject', this.handleEditObject.bind(this));
this.socket.on('listObjects', this.handleListObjects.bind(this));
this.socket.on(
'subscribeToObjectActions',
this.handleSubscribeToObjectActions.bind(this)
);
this.socket.on('objectEvent', this.handleObjectEventEvent.bind(this));
this.socket.on('disconnect', this.handleDisconnect.bind(this)); this.socket.on('disconnect', this.handleDisconnect.bind(this));
} }
@ -49,23 +40,6 @@ export class SocketHost {
} }
async handleAuthenticate(data, callback) { async handleAuthenticate(data, callback) {
const setHostOnline = async verifyResult => {
logger.info('Host authenticated and valid.');
this.host = verifyResult.host;
this.id = this.host._id.toString();
this.authenticated = true;
await editObject({
model: hostModel,
id: this.host._id,
updateData: {
online: true,
state: { type: 'online' },
connectedAt: new Date()
},
owner: this.host,
ownerType: 'host'
});
};
logger.trace('handleAuthenticateEvent'); logger.trace('handleAuthenticateEvent');
const id = data.id || undefined; const id = data.id || undefined;
const authCode = data.authCode || undefined; const authCode = data.authCode || undefined;
@ -75,7 +49,17 @@ export class SocketHost {
logger.info('Authenticating host with id + authCode...'); logger.info('Authenticating host with id + authCode...');
const verifyResult = await this.codeAuth.verifyCode(id, authCode); const verifyResult = await this.codeAuth.verifyCode(id, authCode);
if (verifyResult.valid == true) { if (verifyResult.valid == true) {
await setHostOnline(verifyResult); logger.info('Host authenticated and valid.');
this.host = verifyResult.host;
this.id = this.host._id.toString();
this.authenticated = true;
await editObject({
model: hostModel,
id: this.host._id,
updateData: { online: true, state: { type: 'online' } },
owner: this.host,
ownerType: 'host'
});
await this.initializeHost(); await this.initializeHost();
} }
callback(verifyResult); callback(verifyResult);
@ -87,8 +71,8 @@ export class SocketHost {
const verifyResult = await this.codeAuth.verifyOtp(otp); const verifyResult = await this.codeAuth.verifyOtp(otp);
if (verifyResult.valid == true) { if (verifyResult.valid == true) {
logger.info('Host authenticated and valid.'); logger.info('Host authenticated and valid.');
await setHostOnline(verifyResult); this.host = verifyResult.host;
await this.initializeHost(); this.authenticated = true;
} }
callback(verifyResult); callback(verifyResult);
return; return;
@ -107,18 +91,6 @@ export class SocketHost {
}); });
} }
async handleEditObject(data, callback) {
const object = await editObject({
model: getModelByName(data.objectType),
id: data._id,
updateData: data.updateData,
populate: data.populate,
owner: this.host,
ownerType: 'host'
});
callback(object);
}
async handleGetObject(data, callback) { async handleGetObject(data, callback) {
const object = await getObject({ const object = await getObject({
model: getModelByName(data.objectType), model: getModelByName(data.objectType),
@ -129,45 +101,12 @@ export class SocketHost {
callback(object); callback(object);
} }
async handleListObjects(data, callback) {
const object = await listObjects({
model: getModelByName(data.objectType),
id: data._id,
cached: data?.cached,
populate: data?.populate,
filter: data?.filter,
project: data?.project,
sort: data?.sort,
order: data?.order
});
callback(object);
}
async handleObjectEventEvent(data) {
await this.eventManager.sendObjectEvent(
data._id,
data.objectType,
data.event
);
}
async handleSubscribeToObjectActions(data) {
await this.actionManager.subscribeToObjectActions(
data._id,
data.objectType
);
}
async handleDisconnect() { async handleDisconnect() {
if (this.authenticated) { if (this.authenticated) {
await editObject({ await editObject({
model: hostModel, model: hostModel,
id: this.host._id, id: this.host._id,
updateData: { updateData: { online: false, state: { type: 'offline' } },
online: false,
state: { type: 'offline' },
connectedAt: null
},
owner: this.host, owner: this.host,
ownerType: 'host' ownerType: 'host'
}); });

View File

@ -6,7 +6,6 @@ import { generateHostOTP } from '../utils.js';
import { LockManager } from '../lock/lockmanager.js'; import { LockManager } from '../lock/lockmanager.js';
import { UpdateManager } from '../updates/updatemanager.js'; import { UpdateManager } from '../updates/updatemanager.js';
import { ActionManager } from '../actions/actionmanager.js'; import { ActionManager } from '../actions/actionmanager.js';
import { EventManager } from '../events/eventmanager.js';
const config = loadConfig(); const config = loadConfig();
@ -24,7 +23,6 @@ export class SocketUser {
this.lockManager = new LockManager(this); this.lockManager = new LockManager(this);
this.updateManager = new UpdateManager(this); this.updateManager = new UpdateManager(this);
this.actionManager = new ActionManager(this); this.actionManager = new ActionManager(this);
this.eventManager = new EventManager(this);
this.templateManager = socketManager.templateManager; this.templateManager = socketManager.templateManager;
this.keycloakAuth = new KeycloakAuth(); this.keycloakAuth = new KeycloakAuth();
this.setupSocketEventHandlers(); this.setupSocketEventHandlers();
@ -44,22 +42,6 @@ export class SocketUser {
'subscribeToObjectUpdate', 'subscribeToObjectUpdate',
this.handleSubscribeToObjectUpdateEvent.bind(this) this.handleSubscribeToObjectUpdateEvent.bind(this)
); );
this.socket.on(
'subscribeToObjectEvent',
this.handleSubscribeToObjectEventEvent.bind(this)
);
this.socket.on(
'unsubscribeObjectTypeUpdate',
this.handleUnsubscribeToObjectTypeUpdateEvent.bind(this)
);
this.socket.on(
'unsubscribeObjectUpdate',
this.handleUnsubscribeToObjectUpdateEvent.bind(this)
);
this.socket.on(
'unsubscribeObjectEvent',
this.handleUnsubscribeObjectEventEvent.bind(this)
);
this.socket.on( this.socket.on(
'previewTemplate', 'previewTemplate',
this.handlePreviewTemplateEvent.bind(this) this.handlePreviewTemplateEvent.bind(this)
@ -78,7 +60,6 @@ export class SocketUser {
const result = await this.keycloakAuth.verifyToken(token); const result = await this.keycloakAuth.verifyToken(token);
if (result.valid == true) { if (result.valid == true) {
logger.info('User authenticated and valid.'); logger.info('User authenticated and valid.');
this.user = result.user; this.user = result.user;
this.id = this.user._id.toString(); this.id = this.user._id.toString();
this.authenticated = true; this.authenticated = true;
@ -145,47 +126,18 @@ export class SocketUser {
} }
async handleSubscribeToObjectTypeUpdateEvent(data, callback) { async handleSubscribeToObjectTypeUpdateEvent(data, callback) {
await this.updateManager.subscribeToObjectNew(data.objectType); const result = this.updateManager.subscribeToObjectNew(data.objectType);
await this.updateManager.subscribeToObjectDelete(data.objectType); callback(result);
callback({ success: true });
} }
async handleSubscribeToObjectUpdateEvent(data, callback) { async handleSubscribeToObjectUpdateEvent(data, callback) {
const result = await this.updateManager.subscribeToObjectUpdate( const result = this.updateManager.subscribeToObjectUpdate(
data._id, data._id,
data.objectType data.objectType
); );
callback(result); callback(result);
} }
async handleSubscribeToObjectEventEvent(data) {
await this.eventManager.subscribeToObjectEvent(
data._id,
data.objectType,
data.eventType
);
}
async handleUnsubscribeToObjectTypeUpdateEvent(data) {
await this.updateManager.removeObjectNewListener(data.objectType);
await this.updateManager.removeObjectDeleteListener(data.objectType);
}
async handleUnsubscribeToObjectUpdateEvent(data) {
await this.updateManager.removeObjectUpdateListener(
data._id,
data.objectType
);
}
async handleUnsubscribeObjectEventEvent(data) {
await this.eventManager.removeObjectEventsListener(
data._id,
data.objectType,
data.eventType
);
}
async handlePreviewTemplateEvent(data, callback) { async handlePreviewTemplateEvent(data, callback) {
const result = await this.templateManager.renderTemplate( const result = await this.templateManager.renderTemplate(
data._id, data._id,

View File

@ -87,15 +87,6 @@ function getNodeStyles(attributes) {
if (attributes?.shrink) { if (attributes?.shrink) {
styles += `flex-shrink: ${attributes.shrink};`; styles += `flex-shrink: ${attributes.shrink};`;
} }
if (attributes?.color) {
styles += `color: ${attributes.color};`;
}
if (attributes?.background) {
styles += `background: ${attributes.background};`;
}
if (attributes?.scale) {
styles += `transform: scale(${attributes.scale});`;
}
return styles; return styles;
} }
@ -129,36 +120,24 @@ async function transformCustomElements(content) {
tree.match({ tag: 'Text' }, node => ({ tree.match({ tag: 'Text' }, node => ({
...node, ...node,
tag: 'p', tag: 'p',
attrs: { class: 'documentText', style: getNodeStyles(node.attrs) } attrs: { class: 'documentText' }
})), })),
tree => tree =>
tree.match({ tag: 'Bold' }, node => ({ tree.match({ tag: 'Bold' }, node => ({
...node, ...node,
tag: 'strong', tag: 'strong',
attrs: { attrs: { style: 'font-weight: bold;', class: 'documentBoldText' }
style: 'font-weight: bold;',
class: 'documentBoldText',
style: getNodeStyles(node.attrs)
}
})), })),
tree => tree =>
tree.match({ tag: 'Barcode' }, node => { tree.match({ tag: 'Barcode' }, node => {
return { return {
tag: 'div', tag: 'svg',
content: [
{
tag: 'svg',
attrs: {
class: 'documentBarcode',
'jsbarcode-displayValue': 'false',
'jsbarcode-value': node.content[0],
'jsbarcode-format': node.attrs.format
}
}
],
attrs: { attrs: {
class: 'documentBarcodeContainer', class: 'documentBarcode',
style: getNodeStyles(node.attrs) 'jsbarcode-width': node.attrs?.width,
'jsbarcode-height': node.attrs?.height,
'jsbarcode-value': node.content[0],
'jsbarcode-format': node.attrs.format
} }
}; };
}), }),

View File

@ -1,6 +1,9 @@
import log4js from 'log4js'; import log4js from 'log4js';
import { loadConfig } from '../config.js'; import { loadConfig } from '../config.js';
import { natsServer } from '../database/nats.js'; import NodeCache from 'node-cache';
import { etcdServer } from '../database/etcd.js';
import { updateObjectCache } from '../database/database.js';
const config = loadConfig(); const config = loadConfig();
// Setup logger // Setup logger
@ -16,28 +19,13 @@ export class UpdateManager {
} }
async subscribeToObjectNew(objectType) { async subscribeToObjectNew(objectType) {
await natsServer.subscribe( await etcdServer.onKeyPutEvent(
`${objectType}s.new`, `/${objectType}s/new`,
this.socketClient.socketId, this.socketClient.socketId,
(key, value) => { (key, value) => {
logger.trace('Object new event:', value); logger.trace('Object new event:', value);
this.socketClient.socket.emit('objectNew', { this.socketClient.socket.emit('objectNew', {
object: value, _id: value,
objectType: objectType
});
}
);
return { success: true };
}
async subscribeToObjectDelete(objectType) {
await natsServer.subscribe(
`${objectType}s.delete`,
this.socketClient.socketId,
(key, value) => {
logger.trace('Object delete event:', value);
this.socketClient.socket.emit('objectDelete', {
object: value,
objectType: objectType objectType: objectType
}); });
} }
@ -46,8 +34,8 @@ export class UpdateManager {
} }
async subscribeToObjectUpdate(id, objectType) { async subscribeToObjectUpdate(id, objectType) {
await natsServer.subscribe( await etcdServer.onKeyPutEvent(
`${objectType}s.${id}.object`, `/${objectType}s/${id}/object`,
this.socketClient.socketId, this.socketClient.socketId,
(key, value) => { (key, value) => {
logger.trace('Object update event:', id); logger.trace('Object update event:', id);
@ -62,26 +50,52 @@ export class UpdateManager {
} }
async removeObjectNewListener(objectType) { async removeObjectNewListener(objectType) {
await natsServer.removeSubscription( await etcdServer.removeKeyWatcher(
`${objectType}s.new`, `/${objectType}s/new`,
this.socketClient.socketId this.socketClient.socketId,
); 'put'
return { success: true };
}
async removeObjectDeleteListener(objectType) {
await natsServer.removeSubscription(
`${objectType}s.delete`,
this.socketClient.socketId
); );
return { success: true }; return { success: true };
} }
async removeObjectUpdateListener(id, objectType) { async removeObjectUpdateListener(id, objectType) {
await natsServer.removeSubscription( await etcdServer.removeKeyWatcher(
`${objectType}s.${id}.object`, `/${objectType}s/${id}/object`,
this.socketClient.socketId this.socketClient.socketId,
'put'
); );
return { success: true }; return { success: true };
} }
async getObjectUpdate(id, objectType) {
try {
const objectUpdate = {
_id: id,
objectType: objectType,
object: await etcdServer.get(`/${objectType}s/${id}/object`)
};
logger.trace(`Returning path: /${objectType}s/${id}/object`);
return objectUpdate;
} catch (error) {
logger.error(
`UpdateManager: Failed to get current value for /${objectType}s/${id}/object:`,
error
);
return { error: 'Not found' };
}
}
async setObjectUpdate(id, objectType, value) {
try {
await etcdServer.set(`/${objectType}s/${id}/object`, value);
logger.trace(`Set value for path: /${objectType}s/${id}/object`);
return true;
} catch (error) {
logger.error(
`Failed to set value for /${objectType}s/${id}/object:`,
error
);
return false;
}
}
} }

View File

@ -2,7 +2,6 @@ import { editObject } from './database/database.js';
import { hostModel } from './database/schemas/management/host.schema.js'; import { hostModel } from './database/schemas/management/host.schema.js';
import crypto from 'crypto'; import crypto from 'crypto';
import { nanoid } from 'nanoid'; import { nanoid } from 'nanoid';
import canonicalize from 'canonical-json';
import { loadConfig } from './config.js'; import { loadConfig } from './config.js';
import { userModel } from './database/schemas/management/user.schema.js'; import { userModel } from './database/schemas/management/user.schema.js';
@ -87,11 +86,5 @@ export function getChangedValues(oldObj, newObj, old = false) {
} }
export function getModelByName(modelName) { export function getModelByName(modelName) {
return modelList.filter(model => model.modelName == modelName)[0]; return modelList.filter(model => model.modelName == modelName);
}
export function jsonToCacheKey(obj) {
const normalized = canonicalize(obj);
const hash = crypto.createHash('sha256').update(normalized).digest('hex');
return hash;
} }