diff --git a/src/database/database.js b/src/database/database.js index de114b9..7c26a54 100644 --- a/src/database/database.js +++ b/src/database/database.js @@ -1,5 +1,5 @@ import dotenv from 'dotenv'; -import { fileModel } from '../schemas/management/file.schema.js'; +import { fileModel } from './schemas/management/file.schema.js'; import _ from 'lodash'; import { deleteAuditLog, @@ -7,7 +7,7 @@ import { expandObjectIds, modelHasRef, getFieldsByRef, - jsonToCacheKey, + getQueryToCacheKey, } from '../utils.js'; import log4js from 'log4js'; import { @@ -18,9 +18,11 @@ import { distributeChildUpdate, distributeChildDelete, distributeChildNew, + distributeStats, } from '../utils.js'; import { getAllModels } from '../services/misc/model.js'; import { redisServer } from './redis.js'; +import { auditLogModel } from './schemas/management/auditlog.schema.js'; dotenv.config(); @@ -35,19 +37,14 @@ const CACHE_TTL_SECONDS = parseInt(process.env.REDIS_CACHE_TTL || '30', 10); export const retrieveObjectCache = async ({ model, id, populate = [] }) => { if (!model || !id) return undefined; - const cacheKeyObject = { - model: model.modelName, - id: id.toString(), - }; + const cacheKey = getQueryToCacheKey({ model: model.modelName, id, populate }); - const cacheKey = jsonToCacheKey(cacheKeyObject); - - cacheLogger.trace('Retrieving object from cache:', cacheKeyObject); + cacheLogger.trace('Retrieving object from cache:', { model: model.modelName, id, populate }); try { const cachedObject = await redisServer.getKey(cacheKey); if (cachedObject == null) { - cacheLogger.trace('Cache miss:', cacheKeyObject); + cacheLogger.trace('Cache miss:', { model: model.modelName, id }); return undefined; } @@ -66,48 +63,285 @@ export const retrieveObjectCache = async ({ model, id, populate = [] }) => { export const updateObjectCache = async ({ model, id, object, populate = [] }) => { if (!model || !id || !object) return object; - const cacheKeyObject = { - model: model.modelName, - id: id.toString(), - }; + const cacheKeyFilter = `${model.modelName}:${id?.toString()}*`; + const cacheKey = getQueryToCacheKey({ model: model.modelName, id, populate }); - const cacheKey = jsonToCacheKey(cacheKeyObject); - - cacheLogger.trace('Updating object cache:', cacheKeyObject); + cacheLogger.trace('Updating object cache:', cacheKeyFilter); try { - const cachedObject = (await redisServer.getKey(cacheKey)) || {}; - const mergedObject = _.merge(cachedObject, object); + // Get all keys matching the filter pattern + const matchingKeys = await redisServer.getKeysByPattern(cacheKeyFilter); + // Merge the object with each cached object and update + const mergedObjects = []; + for (const key of matchingKeys) { + logger.trace('Updating object cache:', key); + const cachedObject = (await redisServer.getKey(key)) || {}; + const mergedObject = _.merge(cachedObject, object); + await redisServer.setKey(key, mergedObject, CACHE_TTL_SECONDS); + mergedObjects.push(mergedObject); + } + + const cacheObject = (await redisServer.getKey(cacheKey)) || {}; + const mergedObject = _.merge(cacheObject, object); await redisServer.setKey(cacheKey, mergedObject, CACHE_TTL_SECONDS); - cacheLogger.trace('Updated object cache:', cacheKeyObject); + + cacheLogger.trace('Updated object cache:', { + filter: cacheKeyFilter, + keysUpdated: matchingKeys.length, + }); + + // Return the merged object + return mergedObject; } catch (err) { cacheLogger.error('Error updating object in Redis cache:', err); + // Fallback to returning the provided object if cache fails + return object; } - - return object; }; export const deleteObjectCache = async ({ model, id }) => { if (!model || !id) return; - const cacheKeyObject = { - model: model.modelName, - id: id.toString(), - populate: [], - }; + const cacheKeyFilter = `${model.modelName}:${id?.toString()}*`; - const cacheKey = jsonToCacheKey(cacheKeyObject); - - cacheLogger.trace('Deleting object cache:', cacheKeyObject); + cacheLogger.trace('Deleting object cache:', cacheKeyFilter); try { - await redisServer.deleteKey(cacheKey); - cacheLogger.trace('Deleted object cache:', cacheKeyObject); + // Get all keys matching the filter pattern and delete them + const matchingKeys = await redisServer.getKeysByPattern(cacheKeyFilter); + + for (const cacheKey of matchingKeys) { + await redisServer.deleteKey(cacheKey); + } + + cacheLogger.trace('Deleted object cache:', { + filter: cacheKeyFilter, + keysDeleted: matchingKeys.length, + }); } catch (err) { cacheLogger.error('Error deleting object from Redis cache:', err); } }; +// Utility to run one or many rollup aggregations in a single query via $facet. +export const aggregateRollups = async ({ model, baseFilter = {}, rollupConfigs = [] }) => { + if (!rollupConfigs.length) { + return {}; + } + + const facetStage = rollupConfigs.reduce((facets, definition, index) => { + const key = definition.name || `rollup${index}`; + const matchStage = { $match: { ...baseFilter, ...(definition.filter || {}) } }; + const groupStage = { $group: { _id: null } }; + + (definition.rollups || []).forEach((rollup) => { + switch (rollup.operation) { + case 'sum': + groupStage.$group[rollup.name] = { $sum: `$${rollup.property}` }; + break; + case 'count': + groupStage.$group[rollup.name] = { $sum: 1 }; + break; + case 'avg': + groupStage.$group[rollup.name] = { $avg: `$${rollup.property}` }; + break; + default: + throw new Error(`Unsupported rollup operation: ${rollup.operation}`); + } + }); + + facets[key] = [matchStage, groupStage]; + return facets; + }, {}); + + const [results] = await model.aggregate([{ $facet: facetStage }]); + + return rollupConfigs.reduce((acc, definition, index) => { + const key = definition.name || `rollup${index}`; + const rawResult = results?.[key]?.[0] || {}; + + // Transform the result to nest rollup values under operation type + const transformedResult = {}; + (definition.rollups || []).forEach((rollup) => { + const value = rawResult[rollup.name] || 0; + // If there's only one rollup and its name matches the key, flatten the structure + if (definition.rollups.length === 1 && rollup.name === key) { + transformedResult[rollup.operation] = value; + } else { + transformedResult[rollup.name] = { [rollup.operation]: value }; + } + }); + + acc[key] = transformedResult; + return acc; + }, {}); +}; + +// Reusable function to aggregate rollups over history using state reconstruction +export const aggregateRollupsHistory = async ({ + model, + baseFilter = {}, + rollupConfigs = [], + startDate, + endDate, +}) => { + if (!rollupConfigs.length) { + return []; + } + + // Set default dates if not provided + const end = endDate ? new Date(endDate) : new Date(); + const start = startDate ? new Date(startDate) : new Date(end.getTime() - 24 * 60 * 60 * 1000); + + // Get model name for filtering audit logs + const parentType = model.modelName ? model.modelName : 'unknown'; + + // 1. Fetch all audit logs for this model type from start date to now + // Filter by parentType instead of fetching object IDs first + const auditLogs = await auditLogModel + .find({ + parentType, + createdAt: { $gte: start }, + }) + .sort({ createdAt: -1 }) // Newest first + .lean(); + + // 2. Extract unique parent IDs from audit logs + const parentIds = [...new Set(auditLogs.map((log) => log.parent.toString()))]; + + if (parentIds.length === 0) { + return []; + } + + // 3. Fetch current state of relevant objects that match baseFilter + // Note: This only includes objects that CURRENTLY match the baseFilter. + // Objects that matched in the past but don't match now are excluded. + const currentObjects = await model + .find({ + _id: { $in: parentIds }, + ...baseFilter, + }) + .lean(); + const objectMap = new Map(); + currentObjects.forEach((obj) => { + // Ensure _id is a string for map keys + objectMap.set(obj._id.toString(), expandObjectIds(obj)); + }); + + if (objectMap.size === 0) { + return []; + } + + // Helper to check if object matches filter + const matchesFilter = (obj, filter) => { + if (!filter || Object.keys(filter).length === 0) return true; + + for (const [key, expectedValue] of Object.entries(filter)) { + const actualValue = _.get(obj, key); + + // Handle simple equality + if (actualValue != expectedValue) { + return false; + } + } + return true; + }; + + // 3. Generate time buckets (1 minute intervals) + const buckets = []; + let currentTime = new Date(end); + // Round down to nearest minute + currentTime.setSeconds(0, 0); + + while (currentTime >= start) { + buckets.push(new Date(currentTime)); + currentTime = new Date(currentTime.getTime() - 60000); // -1 minute + } + + // 4. Rewind state and snapshot + const results = []; + let logIndex = 0; + + // Create a working copy of objects to mutate during rewind + // (deep clone to avoid issues if we need original later, though expandObjectIds creates new objs) + const workingObjects = new Map(); + objectMap.forEach((val, key) => workingObjects.set(key, _.cloneDeep(val))); + + // Iterate backwards through time + for (const bucketDate of buckets) { + // Apply all logs that happened AFTER this bucket time (between last bucket and this one) + // Since we iterate backwards, these are logs with createdAt > bucketDate + while (logIndex < auditLogs.length) { + const log = auditLogs[logIndex]; + const logDate = new Date(log.createdAt); + + if (logDate <= bucketDate) { + // This log happened at or before the current bucket time, + // so its effects are already present (or rather, will be handled in a future/earlier bucket). + // Stop processing logs for this step. + break; + } + + // Revert this change + const objectId = log.parent.toString(); + const object = workingObjects.get(objectId); + + if (object) { + if (log.operation === 'new') { + // Object didn't exist before this creation event + workingObjects.delete(objectId); + } else if (log.changes && log.changes.old) { + // Apply old values to revert state + _.merge(object, log.changes.old); + } + } + + logIndex++; + } + + // Snapshot: Calculate rollups for current state of all objects + const bucketResult = { + date: bucketDate.toISOString(), + }; + + const activeObjects = Array.from(workingObjects.values()); + + rollupConfigs.forEach((config) => { + const configName = config.name; + + // Filter objects for this config + // Note: We also check baseFilter here in case the object state reverted to something + // that no longer matches baseFilter (e.g. active: false) + const matchingObjects = activeObjects.filter( + (obj) => matchesFilter(obj, baseFilter) && matchesFilter(obj, config.filter) + ); + + // Calculate rollups + (config.rollups || []).forEach((rollup) => { + const rollupName = rollup.name; + + let value = 0; + if (rollup.operation === 'count') { + value = matchingObjects.length; + } else if (rollup.operation === 'sum') { + value = _.sumBy(matchingObjects, (obj) => _.get(obj, rollup.property) || 0); + } else if (rollup.operation === 'avg') { + const sum = _.sumBy(matchingObjects, (obj) => _.get(obj, rollup.property) || 0); + value = matchingObjects.length ? sum / matchingObjects.length : 0; + } + + // Nest the value under the operation type + bucketResult[rollupName] = { [rollup.operation]: value }; + }); + }); + + results.push(bucketResult); + } + + // Reverse results to be chronological + return results.reverse(); +}; + // Reusable function to list objects with aggregation, filtering, search, sorting, and pagination export const listObjects = async ({ model, @@ -324,14 +558,13 @@ export const listObjectsByProperties = async ({ } else if (typeof pop === 'object' && pop.path) { pipeline.push({ $lookup: { - from: - pop.options && pop.options.from ? pop.options.from : pop.path.toLowerCase() + 's', + from: pop.from ? pop.from : pop.path.toLowerCase(), localField: pop.path, foreignField: '_id', as: pop.path, }, }); - if (!pop.justOne === false) { + if (pop?.multiple == false || pop?.multiple == undefined) { // default to unwind unless justOne is explicitly false pipeline.push({ $unwind: { @@ -363,6 +596,7 @@ export const listObjectsByProperties = async ({ // Run aggregation const results = await model.aggregate(pipeline); + console.log('results', results); return nestGroups(results, properties, filter); } else { // If no properties specified, just return all objects without grouping @@ -435,6 +669,22 @@ export const getObject = async ({ model, id, populate }) => { } }; +export const getModelStats = async ({ model }) => { + if (!model.stats) { + logger.warn(`Model ${model.modelName} does not have a stats method.`); + return { error: 'Model does not have a stats method.', code: 500 }; + } + return await model.stats(); +}; + +export const getModelHistory = async ({ model, from, to }) => { + if (!model.history && !from && !to) { + logger.warn(`Model ${model.modelName} does not have a history method.`); + return { error: 'Model does not have a history method.', code: 500 }; + } + return await model.history(from, to); +}; + export const listObjectDependencies = async ({ model, id }) => { try { const dependencies = []; @@ -562,6 +812,17 @@ export const editObject = async ({ model, id, updateData, user, populate }) => { populate, }); + if (model.recalculate) { + logger.debug(`Recalculating ${model.modelName}`); + await model.recalculate(updatedObject, user); + } + + if (model.stats) { + logger.debug(`Getting stats for ${model.modelName}`); + const statsData = await model.stats(); + await distributeStats(statsData, parentType); + } + return updatedObject; } catch (error) { logger.error('editObject error:', error); @@ -594,6 +855,17 @@ export const newObject = async ({ model, newData, user = null }, distributeChang populate: [], }); + if (model.recalculate) { + logger.debug(`Recalculating ${model.modelName}`); + await model.recalculate(created, user); + } + + if (model.stats) { + logger.debug(`Getting stats for ${model.modelName}`); + const statsData = await model.stats(); + await distributeStats(statsData, parentType); + } + return created; } catch (error) { logger.error('newObject error:', error); @@ -625,6 +897,17 @@ export const deleteObject = async ({ model, id, user = null }, distributeChanges // Invalidate cache for this object await deleteObjectCache({ model, id }); + if (model.recalculate) { + logger.debug(`Recalculating ${model.modelName}`); + await model.recalculate(deleted, user); + } + + if (model.stats) { + logger.debug(`Getting stats for ${model.modelName}`); + const statsData = await model.stats(); + await distributeStats(statsData, parentType); + } + return { deleted: true, object: deleted }; } catch (error) { logger.error('deleteObject error:', error);