From f72c27c58898aa5414abac0893985ddcb57ce73a Mon Sep 17 00:00:00 2001 From: Tom Butcher Date: Sat, 13 Dec 2025 21:03:55 +0000 Subject: [PATCH] Added stats support. --- src/database/database.js | 352 ++++++++++++++++++++++---------------- src/database/utils.js | 5 + src/socket/socketuser.js | 19 ++ src/stats/statsmanager.js | 82 +++++++++ 4 files changed, 315 insertions(+), 143 deletions(-) create mode 100644 src/stats/statsmanager.js diff --git a/src/database/database.js b/src/database/database.js index 9ea6a25..e6e10c5 100644 --- a/src/database/database.js +++ b/src/database/database.js @@ -5,12 +5,14 @@ import { editAuditLog, distributeUpdate, newAuditLog, - distributeNew + distributeNew, + distributeStats } from './utils.js'; import log4js from 'log4js'; import { loadConfig } from '../config.js'; -import { jsonToCacheKey } from '../utils.js'; +import { getQueryToCacheKey } from '../utils.js'; import { redisServer } from './redis.js'; +import { auditLogModel } from './schemas/management/auditlog.schema.js'; const config = loadConfig(); @@ -23,27 +25,19 @@ cacheLogger.level = config.server.logLevel; const CACHE_TTL_SECONDS = config.database?.redis?.ttlSeconds || 5; export const retrieveObjectCache = async ({ model, id, populate = [] }) => { - const cacheKeyObject = { - model: model.modelName, - id: id?.toString() - }; + const cacheKey = getQueryToCacheKey({ model: model.modelName, id, populate }); - const cacheKey = jsonToCacheKey(cacheKeyObject); - - cacheLogger.trace('Retrieving:', cacheKeyObject); + cacheLogger.trace('Retrieving:', cacheKey); try { const cachedObject = await redisServer.getKey(cacheKey); if (cachedObject == null) { - cacheLogger.trace('Miss:', cacheKeyObject); + cacheLogger.trace('Miss:', cacheKey); return undefined; } - cacheLogger.trace('Hit:', { - model: model.modelName, - id: cacheKeyObject.id - }); + cacheLogger.trace('Hit:', cacheKey); return cachedObject; } catch (err) { @@ -52,65 +46,43 @@ export const retrieveObjectCache = async ({ model, id, populate = [] }) => { } }; -export const retrieveListCache = async ({ +export const updateObjectCache = async ({ model, - populate = [], - filter = {}, - sort = '', - order = 'ascend', - project = {} + id, + object, + populate = [] }) => { - const cacheKeyObject = { - model: model.modelName, - populate, - filter, - sort, - project, - order - }; + const cacheKeyFilter = `${model.modelName}:${id?.toString()}*`; + const cacheKey = getQueryToCacheKey({ model: model.modelName, id, populate }); - const cacheKey = jsonToCacheKey(cacheKeyObject); - - cacheLogger.trace('Retrieving:', cacheKeyObject); + cacheLogger.trace('Updating:', cacheKeyFilter); try { - const cachedList = await redisServer.getKey(cacheKey); + // Get all keys matching the filter pattern + const matchingKeys = await redisServer.getKeysByPattern(cacheKeyFilter); - if (cachedList != null) { - cacheLogger.trace('Hit:', { - ...cacheKeyObject, - length: cachedList.length - }); - return cachedList; + logger.trace('Matching keys:', matchingKeys); + + // Merge the object with each cached object and update + const mergedObjects = []; + for (const key of matchingKeys) { + logger.trace('Updating object cache:', key); + const cachedObject = (await redisServer.getKey(key)) || {}; + const mergedObject = _.merge(cachedObject, object); + await redisServer.setKey(key, mergedObject, CACHE_TTL_SECONDS); + mergedObjects.push(mergedObject); } - cacheLogger.trace('Miss:', { - model: model.modelName - }); - return undefined; - } catch (err) { - cacheLogger.error('Error retrieving list from Redis cache:', err); - return undefined; - } -}; - -export const updateObjectCache = async ({ model, id, object }) => { - const cacheKeyObject = { - model: model.modelName, - id: id?.toString() - }; - - const cacheKey = jsonToCacheKey(cacheKeyObject); - - cacheLogger.trace('Updating:', cacheKeyObject); - - try { - const cachedObject = (await redisServer.getKey(cacheKey)) || {}; - const mergedObject = _.merge(cachedObject, object); - + const cacheObject = (await redisServer.getKey(cacheKey)) || {}; + const mergedObject = _.merge(cacheObject, object); await redisServer.setKey(cacheKey, mergedObject, CACHE_TTL_SECONDS); - cacheLogger.trace('Updated:', { ...cacheKeyObject }); + cacheLogger.trace('Updated:', { + filter: cacheKeyFilter, + keysUpdated: matchingKeys.length + }); + + // Return the merged object return mergedObject; } catch (err) { cacheLogger.error('Error updating object in Redis cache:', err); @@ -120,69 +92,27 @@ export const updateObjectCache = async ({ model, id, object }) => { }; export const deleteObjectCache = async ({ model, id }) => { - const cacheKeyObject = { - model: model.modelName, - id: id?.toString() - }; + const cacheKeyFilter = `${model.modelName}:${id?.toString()}*`; - cacheLogger.trace('Deleting:', { - ...cacheKeyObject - }); + cacheLogger.trace('Deleting:', cacheKeyFilter); try { - // Note: we currently delete the non-populated key; populated variants will expire via TTL. - const cacheKey = jsonToCacheKey({ ...cacheKeyObject, populate: [] }); - await redisServer.deleteKey(cacheKey); + // Get all keys matching the filter pattern and delete them + const matchingKeys = await redisServer.getKeysByPattern(cacheKeyFilter); + + for (const cacheKey of matchingKeys) { + await redisServer.deleteKey(cacheKey); + } cacheLogger.trace('Deleted:', { - ...cacheKeyObject + filter: cacheKeyFilter, + keysDeleted: matchingKeys.length }); } catch (err) { cacheLogger.error('Error deleting object from Redis cache:', err); } }; -export const updateListCache = ({ - model, - objects, - populate = [], - filter = {}, - sort = '', - order = 'ascend', - project = {} -}) => { - const cacheKeyObject = { - model: model.modelName, - populate, - filter, - sort, - project, - order - }; - - cacheLogger.trace('Updating:', { - ...cacheKeyObject, - length: objects.length - }); - - const cacheKey = jsonToCacheKey(cacheKeyObject); - - return (async () => { - try { - await redisServer.setKey(cacheKey, objects, CACHE_TTL_SECONDS); - - cacheLogger.trace('Updated:', { - ...cacheKeyObject, - length: objects.length - }); - } catch (err) { - cacheLogger.error('Error updating list in Redis cache:', err); - } - - return objects; - })(); -}; - // Reusable function to list objects with aggregation, filtering, search, sorting, and pagination export const listObjects = async ({ model, @@ -204,20 +134,6 @@ export const listObjects = async ({ cached }); - if (cached == true) { - const objectsCache = await retrieveListCache({ - model, - populate, - filter, - sort, - order, - project - }); - if (objectsCache != undefined) { - return objectsCache; - } - } - // Fix: descend should be -1, ascend should be 1 const sortOrder = order === 'descend' ? -1 : 1; @@ -261,16 +177,6 @@ export const listObjects = async ({ const finalResult = expandObjectIds(queryResult); - updateListCache({ - model, - objects: finalResult, - populate, - filter, - sort, - order, - project - }); - logger.trace('Retreived from database:', { model, populate, @@ -338,7 +244,9 @@ export const getObject = async ({ populate }); - updateObjectCache({ + logger.trace(finalResult); + + await updateObjectCache({ model: model, id: finalResult._id.toString(), populate, @@ -353,6 +261,152 @@ export const getObject = async ({ } }; +// Utility to run one or many rollup aggregations in a single query via $facet. +export const aggregateRollups = async ({ + model, + baseFilter = {}, + rollupConfigs = [] +}) => { + if (!rollupConfigs.length) { + return {}; + } + + const facetStage = rollupConfigs.reduce((facets, definition, index) => { + const key = definition.name || `rollup${index}`; + const matchStage = { + $match: { ...baseFilter, ...(definition.filter || {}) } + }; + const groupStage = { $group: { _id: null } }; + + (definition.rollups || []).forEach(rollup => { + switch (rollup.operation) { + case 'sum': + groupStage.$group[rollup.name] = { $sum: `$${rollup.property}` }; + break; + case 'count': + groupStage.$group[rollup.name] = { $sum: 1 }; + break; + case 'avg': + groupStage.$group[rollup.name] = { $avg: `$${rollup.property}` }; + break; + default: + throw new Error(`Unsupported rollup operation: ${rollup.operation}`); + } + }); + + facets[key] = [matchStage, groupStage]; + return facets; + }, {}); + + const [results] = await model.aggregate([{ $facet: facetStage }]); + + return rollupConfigs.reduce((acc, definition, index) => { + const key = definition.name || `rollup${index}`; + const rawResult = results?.[key]?.[0] || {}; + + // Transform the result to nest rollup values under operation type + const transformedResult = {}; + (definition.rollups || []).forEach(rollup => { + const value = rawResult[rollup.name] || 0; + // If there's only one rollup and its name matches the key, flatten the structure + if (definition.rollups.length === 1 && rollup.name === key) { + transformedResult[rollup.operation] = value; + } else { + transformedResult[rollup.name] = { [rollup.operation]: value }; + } + }); + + acc[key] = transformedResult; + return acc; + }, {}); +}; + +// Reusable function to aggregate rollups over history using audit logs +export const aggregateRollupsHistory = async ({ + model, + baseFilter = {}, + rollupConfigs = [], + startDate, + endDate +}) => { + if (!rollupConfigs.length) { + return {}; + } + + // Helper to map filter keys to audit log structure + const mapFilterToAudit = filter => { + if (Array.isArray(filter)) { + return filter.map(mapFilterToAudit); + } + + if (_.isPlainObject(filter)) { + const newFilter = {}; + for (const key in filter) { + if (['$or', '$and', '$nor', '$in', '$nin'].includes(key)) { + newFilter[key] = mapFilterToAudit(filter[key]); + } else if (key.startsWith('$')) { + newFilter[key] = mapFilterToAudit(filter[key]); + } else if ( + [ + 'operation', + 'parent', + 'parentType', + 'owner', + 'ownerType', + 'createdAt', + 'updatedAt', + '_id', + '_reference', + 'changes' + ].includes(key) + ) { + newFilter[key] = mapFilterToAudit(filter[key]); + } else { + newFilter[`changes.new.${key}`] = mapFilterToAudit(filter[key]); + } + } + return newFilter; + } + + return filter; + }; + + const matchQuery = { + parentType: model.modelName, + ...mapFilterToAudit(baseFilter) + }; + + if (startDate || endDate) { + matchQuery.createdAt = {}; + if (startDate) matchQuery.createdAt.$gte = new Date(startDate); + if (endDate) matchQuery.createdAt.$lte = new Date(endDate); + if (Object.keys(matchQuery.createdAt).length === 0) + delete matchQuery.createdAt; + } + + const mappedRollupConfigs = rollupConfigs.map(config => ({ + ...config, + filter: config.filter ? mapFilterToAudit(config.filter) : undefined, + rollups: (config.rollups || []).map(rollup => ({ + ...rollup, + property: `changes.new.${rollup.property}` + })) + })); + + return await aggregateRollups({ + model: auditLogModel, + baseFilter: matchQuery, + rollupConfigs: mappedRollupConfigs + }); +}; + +export const getModelStats = async ({ model }) => { + if (!model.stats) { + return { error: 'Model does not have a stats method.', code: 500 }; + } + return await model.stats(); +}; + // Reusable function to edit an object by ID, with audit logging and distribution export const editObject = async ({ model, @@ -405,13 +459,24 @@ export const editObject = async ({ // Distribute update await distributeUpdate(updateData, id, parentType); - updateObjectCache({ + await updateObjectCache({ model: model, id: id.toString(), object: { ...previousExpandedObject, ...updateData }, populate }); + if (model.recalculate) { + logger.debug(`Recalculating ${model.modelName}`); + await model.recalculate(newExpandedObject, owner, ownerType); + } + + if (model.stats) { + logger.debug(`Getting stats for ${model.modelName}`); + const statsData = await model.stats(newExpandedObject); + await distributeStats(statsData, parentType); + } + return { ...previousExpandedObject, ...updateData }; } catch (error) { logger.error('editObject error:', error); @@ -441,10 +506,11 @@ export const newObject = async ({ await distributeNew(created._id, parentType); - updateObjectCache({ + await updateObjectCache({ model: model, id: created._id.toString(), - object: { _id: created._id, ...newData } + object: { _id: created._id, ...newData }, + populate: [] }); return created; diff --git a/src/database/utils.js b/src/database/utils.js index e56e7ab..d641e49 100644 --- a/src/database/utils.js +++ b/src/database/utils.js @@ -436,6 +436,10 @@ async function distributeUpdate(value, id, type) { await natsServer.publish(`${type}s.${id}.object`, value); } +async function distributeStats(value, type) { + await natsServer.publish(`${type}s.stats`, value); +} + async function distributeNew(id, type) { await natsServer.publish(`${type}s.new`, id); } @@ -541,6 +545,7 @@ export { expandObjectIds, distributeUpdate, distributeNew, + distributeStats, getFilter, // <-- add here convertPropertiesString }; diff --git a/src/socket/socketuser.js b/src/socket/socketuser.js index 69d3d20..3662565 100644 --- a/src/socket/socketuser.js +++ b/src/socket/socketuser.js @@ -7,6 +7,7 @@ import { LockManager } from '../lock/lockmanager.js'; import { UpdateManager } from '../updates/updatemanager.js'; import { ActionManager } from '../actions/actionmanager.js'; import { EventManager } from '../events/eventmanager.js'; +import { StatsManager } from '../stats/statsmanager.js'; const config = loadConfig(); @@ -25,6 +26,7 @@ export class SocketUser { this.updateManager = new UpdateManager(this); this.actionManager = new ActionManager(this); this.eventManager = new EventManager(this); + this.statsManager = new StatsManager(this); this.templateManager = socketManager.templateManager; this.keycloakAuth = new KeycloakAuth(); this.setupSocketEventHandlers(); @@ -60,6 +62,14 @@ export class SocketUser { 'unsubscribeObjectEvent', this.handleUnsubscribeObjectEventEvent.bind(this) ); + this.socket.on( + 'subscribeToModelStats', + this.handleSubscribeToStatsEvent.bind(this) + ); + this.socket.on( + 'unsubscribeModelStats', + this.handleUnsubscribeToStatsEvent.bind(this) + ); this.socket.on( 'previewTemplate', this.handlePreviewTemplateEvent.bind(this) @@ -191,6 +201,14 @@ export class SocketUser { ); } + async handleSubscribeToStatsEvent(data) { + await this.statsManager.subscribeToStats(data.objectType); + } + + async handleUnsubscribeToStatsEvent(data) { + await this.statsManager.removeStatsListener(data.objectType); + } + async handlePreviewTemplateEvent(data, callback) { const result = await this.templateManager.renderTemplate( data._id, @@ -227,6 +245,7 @@ export class SocketUser { async handleDisconnect() { await this.actionManager.removeAllListeners(); await this.eventManager.removeAllListeners(); + await this.statsManager.removeAllListeners(); logger.info('External user disconnected:', this.socket.user?.username); } } diff --git a/src/stats/statsmanager.js b/src/stats/statsmanager.js new file mode 100644 index 0000000..4f83b82 --- /dev/null +++ b/src/stats/statsmanager.js @@ -0,0 +1,82 @@ +import log4js from 'log4js'; +import { loadConfig } from '../config.js'; +import { natsServer } from '../database/nats.js'; + +const config = loadConfig(); + +// Setup logger +const logger = log4js.getLogger('Stats Manager'); +logger.level = config.server.logLevel; + +/** + * StatsManager handles tracking stats updates using NATS and broadcasts stats updates via websockets. + */ +export class StatsManager { + constructor(socketClient) { + this.socketClient = socketClient; + this.subscriptions = new Set(); + } + + async subscribeToStats(type) { + logger.debug('Subscribing to stats:', type); + const subject = `${type}s.stats`; + const subscriptionKey = `${subject}:${this.socketClient.socketId}`; + + await natsServer.subscribe( + subject, + this.socketClient.socketId, + (key, value) => { + if (!value?.result) { + logger.trace('Stats update detected:', type); + console.log('Stats update detected:', type, value); + this.socketClient.socket.emit('modelStats', { + objectType: type, + stats: { ...value } + }); + } + } + ); + + this.subscriptions.add(subscriptionKey); + return { success: true }; + } + + async removeStatsListener(type) { + // Remove stats subscription for this type + const subject = `${type}s.stats`; + const subscriptionKey = `${subject}:${this.socketClient.socketId}`; + + await natsServer.removeSubscription(subject, this.socketClient.socketId); + + this.subscriptions.delete(subscriptionKey); + return { success: true }; + } + + async sendStats(type, stats) { + try { + logger.trace(`Publishing stats: type=${type}, stats:`, stats); + await natsServer.publish(`${type}s.stats`, stats); + return { success: true }; + } catch (error) { + logger.error(`Failed to publish stats for ${type}s.stats:`, error); + return { + error: error?.message || `Failed to publish stats for ${type}s.stats.` + }; + } + } + + async removeAllListeners() { + logger.debug('Removing all stats listeners...'); + const removePromises = Array.from(this.subscriptions).map( + subscriptionKey => { + const [subject, socketId] = subscriptionKey.split(':'); + return natsServer.removeSubscription(subject, socketId); + } + ); + + await Promise.all(removePromises); + this.subscriptions.clear(); + logger.debug(`Removed ${removePromises.length} stats listener(s)`); + return { success: true }; + } +}