// Lock manager: distributed object locking backed by Etcd, with websocket notifications.
import { etcdServer } from '../database/etcd.js';
import log4js from 'log4js';
import { loadConfig } from '../config.js';
const config = loadConfig();

// Setup logger: a named log4js logger for this module whose level is taken
// from the loaded server configuration.
const logger = log4js.getLogger('Lock Manager');
logger.level = config.server.logLevel;
/**
 * Build the Etcd key under which the lock for an object is stored.
 *
 * @param {{ type: string, _id: string }} object - Object being locked/unlocked.
 * @returns {string} Key of the form `/locks/<type>s/<_id>`.
 */
function lockKeyFor(object) {
  return `/locks/${object.type}s/${object._id}`;
}

/**
 * LockManager handles distributed locking using Etcd and broadcasts lock
 * events via websockets.
 *
 * Locks are stored as Etcd keys under the `/locks` prefix. Put/delete events
 * on that prefix are forwarded to clients as `notify_lock_update` messages.
 */
export class LockManager {
  /**
   * @param {object} socketManager - Must expose `broadcast(event, payload)`;
   *   it is used to notify clients about lock changes.
   */
  constructor(socketManager) {
    this.socketManager = socketManager;
    this.setupLocksListeners();
  }

  /**
   * Store a lock for the given object in Etcd.
   *
   * NOTE(review): the key is written unconditionally, so a lock that is
   * already held appears to be replaced without an ownership check — confirm
   * whether callers verify this beforehand.
   *
   * @param {object} object - Lock payload; `type` and `_id` select the key and
   *   the whole object is stored as the value.
   * @returns {Promise<boolean>} Resolves to `true` once the key is written.
   * @throws Rethrows any error raised by the Etcd write after logging it.
   */
  async lockObject(object) {
    const key = lockKeyFor(object);
    logger.debug('Locking object:', object._id);
    try {
      await etcdServer.set(key, object);
      logger.info(`Lock event to id: ${object._id}`);
      return true;
    } catch (err) {
      logger.error(`Error adding lock event to: ${object._id}:`, err);
      throw err;
    }
  }

  /**
   * Remove the lock for the given object, but only when the stored lock's
   * `user` matches `object.user`.
   *
   * @param {object} object - Must carry `type`, `_id` and the requesting `user`.
   * @returns {Promise<boolean>} `true` when the lock key was deleted, `false`
   *   when the stored lock's user does not match the requester.
   * @throws Rethrows any error raised by Etcd after logging it.
   */
  async unlockObject(object) {
    const key = lockKeyFor(object);
    try {
      logger.debug('Checking user can unlock:', object._id);
      const lockEvent = await etcdServer.get(key);

      // Same comparison as before: a missing lock only passes when the
      // request carries no user either.
      if (lockEvent?.user !== object.user) {
        logger.warn(
          `Refusing to unlock object ${object._id}: lock user does not match requester`,
        );
        return false;
      }

      logger.debug('Unlocking object:', object._id);
      await etcdServer.delete(key);
      logger.info(`Unlocked object: ${object._id}`);
      return true;
    } catch (err) {
      logger.error(`Error unlocking object ${object._id}:`, err);
      throw err;
    }
  }

  /**
   * Look up the current lock status of an object. This only reads from Etcd;
   * nothing is broadcast here.
   *
   * @param {object} object - Must carry `type` and `_id`.
   * @returns {Promise<object>} The stored lock value plus `locked: true` when
   *   a lock exists, otherwise `{ _id, locked: false }`.
   * @throws Rethrows any error raised by the Etcd read after logging it.
   */
  async getObjectLock(object) {
    logger.info('Getting lock status for object:', object._id);
    try {
      const lockValue = await etcdServer.get(lockKeyFor(object));

      if (lockValue) {
        logger.debug(`Object ${object._id} is locked`);
        return { ...lockValue, locked: true };
      }

      logger.debug(`Object ${object._id} is not locked`);
      return { _id: object._id, locked: false };
    } catch (err) {
      logger.error(`Error getting lock status for object ${object._id}:`, err);
      throw err;
    }
  }

  /**
   * Register handlers for put and delete events on the `/locks` prefix and
   * forward each one to clients as a `notify_lock_update` broadcast.
   */
  setupLocksListeners() {
    etcdServer.onPrefixPut('/locks', (key, value) => {
      // The object id is the last path segment of the lock key.
      const id = key.split('/').pop();
      logger.debug('Lock object event:', id);
      this.socketManager.broadcast('notify_lock_update', {
        ...value,
        locked: true,
      });
    });

    etcdServer.onPrefixDelete('/locks', (key) => {
      // Delete events carry no value, so only the id is sent.
      const id = key.split('/').pop();
      logger.debug('Unlock object event:', id);
      this.socketManager.broadcast('notify_lock_update', {
        _id: id,
        locked: false,
      });
    });

    logger.info('Subscribed to Etcd stream for lock changes.');
  }
}