
import dotenv from 'dotenv';
import { fileModel } from '../schemas/management/file.schema.js';
import _ from 'lodash';
import {
  deleteAuditLog,
  distributeDelete,
  expandObjectIds,
  modelHasRef,
  getFieldsByRef,
  jsonToCacheKey,
  editAuditLog,
  distributeUpdate,
  newAuditLog,
  distributeNew,
  distributeChildUpdate,
  distributeChildDelete,
  distributeChildNew,
} from '../utils.js';
import log4js from 'log4js';
import { getAllModels } from '../services/misc/model.js';
import { redisServer } from './redis.js';
dotenv.config();
const logger = log4js.getLogger('Database');
logger.level = process.env.LOG_LEVEL || 'info';
const cacheLogger = log4js.getLogger('DatabaseCache');
cacheLogger.level = process.env.LOG_LEVEL || 'info';
const CACHE_TTL_SECONDS = parseInt(process.env.REDIS_CACHE_TTL || '30', 10);
export const retrieveObjectCache = async ({ model, id, populate = [] }) => {
if (!model || !id) return undefined;
const cacheKeyObject = {
model: model.modelName,
id: id.toString(),
};
const cacheKey = jsonToCacheKey(cacheKeyObject);
cacheLogger.trace('Retrieving object from cache:', cacheKeyObject);
try {
const cachedObject = await redisServer.getKey(cacheKey);
if (cachedObject == null) {
cacheLogger.trace('Cache miss:', cacheKeyObject);
return undefined;
}
cacheLogger.trace('Cache hit:', cacheKeyObject);
return cachedObject;
} catch (err) {
cacheLogger.error('Error retrieving object from Redis cache:', err);
return undefined;
}
};
export const updateObjectCache = async ({ model, id, object, populate = [] }) => {
if (!model || !id || !object) return object;
const cacheKeyObject = {
model: model.modelName,
id: id.toString(),
};
const cacheKey = jsonToCacheKey(cacheKeyObject);
cacheLogger.trace('Updating object cache:', cacheKeyObject);
try {
const cachedObject = (await redisServer.getKey(cacheKey)) || {};
const mergedObject = _.merge(cachedObject, object);
await redisServer.setKey(cacheKey, mergedObject, CACHE_TTL_SECONDS);
cacheLogger.trace('Updated object cache:', cacheKeyObject);
} catch (err) {
cacheLogger.error('Error updating object in Redis cache:', err);
}
return object;
};
export const deleteObjectCache = async ({ model, id }) => {
if (!model || !id) return;
// Key must match the one built by retrieve/updateObjectCache (no populate
// field), otherwise the delete misses the cached entry
const cacheKeyObject = {
  model: model.modelName,
  id: id.toString(),
};
const cacheKey = jsonToCacheKey(cacheKeyObject);
cacheLogger.trace('Deleting object cache:', cacheKeyObject);
try {
await redisServer.deleteKey(cacheKey);
cacheLogger.trace('Deleted object cache:', cacheKeyObject);
} catch (err) {
cacheLogger.error('Error deleting object from Redis cache:', err);
}
};
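// Illustrative sketch of the cache round-trip, assuming a hypothetical
// `userModel` Mongoose model. All three helpers derive the same key from
// { model, id }, so an update followed by a delete targets the same entry
// (entries also expire on their own after CACHE_TTL_SECONDS):
//
//   await updateObjectCache({ model: userModel, id, object: { name: 'Ada' } });
//   const hit = await retrieveObjectCache({ model: userModel, id }); // { name: 'Ada' }
//   await deleteObjectCache({ model: userModel, id });
//   await retrieveObjectCache({ model: userModel, id }); // undefined (cache miss)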
// Reusable function to list objects with filtering, sorting, pagination, optional population, and projection
export const listObjects = async ({
model,
populate = [],
page = 1,
limit = 25,
filter = {},
sort = '',
order = 'ascend',
project, // optional: override default projection
}) => {
try {
logger.trace('Listing objects:', {
model,
populate,
page,
limit,
filter,
sort,
order,
project,
});
// Calculate the skip value based on the page number and limit
const skip = (page - 1) * limit;
// 'descend' sorts -1; anything else (including the default 'ascend') sorts 1
const sortOrder = order === 'descend' ? -1 : 1;
if (!sort || sort === '') {
  sort = 'createdAt';
}
// Translate any key ending with ._id to remove the ._id suffix for Mongoose
Object.keys(filter).forEach((key) => {
if (key.endsWith('._id')) {
const baseKey = key.slice(0, -4); // Remove '._id' suffix
filter[baseKey] = filter[key];
delete filter[key];
}
});
// Use find with population and filter
let query = model
.find(filter)
.sort({ [sort]: sortOrder })
.skip(skip)
.limit(Number(limit));
// Handle populate (array or single value)
if (populate) {
if (Array.isArray(populate)) {
for (const pop of populate) {
query = query.populate(pop);
}
} else if (typeof populate === 'string' || typeof populate === 'object') {
query = query.populate(populate);
}
}
// Handle select (projection)
if (project) {
query = query.select(project);
}
query = query.lean();
const queryResult = await query;
return expandObjectIds(queryResult);
} catch (error) {
  logger.error('listObjects error:', error);
  return { error: error.message, code: 500 };
}
};
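// Example (hypothetical `taskModel`): second page of open tasks, newest first,
// with the assignee populated and only two fields projected. Filter keys
// ending in '._id' are rewritten to their base path before querying:
//
//   const tasks = await listObjects({
//     model: taskModel,
//     filter: { 'assignee._id': userId, status: 'open' }, // becomes { assignee: userId, ... }
//     sort: 'createdAt',
//     order: 'descend',
//     page: 2,
//     limit: 25,
//     populate: ['assignee'],
//     project: 'name status',
//   });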
// List the unique values of a property, optionally narrowed by a match filter
// and/or a $text search (requires a text index on the collection)
export const listPropertyValues = async ({ model, property, filter = {}, search = '' }) => {
  const aggregateCommand = [];
  if (search) {
aggregateCommand.push({
$match: {
$text: { $search: search },
},
});
}
if (filter && Object.keys(filter).length > 0) {
aggregateCommand.push({ $match: filter });
}
aggregateCommand.push({ $group: { _id: `$${property}` } });
aggregateCommand.push({ $project: { _id: 0, [property]: '$_id' } });
return await model.aggregate(aggregateCommand);
};
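// Example (hypothetical `taskModel` with a text index): distinct statuses of
// tasks matching a search term. Each result is shaped { [property]: value }:
//
//   const statuses = await listPropertyValues({
//     model: taskModel,
//     property: 'status',
//     search: 'urgent',
//   });
//   // => [{ status: 'open' }, { status: 'done' }, ...]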
// Helper to build nested structure for listObjectsByProperty
function nestGroups(groups, props, filter, idx = 0) {
if (idx >= props.length) return groups;
const prop = props[idx];
const filterPresent = Object.prototype.hasOwnProperty.call(filter, prop);
// Helper to extract a display key and possible filter values from a value
function getKeyAndFilterVals(value) {
if (value && typeof value === 'object') {
if (value.name) return { key: value.name, filterVals: [value._id?.toString?.(), value.name] };
if (value._id) return { key: value._id.toString(), filterVals: [value._id.toString()] };
}
return { key: value, filterVals: [value] };
}
// Build a map of key -> groups for this property
const keyToGroups = {};
for (const group of groups) {
const { key } = getKeyAndFilterVals(group._id[prop]);
if (!keyToGroups[key]) keyToGroups[key] = [];
keyToGroups[key].push(group);
}
let map = {};
if (filterPresent) {
const filterValue = filter[prop]?.toString?.() ?? filter[prop];
for (const [key, groupList] of Object.entries(keyToGroups)) {
// Check if any group in this key matches the filter (by _id or name)
const matches = groupList.filter((group) => {
  const { filterVals } = getKeyAndFilterVals(group._id[prop]);
  return filterVals.some((val) => val?.toString() === filterValue);
});
if (matches.length > 0) {
if (idx === props.length - 1) {
// Last property in filter, return items
let items = [];
for (const group of matches) {
items = items.concat(group.objects.map(expandObjectIds));
}
map[key] = items;
} else {
map[key] = nestGroups(matches, props, filter, idx + 1);
}
} else {
map[key] = {};
}
}
} else {
// No filter for this property, just show all keys at this level with empty objects
for (const key of Object.keys(keyToGroups)) {
map[key] = {};
}
}
return map;
}
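// Sketch of the shapes nestGroups works with (hypothetical data). Given
// $group results keyed on two properties and a filter that pins the first,
// the output nests key -> key; levels without a filter entry appear as keys
// with empty objects, and the last filtered level returns the items:
//
//   const groups = [
//     { _id: { status: 'open', owner: { _id: oid1, name: 'Ada' } }, objects: [docA] },
//     { _id: { status: 'done', owner: { _id: oid2, name: 'Bob' } }, objects: [docB] },
//   ];
//   nestGroups(groups, ['status', 'owner'], { status: 'open' });
//   // => { open: { Ada: {} }, done: {} }
//   nestGroups(groups, ['status', 'owner'], { status: 'open', owner: 'Ada' });
//   // => { open: { Ada: [docA] }, done: {} }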
// Group objects by multiple properties and return nested groupings
export const listObjectsByProperties = async ({
model,
properties = [],
filter = {},
masterFilter = {},
populate,
}) => {
try {
logger.trace('Grouping by properties:', properties);
const propertiesPresent = !(
  !Array.isArray(properties) ||
  properties.length === 0 ||
  properties[0] === ''
);
// Build aggregation pipeline
const pipeline = [];
// Handle populate (array or single value)
if (populate) {
const populates = Array.isArray(populate) ? populate : [populate];
for (const pop of populates) {
// Support both string and object syntax for populate
if (typeof pop === 'string') {
pipeline.push({
$lookup: {
from: pop.toLowerCase() + 's', // crude pluralization, adjust if needed
localField: pop,
foreignField: '_id',
as: pop,
},
});
// Unwind if it's a single reference
pipeline.push({
$unwind: {
path: `$${pop}`,
preserveNullAndEmptyArrays: true,
},
});
} else if (typeof pop === 'object' && pop.path) {
pipeline.push({
$lookup: {
from:
pop.options && pop.options.from ? pop.options.from : pop.path.toLowerCase() + 's',
localField: pop.path,
foreignField: '_id',
as: pop.path,
},
});
if (pop.justOne !== false) {
  // Default to unwinding unless justOne is explicitly false
pipeline.push({
$unwind: {
path: `$${pop.path}`,
preserveNullAndEmptyArrays: true,
},
});
}
}
}
}
if (masterFilter && Object.keys(masterFilter).length > 0) {
pipeline.push({ $match: { ...masterFilter } });
}
if (propertiesPresent) {
// Build the $group _id object for all properties
const groupId = {};
for (const prop of properties) {
groupId[prop] = `$${prop}`;
}
pipeline.push({
$group: {
_id: groupId,
objects: { $push: '$$ROOT' },
},
});
// Run aggregation
const results = await model.aggregate(pipeline);
return nestGroups(results, properties, filter);
} else {
// If no properties specified, just return all objects without grouping
// Ensure pipeline is not empty by adding a $match stage if needed
if (pipeline.length === 0) {
pipeline.push({ $match: {} });
}
const results = await model.aggregate(pipeline);
return results;
}
} catch (error) {
logger.error('listObjectsByProperties error:', error);
return { error: error.message, code: 500 };
}
};
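// Example (hypothetical `taskModel`): group tasks by status, then by owner,
// expanding only the 'open' branch. `populate: 'owner'` becomes a $lookup
// against the crudely pluralized 'owners' collection (see note above):
//
//   const tree = await listObjectsByProperties({
//     model: taskModel,
//     properties: ['status', 'owner'],
//     filter: { status: 'open' },          // expand only this branch
//     masterFilter: { archived: false },   // applied before grouping
//     populate: 'owner',
//   });
//   // => { open: { Ada: {}, Bob: {} }, done: {} }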
// Reusable function to get a single object by ID
export const getObject = async ({ model, id, populate }) => {
try {
logger.trace('Getting object:', {
model,
id,
populate,
});
// Fetch any cached copy; the fresh DB result is merged over it below rather
// than short-circuiting the query
const cachedObject = await retrieveObjectCache({ model, id, populate });
let query = model.findById(id).lean();
// Auto-populate file references if the model has them
if (modelHasRef(model, 'file')) {
const fileFields = getFieldsByRef(model, 'file');
// Populate all file reference fields
for (const field of fileFields) {
query = query.populate(field);
}
}
// Handle populate (array or single value)
if (populate) {
if (Array.isArray(populate)) {
for (const pop of populate) {
query = query.populate(pop);
}
} else if (typeof populate === 'string' || typeof populate === 'object') {
query = query.populate(populate);
}
}
const result = await query;
if (!result) {
return { error: 'Object not found.', code: 404 };
}
const expanded = _.merge(cachedObject || {}, expandObjectIds(result));
// Update cache with the expanded object
await updateObjectCache({
model,
id: expanded._id,
object: expanded,
populate,
});
return expanded;
} catch (error) {
  logger.error('getObject error:', error);
  return { error: error.message, code: 500 };
}
};
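// Example (hypothetical `taskModel`): fetch one document with its owner
// populated. Fields with ref: 'file' are populated automatically, and the
// result is merged over any cached copy before being re-cached:
//
//   const task = await getObject({ model: taskModel, id: taskId, populate: 'owner' });
//   if (task.error) {
//     // { error: 'Object not found.', code: 404 } or { error: ..., code: 500 }
//   }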
export const listObjectDependencies = async ({ model, id }) => {
try {
const dependencies = [];
const parentModelName = model?.modelName;
if (!parentModelName || !id) {
return [];
}
const allModelEntries = getAllModels();
for (const entry of allModelEntries) {
const targetModel = entry?.model;
if (!targetModel || !targetModel.schema) continue;
const referencingPaths = [];
targetModel.schema.eachPath((pathName, schemaType) => {
const directRef = schemaType?.options?.ref;
const arrayRef = schemaType?.caster?.options?.ref;
const refName = directRef || arrayRef;
if (refName === parentModelName) {
referencingPaths.push(pathName);
}
});
if (referencingPaths.length === 0) continue;
for (const pathName of referencingPaths) {
const filter = { [pathName]: id };
const results = await targetModel.find(filter).lean();
for (const doc of results) {
const object = expandObjectIds(doc);
dependencies.push({
objectType: targetModel.modelName,
_id: object._id,
name: object?.name,
});
}
}
}
return dependencies;
} catch (error) {
logger.error('listObjectDependencies error:', error);
return { error: error.message, code: 500 };
}
};
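// Example (hypothetical `userModel`): find every document, in any registered
// model, whose schema holds a ref to this model's name (direct or inside an
// array) pointing at the given ID. Each hit is { objectType, _id, name }:
//
//   const deps = await listObjectDependencies({ model: userModel, id: userId });
//   // => [{ objectType: 'task', _id: '...', name: 'Ship release' }, ...]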
// Reusable function to edit an object by ID, with audit logging and distribution
export const editObject = async ({ model, id, updateData, user, populate }) => {
try {
// Determine parentType from model name
const parentType = model.modelName ? model.modelName : 'unknown';
// Fetch the previous version and apply the update in one call
// (findByIdAndUpdate returns the pre-update document by default)
let query = model.findByIdAndUpdate(id, updateData).lean();
if (populate) {
if (Array.isArray(populate)) {
for (const pop of populate) {
query = query.populate(pop);
}
} else if (typeof populate === 'string' || typeof populate === 'object') {
query = query.populate(populate);
}
}
const previousObject = await query;
if (!previousObject) {
return { error: `${parentType} not found.`, code: 404 };
}
const previousExpandedObject = expandObjectIds(previousObject);
// If the model has file references, attempt to flush the previous files;
// flushFile refuses to delete a file that still has dependencies, so only
// orphaned files are actually removed
if (modelHasRef(model, 'file')) {
logger.debug(`Model ${model.modelName} has file references, checking for files to flush`);
const fileFields = getFieldsByRef(model, 'file');
for (const fieldName of fileFields) {
const fieldValue = previousExpandedObject[fieldName];
if (fieldValue) {
if (Array.isArray(fieldValue)) {
// Handle file arrays
for (const fileRef of fieldValue) {
if (fileRef && fileRef._id) {
logger.debug(`Flushing file from array field ${fieldName}: ${fileRef._id}`);
await flushFile({ id: fileRef._id, user });
}
}
} else if (fieldValue._id) {
// Handle single file reference
logger.debug(`Flushing file from field ${fieldName}: ${fieldValue._id}`);
await flushFile({ id: fieldValue._id, user });
}
}
}
}
// Audit log before update
await editAuditLog(
previousExpandedObject,
{ ...previousExpandedObject, ...updateData },
id,
parentType,
user
);
// Distribute update
await distributeUpdate(updateData, id, parentType);
// Call childUpdate event for any child objects
await distributeChildUpdate(
previousExpandedObject,
{ ...previousExpandedObject, ...updateData },
id,
model
);
const updatedObject = { ...previousExpandedObject, ...updateData };
// Update cache with the new version
await updateObjectCache({
model,
id,
object: updatedObject,
populate,
});
return updatedObject;
} catch (error) {
logger.error('editObject error:', error);
return { error: error.message, code: 500 };
}
};
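// Example (hypothetical `taskModel`): apply a partial update. The helper
// audit-logs the diff, distributes the change, flushes any now-orphaned file
// references, and returns the merged (not re-fetched) object:
//
//   const updated = await editObject({
//     model: taskModel,
//     id: taskId,
//     updateData: { status: 'done' },
//     user: currentUser,
//   });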
// Reusable function to create a new object
export const newObject = async ({ model, newData, user = null }, distributeChanges = true) => {
try {
const parentType = model.modelName ? model.modelName : 'unknown';
const result = await model.create(newData);
// model.create() with a single document returns the created document
if (!result) {
return { error: 'No object created.', code: 500 };
}
const created = expandObjectIds(result.toObject());
await newAuditLog(newData, created._id, parentType, user);
if (distributeChanges) {
await distributeNew(created, parentType);
}
await distributeChildNew(created, created._id, model);
// Cache the newly created object
await updateObjectCache({
model,
id: created._id,
object: created,
populate: [],
});
return created;
} catch (error) {
logger.error('newObject error:', error);
return { error: error.message, code: 500 };
}
};
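// Example (hypothetical `taskModel`): create, audit-log, distribute, and
// cache in one call. Pass `false` as the second argument to skip the
// top-level distributeNew event (child events still fire):
//
//   const task = await newObject(
//     { model: taskModel, newData: { name: 'Write docs' }, user: currentUser },
//     true
//   );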
// Reusable function to delete an object by ID, with audit logging and distribution
export const deleteObject = async ({ model, id, user = null }, distributeChanges = true) => {
try {
const parentType = model.modelName ? model.modelName : 'unknown';
// Delete the object
const result = await model.findByIdAndDelete(id);
if (!result) {
return { error: `${parentType} not found.`, code: 404 };
}
const deleted = expandObjectIds(result.toObject());
// Audit log the deletion
await deleteAuditLog(deleted, id, parentType, user, 'delete');
if (distributeChanges) {
await distributeDelete(deleted, parentType);
}
await distributeChildDelete(deleted, id, model);
// Invalidate cache for this object
await deleteObjectCache({ model, id });
return { deleted: true, object: deleted };
} catch (error) {
logger.error('deleteObject error:', error);
return { error: error.message, code: 500 };
}
};
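// Example (hypothetical `taskModel`): delete with audit logging, change
// distribution, and cache invalidation. On success the deleted document is
// echoed back:
//
//   const res = await deleteObject({ model: taskModel, id: taskId, user: currentUser });
//   // => { deleted: true, object: { _id: ..., name: ... } }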
export const flushFile = async ({ id, user }) => {
try {
logger.info(`Starting file deletion process for file ID: ${id}`);
// First, check if the file exists
const file = await fileModel.findById(id).lean();
if (!file) {
logger.warn(`File with ID ${id} not found`);
return {
error: 'File not found',
code: 404,
};
}
logger.info(`Found file: ${file.name} (${file._id})`);
// Check if this file has any dependencies
const dependencies = await listObjectDependencies({
model: fileModel,
id: file._id,
});
if (dependencies.length > 0) {
logger.info(
`File ${file._id} (${file.name}) has ${dependencies.length} dependencies, cannot delete`
);
return {
error: 'File has dependencies and cannot be deleted',
code: 409,
dependencies: dependencies.length,
dependencyDetails: dependencies,
};
}
logger.debug(`File ${file._id} (${file.name}) has no dependencies, proceeding with deletion`);
// Delete from database first
const deleteResult = await deleteObject({
model: fileModel,
id: file._id,
user,
});
if (deleteResult.error) {
logger.error(`Failed to delete file ${file._id} from database:`, deleteResult.error);
return {
error: deleteResult.error,
code: deleteResult.code || 500,
};
}
// Try to delete from Ceph storage if it exists
if (file.extension) {
try {
const { deleteFile, BUCKETS } = await import('../services/storage/ceph.js');
const cephKey = `files/${file._id}${file.extension}`;
await deleteFile(BUCKETS.FILES, cephKey);
logger.debug(`Deleted file from Ceph storage: ${cephKey}`);
} catch (cephError) {
logger.warn(`Failed to delete file ${file._id} from Ceph storage:`, cephError.message);
// Don't treat Ceph deletion failure as a critical error since DB record is already deleted
}
}
const result = {
success: true,
deletedFile: {
fileId: file._id,
fileName: file.name,
deletedAt: new Date(),
},
};
logger.info(`Successfully deleted file: ${file.name} (${file._id})`);
return result;
} catch (error) {
logger.error('Error in flushFile:', error);
return {
error: error.message,
code: 500,
};
}
};
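// Example: attempt to remove a file record plus its Ceph object. A 409 with
// dependency details comes back if anything still references the file:
//
//   const res = await flushFile({ id: fileId, user: currentUser });
//   if (res.code === 409) {
//     console.log(`Still referenced by ${res.dependencies} object(s)`, res.dependencyDetails);
//   }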
// Helper function to recursively delete objects and their children
export const recursivelyDeleteChildObjects = async (
{ model, id, user = null },
distributeChanges = true
) => {
const deletedIds = [];
// Find all objects that have this object as their parent
const childObjects = await model.find({ parent: id });
// Recursively delete all children first
for (const childObject of childObjects) {
const childDeletedIds = await recursivelyDeleteChildObjects(
{ model, id: childObject._id, user },
false
);
deletedIds.push(...childDeletedIds);
}
// Delete the current object; only record its ID if the delete succeeded
const result = await deleteObject({ model, id, user }, distributeChanges);
if (!result.error) {
  deletedIds.push(id);
}
return deletedIds;
};
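// Example (hypothetical `folderModel` with a `parent` ref): delete a folder
// and its whole subtree, depth-first. Only the root delete distributes a
// change event (when the flag is true); descendants are deleted quietly:
//
//   const deletedIds = await recursivelyDeleteChildObjects(
//     { model: folderModel, id: rootFolderId, user: currentUser },
//     true
//   );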