From 7ccc4bb9933befe755e9496401c7510b6f59ea19 Mon Sep 17 00:00:00 2001 From: Tom Butcher Date: Fri, 5 Sep 2025 23:29:14 +0100 Subject: [PATCH] Add NATS messaging support - Introduced a new NatsServer class for managing NATS connections and messaging. - Implemented connection handling, publishing, requesting, and subscription management. - Updated index.js to connect to NATS on application startup and handle connection errors. - Enhanced logging for NATS operations to improve traceability and error management. --- src/database/nats.js | 307 +++++++++++++++++++++++++++++++++++ src/index.js | 10 ++ src/updates/updatemanager.js | 84 ++++------ 3 files changed, 352 insertions(+), 49 deletions(-) create mode 100644 src/database/nats.js diff --git a/src/database/nats.js b/src/database/nats.js new file mode 100644 index 0000000..d021ce1 --- /dev/null +++ b/src/database/nats.js @@ -0,0 +1,307 @@ +import { connect } from '@nats-io/transport-node'; +import log4js from 'log4js'; +import { loadConfig } from '../config.js'; + +const config = loadConfig(); +const logger = log4js.getLogger('Nats'); +logger.level = config.server.logLevel; + +class NatsServer { + constructor() { + this.client = null; + this.subscriptions = new Map(); // subject → { subscription, callbacks } + this.requestHandlers = new Map(); // subject → { handler, callbacks } + this.queuedSubscriptions = new Map(); // subject → { subscription, callbacks, queue } + + const natsConfig = config.database?.nats || config.database; // fallback for production config + const host = natsConfig.host || 'localhost'; + const port = natsConfig.port || 4222; + this.servers = [`nats://${host}:${port}`]; + this.textEncoder = new TextEncoder(); + this.textDecoder = new TextDecoder(); + + logger.trace(`NatsServer: servers set to ${JSON.stringify(this.servers)}`); + } + + async connect() { + if (!this.client) { + logger.info('Connecting to NATS...'); + logger.trace( + `Creating NATS client with servers ${JSON.stringify(this.servers)}` + ); + + try { + this.client = await connect({ + servers: this.servers, + reconnect: true, + maxReconnectAttempts: -1, // unlimited reconnects + reconnectTimeWait: 1000, + timeout: 20000 + }); + + // Test connection by checking if client is connected + try { + if (this.client.isClosed()) { + throw new Error('NATS client connection failed'); + } + logger.trace('NATS client connected successfully.'); + } catch (error) { + throw error; + } + } catch (error) { + logger.error('Failed to connect to NATS:', error); + throw error; + } + } else { + logger.trace('NATS client already exists, skipping connection.'); + } + return this.client; + } + + async getClient() { + if (!this.client) { + logger.trace('No client found, calling connect().'); + await this.connect(); + } + return this.client; + } + + async publish(subject, data) { + const client = await this.getClient(); + const payload = typeof data === 'string' ? data : JSON.stringify(data); + + try { + client.publish(subject, this.textEncoder.encode(payload)); + logger.trace(`Published to subject: ${subject}, data: ${payload}`); + return { success: true }; + } catch (error) { + logger.error(`Failed to publish to subject ${subject}:`, error); + throw error; + } + } + + async request(subject, data, timeout = 30000) { + const client = await this.getClient(); + const payload = typeof data === 'string' ? data : JSON.stringify(data); + + try { + const response = await client.request( + subject, + this.textEncoder.encode(payload), + { + timeout: timeout + } + ); + + const responseData = this.textDecoder.decode(response.data); + logger.trace(`Request to subject: ${subject}, response: ${responseData}`); + + // Try to parse as JSON, fallback to string + try { + return JSON.parse(responseData); + } catch { + return responseData; + } + } catch (error) { + if (error.code === 'TIMEOUT') { + logger.trace(`Request timeout for subject: ${subject}`); + return null; + } + throw error; + } + } + + async subscribe(subject, owner, callback) { + const client = await this.getClient(); + const subscriptionKey = subject; + + if (this.subscriptions.has(subscriptionKey)) { + this.subscriptions.get(subscriptionKey).callbacks.set(owner, callback); + logger.trace( + `Added subscription callback for owner=${owner} on subject=${subject}` + ); + return { success: true }; + } + + logger.trace(`Creating new subscription for subject: ${subject}`); + const subscription = client.subscribe(subject); + const callbacks = new Map(); + callbacks.set(owner, callback); + + (async () => { + for await (const msg of subscription) { + logger.trace(`Message received on subject: ${subject}`); + const data = this.textDecoder.decode(msg.data); + let parsedData; + + try { + parsedData = JSON.parse(data); + } catch { + parsedData = data; + } + + for (const [ownerId, cb] of callbacks) { + try { + cb(subject, parsedData, msg); + } catch (err) { + logger.error( + `Error in subscription callback for owner=${ownerId}, subject=${subject}:`, + err + ); + } + } + } + })().catch(err => { + logger.error(`Subscription error for subject ${subject}:`, err); + }); + + this.subscriptions.set(subscriptionKey, { subscription, callbacks }); + return { success: true }; + } + + async setRequestHandler(subject, owner, handler) { + const client = await this.getClient(); + const handlerKey = subject; + + if (this.requestHandlers.has(handlerKey)) { + this.requestHandlers.get(handlerKey).callbacks.set(owner, handler); + logger.trace( + `Added request handler for owner=${owner} on subject=${subject}` + ); + return { success: true }; + } + + logger.trace(`Creating new request handler for subject: ${subject}`); + const subscription = client.subscribe(subject); + const callbacks = new Map(); + callbacks.set(owner, handler); + + (async () => { + for await (const msg of subscription) { + logger.trace(`Request received on subject: ${subject}`); + const data = this.textDecoder.decode(msg.data); + let parsedData; + + try { + parsedData = JSON.parse(data); + } catch { + parsedData = data; + } + + for (const [ownerId, cb] of callbacks) { + try { + const response = await cb(subject, parsedData, msg); + const responsePayload = + typeof response === 'string' + ? response + : JSON.stringify(response); + msg.respond(this.textEncoder.encode(responsePayload)); + } catch (err) { + logger.error( + `Error in request handler for owner=${ownerId}, subject=${subject}:`, + err + ); + // Send error response + msg.respond( + this.textEncoder.encode(JSON.stringify({ error: err.message })) + ); + } + } + } + })().catch(err => { + logger.error(`Request handler error for subject ${subject}:`, err); + }); + + this.requestHandlers.set(handlerKey, { subscription, callbacks }); + return { success: true }; + } + + async removeSubscription(subject, owner) { + const entry = this.subscriptions.get(subject); + + if (!entry) { + logger.trace(`Subscription not found for subject: ${subject}`); + return false; + } + + if (entry.callbacks.delete(owner)) { + logger.trace( + `Removed subscription callback for owner: ${owner} on subject: ${subject}` + ); + } else { + logger.trace( + `No subscription callback found for owner: ${owner} on subject: ${subject}` + ); + } + + if (entry.callbacks.size === 0) { + logger.trace(`No callbacks left, stopping subscription for ${subject}`); + entry.subscription.unsubscribe(); + this.subscriptions.delete(subject); + } + + return true; + } + + async removeRequestHandler(subject, owner) { + const entry = this.requestHandlers.get(subject); + + if (!entry) { + logger.trace(`Request handler not found for subject: ${subject}`); + return false; + } + + if (entry.callbacks.delete(owner)) { + logger.trace( + `Removed request handler for owner: ${owner} on subject: ${subject}` + ); + } else { + logger.trace( + `No request handler found for owner: ${owner} on subject: ${subject}` + ); + } + + if (entry.callbacks.size === 0) { + logger.trace(`No handlers left, stopping request handler for ${subject}`); + entry.subscription.unsubscribe(); + this.requestHandlers.delete(subject); + } + + return true; + } + + async disconnect() { + logger.info('Disconnecting from NATS...'); + + // Stop all subscriptions + for (const [subject, entry] of this.subscriptions) { + logger.trace(`Stopping subscription: ${subject}`); + entry.subscription.unsubscribe(); + } + this.subscriptions.clear(); + + // Stop all queued subscriptions + for (const [key, entry] of this.queuedSubscriptions) { + logger.trace(`Stopping queued subscription: ${key}`); + entry.subscription.unsubscribe(); + } + this.queuedSubscriptions.clear(); + + // Stop all request handlers + for (const [subject, entry] of this.requestHandlers) { + logger.trace(`Stopping request handler: ${subject}`); + entry.subscription.unsubscribe(); + } + this.requestHandlers.clear(); + + if (this.client) { + await this.client.close(); + this.client = null; + logger.info('Disconnected from NATS'); + } + } +} + +const natsServer = new NatsServer(); + +export { NatsServer, natsServer }; diff --git a/src/index.js b/src/index.js index 6a04ede..8ab99a5 100644 --- a/src/index.js +++ b/src/index.js @@ -1,6 +1,7 @@ import { loadConfig } from './config.js'; import { SocketManager } from './socket/socketmanager.js'; import { etcdServer } from './database/etcd.js'; +import { natsServer } from './database/nats.js'; import express from 'express'; import log4js from 'log4js'; import http from 'http'; @@ -29,6 +30,15 @@ import { mongoServer } from './database/mongo.js'; throw err; } + // Connect to NATS (await) + try { + await natsServer.connect(); + logger.info('Connected to NATS'); + } catch (err) { + logger.error('Failed to connect to NATS:', err); + throw err; + } + // Connect to Mongo DB (await) try { await mongoServer.connect(); diff --git a/src/updates/updatemanager.js b/src/updates/updatemanager.js index e28dc3a..3665f11 100644 --- a/src/updates/updatemanager.js +++ b/src/updates/updatemanager.js @@ -1,9 +1,6 @@ import log4js from 'log4js'; import { loadConfig } from '../config.js'; -import NodeCache from 'node-cache'; -import { etcdServer } from '../database/etcd.js'; -import { updateObjectCache } from '../database/database.js'; - +import { natsServer } from '../database/nats.js'; const config = loadConfig(); // Setup logger @@ -19,13 +16,28 @@ export class UpdateManager { } async subscribeToObjectNew(objectType) { - await etcdServer.onKeyPutEvent( - `/${objectType}s/new`, + await natsServer.subscribe( + `${objectType}s.new`, this.socketClient.socketId, (key, value) => { logger.trace('Object new event:', value); this.socketClient.socket.emit('objectNew', { - _id: value, + object: value, + objectType: objectType + }); + } + ); + return { success: true }; + } + + async subscribeToObjectDelete(objectType) { + await natsServer.subscribe( + `${objectType}s.delete`, + this.socketClient.socketId, + (key, value) => { + logger.trace('Object delete event:', value); + this.socketClient.socket.emit('objectDelete', { + object: value, objectType: objectType }); } @@ -34,8 +46,8 @@ export class UpdateManager { } async subscribeToObjectUpdate(id, objectType) { - await etcdServer.onKeyPutEvent( - `/${objectType}s/${id}/object`, + await natsServer.subscribe( + `${objectType}s.${id}.object`, this.socketClient.socketId, (key, value) => { logger.trace('Object update event:', id); @@ -50,52 +62,26 @@ export class UpdateManager { } async removeObjectNewListener(objectType) { - await etcdServer.removeKeyWatcher( - `/${objectType}s/new`, - this.socketClient.socketId, - 'put' + await natsServer.removeSubscription( + `${objectType}s.new`, + this.socketClient.socketId + ); + return { success: true }; + } + + async removeObjectDeleteListener(objectType) { + await natsServer.removeSubscription( + `${objectType}s.delete`, + this.socketClient.socketId ); return { success: true }; } async removeObjectUpdateListener(id, objectType) { - await etcdServer.removeKeyWatcher( - `/${objectType}s/${id}/object`, - this.socketClient.socketId, - 'put' + await natsServer.removeSubscription( + `${objectType}s.${id}.object`, + this.socketClient.socketId ); return { success: true }; } - - async getObjectUpdate(id, objectType) { - try { - const objectUpdate = { - _id: id, - objectType: objectType, - object: await etcdServer.get(`/${objectType}s/${id}/object`) - }; - logger.trace(`Returning path: /${objectType}s/${id}/object`); - return objectUpdate; - } catch (error) { - logger.error( - `UpdateManager: Failed to get current value for /${objectType}s/${id}/object:`, - error - ); - return { error: 'Not found' }; - } - } - - async setObjectUpdate(id, objectType, value) { - try { - await etcdServer.set(`/${objectType}s/${id}/object`, value); - logger.trace(`Set value for path: /${objectType}s/${id}/object`); - return true; - } catch (error) { - logger.error( - `Failed to set value for /${objectType}s/${id}/object:`, - error - ); - return false; - } - } }