Refactor EtcdServer class to enhance event handling and logging

- Introduced separate watchers for prefix and key events, improving organization and clarity.
- Renamed methods for setting and deleting keys to better reflect their functionality.
- Updated logging levels from debug to trace for more granular logging during operations.
- Added functionality to remove watchers for both prefixes and keys, enhancing resource management.
- Improved error handling and callback management for event listeners.
This commit is contained in:
Tom Butcher 2025-08-18 01:08:36 +01:00
parent 75ccd91b50
commit f5bfbe0d63

View File

@ -9,20 +9,21 @@ logger.level = config.server.logLevel;
class EtcdServer { class EtcdServer {
constructor() { constructor() {
this.client = null; this.client = null;
this.watchers = new Map(); this.prefixPutWatchers = new Map(); // prefix → { watcher, callbacks }
this.prefixDeleteWatchers = new Map(); // prefix → { watcher, callbacks }
this.keyPutWatchers = new Map(); // key → { watcher, callbacks }
this.keyDeleteWatchers = new Map(); // key → { watcher, callbacks }
const etcdConfig = config.database?.etcd || config.database; // fallback for production config const etcdConfig = config.database?.etcd || config.database; // fallback for production config
const host = etcdConfig.host || 'localhost'; const host = etcdConfig.host || 'localhost';
const port = etcdConfig.port || 2379; const port = etcdConfig.port || 2379;
this.hosts = [`${host}:${port}`]; this.hosts = [`${host}:${port}`];
logger.debug( logger.trace(`EtcdServer: hosts set to ${JSON.stringify(this.hosts)}`);
`EtcdServer constructor: hosts set to ${JSON.stringify(this.hosts)}`
);
} }
async connect() { async connect() {
if (!this.client) { if (!this.client) {
logger.info('Connecting to Etcd...'); logger.info('Connecting to Etcd...');
logger.debug( logger.trace(
`Creating Etcd client with hosts ${JSON.stringify(this.hosts)}` `Creating Etcd client with hosts ${JSON.stringify(this.hosts)}`
); );
this.client = new Etcd3({ this.client = new Etcd3({
@ -32,10 +33,10 @@ class EtcdServer {
// Test connection // Test connection
try { try {
await this.client.get('test-connection').string(); await this.client.get('test-connection').string();
logger.debug('Etcd client connected successfully.'); logger.trace('Etcd client connected successfully.');
} catch (error) { } catch (error) {
if (error.code === 'NOT_FOUND') { if (error.code === 'NOT_FOUND') {
logger.debug( logger.trace(
'Etcd client connected successfully (test key not found as expected).' 'Etcd client connected successfully (test key not found as expected).'
); );
} else { } else {
@ -43,38 +44,35 @@ class EtcdServer {
} }
} }
} else { } else {
logger.debug('Etcd client already exists, skipping connection.'); logger.trace('Etcd client already exists, skipping connection.');
} }
return this.client; return this.client;
} }
async getClient() { async getClient() {
logger.trace('Checking if Etcd client exists.');
if (!this.client) { if (!this.client) {
logger.debug('No client found, calling connect().'); logger.trace('No client found, calling connect().');
await this.connect(); await this.connect();
} }
logger.trace('Returning Etcd client.');
return this.client; return this.client;
} }
// Hash-like functionality using etcd async setKey(key, value) {
async set(key, value) {
const client = await this.getClient(); const client = await this.getClient();
const stringValue = const stringValue =
typeof value === 'string' ? value : JSON.stringify(value); typeof value === 'string' ? value : JSON.stringify(value);
await client.put(key).value(stringValue); await client.put(key).value(stringValue);
logger.debug(`Set key: ${key}, value: ${stringValue}`); logger.trace(`Set key: ${key}, value: ${stringValue}`);
return true; return true;
} }
async get(key) { async getKey(key) {
const client = await this.getClient(); const client = await this.getClient();
try { try {
const value = await client.get(key).string(); const value = await client.get(key).string();
logger.debug(`Retrieved key: ${key}, value: ${value}`); logger.trace(`Retrieved key: ${key}, value: ${value}`);
// Try to parse as JSON, fallback to string // Try to parse as JSON, fallback to string
try { try {
@ -84,41 +82,191 @@ class EtcdServer {
} }
} catch (error) { } catch (error) {
if (error.code === 'NOT_FOUND') { if (error.code === 'NOT_FOUND') {
logger.debug(`Key not found: ${key}`); logger.trace(`Key not found: ${key}`);
return null; return null;
} }
throw error; throw error;
} }
} }
async delete(key) { async deleteKey(key) {
const client = await this.getClient(); const client = await this.getClient();
try { try {
await client.delete().key(key); await client.delete().key(key);
logger.debug(`Deleted key: ${key}`); logger.trace(`Deleted key: ${key}`);
return true; return { success: true };
} catch (error) { } catch (error) {
if (error.code === 'NOT_FOUND') { if (error.code === 'NOT_FOUND') {
logger.debug(`Key not found for deletion: ${key}`); const error = `Key not found for deletion.`;
return false; console.log(error, 'Key:', key);
return { error: error };
} }
throw error; throw error;
} }
} }
async onPrefixEvent(prefix, callback) { async onPrefixPutEvent(prefix, owner, callback) {
const client = await this.getClient(); const client = await this.getClient();
logger.debug(`Setting up watcher for prefix events: ${prefix}`); const watcherKey = prefix;
if (this.prefixPutWatchers.has(watcherKey)) {
this.prefixPutWatchers.get(watcherKey).callbacks.set(owner, callback);
logger.trace(`Added put callback for owner=${owner} on prefix=${prefix}`);
return;
}
logger.trace(`Creating new put watcher for prefix: ${prefix}`);
const watcher = await client.watch().prefix(prefix).create();
const callbacks = new Map();
callbacks.set(owner, callback);
watcher.on('put', (kv, previous) => {
logger.trace(`Prefix put event detected: ${prefix}, key: ${kv.key}`);
const valueStr = kv.value.toString();
let parsedValue;
try {
parsedValue = JSON.parse(valueStr);
} catch {
parsedValue = valueStr;
}
for (const [ownerId, cb] of callbacks) {
try {
cb(kv.key.toString(), parsedValue, kv, previous);
} catch (err) {
logger.error(
`Error in onPrefixPutEvent callback for owner=${ownerId}, prefix=${prefix}:`,
err
);
}
}
});
this.prefixPutWatchers.set(watcherKey, { watcher, callbacks });
return { success: true };
}
async onPrefixDeleteEvent(prefix, owner, callback) {
const client = await this.getClient();
const watcherKey = prefix;
if (this.prefixDeleteWatchers.has(watcherKey)) {
this.prefixDeleteWatchers.get(watcherKey).callbacks.set(owner, callback);
logger.trace(
`Added delete callback for owner=${owner} on prefix=${prefix}`
);
return;
}
logger.trace(`Creating new delete watcher for prefix: ${prefix}`);
const watcher = await client.watch().prefix(prefix).create();
const callbacks = new Map();
callbacks.set(owner, callback);
watcher.on('delete', (kv, previous) => {
logger.trace(`Prefix delete event detected: ${prefix}, key: ${kv.key}`);
for (const [ownerId, cb] of callbacks) {
try {
cb(kv.key.toString(), kv, previous);
} catch (err) {
logger.error(
`Error in onPrefixDeleteEvent callback for owner=${ownerId}, prefix=${prefix}:`,
err
);
}
}
});
this.prefixDeleteWatchers.set(watcherKey, { watcher, callbacks });
return { success: true };
}
async onKeyPutEvent(key, owner, callback) {
const client = await this.getClient();
const watcherKey = key;
if (this.keyPutWatchers.has(watcherKey)) {
this.keyPutWatchers.get(watcherKey).callbacks.set(owner, callback);
logger.trace(`Added put callback for owner: ${owner}, on key: ${key}`);
return;
}
logger.trace(`Creating new put watcher for key: ${key}`);
const watcher = await client.watch().key(key).create();
const callbacks = new Map();
callbacks.set(owner, callback);
watcher.on('put', (kv, previous) => {
logger.trace(`Key put event detected: ${key}, key: ${kv.key}`);
const valueStr = kv.value.toString();
let parsedValue;
try {
parsedValue = JSON.parse(valueStr);
} catch {
parsedValue = valueStr;
}
for (const [ownerId, cb] of callbacks) {
try {
cb(kv.key.toString(), parsedValue, kv, previous);
} catch (err) {
logger.error(
`Error in onKeyPutEvent callback for owner: ${ownerId}, key: ${key}:`,
err
);
}
}
});
this.keyPutWatchers.set(watcherKey, { watcher, callbacks });
return { success: true };
}
async onKeyDeleteEvent(key, owner, callback) {
const client = await this.getClient();
const watcherKey = key;
if (this.keyDeleteWatchers.has(watcherKey)) {
this.keyDeleteWatchers.get(watcherKey).callbacks.set(owner, callback);
logger.trace(`Added delete callback for owner: ${owner} on key: ${key}`);
return;
}
logger.trace(`Creating new delete watcher for key: ${key}`);
const watcher = await client.watch().key(key).create();
const callbacks = new Map();
callbacks.set(owner, callback);
watcher.on('delete', (kv, previous) => {
logger.trace(`Key delete event detected: ${key}, key: ${kv.key}`);
for (const [ownerId, cb] of callbacks) {
try {
cb(kv.key.toString(), kv, previous);
} catch (err) {
logger.error(
`Error in onKeyDeleteEvent callback for owner=${ownerId}, key=${key}:`,
err
);
}
}
});
this.keyDeleteWatchers.set(watcherKey, { watcher, callbacks });
}
async onKeyEvent(key, callback) {
const client = await this.getClient();
logger.trace(`Setting up watcher for key events: ${key}`);
client client
.watch() .watch()
.prefix(prefix) .key(key)
.create() .create()
.then(watcher => { .then(watcher => {
// Handle put events // Handle put events
watcher.on('put', (kv, previous) => { watcher.on('put', (kv, previous) => {
logger.debug(`Prefix put event detected: ${prefix}, key: ${kv.key}`); logger.trace(`Key put event detected: ${key}`);
try { try {
const value = kv.value.toString(); const value = kv.value.toString();
let parsedValue; let parsedValue;
@ -127,10 +275,10 @@ class EtcdServer {
} catch { } catch {
parsedValue = value; parsedValue = value;
} }
callback(kv.key.toString(), parsedValue, kv, previous); callback(key, parsedValue, kv, previous);
} catch (error) { } catch (error) {
logger.error( logger.error(
`Error in onPrefixEvent put callback for prefix ${prefix}:`, `Error in onKeyEvent put callback for key ${key}:`,
error error
); );
} }
@ -138,98 +286,80 @@ class EtcdServer {
// Handle delete events // Handle delete events
watcher.on('delete', (kv, previous) => { watcher.on('delete', (kv, previous) => {
logger.debug( logger.trace(`Key delete event detected: ${key}`);
`Prefix delete event detected: ${prefix}, key: ${kv.key}`
);
try { try {
callback(kv.key.toString(), null, kv, previous); callback(key, null, kv, previous);
} catch (error) { } catch (error) {
logger.error( logger.error(
`Error in onPrefixEvent delete callback for prefix ${prefix}:`, `Error in onKeyEvent delete callback for key ${key}:`,
error error
); );
} }
}); });
// Store watcher with a unique key // Store watcher with a unique key
const watcherKey = `event:${prefix}`; const watcherKey = `event:key:${key}`;
this.watchers.set(watcherKey, watcher); this.watchers.set(watcherKey, watcher);
}); });
} }
async onPrefixPut(prefix, callback) { async removePrefixWatcher(prefix, owner, type = 'put') {
const client = await this.getClient(); const store =
logger.debug(`Setting up watcher for prefix put: ${prefix}`); type === 'put' ? this.prefixPutWatchers : this.prefixDeleteWatchers;
const entry = store.get(prefix);
client if (!entry) {
.watch() logger.trace(`Watcher not found for prefix: ${prefix}, type: ${type}`);
.prefix(prefix)
.create()
.then(watcher => {
watcher.on('put', (kv, previous) => {
logger.debug(`Prefix put event detected: ${prefix}, key: ${kv.key}`);
try {
const value = kv.value.toString();
let parsedValue;
try {
parsedValue = JSON.parse(value);
} catch {
parsedValue = value;
}
callback(kv.key.toString(), parsedValue, kv, previous);
} catch (error) {
logger.error(
`Error in onPrefixPut callback for prefix ${prefix}:`,
error
);
}
});
this.watchers.set(`put:${prefix}`, watcher);
});
}
async onPrefixDelete(prefix, callback) {
const client = await this.getClient();
logger.debug(`Setting up watcher for prefix delete: ${prefix}`);
client
.watch()
.prefix(prefix)
.create()
.then(watcher => {
watcher.on('delete', (kv, previous) => {
logger.debug(
`Prefix delete event detected: ${prefix}, key: ${kv.key}`
);
try {
callback(kv.key.toString(), kv, previous);
} catch (error) {
logger.error(
`Error in onPrefixDelete callback for prefix ${prefix}:`,
error
);
}
});
this.watchers.set(`delete:${prefix}`, watcher);
});
}
async removeWatcher(prefix, type = 'put') {
const watcherKey = `${type}:${prefix}`;
const watcher = this.watchers.get(watcherKey);
if (watcher) {
logger.debug(`Removing watcher: ${watcherKey}`);
watcher.removeAllListeners();
await watcher.close();
this.watchers.delete(watcherKey);
return true;
} else {
logger.debug(`Watcher not found: ${watcherKey}`);
return false; return false;
} }
if (entry.callbacks.delete(owner)) {
logger.trace(
`Removed ${type} callback for owner: ${owner} on prefix: ${prefix}`
);
} else {
logger.trace(
`No ${type} callback found for owner: ${owner} on prefix: ${prefix}`
);
}
if (entry.callbacks.size === 0) {
logger.trace(`No callbacks left, stopping ${type} watcher for ${prefix}`);
entry.watcher.removeAllListeners();
await entry.watcher.cancel();
store.delete(prefix);
}
return true;
}
async removeKeyWatcher(key, owner, type = 'put') {
const store = type === 'put' ? this.keyPutWatchers : this.keyDeleteWatchers;
const entry = store.get(key);
if (!entry) {
logger.trace(`Watcher not found for key: ${key}, type: ${type}`);
return false;
}
if (entry.callbacks.delete(owner)) {
logger.trace(
`Removed ${type} callback for owner: ${owner} on key: ${key}`
);
} else {
logger.trace(
`No ${type} callback found for owner: ${owner} on key: ${key}`
);
}
if (entry.callbacks.size === 0) {
logger.trace(`No callbacks left, stopping ${type} watcher for ${key}`);
entry.watcher.removeAllListeners();
await entry.watcher.cancel();
store.delete(key);
}
return true;
} }
async disconnect() { async disconnect() {
@ -237,7 +367,7 @@ class EtcdServer {
// Stop all watchers // Stop all watchers
for (const [key, watcher] of this.watchers) { for (const [key, watcher] of this.watchers) {
logger.debug(`Stopping watcher: ${key}`); logger.trace(`Stopping watcher: ${key}`);
watcher.removeAllListeners(); watcher.removeAllListeners();
await watcher.close(); await watcher.close();
} }