diff --git a/src/database/etcd.js b/src/database/etcd.js deleted file mode 100644 index f81fe96..0000000 --- a/src/database/etcd.js +++ /dev/null @@ -1,386 +0,0 @@ -import { Etcd3 } from 'etcd3'; -import log4js from 'log4js'; -import { loadConfig } from '../config.js'; - -const config = loadConfig(); -const logger = log4js.getLogger('Etcd'); -logger.level = config.server.logLevel; - -class EtcdServer { - constructor() { - this.client = null; - this.prefixPutWatchers = new Map(); // prefix → { watcher, callbacks } - this.prefixDeleteWatchers = new Map(); // prefix → { watcher, callbacks } - this.keyPutWatchers = new Map(); // key → { watcher, callbacks } - this.keyDeleteWatchers = new Map(); // key → { watcher, callbacks } - const etcdConfig = config.database?.etcd || config.database; // fallback for production config - const host = etcdConfig.host || 'localhost'; - const port = etcdConfig.port || 2379; - this.hosts = [`${host}:${port}`]; - logger.trace(`EtcdServer: hosts set to ${JSON.stringify(this.hosts)}`); - } - - async connect() { - if (!this.client) { - logger.info('Connecting to Etcd...'); - logger.trace( - `Creating Etcd client with hosts ${JSON.stringify(this.hosts)}` - ); - this.client = new Etcd3({ - hosts: this.hosts - }); - - // Test connection - try { - await this.client.get('test-connection').string(); - logger.trace('Etcd client connected successfully.'); - } catch (error) { - if (error.code === 'NOT_FOUND') { - logger.trace( - 'Etcd client connected successfully (test key not found as expected).' - ); - } else { - throw error; - } - } - } else { - logger.trace('Etcd client already exists, skipping connection.'); - } - return this.client; - } - - async getClient() { - if (!this.client) { - logger.trace('No client found, calling connect().'); - await this.connect(); - } - return this.client; - } - - async setKey(key, value) { - const client = await this.getClient(); - const stringValue = - typeof value === 'string' ? value : JSON.stringify(value); - - await client.put(key).value(stringValue); - logger.trace(`Set key: ${key}, value: ${stringValue}`); - return true; - } - - async getKey(key) { - const client = await this.getClient(); - - try { - const value = await client.get(key).string(); - logger.trace(`Retrieved key: ${key}, value: ${value}`); - - // Try to parse as JSON, fallback to string - try { - return JSON.parse(value); - } catch { - return value; - } - } catch (error) { - if (error.code === 'NOT_FOUND') { - logger.trace(`Key not found: ${key}`); - return null; - } - throw error; - } - } - - async deleteKey(key) { - const client = await this.getClient(); - - try { - await client.delete().key(key); - logger.trace(`Deleted key: ${key}`); - return { success: true }; - } catch (error) { - if (error.code === 'NOT_FOUND') { - const error = `Key not found for deletion.`; - console.log(error, 'Key:', key); - return { error: error }; - } - throw error; - } - } - - async onPrefixPutEvent(prefix, owner, callback) { - const client = await this.getClient(); - const watcherKey = prefix; - - if (this.prefixPutWatchers.has(watcherKey)) { - this.prefixPutWatchers.get(watcherKey).callbacks.set(owner, callback); - logger.trace(`Added put callback for owner=${owner} on prefix=${prefix}`); - return; - } - - logger.trace(`Creating new put watcher for prefix: ${prefix}`); - const watcher = await client.watch().prefix(prefix).create(); - const callbacks = new Map(); - callbacks.set(owner, callback); - - watcher.on('put', (kv, previous) => { - logger.trace(`Prefix put event detected: ${prefix}, key: ${kv.key}`); - const valueStr = kv.value.toString(); - let parsedValue; - try { - parsedValue = JSON.parse(valueStr); - } catch { - parsedValue = valueStr; - } - - for (const [ownerId, cb] of callbacks) { - try { - cb(kv.key.toString(), parsedValue, kv, previous); - } catch (err) { - logger.error( - `Error in onPrefixPutEvent callback for owner=${ownerId}, prefix=${prefix}:`, - err - ); - } - } - }); - - this.prefixPutWatchers.set(watcherKey, { watcher, callbacks }); - return { success: true }; - } - - async onPrefixDeleteEvent(prefix, owner, callback) { - const client = await this.getClient(); - const watcherKey = prefix; - - if (this.prefixDeleteWatchers.has(watcherKey)) { - this.prefixDeleteWatchers.get(watcherKey).callbacks.set(owner, callback); - logger.trace( - `Added delete callback for owner=${owner} on prefix=${prefix}` - ); - return; - } - - logger.trace(`Creating new delete watcher for prefix: ${prefix}`); - const watcher = await client.watch().prefix(prefix).create(); - const callbacks = new Map(); - callbacks.set(owner, callback); - - watcher.on('delete', (kv, previous) => { - logger.trace(`Prefix delete event detected: ${prefix}, key: ${kv.key}`); - for (const [ownerId, cb] of callbacks) { - try { - cb(kv.key.toString(), kv, previous); - } catch (err) { - logger.error( - `Error in onPrefixDeleteEvent callback for owner=${ownerId}, prefix=${prefix}:`, - err - ); - } - } - }); - - this.prefixDeleteWatchers.set(watcherKey, { watcher, callbacks }); - return { success: true }; - } - - async onKeyPutEvent(key, owner, callback) { - const client = await this.getClient(); - const watcherKey = key; - - if (this.keyPutWatchers.has(watcherKey)) { - this.keyPutWatchers.get(watcherKey).callbacks.set(owner, callback); - logger.trace(`Added put callback for owner: ${owner}, on key: ${key}`); - return; - } - - logger.trace(`Creating new put watcher for key: ${key}`); - const watcher = await client.watch().key(key).create(); - const callbacks = new Map(); - callbacks.set(owner, callback); - - watcher.on('put', (kv, previous) => { - logger.trace(`Key put event detected: ${key}, key: ${kv.key}`); - const valueStr = kv.value.toString(); - let parsedValue; - try { - parsedValue = JSON.parse(valueStr); - } catch { - parsedValue = valueStr; - } - - for (const [ownerId, cb] of callbacks) { - try { - cb(kv.key.toString(), parsedValue, kv, previous); - } catch (err) { - logger.error( - `Error in onKeyPutEvent callback for owner: ${ownerId}, key: ${key}:`, - err - ); - } - } - }); - - this.keyPutWatchers.set(watcherKey, { watcher, callbacks }); - return { success: true }; - } - - async onKeyDeleteEvent(key, owner, callback) { - const client = await this.getClient(); - const watcherKey = key; - - if (this.keyDeleteWatchers.has(watcherKey)) { - this.keyDeleteWatchers.get(watcherKey).callbacks.set(owner, callback); - logger.trace(`Added delete callback for owner: ${owner} on key: ${key}`); - return; - } - - logger.trace(`Creating new delete watcher for key: ${key}`); - const watcher = await client.watch().key(key).create(); - const callbacks = new Map(); - callbacks.set(owner, callback); - - watcher.on('delete', (kv, previous) => { - logger.trace(`Key delete event detected: ${key}, key: ${kv.key}`); - for (const [ownerId, cb] of callbacks) { - try { - cb(kv.key.toString(), kv, previous); - } catch (err) { - logger.error( - `Error in onKeyDeleteEvent callback for owner=${ownerId}, key=${key}:`, - err - ); - } - } - }); - - this.keyDeleteWatchers.set(watcherKey, { watcher, callbacks }); - } - - async onKeyEvent(key, callback) { - const client = await this.getClient(); - logger.trace(`Setting up watcher for key events: ${key}`); - - client - .watch() - .key(key) - .create() - .then(watcher => { - // Handle put events - watcher.on('put', (kv, previous) => { - logger.trace(`Key put event detected: ${key}`); - try { - const value = kv.value.toString(); - let parsedValue; - try { - parsedValue = JSON.parse(value); - } catch { - parsedValue = value; - } - callback(key, parsedValue, kv, previous); - } catch (error) { - logger.error( - `Error in onKeyEvent put callback for key ${key}:`, - error - ); - } - }); - - // Handle delete events - watcher.on('delete', (kv, previous) => { - logger.trace(`Key delete event detected: ${key}`); - try { - callback(key, null, kv, previous); - } catch (error) { - logger.error( - `Error in onKeyEvent delete callback for key ${key}:`, - error - ); - } - }); - - // Store watcher with a unique key - const watcherKey = `event:key:${key}`; - this.watchers.set(watcherKey, watcher); - }); - } - - async removePrefixWatcher(prefix, owner, type = 'put') { - const store = - type === 'put' ? this.prefixPutWatchers : this.prefixDeleteWatchers; - const entry = store.get(prefix); - - if (!entry) { - logger.trace(`Watcher not found for prefix: ${prefix}, type: ${type}`); - return false; - } - - if (entry.callbacks.delete(owner)) { - logger.trace( - `Removed ${type} callback for owner: ${owner} on prefix: ${prefix}` - ); - } else { - logger.trace( - `No ${type} callback found for owner: ${owner} on prefix: ${prefix}` - ); - } - - if (entry.callbacks.size === 0) { - logger.trace(`No callbacks left, stopping ${type} watcher for ${prefix}`); - entry.watcher.removeAllListeners(); - await entry.watcher.cancel(); - store.delete(prefix); - } - - return true; - } - - async removeKeyWatcher(key, owner, type = 'put') { - const store = type === 'put' ? this.keyPutWatchers : this.keyDeleteWatchers; - const entry = store.get(key); - - if (!entry) { - logger.trace(`Watcher not found for key: ${key}, type: ${type}`); - return false; - } - - if (entry.callbacks.delete(owner)) { - logger.trace( - `Removed ${type} callback for owner: ${owner} on key: ${key}` - ); - } else { - logger.trace( - `No ${type} callback found for owner: ${owner} on key: ${key}` - ); - } - - if (entry.callbacks.size === 0) { - logger.trace(`No callbacks left, stopping ${type} watcher for ${key}`); - entry.watcher.removeAllListeners(); - await entry.watcher.cancel(); - store.delete(key); - } - - return true; - } - - async disconnect() { - logger.info('Disconnecting from Etcd...'); - - // Stop all watchers - for (const [key, watcher] of this.watchers) { - logger.trace(`Stopping watcher: ${key}`); - watcher.removeAllListeners(); - await watcher.close(); - } - this.watchers.clear(); - - if (this.client) { - await this.client.close(); - this.client = null; - logger.info('Disconnected from Etcd'); - } - } -} - -const etcdServer = new EtcdServer(); - -export { EtcdServer, etcdServer };