Compare commits
9 Commits
b9c2e959b9
...
e1ba1f7871
| Author | SHA1 | Date | |
|---|---|---|---|
| e1ba1f7871 | |||
| ab50a5261d | |||
| 8ccdc81de1 | |||
| ca78fd6e62 | |||
| 7ccc4bb993 | |||
| d6214e316b | |||
| e4c790e7cc | |||
| 4ac87f0141 | |||
| 9fba977b2a |
4
.gitignore
vendored
4
.gitignore
vendored
@ -127,4 +127,6 @@ dist
|
|||||||
.yarn/unplugged
|
.yarn/unplugged
|
||||||
.yarn/build-state.yml
|
.yarn/build-state.yml
|
||||||
.yarn/install-state.gz
|
.yarn/install-state.gz
|
||||||
.pnp.*
|
.pnp.*
|
||||||
|
|
||||||
|
*.DS_STORE
|
||||||
|
|||||||
63
package-lock.json
generated
63
package-lock.json
generated
@ -9,7 +9,10 @@
|
|||||||
"version": "1.0.0",
|
"version": "1.0.0",
|
||||||
"license": "ISC",
|
"license": "ISC",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
|
"@nats-io/nats-core": "^3.1.0",
|
||||||
|
"@nats-io/transport-node": "^3.1.0",
|
||||||
"axios": "^1.11.0",
|
"axios": "^1.11.0",
|
||||||
|
"canonical-json": "^0.2.0",
|
||||||
"date-fns": "^4.1.0",
|
"date-fns": "^4.1.0",
|
||||||
"dayjs": "^1.11.13",
|
"dayjs": "^1.11.13",
|
||||||
"dotenv": "^17.2.1",
|
"dotenv": "^17.2.1",
|
||||||
@ -378,6 +381,51 @@
|
|||||||
"sparse-bitfield": "^3.0.3"
|
"sparse-bitfield": "^3.0.3"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"node_modules/@nats-io/nats-core": {
|
||||||
|
"version": "3.1.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/@nats-io/nats-core/-/nats-core-3.1.0.tgz",
|
||||||
|
"integrity": "sha512-xsSkLEGGcqNF+Ru8dMjPmKtfbBeq/U4meuJJX4Zi+5TBHpjpjNjs4YkCBC/pGYWnEum1/vdNPizjE1RdNHCyBg==",
|
||||||
|
"license": "Apache-2.0",
|
||||||
|
"dependencies": {
|
||||||
|
"@nats-io/nkeys": "2.0.3",
|
||||||
|
"@nats-io/nuid": "2.0.3"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"node_modules/@nats-io/nkeys": {
|
||||||
|
"version": "2.0.3",
|
||||||
|
"resolved": "https://registry.npmjs.org/@nats-io/nkeys/-/nkeys-2.0.3.tgz",
|
||||||
|
"integrity": "sha512-JVt56GuE6Z89KUkI4TXUbSI9fmIfAmk6PMPknijmuL72GcD+UgIomTcRWiNvvJKxA01sBbmIPStqJs5cMRBC3A==",
|
||||||
|
"license": "Apache-2.0",
|
||||||
|
"dependencies": {
|
||||||
|
"tweetnacl": "^1.0.3"
|
||||||
|
},
|
||||||
|
"engines": {
|
||||||
|
"node": ">=18.0.0"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"node_modules/@nats-io/nuid": {
|
||||||
|
"version": "2.0.3",
|
||||||
|
"resolved": "https://registry.npmjs.org/@nats-io/nuid/-/nuid-2.0.3.tgz",
|
||||||
|
"integrity": "sha512-TpA3HEBna/qMVudy+3HZr5M3mo/L1JPofpVT4t0HkFGkz2Cn9wrlrQC8tvR8Md5Oa9//GtGG26eN0qEWF5Vqew==",
|
||||||
|
"license": "Apache-2.0",
|
||||||
|
"engines": {
|
||||||
|
"node": ">= 18.x"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"node_modules/@nats-io/transport-node": {
|
||||||
|
"version": "3.1.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/@nats-io/transport-node/-/transport-node-3.1.0.tgz",
|
||||||
|
"integrity": "sha512-k5pH7IOKUetwXOMraVgcB5zG0wibcHOwJJuyuY1/5Q4K0XfBJDnb/IbczP5/JJWwMYfxSL9O+46ojtdBHvHRSw==",
|
||||||
|
"license": "Apache-2.0",
|
||||||
|
"dependencies": {
|
||||||
|
"@nats-io/nats-core": "3.1.0",
|
||||||
|
"@nats-io/nkeys": "2.0.3",
|
||||||
|
"@nats-io/nuid": "2.0.3"
|
||||||
|
},
|
||||||
|
"engines": {
|
||||||
|
"node": ">= 18.0.0"
|
||||||
|
}
|
||||||
|
},
|
||||||
"node_modules/@nodelib/fs.scandir": {
|
"node_modules/@nodelib/fs.scandir": {
|
||||||
"version": "2.1.5",
|
"version": "2.1.5",
|
||||||
"resolved": "https://registry.npmjs.org/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz",
|
"resolved": "https://registry.npmjs.org/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz",
|
||||||
@ -1054,6 +1102,15 @@
|
|||||||
"node": ">=6"
|
"node": ">=6"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"node_modules/canonical-json": {
|
||||||
|
"version": "0.2.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/canonical-json/-/canonical-json-0.2.0.tgz",
|
||||||
|
"integrity": "sha512-xeH/NgtNA7kIuKSxopJVdXqCKWyDB79aqxQRQ9FV02fvmqW7DSnjoFyzAUrBpfbwjU6lwTYOLc+HM6KupbsfVQ==",
|
||||||
|
"license": "MIT",
|
||||||
|
"bin": {
|
||||||
|
"canonical-json": "canonical-json.js"
|
||||||
|
}
|
||||||
|
},
|
||||||
"node_modules/chalk": {
|
"node_modules/chalk": {
|
||||||
"version": "4.1.2",
|
"version": "4.1.2",
|
||||||
"resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz",
|
"resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz",
|
||||||
@ -6401,6 +6458,12 @@
|
|||||||
"strip-bom": "^3.0.0"
|
"strip-bom": "^3.0.0"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"node_modules/tweetnacl": {
|
||||||
|
"version": "1.0.3",
|
||||||
|
"resolved": "https://registry.npmjs.org/tweetnacl/-/tweetnacl-1.0.3.tgz",
|
||||||
|
"integrity": "sha512-6rt+RN7aOi1nGMyC4Xa5DdYiukl2UWCbcJft7YhxReBGQD7OAM8Pbxw6YMo4r2diNEA8FEmu32YOn9rhaiE5yw==",
|
||||||
|
"license": "Unlicense"
|
||||||
|
},
|
||||||
"node_modules/type-check": {
|
"node_modules/type-check": {
|
||||||
"version": "0.4.0",
|
"version": "0.4.0",
|
||||||
"resolved": "https://registry.npmjs.org/type-check/-/type-check-0.4.0.tgz",
|
"resolved": "https://registry.npmjs.org/type-check/-/type-check-0.4.0.tgz",
|
||||||
|
|||||||
@ -17,7 +17,10 @@
|
|||||||
"author": "Tom Butcher",
|
"author": "Tom Butcher",
|
||||||
"license": "ISC",
|
"license": "ISC",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
|
"@nats-io/nats-core": "^3.1.0",
|
||||||
|
"@nats-io/transport-node": "^3.1.0",
|
||||||
"axios": "^1.11.0",
|
"axios": "^1.11.0",
|
||||||
|
"canonical-json": "^0.2.0",
|
||||||
"date-fns": "^4.1.0",
|
"date-fns": "^4.1.0",
|
||||||
"dayjs": "^1.11.13",
|
"dayjs": "^1.11.13",
|
||||||
"dotenv": "^17.2.1",
|
"dotenv": "^17.2.1",
|
||||||
|
|||||||
@ -1,6 +1,6 @@
|
|||||||
import log4js from 'log4js';
|
import log4js from 'log4js';
|
||||||
import { loadConfig } from '../config.js';
|
import { loadConfig } from '../config.js';
|
||||||
import { etcdServer } from '../database/etcd.js';
|
import { natsServer } from '../database/nats.js';
|
||||||
import { generateEtcId } from '../utils.js';
|
import { generateEtcId } from '../utils.js';
|
||||||
|
|
||||||
const config = loadConfig();
|
const config = loadConfig();
|
||||||
@ -10,7 +10,7 @@ const logger = log4js.getLogger('Action Manager');
|
|||||||
logger.level = config.server.logLevel;
|
logger.level = config.server.logLevel;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ActionManager handles tracking object updates using Etcd and broadcasts update events via websockets.
|
* ActionManager handles tracking object updates using NATS and broadcasts update events via websockets.
|
||||||
*/
|
*/
|
||||||
export class ActionManager {
|
export class ActionManager {
|
||||||
constructor(socketClient) {
|
constructor(socketClient) {
|
||||||
@ -20,10 +20,12 @@ export class ActionManager {
|
|||||||
|
|
||||||
async subscribeToObjectActions(id, objectType) {
|
async subscribeToObjectActions(id, objectType) {
|
||||||
logger.debug('Subscribing to object actions...', id, objectType);
|
logger.debug('Subscribing to object actions...', id, objectType);
|
||||||
await etcdServer.onPrefixPutEvent(
|
const subject = `${objectType}s.${id}.actions`;
|
||||||
`/${objectType}s/${id}/actions`,
|
|
||||||
|
await natsServer.subscribe(
|
||||||
|
subject,
|
||||||
this.socketClient.id,
|
this.socketClient.id,
|
||||||
(key, value) => {
|
(subject, value) => {
|
||||||
if (!value?.result) {
|
if (!value?.result) {
|
||||||
logger.trace('Object action:', id);
|
logger.trace('Object action:', id);
|
||||||
this.socketClient.socket.emit(
|
this.socketClient.socket.emit(
|
||||||
@ -34,9 +36,9 @@ export class ActionManager {
|
|||||||
action: { ...value }
|
action: { ...value }
|
||||||
},
|
},
|
||||||
result => {
|
result => {
|
||||||
logger.trace('Got action result:', key);
|
logger.trace('Got action result:', subject);
|
||||||
const actionId = key.split('/').pop();
|
const actionId = value.actionId || generateEtcId();
|
||||||
etcdServer.setKey(`/${objectType}s/${id}/actions/${actionId}`, {
|
natsServer.publish(`${subject}.${actionId}`, {
|
||||||
...value,
|
...value,
|
||||||
result: { ...result }
|
result: { ...result }
|
||||||
});
|
});
|
||||||
@ -49,49 +51,49 @@ export class ActionManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async removeObjectActionsListener(id, objectType) {
|
async removeObjectActionsListener(id, objectType) {
|
||||||
await etcdServer.removePrefixWatcher(
|
const subject = `${objectType}s.${id}.actions`;
|
||||||
`/${objectType}s/${id}/actions`,
|
await natsServer.removeSubscription(subject, this.socketClient.id);
|
||||||
this.socketClient.id,
|
|
||||||
'put'
|
|
||||||
);
|
|
||||||
return { success: true };
|
return { success: true };
|
||||||
}
|
}
|
||||||
|
|
||||||
async sendObjectAction(id, objectType, action, callback) {
|
async sendObjectAction(id, objectType, action, callback) {
|
||||||
|
const actionId = generateEtcId();
|
||||||
|
const subject = `${objectType}s.${id}.actions.${actionId}`;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const actionId = generateEtcId();
|
|
||||||
this.callbacks.set(actionId, callback);
|
this.callbacks.set(actionId, callback);
|
||||||
logger.trace(
|
logger.trace(
|
||||||
`Calling action id: ${actionId}, object id: ${id}, object type: ${objectType} Action:`,
|
`Calling action id: ${actionId}, object id: ${id}, object type: ${objectType} Action:`,
|
||||||
action
|
action
|
||||||
);
|
);
|
||||||
await etcdServer.onKeyPutEvent(
|
|
||||||
`/${objectType}s/${id}/actions/${actionId}`,
|
// Subscribe to the response subject
|
||||||
|
await natsServer.subscribe(
|
||||||
|
subject,
|
||||||
this.socketClient.socketId,
|
this.socketClient.socketId,
|
||||||
async (key, value) => {
|
async (subject, value) => {
|
||||||
if (value.result) {
|
if (value.result) {
|
||||||
logger.trace('Calling result callback...');
|
logger.trace('Calling result callback...');
|
||||||
const storedCallback = this.callbacks.get(actionId);
|
const storedCallback = this.callbacks.get(actionId);
|
||||||
await etcdServer.removeKeyWatcher(
|
await natsServer.removeSubscription(
|
||||||
`/${objectType}s/${id}/actions/${actionId}`,
|
subject,
|
||||||
this.socketClient.socketId,
|
this.socketClient.socketId
|
||||||
'put'
|
|
||||||
);
|
|
||||||
await etcdServer.deleteKey(
|
|
||||||
`/${objectType}s/${id}/actions/${actionId}`
|
|
||||||
);
|
);
|
||||||
storedCallback(value.result);
|
storedCallback(value.result);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
await etcdServer.setKey(
|
|
||||||
`/${objectType}s/${id}/actions/${actionId}`,
|
// Publish the action
|
||||||
action
|
await natsServer.publish(`${objectType}s.${id}.actions`, {
|
||||||
);
|
...action,
|
||||||
|
actionId: actionId
|
||||||
|
});
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(
|
logger.error(
|
||||||
`Failed to set value for /${objectType}s/${id}/object:`,
|
`Failed to send action for ${objectType}s.${id}.actions.${actionId}:`,
|
||||||
error
|
error
|
||||||
);
|
);
|
||||||
return false;
|
return false;
|
||||||
|
|||||||
@ -4,11 +4,7 @@ import jwt from 'jsonwebtoken';
|
|||||||
import log4js from 'log4js';
|
import log4js from 'log4js';
|
||||||
// Load configuration
|
// Load configuration
|
||||||
import { loadConfig } from '../config.js';
|
import { loadConfig } from '../config.js';
|
||||||
import {
|
import { editObject, getObject, listObjects } from '../database/database.js';
|
||||||
editObject,
|
|
||||||
getObject,
|
|
||||||
getObjectByFilter
|
|
||||||
} from '../database/database.js';
|
|
||||||
import { hostModel } from '../database/schemas/management/host.schema.js';
|
import { hostModel } from '../database/schemas/management/host.schema.js';
|
||||||
import { userModel } from '../database/schemas/management/user.schema.js';
|
import { userModel } from '../database/schemas/management/user.schema.js';
|
||||||
import { generateAuthCode } from '../utils.js';
|
import { generateAuthCode } from '../utils.js';
|
||||||
@ -82,16 +78,16 @@ export class KeycloakAuth {
|
|||||||
roles: this.extractRoles(decodedToken)
|
roles: this.extractRoles(decodedToken)
|
||||||
};
|
};
|
||||||
|
|
||||||
const user = await getObjectByFilter({
|
const user = await listObjects({
|
||||||
model: userModel,
|
model: userModel,
|
||||||
filter: { username: decodedUser.username }
|
filter: { username: decodedUser.username }
|
||||||
});
|
});
|
||||||
|
|
||||||
// Cache the verified token
|
// Cache the verified token
|
||||||
const expiresAt = introspection.exp * 1000; // Convert to milliseconds
|
const expiresAt = introspection.exp * 1000; // Convert to milliseconds
|
||||||
this.tokenCache.set(token, { expiresAt, user });
|
this.tokenCache.set(token, { expiresAt, user: user[0] });
|
||||||
|
|
||||||
return { valid: true, user };
|
return { valid: true, user: user[0] };
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error('Token verification error:', error.message);
|
logger.error('Token verification error:', error.message);
|
||||||
return { valid: false };
|
return { valid: false };
|
||||||
@ -167,13 +163,14 @@ export class CodeAuth {
|
|||||||
|
|
||||||
async verifyOtp(otp) {
|
async verifyOtp(otp) {
|
||||||
try {
|
try {
|
||||||
const host = await getObjectByFilter({
|
const hosts = await listObjects({
|
||||||
model: hostModel,
|
model: hostModel,
|
||||||
filter: { otp: otp },
|
filter: { otp: otp },
|
||||||
cached: false
|
cached: false
|
||||||
});
|
});
|
||||||
|
const host = hosts[0];
|
||||||
if (host == undefined) {
|
if (host == undefined) {
|
||||||
const error = 'No host found with OTP.';
|
const error = `No host found with OTP: ${otp}`;
|
||||||
logger.warn(error);
|
logger.warn(error);
|
||||||
return { valid: false, error: error };
|
return { valid: false, error: error };
|
||||||
}
|
}
|
||||||
@ -203,9 +200,10 @@ export class CodeAuth {
|
|||||||
id: id,
|
id: id,
|
||||||
updateData: { authCode: generateAuthCode() }
|
updateData: { authCode: generateAuthCode() }
|
||||||
});
|
});
|
||||||
|
logger.info('Host found with OTP:', otp);
|
||||||
return { valid: true, host: authCodeHost };
|
return { valid: true, host: authCodeHost };
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error('Code verification error:', error.message);
|
logger.error('OTP verification error:', error.message);
|
||||||
return { valid: false, error: error.message };
|
return { valid: false, error: error.message };
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -218,7 +216,6 @@ export function createAuthMiddleware(socketUser) {
|
|||||||
|
|
||||||
// Allow the 'authenticate' event through without checks
|
// Allow the 'authenticate' event through without checks
|
||||||
|
|
||||||
logger.trace('Event:', event);
|
|
||||||
if (event === 'authenticate') {
|
if (event === 'authenticate') {
|
||||||
next();
|
next();
|
||||||
return;
|
return;
|
||||||
|
|||||||
@ -11,6 +11,7 @@ import {
|
|||||||
import log4js from 'log4js';
|
import log4js from 'log4js';
|
||||||
import { loadConfig } from '../config.js';
|
import { loadConfig } from '../config.js';
|
||||||
import { userModel } from './schemas/management/user.schema.js';
|
import { userModel } from './schemas/management/user.schema.js';
|
||||||
|
import { jsonToCacheKey } from '../utils.js';
|
||||||
|
|
||||||
const config = loadConfig();
|
const config = loadConfig();
|
||||||
|
|
||||||
@ -19,44 +20,31 @@ const cacheLogger = log4js.getLogger('Local Cache');
|
|||||||
logger.level = config.server.logLevel;
|
logger.level = config.server.logLevel;
|
||||||
cacheLogger.level = config.server.logLevel;
|
cacheLogger.level = config.server.logLevel;
|
||||||
|
|
||||||
const modelCaches = new Map();
|
const objectCache = new NodeCache({
|
||||||
|
stdTTL: 30, // 30 sec expiration
|
||||||
|
checkperiod: 600, // 30 sec periodic cleanup
|
||||||
|
useClones: false // Don't clone objects for better performance
|
||||||
|
});
|
||||||
const listCache = new NodeCache({
|
const listCache = new NodeCache({
|
||||||
stdTTL: 30, // 30 sec expiration
|
stdTTL: 30, // 30 sec expiration
|
||||||
checkperiod: 600, // 30 sec periodic cleanup
|
checkperiod: 600, // 30 sec periodic cleanup
|
||||||
useClones: false // Don't clone objects for better performance
|
useClones: false // Don't clone objects for better performance
|
||||||
});
|
});
|
||||||
|
|
||||||
function getModelCache(model) {
|
export const retrieveObjectCache = ({ model, id, populate = [] }) => {
|
||||||
const modelName = model.modelName;
|
const cacheKeyObject = {
|
||||||
const modelCache = modelCaches.get(modelName);
|
|
||||||
if (modelCache == undefined) {
|
|
||||||
logger.trace('Creating new model cache...');
|
|
||||||
const newModelCache = new NodeCache({
|
|
||||||
stdTTL: 30, // 30 sec expiration
|
|
||||||
checkperiod: 30, // 30 sec periodic cleanup
|
|
||||||
useClones: false // Don't clone objects for better performance
|
|
||||||
});
|
|
||||||
modelCaches.set(modelName, newModelCache);
|
|
||||||
return newModelCache;
|
|
||||||
}
|
|
||||||
logger.trace('Getting model cache...');
|
|
||||||
return modelCache;
|
|
||||||
}
|
|
||||||
|
|
||||||
export const retrieveObjectCache = ({ model, id }) => {
|
|
||||||
cacheLogger.trace('Retrieving:', {
|
|
||||||
model: model.modelName,
|
model: model.modelName,
|
||||||
id
|
id,
|
||||||
});
|
populate
|
||||||
const modelCache = getModelCache(model);
|
};
|
||||||
|
|
||||||
const cachedObject = modelCache.get(id);
|
const cacheKey = jsonToCacheKey(cacheKeyObject);
|
||||||
|
|
||||||
|
cacheLogger.trace('Retrieving:');
|
||||||
|
const cachedObject = objectCache.get(cacheKey);
|
||||||
|
|
||||||
if (cachedObject == undefined) {
|
if (cachedObject == undefined) {
|
||||||
cacheLogger.trace('Miss:', {
|
cacheLogger.trace('Miss:', cacheKeyObject);
|
||||||
model: model.modelName,
|
|
||||||
id
|
|
||||||
});
|
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -68,25 +56,36 @@ export const retrieveObjectCache = ({ model, id }) => {
|
|||||||
return cachedObject;
|
return cachedObject;
|
||||||
};
|
};
|
||||||
|
|
||||||
export const retrieveObjectsCache = ({ model }) => {
|
export const retrieveListCache = ({
|
||||||
cacheLogger.trace('Retrieving:', {
|
model,
|
||||||
model: model.modelName
|
populate = [],
|
||||||
});
|
filter = {},
|
||||||
const modelCache = getModelCache(model);
|
sort = '',
|
||||||
|
order = 'ascend',
|
||||||
|
project = {}
|
||||||
|
}) => {
|
||||||
|
const cacheKeyObject = {
|
||||||
|
model: model.modelName,
|
||||||
|
id,
|
||||||
|
populate,
|
||||||
|
filter,
|
||||||
|
sort,
|
||||||
|
project,
|
||||||
|
order
|
||||||
|
};
|
||||||
|
|
||||||
const modelCacheKeys = modelCache.keys();
|
cacheLogger.trace('Retrieving:', cacheKeyObject);
|
||||||
|
|
||||||
const cachedList = listCache.get(model.modelName);
|
const cacheKey = jsonToCacheKey(cacheKeyObject);
|
||||||
|
|
||||||
if (cachedList == true) {
|
const cachedList = listCache.get(cacheKey);
|
||||||
const cachedObjects = modelCacheKeys.map(key => modelCache.get(key));
|
|
||||||
|
|
||||||
|
if (cachedList != undefined) {
|
||||||
cacheLogger.trace('Hit:', {
|
cacheLogger.trace('Hit:', {
|
||||||
model: model.modelName,
|
...cacheKeyObject,
|
||||||
length: cachedObjects.length
|
length: cachedList.length
|
||||||
});
|
});
|
||||||
|
return cachedList;
|
||||||
return cachedObjects;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cacheLogger.trace('Miss:', {
|
cacheLogger.trace('Miss:', {
|
||||||
@ -95,21 +94,23 @@ export const retrieveObjectsCache = ({ model }) => {
|
|||||||
return undefined;
|
return undefined;
|
||||||
};
|
};
|
||||||
|
|
||||||
export const updateObjectCache = ({ model, id, object }) => {
|
export const updateObjectCache = ({ model, id, object, populate = [] }) => {
|
||||||
cacheLogger.trace('Updating:', {
|
const cacheKeyObject = {
|
||||||
model: model.modelName,
|
model: model.modelName,
|
||||||
id
|
id,
|
||||||
});
|
populate
|
||||||
const modelCache = getModelCache(model);
|
};
|
||||||
const cachedObject = modelCache.get(id) || {};
|
|
||||||
|
const cacheKey = jsonToCacheKey(cacheKeyObject);
|
||||||
|
|
||||||
|
cacheLogger.trace('Updating:', cacheKeyObject);
|
||||||
|
|
||||||
|
const cachedObject = objectCache.get(cacheKey) || {};
|
||||||
const mergedObject = _.merge(cachedObject, object);
|
const mergedObject = _.merge(cachedObject, object);
|
||||||
|
|
||||||
modelCache.set(id, mergedObject);
|
objectCache.set(cacheKey, mergedObject);
|
||||||
|
|
||||||
cacheLogger.trace('Updated:', {
|
cacheLogger.trace('Updated:', { ...cacheKeyObject });
|
||||||
model: model.modelName,
|
|
||||||
id
|
|
||||||
});
|
|
||||||
|
|
||||||
return mergedObject;
|
return mergedObject;
|
||||||
};
|
};
|
||||||
@ -130,29 +131,39 @@ export const deleteObjectCache = ({ model, id }) => {
|
|||||||
return mergedObject;
|
return mergedObject;
|
||||||
};
|
};
|
||||||
|
|
||||||
export const updateObjectsCache = ({ model, objects }) => {
|
export const updateListCache = ({
|
||||||
cacheLogger.trace('Updating:', {
|
model,
|
||||||
|
objects,
|
||||||
|
populate = [],
|
||||||
|
filter = {},
|
||||||
|
sort = '',
|
||||||
|
order = 'ascend',
|
||||||
|
project = {}
|
||||||
|
}) => {
|
||||||
|
const cacheKeyObject = {
|
||||||
model: model.modelName,
|
model: model.modelName,
|
||||||
|
populate,
|
||||||
|
filter,
|
||||||
|
sort,
|
||||||
|
project,
|
||||||
|
order
|
||||||
|
};
|
||||||
|
|
||||||
|
cacheLogger.trace('Updating:', {
|
||||||
|
...cacheKeyObject,
|
||||||
length: objects.length
|
length: objects.length
|
||||||
});
|
});
|
||||||
const modelCache = getModelCache(model);
|
|
||||||
|
|
||||||
objects.forEach(object => {
|
const cacheKey = jsonToCacheKey(cacheKeyObject);
|
||||||
const cachedObject = modelCache.get(object._id) || {};
|
|
||||||
|
|
||||||
const mergedObject = _.merge(cachedObject, object);
|
listCache.set(cacheKey, objects);
|
||||||
|
|
||||||
modelCache.set(object._id, mergedObject);
|
|
||||||
});
|
|
||||||
|
|
||||||
listCache.set(model.modelName, true);
|
|
||||||
|
|
||||||
cacheLogger.trace('Updated:', {
|
cacheLogger.trace('Updated:', {
|
||||||
model: model.modelName,
|
...cacheKeyObject,
|
||||||
length: objects.length
|
length: objects.length
|
||||||
});
|
});
|
||||||
|
|
||||||
return mergedObject;
|
return objects;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Reusable function to list objects with aggregation, filtering, search, sorting, and pagination
|
// Reusable function to list objects with aggregation, filtering, search, sorting, and pagination
|
||||||
@ -162,27 +173,29 @@ export const listObjects = async ({
|
|||||||
filter = {},
|
filter = {},
|
||||||
sort = '',
|
sort = '',
|
||||||
order = 'ascend',
|
order = 'ascend',
|
||||||
project, // optional: override default projection
|
project = {}, // optional: override default projection
|
||||||
cached = false
|
cached = false
|
||||||
}) => {
|
}) => {
|
||||||
try {
|
try {
|
||||||
logger.trace('Listing objects:', {
|
logger.trace('Listing objects:', {
|
||||||
model,
|
model,
|
||||||
populate,
|
populate,
|
||||||
page,
|
|
||||||
limit,
|
|
||||||
filter,
|
filter,
|
||||||
sort,
|
sort,
|
||||||
order,
|
order,
|
||||||
project,
|
project,
|
||||||
cache
|
cached
|
||||||
});
|
});
|
||||||
|
|
||||||
var cacheKey = undefined;
|
|
||||||
var modelCache = getModelCache(model);
|
|
||||||
|
|
||||||
if (cached == true) {
|
if (cached == true) {
|
||||||
const objectsCache = retrieveObjectsCache({ model });
|
const objectsCache = retrieveObjectsCache({
|
||||||
|
model,
|
||||||
|
populate,
|
||||||
|
filter,
|
||||||
|
sort,
|
||||||
|
order,
|
||||||
|
project
|
||||||
|
});
|
||||||
if (objectsCache != undefined) {
|
if (objectsCache != undefined) {
|
||||||
return objectsCache;
|
return objectsCache;
|
||||||
}
|
}
|
||||||
@ -210,7 +223,7 @@ export const listObjects = async ({
|
|||||||
let query = model.find(filter).sort({ [sort]: sortOrder });
|
let query = model.find(filter).sort({ [sort]: sortOrder });
|
||||||
|
|
||||||
// Handle populate (array or single value)
|
// Handle populate (array or single value)
|
||||||
if (populate) {
|
if (populate.length > 0) {
|
||||||
if (Array.isArray(populate)) {
|
if (Array.isArray(populate)) {
|
||||||
for (const pop of populate) {
|
for (const pop of populate) {
|
||||||
query = query.populate(pop);
|
query = query.populate(pop);
|
||||||
@ -221,7 +234,7 @@ export const listObjects = async ({
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Handle select (projection)
|
// Handle select (projection)
|
||||||
if (project) {
|
if (project != {}) {
|
||||||
query = query.select(project);
|
query = query.select(project);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -231,18 +244,25 @@ export const listObjects = async ({
|
|||||||
|
|
||||||
const finalResult = expandObjectIds(queryResult);
|
const finalResult = expandObjectIds(queryResult);
|
||||||
|
|
||||||
updateObjectsCache({ model, objects });
|
updateListCache({
|
||||||
|
model,
|
||||||
|
objects: finalResult,
|
||||||
|
populate,
|
||||||
|
filter,
|
||||||
|
sort,
|
||||||
|
order,
|
||||||
|
project
|
||||||
|
});
|
||||||
|
|
||||||
logger.trace('Retreived from database:', {
|
logger.trace('Retreived from database:', {
|
||||||
model,
|
model,
|
||||||
populate,
|
populate,
|
||||||
page,
|
|
||||||
limit,
|
|
||||||
filter,
|
filter,
|
||||||
sort,
|
sort,
|
||||||
order,
|
order,
|
||||||
project,
|
project,
|
||||||
cache
|
cached,
|
||||||
|
length: finalResult.length
|
||||||
});
|
});
|
||||||
return finalResult;
|
return finalResult;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
@ -252,7 +272,12 @@ export const listObjects = async ({
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Reusable function to get a single object by ID
|
// Reusable function to get a single object by ID
|
||||||
export const getObject = async ({ model, id, populate, cached = false }) => {
|
export const getObject = async ({
|
||||||
|
model,
|
||||||
|
id,
|
||||||
|
populate = [],
|
||||||
|
cached = false
|
||||||
|
}) => {
|
||||||
try {
|
try {
|
||||||
logger.trace('Getting object:', {
|
logger.trace('Getting object:', {
|
||||||
model,
|
model,
|
||||||
@ -261,7 +286,7 @@ export const getObject = async ({ model, id, populate, cached = false }) => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
if (cached == true) {
|
if (cached == true) {
|
||||||
const cachedObject = retrieveObjectCache({ model, id });
|
const cachedObject = retrieveObjectCache({ model, id, populate });
|
||||||
if (cachedObject != undefined) {
|
if (cachedObject != undefined) {
|
||||||
return cachedObject;
|
return cachedObject;
|
||||||
}
|
}
|
||||||
@ -299,63 +324,14 @@ export const getObject = async ({ model, id, populate, cached = false }) => {
|
|||||||
updateObjectCache({
|
updateObjectCache({
|
||||||
model: model,
|
model: model,
|
||||||
id: finalResult._id.toString(),
|
id: finalResult._id.toString(),
|
||||||
|
populate,
|
||||||
object: finalResult
|
object: finalResult
|
||||||
});
|
});
|
||||||
|
|
||||||
return finalResult;
|
return finalResult;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error('An error retreiving object:', error.message);
|
logger.error('An error retreiving object:', error.message);
|
||||||
return undefined;
|
throw error;
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Reusable function to get a single object by ID
|
|
||||||
export const getObjectByFilter = async ({ model, filter, populate }) => {
|
|
||||||
try {
|
|
||||||
logger.trace('Getting object:', {
|
|
||||||
model,
|
|
||||||
filter,
|
|
||||||
populate
|
|
||||||
});
|
|
||||||
|
|
||||||
let query = model.findOne(filter).lean();
|
|
||||||
|
|
||||||
// Handle populate (array or single value)
|
|
||||||
if (populate) {
|
|
||||||
if (Array.isArray(populate)) {
|
|
||||||
for (const pop of populate) {
|
|
||||||
query = query.populate(pop);
|
|
||||||
}
|
|
||||||
} else if (typeof populate === 'string' || typeof populate === 'object') {
|
|
||||||
query = query.populate(populate);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
const finalResult = await query;
|
|
||||||
|
|
||||||
if (!finalResult) {
|
|
||||||
logger.warn('Object not found in database:', {
|
|
||||||
model,
|
|
||||||
filter,
|
|
||||||
populate
|
|
||||||
});
|
|
||||||
return undefined;
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.trace('Retreived object from database:', {
|
|
||||||
model,
|
|
||||||
filter,
|
|
||||||
populate
|
|
||||||
});
|
|
||||||
|
|
||||||
updateObjectCache({
|
|
||||||
model: model,
|
|
||||||
id: finalResult._id.toString(),
|
|
||||||
object: finalResult
|
|
||||||
});
|
|
||||||
|
|
||||||
return finalResult;
|
|
||||||
} catch (error) {
|
|
||||||
logger.error('An error retreiving object:', error.message);
|
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -367,7 +343,7 @@ export const editObject = async ({
|
|||||||
updateData,
|
updateData,
|
||||||
owner = undefined,
|
owner = undefined,
|
||||||
ownerType = undefined,
|
ownerType = undefined,
|
||||||
populate
|
populate = []
|
||||||
}) => {
|
}) => {
|
||||||
try {
|
try {
|
||||||
// Determine parentType from model name
|
// Determine parentType from model name
|
||||||
|
|||||||
307
src/database/nats.js
Normal file
307
src/database/nats.js
Normal file
@ -0,0 +1,307 @@
|
|||||||
|
import { connect } from '@nats-io/transport-node';
|
||||||
|
import log4js from 'log4js';
|
||||||
|
import { loadConfig } from '../config.js';
|
||||||
|
|
||||||
|
const config = loadConfig();
|
||||||
|
const logger = log4js.getLogger('Nats');
|
||||||
|
logger.level = config.server.logLevel;
|
||||||
|
|
||||||
|
class NatsServer {
|
||||||
|
constructor() {
|
||||||
|
this.client = null;
|
||||||
|
this.subscriptions = new Map(); // subject → { subscription, callbacks }
|
||||||
|
this.requestHandlers = new Map(); // subject → { handler, callbacks }
|
||||||
|
this.queuedSubscriptions = new Map(); // subject → { subscription, callbacks, queue }
|
||||||
|
|
||||||
|
const natsConfig = config.database?.nats || config.database; // fallback for production config
|
||||||
|
const host = natsConfig.host || 'localhost';
|
||||||
|
const port = natsConfig.port || 4222;
|
||||||
|
this.servers = [`nats://${host}:${port}`];
|
||||||
|
this.textEncoder = new TextEncoder();
|
||||||
|
this.textDecoder = new TextDecoder();
|
||||||
|
|
||||||
|
logger.trace(`NatsServer: servers set to ${JSON.stringify(this.servers)}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
async connect() {
|
||||||
|
if (!this.client) {
|
||||||
|
logger.info('Connecting to NATS...');
|
||||||
|
logger.trace(
|
||||||
|
`Creating NATS client with servers ${JSON.stringify(this.servers)}`
|
||||||
|
);
|
||||||
|
|
||||||
|
try {
|
||||||
|
this.client = await connect({
|
||||||
|
servers: this.servers,
|
||||||
|
reconnect: true,
|
||||||
|
maxReconnectAttempts: -1, // unlimited reconnects
|
||||||
|
reconnectTimeWait: 1000,
|
||||||
|
timeout: 20000
|
||||||
|
});
|
||||||
|
|
||||||
|
// Test connection by checking if client is connected
|
||||||
|
try {
|
||||||
|
if (this.client.isClosed()) {
|
||||||
|
throw new Error('NATS client connection failed');
|
||||||
|
}
|
||||||
|
logger.trace('NATS client connected successfully.');
|
||||||
|
} catch (error) {
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('Failed to connect to NATS:', error);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.trace('NATS client already exists, skipping connection.');
|
||||||
|
}
|
||||||
|
return this.client;
|
||||||
|
}
|
||||||
|
|
||||||
|
async getClient() {
|
||||||
|
if (!this.client) {
|
||||||
|
logger.trace('No client found, calling connect().');
|
||||||
|
await this.connect();
|
||||||
|
}
|
||||||
|
return this.client;
|
||||||
|
}
|
||||||
|
|
||||||
|
async publish(subject, data) {
|
||||||
|
const client = await this.getClient();
|
||||||
|
const payload = typeof data === 'string' ? data : JSON.stringify(data);
|
||||||
|
|
||||||
|
try {
|
||||||
|
client.publish(subject, this.textEncoder.encode(payload));
|
||||||
|
logger.trace(`Published to subject: ${subject}, data: ${payload}`);
|
||||||
|
return { success: true };
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`Failed to publish to subject ${subject}:`, error);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async request(subject, data, timeout = 30000) {
|
||||||
|
const client = await this.getClient();
|
||||||
|
const payload = typeof data === 'string' ? data : JSON.stringify(data);
|
||||||
|
|
||||||
|
try {
|
||||||
|
const response = await client.request(
|
||||||
|
subject,
|
||||||
|
this.textEncoder.encode(payload),
|
||||||
|
{
|
||||||
|
timeout: timeout
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
const responseData = this.textDecoder.decode(response.data);
|
||||||
|
logger.trace(`Request to subject: ${subject}, response: ${responseData}`);
|
||||||
|
|
||||||
|
// Try to parse as JSON, fallback to string
|
||||||
|
try {
|
||||||
|
return JSON.parse(responseData);
|
||||||
|
} catch {
|
||||||
|
return responseData;
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
if (error.code === 'TIMEOUT') {
|
||||||
|
logger.trace(`Request timeout for subject: ${subject}`);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async subscribe(subject, owner, callback) {
|
||||||
|
const client = await this.getClient();
|
||||||
|
const subscriptionKey = subject;
|
||||||
|
|
||||||
|
if (this.subscriptions.has(subscriptionKey)) {
|
||||||
|
this.subscriptions.get(subscriptionKey).callbacks.set(owner, callback);
|
||||||
|
logger.trace(
|
||||||
|
`Added subscription callback for owner=${owner} on subject=${subject}`
|
||||||
|
);
|
||||||
|
return { success: true };
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.trace(`Creating new subscription for subject: ${subject}`);
|
||||||
|
const subscription = client.subscribe(subject);
|
||||||
|
const callbacks = new Map();
|
||||||
|
callbacks.set(owner, callback);
|
||||||
|
|
||||||
|
(async () => {
|
||||||
|
for await (const msg of subscription) {
|
||||||
|
logger.trace(`Message received on subject: ${subject}`);
|
||||||
|
const data = this.textDecoder.decode(msg.data);
|
||||||
|
let parsedData;
|
||||||
|
|
||||||
|
try {
|
||||||
|
parsedData = JSON.parse(data);
|
||||||
|
} catch {
|
||||||
|
parsedData = data;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const [ownerId, cb] of callbacks) {
|
||||||
|
try {
|
||||||
|
cb(subject, parsedData, msg);
|
||||||
|
} catch (err) {
|
||||||
|
logger.error(
|
||||||
|
`Error in subscription callback for owner=${ownerId}, subject=${subject}:`,
|
||||||
|
err
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})().catch(err => {
|
||||||
|
logger.error(`Subscription error for subject ${subject}:`, err);
|
||||||
|
});
|
||||||
|
|
||||||
|
this.subscriptions.set(subscriptionKey, { subscription, callbacks });
|
||||||
|
return { success: true };
|
||||||
|
}
|
||||||
|
|
||||||
|
async setRequestHandler(subject, owner, handler) {
|
||||||
|
const client = await this.getClient();
|
||||||
|
const handlerKey = subject;
|
||||||
|
|
||||||
|
if (this.requestHandlers.has(handlerKey)) {
|
||||||
|
this.requestHandlers.get(handlerKey).callbacks.set(owner, handler);
|
||||||
|
logger.trace(
|
||||||
|
`Added request handler for owner=${owner} on subject=${subject}`
|
||||||
|
);
|
||||||
|
return { success: true };
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.trace(`Creating new request handler for subject: ${subject}`);
|
||||||
|
const subscription = client.subscribe(subject);
|
||||||
|
const callbacks = new Map();
|
||||||
|
callbacks.set(owner, handler);
|
||||||
|
|
||||||
|
(async () => {
|
||||||
|
for await (const msg of subscription) {
|
||||||
|
logger.trace(`Request received on subject: ${subject}`);
|
||||||
|
const data = this.textDecoder.decode(msg.data);
|
||||||
|
let parsedData;
|
||||||
|
|
||||||
|
try {
|
||||||
|
parsedData = JSON.parse(data);
|
||||||
|
} catch {
|
||||||
|
parsedData = data;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const [ownerId, cb] of callbacks) {
|
||||||
|
try {
|
||||||
|
const response = await cb(subject, parsedData, msg);
|
||||||
|
const responsePayload =
|
||||||
|
typeof response === 'string'
|
||||||
|
? response
|
||||||
|
: JSON.stringify(response);
|
||||||
|
msg.respond(this.textEncoder.encode(responsePayload));
|
||||||
|
} catch (err) {
|
||||||
|
logger.error(
|
||||||
|
`Error in request handler for owner=${ownerId}, subject=${subject}:`,
|
||||||
|
err
|
||||||
|
);
|
||||||
|
// Send error response
|
||||||
|
msg.respond(
|
||||||
|
this.textEncoder.encode(JSON.stringify({ error: err.message }))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})().catch(err => {
|
||||||
|
logger.error(`Request handler error for subject ${subject}:`, err);
|
||||||
|
});
|
||||||
|
|
||||||
|
this.requestHandlers.set(handlerKey, { subscription, callbacks });
|
||||||
|
return { success: true };
|
||||||
|
}
|
||||||
|
|
||||||
|
async removeSubscription(subject, owner) {
|
||||||
|
const entry = this.subscriptions.get(subject);
|
||||||
|
|
||||||
|
if (!entry) {
|
||||||
|
logger.trace(`Subscription not found for subject: ${subject}`);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (entry.callbacks.delete(owner)) {
|
||||||
|
logger.trace(
|
||||||
|
`Removed subscription callback for owner: ${owner} on subject: ${subject}`
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
logger.trace(
|
||||||
|
`No subscription callback found for owner: ${owner} on subject: ${subject}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (entry.callbacks.size === 0) {
|
||||||
|
logger.trace(`No callbacks left, stopping subscription for ${subject}`);
|
||||||
|
entry.subscription.unsubscribe();
|
||||||
|
this.subscriptions.delete(subject);
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
async removeRequestHandler(subject, owner) {
|
||||||
|
const entry = this.requestHandlers.get(subject);
|
||||||
|
|
||||||
|
if (!entry) {
|
||||||
|
logger.trace(`Request handler not found for subject: ${subject}`);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (entry.callbacks.delete(owner)) {
|
||||||
|
logger.trace(
|
||||||
|
`Removed request handler for owner: ${owner} on subject: ${subject}`
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
logger.trace(
|
||||||
|
`No request handler found for owner: ${owner} on subject: ${subject}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (entry.callbacks.size === 0) {
|
||||||
|
logger.trace(`No handlers left, stopping request handler for ${subject}`);
|
||||||
|
entry.subscription.unsubscribe();
|
||||||
|
this.requestHandlers.delete(subject);
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
async disconnect() {
|
||||||
|
logger.info('Disconnecting from NATS...');
|
||||||
|
|
||||||
|
// Stop all subscriptions
|
||||||
|
for (const [subject, entry] of this.subscriptions) {
|
||||||
|
logger.trace(`Stopping subscription: ${subject}`);
|
||||||
|
entry.subscription.unsubscribe();
|
||||||
|
}
|
||||||
|
this.subscriptions.clear();
|
||||||
|
|
||||||
|
// Stop all queued subscriptions
|
||||||
|
for (const [key, entry] of this.queuedSubscriptions) {
|
||||||
|
logger.trace(`Stopping queued subscription: ${key}`);
|
||||||
|
entry.subscription.unsubscribe();
|
||||||
|
}
|
||||||
|
this.queuedSubscriptions.clear();
|
||||||
|
|
||||||
|
// Stop all request handlers
|
||||||
|
for (const [subject, entry] of this.requestHandlers) {
|
||||||
|
logger.trace(`Stopping request handler: ${subject}`);
|
||||||
|
entry.subscription.unsubscribe();
|
||||||
|
}
|
||||||
|
this.requestHandlers.clear();
|
||||||
|
|
||||||
|
if (this.client) {
|
||||||
|
await this.client.close();
|
||||||
|
this.client = null;
|
||||||
|
logger.info('Disconnected from NATS');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const natsServer = new NatsServer();
|
||||||
|
|
||||||
|
export { NatsServer, natsServer };
|
||||||
@ -16,7 +16,8 @@ const moonrakerSchema = new Schema(
|
|||||||
const alertSchema = new Schema(
|
const alertSchema = new Schema(
|
||||||
{
|
{
|
||||||
priority: { type: String, required: true }, // order to show
|
priority: { type: String, required: true }, // order to show
|
||||||
type: { type: String, required: true } // selectFilament, error, info, message,
|
type: { type: String, required: true }, // selectFilament, error, info, message
|
||||||
|
message: { type: String, required: false }
|
||||||
},
|
},
|
||||||
{ timestamps: true, _id: false }
|
{ timestamps: true, _id: false }
|
||||||
);
|
);
|
||||||
|
|||||||
@ -1,6 +1,6 @@
|
|||||||
import { ObjectId } from 'mongodb';
|
import { ObjectId } from 'mongodb';
|
||||||
import { auditLogModel } from './schemas/management/auditlog.schema.js';
|
import { auditLogModel } from './schemas/management/auditlog.schema.js';
|
||||||
import { etcdServer } from './etcd.js';
|
import { natsServer } from './nats.js';
|
||||||
|
|
||||||
function parseFilter(property, value) {
|
function parseFilter(property, value) {
|
||||||
if (typeof value === 'string') {
|
if (typeof value === 'string') {
|
||||||
@ -411,11 +411,11 @@ async function getAuditLogs(idOrIds) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async function distributeUpdate(value, id, type) {
|
async function distributeUpdate(value, id, type) {
|
||||||
await etcdServer.setKey(`/${type}s/${id}/object`, value);
|
await natsServer.publish(`${type}s.${id}.object`, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
async function distributeNew(id, type) {
|
async function distributeNew(id, type) {
|
||||||
await etcdServer.setKey(`/${type}s/new`, id);
|
await natsServer.publish(`${type}s.new`, id);
|
||||||
}
|
}
|
||||||
|
|
||||||
function flatternObjectIds(object) {
|
function flatternObjectIds(object) {
|
||||||
|
|||||||
71
src/events/eventmanager.js
Normal file
71
src/events/eventmanager.js
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
import log4js from 'log4js';
|
||||||
|
import { loadConfig } from '../config.js';
|
||||||
|
import { natsServer } from '../database/nats.js';
|
||||||
|
|
||||||
|
const config = loadConfig();
|
||||||
|
|
||||||
|
// Setup logger
|
||||||
|
const logger = log4js.getLogger('Event Manager');
|
||||||
|
logger.level = config.server.logLevel;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* EventManager handles tracking object events using NATS and broadcasts event events via websockets.
|
||||||
|
*/
|
||||||
|
export class EventManager {
|
||||||
|
constructor(socketClient) {
|
||||||
|
this.socketClient = socketClient;
|
||||||
|
}
|
||||||
|
|
||||||
|
async subscribeToObjectEvent(id, objectType, eventType) {
|
||||||
|
logger.debug('Subscribing to object event:', eventType, id, objectType);
|
||||||
|
await natsServer.subscribe(
|
||||||
|
`${objectType}s.${id}.events.${eventType}`,
|
||||||
|
this.socketClient.socketId,
|
||||||
|
(key, value) => {
|
||||||
|
if (!value?.result) {
|
||||||
|
logger.trace('Object event detected:', id);
|
||||||
|
this.socketClient.socket.emit('objectEvent', {
|
||||||
|
_id: id,
|
||||||
|
objectType: objectType,
|
||||||
|
event: { ...value }
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
return { success: true };
|
||||||
|
}
|
||||||
|
|
||||||
|
async removeObjectEventsListener(id, objectType, eventType) {
|
||||||
|
// Remove specific event subscription for this object
|
||||||
|
await natsServer.removeSubscription(
|
||||||
|
`${objectType}s.${id}.events.${eventType}`,
|
||||||
|
this.socketClient.socketId
|
||||||
|
);
|
||||||
|
return { success: true };
|
||||||
|
}
|
||||||
|
|
||||||
|
async sendObjectEvent(id, objectType, event) {
|
||||||
|
const eventType = event?.type || 'unknown';
|
||||||
|
try {
|
||||||
|
logger.trace(
|
||||||
|
`Calling event: ${eventType}, object id: ${id}, object type: ${objectType} Event:`,
|
||||||
|
event
|
||||||
|
);
|
||||||
|
await natsServer.publish(
|
||||||
|
`${objectType}s.${id}.events.${eventType}`,
|
||||||
|
event
|
||||||
|
);
|
||||||
|
return { success: true };
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(
|
||||||
|
`Failed to publish event for ${objectType}s.${id}.events.${eventType}:`,
|
||||||
|
error
|
||||||
|
);
|
||||||
|
return {
|
||||||
|
error:
|
||||||
|
error?.message ||
|
||||||
|
`Failed to publish event for ${objectType}s.${id}.events.${eventType}.`
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
10
src/index.js
10
src/index.js
@ -1,6 +1,7 @@
|
|||||||
import { loadConfig } from './config.js';
|
import { loadConfig } from './config.js';
|
||||||
import { SocketManager } from './socket/socketmanager.js';
|
import { SocketManager } from './socket/socketmanager.js';
|
||||||
import { etcdServer } from './database/etcd.js';
|
import { etcdServer } from './database/etcd.js';
|
||||||
|
import { natsServer } from './database/nats.js';
|
||||||
import express from 'express';
|
import express from 'express';
|
||||||
import log4js from 'log4js';
|
import log4js from 'log4js';
|
||||||
import http from 'http';
|
import http from 'http';
|
||||||
@ -29,6 +30,15 @@ import { mongoServer } from './database/mongo.js';
|
|||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Connect to NATS (await)
|
||||||
|
try {
|
||||||
|
await natsServer.connect();
|
||||||
|
logger.info('Connected to NATS');
|
||||||
|
} catch (err) {
|
||||||
|
logger.error('Failed to connect to NATS:', err);
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
|
||||||
// Connect to Mongo DB (await)
|
// Connect to Mongo DB (await)
|
||||||
try {
|
try {
|
||||||
await mongoServer.connect();
|
await mongoServer.connect();
|
||||||
|
|||||||
@ -2,11 +2,12 @@ import log4js from 'log4js';
|
|||||||
// Load configuration
|
// Load configuration
|
||||||
import { loadConfig } from '../config.js';
|
import { loadConfig } from '../config.js';
|
||||||
import { CodeAuth, createAuthMiddleware } from '../auth/auth.js';
|
import { CodeAuth, createAuthMiddleware } from '../auth/auth.js';
|
||||||
import { editObject, getObject } from '../database/database.js';
|
import { editObject, getObject, listObjects } from '../database/database.js';
|
||||||
import { hostModel } from '../database/schemas/management/host.schema.js';
|
import { hostModel } from '../database/schemas/management/host.schema.js';
|
||||||
import { UpdateManager } from '../updates/updatemanager.js';
|
import { UpdateManager } from '../updates/updatemanager.js';
|
||||||
import { ActionManager } from '../actions/actionmanager.js';
|
import { ActionManager } from '../actions/actionmanager.js';
|
||||||
import { getModelByName } from '../utils.js';
|
import { getModelByName } from '../utils.js';
|
||||||
|
import { EventManager } from '../events/eventmanager.js';
|
||||||
|
|
||||||
const config = loadConfig();
|
const config = loadConfig();
|
||||||
|
|
||||||
@ -23,6 +24,7 @@ export class SocketHost {
|
|||||||
this.socketManager = socketManager;
|
this.socketManager = socketManager;
|
||||||
this.updateManager = new UpdateManager(this);
|
this.updateManager = new UpdateManager(this);
|
||||||
this.actionManager = new ActionManager(this);
|
this.actionManager = new ActionManager(this);
|
||||||
|
this.eventManager = new EventManager(this);
|
||||||
this.codeAuth = new CodeAuth();
|
this.codeAuth = new CodeAuth();
|
||||||
this.setupSocketEventHandlers();
|
this.setupSocketEventHandlers();
|
||||||
}
|
}
|
||||||
@ -32,6 +34,13 @@ export class SocketHost {
|
|||||||
this.socket.on('authenticate', this.handleAuthenticate.bind(this));
|
this.socket.on('authenticate', this.handleAuthenticate.bind(this));
|
||||||
this.socket.on('updateHost', this.handleUpdateHost.bind(this));
|
this.socket.on('updateHost', this.handleUpdateHost.bind(this));
|
||||||
this.socket.on('getObject', this.handleGetObject.bind(this));
|
this.socket.on('getObject', this.handleGetObject.bind(this));
|
||||||
|
this.socket.on('editObject', this.handleEditObject.bind(this));
|
||||||
|
this.socket.on('listObjects', this.handleListObjects.bind(this));
|
||||||
|
this.socket.on(
|
||||||
|
'subscribeToObjectActions',
|
||||||
|
this.handleSubscribeToObjectActions.bind(this)
|
||||||
|
);
|
||||||
|
this.socket.on('objectEvent', this.handleObjectEventEvent.bind(this));
|
||||||
this.socket.on('disconnect', this.handleDisconnect.bind(this));
|
this.socket.on('disconnect', this.handleDisconnect.bind(this));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -40,6 +49,23 @@ export class SocketHost {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async handleAuthenticate(data, callback) {
|
async handleAuthenticate(data, callback) {
|
||||||
|
const setHostOnline = async verifyResult => {
|
||||||
|
logger.info('Host authenticated and valid.');
|
||||||
|
this.host = verifyResult.host;
|
||||||
|
this.id = this.host._id.toString();
|
||||||
|
this.authenticated = true;
|
||||||
|
await editObject({
|
||||||
|
model: hostModel,
|
||||||
|
id: this.host._id,
|
||||||
|
updateData: {
|
||||||
|
online: true,
|
||||||
|
state: { type: 'online' },
|
||||||
|
connectedAt: new Date()
|
||||||
|
},
|
||||||
|
owner: this.host,
|
||||||
|
ownerType: 'host'
|
||||||
|
});
|
||||||
|
};
|
||||||
logger.trace('handleAuthenticateEvent');
|
logger.trace('handleAuthenticateEvent');
|
||||||
const id = data.id || undefined;
|
const id = data.id || undefined;
|
||||||
const authCode = data.authCode || undefined;
|
const authCode = data.authCode || undefined;
|
||||||
@ -49,17 +75,7 @@ export class SocketHost {
|
|||||||
logger.info('Authenticating host with id + authCode...');
|
logger.info('Authenticating host with id + authCode...');
|
||||||
const verifyResult = await this.codeAuth.verifyCode(id, authCode);
|
const verifyResult = await this.codeAuth.verifyCode(id, authCode);
|
||||||
if (verifyResult.valid == true) {
|
if (verifyResult.valid == true) {
|
||||||
logger.info('Host authenticated and valid.');
|
await setHostOnline(verifyResult);
|
||||||
this.host = verifyResult.host;
|
|
||||||
this.id = this.host._id.toString();
|
|
||||||
this.authenticated = true;
|
|
||||||
await editObject({
|
|
||||||
model: hostModel,
|
|
||||||
id: this.host._id,
|
|
||||||
updateData: { online: true, state: { type: 'online' } },
|
|
||||||
owner: this.host,
|
|
||||||
ownerType: 'host'
|
|
||||||
});
|
|
||||||
await this.initializeHost();
|
await this.initializeHost();
|
||||||
}
|
}
|
||||||
callback(verifyResult);
|
callback(verifyResult);
|
||||||
@ -71,8 +87,8 @@ export class SocketHost {
|
|||||||
const verifyResult = await this.codeAuth.verifyOtp(otp);
|
const verifyResult = await this.codeAuth.verifyOtp(otp);
|
||||||
if (verifyResult.valid == true) {
|
if (verifyResult.valid == true) {
|
||||||
logger.info('Host authenticated and valid.');
|
logger.info('Host authenticated and valid.');
|
||||||
this.host = verifyResult.host;
|
await setHostOnline(verifyResult);
|
||||||
this.authenticated = true;
|
await this.initializeHost();
|
||||||
}
|
}
|
||||||
callback(verifyResult);
|
callback(verifyResult);
|
||||||
return;
|
return;
|
||||||
@ -91,6 +107,18 @@ export class SocketHost {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async handleEditObject(data, callback) {
|
||||||
|
const object = await editObject({
|
||||||
|
model: getModelByName(data.objectType),
|
||||||
|
id: data._id,
|
||||||
|
updateData: data.updateData,
|
||||||
|
populate: data.populate,
|
||||||
|
owner: this.host,
|
||||||
|
ownerType: 'host'
|
||||||
|
});
|
||||||
|
callback(object);
|
||||||
|
}
|
||||||
|
|
||||||
async handleGetObject(data, callback) {
|
async handleGetObject(data, callback) {
|
||||||
const object = await getObject({
|
const object = await getObject({
|
||||||
model: getModelByName(data.objectType),
|
model: getModelByName(data.objectType),
|
||||||
@ -101,12 +129,45 @@ export class SocketHost {
|
|||||||
callback(object);
|
callback(object);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async handleListObjects(data, callback) {
|
||||||
|
const object = await listObjects({
|
||||||
|
model: getModelByName(data.objectType),
|
||||||
|
id: data._id,
|
||||||
|
cached: data?.cached,
|
||||||
|
populate: data?.populate,
|
||||||
|
filter: data?.filter,
|
||||||
|
project: data?.project,
|
||||||
|
sort: data?.sort,
|
||||||
|
order: data?.order
|
||||||
|
});
|
||||||
|
callback(object);
|
||||||
|
}
|
||||||
|
|
||||||
|
async handleObjectEventEvent(data) {
|
||||||
|
await this.eventManager.sendObjectEvent(
|
||||||
|
data._id,
|
||||||
|
data.objectType,
|
||||||
|
data.event
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async handleSubscribeToObjectActions(data) {
|
||||||
|
await this.actionManager.subscribeToObjectActions(
|
||||||
|
data._id,
|
||||||
|
data.objectType
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
async handleDisconnect() {
|
async handleDisconnect() {
|
||||||
if (this.authenticated) {
|
if (this.authenticated) {
|
||||||
await editObject({
|
await editObject({
|
||||||
model: hostModel,
|
model: hostModel,
|
||||||
id: this.host._id,
|
id: this.host._id,
|
||||||
updateData: { online: false, state: { type: 'offline' } },
|
updateData: {
|
||||||
|
online: false,
|
||||||
|
state: { type: 'offline' },
|
||||||
|
connectedAt: null
|
||||||
|
},
|
||||||
owner: this.host,
|
owner: this.host,
|
||||||
ownerType: 'host'
|
ownerType: 'host'
|
||||||
});
|
});
|
||||||
|
|||||||
@ -6,6 +6,7 @@ import { generateHostOTP } from '../utils.js';
|
|||||||
import { LockManager } from '../lock/lockmanager.js';
|
import { LockManager } from '../lock/lockmanager.js';
|
||||||
import { UpdateManager } from '../updates/updatemanager.js';
|
import { UpdateManager } from '../updates/updatemanager.js';
|
||||||
import { ActionManager } from '../actions/actionmanager.js';
|
import { ActionManager } from '../actions/actionmanager.js';
|
||||||
|
import { EventManager } from '../events/eventmanager.js';
|
||||||
|
|
||||||
const config = loadConfig();
|
const config = loadConfig();
|
||||||
|
|
||||||
@ -23,6 +24,7 @@ export class SocketUser {
|
|||||||
this.lockManager = new LockManager(this);
|
this.lockManager = new LockManager(this);
|
||||||
this.updateManager = new UpdateManager(this);
|
this.updateManager = new UpdateManager(this);
|
||||||
this.actionManager = new ActionManager(this);
|
this.actionManager = new ActionManager(this);
|
||||||
|
this.eventManager = new EventManager(this);
|
||||||
this.templateManager = socketManager.templateManager;
|
this.templateManager = socketManager.templateManager;
|
||||||
this.keycloakAuth = new KeycloakAuth();
|
this.keycloakAuth = new KeycloakAuth();
|
||||||
this.setupSocketEventHandlers();
|
this.setupSocketEventHandlers();
|
||||||
@ -42,6 +44,22 @@ export class SocketUser {
|
|||||||
'subscribeToObjectUpdate',
|
'subscribeToObjectUpdate',
|
||||||
this.handleSubscribeToObjectUpdateEvent.bind(this)
|
this.handleSubscribeToObjectUpdateEvent.bind(this)
|
||||||
);
|
);
|
||||||
|
this.socket.on(
|
||||||
|
'subscribeToObjectEvent',
|
||||||
|
this.handleSubscribeToObjectEventEvent.bind(this)
|
||||||
|
);
|
||||||
|
this.socket.on(
|
||||||
|
'unsubscribeObjectTypeUpdate',
|
||||||
|
this.handleUnsubscribeToObjectTypeUpdateEvent.bind(this)
|
||||||
|
);
|
||||||
|
this.socket.on(
|
||||||
|
'unsubscribeObjectUpdate',
|
||||||
|
this.handleUnsubscribeToObjectUpdateEvent.bind(this)
|
||||||
|
);
|
||||||
|
this.socket.on(
|
||||||
|
'unsubscribeObjectEvent',
|
||||||
|
this.handleUnsubscribeObjectEventEvent.bind(this)
|
||||||
|
);
|
||||||
this.socket.on(
|
this.socket.on(
|
||||||
'previewTemplate',
|
'previewTemplate',
|
||||||
this.handlePreviewTemplateEvent.bind(this)
|
this.handlePreviewTemplateEvent.bind(this)
|
||||||
@ -60,6 +78,7 @@ export class SocketUser {
|
|||||||
const result = await this.keycloakAuth.verifyToken(token);
|
const result = await this.keycloakAuth.verifyToken(token);
|
||||||
if (result.valid == true) {
|
if (result.valid == true) {
|
||||||
logger.info('User authenticated and valid.');
|
logger.info('User authenticated and valid.');
|
||||||
|
|
||||||
this.user = result.user;
|
this.user = result.user;
|
||||||
this.id = this.user._id.toString();
|
this.id = this.user._id.toString();
|
||||||
this.authenticated = true;
|
this.authenticated = true;
|
||||||
@ -126,18 +145,47 @@ export class SocketUser {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async handleSubscribeToObjectTypeUpdateEvent(data, callback) {
|
async handleSubscribeToObjectTypeUpdateEvent(data, callback) {
|
||||||
const result = this.updateManager.subscribeToObjectNew(data.objectType);
|
await this.updateManager.subscribeToObjectNew(data.objectType);
|
||||||
callback(result);
|
await this.updateManager.subscribeToObjectDelete(data.objectType);
|
||||||
|
callback({ success: true });
|
||||||
}
|
}
|
||||||
|
|
||||||
async handleSubscribeToObjectUpdateEvent(data, callback) {
|
async handleSubscribeToObjectUpdateEvent(data, callback) {
|
||||||
const result = this.updateManager.subscribeToObjectUpdate(
|
const result = await this.updateManager.subscribeToObjectUpdate(
|
||||||
data._id,
|
data._id,
|
||||||
data.objectType
|
data.objectType
|
||||||
);
|
);
|
||||||
callback(result);
|
callback(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async handleSubscribeToObjectEventEvent(data) {
|
||||||
|
await this.eventManager.subscribeToObjectEvent(
|
||||||
|
data._id,
|
||||||
|
data.objectType,
|
||||||
|
data.eventType
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async handleUnsubscribeToObjectTypeUpdateEvent(data) {
|
||||||
|
await this.updateManager.removeObjectNewListener(data.objectType);
|
||||||
|
await this.updateManager.removeObjectDeleteListener(data.objectType);
|
||||||
|
}
|
||||||
|
|
||||||
|
async handleUnsubscribeToObjectUpdateEvent(data) {
|
||||||
|
await this.updateManager.removeObjectUpdateListener(
|
||||||
|
data._id,
|
||||||
|
data.objectType
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async handleUnsubscribeObjectEventEvent(data) {
|
||||||
|
await this.eventManager.removeObjectEventsListener(
|
||||||
|
data._id,
|
||||||
|
data.objectType,
|
||||||
|
data.eventType
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
async handlePreviewTemplateEvent(data, callback) {
|
async handlePreviewTemplateEvent(data, callback) {
|
||||||
const result = await this.templateManager.renderTemplate(
|
const result = await this.templateManager.renderTemplate(
|
||||||
data._id,
|
data._id,
|
||||||
|
|||||||
@ -87,6 +87,15 @@ function getNodeStyles(attributes) {
|
|||||||
if (attributes?.shrink) {
|
if (attributes?.shrink) {
|
||||||
styles += `flex-shrink: ${attributes.shrink};`;
|
styles += `flex-shrink: ${attributes.shrink};`;
|
||||||
}
|
}
|
||||||
|
if (attributes?.color) {
|
||||||
|
styles += `color: ${attributes.color};`;
|
||||||
|
}
|
||||||
|
if (attributes?.background) {
|
||||||
|
styles += `background: ${attributes.background};`;
|
||||||
|
}
|
||||||
|
if (attributes?.scale) {
|
||||||
|
styles += `transform: scale(${attributes.scale});`;
|
||||||
|
}
|
||||||
return styles;
|
return styles;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -120,24 +129,36 @@ async function transformCustomElements(content) {
|
|||||||
tree.match({ tag: 'Text' }, node => ({
|
tree.match({ tag: 'Text' }, node => ({
|
||||||
...node,
|
...node,
|
||||||
tag: 'p',
|
tag: 'p',
|
||||||
attrs: { class: 'documentText' }
|
attrs: { class: 'documentText', style: getNodeStyles(node.attrs) }
|
||||||
})),
|
})),
|
||||||
tree =>
|
tree =>
|
||||||
tree.match({ tag: 'Bold' }, node => ({
|
tree.match({ tag: 'Bold' }, node => ({
|
||||||
...node,
|
...node,
|
||||||
tag: 'strong',
|
tag: 'strong',
|
||||||
attrs: { style: 'font-weight: bold;', class: 'documentBoldText' }
|
attrs: {
|
||||||
|
style: 'font-weight: bold;',
|
||||||
|
class: 'documentBoldText',
|
||||||
|
style: getNodeStyles(node.attrs)
|
||||||
|
}
|
||||||
})),
|
})),
|
||||||
tree =>
|
tree =>
|
||||||
tree.match({ tag: 'Barcode' }, node => {
|
tree.match({ tag: 'Barcode' }, node => {
|
||||||
return {
|
return {
|
||||||
tag: 'svg',
|
tag: 'div',
|
||||||
|
content: [
|
||||||
|
{
|
||||||
|
tag: 'svg',
|
||||||
|
attrs: {
|
||||||
|
class: 'documentBarcode',
|
||||||
|
'jsbarcode-displayValue': 'false',
|
||||||
|
'jsbarcode-value': node.content[0],
|
||||||
|
'jsbarcode-format': node.attrs.format
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
attrs: {
|
attrs: {
|
||||||
class: 'documentBarcode',
|
class: 'documentBarcodeContainer',
|
||||||
'jsbarcode-width': node.attrs?.width,
|
style: getNodeStyles(node.attrs)
|
||||||
'jsbarcode-height': node.attrs?.height,
|
|
||||||
'jsbarcode-value': node.content[0],
|
|
||||||
'jsbarcode-format': node.attrs.format
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}),
|
}),
|
||||||
|
|||||||
@ -1,9 +1,6 @@
|
|||||||
import log4js from 'log4js';
|
import log4js from 'log4js';
|
||||||
import { loadConfig } from '../config.js';
|
import { loadConfig } from '../config.js';
|
||||||
import NodeCache from 'node-cache';
|
import { natsServer } from '../database/nats.js';
|
||||||
import { etcdServer } from '../database/etcd.js';
|
|
||||||
import { updateObjectCache } from '../database/database.js';
|
|
||||||
|
|
||||||
const config = loadConfig();
|
const config = loadConfig();
|
||||||
|
|
||||||
// Setup logger
|
// Setup logger
|
||||||
@ -19,13 +16,28 @@ export class UpdateManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async subscribeToObjectNew(objectType) {
|
async subscribeToObjectNew(objectType) {
|
||||||
await etcdServer.onKeyPutEvent(
|
await natsServer.subscribe(
|
||||||
`/${objectType}s/new`,
|
`${objectType}s.new`,
|
||||||
this.socketClient.socketId,
|
this.socketClient.socketId,
|
||||||
(key, value) => {
|
(key, value) => {
|
||||||
logger.trace('Object new event:', value);
|
logger.trace('Object new event:', value);
|
||||||
this.socketClient.socket.emit('objectNew', {
|
this.socketClient.socket.emit('objectNew', {
|
||||||
_id: value,
|
object: value,
|
||||||
|
objectType: objectType
|
||||||
|
});
|
||||||
|
}
|
||||||
|
);
|
||||||
|
return { success: true };
|
||||||
|
}
|
||||||
|
|
||||||
|
async subscribeToObjectDelete(objectType) {
|
||||||
|
await natsServer.subscribe(
|
||||||
|
`${objectType}s.delete`,
|
||||||
|
this.socketClient.socketId,
|
||||||
|
(key, value) => {
|
||||||
|
logger.trace('Object delete event:', value);
|
||||||
|
this.socketClient.socket.emit('objectDelete', {
|
||||||
|
object: value,
|
||||||
objectType: objectType
|
objectType: objectType
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -34,8 +46,8 @@ export class UpdateManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async subscribeToObjectUpdate(id, objectType) {
|
async subscribeToObjectUpdate(id, objectType) {
|
||||||
await etcdServer.onKeyPutEvent(
|
await natsServer.subscribe(
|
||||||
`/${objectType}s/${id}/object`,
|
`${objectType}s.${id}.object`,
|
||||||
this.socketClient.socketId,
|
this.socketClient.socketId,
|
||||||
(key, value) => {
|
(key, value) => {
|
||||||
logger.trace('Object update event:', id);
|
logger.trace('Object update event:', id);
|
||||||
@ -50,52 +62,26 @@ export class UpdateManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async removeObjectNewListener(objectType) {
|
async removeObjectNewListener(objectType) {
|
||||||
await etcdServer.removeKeyWatcher(
|
await natsServer.removeSubscription(
|
||||||
`/${objectType}s/new`,
|
`${objectType}s.new`,
|
||||||
this.socketClient.socketId,
|
this.socketClient.socketId
|
||||||
'put'
|
);
|
||||||
|
return { success: true };
|
||||||
|
}
|
||||||
|
|
||||||
|
async removeObjectDeleteListener(objectType) {
|
||||||
|
await natsServer.removeSubscription(
|
||||||
|
`${objectType}s.delete`,
|
||||||
|
this.socketClient.socketId
|
||||||
);
|
);
|
||||||
return { success: true };
|
return { success: true };
|
||||||
}
|
}
|
||||||
|
|
||||||
async removeObjectUpdateListener(id, objectType) {
|
async removeObjectUpdateListener(id, objectType) {
|
||||||
await etcdServer.removeKeyWatcher(
|
await natsServer.removeSubscription(
|
||||||
`/${objectType}s/${id}/object`,
|
`${objectType}s.${id}.object`,
|
||||||
this.socketClient.socketId,
|
this.socketClient.socketId
|
||||||
'put'
|
|
||||||
);
|
);
|
||||||
return { success: true };
|
return { success: true };
|
||||||
}
|
}
|
||||||
|
|
||||||
async getObjectUpdate(id, objectType) {
|
|
||||||
try {
|
|
||||||
const objectUpdate = {
|
|
||||||
_id: id,
|
|
||||||
objectType: objectType,
|
|
||||||
object: await etcdServer.get(`/${objectType}s/${id}/object`)
|
|
||||||
};
|
|
||||||
logger.trace(`Returning path: /${objectType}s/${id}/object`);
|
|
||||||
return objectUpdate;
|
|
||||||
} catch (error) {
|
|
||||||
logger.error(
|
|
||||||
`UpdateManager: Failed to get current value for /${objectType}s/${id}/object:`,
|
|
||||||
error
|
|
||||||
);
|
|
||||||
return { error: 'Not found' };
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async setObjectUpdate(id, objectType, value) {
|
|
||||||
try {
|
|
||||||
await etcdServer.set(`/${objectType}s/${id}/object`, value);
|
|
||||||
logger.trace(`Set value for path: /${objectType}s/${id}/object`);
|
|
||||||
return true;
|
|
||||||
} catch (error) {
|
|
||||||
logger.error(
|
|
||||||
`Failed to set value for /${objectType}s/${id}/object:`,
|
|
||||||
error
|
|
||||||
);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,6 +2,7 @@ import { editObject } from './database/database.js';
|
|||||||
import { hostModel } from './database/schemas/management/host.schema.js';
|
import { hostModel } from './database/schemas/management/host.schema.js';
|
||||||
import crypto from 'crypto';
|
import crypto from 'crypto';
|
||||||
import { nanoid } from 'nanoid';
|
import { nanoid } from 'nanoid';
|
||||||
|
import canonicalize from 'canonical-json';
|
||||||
|
|
||||||
import { loadConfig } from './config.js';
|
import { loadConfig } from './config.js';
|
||||||
import { userModel } from './database/schemas/management/user.schema.js';
|
import { userModel } from './database/schemas/management/user.schema.js';
|
||||||
@ -86,5 +87,11 @@ export function getChangedValues(oldObj, newObj, old = false) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export function getModelByName(modelName) {
|
export function getModelByName(modelName) {
|
||||||
return modelList.filter(model => model.modelName == modelName);
|
return modelList.filter(model => model.modelName == modelName)[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
export function jsonToCacheKey(obj) {
|
||||||
|
const normalized = canonicalize(obj);
|
||||||
|
const hash = crypto.createHash('sha256').update(normalized).digest('hex');
|
||||||
|
return hash;
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user