Compare commits
3 Commits
c38c1dee24
...
7a79238183
| Author | SHA1 | Date | |
|---|---|---|---|
| 7a79238183 | |||
| 3196385c72 | |||
| b9b40c55ca |
2
Jenkinsfile
vendored
2
Jenkinsfile
vendored
@ -26,7 +26,7 @@ pipeline {
|
||||
stage('Install Dependencies') {
|
||||
steps {
|
||||
nodejs(nodeJSInstallationName: 'Node23') {
|
||||
sh 'pnpm install --frozen-lockfile --production=false'
|
||||
sh 'pnpm install --frozen-lockfile --production=false --ignore-scripts=false'
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
"development": {
|
||||
"server": {
|
||||
"port": 9090,
|
||||
"logLevel": "trace",
|
||||
"logLevel": "debug",
|
||||
"corsOrigins": [
|
||||
"https://web.farmcontrol.app",
|
||||
"https://dev.tombutcher.work",
|
||||
|
||||
@ -1,7 +1,26 @@
|
||||
import mongoose from 'mongoose';
|
||||
import { generateId } from '../../utils.js';
|
||||
const { Schema } = mongoose;
|
||||
import { aggregateRollups, aggregateRollupsHistory } from '../../database.js';
|
||||
import { aggregateRollups, aggregateRollupsHistory, editObject } from '../../database.js';
|
||||
import { stockEventModel } from './stockevent.schema.js';
|
||||
|
||||
const getStockEventTotal = async (stock, parentType) => {
|
||||
const stockId = stock?._id;
|
||||
if (!stockId) return null;
|
||||
|
||||
const parentId =
|
||||
stockId instanceof mongoose.Types.ObjectId ? stockId : new mongoose.Types.ObjectId(stockId);
|
||||
|
||||
const [result] = await stockEventModel.aggregate([
|
||||
{ $match: { parent: parentId, parentType } },
|
||||
{ $group: { _id: null, total: { $sum: '$value' }, count: { $sum: 1 } } },
|
||||
]);
|
||||
|
||||
return {
|
||||
total: result?.total ?? 0,
|
||||
count: result?.count ?? 0,
|
||||
};
|
||||
};
|
||||
|
||||
// Define the main filamentStock schema
|
||||
const filamentStockSchema = new Schema(
|
||||
@ -19,6 +38,7 @@ const filamentStockSchema = new Schema(
|
||||
net: { type: Number, required: true },
|
||||
gross: { type: Number, required: true },
|
||||
},
|
||||
filament: { type: mongoose.Schema.Types.ObjectId, ref: 'filament', required: true },
|
||||
filamentSku: { type: mongoose.Schema.Types.ObjectId, ref: 'filamentSku', required: true },
|
||||
stockLocation: {
|
||||
type: mongoose.Schema.Types.ObjectId,
|
||||
@ -29,6 +49,17 @@ const filamentStockSchema = new Schema(
|
||||
{ timestamps: true }
|
||||
);
|
||||
|
||||
filamentStockSchema.pre('validate', async function () {
|
||||
if (!this.filament && this.filamentSku) {
|
||||
const sku = await mongoose
|
||||
.model('filamentSku')
|
||||
.findById(this.filamentSku)
|
||||
.select('filament')
|
||||
.lean();
|
||||
if (sku?.filament) this.filament = sku.filament;
|
||||
}
|
||||
});
|
||||
|
||||
const rollupConfigs = [
|
||||
{
|
||||
name: 'totalCurrentWeight',
|
||||
@ -58,6 +89,33 @@ filamentStockSchema.statics.history = async function (from, to) {
|
||||
return results;
|
||||
};
|
||||
|
||||
filamentStockSchema.statics.recalculate = async function (filamentStock, user) {
|
||||
const events = await getStockEventTotal(filamentStock, this.modelName);
|
||||
if (!events?.count) return;
|
||||
|
||||
const net = events.total;
|
||||
const startingNet = filamentStock.startingWeight?.net ?? 0;
|
||||
const startingGross = filamentStock.startingWeight?.gross ?? 0;
|
||||
const gross = startingNet > 0 ? (startingGross * net) / startingNet : net;
|
||||
|
||||
console.log('Recalculating filament stock');
|
||||
console.log('events', events);
|
||||
console.log('filamentStock', filamentStock);
|
||||
|
||||
await editObject({
|
||||
model: this,
|
||||
id: filamentStock._id,
|
||||
updateData: {
|
||||
currentWeight: {
|
||||
net,
|
||||
gross,
|
||||
},
|
||||
},
|
||||
user,
|
||||
recalculate: false,
|
||||
});
|
||||
};
|
||||
|
||||
// Add virtual id getter
|
||||
filamentStockSchema.virtual('id').get(function () {
|
||||
return this._id;
|
||||
|
||||
@ -1,7 +1,26 @@
|
||||
import mongoose from 'mongoose';
|
||||
import { generateId } from '../../utils.js';
|
||||
const { Schema } = mongoose;
|
||||
import { aggregateRollups, aggregateRollupsHistory } from '../../database.js';
|
||||
import { aggregateRollups, aggregateRollupsHistory, editObject } from '../../database.js';
|
||||
import { stockEventModel } from './stockevent.schema.js';
|
||||
|
||||
const getStockEventTotal = async (stock, parentType) => {
|
||||
const stockId = stock?._id;
|
||||
if (!stockId) return null;
|
||||
|
||||
const parentId =
|
||||
stockId instanceof mongoose.Types.ObjectId ? stockId : new mongoose.Types.ObjectId(stockId);
|
||||
|
||||
const [result] = await stockEventModel.aggregate([
|
||||
{ $match: { parent: parentId, parentType } },
|
||||
{ $group: { _id: null, total: { $sum: '$value' }, count: { $sum: 1 } } },
|
||||
]);
|
||||
|
||||
return {
|
||||
total: result?.total ?? 0,
|
||||
count: result?.count ?? 0,
|
||||
};
|
||||
};
|
||||
|
||||
// Define the main partStock schema
|
||||
const partStockSchema = new Schema(
|
||||
@ -53,6 +72,21 @@ partStockSchema.statics.history = async function (from, to) {
|
||||
return results;
|
||||
};
|
||||
|
||||
partStockSchema.statics.recalculate = async function (partStock, user) {
|
||||
const events = await getStockEventTotal(partStock, this.modelName);
|
||||
if (!events?.count) return;
|
||||
|
||||
await editObject({
|
||||
model: this,
|
||||
id: partStock._id,
|
||||
updateData: {
|
||||
currentQuantity: events.total,
|
||||
},
|
||||
user,
|
||||
recalculate: false,
|
||||
});
|
||||
};
|
||||
|
||||
// Add virtual id getter
|
||||
partStockSchema.virtual('id').get(function () {
|
||||
return this._id;
|
||||
|
||||
@ -1,7 +1,26 @@
|
||||
import mongoose from 'mongoose';
|
||||
import { generateId } from '../../utils.js';
|
||||
const { Schema } = mongoose;
|
||||
import { aggregateRollups, aggregateRollupsHistory } from '../../database.js';
|
||||
import { aggregateRollups, aggregateRollupsHistory, editObject } from '../../database.js';
|
||||
import { stockEventModel } from './stockevent.schema.js';
|
||||
|
||||
const getStockEventTotal = async (stock, parentType) => {
|
||||
const stockId = stock?._id;
|
||||
if (!stockId) return null;
|
||||
|
||||
const parentId =
|
||||
stockId instanceof mongoose.Types.ObjectId ? stockId : new mongoose.Types.ObjectId(stockId);
|
||||
|
||||
const [result] = await stockEventModel.aggregate([
|
||||
{ $match: { parent: parentId, parentType } },
|
||||
{ $group: { _id: null, total: { $sum: '$value' }, count: { $sum: 1 } } },
|
||||
]);
|
||||
|
||||
return {
|
||||
total: result?.total ?? 0,
|
||||
count: result?.count ?? 0,
|
||||
};
|
||||
};
|
||||
|
||||
const partStockUsageSchema = new Schema({
|
||||
partStock: { type: Schema.Types.ObjectId, ref: 'partStock', required: false },
|
||||
@ -68,6 +87,21 @@ productStockSchema.statics.history = async function (from, to) {
|
||||
return results;
|
||||
};
|
||||
|
||||
productStockSchema.statics.recalculate = async function (productStock, user) {
|
||||
const events = await getStockEventTotal(productStock, this.modelName);
|
||||
if (!events?.count) return;
|
||||
|
||||
await editObject({
|
||||
model: this,
|
||||
id: productStock._id,
|
||||
updateData: {
|
||||
currentQuantity: events.total,
|
||||
},
|
||||
user,
|
||||
recalculate: false,
|
||||
});
|
||||
};
|
||||
|
||||
// Add virtual id getter
|
||||
productStockSchema.virtual('id').get(function () {
|
||||
return this._id;
|
||||
|
||||
@ -11,7 +11,7 @@ const stockTransferLineSchema = new Schema(
|
||||
},
|
||||
fromStock: {
|
||||
type: Schema.Types.ObjectId,
|
||||
refPath: 'fromStockType',
|
||||
refPath: 'lines.fromStockType',
|
||||
required: true,
|
||||
},
|
||||
quantity: { type: Number, required: true },
|
||||
@ -27,7 +27,7 @@ const stockTransferLineSchema = new Schema(
|
||||
},
|
||||
toStock: {
|
||||
type: Schema.Types.ObjectId,
|
||||
refPath: 'toStockType',
|
||||
refPath: 'lines.toStockType',
|
||||
required: false,
|
||||
},
|
||||
},
|
||||
@ -37,6 +37,7 @@ const stockTransferLineSchema = new Schema(
|
||||
const stockTransferSchema = new Schema(
|
||||
{
|
||||
_reference: { type: String, default: () => generateId()() },
|
||||
name: { type: String, required: true },
|
||||
state: {
|
||||
type: { type: String, required: true, default: 'draft' },
|
||||
progress: { type: Number, required: false },
|
||||
|
||||
@ -7,6 +7,7 @@ const productSchema = new Schema(
|
||||
{
|
||||
_reference: { type: String, default: () => generateId()() },
|
||||
name: { type: String, required: true },
|
||||
productCategory: { type: Schema.Types.ObjectId, ref: 'productCategory', required: true },
|
||||
tags: [{ type: String }],
|
||||
version: { type: String },
|
||||
vendor: { type: Schema.Types.ObjectId, ref: 'vendor', required: true },
|
||||
|
||||
18
src/database/schemas/management/productcategory.schema.js
Normal file
18
src/database/schemas/management/productcategory.schema.js
Normal file
@ -0,0 +1,18 @@
|
||||
import mongoose from 'mongoose';
|
||||
import { generateId } from '../../utils.js';
|
||||
|
||||
const productCategorySchema = new mongoose.Schema(
|
||||
{
|
||||
_reference: { type: String, default: () => generateId()() },
|
||||
name: { required: true, type: String },
|
||||
},
|
||||
{ timestamps: true }
|
||||
);
|
||||
|
||||
productCategorySchema.virtual('id').get(function () {
|
||||
return this._id;
|
||||
});
|
||||
|
||||
productCategorySchema.set('toJSON', { virtuals: true });
|
||||
|
||||
export const productCategoryModel = mongoose.model('productCategory', productCategorySchema);
|
||||
@ -7,6 +7,7 @@ import { gcodeFileModel } from './production/gcodefile.schema.js';
|
||||
import { partModel } from './management/part.schema.js';
|
||||
import { partSkuModel } from './management/partsku.schema.js';
|
||||
import { productModel } from './management/product.schema.js';
|
||||
import { productCategoryModel } from './management/productcategory.schema.js';
|
||||
import { productSkuModel } from './management/productsku.schema.js';
|
||||
import { vendorModel } from './management/vendor.schema.js';
|
||||
import { materialModel } from './management/material.schema.js';
|
||||
@ -43,6 +44,7 @@ import { salesOrderModel } from './sales/salesorder.schema.js';
|
||||
import { marketplaceModel } from './sales/marketplace.schema.js';
|
||||
import { listingModel } from './sales/listing.schema.js';
|
||||
import { listingVarientModel } from './sales/listingvarient.schema.js';
|
||||
import { paymentModel } from './finance/payment.schema.js';
|
||||
|
||||
// Map prefixes to models and id fields
|
||||
export const models = {
|
||||
@ -96,6 +98,13 @@ export const models = {
|
||||
referenceField: '_reference',
|
||||
label: 'Product',
|
||||
},
|
||||
PCG: {
|
||||
model: productCategoryModel,
|
||||
idField: '_id',
|
||||
type: 'productCategory',
|
||||
referenceField: '_reference',
|
||||
label: 'Product Category',
|
||||
},
|
||||
SKU: {
|
||||
model: productSkuModel,
|
||||
idField: '_id',
|
||||
@ -355,4 +364,11 @@ export const models = {
|
||||
label: 'Listing Varient',
|
||||
referenceField: '_reference',
|
||||
},
|
||||
PAY: {
|
||||
model: paymentModel,
|
||||
idField: '_id',
|
||||
type: 'payment',
|
||||
label: 'Payment',
|
||||
referenceField: '_reference',
|
||||
},
|
||||
};
|
||||
|
||||
@ -14,6 +14,7 @@ const gcodeFileSchema = new mongoose.Schema(
|
||||
name: { required: true, type: String },
|
||||
gcodeFileName: { required: false, type: String },
|
||||
size: { type: Number, required: false },
|
||||
filament: { type: Schema.Types.ObjectId, ref: 'filament', required: true },
|
||||
filamentSku: { type: Schema.Types.ObjectId, ref: 'filamentSku', required: true },
|
||||
parts: [partSchema],
|
||||
file: { type: mongoose.SchemaTypes.ObjectId, ref: 'file', required: false },
|
||||
@ -22,6 +23,13 @@ const gcodeFileSchema = new mongoose.Schema(
|
||||
{ timestamps: true }
|
||||
);
|
||||
|
||||
gcodeFileSchema.pre('validate', async function () {
|
||||
if (!this.filament && this.filamentSku) {
|
||||
const sku = await mongoose.model('filamentSku').findById(this.filamentSku).select('filament').lean();
|
||||
if (sku?.filament) this.filament = sku.filament;
|
||||
}
|
||||
});
|
||||
|
||||
gcodeFileSchema.index({ name: 'text', brand: 'text' });
|
||||
|
||||
gcodeFileSchema.virtual('id').get(function () {
|
||||
|
||||
@ -180,6 +180,37 @@ describe('SocketUser', () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe('object type subscription handlers', () => {
|
||||
it('should pass filters to type subscription methods', async () => {
|
||||
const data = { objectType: 'note', filter: { 'parent._id': 'parent-id' } };
|
||||
const callback = jest.fn();
|
||||
|
||||
await socketUser.handleSubscribeToObjectTypeUpdateEvent(data, callback);
|
||||
|
||||
expect(socketUser.updateManager.subscribeToObjectNew).toHaveBeenCalledWith(
|
||||
'note',
|
||||
data.filter
|
||||
);
|
||||
expect(
|
||||
socketUser.updateManager.subscribeToObjectDelete
|
||||
).toHaveBeenCalledWith('note', data.filter);
|
||||
expect(callback).toHaveBeenCalledWith({ success: true });
|
||||
});
|
||||
|
||||
it('should pass filters to type unsubscribe methods', async () => {
|
||||
const data = { objectType: 'note', filter: { 'parent._id': 'parent-id' } };
|
||||
|
||||
await socketUser.handleUnsubscribeToObjectTypeUpdateEvent(data);
|
||||
|
||||
expect(
|
||||
socketUser.updateManager.removeObjectNewListener
|
||||
).toHaveBeenCalledWith('note', data.filter);
|
||||
expect(
|
||||
socketUser.updateManager.removeObjectDeleteListener
|
||||
).toHaveBeenCalledWith('note', data.filter);
|
||||
});
|
||||
});
|
||||
|
||||
describe('handleGenerateHostOtpEvent', () => {
|
||||
it('should call generateHostOTP and callback', async () => {
|
||||
const data = { _id: 'host-id' };
|
||||
|
||||
@ -163,8 +163,11 @@ export class SocketUser {
|
||||
}
|
||||
|
||||
async handleSubscribeToObjectTypeUpdateEvent(data, callback) {
|
||||
await this.updateManager.subscribeToObjectNew(data.objectType);
|
||||
await this.updateManager.subscribeToObjectDelete(data.objectType);
|
||||
await this.updateManager.subscribeToObjectNew(data.objectType, data.filter);
|
||||
await this.updateManager.subscribeToObjectDelete(
|
||||
data.objectType,
|
||||
data.filter
|
||||
);
|
||||
callback({ success: true });
|
||||
}
|
||||
|
||||
@ -185,8 +188,14 @@ export class SocketUser {
|
||||
}
|
||||
|
||||
async handleUnsubscribeToObjectTypeUpdateEvent(data) {
|
||||
await this.updateManager.removeObjectNewListener(data.objectType);
|
||||
await this.updateManager.removeObjectDeleteListener(data.objectType);
|
||||
await this.updateManager.removeObjectNewListener(
|
||||
data.objectType,
|
||||
data.filter
|
||||
);
|
||||
await this.updateManager.removeObjectDeleteListener(
|
||||
data.objectType,
|
||||
data.filter
|
||||
);
|
||||
}
|
||||
|
||||
async handleUnsubscribeToObjectUpdateEvent(data) {
|
||||
|
||||
@ -7,6 +7,18 @@ jest.unstable_mockModule('../../database/nats.js', () => ({
|
||||
}
|
||||
}));
|
||||
|
||||
jest.unstable_mockModule('../../database/database.js', () => ({
|
||||
listObjects: jest.fn()
|
||||
}));
|
||||
|
||||
jest.unstable_mockModule('../../database/schemas/models.js', () => ({
|
||||
models: {
|
||||
PRN: {
|
||||
model: { modelName: 'printer' }
|
||||
}
|
||||
}
|
||||
}));
|
||||
|
||||
jest.unstable_mockModule('log4js', () => ({
|
||||
default: {
|
||||
getLogger: () => ({
|
||||
@ -30,6 +42,7 @@ jest.unstable_mockModule('../../config.js', () => ({
|
||||
|
||||
const { UpdateManager } = await import('../updatemanager.js');
|
||||
const { natsServer } = await import('../../database/nats.js');
|
||||
const { listObjects } = await import('../../database/database.js');
|
||||
|
||||
describe('UpdateManager', () => {
|
||||
let mockSocketClient;
|
||||
@ -54,19 +67,59 @@ describe('UpdateManager', () => {
|
||||
|
||||
expect(natsServer.subscribe).toHaveBeenCalledWith(
|
||||
'printers.new',
|
||||
'test-socket-id',
|
||||
'test-socket-id:{}',
|
||||
expect.any(Function)
|
||||
);
|
||||
|
||||
const natsCallback = natsServer.subscribe.mock.calls[0][2];
|
||||
const data = { name: 'New Printer' };
|
||||
natsCallback('printers.new', data);
|
||||
await natsCallback('printers.new', data);
|
||||
|
||||
expect(mockSocketClient.socket.emit).toHaveBeenCalledWith('objectNew', {
|
||||
object: data,
|
||||
objectType: 'printer'
|
||||
objectType: 'printer',
|
||||
filter: {}
|
||||
});
|
||||
});
|
||||
|
||||
it('should emit filtered new object events when the list filter matches', async () => {
|
||||
const filter = { 'state.type': 'ready' };
|
||||
const data = { _id: '123', name: 'New Printer' };
|
||||
listObjects.mockResolvedValueOnce([data]);
|
||||
|
||||
await updateManager.subscribeToObjectNew('printer', filter);
|
||||
|
||||
expect(natsServer.subscribe).toHaveBeenCalledWith(
|
||||
'printers.new',
|
||||
'test-socket-id:{"state.type":"ready"}',
|
||||
expect.any(Function)
|
||||
);
|
||||
|
||||
const natsCallback = natsServer.subscribe.mock.calls[0][2];
|
||||
await natsCallback('printers.new', data);
|
||||
|
||||
expect(listObjects).toHaveBeenCalledWith({
|
||||
model: { modelName: 'printer' },
|
||||
filter: { 'state.type': 'ready', _id: '123' }
|
||||
});
|
||||
expect(mockSocketClient.socket.emit).toHaveBeenCalledWith('objectNew', {
|
||||
object: data,
|
||||
objectType: 'printer',
|
||||
filter
|
||||
});
|
||||
});
|
||||
|
||||
it('should skip filtered new object events when the list filter misses', async () => {
|
||||
const filter = { 'state.type': 'ready' };
|
||||
listObjects.mockResolvedValueOnce([]);
|
||||
|
||||
await updateManager.subscribeToObjectNew('printer', filter);
|
||||
|
||||
const natsCallback = natsServer.subscribe.mock.calls[0][2];
|
||||
await natsCallback('printers.new', { _id: '123' });
|
||||
|
||||
expect(mockSocketClient.socket.emit).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('subscribeToObjectDelete', () => {
|
||||
@ -75,19 +128,20 @@ describe('UpdateManager', () => {
|
||||
|
||||
expect(natsServer.subscribe).toHaveBeenCalledWith(
|
||||
'printers.delete',
|
||||
'test-socket-id',
|
||||
'test-socket-id:{}',
|
||||
expect.any(Function)
|
||||
);
|
||||
|
||||
const natsCallback = natsServer.subscribe.mock.calls[0][2];
|
||||
const data = { _id: '123' };
|
||||
natsCallback('printers.delete', data);
|
||||
await natsCallback('printers.delete', data);
|
||||
|
||||
expect(mockSocketClient.socket.emit).toHaveBeenCalledWith(
|
||||
'objectDelete',
|
||||
{
|
||||
object: data,
|
||||
objectType: 'printer'
|
||||
objectType: 'printer',
|
||||
filter: {}
|
||||
}
|
||||
);
|
||||
});
|
||||
@ -123,7 +177,7 @@ describe('UpdateManager', () => {
|
||||
await updateManager.removeObjectNewListener('printer');
|
||||
expect(natsServer.removeSubscription).toHaveBeenCalledWith(
|
||||
'printers.new',
|
||||
'test-socket-id'
|
||||
'test-socket-id:{}'
|
||||
);
|
||||
});
|
||||
|
||||
@ -131,7 +185,7 @@ describe('UpdateManager', () => {
|
||||
await updateManager.removeObjectDeleteListener('printer');
|
||||
expect(natsServer.removeSubscription).toHaveBeenCalledWith(
|
||||
'printers.delete',
|
||||
'test-socket-id'
|
||||
'test-socket-id:{}'
|
||||
);
|
||||
});
|
||||
|
||||
|
||||
@ -1,12 +1,42 @@
|
||||
import log4js from 'log4js';
|
||||
import { loadConfig } from '../config.js';
|
||||
import { natsServer } from '../database/nats.js';
|
||||
import { listObjects } from '../database/database.js';
|
||||
import { models } from '../database/schemas/models.js';
|
||||
const config = loadConfig();
|
||||
|
||||
// Setup logger
|
||||
const logger = log4js.getLogger('Update Manager');
|
||||
logger.level = config.server.logLevel;
|
||||
|
||||
const modelList = Object.values(models)
|
||||
.map(model => model.model)
|
||||
.filter(model => model != null);
|
||||
|
||||
const normalizeFilter = filter =>
|
||||
filter && typeof filter === 'object' && !Array.isArray(filter) ? filter : {};
|
||||
|
||||
const stableStringify = value => {
|
||||
if (Array.isArray(value)) {
|
||||
return `[${value.map(stableStringify).join(',')}]`;
|
||||
}
|
||||
|
||||
if (value && typeof value === 'object') {
|
||||
return `{${Object.keys(value)
|
||||
.sort()
|
||||
.map(key => `${JSON.stringify(key)}:${stableStringify(value[key])}`)
|
||||
.join(',')}}`;
|
||||
}
|
||||
|
||||
return JSON.stringify(value);
|
||||
};
|
||||
|
||||
const getSubscriptionOwner = (socketId, filter) =>
|
||||
`${socketId}:${stableStringify(normalizeFilter(filter))}`;
|
||||
|
||||
const getModelByName = modelName =>
|
||||
modelList.filter(model => model.modelName == modelName)[0];
|
||||
|
||||
/**
|
||||
* UpdateManager handles tracking object updates and broadcasts update events via websockets.
|
||||
*/
|
||||
@ -15,31 +45,83 @@ export class UpdateManager {
|
||||
this.socketClient = socketClient;
|
||||
}
|
||||
|
||||
async subscribeToObjectNew(objectType) {
|
||||
async matchesObjectTypeFilter(objectType, filter, value) {
|
||||
const normalizedFilter = normalizeFilter(filter);
|
||||
|
||||
if (Object.keys(normalizedFilter).length === 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const model = getModelByName(objectType);
|
||||
const objectId = value?._id || value;
|
||||
|
||||
if (!model || !objectId) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const objects = await listObjects({
|
||||
model,
|
||||
filter: { ...normalizedFilter, _id: objectId }
|
||||
});
|
||||
|
||||
return Array.isArray(objects) && objects.length > 0;
|
||||
}
|
||||
|
||||
async emitObjectTypeEvent(eventName, objectType, filter, value) {
|
||||
const normalizedFilter = normalizeFilter(filter);
|
||||
const matches = await this.matchesObjectTypeFilter(
|
||||
objectType,
|
||||
normalizedFilter,
|
||||
value
|
||||
);
|
||||
|
||||
if (!matches) {
|
||||
logger.trace(`Filtered ${eventName} event:`, {
|
||||
objectType,
|
||||
filter: normalizedFilter,
|
||||
value
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
this.socketClient.socket.emit(eventName, {
|
||||
object: value,
|
||||
objectType: objectType,
|
||||
filter: normalizedFilter
|
||||
});
|
||||
}
|
||||
|
||||
async subscribeToObjectNew(objectType, filter = {}) {
|
||||
const normalizedFilter = normalizeFilter(filter);
|
||||
await natsServer.subscribe(
|
||||
`${objectType}s.new`,
|
||||
this.socketClient.socketId,
|
||||
(key, value) => {
|
||||
getSubscriptionOwner(this.socketClient.socketId, normalizedFilter),
|
||||
async (key, value) => {
|
||||
logger.trace('Object new event:', value);
|
||||
this.socketClient.socket.emit('objectNew', {
|
||||
object: value,
|
||||
objectType: objectType
|
||||
});
|
||||
await this.emitObjectTypeEvent(
|
||||
'objectNew',
|
||||
objectType,
|
||||
normalizedFilter,
|
||||
value
|
||||
);
|
||||
}
|
||||
);
|
||||
return { success: true };
|
||||
}
|
||||
|
||||
async subscribeToObjectDelete(objectType) {
|
||||
async subscribeToObjectDelete(objectType, filter = {}) {
|
||||
const normalizedFilter = normalizeFilter(filter);
|
||||
await natsServer.subscribe(
|
||||
`${objectType}s.delete`,
|
||||
this.socketClient.socketId,
|
||||
(key, value) => {
|
||||
getSubscriptionOwner(this.socketClient.socketId, normalizedFilter),
|
||||
async (key, value) => {
|
||||
logger.trace('Object delete event:', value);
|
||||
this.socketClient.socket.emit('objectDelete', {
|
||||
object: value,
|
||||
objectType: objectType
|
||||
});
|
||||
await this.emitObjectTypeEvent(
|
||||
'objectDelete',
|
||||
objectType,
|
||||
normalizedFilter,
|
||||
value
|
||||
);
|
||||
}
|
||||
);
|
||||
return { success: true };
|
||||
@ -62,18 +144,18 @@ export class UpdateManager {
|
||||
return { success: true };
|
||||
}
|
||||
|
||||
async removeObjectNewListener(objectType) {
|
||||
async removeObjectNewListener(objectType, filter = {}) {
|
||||
await natsServer.removeSubscription(
|
||||
`${objectType}s.new`,
|
||||
this.socketClient.socketId
|
||||
getSubscriptionOwner(this.socketClient.socketId, filter)
|
||||
);
|
||||
return { success: true };
|
||||
}
|
||||
|
||||
async removeObjectDeleteListener(objectType) {
|
||||
async removeObjectDeleteListener(objectType, filter = {}) {
|
||||
await natsServer.removeSubscription(
|
||||
`${objectType}s.delete`,
|
||||
this.socketClient.socketId
|
||||
getSubscriptionOwner(this.socketClient.socketId, filter)
|
||||
);
|
||||
return { success: true };
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user