Implemented stats

parent cebe060ac5
commit 113a16818a
@@ -1,5 +1,5 @@
 import dotenv from 'dotenv';
-import { fileModel } from '../schemas/management/file.schema.js';
+import { fileModel } from './schemas/management/file.schema.js';
 import _ from 'lodash';
 import {
   deleteAuditLog,
@@ -7,7 +7,7 @@ import {
   expandObjectIds,
   modelHasRef,
   getFieldsByRef,
-  jsonToCacheKey,
+  getQueryToCacheKey,
 } from '../utils.js';
 import log4js from 'log4js';
 import {
@@ -18,9 +18,11 @@ import {
   distributeChildUpdate,
   distributeChildDelete,
   distributeChildNew,
+  distributeStats,
 } from '../utils.js';
 import { getAllModels } from '../services/misc/model.js';
 import { redisServer } from './redis.js';
+import { auditLogModel } from './schemas/management/auditlog.schema.js';
 
 dotenv.config();
 
@@ -35,19 +37,14 @@ const CACHE_TTL_SECONDS = parseInt(process.env.REDIS_CACHE_TTL || '30', 10);
 export const retrieveObjectCache = async ({ model, id, populate = [] }) => {
   if (!model || !id) return undefined;
 
-  const cacheKeyObject = {
-    model: model.modelName,
-    id: id.toString(),
-  };
+  const cacheKey = getQueryToCacheKey({ model: model.modelName, id, populate });
 
-  const cacheKey = jsonToCacheKey(cacheKeyObject);
-
-  cacheLogger.trace('Retrieving object from cache:', cacheKeyObject);
+  cacheLogger.trace('Retrieving object from cache:', { model: model.modelName, id, populate });
 
   try {
     const cachedObject = await redisServer.getKey(cacheKey);
     if (cachedObject == null) {
-      cacheLogger.trace('Cache miss:', cacheKeyObject);
+      cacheLogger.trace('Cache miss:', { model: model.modelName, id });
       return undefined;
     }
 
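
A minimal usage sketch of the new read path (the id and populate values are illustrative; fileModel is the model imported above, and this call is not part of the commit itself):

    const cached = await retrieveObjectCache({
      model: fileModel,
      id: '665f1c2ab9e77a0012345678',
      populate: ['owner'],
    });
    if (cached === undefined) {
      // Cache miss: the caller falls back to the database.
    }
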
@@ -66,48 +63,285 @@ export const retrieveObjectCache = async ({ model, id, populate = [] }) => {
 export const updateObjectCache = async ({ model, id, object, populate = [] }) => {
   if (!model || !id || !object) return object;
 
-  const cacheKeyObject = {
-    model: model.modelName,
-    id: id.toString(),
-  };
+  const cacheKeyFilter = `${model.modelName}:${id?.toString()}*`;
+  const cacheKey = getQueryToCacheKey({ model: model.modelName, id, populate });
 
-  const cacheKey = jsonToCacheKey(cacheKeyObject);
-
-  cacheLogger.trace('Updating object cache:', cacheKeyObject);
+  cacheLogger.trace('Updating object cache:', cacheKeyFilter);
 
   try {
-    const cachedObject = (await redisServer.getKey(cacheKey)) || {};
+    // Get all keys matching the filter pattern
+    const matchingKeys = await redisServer.getKeysByPattern(cacheKeyFilter);
+    // Merge the object with each cached object and update
+    const mergedObjects = [];
+    for (const key of matchingKeys) {
+      logger.trace('Updating object cache:', key);
+      const cachedObject = (await redisServer.getKey(key)) || {};
       const mergedObject = _.merge(cachedObject, object);
-    await redisServer.setKey(cacheKey, mergedObject, CACHE_TTL_SECONDS);
-    cacheLogger.trace('Updated object cache:', cacheKeyObject);
-  } catch (err) {
-    cacheLogger.error('Error updating object in Redis cache:', err);
+      await redisServer.setKey(key, mergedObject, CACHE_TTL_SECONDS);
+      mergedObjects.push(mergedObject);
+    }
+
+    const cacheObject = (await redisServer.getKey(cacheKey)) || {};
+    const mergedObject = _.merge(cacheObject, object);
+    await redisServer.setKey(cacheKey, mergedObject, CACHE_TTL_SECONDS);
+
+    cacheLogger.trace('Updated object cache:', {
+      filter: cacheKeyFilter,
+      keysUpdated: matchingKeys.length,
+    });
+
+    // Return the merged object
+    return mergedObject;
+  } catch (err) {
+    cacheLogger.error('Error updating object in Redis cache:', err);
+    // Fallback to returning the provided object if cache fails
+    return object;
   }
 };
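
Since `_.merge` is a deep merge, a partial update only overwrites the fields it carries; a quick illustration of the semantics the code above relies on (values are made up):

    const cached = { name: 'report.pdf', meta: { size: 100, tags: ['draft'] } };
    const update = { meta: { size: 250 } };
    _.merge(cached, update);
    // => { name: 'report.pdf', meta: { size: 250, tags: ['draft'] } }
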
 
 export const deleteObjectCache = async ({ model, id }) => {
   if (!model || !id) return;
 
-  const cacheKeyObject = {
-    model: model.modelName,
-    id: id.toString(),
-    populate: [],
-  };
+  const cacheKeyFilter = `${model.modelName}:${id?.toString()}*`;
 
-  const cacheKey = jsonToCacheKey(cacheKeyObject);
-
-  cacheLogger.trace('Deleting object cache:', cacheKeyObject);
+  cacheLogger.trace('Deleting object cache:', cacheKeyFilter);
 
   try {
+    // Get all keys matching the filter pattern and delete them
+    const matchingKeys = await redisServer.getKeysByPattern(cacheKeyFilter);
+
+    for (const cacheKey of matchingKeys) {
       await redisServer.deleteKey(cacheKey);
-      cacheLogger.trace('Deleted object cache:', cacheKeyObject);
+    }
+
+    cacheLogger.trace('Deleted object cache:', {
+      filter: cacheKeyFilter,
+      keysDeleted: matchingKeys.length,
+    });
   } catch (err) {
     cacheLogger.error('Error deleting object from Redis cache:', err);
   }
 };
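
Because the filter ends in `*`, one call clears every cached variant of the object (each populate combination gets its own key via getQueryToCacheKey). A hypothetical sketch; the exact key layout is defined in utils.js, not shown in this diff:

    // Keys that 'file:665f1c2ab9e77a0012345678*' might match (layout illustrative):
    //   file:665f1c2ab9e77a0012345678
    //   file:665f1c2ab9e77a0012345678:populate:owner
    await deleteObjectCache({ model: fileModel, id: '665f1c2ab9e77a0012345678' });
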
+
+// Utility to run one or many rollup aggregations in a single query via $facet.
+export const aggregateRollups = async ({ model, baseFilter = {}, rollupConfigs = [] }) => {
+  if (!rollupConfigs.length) {
+    return {};
+  }
+
+  const facetStage = rollupConfigs.reduce((facets, definition, index) => {
+    const key = definition.name || `rollup${index}`;
+    const matchStage = { $match: { ...baseFilter, ...(definition.filter || {}) } };
+    const groupStage = { $group: { _id: null } };
+
+    (definition.rollups || []).forEach((rollup) => {
+      switch (rollup.operation) {
+        case 'sum':
+          groupStage.$group[rollup.name] = { $sum: `$${rollup.property}` };
+          break;
+        case 'count':
+          groupStage.$group[rollup.name] = { $sum: 1 };
+          break;
+        case 'avg':
+          groupStage.$group[rollup.name] = { $avg: `$${rollup.property}` };
+          break;
+        default:
+          throw new Error(`Unsupported rollup operation: ${rollup.operation}`);
+      }
+    });
+
+    facets[key] = [matchStage, groupStage];
+    return facets;
+  }, {});
+
+  const [results] = await model.aggregate([{ $facet: facetStage }]);
+
+  return rollupConfigs.reduce((acc, definition, index) => {
+    const key = definition.name || `rollup${index}`;
+    const rawResult = results?.[key]?.[0] || {};
+
+    // Transform the result to nest rollup values under operation type
+    const transformedResult = {};
+    (definition.rollups || []).forEach((rollup) => {
+      const value = rawResult[rollup.name] || 0;
+      // If there's only one rollup and its name matches the key, flatten the structure
+      if (definition.rollups.length === 1 && rollup.name === key) {
+        transformedResult[rollup.operation] = value;
+      } else {
+        transformedResult[rollup.name] = { [rollup.operation]: value };
+      }
+    });
+
+    acc[key] = transformedResult;
+    return acc;
+  }, {});
+};
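
A hypothetical invocation showing the two result shapes the transform produces (`orderModel` and the field names are placeholders, not part of this commit):

    const stats = await aggregateRollups({
      model: orderModel,
      baseFilter: { archived: false },
      rollupConfigs: [
        // Single rollup whose name matches the config name: flattened.
        { name: 'orders', rollups: [{ name: 'orders', operation: 'count' }] },
        // Several rollups: each nested under its own name.
        {
          name: 'open',
          filter: { status: 'open' },
          rollups: [
            { name: 'revenue', operation: 'sum', property: 'amount' },
            { name: 'avgAmount', operation: 'avg', property: 'amount' },
          ],
        },
      ],
    });
    // => { orders: { count: 42 },
    //      open: { revenue: { sum: 1234 }, avgAmount: { avg: 61.7 } } }
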
+
+// Reusable function to aggregate rollups over history using state reconstruction
+export const aggregateRollupsHistory = async ({
+  model,
+  baseFilter = {},
+  rollupConfigs = [],
+  startDate,
+  endDate,
+}) => {
+  if (!rollupConfigs.length) {
+    return [];
+  }
+
+  // Set default dates if not provided
+  const end = endDate ? new Date(endDate) : new Date();
+  const start = startDate ? new Date(startDate) : new Date(end.getTime() - 24 * 60 * 60 * 1000);
+
+  // Get model name for filtering audit logs
+  const parentType = model.modelName ? model.modelName : 'unknown';
+
+  // 1. Fetch all audit logs for this model type from start date to now
+  // Filter by parentType instead of fetching object IDs first
+  const auditLogs = await auditLogModel
+    .find({
+      parentType,
+      createdAt: { $gte: start },
+    })
+    .sort({ createdAt: -1 }) // Newest first
+    .lean();
+
+  // 2. Extract unique parent IDs from audit logs
+  const parentIds = [...new Set(auditLogs.map((log) => log.parent.toString()))];
+
+  if (parentIds.length === 0) {
+    return [];
+  }
+
+  // 3. Fetch current state of relevant objects that match baseFilter
+  // Note: This only includes objects that CURRENTLY match the baseFilter.
+  // Objects that matched in the past but don't match now are excluded.
+  const currentObjects = await model
+    .find({
+      _id: { $in: parentIds },
+      ...baseFilter,
+    })
+    .lean();
+  const objectMap = new Map();
+  currentObjects.forEach((obj) => {
+    // Ensure _id is a string for map keys
+    objectMap.set(obj._id.toString(), expandObjectIds(obj));
+  });
+
+  if (objectMap.size === 0) {
+    return [];
+  }
+
+  // Helper to check if object matches filter
+  const matchesFilter = (obj, filter) => {
+    if (!filter || Object.keys(filter).length === 0) return true;
+
+    for (const [key, expectedValue] of Object.entries(filter)) {
+      const actualValue = _.get(obj, key);
+
+      // Handle simple equality
+      if (actualValue != expectedValue) {
+        return false;
+      }
+    }
+    return true;
+  };
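
Note the helper uses loose equality on flat values only; a Mongo-style operator object in a filter compares as unequal. Illustrative calls (not part of the commit):

    matchesFilter({ status: 'open' }, { status: 'open' });                 // true
    matchesFilter({ meta: { active: true } }, { 'meta.active': true });    // true: _.get supports dotted paths
    matchesFilter({ amount: 10 }, { amount: { $gt: 5 } });                 // false: 10 != { $gt: 5 }
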
+
+  // 4. Generate time buckets (1 minute intervals)
+  const buckets = [];
+  let currentTime = new Date(end);
+  // Round down to nearest minute
+  currentTime.setSeconds(0, 0);
+
+  while (currentTime >= start) {
+    buckets.push(new Date(currentTime));
+    currentTime = new Date(currentTime.getTime() - 60000); // -1 minute
+  }
+
+  // 5. Rewind state and snapshot
+  const results = [];
+  let logIndex = 0;
+
+  // Create a working copy of objects to mutate during rewind
+  // (deep clone to avoid issues if we need the originals later, though expandObjectIds creates new objects)
+  const workingObjects = new Map();
+  objectMap.forEach((val, key) => workingObjects.set(key, _.cloneDeep(val)));
+
+  // Iterate backwards through time
+  for (const bucketDate of buckets) {
+    // Apply all logs that happened AFTER this bucket time (between the last bucket and this one).
+    // Since we iterate backwards, these are logs with createdAt > bucketDate.
+    while (logIndex < auditLogs.length) {
+      const log = auditLogs[logIndex];
+      const logDate = new Date(log.createdAt);
+
+      if (logDate <= bucketDate) {
+        // This log happened at or before the current bucket time,
+        // so its effects are already present (or rather, will be handled in an earlier bucket).
+        // Stop processing logs for this step.
+        break;
+      }
+
+      // Revert this change
+      const objectId = log.parent.toString();
+      const object = workingObjects.get(objectId);
+
+      if (object) {
+        if (log.operation === 'new') {
+          // Object didn't exist before this creation event
+          workingObjects.delete(objectId);
+        } else if (log.changes && log.changes.old) {
+          // Apply old values to revert state
+          _.merge(object, log.changes.old);
+        }
+      }
+
+      logIndex++;
+    }
+
+    // Snapshot: Calculate rollups for current state of all objects
+    const bucketResult = {
+      date: bucketDate.toISOString(),
+    };
+
+    const activeObjects = Array.from(workingObjects.values());
+
+    rollupConfigs.forEach((config) => {
+      const configName = config.name;
+
+      // Filter objects for this config
+      // Note: We also check baseFilter here in case the object state reverted to something
+      // that no longer matches baseFilter (e.g. active: false)
+      const matchingObjects = activeObjects.filter(
+        (obj) => matchesFilter(obj, baseFilter) && matchesFilter(obj, config.filter)
+      );
+
+      // Calculate rollups
+      (config.rollups || []).forEach((rollup) => {
+        const rollupName = rollup.name;
+
+        let value = 0;
+        if (rollup.operation === 'count') {
+          value = matchingObjects.length;
+        } else if (rollup.operation === 'sum') {
+          value = _.sumBy(matchingObjects, (obj) => _.get(obj, rollup.property) || 0);
+        } else if (rollup.operation === 'avg') {
+          const sum = _.sumBy(matchingObjects, (obj) => _.get(obj, rollup.property) || 0);
+          value = matchingObjects.length ? sum / matchingObjects.length : 0;
+        }
+
+        // Nest the value under the operation type
+        bucketResult[rollupName] = { [rollup.operation]: value };
+      });
+    });
+
+    results.push(bucketResult);
+  }
+
+  // Reverse results to be chronological
+  return results.reverse();
+};
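
A hypothetical invocation (model, fields, and dates are placeholders); each entry is one per-minute snapshot, oldest first:

    const series = await aggregateRollupsHistory({
      model: orderModel,
      baseFilter: { archived: false },
      rollupConfigs: [
        { name: 'open', filter: { status: 'open' },
          rollups: [{ name: 'open', operation: 'count' }] },
      ],
      startDate: '2024-01-01T00:00:00Z',
      endDate: '2024-01-01T00:05:00Z',
    });
    // => [ { date: '2024-01-01T00:00:00.000Z', open: { count: 3 } }, ... 6 buckets ... ]

The rewind step relies on the audit log storing pre-change values under `changes.old`, as the code above expects; reverting is then a deep merge (values illustrative):

    const object = { status: 'closed', amount: 10 };
    _.merge(object, { status: 'open' }); // log.changes.old
    // => { status: 'open', amount: 10 } (the state before the change)
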
 
 // Reusable function to list objects with aggregation, filtering, search, sorting, and pagination
 export const listObjects = async ({
   model,
@@ -324,14 +558,13 @@ export const listObjectsByProperties = async ({
       } else if (typeof pop === 'object' && pop.path) {
         pipeline.push({
           $lookup: {
-            from:
-              pop.options && pop.options.from ? pop.options.from : pop.path.toLowerCase() + 's',
+            from: pop.from ? pop.from : pop.path.toLowerCase(),
             localField: pop.path,
             foreignField: '_id',
             as: pop.path,
           },
         });
-        if (!pop.justOne === false) {
+        if (pop?.multiple == false || pop?.multiple == undefined) {
          // default to unwind unless justOne is explicitly false
          pipeline.push({
            $unwind: {
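
Under the new shape a populate entry carries `from` and `multiple` directly instead of `options.from` and `justOne`, and the default collection name is now `path.toLowerCase()` without the trailing 's'. A hypothetical spec (not part of the commit):

    // New convention (this commit): unwinds unless multiple is truthy.
    const populate = [{ path: 'owner', from: 'users', multiple: false }];
    // Old convention (removed):
    // const populate = [{ path: 'owner', options: { from: 'users' }, justOne: true }];
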
@@ -363,6 +596,7 @@ export const listObjectsByProperties = async ({
 
     // Run aggregation
     const results = await model.aggregate(pipeline);
+    console.log('results', results);
     return nestGroups(results, properties, filter);
   } else {
     // If no properties specified, just return all objects without grouping
@@ -435,6 +669,22 @@ export const getObject = async ({ model, id, populate }) => {
   }
 };
 
+export const getModelStats = async ({ model }) => {
+  if (!model.stats) {
+    logger.warn(`Model ${model.modelName} does not have a stats method.`);
+    return { error: 'Model does not have a stats method.', code: 500 };
+  }
+  return await model.stats();
+};
+
+export const getModelHistory = async ({ model, from, to }) => {
+  if (!model.history && !from && !to) {
+    logger.warn(`Model ${model.modelName} does not have a history method.`);
+    return { error: 'Model does not have a history method.', code: 500 };
+  }
+  return await model.history(from, to);
+};
+
 export const listObjectDependencies = async ({ model, id }) => {
   try {
     const dependencies = [];
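
Both helpers assume the schema opts in via statics. A sketch of what a conforming model might register, built on the `aggregateRollups`/`aggregateRollupsHistory` utilities added above (schema name and config are illustrative, not part of this commit):

    // e.g. in file.schema.js (hypothetical):
    fileSchema.statics.stats = function () {
      return aggregateRollups({
        model: this,
        rollupConfigs: [{ name: 'files', rollups: [{ name: 'files', operation: 'count' }] }],
      });
    };
    fileSchema.statics.history = function (from, to) {
      return aggregateRollupsHistory({
        model: this,
        rollupConfigs: [{ name: 'files', rollups: [{ name: 'files', operation: 'count' }] }],
        startDate: from,
        endDate: to,
      });
    };
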
@@ -562,6 +812,17 @@ export const editObject = async ({ model, id, updateData, user, populate }) => {
       populate,
     });
 
+    if (model.recalculate) {
+      logger.debug(`Recalculating ${model.modelName}`);
+      await model.recalculate(updatedObject, user);
+    }
+
+    if (model.stats) {
+      logger.debug(`Getting stats for ${model.modelName}`);
+      const statsData = await model.stats();
+      await distributeStats(statsData, parentType);
+    }
+
     return updatedObject;
   } catch (error) {
     logger.error('editObject error:', error);
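
`recalculate` follows the same opt-in pattern as `stats`; a hypothetical static a schema might define (the schema, fields, and body are illustrative, not shown in this commit):

    // e.g. in order.schema.js (hypothetical):
    orderSchema.statics.recalculate = async function (doc, user) {
      // Recompute derived fields after a write; `user` is available for auditing (unused here).
      const total = (doc.items || []).reduce((sum, item) => sum + item.amount, 0);
      await this.updateOne({ _id: doc._id }, { $set: { total } });
    };
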
@@ -594,6 +855,17 @@ export const newObject = async ({ model, newData, user = null }, distributeChang
       populate: [],
     });
 
+    if (model.recalculate) {
+      logger.debug(`Recalculating ${model.modelName}`);
+      await model.recalculate(created, user);
+    }
+
+    if (model.stats) {
+      logger.debug(`Getting stats for ${model.modelName}`);
+      const statsData = await model.stats();
+      await distributeStats(statsData, parentType);
+    }
+
     return created;
   } catch (error) {
     logger.error('newObject error:', error);
@@ -625,6 +897,17 @@ export const deleteObject = async ({ model, id, user = null }, distributeChanges
     // Invalidate cache for this object
     await deleteObjectCache({ model, id });
 
+    if (model.recalculate) {
+      logger.debug(`Recalculating ${model.modelName}`);
+      await model.recalculate(deleted, user);
+    }
+
+    if (model.stats) {
+      logger.debug(`Getting stats for ${model.modelName}`);
+      const statsData = await model.stats();
+      await distributeStats(statsData, parentType);
+    }
+
     return { deleted: true, object: deleted };
   } catch (error) {
     logger.error('deleteObject error:', error);