From f5bfbe0d63e3c2abbebc29b0c5a31da12d7b2e73 Mon Sep 17 00:00:00 2001 From: Tom Butcher Date: Mon, 18 Aug 2025 01:08:36 +0100 Subject: [PATCH] Refactor EtcdServer class to enhance event handling and logging - Introduced separate watchers for prefix and key events, improving organization and clarity. - Renamed methods for setting and deleting keys to better reflect their functionality. - Updated logging levels from debug to trace for more granular logging during operations. - Added functionality to remove watchers for both prefixes and keys, enhancing resource management. - Improved error handling and callback management for event listeners. --- src/database/etcd.js | 340 ++++++++++++++++++++++++++++++------------- 1 file changed, 235 insertions(+), 105 deletions(-) diff --git a/src/database/etcd.js b/src/database/etcd.js index 9edfce7..f81fe96 100644 --- a/src/database/etcd.js +++ b/src/database/etcd.js @@ -9,20 +9,21 @@ logger.level = config.server.logLevel; class EtcdServer { constructor() { this.client = null; - this.watchers = new Map(); + this.prefixPutWatchers = new Map(); // prefix → { watcher, callbacks } + this.prefixDeleteWatchers = new Map(); // prefix → { watcher, callbacks } + this.keyPutWatchers = new Map(); // key → { watcher, callbacks } + this.keyDeleteWatchers = new Map(); // key → { watcher, callbacks } const etcdConfig = config.database?.etcd || config.database; // fallback for production config const host = etcdConfig.host || 'localhost'; const port = etcdConfig.port || 2379; this.hosts = [`${host}:${port}`]; - logger.debug( - `EtcdServer constructor: hosts set to ${JSON.stringify(this.hosts)}` - ); + logger.trace(`EtcdServer: hosts set to ${JSON.stringify(this.hosts)}`); } async connect() { if (!this.client) { logger.info('Connecting to Etcd...'); - logger.debug( + logger.trace( `Creating Etcd client with hosts ${JSON.stringify(this.hosts)}` ); this.client = new Etcd3({ @@ -32,10 +33,10 @@ class EtcdServer { // Test connection try { await this.client.get('test-connection').string(); - logger.debug('Etcd client connected successfully.'); + logger.trace('Etcd client connected successfully.'); } catch (error) { if (error.code === 'NOT_FOUND') { - logger.debug( + logger.trace( 'Etcd client connected successfully (test key not found as expected).' ); } else { @@ -43,38 +44,35 @@ class EtcdServer { } } } else { - logger.debug('Etcd client already exists, skipping connection.'); + logger.trace('Etcd client already exists, skipping connection.'); } return this.client; } async getClient() { - logger.trace('Checking if Etcd client exists.'); if (!this.client) { - logger.debug('No client found, calling connect().'); + logger.trace('No client found, calling connect().'); await this.connect(); } - logger.trace('Returning Etcd client.'); return this.client; } - // Hash-like functionality using etcd - async set(key, value) { + async setKey(key, value) { const client = await this.getClient(); const stringValue = typeof value === 'string' ? value : JSON.stringify(value); await client.put(key).value(stringValue); - logger.debug(`Set key: ${key}, value: ${stringValue}`); + logger.trace(`Set key: ${key}, value: ${stringValue}`); return true; } - async get(key) { + async getKey(key) { const client = await this.getClient(); try { const value = await client.get(key).string(); - logger.debug(`Retrieved key: ${key}, value: ${value}`); + logger.trace(`Retrieved key: ${key}, value: ${value}`); // Try to parse as JSON, fallback to string try { @@ -84,41 +82,191 @@ class EtcdServer { } } catch (error) { if (error.code === 'NOT_FOUND') { - logger.debug(`Key not found: ${key}`); + logger.trace(`Key not found: ${key}`); return null; } throw error; } } - async delete(key) { + async deleteKey(key) { const client = await this.getClient(); try { await client.delete().key(key); - logger.debug(`Deleted key: ${key}`); - return true; + logger.trace(`Deleted key: ${key}`); + return { success: true }; } catch (error) { if (error.code === 'NOT_FOUND') { - logger.debug(`Key not found for deletion: ${key}`); - return false; + const error = `Key not found for deletion.`; + console.log(error, 'Key:', key); + return { error: error }; } throw error; } } - async onPrefixEvent(prefix, callback) { + async onPrefixPutEvent(prefix, owner, callback) { const client = await this.getClient(); - logger.debug(`Setting up watcher for prefix events: ${prefix}`); + const watcherKey = prefix; + + if (this.prefixPutWatchers.has(watcherKey)) { + this.prefixPutWatchers.get(watcherKey).callbacks.set(owner, callback); + logger.trace(`Added put callback for owner=${owner} on prefix=${prefix}`); + return; + } + + logger.trace(`Creating new put watcher for prefix: ${prefix}`); + const watcher = await client.watch().prefix(prefix).create(); + const callbacks = new Map(); + callbacks.set(owner, callback); + + watcher.on('put', (kv, previous) => { + logger.trace(`Prefix put event detected: ${prefix}, key: ${kv.key}`); + const valueStr = kv.value.toString(); + let parsedValue; + try { + parsedValue = JSON.parse(valueStr); + } catch { + parsedValue = valueStr; + } + + for (const [ownerId, cb] of callbacks) { + try { + cb(kv.key.toString(), parsedValue, kv, previous); + } catch (err) { + logger.error( + `Error in onPrefixPutEvent callback for owner=${ownerId}, prefix=${prefix}:`, + err + ); + } + } + }); + + this.prefixPutWatchers.set(watcherKey, { watcher, callbacks }); + return { success: true }; + } + + async onPrefixDeleteEvent(prefix, owner, callback) { + const client = await this.getClient(); + const watcherKey = prefix; + + if (this.prefixDeleteWatchers.has(watcherKey)) { + this.prefixDeleteWatchers.get(watcherKey).callbacks.set(owner, callback); + logger.trace( + `Added delete callback for owner=${owner} on prefix=${prefix}` + ); + return; + } + + logger.trace(`Creating new delete watcher for prefix: ${prefix}`); + const watcher = await client.watch().prefix(prefix).create(); + const callbacks = new Map(); + callbacks.set(owner, callback); + + watcher.on('delete', (kv, previous) => { + logger.trace(`Prefix delete event detected: ${prefix}, key: ${kv.key}`); + for (const [ownerId, cb] of callbacks) { + try { + cb(kv.key.toString(), kv, previous); + } catch (err) { + logger.error( + `Error in onPrefixDeleteEvent callback for owner=${ownerId}, prefix=${prefix}:`, + err + ); + } + } + }); + + this.prefixDeleteWatchers.set(watcherKey, { watcher, callbacks }); + return { success: true }; + } + + async onKeyPutEvent(key, owner, callback) { + const client = await this.getClient(); + const watcherKey = key; + + if (this.keyPutWatchers.has(watcherKey)) { + this.keyPutWatchers.get(watcherKey).callbacks.set(owner, callback); + logger.trace(`Added put callback for owner: ${owner}, on key: ${key}`); + return; + } + + logger.trace(`Creating new put watcher for key: ${key}`); + const watcher = await client.watch().key(key).create(); + const callbacks = new Map(); + callbacks.set(owner, callback); + + watcher.on('put', (kv, previous) => { + logger.trace(`Key put event detected: ${key}, key: ${kv.key}`); + const valueStr = kv.value.toString(); + let parsedValue; + try { + parsedValue = JSON.parse(valueStr); + } catch { + parsedValue = valueStr; + } + + for (const [ownerId, cb] of callbacks) { + try { + cb(kv.key.toString(), parsedValue, kv, previous); + } catch (err) { + logger.error( + `Error in onKeyPutEvent callback for owner: ${ownerId}, key: ${key}:`, + err + ); + } + } + }); + + this.keyPutWatchers.set(watcherKey, { watcher, callbacks }); + return { success: true }; + } + + async onKeyDeleteEvent(key, owner, callback) { + const client = await this.getClient(); + const watcherKey = key; + + if (this.keyDeleteWatchers.has(watcherKey)) { + this.keyDeleteWatchers.get(watcherKey).callbacks.set(owner, callback); + logger.trace(`Added delete callback for owner: ${owner} on key: ${key}`); + return; + } + + logger.trace(`Creating new delete watcher for key: ${key}`); + const watcher = await client.watch().key(key).create(); + const callbacks = new Map(); + callbacks.set(owner, callback); + + watcher.on('delete', (kv, previous) => { + logger.trace(`Key delete event detected: ${key}, key: ${kv.key}`); + for (const [ownerId, cb] of callbacks) { + try { + cb(kv.key.toString(), kv, previous); + } catch (err) { + logger.error( + `Error in onKeyDeleteEvent callback for owner=${ownerId}, key=${key}:`, + err + ); + } + } + }); + + this.keyDeleteWatchers.set(watcherKey, { watcher, callbacks }); + } + + async onKeyEvent(key, callback) { + const client = await this.getClient(); + logger.trace(`Setting up watcher for key events: ${key}`); client .watch() - .prefix(prefix) + .key(key) .create() .then(watcher => { // Handle put events watcher.on('put', (kv, previous) => { - logger.debug(`Prefix put event detected: ${prefix}, key: ${kv.key}`); + logger.trace(`Key put event detected: ${key}`); try { const value = kv.value.toString(); let parsedValue; @@ -127,10 +275,10 @@ class EtcdServer { } catch { parsedValue = value; } - callback(kv.key.toString(), parsedValue, kv, previous); + callback(key, parsedValue, kv, previous); } catch (error) { logger.error( - `Error in onPrefixEvent put callback for prefix ${prefix}:`, + `Error in onKeyEvent put callback for key ${key}:`, error ); } @@ -138,98 +286,80 @@ class EtcdServer { // Handle delete events watcher.on('delete', (kv, previous) => { - logger.debug( - `Prefix delete event detected: ${prefix}, key: ${kv.key}` - ); + logger.trace(`Key delete event detected: ${key}`); try { - callback(kv.key.toString(), null, kv, previous); + callback(key, null, kv, previous); } catch (error) { logger.error( - `Error in onPrefixEvent delete callback for prefix ${prefix}:`, + `Error in onKeyEvent delete callback for key ${key}:`, error ); } }); // Store watcher with a unique key - const watcherKey = `event:${prefix}`; + const watcherKey = `event:key:${key}`; this.watchers.set(watcherKey, watcher); }); } - async onPrefixPut(prefix, callback) { - const client = await this.getClient(); - logger.debug(`Setting up watcher for prefix put: ${prefix}`); + async removePrefixWatcher(prefix, owner, type = 'put') { + const store = + type === 'put' ? this.prefixPutWatchers : this.prefixDeleteWatchers; + const entry = store.get(prefix); - client - .watch() - .prefix(prefix) - .create() - .then(watcher => { - watcher.on('put', (kv, previous) => { - logger.debug(`Prefix put event detected: ${prefix}, key: ${kv.key}`); - try { - const value = kv.value.toString(); - let parsedValue; - try { - parsedValue = JSON.parse(value); - } catch { - parsedValue = value; - } - callback(kv.key.toString(), parsedValue, kv, previous); - } catch (error) { - logger.error( - `Error in onPrefixPut callback for prefix ${prefix}:`, - error - ); - } - }); - - this.watchers.set(`put:${prefix}`, watcher); - }); - } - - async onPrefixDelete(prefix, callback) { - const client = await this.getClient(); - logger.debug(`Setting up watcher for prefix delete: ${prefix}`); - - client - .watch() - .prefix(prefix) - .create() - .then(watcher => { - watcher.on('delete', (kv, previous) => { - logger.debug( - `Prefix delete event detected: ${prefix}, key: ${kv.key}` - ); - try { - callback(kv.key.toString(), kv, previous); - } catch (error) { - logger.error( - `Error in onPrefixDelete callback for prefix ${prefix}:`, - error - ); - } - }); - - this.watchers.set(`delete:${prefix}`, watcher); - }); - } - - async removeWatcher(prefix, type = 'put') { - const watcherKey = `${type}:${prefix}`; - const watcher = this.watchers.get(watcherKey); - - if (watcher) { - logger.debug(`Removing watcher: ${watcherKey}`); - watcher.removeAllListeners(); - await watcher.close(); - this.watchers.delete(watcherKey); - return true; - } else { - logger.debug(`Watcher not found: ${watcherKey}`); + if (!entry) { + logger.trace(`Watcher not found for prefix: ${prefix}, type: ${type}`); return false; } + + if (entry.callbacks.delete(owner)) { + logger.trace( + `Removed ${type} callback for owner: ${owner} on prefix: ${prefix}` + ); + } else { + logger.trace( + `No ${type} callback found for owner: ${owner} on prefix: ${prefix}` + ); + } + + if (entry.callbacks.size === 0) { + logger.trace(`No callbacks left, stopping ${type} watcher for ${prefix}`); + entry.watcher.removeAllListeners(); + await entry.watcher.cancel(); + store.delete(prefix); + } + + return true; + } + + async removeKeyWatcher(key, owner, type = 'put') { + const store = type === 'put' ? this.keyPutWatchers : this.keyDeleteWatchers; + const entry = store.get(key); + + if (!entry) { + logger.trace(`Watcher not found for key: ${key}, type: ${type}`); + return false; + } + + if (entry.callbacks.delete(owner)) { + logger.trace( + `Removed ${type} callback for owner: ${owner} on key: ${key}` + ); + } else { + logger.trace( + `No ${type} callback found for owner: ${owner} on key: ${key}` + ); + } + + if (entry.callbacks.size === 0) { + logger.trace(`No callbacks left, stopping ${type} watcher for ${key}`); + entry.watcher.removeAllListeners(); + await entry.watcher.cancel(); + store.delete(key); + } + + return true; } async disconnect() { @@ -237,7 +367,7 @@ class EtcdServer { // Stop all watchers for (const [key, watcher] of this.watchers) { - logger.debug(`Stopping watcher: ${key}`); + logger.trace(`Stopping watcher: ${key}`); watcher.removeAllListeners(); await watcher.close(); }