From bec46489d1f083dfe586e4cb830f46060e13a5e3 Mon Sep 17 00:00:00 2001 From: Tom Butcher Date: Fri, 5 Sep 2025 23:23:50 +0100 Subject: [PATCH] Add NATS integration: Implemented NATS server connection and messaging capabilities. Updated package.json and package-lock.json to include NATS dependencies. Enhanced index.js to connect to NATS and added documentJobs route for job management. --- package-lock.json | 52 +++++++ package.json | 1 + src/database/nats.js | 313 +++++++++++++++++++++++++++++++++++++++++++ src/index.js | 12 ++ 4 files changed, 378 insertions(+) create mode 100644 src/database/nats.js diff --git a/package-lock.json b/package-lock.json index 17372b7..f1f6075 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,6 +9,7 @@ "version": "1.0.0", "license": "ISC", "dependencies": { + "@nats-io/transport-node": "^3.1.0", "axios": "^1.11.0", "bcrypt": "^6.0.0", "body-parser": "^2.2.0", @@ -2924,6 +2925,51 @@ "sparse-bitfield": "^3.0.3" } }, + "node_modules/@nats-io/nats-core": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/@nats-io/nats-core/-/nats-core-3.1.0.tgz", + "integrity": "sha512-xsSkLEGGcqNF+Ru8dMjPmKtfbBeq/U4meuJJX4Zi+5TBHpjpjNjs4YkCBC/pGYWnEum1/vdNPizjE1RdNHCyBg==", + "license": "Apache-2.0", + "dependencies": { + "@nats-io/nkeys": "2.0.3", + "@nats-io/nuid": "2.0.3" + } + }, + "node_modules/@nats-io/nkeys": { + "version": "2.0.3", + "resolved": "https://registry.npmjs.org/@nats-io/nkeys/-/nkeys-2.0.3.tgz", + "integrity": "sha512-JVt56GuE6Z89KUkI4TXUbSI9fmIfAmk6PMPknijmuL72GcD+UgIomTcRWiNvvJKxA01sBbmIPStqJs5cMRBC3A==", + "license": "Apache-2.0", + "dependencies": { + "tweetnacl": "^1.0.3" + }, + "engines": { + "node": ">=18.0.0" + } + }, + "node_modules/@nats-io/nuid": { + "version": "2.0.3", + "resolved": "https://registry.npmjs.org/@nats-io/nuid/-/nuid-2.0.3.tgz", + "integrity": "sha512-TpA3HEBna/qMVudy+3HZr5M3mo/L1JPofpVT4t0HkFGkz2Cn9wrlrQC8tvR8Md5Oa9//GtGG26eN0qEWF5Vqew==", + "license": "Apache-2.0", + "engines": { + "node": ">= 18.x" + } + }, + "node_modules/@nats-io/transport-node": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/@nats-io/transport-node/-/transport-node-3.1.0.tgz", + "integrity": "sha512-k5pH7IOKUetwXOMraVgcB5zG0wibcHOwJJuyuY1/5Q4K0XfBJDnb/IbczP5/JJWwMYfxSL9O+46ojtdBHvHRSw==", + "license": "Apache-2.0", + "dependencies": { + "@nats-io/nats-core": "3.1.0", + "@nats-io/nkeys": "2.0.3", + "@nats-io/nuid": "2.0.3" + }, + "engines": { + "node": ">= 18.0.0" + } + }, "node_modules/@nicolo-ribaudo/chokidar-2": { "version": "2.1.8-no-fsevents.3", "resolved": "https://registry.npmjs.org/@nicolo-ribaudo/chokidar-2/-/chokidar-2-2.1.8-no-fsevents.3.tgz", @@ -11193,6 +11239,12 @@ "license": "0BSD", "optional": true }, + "node_modules/tweetnacl": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/tweetnacl/-/tweetnacl-1.0.3.tgz", + "integrity": "sha512-6rt+RN7aOi1nGMyC4Xa5DdYiukl2UWCbcJft7YhxReBGQD7OAM8Pbxw6YMo4r2diNEA8FEmu32YOn9rhaiE5yw==", + "license": "Unlicense" + }, "node_modules/type-check": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/type-check/-/type-check-0.4.0.tgz", diff --git a/package.json b/package.json index fd86c27..e6a19a0 100644 --- a/package.json +++ b/package.json @@ -4,6 +4,7 @@ "description": "", "main": "index.js", "dependencies": { + "@nats-io/transport-node": "^3.1.0", "axios": "^1.11.0", "bcrypt": "^6.0.0", "body-parser": "^2.2.0", diff --git a/src/database/nats.js b/src/database/nats.js new file mode 100644 index 0000000..abf0372 --- /dev/null +++ b/src/database/nats.js @@ -0,0 +1,313 @@ +import { connect } from '@nats-io/transport-node'; +import log4js from 'log4js'; +import dotenv from 'dotenv'; + +dotenv.config(); + +const NATS_HOST = process.env.NATS_HOST || 'localhost'; +const NATS_PORT = process.env.NATS_PORT || 4222; +const LOG_LEVEL = process.env.LOG_LEVEL || 'info'; + +const logger = log4js.getLogger('Nats'); +logger.level = LOG_LEVEL; + +class NatsServer { + constructor() { + this.client = null; + this.subscriptions = new Map(); // subject → { subscription, callbacks } + this.requestHandlers = new Map(); // subject → { handler, callbacks } + this.queuedSubscriptions = new Map(); // subject → { subscription, callbacks, queue } + this.servers = [`nats://${NATS_HOST}:${NATS_PORT}`]; + this.textEncoder = new TextEncoder(); + this.textDecoder = new TextDecoder(); + + logger.trace(`NatsServer: servers set to ${JSON.stringify(this.servers)}`); + } + + async connect() { + if (!this.client) { + logger.info('Connecting to NATS...'); + logger.trace(`Creating NATS client with servers ${JSON.stringify(this.servers)}`); + + try { + this.client = await connect({ + servers: this.servers, + reconnect: true, + maxReconnectAttempts: -1, // unlimited reconnects + reconnectTimeWait: 1000, + timeout: 20000, + }); + + // Test connection by checking if client is connected + try { + if (this.client.isClosed()) { + throw new Error('NATS client connection failed'); + } + logger.trace('NATS client connected successfully.'); + } catch (error) { + throw error; + } + } catch (error) { + logger.error('Failed to connect to NATS:', error); + throw error; + } + } else { + logger.trace('NATS client already exists, skipping connection.'); + } + return this.client; + } + + async getClient() { + if (!this.client) { + logger.trace('No client found, calling connect().'); + await this.connect(); + } + return this.client; + } + + async publish(subject, data) { + const client = await this.getClient(); + const payload = typeof data === 'string' ? data : JSON.stringify(data); + + try { + client.publish(subject, this.textEncoder.encode(payload)); + logger.trace(`Published to subject: ${subject}, data: ${payload}`); + return { success: true }; + } catch (error) { + logger.error(`Failed to publish to subject ${subject}:`, error); + throw error; + } + } + + async request(subject, data, timeout = 30000) { + const client = await this.getClient(); + const payload = typeof data === 'string' ? data : JSON.stringify(data); + + try { + const response = await client.request(subject, this.textEncoder.encode(payload), { + timeout: timeout, + }); + + const responseData = this.textDecoder.decode(response.data); + logger.trace(`Request to subject: ${subject}, response: ${responseData}`); + + // Try to parse as JSON, fallback to string + try { + return JSON.parse(responseData); + } catch { + return responseData; + } + } catch (error) { + if (error.code === 'TIMEOUT') { + logger.trace(`Request timeout for subject: ${subject}`); + return null; + } + throw error; + } + } + + async subscribe(subject, owner, callback) { + const client = await this.getClient(); + const subscriptionKey = subject; + + if (this.subscriptions.has(subscriptionKey)) { + this.subscriptions.get(subscriptionKey).callbacks.set(owner, callback); + logger.trace(`Added subscription callback for owner=${owner} on subject=${subject}`); + return { success: true }; + } + + logger.trace(`Creating new subscription for subject: ${subject}`); + const subscription = client.subscribe(subject); + const callbacks = new Map(); + callbacks.set(owner, callback); + + (async () => { + for await (const msg of subscription) { + logger.trace(`Message received on subject: ${subject}`); + const data = this.textDecoder.decode(msg.data); + let parsedData; + + try { + parsedData = JSON.parse(data); + } catch { + parsedData = data; + } + + for (const [ownerId, cb] of callbacks) { + try { + cb(subject, parsedData, msg); + } catch (err) { + logger.error( + `Error in subscription callback for owner=${ownerId}, subject=${subject}:`, + err + ); + } + } + } + })().catch((err) => { + logger.error(`Subscription error for subject ${subject}:`, err); + }); + + this.subscriptions.set(subscriptionKey, { subscription, callbacks }); + return { success: true }; + } + + async setRequestHandler(subject, owner, handler) { + const client = await this.getClient(); + const handlerKey = subject; + + if (this.requestHandlers.has(handlerKey)) { + this.requestHandlers.get(handlerKey).callbacks.set(owner, handler); + logger.trace(`Added request handler for owner=${owner} on subject=${subject}`); + return { success: true }; + } + + logger.trace(`Creating new request handler for subject: ${subject}`); + const subscription = client.subscribe(subject); + const callbacks = new Map(); + callbacks.set(owner, handler); + + (async () => { + for await (const msg of subscription) { + logger.trace(`Request received on subject: ${subject}`); + const data = this.textDecoder.decode(msg.data); + let parsedData; + + try { + parsedData = JSON.parse(data); + } catch { + parsedData = data; + } + + for (const [ownerId, cb] of callbacks) { + try { + const response = await cb(subject, parsedData, msg); + const responsePayload = + typeof response === 'string' ? response : JSON.stringify(response); + msg.respond(this.textEncoder.encode(responsePayload)); + } catch (err) { + logger.error(`Error in request handler for owner=${ownerId}, subject=${subject}:`, err); + // Send error response + msg.respond(this.textEncoder.encode(JSON.stringify({ error: err.message }))); + } + } + } + })().catch((err) => { + logger.error(`Request handler error for subject ${subject}:`, err); + }); + + this.requestHandlers.set(handlerKey, { subscription, callbacks }); + return { success: true }; + } + + async removeSubscription(subject, owner) { + const entry = this.subscriptions.get(subject); + + if (!entry) { + logger.trace(`Subscription not found for subject: ${subject}`); + return false; + } + + if (entry.callbacks.delete(owner)) { + logger.trace(`Removed subscription callback for owner: ${owner} on subject: ${subject}`); + } else { + logger.trace(`No subscription callback found for owner: ${owner} on subject: ${subject}`); + } + + if (entry.callbacks.size === 0) { + logger.trace(`No callbacks left, stopping subscription for ${subject}`); + entry.subscription.unsubscribe(); + this.subscriptions.delete(subject); + } + + return true; + } + + async removeQueuedSubscription(subject, queue, owner) { + const subscriptionKey = `${subject}:${queue}`; + const entry = this.queuedSubscriptions.get(subscriptionKey); + + if (!entry) { + logger.trace(`Queued subscription not found for subject: ${subject}, queue: ${queue}`); + return false; + } + + if (entry.callbacks.delete(owner)) { + logger.trace( + `Removed queued subscription callback for owner: ${owner} on subject: ${subject}, queue: ${queue}` + ); + } else { + logger.trace( + `No queued subscription callback found for owner: ${owner} on subject: ${subject}, queue: ${queue}` + ); + } + + if (entry.callbacks.size === 0) { + logger.trace( + `No callbacks left, stopping queued subscription for ${subject}, queue: ${queue}` + ); + entry.subscription.unsubscribe(); + this.queuedSubscriptions.delete(subscriptionKey); + } + + return true; + } + + async removeRequestHandler(subject, owner) { + const entry = this.requestHandlers.get(subject); + + if (!entry) { + logger.trace(`Request handler not found for subject: ${subject}`); + return false; + } + + if (entry.callbacks.delete(owner)) { + logger.trace(`Removed request handler for owner: ${owner} on subject: ${subject}`); + } else { + logger.trace(`No request handler found for owner: ${owner} on subject: ${subject}`); + } + + if (entry.callbacks.size === 0) { + logger.trace(`No handlers left, stopping request handler for ${subject}`); + entry.subscription.unsubscribe(); + this.requestHandlers.delete(subject); + } + + return true; + } + + async disconnect() { + logger.info('Disconnecting from NATS...'); + + // Stop all subscriptions + for (const [subject, entry] of this.subscriptions) { + logger.trace(`Stopping subscription: ${subject}`); + entry.subscription.unsubscribe(); + } + this.subscriptions.clear(); + + // Stop all queued subscriptions + for (const [key, entry] of this.queuedSubscriptions) { + logger.trace(`Stopping queued subscription: ${key}`); + entry.subscription.unsubscribe(); + } + this.queuedSubscriptions.clear(); + + // Stop all request handlers + for (const [subject, entry] of this.requestHandlers) { + logger.trace(`Stopping request handler: ${subject}`); + entry.subscription.unsubscribe(); + } + this.requestHandlers.clear(); + + if (this.client) { + await this.client.close(); + this.client = null; + logger.info('Disconnected from NATS'); + } + } +} + +const natsServer = new NatsServer(); + +export { NatsServer, natsServer }; diff --git a/src/index.js b/src/index.js index 9e3121f..8bfc093 100644 --- a/src/index.js +++ b/src/index.js @@ -28,6 +28,7 @@ import { documentSizesRoutes, documentTemplatesRoutes, documentPrintersRoutes, + documentJobsRoutes, } from './routes/index.js'; import path from 'path'; import * as fs from 'fs'; @@ -36,6 +37,7 @@ import ReseedAction from './database/ReseedAction.js'; import log4js from 'log4js'; import { etcdServer } from './database/etcd.js'; import { populateUserMiddleware } from './services/misc/auth.js'; +import { natsServer } from './database/nats.js'; dotenv.config(); @@ -70,6 +72,15 @@ try { throw err; } +// Connect to NATS (await) +try { + natsServer.connect(); + logger.info('Connected to NATS'); +} catch (err) { + logger.error('Failed to connect to NATS:', err); + throw err; +} + app.use(cors(corsOptions)); app.use(bodyParser.json({ type: 'application/json', strict: false, limit: '50mb' })); app.use(express.json()); @@ -104,6 +115,7 @@ app.use('/notetypes', noteTypeRoutes); app.use('/documentsizes', documentSizesRoutes); app.use('/documenttemplates', documentTemplatesRoutes); app.use('/documentprinters', documentPrintersRoutes); +app.use('/documentjobs', documentJobsRoutes); app.use('/notes', noteRoutes); if (process.env.SCHEDULE_HOUR) {