Compare commits
3 Commits
4bfc7fae2a
...
4f9ed6039b
| Author | SHA1 | Date | |
|---|---|---|---|
| 4f9ed6039b | |||
| b1245595d9 | |||
| 3481b07a55 |
@ -15,6 +15,8 @@ const paymentSchema = new Schema(
|
||||
},
|
||||
paymentDate: { type: Date, required: false },
|
||||
postedAt: { type: Date, required: false },
|
||||
authorisedAt: { type: Date, required: false },
|
||||
declinedAt: { type: Date, required: false },
|
||||
cancelledAt: { type: Date, required: false },
|
||||
paymentMethod: { type: String, required: false },
|
||||
notes: { type: String, required: false },
|
||||
@ -39,6 +41,22 @@ const rollupConfigs = [
|
||||
{ name: 'postedAmount', property: 'amount', operation: 'sum' },
|
||||
],
|
||||
},
|
||||
{
|
||||
name: 'authorised',
|
||||
filter: { 'state.type': 'authorised' },
|
||||
rollups: [
|
||||
{ name: 'authorisedCount', property: 'state.type', operation: 'count' },
|
||||
{ name: 'authorisedAmount', property: 'amount', operation: 'sum' },
|
||||
],
|
||||
},
|
||||
{
|
||||
name: 'declined',
|
||||
filter: { 'state.type': 'declined' },
|
||||
rollups: [
|
||||
{ name: 'declinedCount', property: 'state.type', operation: 'count' },
|
||||
{ name: 'declinedAmount', property: 'amount', operation: 'sum' },
|
||||
],
|
||||
},
|
||||
{
|
||||
name: 'cancelled',
|
||||
filter: { 'state.type': 'cancelled' },
|
||||
|
||||
@ -36,7 +36,7 @@ import { fileModel } from './management/file.schema.js';
|
||||
import { courierServiceModel } from './management/courierservice.schema.js';
|
||||
import { courierModel } from './management/courier.schema.js';
|
||||
import { taxRateModel } from './management/taxrate.schema.js';
|
||||
import { taxRecordModel } from './management/taxrecord.schema.js';
|
||||
import { taxRecordModel } from './finance/taxrecord.schema.js';
|
||||
import { shipmentModel } from './inventory/shipment.schema.js';
|
||||
import { invoiceModel } from './finance/invoice.schema.js';
|
||||
import { clientModel } from './sales/client.schema.js';
|
||||
|
||||
@ -7,18 +7,6 @@ jest.unstable_mockModule('../../database/nats.js', () => ({
|
||||
}
|
||||
}));
|
||||
|
||||
jest.unstable_mockModule('../../database/database.js', () => ({
|
||||
listObjects: jest.fn()
|
||||
}));
|
||||
|
||||
jest.unstable_mockModule('../../database/schemas/models.js', () => ({
|
||||
models: {
|
||||
PRN: {
|
||||
model: { modelName: 'printer' }
|
||||
}
|
||||
}
|
||||
}));
|
||||
|
||||
jest.unstable_mockModule('log4js', () => ({
|
||||
default: {
|
||||
getLogger: () => ({
|
||||
@ -42,7 +30,6 @@ jest.unstable_mockModule('../../config.js', () => ({
|
||||
|
||||
const { UpdateManager } = await import('../updatemanager.js');
|
||||
const { natsServer } = await import('../../database/nats.js');
|
||||
const { listObjects } = await import('../../database/database.js');
|
||||
|
||||
describe('UpdateManager', () => {
|
||||
let mockSocketClient;
|
||||
@ -84,8 +71,11 @@ describe('UpdateManager', () => {
|
||||
|
||||
it('should emit filtered new object events when the list filter matches', async () => {
|
||||
const filter = { 'state.type': 'ready' };
|
||||
const data = { _id: '123', name: 'New Printer' };
|
||||
listObjects.mockResolvedValueOnce([data]);
|
||||
const data = {
|
||||
_id: '123',
|
||||
name: 'New Printer',
|
||||
state: { type: 'ready' }
|
||||
};
|
||||
|
||||
await updateManager.subscribeToObjectNew('printer', filter);
|
||||
|
||||
@ -98,10 +88,6 @@ describe('UpdateManager', () => {
|
||||
const natsCallback = natsServer.subscribe.mock.calls[0][2];
|
||||
await natsCallback('printers.new', data);
|
||||
|
||||
expect(listObjects).toHaveBeenCalledWith({
|
||||
model: { modelName: 'printer' },
|
||||
filter: { 'state.type': 'ready', _id: '123' }
|
||||
});
|
||||
expect(mockSocketClient.socket.emit).toHaveBeenCalledWith('objectNew', {
|
||||
object: data,
|
||||
objectType: 'printer',
|
||||
@ -111,7 +97,6 @@ describe('UpdateManager', () => {
|
||||
|
||||
it('should skip filtered new object events when the list filter misses', async () => {
|
||||
const filter = { 'state.type': 'ready' };
|
||||
listObjects.mockResolvedValueOnce([]);
|
||||
|
||||
await updateManager.subscribeToObjectNew('printer', filter);
|
||||
|
||||
@ -120,6 +105,35 @@ describe('UpdateManager', () => {
|
||||
|
||||
expect(mockSocketClient.socket.emit).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should match reference filters when the id is populated or flat', async () => {
|
||||
const filter = { 'parent._id': 'parent-id' };
|
||||
|
||||
await updateManager.subscribeToObjectNew('note', filter);
|
||||
const natsCallback = natsServer.subscribe.mock.calls[0][2];
|
||||
|
||||
await natsCallback('notes.new', {
|
||||
_id: 'note-1',
|
||||
parent: { _id: 'parent-id' }
|
||||
});
|
||||
expect(mockSocketClient.socket.emit).toHaveBeenCalledTimes(1);
|
||||
|
||||
mockSocketClient.socket.emit.mockClear();
|
||||
|
||||
await natsCallback('notes.new', {
|
||||
_id: 'note-2',
|
||||
parent: 'parent-id'
|
||||
});
|
||||
expect(mockSocketClient.socket.emit).toHaveBeenCalledTimes(1);
|
||||
|
||||
mockSocketClient.socket.emit.mockClear();
|
||||
|
||||
await natsCallback('notes.new', {
|
||||
_id: 'note-3',
|
||||
parent: 'other-parent'
|
||||
});
|
||||
expect(mockSocketClient.socket.emit).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('subscribeToObjectDelete', () => {
|
||||
|
||||
@ -1,21 +1,64 @@
|
||||
import log4js from 'log4js';
|
||||
import _ from 'lodash';
|
||||
import { loadConfig } from '../config.js';
|
||||
import { natsServer } from '../database/nats.js';
|
||||
import { listObjects } from '../database/database.js';
|
||||
import { models } from '../database/schemas/models.js';
|
||||
const config = loadConfig();
|
||||
|
||||
// Setup logger
|
||||
const logger = log4js.getLogger('Update Manager');
|
||||
logger.level = config.server.logLevel;
|
||||
|
||||
const modelList = Object.values(models)
|
||||
.map(model => model.model)
|
||||
.filter(model => model != null);
|
||||
|
||||
const normalizeFilter = filter =>
|
||||
filter && typeof filter === 'object' && !Array.isArray(filter) ? filter : {};
|
||||
|
||||
const getFilterValue = (object, key) => {
|
||||
if (key.endsWith('._id')) {
|
||||
const refPath = key.slice(0, -4);
|
||||
const ref = _.get(object, refPath);
|
||||
if (ref && typeof ref === 'object' && ref._id) {
|
||||
return ref._id;
|
||||
}
|
||||
return ref;
|
||||
}
|
||||
|
||||
return _.get(object, key);
|
||||
};
|
||||
|
||||
const valuesMatch = (actual, expected) => {
|
||||
if (actual == expected) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (actual != null && expected != null) {
|
||||
return String(actual) === String(expected);
|
||||
}
|
||||
|
||||
return false;
|
||||
};
|
||||
|
||||
const matchesFilter = (object, filter) => {
|
||||
if (!filter || Object.keys(filter).length === 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (object == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const normalizedObject =
|
||||
typeof object === 'object' && !Array.isArray(object)
|
||||
? object
|
||||
: { _id: object };
|
||||
|
||||
for (const [key, expectedValue] of Object.entries(filter)) {
|
||||
if (!valuesMatch(getFilterValue(normalizedObject, key), expectedValue)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
};
|
||||
|
||||
const stableStringify = value => {
|
||||
if (Array.isArray(value)) {
|
||||
return `[${value.map(stableStringify).join(',')}]`;
|
||||
@ -34,9 +77,6 @@ const stableStringify = value => {
|
||||
const getSubscriptionOwner = (socketId, filter) =>
|
||||
`${socketId}:${stableStringify(normalizeFilter(filter))}`;
|
||||
|
||||
const getModelByName = modelName =>
|
||||
modelList.filter(model => model.modelName == modelName)[0];
|
||||
|
||||
/**
|
||||
* UpdateManager handles tracking object updates and broadcasts update events via websockets.
|
||||
*/
|
||||
@ -45,31 +85,13 @@ export class UpdateManager {
|
||||
this.socketClient = socketClient;
|
||||
}
|
||||
|
||||
async matchesObjectTypeFilter(objectType, filter, value) {
|
||||
matchesObjectTypeFilter(objectType, filter, value) {
|
||||
return matchesFilter(value, normalizeFilter(filter));
|
||||
}
|
||||
|
||||
emitObjectTypeEvent(eventName, objectType, filter, value) {
|
||||
const normalizedFilter = normalizeFilter(filter);
|
||||
|
||||
if (Object.keys(normalizedFilter).length === 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const model = getModelByName(objectType);
|
||||
const objectId = value?._id || value;
|
||||
|
||||
if (!model || !objectId) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const objects = await listObjects({
|
||||
model,
|
||||
filter: { ...normalizedFilter, _id: objectId }
|
||||
});
|
||||
|
||||
return Array.isArray(objects) && objects.length > 0;
|
||||
}
|
||||
|
||||
async emitObjectTypeEvent(eventName, objectType, filter, value) {
|
||||
const normalizedFilter = normalizeFilter(filter);
|
||||
const matches = await this.matchesObjectTypeFilter(
|
||||
const matches = this.matchesObjectTypeFilter(
|
||||
objectType,
|
||||
normalizedFilter,
|
||||
value
|
||||
@ -98,7 +120,7 @@ export class UpdateManager {
|
||||
getSubscriptionOwner(this.socketClient.socketId, normalizedFilter),
|
||||
async (key, value) => {
|
||||
logger.trace('Object new event:', value);
|
||||
await this.emitObjectTypeEvent(
|
||||
this.emitObjectTypeEvent(
|
||||
'objectNew',
|
||||
objectType,
|
||||
normalizedFilter,
|
||||
@ -116,7 +138,7 @@ export class UpdateManager {
|
||||
getSubscriptionOwner(this.socketClient.socketId, normalizedFilter),
|
||||
async (key, value) => {
|
||||
logger.trace('Object delete event:', value);
|
||||
await this.emitObjectTypeEvent(
|
||||
this.emitObjectTypeEvent(
|
||||
'objectDelete',
|
||||
objectType,
|
||||
normalizedFilter,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user