Updated configuration log level to debug, added product category model and references in product schema, enhanced stock schemas with recalculation methods, and improved socket event handling to support filters for subscriptions.
This commit is contained in:
parent
b9b40c55ca
commit
3196385c72
@ -2,7 +2,7 @@
|
|||||||
"development": {
|
"development": {
|
||||||
"server": {
|
"server": {
|
||||||
"port": 9090,
|
"port": 9090,
|
||||||
"logLevel": "trace",
|
"logLevel": "debug",
|
||||||
"corsOrigins": [
|
"corsOrigins": [
|
||||||
"https://web.farmcontrol.app",
|
"https://web.farmcontrol.app",
|
||||||
"https://dev.tombutcher.work",
|
"https://dev.tombutcher.work",
|
||||||
|
|||||||
@ -1,7 +1,26 @@
|
|||||||
import mongoose from 'mongoose';
|
import mongoose from 'mongoose';
|
||||||
import { generateId } from '../../utils.js';
|
import { generateId } from '../../utils.js';
|
||||||
const { Schema } = mongoose;
|
const { Schema } = mongoose;
|
||||||
import { aggregateRollups, aggregateRollupsHistory } from '../../database.js';
|
import { aggregateRollups, aggregateRollupsHistory, editObject } from '../../database.js';
|
||||||
|
import { stockEventModel } from './stockevent.schema.js';
|
||||||
|
|
||||||
|
const getStockEventTotal = async (stock, parentType) => {
|
||||||
|
const stockId = stock?._id;
|
||||||
|
if (!stockId) return null;
|
||||||
|
|
||||||
|
const parentId =
|
||||||
|
stockId instanceof mongoose.Types.ObjectId ? stockId : new mongoose.Types.ObjectId(stockId);
|
||||||
|
|
||||||
|
const [result] = await stockEventModel.aggregate([
|
||||||
|
{ $match: { parent: parentId, parentType } },
|
||||||
|
{ $group: { _id: null, total: { $sum: '$value' }, count: { $sum: 1 } } },
|
||||||
|
]);
|
||||||
|
|
||||||
|
return {
|
||||||
|
total: result?.total ?? 0,
|
||||||
|
count: result?.count ?? 0,
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
// Define the main filamentStock schema
|
// Define the main filamentStock schema
|
||||||
const filamentStockSchema = new Schema(
|
const filamentStockSchema = new Schema(
|
||||||
@ -32,7 +51,11 @@ const filamentStockSchema = new Schema(
|
|||||||
|
|
||||||
filamentStockSchema.pre('validate', async function () {
|
filamentStockSchema.pre('validate', async function () {
|
||||||
if (!this.filament && this.filamentSku) {
|
if (!this.filament && this.filamentSku) {
|
||||||
const sku = await mongoose.model('filamentSku').findById(this.filamentSku).select('filament').lean();
|
const sku = await mongoose
|
||||||
|
.model('filamentSku')
|
||||||
|
.findById(this.filamentSku)
|
||||||
|
.select('filament')
|
||||||
|
.lean();
|
||||||
if (sku?.filament) this.filament = sku.filament;
|
if (sku?.filament) this.filament = sku.filament;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -66,6 +89,33 @@ filamentStockSchema.statics.history = async function (from, to) {
|
|||||||
return results;
|
return results;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
filamentStockSchema.statics.recalculate = async function (filamentStock, user) {
|
||||||
|
const events = await getStockEventTotal(filamentStock, this.modelName);
|
||||||
|
if (!events?.count) return;
|
||||||
|
|
||||||
|
const net = events.total;
|
||||||
|
const startingNet = filamentStock.startingWeight?.net ?? 0;
|
||||||
|
const startingGross = filamentStock.startingWeight?.gross ?? 0;
|
||||||
|
const gross = startingNet > 0 ? (startingGross * net) / startingNet : net;
|
||||||
|
|
||||||
|
console.log('Recalculating filament stock');
|
||||||
|
console.log('events', events);
|
||||||
|
console.log('filamentStock', filamentStock);
|
||||||
|
|
||||||
|
await editObject({
|
||||||
|
model: this,
|
||||||
|
id: filamentStock._id,
|
||||||
|
updateData: {
|
||||||
|
currentWeight: {
|
||||||
|
net,
|
||||||
|
gross,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
user,
|
||||||
|
recalculate: false,
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
// Add virtual id getter
|
// Add virtual id getter
|
||||||
filamentStockSchema.virtual('id').get(function () {
|
filamentStockSchema.virtual('id').get(function () {
|
||||||
return this._id;
|
return this._id;
|
||||||
|
|||||||
@ -1,7 +1,26 @@
|
|||||||
import mongoose from 'mongoose';
|
import mongoose from 'mongoose';
|
||||||
import { generateId } from '../../utils.js';
|
import { generateId } from '../../utils.js';
|
||||||
const { Schema } = mongoose;
|
const { Schema } = mongoose;
|
||||||
import { aggregateRollups, aggregateRollupsHistory } from '../../database.js';
|
import { aggregateRollups, aggregateRollupsHistory, editObject } from '../../database.js';
|
||||||
|
import { stockEventModel } from './stockevent.schema.js';
|
||||||
|
|
||||||
|
const getStockEventTotal = async (stock, parentType) => {
|
||||||
|
const stockId = stock?._id;
|
||||||
|
if (!stockId) return null;
|
||||||
|
|
||||||
|
const parentId =
|
||||||
|
stockId instanceof mongoose.Types.ObjectId ? stockId : new mongoose.Types.ObjectId(stockId);
|
||||||
|
|
||||||
|
const [result] = await stockEventModel.aggregate([
|
||||||
|
{ $match: { parent: parentId, parentType } },
|
||||||
|
{ $group: { _id: null, total: { $sum: '$value' }, count: { $sum: 1 } } },
|
||||||
|
]);
|
||||||
|
|
||||||
|
return {
|
||||||
|
total: result?.total ?? 0,
|
||||||
|
count: result?.count ?? 0,
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
// Define the main partStock schema
|
// Define the main partStock schema
|
||||||
const partStockSchema = new Schema(
|
const partStockSchema = new Schema(
|
||||||
@ -53,6 +72,21 @@ partStockSchema.statics.history = async function (from, to) {
|
|||||||
return results;
|
return results;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
partStockSchema.statics.recalculate = async function (partStock, user) {
|
||||||
|
const events = await getStockEventTotal(partStock, this.modelName);
|
||||||
|
if (!events?.count) return;
|
||||||
|
|
||||||
|
await editObject({
|
||||||
|
model: this,
|
||||||
|
id: partStock._id,
|
||||||
|
updateData: {
|
||||||
|
currentQuantity: events.total,
|
||||||
|
},
|
||||||
|
user,
|
||||||
|
recalculate: false,
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
// Add virtual id getter
|
// Add virtual id getter
|
||||||
partStockSchema.virtual('id').get(function () {
|
partStockSchema.virtual('id').get(function () {
|
||||||
return this._id;
|
return this._id;
|
||||||
|
|||||||
@ -1,7 +1,26 @@
|
|||||||
import mongoose from 'mongoose';
|
import mongoose from 'mongoose';
|
||||||
import { generateId } from '../../utils.js';
|
import { generateId } from '../../utils.js';
|
||||||
const { Schema } = mongoose;
|
const { Schema } = mongoose;
|
||||||
import { aggregateRollups, aggregateRollupsHistory } from '../../database.js';
|
import { aggregateRollups, aggregateRollupsHistory, editObject } from '../../database.js';
|
||||||
|
import { stockEventModel } from './stockevent.schema.js';
|
||||||
|
|
||||||
|
const getStockEventTotal = async (stock, parentType) => {
|
||||||
|
const stockId = stock?._id;
|
||||||
|
if (!stockId) return null;
|
||||||
|
|
||||||
|
const parentId =
|
||||||
|
stockId instanceof mongoose.Types.ObjectId ? stockId : new mongoose.Types.ObjectId(stockId);
|
||||||
|
|
||||||
|
const [result] = await stockEventModel.aggregate([
|
||||||
|
{ $match: { parent: parentId, parentType } },
|
||||||
|
{ $group: { _id: null, total: { $sum: '$value' }, count: { $sum: 1 } } },
|
||||||
|
]);
|
||||||
|
|
||||||
|
return {
|
||||||
|
total: result?.total ?? 0,
|
||||||
|
count: result?.count ?? 0,
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
const partStockUsageSchema = new Schema({
|
const partStockUsageSchema = new Schema({
|
||||||
partStock: { type: Schema.Types.ObjectId, ref: 'partStock', required: false },
|
partStock: { type: Schema.Types.ObjectId, ref: 'partStock', required: false },
|
||||||
@ -68,6 +87,21 @@ productStockSchema.statics.history = async function (from, to) {
|
|||||||
return results;
|
return results;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
productStockSchema.statics.recalculate = async function (productStock, user) {
|
||||||
|
const events = await getStockEventTotal(productStock, this.modelName);
|
||||||
|
if (!events?.count) return;
|
||||||
|
|
||||||
|
await editObject({
|
||||||
|
model: this,
|
||||||
|
id: productStock._id,
|
||||||
|
updateData: {
|
||||||
|
currentQuantity: events.total,
|
||||||
|
},
|
||||||
|
user,
|
||||||
|
recalculate: false,
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
// Add virtual id getter
|
// Add virtual id getter
|
||||||
productStockSchema.virtual('id').get(function () {
|
productStockSchema.virtual('id').get(function () {
|
||||||
return this._id;
|
return this._id;
|
||||||
|
|||||||
@ -11,7 +11,7 @@ const stockTransferLineSchema = new Schema(
|
|||||||
},
|
},
|
||||||
fromStock: {
|
fromStock: {
|
||||||
type: Schema.Types.ObjectId,
|
type: Schema.Types.ObjectId,
|
||||||
refPath: 'fromStockType',
|
refPath: 'lines.fromStockType',
|
||||||
required: true,
|
required: true,
|
||||||
},
|
},
|
||||||
quantity: { type: Number, required: true },
|
quantity: { type: Number, required: true },
|
||||||
@ -27,7 +27,7 @@ const stockTransferLineSchema = new Schema(
|
|||||||
},
|
},
|
||||||
toStock: {
|
toStock: {
|
||||||
type: Schema.Types.ObjectId,
|
type: Schema.Types.ObjectId,
|
||||||
refPath: 'toStockType',
|
refPath: 'lines.toStockType',
|
||||||
required: false,
|
required: false,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -37,6 +37,7 @@ const stockTransferLineSchema = new Schema(
|
|||||||
const stockTransferSchema = new Schema(
|
const stockTransferSchema = new Schema(
|
||||||
{
|
{
|
||||||
_reference: { type: String, default: () => generateId()() },
|
_reference: { type: String, default: () => generateId()() },
|
||||||
|
name: { type: String, required: true },
|
||||||
state: {
|
state: {
|
||||||
type: { type: String, required: true, default: 'draft' },
|
type: { type: String, required: true, default: 'draft' },
|
||||||
progress: { type: Number, required: false },
|
progress: { type: Number, required: false },
|
||||||
|
|||||||
@ -7,6 +7,7 @@ const productSchema = new Schema(
|
|||||||
{
|
{
|
||||||
_reference: { type: String, default: () => generateId()() },
|
_reference: { type: String, default: () => generateId()() },
|
||||||
name: { type: String, required: true },
|
name: { type: String, required: true },
|
||||||
|
productCategory: { type: Schema.Types.ObjectId, ref: 'productCategory', required: true },
|
||||||
tags: [{ type: String }],
|
tags: [{ type: String }],
|
||||||
version: { type: String },
|
version: { type: String },
|
||||||
vendor: { type: Schema.Types.ObjectId, ref: 'vendor', required: true },
|
vendor: { type: Schema.Types.ObjectId, ref: 'vendor', required: true },
|
||||||
|
|||||||
18
src/database/schemas/management/productcategory.schema.js
Normal file
18
src/database/schemas/management/productcategory.schema.js
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
import mongoose from 'mongoose';
|
||||||
|
import { generateId } from '../../utils.js';
|
||||||
|
|
||||||
|
const productCategorySchema = new mongoose.Schema(
|
||||||
|
{
|
||||||
|
_reference: { type: String, default: () => generateId()() },
|
||||||
|
name: { required: true, type: String },
|
||||||
|
},
|
||||||
|
{ timestamps: true }
|
||||||
|
);
|
||||||
|
|
||||||
|
productCategorySchema.virtual('id').get(function () {
|
||||||
|
return this._id;
|
||||||
|
});
|
||||||
|
|
||||||
|
productCategorySchema.set('toJSON', { virtuals: true });
|
||||||
|
|
||||||
|
export const productCategoryModel = mongoose.model('productCategory', productCategorySchema);
|
||||||
@ -7,6 +7,7 @@ import { gcodeFileModel } from './production/gcodefile.schema.js';
|
|||||||
import { partModel } from './management/part.schema.js';
|
import { partModel } from './management/part.schema.js';
|
||||||
import { partSkuModel } from './management/partsku.schema.js';
|
import { partSkuModel } from './management/partsku.schema.js';
|
||||||
import { productModel } from './management/product.schema.js';
|
import { productModel } from './management/product.schema.js';
|
||||||
|
import { productCategoryModel } from './management/productcategory.schema.js';
|
||||||
import { productSkuModel } from './management/productsku.schema.js';
|
import { productSkuModel } from './management/productsku.schema.js';
|
||||||
import { vendorModel } from './management/vendor.schema.js';
|
import { vendorModel } from './management/vendor.schema.js';
|
||||||
import { materialModel } from './management/material.schema.js';
|
import { materialModel } from './management/material.schema.js';
|
||||||
@ -43,6 +44,7 @@ import { salesOrderModel } from './sales/salesorder.schema.js';
|
|||||||
import { marketplaceModel } from './sales/marketplace.schema.js';
|
import { marketplaceModel } from './sales/marketplace.schema.js';
|
||||||
import { listingModel } from './sales/listing.schema.js';
|
import { listingModel } from './sales/listing.schema.js';
|
||||||
import { listingVarientModel } from './sales/listingvarient.schema.js';
|
import { listingVarientModel } from './sales/listingvarient.schema.js';
|
||||||
|
import { paymentModel } from './finance/payment.schema.js';
|
||||||
|
|
||||||
// Map prefixes to models and id fields
|
// Map prefixes to models and id fields
|
||||||
export const models = {
|
export const models = {
|
||||||
@ -96,6 +98,13 @@ export const models = {
|
|||||||
referenceField: '_reference',
|
referenceField: '_reference',
|
||||||
label: 'Product',
|
label: 'Product',
|
||||||
},
|
},
|
||||||
|
PCG: {
|
||||||
|
model: productCategoryModel,
|
||||||
|
idField: '_id',
|
||||||
|
type: 'productCategory',
|
||||||
|
referenceField: '_reference',
|
||||||
|
label: 'Product Category',
|
||||||
|
},
|
||||||
SKU: {
|
SKU: {
|
||||||
model: productSkuModel,
|
model: productSkuModel,
|
||||||
idField: '_id',
|
idField: '_id',
|
||||||
@ -355,4 +364,11 @@ export const models = {
|
|||||||
label: 'Listing Varient',
|
label: 'Listing Varient',
|
||||||
referenceField: '_reference',
|
referenceField: '_reference',
|
||||||
},
|
},
|
||||||
|
PAY: {
|
||||||
|
model: paymentModel,
|
||||||
|
idField: '_id',
|
||||||
|
type: 'payment',
|
||||||
|
label: 'Payment',
|
||||||
|
referenceField: '_reference',
|
||||||
|
},
|
||||||
};
|
};
|
||||||
|
|||||||
@ -180,6 +180,37 @@ describe('SocketUser', () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('object type subscription handlers', () => {
|
||||||
|
it('should pass filters to type subscription methods', async () => {
|
||||||
|
const data = { objectType: 'note', filter: { 'parent._id': 'parent-id' } };
|
||||||
|
const callback = jest.fn();
|
||||||
|
|
||||||
|
await socketUser.handleSubscribeToObjectTypeUpdateEvent(data, callback);
|
||||||
|
|
||||||
|
expect(socketUser.updateManager.subscribeToObjectNew).toHaveBeenCalledWith(
|
||||||
|
'note',
|
||||||
|
data.filter
|
||||||
|
);
|
||||||
|
expect(
|
||||||
|
socketUser.updateManager.subscribeToObjectDelete
|
||||||
|
).toHaveBeenCalledWith('note', data.filter);
|
||||||
|
expect(callback).toHaveBeenCalledWith({ success: true });
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should pass filters to type unsubscribe methods', async () => {
|
||||||
|
const data = { objectType: 'note', filter: { 'parent._id': 'parent-id' } };
|
||||||
|
|
||||||
|
await socketUser.handleUnsubscribeToObjectTypeUpdateEvent(data);
|
||||||
|
|
||||||
|
expect(
|
||||||
|
socketUser.updateManager.removeObjectNewListener
|
||||||
|
).toHaveBeenCalledWith('note', data.filter);
|
||||||
|
expect(
|
||||||
|
socketUser.updateManager.removeObjectDeleteListener
|
||||||
|
).toHaveBeenCalledWith('note', data.filter);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe('handleGenerateHostOtpEvent', () => {
|
describe('handleGenerateHostOtpEvent', () => {
|
||||||
it('should call generateHostOTP and callback', async () => {
|
it('should call generateHostOTP and callback', async () => {
|
||||||
const data = { _id: 'host-id' };
|
const data = { _id: 'host-id' };
|
||||||
|
|||||||
@ -163,8 +163,11 @@ export class SocketUser {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async handleSubscribeToObjectTypeUpdateEvent(data, callback) {
|
async handleSubscribeToObjectTypeUpdateEvent(data, callback) {
|
||||||
await this.updateManager.subscribeToObjectNew(data.objectType);
|
await this.updateManager.subscribeToObjectNew(data.objectType, data.filter);
|
||||||
await this.updateManager.subscribeToObjectDelete(data.objectType);
|
await this.updateManager.subscribeToObjectDelete(
|
||||||
|
data.objectType,
|
||||||
|
data.filter
|
||||||
|
);
|
||||||
callback({ success: true });
|
callback({ success: true });
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -185,8 +188,14 @@ export class SocketUser {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async handleUnsubscribeToObjectTypeUpdateEvent(data) {
|
async handleUnsubscribeToObjectTypeUpdateEvent(data) {
|
||||||
await this.updateManager.removeObjectNewListener(data.objectType);
|
await this.updateManager.removeObjectNewListener(
|
||||||
await this.updateManager.removeObjectDeleteListener(data.objectType);
|
data.objectType,
|
||||||
|
data.filter
|
||||||
|
);
|
||||||
|
await this.updateManager.removeObjectDeleteListener(
|
||||||
|
data.objectType,
|
||||||
|
data.filter
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
async handleUnsubscribeToObjectUpdateEvent(data) {
|
async handleUnsubscribeToObjectUpdateEvent(data) {
|
||||||
|
|||||||
@ -7,6 +7,18 @@ jest.unstable_mockModule('../../database/nats.js', () => ({
|
|||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
jest.unstable_mockModule('../../database/database.js', () => ({
|
||||||
|
listObjects: jest.fn()
|
||||||
|
}));
|
||||||
|
|
||||||
|
jest.unstable_mockModule('../../database/schemas/models.js', () => ({
|
||||||
|
models: {
|
||||||
|
PRN: {
|
||||||
|
model: { modelName: 'printer' }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
jest.unstable_mockModule('log4js', () => ({
|
jest.unstable_mockModule('log4js', () => ({
|
||||||
default: {
|
default: {
|
||||||
getLogger: () => ({
|
getLogger: () => ({
|
||||||
@ -30,6 +42,7 @@ jest.unstable_mockModule('../../config.js', () => ({
|
|||||||
|
|
||||||
const { UpdateManager } = await import('../updatemanager.js');
|
const { UpdateManager } = await import('../updatemanager.js');
|
||||||
const { natsServer } = await import('../../database/nats.js');
|
const { natsServer } = await import('../../database/nats.js');
|
||||||
|
const { listObjects } = await import('../../database/database.js');
|
||||||
|
|
||||||
describe('UpdateManager', () => {
|
describe('UpdateManager', () => {
|
||||||
let mockSocketClient;
|
let mockSocketClient;
|
||||||
@ -54,19 +67,59 @@ describe('UpdateManager', () => {
|
|||||||
|
|
||||||
expect(natsServer.subscribe).toHaveBeenCalledWith(
|
expect(natsServer.subscribe).toHaveBeenCalledWith(
|
||||||
'printers.new',
|
'printers.new',
|
||||||
'test-socket-id',
|
'test-socket-id:{}',
|
||||||
expect.any(Function)
|
expect.any(Function)
|
||||||
);
|
);
|
||||||
|
|
||||||
const natsCallback = natsServer.subscribe.mock.calls[0][2];
|
const natsCallback = natsServer.subscribe.mock.calls[0][2];
|
||||||
const data = { name: 'New Printer' };
|
const data = { name: 'New Printer' };
|
||||||
natsCallback('printers.new', data);
|
await natsCallback('printers.new', data);
|
||||||
|
|
||||||
expect(mockSocketClient.socket.emit).toHaveBeenCalledWith('objectNew', {
|
expect(mockSocketClient.socket.emit).toHaveBeenCalledWith('objectNew', {
|
||||||
object: data,
|
object: data,
|
||||||
objectType: 'printer'
|
objectType: 'printer',
|
||||||
|
filter: {}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should emit filtered new object events when the list filter matches', async () => {
|
||||||
|
const filter = { 'state.type': 'ready' };
|
||||||
|
const data = { _id: '123', name: 'New Printer' };
|
||||||
|
listObjects.mockResolvedValueOnce([data]);
|
||||||
|
|
||||||
|
await updateManager.subscribeToObjectNew('printer', filter);
|
||||||
|
|
||||||
|
expect(natsServer.subscribe).toHaveBeenCalledWith(
|
||||||
|
'printers.new',
|
||||||
|
'test-socket-id:{"state.type":"ready"}',
|
||||||
|
expect.any(Function)
|
||||||
|
);
|
||||||
|
|
||||||
|
const natsCallback = natsServer.subscribe.mock.calls[0][2];
|
||||||
|
await natsCallback('printers.new', data);
|
||||||
|
|
||||||
|
expect(listObjects).toHaveBeenCalledWith({
|
||||||
|
model: { modelName: 'printer' },
|
||||||
|
filter: { 'state.type': 'ready', _id: '123' }
|
||||||
|
});
|
||||||
|
expect(mockSocketClient.socket.emit).toHaveBeenCalledWith('objectNew', {
|
||||||
|
object: data,
|
||||||
|
objectType: 'printer',
|
||||||
|
filter
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should skip filtered new object events when the list filter misses', async () => {
|
||||||
|
const filter = { 'state.type': 'ready' };
|
||||||
|
listObjects.mockResolvedValueOnce([]);
|
||||||
|
|
||||||
|
await updateManager.subscribeToObjectNew('printer', filter);
|
||||||
|
|
||||||
|
const natsCallback = natsServer.subscribe.mock.calls[0][2];
|
||||||
|
await natsCallback('printers.new', { _id: '123' });
|
||||||
|
|
||||||
|
expect(mockSocketClient.socket.emit).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('subscribeToObjectDelete', () => {
|
describe('subscribeToObjectDelete', () => {
|
||||||
@ -75,19 +128,20 @@ describe('UpdateManager', () => {
|
|||||||
|
|
||||||
expect(natsServer.subscribe).toHaveBeenCalledWith(
|
expect(natsServer.subscribe).toHaveBeenCalledWith(
|
||||||
'printers.delete',
|
'printers.delete',
|
||||||
'test-socket-id',
|
'test-socket-id:{}',
|
||||||
expect.any(Function)
|
expect.any(Function)
|
||||||
);
|
);
|
||||||
|
|
||||||
const natsCallback = natsServer.subscribe.mock.calls[0][2];
|
const natsCallback = natsServer.subscribe.mock.calls[0][2];
|
||||||
const data = { _id: '123' };
|
const data = { _id: '123' };
|
||||||
natsCallback('printers.delete', data);
|
await natsCallback('printers.delete', data);
|
||||||
|
|
||||||
expect(mockSocketClient.socket.emit).toHaveBeenCalledWith(
|
expect(mockSocketClient.socket.emit).toHaveBeenCalledWith(
|
||||||
'objectDelete',
|
'objectDelete',
|
||||||
{
|
{
|
||||||
object: data,
|
object: data,
|
||||||
objectType: 'printer'
|
objectType: 'printer',
|
||||||
|
filter: {}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
@ -123,7 +177,7 @@ describe('UpdateManager', () => {
|
|||||||
await updateManager.removeObjectNewListener('printer');
|
await updateManager.removeObjectNewListener('printer');
|
||||||
expect(natsServer.removeSubscription).toHaveBeenCalledWith(
|
expect(natsServer.removeSubscription).toHaveBeenCalledWith(
|
||||||
'printers.new',
|
'printers.new',
|
||||||
'test-socket-id'
|
'test-socket-id:{}'
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -131,7 +185,7 @@ describe('UpdateManager', () => {
|
|||||||
await updateManager.removeObjectDeleteListener('printer');
|
await updateManager.removeObjectDeleteListener('printer');
|
||||||
expect(natsServer.removeSubscription).toHaveBeenCalledWith(
|
expect(natsServer.removeSubscription).toHaveBeenCalledWith(
|
||||||
'printers.delete',
|
'printers.delete',
|
||||||
'test-socket-id'
|
'test-socket-id:{}'
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@ -1,12 +1,42 @@
|
|||||||
import log4js from 'log4js';
|
import log4js from 'log4js';
|
||||||
import { loadConfig } from '../config.js';
|
import { loadConfig } from '../config.js';
|
||||||
import { natsServer } from '../database/nats.js';
|
import { natsServer } from '../database/nats.js';
|
||||||
|
import { listObjects } from '../database/database.js';
|
||||||
|
import { models } from '../database/schemas/models.js';
|
||||||
const config = loadConfig();
|
const config = loadConfig();
|
||||||
|
|
||||||
// Setup logger
|
// Setup logger
|
||||||
const logger = log4js.getLogger('Update Manager');
|
const logger = log4js.getLogger('Update Manager');
|
||||||
logger.level = config.server.logLevel;
|
logger.level = config.server.logLevel;
|
||||||
|
|
||||||
|
const modelList = Object.values(models)
|
||||||
|
.map(model => model.model)
|
||||||
|
.filter(model => model != null);
|
||||||
|
|
||||||
|
const normalizeFilter = filter =>
|
||||||
|
filter && typeof filter === 'object' && !Array.isArray(filter) ? filter : {};
|
||||||
|
|
||||||
|
const stableStringify = value => {
|
||||||
|
if (Array.isArray(value)) {
|
||||||
|
return `[${value.map(stableStringify).join(',')}]`;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value && typeof value === 'object') {
|
||||||
|
return `{${Object.keys(value)
|
||||||
|
.sort()
|
||||||
|
.map(key => `${JSON.stringify(key)}:${stableStringify(value[key])}`)
|
||||||
|
.join(',')}}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
return JSON.stringify(value);
|
||||||
|
};
|
||||||
|
|
||||||
|
const getSubscriptionOwner = (socketId, filter) =>
|
||||||
|
`${socketId}:${stableStringify(normalizeFilter(filter))}`;
|
||||||
|
|
||||||
|
const getModelByName = modelName =>
|
||||||
|
modelList.filter(model => model.modelName == modelName)[0];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* UpdateManager handles tracking object updates and broadcasts update events via websockets.
|
* UpdateManager handles tracking object updates and broadcasts update events via websockets.
|
||||||
*/
|
*/
|
||||||
@ -15,31 +45,83 @@ export class UpdateManager {
|
|||||||
this.socketClient = socketClient;
|
this.socketClient = socketClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
async subscribeToObjectNew(objectType) {
|
async matchesObjectTypeFilter(objectType, filter, value) {
|
||||||
|
const normalizedFilter = normalizeFilter(filter);
|
||||||
|
|
||||||
|
if (Object.keys(normalizedFilter).length === 0) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
const model = getModelByName(objectType);
|
||||||
|
const objectId = value?._id || value;
|
||||||
|
|
||||||
|
if (!model || !objectId) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
const objects = await listObjects({
|
||||||
|
model,
|
||||||
|
filter: { ...normalizedFilter, _id: objectId }
|
||||||
|
});
|
||||||
|
|
||||||
|
return Array.isArray(objects) && objects.length > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
async emitObjectTypeEvent(eventName, objectType, filter, value) {
|
||||||
|
const normalizedFilter = normalizeFilter(filter);
|
||||||
|
const matches = await this.matchesObjectTypeFilter(
|
||||||
|
objectType,
|
||||||
|
normalizedFilter,
|
||||||
|
value
|
||||||
|
);
|
||||||
|
|
||||||
|
if (!matches) {
|
||||||
|
logger.trace(`Filtered ${eventName} event:`, {
|
||||||
|
objectType,
|
||||||
|
filter: normalizedFilter,
|
||||||
|
value
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.socketClient.socket.emit(eventName, {
|
||||||
|
object: value,
|
||||||
|
objectType: objectType,
|
||||||
|
filter: normalizedFilter
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async subscribeToObjectNew(objectType, filter = {}) {
|
||||||
|
const normalizedFilter = normalizeFilter(filter);
|
||||||
await natsServer.subscribe(
|
await natsServer.subscribe(
|
||||||
`${objectType}s.new`,
|
`${objectType}s.new`,
|
||||||
this.socketClient.socketId,
|
getSubscriptionOwner(this.socketClient.socketId, normalizedFilter),
|
||||||
(key, value) => {
|
async (key, value) => {
|
||||||
logger.trace('Object new event:', value);
|
logger.trace('Object new event:', value);
|
||||||
this.socketClient.socket.emit('objectNew', {
|
await this.emitObjectTypeEvent(
|
||||||
object: value,
|
'objectNew',
|
||||||
objectType: objectType
|
objectType,
|
||||||
});
|
normalizedFilter,
|
||||||
|
value
|
||||||
|
);
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
return { success: true };
|
return { success: true };
|
||||||
}
|
}
|
||||||
|
|
||||||
async subscribeToObjectDelete(objectType) {
|
async subscribeToObjectDelete(objectType, filter = {}) {
|
||||||
|
const normalizedFilter = normalizeFilter(filter);
|
||||||
await natsServer.subscribe(
|
await natsServer.subscribe(
|
||||||
`${objectType}s.delete`,
|
`${objectType}s.delete`,
|
||||||
this.socketClient.socketId,
|
getSubscriptionOwner(this.socketClient.socketId, normalizedFilter),
|
||||||
(key, value) => {
|
async (key, value) => {
|
||||||
logger.trace('Object delete event:', value);
|
logger.trace('Object delete event:', value);
|
||||||
this.socketClient.socket.emit('objectDelete', {
|
await this.emitObjectTypeEvent(
|
||||||
object: value,
|
'objectDelete',
|
||||||
objectType: objectType
|
objectType,
|
||||||
});
|
normalizedFilter,
|
||||||
|
value
|
||||||
|
);
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
return { success: true };
|
return { success: true };
|
||||||
@ -62,18 +144,18 @@ export class UpdateManager {
|
|||||||
return { success: true };
|
return { success: true };
|
||||||
}
|
}
|
||||||
|
|
||||||
async removeObjectNewListener(objectType) {
|
async removeObjectNewListener(objectType, filter = {}) {
|
||||||
await natsServer.removeSubscription(
|
await natsServer.removeSubscription(
|
||||||
`${objectType}s.new`,
|
`${objectType}s.new`,
|
||||||
this.socketClient.socketId
|
getSubscriptionOwner(this.socketClient.socketId, filter)
|
||||||
);
|
);
|
||||||
return { success: true };
|
return { success: true };
|
||||||
}
|
}
|
||||||
|
|
||||||
async removeObjectDeleteListener(objectType) {
|
async removeObjectDeleteListener(objectType, filter = {}) {
|
||||||
await natsServer.removeSubscription(
|
await natsServer.removeSubscription(
|
||||||
`${objectType}s.delete`,
|
`${objectType}s.delete`,
|
||||||
this.socketClient.socketId
|
getSubscriptionOwner(this.socketClient.socketId, filter)
|
||||||
);
|
);
|
||||||
return { success: true };
|
return { success: true };
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user