Enhance UpdateManager for object event handling and subscription management
- Refactored constructor to utilize socketClient for improved clarity. - Added methods to subscribe and unsubscribe from object creation and update events. - Implemented functionality to retrieve and set object updates in Etcd. - Improved logging for event handling and error management.
This commit is contained in:
parent
d695772a3a
commit
1b86d256ca
@ -1,7 +1,9 @@
|
|||||||
import { etcdServer } from '../database/etcd.js';
|
|
||||||
|
|
||||||
import log4js from 'log4js';
|
import log4js from 'log4js';
|
||||||
import { loadConfig } from '../config.js';
|
import { loadConfig } from '../config.js';
|
||||||
|
import NodeCache from 'node-cache';
|
||||||
|
import { etcdServer } from '../database/etcd.js';
|
||||||
|
import { updateObjectCache } from '../database/database.js';
|
||||||
|
|
||||||
const config = loadConfig();
|
const config = loadConfig();
|
||||||
|
|
||||||
// Setup logger
|
// Setup logger
|
||||||
@ -12,40 +14,88 @@ logger.level = config.server.logLevel;
|
|||||||
* UpdateManager handles tracking object updates using Etcd and broadcasts update events via websockets.
|
* UpdateManager handles tracking object updates using Etcd and broadcasts update events via websockets.
|
||||||
*/
|
*/
|
||||||
export class UpdateManager {
|
export class UpdateManager {
|
||||||
constructor(socketManager) {
|
constructor(socketClient) {
|
||||||
this.socketManager = socketManager;
|
this.socketClient = socketClient;
|
||||||
this.setupUpdatesListeners();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async updateObject(object) {
|
async subscribeToObjectNew(objectType) {
|
||||||
// Add an 'update' event to the 'updates' stream
|
await etcdServer.onKeyPutEvent(
|
||||||
logger.debug('Updating object:', object._id);
|
`/${objectType}s/new`,
|
||||||
try {
|
this.socketClient.socketId,
|
||||||
const updateData = {
|
(key, value) => {
|
||||||
_id: object._id,
|
logger.trace('Object new event:', value);
|
||||||
type: object.type,
|
this.socketClient.socket.emit('objectNew', {
|
||||||
updatedAt: new Date().toISOString()
|
_id: value,
|
||||||
};
|
objectType: objectType
|
||||||
|
});
|
||||||
await etcdServer.set(
|
}
|
||||||
`/updates/${object.type}s/${object._id}`,
|
|
||||||
updateData
|
|
||||||
);
|
);
|
||||||
logger.info(`Update event for id: ${object._id}`);
|
return { success: true };
|
||||||
} catch (err) {
|
}
|
||||||
logger.error(`Error adding update event to: ${object._id}:`, err);
|
|
||||||
throw err;
|
async subscribeToObjectUpdate(id, objectType) {
|
||||||
|
await etcdServer.onKeyPutEvent(
|
||||||
|
`/${objectType}s/${id}/object`,
|
||||||
|
this.socketClient.socketId,
|
||||||
|
(key, value) => {
|
||||||
|
logger.trace('Object update event:', id);
|
||||||
|
this.socketClient.socket.emit('objectUpdate', {
|
||||||
|
_id: id,
|
||||||
|
objectType: objectType,
|
||||||
|
object: { ...value }
|
||||||
|
});
|
||||||
|
}
|
||||||
|
);
|
||||||
|
return { success: true };
|
||||||
|
}
|
||||||
|
|
||||||
|
async removeObjectNewListener(objectType) {
|
||||||
|
await etcdServer.removeKeyWatcher(
|
||||||
|
`/${objectType}s/new`,
|
||||||
|
this.socketClient.socketId,
|
||||||
|
'put'
|
||||||
|
);
|
||||||
|
return { success: true };
|
||||||
|
}
|
||||||
|
|
||||||
|
async removeObjectUpdateListener(id, objectType) {
|
||||||
|
await etcdServer.removeKeyWatcher(
|
||||||
|
`/${objectType}s/${id}/object`,
|
||||||
|
this.socketClient.socketId,
|
||||||
|
'put'
|
||||||
|
);
|
||||||
|
return { success: true };
|
||||||
|
}
|
||||||
|
|
||||||
|
async getObjectUpdate(id, objectType) {
|
||||||
|
try {
|
||||||
|
const objectUpdate = {
|
||||||
|
_id: id,
|
||||||
|
objectType: objectType,
|
||||||
|
object: await etcdServer.get(`/${objectType}s/${id}/object`)
|
||||||
|
};
|
||||||
|
logger.trace(`Returning path: /${objectType}s/${id}/object`);
|
||||||
|
return objectUpdate;
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(
|
||||||
|
`UpdateManager: Failed to get current value for /${objectType}s/${id}/object:`,
|
||||||
|
error
|
||||||
|
);
|
||||||
|
return { error: 'Not found' };
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
setupUpdatesListeners() {
|
async setObjectUpdate(id, objectType, value) {
|
||||||
etcdServer.onPrefixPut('/updates', (key, value) => {
|
try {
|
||||||
const id = key.split('/').pop();
|
await etcdServer.set(`/${objectType}s/${id}/object`, value);
|
||||||
logger.debug('Update object event:', id);
|
logger.trace(`Set value for path: /${objectType}s/${id}/object`);
|
||||||
this.socketManager.broadcast('notify_object_update', {
|
return true;
|
||||||
...value
|
} catch (error) {
|
||||||
});
|
logger.error(
|
||||||
});
|
`Failed to set value for /${objectType}s/${id}/object:`,
|
||||||
logger.info('Subscribed to Etcd stream for update changes.');
|
error
|
||||||
|
);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user