Added stats support.

This commit is contained in:
Tom Butcher 2025-12-13 21:03:55 +00:00
parent 950d47bcfa
commit f72c27c588
4 changed files with 315 additions and 143 deletions

View File

@ -5,12 +5,14 @@ import {
editAuditLog, editAuditLog,
distributeUpdate, distributeUpdate,
newAuditLog, newAuditLog,
distributeNew distributeNew,
distributeStats
} from './utils.js'; } from './utils.js';
import log4js from 'log4js'; import log4js from 'log4js';
import { loadConfig } from '../config.js'; import { loadConfig } from '../config.js';
import { jsonToCacheKey } from '../utils.js'; import { getQueryToCacheKey } from '../utils.js';
import { redisServer } from './redis.js'; import { redisServer } from './redis.js';
import { auditLogModel } from './schemas/management/auditlog.schema.js';
const config = loadConfig(); const config = loadConfig();
@ -23,27 +25,19 @@ cacheLogger.level = config.server.logLevel;
const CACHE_TTL_SECONDS = config.database?.redis?.ttlSeconds || 5; const CACHE_TTL_SECONDS = config.database?.redis?.ttlSeconds || 5;
export const retrieveObjectCache = async ({ model, id, populate = [] }) => { export const retrieveObjectCache = async ({ model, id, populate = [] }) => {
const cacheKeyObject = { const cacheKey = getQueryToCacheKey({ model: model.modelName, id, populate });
model: model.modelName,
id: id?.toString()
};
const cacheKey = jsonToCacheKey(cacheKeyObject); cacheLogger.trace('Retrieving:', cacheKey);
cacheLogger.trace('Retrieving:', cacheKeyObject);
try { try {
const cachedObject = await redisServer.getKey(cacheKey); const cachedObject = await redisServer.getKey(cacheKey);
if (cachedObject == null) { if (cachedObject == null) {
cacheLogger.trace('Miss:', cacheKeyObject); cacheLogger.trace('Miss:', cacheKey);
return undefined; return undefined;
} }
cacheLogger.trace('Hit:', { cacheLogger.trace('Hit:', cacheKey);
model: model.modelName,
id: cacheKeyObject.id
});
return cachedObject; return cachedObject;
} catch (err) { } catch (err) {
@ -52,65 +46,43 @@ export const retrieveObjectCache = async ({ model, id, populate = [] }) => {
} }
}; };
export const retrieveListCache = async ({ export const updateObjectCache = async ({
model, model,
populate = [], id,
filter = {}, object,
sort = '', populate = []
order = 'ascend',
project = {}
}) => { }) => {
const cacheKeyObject = { const cacheKeyFilter = `${model.modelName}:${id?.toString()}*`;
model: model.modelName, const cacheKey = getQueryToCacheKey({ model: model.modelName, id, populate });
populate,
filter,
sort,
project,
order
};
const cacheKey = jsonToCacheKey(cacheKeyObject); cacheLogger.trace('Updating:', cacheKeyFilter);
cacheLogger.trace('Retrieving:', cacheKeyObject);
try { try {
const cachedList = await redisServer.getKey(cacheKey); // Get all keys matching the filter pattern
const matchingKeys = await redisServer.getKeysByPattern(cacheKeyFilter);
if (cachedList != null) { logger.trace('Matching keys:', matchingKeys);
cacheLogger.trace('Hit:', {
...cacheKeyObject,
length: cachedList.length
});
return cachedList;
}
cacheLogger.trace('Miss:', { // Merge the object with each cached object and update
model: model.modelName const mergedObjects = [];
}); for (const key of matchingKeys) {
return undefined; logger.trace('Updating object cache:', key);
} catch (err) { const cachedObject = (await redisServer.getKey(key)) || {};
cacheLogger.error('Error retrieving list from Redis cache:', err);
return undefined;
}
};
export const updateObjectCache = async ({ model, id, object }) => {
const cacheKeyObject = {
model: model.modelName,
id: id?.toString()
};
const cacheKey = jsonToCacheKey(cacheKeyObject);
cacheLogger.trace('Updating:', cacheKeyObject);
try {
const cachedObject = (await redisServer.getKey(cacheKey)) || {};
const mergedObject = _.merge(cachedObject, object); const mergedObject = _.merge(cachedObject, object);
await redisServer.setKey(key, mergedObject, CACHE_TTL_SECONDS);
mergedObjects.push(mergedObject);
}
const cacheObject = (await redisServer.getKey(cacheKey)) || {};
const mergedObject = _.merge(cacheObject, object);
await redisServer.setKey(cacheKey, mergedObject, CACHE_TTL_SECONDS); await redisServer.setKey(cacheKey, mergedObject, CACHE_TTL_SECONDS);
cacheLogger.trace('Updated:', { ...cacheKeyObject });
cacheLogger.trace('Updated:', {
filter: cacheKeyFilter,
keysUpdated: matchingKeys.length
});
// Return the merged object
return mergedObject; return mergedObject;
} catch (err) { } catch (err) {
cacheLogger.error('Error updating object in Redis cache:', err); cacheLogger.error('Error updating object in Redis cache:', err);
@ -120,69 +92,27 @@ export const updateObjectCache = async ({ model, id, object }) => {
}; };
export const deleteObjectCache = async ({ model, id }) => { export const deleteObjectCache = async ({ model, id }) => {
const cacheKeyObject = { const cacheKeyFilter = `${model.modelName}:${id?.toString()}*`;
model: model.modelName,
id: id?.toString()
};
cacheLogger.trace('Deleting:', { cacheLogger.trace('Deleting:', cacheKeyFilter);
...cacheKeyObject
});
try { try {
// Note: we currently delete the non-populated key; populated variants will expire via TTL. // Get all keys matching the filter pattern and delete them
const cacheKey = jsonToCacheKey({ ...cacheKeyObject, populate: [] }); const matchingKeys = await redisServer.getKeysByPattern(cacheKeyFilter);
for (const cacheKey of matchingKeys) {
await redisServer.deleteKey(cacheKey); await redisServer.deleteKey(cacheKey);
}
cacheLogger.trace('Deleted:', { cacheLogger.trace('Deleted:', {
...cacheKeyObject filter: cacheKeyFilter,
keysDeleted: matchingKeys.length
}); });
} catch (err) { } catch (err) {
cacheLogger.error('Error deleting object from Redis cache:', err); cacheLogger.error('Error deleting object from Redis cache:', err);
} }
}; };
export const updateListCache = ({
model,
objects,
populate = [],
filter = {},
sort = '',
order = 'ascend',
project = {}
}) => {
const cacheKeyObject = {
model: model.modelName,
populate,
filter,
sort,
project,
order
};
cacheLogger.trace('Updating:', {
...cacheKeyObject,
length: objects.length
});
const cacheKey = jsonToCacheKey(cacheKeyObject);
return (async () => {
try {
await redisServer.setKey(cacheKey, objects, CACHE_TTL_SECONDS);
cacheLogger.trace('Updated:', {
...cacheKeyObject,
length: objects.length
});
} catch (err) {
cacheLogger.error('Error updating list in Redis cache:', err);
}
return objects;
})();
};
// Reusable function to list objects with aggregation, filtering, search, sorting, and pagination // Reusable function to list objects with aggregation, filtering, search, sorting, and pagination
export const listObjects = async ({ export const listObjects = async ({
model, model,
@ -204,20 +134,6 @@ export const listObjects = async ({
cached cached
}); });
if (cached == true) {
const objectsCache = await retrieveListCache({
model,
populate,
filter,
sort,
order,
project
});
if (objectsCache != undefined) {
return objectsCache;
}
}
// Fix: descend should be -1, ascend should be 1 // Fix: descend should be -1, ascend should be 1
const sortOrder = order === 'descend' ? -1 : 1; const sortOrder = order === 'descend' ? -1 : 1;
@ -261,16 +177,6 @@ export const listObjects = async ({
const finalResult = expandObjectIds(queryResult); const finalResult = expandObjectIds(queryResult);
updateListCache({
model,
objects: finalResult,
populate,
filter,
sort,
order,
project
});
logger.trace('Retreived from database:', { logger.trace('Retreived from database:', {
model, model,
populate, populate,
@ -338,7 +244,9 @@ export const getObject = async ({
populate populate
}); });
updateObjectCache({ logger.trace(finalResult);
await updateObjectCache({
model: model, model: model,
id: finalResult._id.toString(), id: finalResult._id.toString(),
populate, populate,
@ -353,6 +261,152 @@ export const getObject = async ({
} }
}; };
// Utility to run one or many rollup aggregations in a single query via $facet.
//
// Each entry of `rollupConfigs` describes one facet:
//   { name, filter, rollups: [{ name, operation: 'sum'|'count'|'avg', property }] }
// Returns { <configName>: { <rollupName>: { <operation>: value } } }, flattened
// to { <configName>: { <operation>: value } } when a config has a single rollup
// whose name matches the config name.
export const aggregateRollups = async ({
  model,
  baseFilter = {},
  rollupConfigs = []
}) => {
  // Nothing to compute — avoid issuing an aggregation with an empty $facet.
  if (!rollupConfigs.length) {
    return {};
  }

  // Build one $facet branch per config: a $match (base filter merged with the
  // config-specific filter) followed by a single $group of all accumulators.
  const facetStage = rollupConfigs.reduce((facets, definition, index) => {
    const key = definition.name || `rollup${index}`;

    const matchStage = {
      $match: { ...baseFilter, ...(definition.filter || {}) }
    };

    const groupStage = { $group: { _id: null } };
    (definition.rollups || []).forEach(rollup => {
      switch (rollup.operation) {
        case 'sum':
          groupStage.$group[rollup.name] = { $sum: `$${rollup.property}` };
          break;
        case 'count':
          // Counts matched documents; `property` is ignored for counts.
          groupStage.$group[rollup.name] = { $sum: 1 };
          break;
        case 'avg':
          groupStage.$group[rollup.name] = { $avg: `$${rollup.property}` };
          break;
        default:
          throw new Error(`Unsupported rollup operation: ${rollup.operation}`);
      }
    });

    facets[key] = [matchStage, groupStage];
    return facets;
  }, {});

  const [results] = await model.aggregate([{ $facet: facetStage }]);

  // Reshape the raw facet output into the documented nested structure.
  return rollupConfigs.reduce((acc, definition, index) => {
    const key = definition.name || `rollup${index}`;
    const rawResult = results?.[key]?.[0] || {};

    // Transform the result to nest rollup values under operation type
    const transformedResult = {};
    (definition.rollups || []).forEach(rollup => {
      // `??` (not `||`) so only a missing/null aggregate falls back to 0 and
      // other falsy values are passed through unchanged.
      const value = rawResult[rollup.name] ?? 0;
      // If there's only one rollup and its name matches the key, flatten the structure
      if (definition.rollups.length === 1 && rollup.name === key) {
        transformedResult[rollup.operation] = value;
      } else {
        transformedResult[rollup.name] = { [rollup.operation]: value };
      }
    });

    acc[key] = transformedResult;
    return acc;
  }, {});
};
// Reusable function to aggregate rollups over history using audit logs.
//
// Rewrites the caller's model-level filters and rollup properties so they run
// against the audit-log collection (values recorded under `changes.new.*`),
// optionally windowed by audit creation time, then delegates to
// aggregateRollups.
export const aggregateRollupsHistory = async ({
  model,
  baseFilter = {},
  rollupConfigs = [],
  startDate,
  endDate
}) => {
  if (!rollupConfigs.length) {
    return {};
  }

  // Fields that exist on the audit document itself; any other filter key is
  // assumed to be a property of the audited model and must be rewritten to
  // look inside the recorded change set.
  const auditFields = [
    'operation',
    'parent',
    'parentType',
    'owner',
    'ownerType',
    'createdAt',
    'updatedAt',
    '_id',
    '_reference',
    'changes'
  ];

  // Recursively rewrite a Mongo filter: `$`-operators and audit-native fields
  // keep their keys (values still rewritten recursively); model-property keys
  // become `changes.new.<key>`. Non-object leaves pass through unchanged.
  const mapFilterToAudit = filter => {
    if (Array.isArray(filter)) {
      return filter.map(mapFilterToAudit);
    }
    if (_.isPlainObject(filter)) {
      const newFilter = {};
      for (const [key, value] of Object.entries(filter)) {
        if (key.startsWith('$') || auditFields.includes(key)) {
          newFilter[key] = mapFilterToAudit(value);
        } else {
          newFilter[`changes.new.${key}`] = mapFilterToAudit(value);
        }
      }
      return newFilter;
    }
    return filter;
  };

  const matchQuery = {
    parentType: model.modelName,
    ...mapFilterToAudit(baseFilter)
  };

  // Constrain to the requested window (inclusive bounds on audit createdAt).
  // NOTE(review): invalid date strings produce Invalid Date and will match
  // nothing — confirm callers validate dates upstream.
  if (startDate || endDate) {
    matchQuery.createdAt = {};
    if (startDate) matchQuery.createdAt.$gte = new Date(startDate);
    if (endDate) matchQuery.createdAt.$lte = new Date(endDate);
  }

  // Point every rollup at the recorded new value and rewrite config filters.
  const mappedRollupConfigs = rollupConfigs.map(config => ({
    ...config,
    filter: config.filter ? mapFilterToAudit(config.filter) : undefined,
    rollups: (config.rollups || []).map(rollup => ({
      ...rollup,
      property: `changes.new.${rollup.property}`
    }))
  }));

  return await aggregateRollups({
    model: auditLogModel,
    baseFilter: matchQuery,
    rollupConfigs: mappedRollupConfigs
  });
};
// Run a model's static stats() method, if the model defines one.
// Returns the stats result, or an error descriptor when the model has no
// stats support.
export const getModelStats = async ({ model }) => {
  if (model.stats) {
    return await model.stats();
  }
  return { error: 'Model does not have a stats method.', code: 500 };
};
// Reusable function to edit an object by ID, with audit logging and distribution // Reusable function to edit an object by ID, with audit logging and distribution
export const editObject = async ({ export const editObject = async ({
model, model,
@ -405,13 +459,24 @@ export const editObject = async ({
// Distribute update // Distribute update
await distributeUpdate(updateData, id, parentType); await distributeUpdate(updateData, id, parentType);
updateObjectCache({ await updateObjectCache({
model: model, model: model,
id: id.toString(), id: id.toString(),
object: { ...previousExpandedObject, ...updateData }, object: { ...previousExpandedObject, ...updateData },
populate populate
}); });
if (model.recalculate) {
logger.debug(`Recalculating ${model.modelName}`);
await model.recalculate(newExpandedObject, owner, ownerType);
}
if (model.stats) {
logger.debug(`Getting stats for ${model.modelName}`);
const statsData = await model.stats(newExpandedObject);
await distributeStats(statsData, parentType);
}
return { ...previousExpandedObject, ...updateData }; return { ...previousExpandedObject, ...updateData };
} catch (error) { } catch (error) {
logger.error('editObject error:', error); logger.error('editObject error:', error);
@ -441,10 +506,11 @@ export const newObject = async ({
await distributeNew(created._id, parentType); await distributeNew(created._id, parentType);
updateObjectCache({ await updateObjectCache({
model: model, model: model,
id: created._id.toString(), id: created._id.toString(),
object: { _id: created._id, ...newData } object: { _id: created._id, ...newData },
populate: []
}); });
return created; return created;

View File

@ -436,6 +436,10 @@ async function distributeUpdate(value, id, type) {
await natsServer.publish(`${type}s.${id}.object`, value); await natsServer.publish(`${type}s.${id}.object`, value);
} }
// Publish a stats payload on the per-model NATS stats subject
// (`<type>s.stats`) so stats subscribers receive the update.
async function distributeStats(value, type) {
  const subject = `${type}s.stats`;
  await natsServer.publish(subject, value);
}
async function distributeNew(id, type) { async function distributeNew(id, type) {
await natsServer.publish(`${type}s.new`, id); await natsServer.publish(`${type}s.new`, id);
} }
@ -541,6 +545,7 @@ export {
expandObjectIds, expandObjectIds,
distributeUpdate, distributeUpdate,
distributeNew, distributeNew,
distributeStats,
getFilter, // <-- add here getFilter, // <-- add here
convertPropertiesString convertPropertiesString
}; };

View File

@ -7,6 +7,7 @@ import { LockManager } from '../lock/lockmanager.js';
import { UpdateManager } from '../updates/updatemanager.js'; import { UpdateManager } from '../updates/updatemanager.js';
import { ActionManager } from '../actions/actionmanager.js'; import { ActionManager } from '../actions/actionmanager.js';
import { EventManager } from '../events/eventmanager.js'; import { EventManager } from '../events/eventmanager.js';
import { StatsManager } from '../stats/statsmanager.js';
const config = loadConfig(); const config = loadConfig();
@ -25,6 +26,7 @@ export class SocketUser {
this.updateManager = new UpdateManager(this); this.updateManager = new UpdateManager(this);
this.actionManager = new ActionManager(this); this.actionManager = new ActionManager(this);
this.eventManager = new EventManager(this); this.eventManager = new EventManager(this);
this.statsManager = new StatsManager(this);
this.templateManager = socketManager.templateManager; this.templateManager = socketManager.templateManager;
this.keycloakAuth = new KeycloakAuth(); this.keycloakAuth = new KeycloakAuth();
this.setupSocketEventHandlers(); this.setupSocketEventHandlers();
@ -60,6 +62,14 @@ export class SocketUser {
'unsubscribeObjectEvent', 'unsubscribeObjectEvent',
this.handleUnsubscribeObjectEventEvent.bind(this) this.handleUnsubscribeObjectEventEvent.bind(this)
); );
this.socket.on(
'subscribeToModelStats',
this.handleSubscribeToStatsEvent.bind(this)
);
this.socket.on(
'unsubscribeModelStats',
this.handleUnsubscribeToStatsEvent.bind(this)
);
this.socket.on( this.socket.on(
'previewTemplate', 'previewTemplate',
this.handlePreviewTemplateEvent.bind(this) this.handlePreviewTemplateEvent.bind(this)
@ -191,6 +201,14 @@ export class SocketUser {
); );
} }
// Socket handler for 'subscribeToModelStats': start forwarding NATS stats
// updates for the given model type to this client via the StatsManager.
async handleSubscribeToStatsEvent(data) {
await this.statsManager.subscribeToStats(data.objectType);
}

// Socket handler for 'unsubscribeModelStats': stop forwarding stats updates
// for the given model type.
async handleUnsubscribeToStatsEvent(data) {
await this.statsManager.removeStatsListener(data.objectType);
}
async handlePreviewTemplateEvent(data, callback) { async handlePreviewTemplateEvent(data, callback) {
const result = await this.templateManager.renderTemplate( const result = await this.templateManager.renderTemplate(
data._id, data._id,
@ -227,6 +245,7 @@ export class SocketUser {
async handleDisconnect() { async handleDisconnect() {
await this.actionManager.removeAllListeners(); await this.actionManager.removeAllListeners();
await this.eventManager.removeAllListeners(); await this.eventManager.removeAllListeners();
await this.statsManager.removeAllListeners();
logger.info('External user disconnected:', this.socket.user?.username); logger.info('External user disconnected:', this.socket.user?.username);
} }
} }

82
src/stats/statsmanager.js Normal file
View File

@ -0,0 +1,82 @@
import log4js from 'log4js';
import { loadConfig } from '../config.js';
import { natsServer } from '../database/nats.js';
const config = loadConfig();
// Setup logger
const logger = log4js.getLogger('Stats Manager');
logger.level = config.server.logLevel;
/**
 * StatsManager handles tracking stats updates using NATS and broadcasts stats
 * updates via websockets.
 *
 * One instance is owned per connected socket client; active NATS
 * subscriptions are tracked as `<subject>:<socketId>` keys so they can all be
 * torn down on disconnect.
 */
export class StatsManager {
  /**
   * @param {object} socketClient - owning socket wrapper; must expose
   *   `socketId` and `socket.emit`.
   */
  constructor(socketClient) {
    this.socketClient = socketClient;
    // Active subscription keys of the form `<subject>:<socketId>`.
    this.subscriptions = new Set();
  }

  /**
   * Subscribe this client to NATS stats updates for a model type and relay
   * them to the websocket as 'modelStats' events.
   * @param {string} type - model type; subject becomes `<type>s.stats`.
   * @returns {Promise<{success: boolean}>}
   */
  async subscribeToStats(type) {
    logger.debug('Subscribing to stats:', type);
    const subject = `${type}s.stats`;
    const subscriptionKey = `${subject}:${this.socketClient.socketId}`;

    await natsServer.subscribe(
      subject,
      this.socketClient.socketId,
      (key, value) => {
        // Ignore payloads carrying a `result` field — presumably reply
        // envelopes rather than stats documents. TODO(review): confirm.
        if (!value?.result) {
          logger.trace('Stats update detected:', type);
          this.socketClient.socket.emit('modelStats', {
            objectType: type,
            stats: { ...value }
          });
        }
      }
    );

    this.subscriptions.add(subscriptionKey);
    return { success: true };
  }

  /**
   * Remove this client's stats subscription for a model type.
   * @param {string} type - model type previously passed to subscribeToStats.
   * @returns {Promise<{success: boolean}>}
   */
  async removeStatsListener(type) {
    // Remove stats subscription for this type
    const subject = `${type}s.stats`;
    const subscriptionKey = `${subject}:${this.socketClient.socketId}`;
    await natsServer.removeSubscription(subject, this.socketClient.socketId);
    this.subscriptions.delete(subscriptionKey);
    return { success: true };
  }

  /**
   * Publish a stats payload on the model type's stats subject.
   * @returns {Promise<{success: boolean}|{error: string}>} error descriptor
   *   instead of throwing when the publish fails.
   */
  async sendStats(type, stats) {
    try {
      logger.trace(`Publishing stats: type=${type}, stats:`, stats);
      await natsServer.publish(`${type}s.stats`, stats);
      return { success: true };
    } catch (error) {
      logger.error(`Failed to publish stats for ${type}s.stats:`, error);
      return {
        error: error?.message || `Failed to publish stats for ${type}s.stats.`
      };
    }
  }

  /**
   * Tear down every stats subscription held by this client (used on
   * disconnect). Individual removal failures are logged but do not abort
   * removal of the remaining subscriptions.
   * @returns {Promise<{success: boolean}>}
   */
  async removeAllListeners() {
    logger.debug('Removing all stats listeners...');

    const removals = Array.from(this.subscriptions).map(subscriptionKey => {
      // Split on the FIRST ':' only — the subject (`<type>s.stats`) contains
      // no colon, but the socket id format is not guaranteed colon-free.
      const splitAt = subscriptionKey.indexOf(':');
      const subject = subscriptionKey.slice(0, splitAt);
      const socketId = subscriptionKey.slice(splitAt + 1);
      return natsServer.removeSubscription(subject, socketId);
    });

    const outcomes = await Promise.allSettled(removals);
    outcomes
      .filter(outcome => outcome.status === 'rejected')
      .forEach(outcome =>
        logger.error('Failed to remove stats listener:', outcome.reason)
      );

    this.subscriptions.clear();
    logger.debug(`Removed ${removals.length} stats listener(s)`);
    return { success: true };
  }
}