Compare commits
9 Commits
b9c2e959b9
...
e1ba1f7871
| Author | SHA1 | Date | |
|---|---|---|---|
| e1ba1f7871 | |||
| ab50a5261d | |||
| 8ccdc81de1 | |||
| ca78fd6e62 | |||
| 7ccc4bb993 | |||
| d6214e316b | |||
| e4c790e7cc | |||
| 4ac87f0141 | |||
| 9fba977b2a |
4
.gitignore
vendored
4
.gitignore
vendored
@ -127,4 +127,6 @@ dist
|
||||
.yarn/unplugged
|
||||
.yarn/build-state.yml
|
||||
.yarn/install-state.gz
|
||||
.pnp.*
|
||||
.pnp.*
|
||||
|
||||
*.DS_STORE
|
||||
|
||||
63
package-lock.json
generated
63
package-lock.json
generated
@ -9,7 +9,10 @@
|
||||
"version": "1.0.0",
|
||||
"license": "ISC",
|
||||
"dependencies": {
|
||||
"@nats-io/nats-core": "^3.1.0",
|
||||
"@nats-io/transport-node": "^3.1.0",
|
||||
"axios": "^1.11.0",
|
||||
"canonical-json": "^0.2.0",
|
||||
"date-fns": "^4.1.0",
|
||||
"dayjs": "^1.11.13",
|
||||
"dotenv": "^17.2.1",
|
||||
@ -378,6 +381,51 @@
|
||||
"sparse-bitfield": "^3.0.3"
|
||||
}
|
||||
},
|
||||
"node_modules/@nats-io/nats-core": {
|
||||
"version": "3.1.0",
|
||||
"resolved": "https://registry.npmjs.org/@nats-io/nats-core/-/nats-core-3.1.0.tgz",
|
||||
"integrity": "sha512-xsSkLEGGcqNF+Ru8dMjPmKtfbBeq/U4meuJJX4Zi+5TBHpjpjNjs4YkCBC/pGYWnEum1/vdNPizjE1RdNHCyBg==",
|
||||
"license": "Apache-2.0",
|
||||
"dependencies": {
|
||||
"@nats-io/nkeys": "2.0.3",
|
||||
"@nats-io/nuid": "2.0.3"
|
||||
}
|
||||
},
|
||||
"node_modules/@nats-io/nkeys": {
|
||||
"version": "2.0.3",
|
||||
"resolved": "https://registry.npmjs.org/@nats-io/nkeys/-/nkeys-2.0.3.tgz",
|
||||
"integrity": "sha512-JVt56GuE6Z89KUkI4TXUbSI9fmIfAmk6PMPknijmuL72GcD+UgIomTcRWiNvvJKxA01sBbmIPStqJs5cMRBC3A==",
|
||||
"license": "Apache-2.0",
|
||||
"dependencies": {
|
||||
"tweetnacl": "^1.0.3"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=18.0.0"
|
||||
}
|
||||
},
|
||||
"node_modules/@nats-io/nuid": {
|
||||
"version": "2.0.3",
|
||||
"resolved": "https://registry.npmjs.org/@nats-io/nuid/-/nuid-2.0.3.tgz",
|
||||
"integrity": "sha512-TpA3HEBna/qMVudy+3HZr5M3mo/L1JPofpVT4t0HkFGkz2Cn9wrlrQC8tvR8Md5Oa9//GtGG26eN0qEWF5Vqew==",
|
||||
"license": "Apache-2.0",
|
||||
"engines": {
|
||||
"node": ">= 18.x"
|
||||
}
|
||||
},
|
||||
"node_modules/@nats-io/transport-node": {
|
||||
"version": "3.1.0",
|
||||
"resolved": "https://registry.npmjs.org/@nats-io/transport-node/-/transport-node-3.1.0.tgz",
|
||||
"integrity": "sha512-k5pH7IOKUetwXOMraVgcB5zG0wibcHOwJJuyuY1/5Q4K0XfBJDnb/IbczP5/JJWwMYfxSL9O+46ojtdBHvHRSw==",
|
||||
"license": "Apache-2.0",
|
||||
"dependencies": {
|
||||
"@nats-io/nats-core": "3.1.0",
|
||||
"@nats-io/nkeys": "2.0.3",
|
||||
"@nats-io/nuid": "2.0.3"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">= 18.0.0"
|
||||
}
|
||||
},
|
||||
"node_modules/@nodelib/fs.scandir": {
|
||||
"version": "2.1.5",
|
||||
"resolved": "https://registry.npmjs.org/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz",
|
||||
@ -1054,6 +1102,15 @@
|
||||
"node": ">=6"
|
||||
}
|
||||
},
|
||||
"node_modules/canonical-json": {
|
||||
"version": "0.2.0",
|
||||
"resolved": "https://registry.npmjs.org/canonical-json/-/canonical-json-0.2.0.tgz",
|
||||
"integrity": "sha512-xeH/NgtNA7kIuKSxopJVdXqCKWyDB79aqxQRQ9FV02fvmqW7DSnjoFyzAUrBpfbwjU6lwTYOLc+HM6KupbsfVQ==",
|
||||
"license": "MIT",
|
||||
"bin": {
|
||||
"canonical-json": "canonical-json.js"
|
||||
}
|
||||
},
|
||||
"node_modules/chalk": {
|
||||
"version": "4.1.2",
|
||||
"resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz",
|
||||
@ -6401,6 +6458,12 @@
|
||||
"strip-bom": "^3.0.0"
|
||||
}
|
||||
},
|
||||
"node_modules/tweetnacl": {
|
||||
"version": "1.0.3",
|
||||
"resolved": "https://registry.npmjs.org/tweetnacl/-/tweetnacl-1.0.3.tgz",
|
||||
"integrity": "sha512-6rt+RN7aOi1nGMyC4Xa5DdYiukl2UWCbcJft7YhxReBGQD7OAM8Pbxw6YMo4r2diNEA8FEmu32YOn9rhaiE5yw==",
|
||||
"license": "Unlicense"
|
||||
},
|
||||
"node_modules/type-check": {
|
||||
"version": "0.4.0",
|
||||
"resolved": "https://registry.npmjs.org/type-check/-/type-check-0.4.0.tgz",
|
||||
|
||||
@ -17,7 +17,10 @@
|
||||
"author": "Tom Butcher",
|
||||
"license": "ISC",
|
||||
"dependencies": {
|
||||
"@nats-io/nats-core": "^3.1.0",
|
||||
"@nats-io/transport-node": "^3.1.0",
|
||||
"axios": "^1.11.0",
|
||||
"canonical-json": "^0.2.0",
|
||||
"date-fns": "^4.1.0",
|
||||
"dayjs": "^1.11.13",
|
||||
"dotenv": "^17.2.1",
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import log4js from 'log4js';
|
||||
import { loadConfig } from '../config.js';
|
||||
import { etcdServer } from '../database/etcd.js';
|
||||
import { natsServer } from '../database/nats.js';
|
||||
import { generateEtcId } from '../utils.js';
|
||||
|
||||
const config = loadConfig();
|
||||
@ -10,7 +10,7 @@ const logger = log4js.getLogger('Action Manager');
|
||||
logger.level = config.server.logLevel;
|
||||
|
||||
/**
|
||||
* ActionManager handles tracking object updates using Etcd and broadcasts update events via websockets.
|
||||
* ActionManager handles tracking object updates using NATS and broadcasts update events via websockets.
|
||||
*/
|
||||
export class ActionManager {
|
||||
constructor(socketClient) {
|
||||
@ -20,10 +20,12 @@ export class ActionManager {
|
||||
|
||||
async subscribeToObjectActions(id, objectType) {
|
||||
logger.debug('Subscribing to object actions...', id, objectType);
|
||||
await etcdServer.onPrefixPutEvent(
|
||||
`/${objectType}s/${id}/actions`,
|
||||
const subject = `${objectType}s.${id}.actions`;
|
||||
|
||||
await natsServer.subscribe(
|
||||
subject,
|
||||
this.socketClient.id,
|
||||
(key, value) => {
|
||||
(subject, value) => {
|
||||
if (!value?.result) {
|
||||
logger.trace('Object action:', id);
|
||||
this.socketClient.socket.emit(
|
||||
@ -34,9 +36,9 @@ export class ActionManager {
|
||||
action: { ...value }
|
||||
},
|
||||
result => {
|
||||
logger.trace('Got action result:', key);
|
||||
const actionId = key.split('/').pop();
|
||||
etcdServer.setKey(`/${objectType}s/${id}/actions/${actionId}`, {
|
||||
logger.trace('Got action result:', subject);
|
||||
const actionId = value.actionId || generateEtcId();
|
||||
natsServer.publish(`${subject}.${actionId}`, {
|
||||
...value,
|
||||
result: { ...result }
|
||||
});
|
||||
@ -49,49 +51,49 @@ export class ActionManager {
|
||||
}
|
||||
|
||||
async removeObjectActionsListener(id, objectType) {
|
||||
await etcdServer.removePrefixWatcher(
|
||||
`/${objectType}s/${id}/actions`,
|
||||
this.socketClient.id,
|
||||
'put'
|
||||
);
|
||||
const subject = `${objectType}s.${id}.actions`;
|
||||
await natsServer.removeSubscription(subject, this.socketClient.id);
|
||||
return { success: true };
|
||||
}
|
||||
|
||||
async sendObjectAction(id, objectType, action, callback) {
|
||||
const actionId = generateEtcId();
|
||||
const subject = `${objectType}s.${id}.actions.${actionId}`;
|
||||
|
||||
try {
|
||||
const actionId = generateEtcId();
|
||||
this.callbacks.set(actionId, callback);
|
||||
logger.trace(
|
||||
`Calling action id: ${actionId}, object id: ${id}, object type: ${objectType} Action:`,
|
||||
action
|
||||
);
|
||||
await etcdServer.onKeyPutEvent(
|
||||
`/${objectType}s/${id}/actions/${actionId}`,
|
||||
|
||||
// Subscribe to the response subject
|
||||
await natsServer.subscribe(
|
||||
subject,
|
||||
this.socketClient.socketId,
|
||||
async (key, value) => {
|
||||
async (subject, value) => {
|
||||
if (value.result) {
|
||||
logger.trace('Calling result callback...');
|
||||
const storedCallback = this.callbacks.get(actionId);
|
||||
await etcdServer.removeKeyWatcher(
|
||||
`/${objectType}s/${id}/actions/${actionId}`,
|
||||
this.socketClient.socketId,
|
||||
'put'
|
||||
);
|
||||
await etcdServer.deleteKey(
|
||||
`/${objectType}s/${id}/actions/${actionId}`
|
||||
await natsServer.removeSubscription(
|
||||
subject,
|
||||
this.socketClient.socketId
|
||||
);
|
||||
storedCallback(value.result);
|
||||
}
|
||||
}
|
||||
);
|
||||
await etcdServer.setKey(
|
||||
`/${objectType}s/${id}/actions/${actionId}`,
|
||||
action
|
||||
);
|
||||
|
||||
// Publish the action
|
||||
await natsServer.publish(`${objectType}s.${id}.actions`, {
|
||||
...action,
|
||||
actionId: actionId
|
||||
});
|
||||
|
||||
return true;
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`Failed to set value for /${objectType}s/${id}/object:`,
|
||||
`Failed to send action for ${objectType}s.${id}.actions.${actionId}:`,
|
||||
error
|
||||
);
|
||||
return false;
|
||||
|
||||
@ -4,11 +4,7 @@ import jwt from 'jsonwebtoken';
|
||||
import log4js from 'log4js';
|
||||
// Load configuration
|
||||
import { loadConfig } from '../config.js';
|
||||
import {
|
||||
editObject,
|
||||
getObject,
|
||||
getObjectByFilter
|
||||
} from '../database/database.js';
|
||||
import { editObject, getObject, listObjects } from '../database/database.js';
|
||||
import { hostModel } from '../database/schemas/management/host.schema.js';
|
||||
import { userModel } from '../database/schemas/management/user.schema.js';
|
||||
import { generateAuthCode } from '../utils.js';
|
||||
@ -82,16 +78,16 @@ export class KeycloakAuth {
|
||||
roles: this.extractRoles(decodedToken)
|
||||
};
|
||||
|
||||
const user = await getObjectByFilter({
|
||||
const user = await listObjects({
|
||||
model: userModel,
|
||||
filter: { username: decodedUser.username }
|
||||
});
|
||||
|
||||
// Cache the verified token
|
||||
const expiresAt = introspection.exp * 1000; // Convert to milliseconds
|
||||
this.tokenCache.set(token, { expiresAt, user });
|
||||
this.tokenCache.set(token, { expiresAt, user: user[0] });
|
||||
|
||||
return { valid: true, user };
|
||||
return { valid: true, user: user[0] };
|
||||
} catch (error) {
|
||||
logger.error('Token verification error:', error.message);
|
||||
return { valid: false };
|
||||
@ -167,13 +163,14 @@ export class CodeAuth {
|
||||
|
||||
async verifyOtp(otp) {
|
||||
try {
|
||||
const host = await getObjectByFilter({
|
||||
const hosts = await listObjects({
|
||||
model: hostModel,
|
||||
filter: { otp: otp },
|
||||
cached: false
|
||||
});
|
||||
const host = hosts[0];
|
||||
if (host == undefined) {
|
||||
const error = 'No host found with OTP.';
|
||||
const error = `No host found with OTP: ${otp}`;
|
||||
logger.warn(error);
|
||||
return { valid: false, error: error };
|
||||
}
|
||||
@ -203,9 +200,10 @@ export class CodeAuth {
|
||||
id: id,
|
||||
updateData: { authCode: generateAuthCode() }
|
||||
});
|
||||
logger.info('Host found with OTP:', otp);
|
||||
return { valid: true, host: authCodeHost };
|
||||
} catch (error) {
|
||||
logger.error('Code verification error:', error.message);
|
||||
logger.error('OTP verification error:', error.message);
|
||||
return { valid: false, error: error.message };
|
||||
}
|
||||
}
|
||||
@ -218,7 +216,6 @@ export function createAuthMiddleware(socketUser) {
|
||||
|
||||
// Allow the 'authenticate' event through without checks
|
||||
|
||||
logger.trace('Event:', event);
|
||||
if (event === 'authenticate') {
|
||||
next();
|
||||
return;
|
||||
|
||||
@ -11,6 +11,7 @@ import {
|
||||
import log4js from 'log4js';
|
||||
import { loadConfig } from '../config.js';
|
||||
import { userModel } from './schemas/management/user.schema.js';
|
||||
import { jsonToCacheKey } from '../utils.js';
|
||||
|
||||
const config = loadConfig();
|
||||
|
||||
@ -19,44 +20,31 @@ const cacheLogger = log4js.getLogger('Local Cache');
|
||||
logger.level = config.server.logLevel;
|
||||
cacheLogger.level = config.server.logLevel;
|
||||
|
||||
const modelCaches = new Map();
|
||||
const objectCache = new NodeCache({
|
||||
stdTTL: 30, // 30 sec expiration
|
||||
checkperiod: 600, // 30 sec periodic cleanup
|
||||
useClones: false // Don't clone objects for better performance
|
||||
});
|
||||
const listCache = new NodeCache({
|
||||
stdTTL: 30, // 30 sec expiration
|
||||
checkperiod: 600, // 30 sec periodic cleanup
|
||||
useClones: false // Don't clone objects for better performance
|
||||
});
|
||||
|
||||
function getModelCache(model) {
|
||||
const modelName = model.modelName;
|
||||
const modelCache = modelCaches.get(modelName);
|
||||
if (modelCache == undefined) {
|
||||
logger.trace('Creating new model cache...');
|
||||
const newModelCache = new NodeCache({
|
||||
stdTTL: 30, // 30 sec expiration
|
||||
checkperiod: 30, // 30 sec periodic cleanup
|
||||
useClones: false // Don't clone objects for better performance
|
||||
});
|
||||
modelCaches.set(modelName, newModelCache);
|
||||
return newModelCache;
|
||||
}
|
||||
logger.trace('Getting model cache...');
|
||||
return modelCache;
|
||||
}
|
||||
|
||||
export const retrieveObjectCache = ({ model, id }) => {
|
||||
cacheLogger.trace('Retrieving:', {
|
||||
export const retrieveObjectCache = ({ model, id, populate = [] }) => {
|
||||
const cacheKeyObject = {
|
||||
model: model.modelName,
|
||||
id
|
||||
});
|
||||
const modelCache = getModelCache(model);
|
||||
id,
|
||||
populate
|
||||
};
|
||||
|
||||
const cachedObject = modelCache.get(id);
|
||||
const cacheKey = jsonToCacheKey(cacheKeyObject);
|
||||
|
||||
cacheLogger.trace('Retrieving:');
|
||||
const cachedObject = objectCache.get(cacheKey);
|
||||
|
||||
if (cachedObject == undefined) {
|
||||
cacheLogger.trace('Miss:', {
|
||||
model: model.modelName,
|
||||
id
|
||||
});
|
||||
cacheLogger.trace('Miss:', cacheKeyObject);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
@ -68,25 +56,36 @@ export const retrieveObjectCache = ({ model, id }) => {
|
||||
return cachedObject;
|
||||
};
|
||||
|
||||
export const retrieveObjectsCache = ({ model }) => {
|
||||
cacheLogger.trace('Retrieving:', {
|
||||
model: model.modelName
|
||||
});
|
||||
const modelCache = getModelCache(model);
|
||||
export const retrieveListCache = ({
|
||||
model,
|
||||
populate = [],
|
||||
filter = {},
|
||||
sort = '',
|
||||
order = 'ascend',
|
||||
project = {}
|
||||
}) => {
|
||||
const cacheKeyObject = {
|
||||
model: model.modelName,
|
||||
id,
|
||||
populate,
|
||||
filter,
|
||||
sort,
|
||||
project,
|
||||
order
|
||||
};
|
||||
|
||||
const modelCacheKeys = modelCache.keys();
|
||||
cacheLogger.trace('Retrieving:', cacheKeyObject);
|
||||
|
||||
const cachedList = listCache.get(model.modelName);
|
||||
const cacheKey = jsonToCacheKey(cacheKeyObject);
|
||||
|
||||
if (cachedList == true) {
|
||||
const cachedObjects = modelCacheKeys.map(key => modelCache.get(key));
|
||||
const cachedList = listCache.get(cacheKey);
|
||||
|
||||
if (cachedList != undefined) {
|
||||
cacheLogger.trace('Hit:', {
|
||||
model: model.modelName,
|
||||
length: cachedObjects.length
|
||||
...cacheKeyObject,
|
||||
length: cachedList.length
|
||||
});
|
||||
|
||||
return cachedObjects;
|
||||
return cachedList;
|
||||
}
|
||||
|
||||
cacheLogger.trace('Miss:', {
|
||||
@ -95,21 +94,23 @@ export const retrieveObjectsCache = ({ model }) => {
|
||||
return undefined;
|
||||
};
|
||||
|
||||
export const updateObjectCache = ({ model, id, object }) => {
|
||||
cacheLogger.trace('Updating:', {
|
||||
export const updateObjectCache = ({ model, id, object, populate = [] }) => {
|
||||
const cacheKeyObject = {
|
||||
model: model.modelName,
|
||||
id
|
||||
});
|
||||
const modelCache = getModelCache(model);
|
||||
const cachedObject = modelCache.get(id) || {};
|
||||
id,
|
||||
populate
|
||||
};
|
||||
|
||||
const cacheKey = jsonToCacheKey(cacheKeyObject);
|
||||
|
||||
cacheLogger.trace('Updating:', cacheKeyObject);
|
||||
|
||||
const cachedObject = objectCache.get(cacheKey) || {};
|
||||
const mergedObject = _.merge(cachedObject, object);
|
||||
|
||||
modelCache.set(id, mergedObject);
|
||||
objectCache.set(cacheKey, mergedObject);
|
||||
|
||||
cacheLogger.trace('Updated:', {
|
||||
model: model.modelName,
|
||||
id
|
||||
});
|
||||
cacheLogger.trace('Updated:', { ...cacheKeyObject });
|
||||
|
||||
return mergedObject;
|
||||
};
|
||||
@ -130,29 +131,39 @@ export const deleteObjectCache = ({ model, id }) => {
|
||||
return mergedObject;
|
||||
};
|
||||
|
||||
export const updateObjectsCache = ({ model, objects }) => {
|
||||
cacheLogger.trace('Updating:', {
|
||||
export const updateListCache = ({
|
||||
model,
|
||||
objects,
|
||||
populate = [],
|
||||
filter = {},
|
||||
sort = '',
|
||||
order = 'ascend',
|
||||
project = {}
|
||||
}) => {
|
||||
const cacheKeyObject = {
|
||||
model: model.modelName,
|
||||
populate,
|
||||
filter,
|
||||
sort,
|
||||
project,
|
||||
order
|
||||
};
|
||||
|
||||
cacheLogger.trace('Updating:', {
|
||||
...cacheKeyObject,
|
||||
length: objects.length
|
||||
});
|
||||
const modelCache = getModelCache(model);
|
||||
|
||||
objects.forEach(object => {
|
||||
const cachedObject = modelCache.get(object._id) || {};
|
||||
const cacheKey = jsonToCacheKey(cacheKeyObject);
|
||||
|
||||
const mergedObject = _.merge(cachedObject, object);
|
||||
|
||||
modelCache.set(object._id, mergedObject);
|
||||
});
|
||||
|
||||
listCache.set(model.modelName, true);
|
||||
listCache.set(cacheKey, objects);
|
||||
|
||||
cacheLogger.trace('Updated:', {
|
||||
model: model.modelName,
|
||||
...cacheKeyObject,
|
||||
length: objects.length
|
||||
});
|
||||
|
||||
return mergedObject;
|
||||
return objects;
|
||||
};
|
||||
|
||||
// Reusable function to list objects with aggregation, filtering, search, sorting, and pagination
|
||||
@ -162,27 +173,29 @@ export const listObjects = async ({
|
||||
filter = {},
|
||||
sort = '',
|
||||
order = 'ascend',
|
||||
project, // optional: override default projection
|
||||
project = {}, // optional: override default projection
|
||||
cached = false
|
||||
}) => {
|
||||
try {
|
||||
logger.trace('Listing objects:', {
|
||||
model,
|
||||
populate,
|
||||
page,
|
||||
limit,
|
||||
filter,
|
||||
sort,
|
||||
order,
|
||||
project,
|
||||
cache
|
||||
cached
|
||||
});
|
||||
|
||||
var cacheKey = undefined;
|
||||
var modelCache = getModelCache(model);
|
||||
|
||||
if (cached == true) {
|
||||
const objectsCache = retrieveObjectsCache({ model });
|
||||
const objectsCache = retrieveObjectsCache({
|
||||
model,
|
||||
populate,
|
||||
filter,
|
||||
sort,
|
||||
order,
|
||||
project
|
||||
});
|
||||
if (objectsCache != undefined) {
|
||||
return objectsCache;
|
||||
}
|
||||
@ -210,7 +223,7 @@ export const listObjects = async ({
|
||||
let query = model.find(filter).sort({ [sort]: sortOrder });
|
||||
|
||||
// Handle populate (array or single value)
|
||||
if (populate) {
|
||||
if (populate.length > 0) {
|
||||
if (Array.isArray(populate)) {
|
||||
for (const pop of populate) {
|
||||
query = query.populate(pop);
|
||||
@ -221,7 +234,7 @@ export const listObjects = async ({
|
||||
}
|
||||
|
||||
// Handle select (projection)
|
||||
if (project) {
|
||||
if (project != {}) {
|
||||
query = query.select(project);
|
||||
}
|
||||
|
||||
@ -231,18 +244,25 @@ export const listObjects = async ({
|
||||
|
||||
const finalResult = expandObjectIds(queryResult);
|
||||
|
||||
updateObjectsCache({ model, objects });
|
||||
updateListCache({
|
||||
model,
|
||||
objects: finalResult,
|
||||
populate,
|
||||
filter,
|
||||
sort,
|
||||
order,
|
||||
project
|
||||
});
|
||||
|
||||
logger.trace('Retreived from database:', {
|
||||
model,
|
||||
populate,
|
||||
page,
|
||||
limit,
|
||||
filter,
|
||||
sort,
|
||||
order,
|
||||
project,
|
||||
cache
|
||||
cached,
|
||||
length: finalResult.length
|
||||
});
|
||||
return finalResult;
|
||||
} catch (error) {
|
||||
@ -252,7 +272,12 @@ export const listObjects = async ({
|
||||
};
|
||||
|
||||
// Reusable function to get a single object by ID
|
||||
export const getObject = async ({ model, id, populate, cached = false }) => {
|
||||
export const getObject = async ({
|
||||
model,
|
||||
id,
|
||||
populate = [],
|
||||
cached = false
|
||||
}) => {
|
||||
try {
|
||||
logger.trace('Getting object:', {
|
||||
model,
|
||||
@ -261,7 +286,7 @@ export const getObject = async ({ model, id, populate, cached = false }) => {
|
||||
});
|
||||
|
||||
if (cached == true) {
|
||||
const cachedObject = retrieveObjectCache({ model, id });
|
||||
const cachedObject = retrieveObjectCache({ model, id, populate });
|
||||
if (cachedObject != undefined) {
|
||||
return cachedObject;
|
||||
}
|
||||
@ -299,63 +324,14 @@ export const getObject = async ({ model, id, populate, cached = false }) => {
|
||||
updateObjectCache({
|
||||
model: model,
|
||||
id: finalResult._id.toString(),
|
||||
populate,
|
||||
object: finalResult
|
||||
});
|
||||
|
||||
return finalResult;
|
||||
} catch (error) {
|
||||
logger.error('An error retreiving object:', error.message);
|
||||
return undefined;
|
||||
}
|
||||
};
|
||||
|
||||
// Reusable function to get a single object by ID
|
||||
export const getObjectByFilter = async ({ model, filter, populate }) => {
|
||||
try {
|
||||
logger.trace('Getting object:', {
|
||||
model,
|
||||
filter,
|
||||
populate
|
||||
});
|
||||
|
||||
let query = model.findOne(filter).lean();
|
||||
|
||||
// Handle populate (array or single value)
|
||||
if (populate) {
|
||||
if (Array.isArray(populate)) {
|
||||
for (const pop of populate) {
|
||||
query = query.populate(pop);
|
||||
}
|
||||
} else if (typeof populate === 'string' || typeof populate === 'object') {
|
||||
query = query.populate(populate);
|
||||
}
|
||||
}
|
||||
const finalResult = await query;
|
||||
|
||||
if (!finalResult) {
|
||||
logger.warn('Object not found in database:', {
|
||||
model,
|
||||
filter,
|
||||
populate
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
|
||||
logger.trace('Retreived object from database:', {
|
||||
model,
|
||||
filter,
|
||||
populate
|
||||
});
|
||||
|
||||
updateObjectCache({
|
||||
model: model,
|
||||
id: finalResult._id.toString(),
|
||||
object: finalResult
|
||||
});
|
||||
|
||||
return finalResult;
|
||||
} catch (error) {
|
||||
logger.error('An error retreiving object:', error.message);
|
||||
throw error;
|
||||
return undefined;
|
||||
}
|
||||
};
|
||||
@ -367,7 +343,7 @@ export const editObject = async ({
|
||||
updateData,
|
||||
owner = undefined,
|
||||
ownerType = undefined,
|
||||
populate
|
||||
populate = []
|
||||
}) => {
|
||||
try {
|
||||
// Determine parentType from model name
|
||||
|
||||
307
src/database/nats.js
Normal file
307
src/database/nats.js
Normal file
@ -0,0 +1,307 @@
|
||||
import { connect } from '@nats-io/transport-node';
|
||||
import log4js from 'log4js';
|
||||
import { loadConfig } from '../config.js';
|
||||
|
||||
const config = loadConfig();
|
||||
const logger = log4js.getLogger('Nats');
|
||||
logger.level = config.server.logLevel;
|
||||
|
||||
class NatsServer {
|
||||
constructor() {
|
||||
this.client = null;
|
||||
this.subscriptions = new Map(); // subject → { subscription, callbacks }
|
||||
this.requestHandlers = new Map(); // subject → { handler, callbacks }
|
||||
this.queuedSubscriptions = new Map(); // subject → { subscription, callbacks, queue }
|
||||
|
||||
const natsConfig = config.database?.nats || config.database; // fallback for production config
|
||||
const host = natsConfig.host || 'localhost';
|
||||
const port = natsConfig.port || 4222;
|
||||
this.servers = [`nats://${host}:${port}`];
|
||||
this.textEncoder = new TextEncoder();
|
||||
this.textDecoder = new TextDecoder();
|
||||
|
||||
logger.trace(`NatsServer: servers set to ${JSON.stringify(this.servers)}`);
|
||||
}
|
||||
|
||||
async connect() {
|
||||
if (!this.client) {
|
||||
logger.info('Connecting to NATS...');
|
||||
logger.trace(
|
||||
`Creating NATS client with servers ${JSON.stringify(this.servers)}`
|
||||
);
|
||||
|
||||
try {
|
||||
this.client = await connect({
|
||||
servers: this.servers,
|
||||
reconnect: true,
|
||||
maxReconnectAttempts: -1, // unlimited reconnects
|
||||
reconnectTimeWait: 1000,
|
||||
timeout: 20000
|
||||
});
|
||||
|
||||
// Test connection by checking if client is connected
|
||||
try {
|
||||
if (this.client.isClosed()) {
|
||||
throw new Error('NATS client connection failed');
|
||||
}
|
||||
logger.trace('NATS client connected successfully.');
|
||||
} catch (error) {
|
||||
throw error;
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Failed to connect to NATS:', error);
|
||||
throw error;
|
||||
}
|
||||
} else {
|
||||
logger.trace('NATS client already exists, skipping connection.');
|
||||
}
|
||||
return this.client;
|
||||
}
|
||||
|
||||
async getClient() {
|
||||
if (!this.client) {
|
||||
logger.trace('No client found, calling connect().');
|
||||
await this.connect();
|
||||
}
|
||||
return this.client;
|
||||
}
|
||||
|
||||
async publish(subject, data) {
|
||||
const client = await this.getClient();
|
||||
const payload = typeof data === 'string' ? data : JSON.stringify(data);
|
||||
|
||||
try {
|
||||
client.publish(subject, this.textEncoder.encode(payload));
|
||||
logger.trace(`Published to subject: ${subject}, data: ${payload}`);
|
||||
return { success: true };
|
||||
} catch (error) {
|
||||
logger.error(`Failed to publish to subject ${subject}:`, error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async request(subject, data, timeout = 30000) {
|
||||
const client = await this.getClient();
|
||||
const payload = typeof data === 'string' ? data : JSON.stringify(data);
|
||||
|
||||
try {
|
||||
const response = await client.request(
|
||||
subject,
|
||||
this.textEncoder.encode(payload),
|
||||
{
|
||||
timeout: timeout
|
||||
}
|
||||
);
|
||||
|
||||
const responseData = this.textDecoder.decode(response.data);
|
||||
logger.trace(`Request to subject: ${subject}, response: ${responseData}`);
|
||||
|
||||
// Try to parse as JSON, fallback to string
|
||||
try {
|
||||
return JSON.parse(responseData);
|
||||
} catch {
|
||||
return responseData;
|
||||
}
|
||||
} catch (error) {
|
||||
if (error.code === 'TIMEOUT') {
|
||||
logger.trace(`Request timeout for subject: ${subject}`);
|
||||
return null;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async subscribe(subject, owner, callback) {
|
||||
const client = await this.getClient();
|
||||
const subscriptionKey = subject;
|
||||
|
||||
if (this.subscriptions.has(subscriptionKey)) {
|
||||
this.subscriptions.get(subscriptionKey).callbacks.set(owner, callback);
|
||||
logger.trace(
|
||||
`Added subscription callback for owner=${owner} on subject=${subject}`
|
||||
);
|
||||
return { success: true };
|
||||
}
|
||||
|
||||
logger.trace(`Creating new subscription for subject: ${subject}`);
|
||||
const subscription = client.subscribe(subject);
|
||||
const callbacks = new Map();
|
||||
callbacks.set(owner, callback);
|
||||
|
||||
(async () => {
|
||||
for await (const msg of subscription) {
|
||||
logger.trace(`Message received on subject: ${subject}`);
|
||||
const data = this.textDecoder.decode(msg.data);
|
||||
let parsedData;
|
||||
|
||||
try {
|
||||
parsedData = JSON.parse(data);
|
||||
} catch {
|
||||
parsedData = data;
|
||||
}
|
||||
|
||||
for (const [ownerId, cb] of callbacks) {
|
||||
try {
|
||||
cb(subject, parsedData, msg);
|
||||
} catch (err) {
|
||||
logger.error(
|
||||
`Error in subscription callback for owner=${ownerId}, subject=${subject}:`,
|
||||
err
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
})().catch(err => {
|
||||
logger.error(`Subscription error for subject ${subject}:`, err);
|
||||
});
|
||||
|
||||
this.subscriptions.set(subscriptionKey, { subscription, callbacks });
|
||||
return { success: true };
|
||||
}
|
||||
|
||||
async setRequestHandler(subject, owner, handler) {
|
||||
const client = await this.getClient();
|
||||
const handlerKey = subject;
|
||||
|
||||
if (this.requestHandlers.has(handlerKey)) {
|
||||
this.requestHandlers.get(handlerKey).callbacks.set(owner, handler);
|
||||
logger.trace(
|
||||
`Added request handler for owner=${owner} on subject=${subject}`
|
||||
);
|
||||
return { success: true };
|
||||
}
|
||||
|
||||
logger.trace(`Creating new request handler for subject: ${subject}`);
|
||||
const subscription = client.subscribe(subject);
|
||||
const callbacks = new Map();
|
||||
callbacks.set(owner, handler);
|
||||
|
||||
(async () => {
|
||||
for await (const msg of subscription) {
|
||||
logger.trace(`Request received on subject: ${subject}`);
|
||||
const data = this.textDecoder.decode(msg.data);
|
||||
let parsedData;
|
||||
|
||||
try {
|
||||
parsedData = JSON.parse(data);
|
||||
} catch {
|
||||
parsedData = data;
|
||||
}
|
||||
|
||||
for (const [ownerId, cb] of callbacks) {
|
||||
try {
|
||||
const response = await cb(subject, parsedData, msg);
|
||||
const responsePayload =
|
||||
typeof response === 'string'
|
||||
? response
|
||||
: JSON.stringify(response);
|
||||
msg.respond(this.textEncoder.encode(responsePayload));
|
||||
} catch (err) {
|
||||
logger.error(
|
||||
`Error in request handler for owner=${ownerId}, subject=${subject}:`,
|
||||
err
|
||||
);
|
||||
// Send error response
|
||||
msg.respond(
|
||||
this.textEncoder.encode(JSON.stringify({ error: err.message }))
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
})().catch(err => {
|
||||
logger.error(`Request handler error for subject ${subject}:`, err);
|
||||
});
|
||||
|
||||
this.requestHandlers.set(handlerKey, { subscription, callbacks });
|
||||
return { success: true };
|
||||
}
|
||||
|
||||
async removeSubscription(subject, owner) {
|
||||
const entry = this.subscriptions.get(subject);
|
||||
|
||||
if (!entry) {
|
||||
logger.trace(`Subscription not found for subject: ${subject}`);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (entry.callbacks.delete(owner)) {
|
||||
logger.trace(
|
||||
`Removed subscription callback for owner: ${owner} on subject: ${subject}`
|
||||
);
|
||||
} else {
|
||||
logger.trace(
|
||||
`No subscription callback found for owner: ${owner} on subject: ${subject}`
|
||||
);
|
||||
}
|
||||
|
||||
if (entry.callbacks.size === 0) {
|
||||
logger.trace(`No callbacks left, stopping subscription for ${subject}`);
|
||||
entry.subscription.unsubscribe();
|
||||
this.subscriptions.delete(subject);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
async removeRequestHandler(subject, owner) {
|
||||
const entry = this.requestHandlers.get(subject);
|
||||
|
||||
if (!entry) {
|
||||
logger.trace(`Request handler not found for subject: ${subject}`);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (entry.callbacks.delete(owner)) {
|
||||
logger.trace(
|
||||
`Removed request handler for owner: ${owner} on subject: ${subject}`
|
||||
);
|
||||
} else {
|
||||
logger.trace(
|
||||
`No request handler found for owner: ${owner} on subject: ${subject}`
|
||||
);
|
||||
}
|
||||
|
||||
if (entry.callbacks.size === 0) {
|
||||
logger.trace(`No handlers left, stopping request handler for ${subject}`);
|
||||
entry.subscription.unsubscribe();
|
||||
this.requestHandlers.delete(subject);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
async disconnect() {
|
||||
logger.info('Disconnecting from NATS...');
|
||||
|
||||
// Stop all subscriptions
|
||||
for (const [subject, entry] of this.subscriptions) {
|
||||
logger.trace(`Stopping subscription: ${subject}`);
|
||||
entry.subscription.unsubscribe();
|
||||
}
|
||||
this.subscriptions.clear();
|
||||
|
||||
// Stop all queued subscriptions
|
||||
for (const [key, entry] of this.queuedSubscriptions) {
|
||||
logger.trace(`Stopping queued subscription: ${key}`);
|
||||
entry.subscription.unsubscribe();
|
||||
}
|
||||
this.queuedSubscriptions.clear();
|
||||
|
||||
// Stop all request handlers
|
||||
for (const [subject, entry] of this.requestHandlers) {
|
||||
logger.trace(`Stopping request handler: ${subject}`);
|
||||
entry.subscription.unsubscribe();
|
||||
}
|
||||
this.requestHandlers.clear();
|
||||
|
||||
if (this.client) {
|
||||
await this.client.close();
|
||||
this.client = null;
|
||||
logger.info('Disconnected from NATS');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const natsServer = new NatsServer();
|
||||
|
||||
export { NatsServer, natsServer };
|
||||
@ -16,7 +16,8 @@ const moonrakerSchema = new Schema(
|
||||
const alertSchema = new Schema(
|
||||
{
|
||||
priority: { type: String, required: true }, // order to show
|
||||
type: { type: String, required: true } // selectFilament, error, info, message,
|
||||
type: { type: String, required: true }, // selectFilament, error, info, message
|
||||
message: { type: String, required: false }
|
||||
},
|
||||
{ timestamps: true, _id: false }
|
||||
);
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import { ObjectId } from 'mongodb';
|
||||
import { auditLogModel } from './schemas/management/auditlog.schema.js';
|
||||
import { etcdServer } from './etcd.js';
|
||||
import { natsServer } from './nats.js';
|
||||
|
||||
function parseFilter(property, value) {
|
||||
if (typeof value === 'string') {
|
||||
@ -411,11 +411,11 @@ async function getAuditLogs(idOrIds) {
|
||||
}
|
||||
|
||||
async function distributeUpdate(value, id, type) {
|
||||
await etcdServer.setKey(`/${type}s/${id}/object`, value);
|
||||
await natsServer.publish(`${type}s.${id}.object`, value);
|
||||
}
|
||||
|
||||
async function distributeNew(id, type) {
|
||||
await etcdServer.setKey(`/${type}s/new`, id);
|
||||
await natsServer.publish(`${type}s.new`, id);
|
||||
}
|
||||
|
||||
function flatternObjectIds(object) {
|
||||
|
||||
71
src/events/eventmanager.js
Normal file
71
src/events/eventmanager.js
Normal file
@ -0,0 +1,71 @@
|
||||
import log4js from 'log4js';
|
||||
import { loadConfig } from '../config.js';
|
||||
import { natsServer } from '../database/nats.js';
|
||||
|
||||
const config = loadConfig();
|
||||
|
||||
// Setup logger
|
||||
const logger = log4js.getLogger('Event Manager');
|
||||
logger.level = config.server.logLevel;
|
||||
|
||||
/**
|
||||
* EventManager handles tracking object events using NATS and broadcasts event events via websockets.
|
||||
*/
|
||||
export class EventManager {
|
||||
constructor(socketClient) {
|
||||
this.socketClient = socketClient;
|
||||
}
|
||||
|
||||
async subscribeToObjectEvent(id, objectType, eventType) {
|
||||
logger.debug('Subscribing to object event:', eventType, id, objectType);
|
||||
await natsServer.subscribe(
|
||||
`${objectType}s.${id}.events.${eventType}`,
|
||||
this.socketClient.socketId,
|
||||
(key, value) => {
|
||||
if (!value?.result) {
|
||||
logger.trace('Object event detected:', id);
|
||||
this.socketClient.socket.emit('objectEvent', {
|
||||
_id: id,
|
||||
objectType: objectType,
|
||||
event: { ...value }
|
||||
});
|
||||
}
|
||||
}
|
||||
);
|
||||
return { success: true };
|
||||
}
|
||||
|
||||
async removeObjectEventsListener(id, objectType, eventType) {
|
||||
// Remove specific event subscription for this object
|
||||
await natsServer.removeSubscription(
|
||||
`${objectType}s.${id}.events.${eventType}`,
|
||||
this.socketClient.socketId
|
||||
);
|
||||
return { success: true };
|
||||
}
|
||||
|
||||
async sendObjectEvent(id, objectType, event) {
|
||||
const eventType = event?.type || 'unknown';
|
||||
try {
|
||||
logger.trace(
|
||||
`Calling event: ${eventType}, object id: ${id}, object type: ${objectType} Event:`,
|
||||
event
|
||||
);
|
||||
await natsServer.publish(
|
||||
`${objectType}s.${id}.events.${eventType}`,
|
||||
event
|
||||
);
|
||||
return { success: true };
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`Failed to publish event for ${objectType}s.${id}.events.${eventType}:`,
|
||||
error
|
||||
);
|
||||
return {
|
||||
error:
|
||||
error?.message ||
|
||||
`Failed to publish event for ${objectType}s.${id}.events.${eventType}.`
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
10
src/index.js
10
src/index.js
@ -1,6 +1,7 @@
|
||||
import { loadConfig } from './config.js';
|
||||
import { SocketManager } from './socket/socketmanager.js';
|
||||
import { etcdServer } from './database/etcd.js';
|
||||
import { natsServer } from './database/nats.js';
|
||||
import express from 'express';
|
||||
import log4js from 'log4js';
|
||||
import http from 'http';
|
||||
@ -29,6 +30,15 @@ import { mongoServer } from './database/mongo.js';
|
||||
throw err;
|
||||
}
|
||||
|
||||
// Connect to NATS (await)
|
||||
try {
|
||||
await natsServer.connect();
|
||||
logger.info('Connected to NATS');
|
||||
} catch (err) {
|
||||
logger.error('Failed to connect to NATS:', err);
|
||||
throw err;
|
||||
}
|
||||
|
||||
// Connect to Mongo DB (await)
|
||||
try {
|
||||
await mongoServer.connect();
|
||||
|
||||
@ -2,11 +2,12 @@ import log4js from 'log4js';
|
||||
// Load configuration
|
||||
import { loadConfig } from '../config.js';
|
||||
import { CodeAuth, createAuthMiddleware } from '../auth/auth.js';
|
||||
import { editObject, getObject } from '../database/database.js';
|
||||
import { editObject, getObject, listObjects } from '../database/database.js';
|
||||
import { hostModel } from '../database/schemas/management/host.schema.js';
|
||||
import { UpdateManager } from '../updates/updatemanager.js';
|
||||
import { ActionManager } from '../actions/actionmanager.js';
|
||||
import { getModelByName } from '../utils.js';
|
||||
import { EventManager } from '../events/eventmanager.js';
|
||||
|
||||
const config = loadConfig();
|
||||
|
||||
@ -23,6 +24,7 @@ export class SocketHost {
|
||||
this.socketManager = socketManager;
|
||||
this.updateManager = new UpdateManager(this);
|
||||
this.actionManager = new ActionManager(this);
|
||||
this.eventManager = new EventManager(this);
|
||||
this.codeAuth = new CodeAuth();
|
||||
this.setupSocketEventHandlers();
|
||||
}
|
||||
@ -32,6 +34,13 @@ export class SocketHost {
|
||||
this.socket.on('authenticate', this.handleAuthenticate.bind(this));
|
||||
this.socket.on('updateHost', this.handleUpdateHost.bind(this));
|
||||
this.socket.on('getObject', this.handleGetObject.bind(this));
|
||||
this.socket.on('editObject', this.handleEditObject.bind(this));
|
||||
this.socket.on('listObjects', this.handleListObjects.bind(this));
|
||||
this.socket.on(
|
||||
'subscribeToObjectActions',
|
||||
this.handleSubscribeToObjectActions.bind(this)
|
||||
);
|
||||
this.socket.on('objectEvent', this.handleObjectEventEvent.bind(this));
|
||||
this.socket.on('disconnect', this.handleDisconnect.bind(this));
|
||||
}
|
||||
|
||||
@ -40,6 +49,23 @@ export class SocketHost {
|
||||
}
|
||||
|
||||
async handleAuthenticate(data, callback) {
|
||||
const setHostOnline = async verifyResult => {
|
||||
logger.info('Host authenticated and valid.');
|
||||
this.host = verifyResult.host;
|
||||
this.id = this.host._id.toString();
|
||||
this.authenticated = true;
|
||||
await editObject({
|
||||
model: hostModel,
|
||||
id: this.host._id,
|
||||
updateData: {
|
||||
online: true,
|
||||
state: { type: 'online' },
|
||||
connectedAt: new Date()
|
||||
},
|
||||
owner: this.host,
|
||||
ownerType: 'host'
|
||||
});
|
||||
};
|
||||
logger.trace('handleAuthenticateEvent');
|
||||
const id = data.id || undefined;
|
||||
const authCode = data.authCode || undefined;
|
||||
@ -49,17 +75,7 @@ export class SocketHost {
|
||||
logger.info('Authenticating host with id + authCode...');
|
||||
const verifyResult = await this.codeAuth.verifyCode(id, authCode);
|
||||
if (verifyResult.valid == true) {
|
||||
logger.info('Host authenticated and valid.');
|
||||
this.host = verifyResult.host;
|
||||
this.id = this.host._id.toString();
|
||||
this.authenticated = true;
|
||||
await editObject({
|
||||
model: hostModel,
|
||||
id: this.host._id,
|
||||
updateData: { online: true, state: { type: 'online' } },
|
||||
owner: this.host,
|
||||
ownerType: 'host'
|
||||
});
|
||||
await setHostOnline(verifyResult);
|
||||
await this.initializeHost();
|
||||
}
|
||||
callback(verifyResult);
|
||||
@ -71,8 +87,8 @@ export class SocketHost {
|
||||
const verifyResult = await this.codeAuth.verifyOtp(otp);
|
||||
if (verifyResult.valid == true) {
|
||||
logger.info('Host authenticated and valid.');
|
||||
this.host = verifyResult.host;
|
||||
this.authenticated = true;
|
||||
await setHostOnline(verifyResult);
|
||||
await this.initializeHost();
|
||||
}
|
||||
callback(verifyResult);
|
||||
return;
|
||||
@ -91,6 +107,18 @@ export class SocketHost {
|
||||
});
|
||||
}
|
||||
|
||||
async handleEditObject(data, callback) {
|
||||
const object = await editObject({
|
||||
model: getModelByName(data.objectType),
|
||||
id: data._id,
|
||||
updateData: data.updateData,
|
||||
populate: data.populate,
|
||||
owner: this.host,
|
||||
ownerType: 'host'
|
||||
});
|
||||
callback(object);
|
||||
}
|
||||
|
||||
async handleGetObject(data, callback) {
|
||||
const object = await getObject({
|
||||
model: getModelByName(data.objectType),
|
||||
@ -101,12 +129,45 @@ export class SocketHost {
|
||||
callback(object);
|
||||
}
|
||||
|
||||
async handleListObjects(data, callback) {
|
||||
const object = await listObjects({
|
||||
model: getModelByName(data.objectType),
|
||||
id: data._id,
|
||||
cached: data?.cached,
|
||||
populate: data?.populate,
|
||||
filter: data?.filter,
|
||||
project: data?.project,
|
||||
sort: data?.sort,
|
||||
order: data?.order
|
||||
});
|
||||
callback(object);
|
||||
}
|
||||
|
||||
async handleObjectEventEvent(data) {
|
||||
await this.eventManager.sendObjectEvent(
|
||||
data._id,
|
||||
data.objectType,
|
||||
data.event
|
||||
);
|
||||
}
|
||||
|
||||
async handleSubscribeToObjectActions(data) {
|
||||
await this.actionManager.subscribeToObjectActions(
|
||||
data._id,
|
||||
data.objectType
|
||||
);
|
||||
}
|
||||
|
||||
async handleDisconnect() {
|
||||
if (this.authenticated) {
|
||||
await editObject({
|
||||
model: hostModel,
|
||||
id: this.host._id,
|
||||
updateData: { online: false, state: { type: 'offline' } },
|
||||
updateData: {
|
||||
online: false,
|
||||
state: { type: 'offline' },
|
||||
connectedAt: null
|
||||
},
|
||||
owner: this.host,
|
||||
ownerType: 'host'
|
||||
});
|
||||
|
||||
@ -6,6 +6,7 @@ import { generateHostOTP } from '../utils.js';
|
||||
import { LockManager } from '../lock/lockmanager.js';
|
||||
import { UpdateManager } from '../updates/updatemanager.js';
|
||||
import { ActionManager } from '../actions/actionmanager.js';
|
||||
import { EventManager } from '../events/eventmanager.js';
|
||||
|
||||
const config = loadConfig();
|
||||
|
||||
@ -23,6 +24,7 @@ export class SocketUser {
|
||||
this.lockManager = new LockManager(this);
|
||||
this.updateManager = new UpdateManager(this);
|
||||
this.actionManager = new ActionManager(this);
|
||||
this.eventManager = new EventManager(this);
|
||||
this.templateManager = socketManager.templateManager;
|
||||
this.keycloakAuth = new KeycloakAuth();
|
||||
this.setupSocketEventHandlers();
|
||||
@ -42,6 +44,22 @@ export class SocketUser {
|
||||
'subscribeToObjectUpdate',
|
||||
this.handleSubscribeToObjectUpdateEvent.bind(this)
|
||||
);
|
||||
this.socket.on(
|
||||
'subscribeToObjectEvent',
|
||||
this.handleSubscribeToObjectEventEvent.bind(this)
|
||||
);
|
||||
this.socket.on(
|
||||
'unsubscribeObjectTypeUpdate',
|
||||
this.handleUnsubscribeToObjectTypeUpdateEvent.bind(this)
|
||||
);
|
||||
this.socket.on(
|
||||
'unsubscribeObjectUpdate',
|
||||
this.handleUnsubscribeToObjectUpdateEvent.bind(this)
|
||||
);
|
||||
this.socket.on(
|
||||
'unsubscribeObjectEvent',
|
||||
this.handleUnsubscribeObjectEventEvent.bind(this)
|
||||
);
|
||||
this.socket.on(
|
||||
'previewTemplate',
|
||||
this.handlePreviewTemplateEvent.bind(this)
|
||||
@ -60,6 +78,7 @@ export class SocketUser {
|
||||
const result = await this.keycloakAuth.verifyToken(token);
|
||||
if (result.valid == true) {
|
||||
logger.info('User authenticated and valid.');
|
||||
|
||||
this.user = result.user;
|
||||
this.id = this.user._id.toString();
|
||||
this.authenticated = true;
|
||||
@ -126,18 +145,47 @@ export class SocketUser {
|
||||
}
|
||||
|
||||
async handleSubscribeToObjectTypeUpdateEvent(data, callback) {
|
||||
const result = this.updateManager.subscribeToObjectNew(data.objectType);
|
||||
callback(result);
|
||||
await this.updateManager.subscribeToObjectNew(data.objectType);
|
||||
await this.updateManager.subscribeToObjectDelete(data.objectType);
|
||||
callback({ success: true });
|
||||
}
|
||||
|
||||
async handleSubscribeToObjectUpdateEvent(data, callback) {
|
||||
const result = this.updateManager.subscribeToObjectUpdate(
|
||||
const result = await this.updateManager.subscribeToObjectUpdate(
|
||||
data._id,
|
||||
data.objectType
|
||||
);
|
||||
callback(result);
|
||||
}
|
||||
|
||||
async handleSubscribeToObjectEventEvent(data) {
|
||||
await this.eventManager.subscribeToObjectEvent(
|
||||
data._id,
|
||||
data.objectType,
|
||||
data.eventType
|
||||
);
|
||||
}
|
||||
|
||||
async handleUnsubscribeToObjectTypeUpdateEvent(data) {
|
||||
await this.updateManager.removeObjectNewListener(data.objectType);
|
||||
await this.updateManager.removeObjectDeleteListener(data.objectType);
|
||||
}
|
||||
|
||||
async handleUnsubscribeToObjectUpdateEvent(data) {
|
||||
await this.updateManager.removeObjectUpdateListener(
|
||||
data._id,
|
||||
data.objectType
|
||||
);
|
||||
}
|
||||
|
||||
async handleUnsubscribeObjectEventEvent(data) {
|
||||
await this.eventManager.removeObjectEventsListener(
|
||||
data._id,
|
||||
data.objectType,
|
||||
data.eventType
|
||||
);
|
||||
}
|
||||
|
||||
async handlePreviewTemplateEvent(data, callback) {
|
||||
const result = await this.templateManager.renderTemplate(
|
||||
data._id,
|
||||
|
||||
@ -87,6 +87,15 @@ function getNodeStyles(attributes) {
|
||||
if (attributes?.shrink) {
|
||||
styles += `flex-shrink: ${attributes.shrink};`;
|
||||
}
|
||||
if (attributes?.color) {
|
||||
styles += `color: ${attributes.color};`;
|
||||
}
|
||||
if (attributes?.background) {
|
||||
styles += `background: ${attributes.background};`;
|
||||
}
|
||||
if (attributes?.scale) {
|
||||
styles += `transform: scale(${attributes.scale});`;
|
||||
}
|
||||
return styles;
|
||||
}
|
||||
|
||||
@ -120,24 +129,36 @@ async function transformCustomElements(content) {
|
||||
tree.match({ tag: 'Text' }, node => ({
|
||||
...node,
|
||||
tag: 'p',
|
||||
attrs: { class: 'documentText' }
|
||||
attrs: { class: 'documentText', style: getNodeStyles(node.attrs) }
|
||||
})),
|
||||
tree =>
|
||||
tree.match({ tag: 'Bold' }, node => ({
|
||||
...node,
|
||||
tag: 'strong',
|
||||
attrs: { style: 'font-weight: bold;', class: 'documentBoldText' }
|
||||
attrs: {
|
||||
style: 'font-weight: bold;',
|
||||
class: 'documentBoldText',
|
||||
style: getNodeStyles(node.attrs)
|
||||
}
|
||||
})),
|
||||
tree =>
|
||||
tree.match({ tag: 'Barcode' }, node => {
|
||||
return {
|
||||
tag: 'svg',
|
||||
tag: 'div',
|
||||
content: [
|
||||
{
|
||||
tag: 'svg',
|
||||
attrs: {
|
||||
class: 'documentBarcode',
|
||||
'jsbarcode-displayValue': 'false',
|
||||
'jsbarcode-value': node.content[0],
|
||||
'jsbarcode-format': node.attrs.format
|
||||
}
|
||||
}
|
||||
],
|
||||
attrs: {
|
||||
class: 'documentBarcode',
|
||||
'jsbarcode-width': node.attrs?.width,
|
||||
'jsbarcode-height': node.attrs?.height,
|
||||
'jsbarcode-value': node.content[0],
|
||||
'jsbarcode-format': node.attrs.format
|
||||
class: 'documentBarcodeContainer',
|
||||
style: getNodeStyles(node.attrs)
|
||||
}
|
||||
};
|
||||
}),
|
||||
|
||||
@ -1,9 +1,6 @@
|
||||
import log4js from 'log4js';
|
||||
import { loadConfig } from '../config.js';
|
||||
import NodeCache from 'node-cache';
|
||||
import { etcdServer } from '../database/etcd.js';
|
||||
import { updateObjectCache } from '../database/database.js';
|
||||
|
||||
import { natsServer } from '../database/nats.js';
|
||||
const config = loadConfig();
|
||||
|
||||
// Setup logger
|
||||
@ -19,13 +16,28 @@ export class UpdateManager {
|
||||
}
|
||||
|
||||
async subscribeToObjectNew(objectType) {
|
||||
await etcdServer.onKeyPutEvent(
|
||||
`/${objectType}s/new`,
|
||||
await natsServer.subscribe(
|
||||
`${objectType}s.new`,
|
||||
this.socketClient.socketId,
|
||||
(key, value) => {
|
||||
logger.trace('Object new event:', value);
|
||||
this.socketClient.socket.emit('objectNew', {
|
||||
_id: value,
|
||||
object: value,
|
||||
objectType: objectType
|
||||
});
|
||||
}
|
||||
);
|
||||
return { success: true };
|
||||
}
|
||||
|
||||
async subscribeToObjectDelete(objectType) {
|
||||
await natsServer.subscribe(
|
||||
`${objectType}s.delete`,
|
||||
this.socketClient.socketId,
|
||||
(key, value) => {
|
||||
logger.trace('Object delete event:', value);
|
||||
this.socketClient.socket.emit('objectDelete', {
|
||||
object: value,
|
||||
objectType: objectType
|
||||
});
|
||||
}
|
||||
@ -34,8 +46,8 @@ export class UpdateManager {
|
||||
}
|
||||
|
||||
async subscribeToObjectUpdate(id, objectType) {
|
||||
await etcdServer.onKeyPutEvent(
|
||||
`/${objectType}s/${id}/object`,
|
||||
await natsServer.subscribe(
|
||||
`${objectType}s.${id}.object`,
|
||||
this.socketClient.socketId,
|
||||
(key, value) => {
|
||||
logger.trace('Object update event:', id);
|
||||
@ -50,52 +62,26 @@ export class UpdateManager {
|
||||
}
|
||||
|
||||
async removeObjectNewListener(objectType) {
|
||||
await etcdServer.removeKeyWatcher(
|
||||
`/${objectType}s/new`,
|
||||
this.socketClient.socketId,
|
||||
'put'
|
||||
await natsServer.removeSubscription(
|
||||
`${objectType}s.new`,
|
||||
this.socketClient.socketId
|
||||
);
|
||||
return { success: true };
|
||||
}
|
||||
|
||||
async removeObjectDeleteListener(objectType) {
|
||||
await natsServer.removeSubscription(
|
||||
`${objectType}s.delete`,
|
||||
this.socketClient.socketId
|
||||
);
|
||||
return { success: true };
|
||||
}
|
||||
|
||||
async removeObjectUpdateListener(id, objectType) {
|
||||
await etcdServer.removeKeyWatcher(
|
||||
`/${objectType}s/${id}/object`,
|
||||
this.socketClient.socketId,
|
||||
'put'
|
||||
await natsServer.removeSubscription(
|
||||
`${objectType}s.${id}.object`,
|
||||
this.socketClient.socketId
|
||||
);
|
||||
return { success: true };
|
||||
}
|
||||
|
||||
async getObjectUpdate(id, objectType) {
|
||||
try {
|
||||
const objectUpdate = {
|
||||
_id: id,
|
||||
objectType: objectType,
|
||||
object: await etcdServer.get(`/${objectType}s/${id}/object`)
|
||||
};
|
||||
logger.trace(`Returning path: /${objectType}s/${id}/object`);
|
||||
return objectUpdate;
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`UpdateManager: Failed to get current value for /${objectType}s/${id}/object:`,
|
||||
error
|
||||
);
|
||||
return { error: 'Not found' };
|
||||
}
|
||||
}
|
||||
|
||||
async setObjectUpdate(id, objectType, value) {
|
||||
try {
|
||||
await etcdServer.set(`/${objectType}s/${id}/object`, value);
|
||||
logger.trace(`Set value for path: /${objectType}s/${id}/object`);
|
||||
return true;
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`Failed to set value for /${objectType}s/${id}/object:`,
|
||||
error
|
||||
);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,6 +2,7 @@ import { editObject } from './database/database.js';
|
||||
import { hostModel } from './database/schemas/management/host.schema.js';
|
||||
import crypto from 'crypto';
|
||||
import { nanoid } from 'nanoid';
|
||||
import canonicalize from 'canonical-json';
|
||||
|
||||
import { loadConfig } from './config.js';
|
||||
import { userModel } from './database/schemas/management/user.schema.js';
|
||||
@ -86,5 +87,11 @@ export function getChangedValues(oldObj, newObj, old = false) {
|
||||
}
|
||||
|
||||
export function getModelByName(modelName) {
|
||||
return modelList.filter(model => model.modelName == modelName);
|
||||
return modelList.filter(model => model.modelName == modelName)[0];
|
||||
}
|
||||
|
||||
export function jsonToCacheKey(obj) {
|
||||
const normalized = canonicalize(obj);
|
||||
const hash = crypto.createHash('sha256').update(normalized).digest('hex');
|
||||
return hash;
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user