Add NATS messaging support

- Introduced a new NatsServer class for managing NATS connections and messaging.
- Implemented connection handling, publishing, requesting, and subscription management.
- Updated index.js to connect to NATS on application startup and handle connection errors.
- Enhanced logging for NATS operations to improve traceability and error management.
This commit is contained in:
Tom Butcher 2025-09-05 23:29:14 +01:00
parent d6214e316b
commit 7ccc4bb993
3 changed files with 352 additions and 49 deletions

307
src/database/nats.js Normal file
View File

@ -0,0 +1,307 @@
import { connect } from '@nats-io/transport-node';
import log4js from 'log4js';
import { loadConfig } from '../config.js';
const config = loadConfig();
const logger = log4js.getLogger('Nats');
logger.level = config.server.logLevel;
class NatsServer {
constructor() {
this.client = null;
this.subscriptions = new Map(); // subject → { subscription, callbacks }
this.requestHandlers = new Map(); // subject → { handler, callbacks }
this.queuedSubscriptions = new Map(); // subject → { subscription, callbacks, queue }
const natsConfig = config.database?.nats || config.database; // fallback for production config
const host = natsConfig.host || 'localhost';
const port = natsConfig.port || 4222;
this.servers = [`nats://${host}:${port}`];
this.textEncoder = new TextEncoder();
this.textDecoder = new TextDecoder();
logger.trace(`NatsServer: servers set to ${JSON.stringify(this.servers)}`);
}
async connect() {
if (!this.client) {
logger.info('Connecting to NATS...');
logger.trace(
`Creating NATS client with servers ${JSON.stringify(this.servers)}`
);
try {
this.client = await connect({
servers: this.servers,
reconnect: true,
maxReconnectAttempts: -1, // unlimited reconnects
reconnectTimeWait: 1000,
timeout: 20000
});
// Test connection by checking if client is connected
try {
if (this.client.isClosed()) {
throw new Error('NATS client connection failed');
}
logger.trace('NATS client connected successfully.');
} catch (error) {
throw error;
}
} catch (error) {
logger.error('Failed to connect to NATS:', error);
throw error;
}
} else {
logger.trace('NATS client already exists, skipping connection.');
}
return this.client;
}
async getClient() {
if (!this.client) {
logger.trace('No client found, calling connect().');
await this.connect();
}
return this.client;
}
async publish(subject, data) {
const client = await this.getClient();
const payload = typeof data === 'string' ? data : JSON.stringify(data);
try {
client.publish(subject, this.textEncoder.encode(payload));
logger.trace(`Published to subject: ${subject}, data: ${payload}`);
return { success: true };
} catch (error) {
logger.error(`Failed to publish to subject ${subject}:`, error);
throw error;
}
}
async request(subject, data, timeout = 30000) {
const client = await this.getClient();
const payload = typeof data === 'string' ? data : JSON.stringify(data);
try {
const response = await client.request(
subject,
this.textEncoder.encode(payload),
{
timeout: timeout
}
);
const responseData = this.textDecoder.decode(response.data);
logger.trace(`Request to subject: ${subject}, response: ${responseData}`);
// Try to parse as JSON, fallback to string
try {
return JSON.parse(responseData);
} catch {
return responseData;
}
} catch (error) {
if (error.code === 'TIMEOUT') {
logger.trace(`Request timeout for subject: ${subject}`);
return null;
}
throw error;
}
}
async subscribe(subject, owner, callback) {
const client = await this.getClient();
const subscriptionKey = subject;
if (this.subscriptions.has(subscriptionKey)) {
this.subscriptions.get(subscriptionKey).callbacks.set(owner, callback);
logger.trace(
`Added subscription callback for owner=${owner} on subject=${subject}`
);
return { success: true };
}
logger.trace(`Creating new subscription for subject: ${subject}`);
const subscription = client.subscribe(subject);
const callbacks = new Map();
callbacks.set(owner, callback);
(async () => {
for await (const msg of subscription) {
logger.trace(`Message received on subject: ${subject}`);
const data = this.textDecoder.decode(msg.data);
let parsedData;
try {
parsedData = JSON.parse(data);
} catch {
parsedData = data;
}
for (const [ownerId, cb] of callbacks) {
try {
cb(subject, parsedData, msg);
} catch (err) {
logger.error(
`Error in subscription callback for owner=${ownerId}, subject=${subject}:`,
err
);
}
}
}
})().catch(err => {
logger.error(`Subscription error for subject ${subject}:`, err);
});
this.subscriptions.set(subscriptionKey, { subscription, callbacks });
return { success: true };
}
async setRequestHandler(subject, owner, handler) {
const client = await this.getClient();
const handlerKey = subject;
if (this.requestHandlers.has(handlerKey)) {
this.requestHandlers.get(handlerKey).callbacks.set(owner, handler);
logger.trace(
`Added request handler for owner=${owner} on subject=${subject}`
);
return { success: true };
}
logger.trace(`Creating new request handler for subject: ${subject}`);
const subscription = client.subscribe(subject);
const callbacks = new Map();
callbacks.set(owner, handler);
(async () => {
for await (const msg of subscription) {
logger.trace(`Request received on subject: ${subject}`);
const data = this.textDecoder.decode(msg.data);
let parsedData;
try {
parsedData = JSON.parse(data);
} catch {
parsedData = data;
}
for (const [ownerId, cb] of callbacks) {
try {
const response = await cb(subject, parsedData, msg);
const responsePayload =
typeof response === 'string'
? response
: JSON.stringify(response);
msg.respond(this.textEncoder.encode(responsePayload));
} catch (err) {
logger.error(
`Error in request handler for owner=${ownerId}, subject=${subject}:`,
err
);
// Send error response
msg.respond(
this.textEncoder.encode(JSON.stringify({ error: err.message }))
);
}
}
}
})().catch(err => {
logger.error(`Request handler error for subject ${subject}:`, err);
});
this.requestHandlers.set(handlerKey, { subscription, callbacks });
return { success: true };
}
async removeSubscription(subject, owner) {
const entry = this.subscriptions.get(subject);
if (!entry) {
logger.trace(`Subscription not found for subject: ${subject}`);
return false;
}
if (entry.callbacks.delete(owner)) {
logger.trace(
`Removed subscription callback for owner: ${owner} on subject: ${subject}`
);
} else {
logger.trace(
`No subscription callback found for owner: ${owner} on subject: ${subject}`
);
}
if (entry.callbacks.size === 0) {
logger.trace(`No callbacks left, stopping subscription for ${subject}`);
entry.subscription.unsubscribe();
this.subscriptions.delete(subject);
}
return true;
}
async removeRequestHandler(subject, owner) {
const entry = this.requestHandlers.get(subject);
if (!entry) {
logger.trace(`Request handler not found for subject: ${subject}`);
return false;
}
if (entry.callbacks.delete(owner)) {
logger.trace(
`Removed request handler for owner: ${owner} on subject: ${subject}`
);
} else {
logger.trace(
`No request handler found for owner: ${owner} on subject: ${subject}`
);
}
if (entry.callbacks.size === 0) {
logger.trace(`No handlers left, stopping request handler for ${subject}`);
entry.subscription.unsubscribe();
this.requestHandlers.delete(subject);
}
return true;
}
async disconnect() {
logger.info('Disconnecting from NATS...');
// Stop all subscriptions
for (const [subject, entry] of this.subscriptions) {
logger.trace(`Stopping subscription: ${subject}`);
entry.subscription.unsubscribe();
}
this.subscriptions.clear();
// Stop all queued subscriptions
for (const [key, entry] of this.queuedSubscriptions) {
logger.trace(`Stopping queued subscription: ${key}`);
entry.subscription.unsubscribe();
}
this.queuedSubscriptions.clear();
// Stop all request handlers
for (const [subject, entry] of this.requestHandlers) {
logger.trace(`Stopping request handler: ${subject}`);
entry.subscription.unsubscribe();
}
this.requestHandlers.clear();
if (this.client) {
await this.client.close();
this.client = null;
logger.info('Disconnected from NATS');
}
}
}
const natsServer = new NatsServer();
export { NatsServer, natsServer };

View File

@ -1,6 +1,7 @@
import { loadConfig } from './config.js';
import { SocketManager } from './socket/socketmanager.js';
import { etcdServer } from './database/etcd.js';
import { natsServer } from './database/nats.js';
import express from 'express';
import log4js from 'log4js';
import http from 'http';
@ -29,6 +30,15 @@ import { mongoServer } from './database/mongo.js';
throw err;
}
// Connect to NATS (await)
try {
await natsServer.connect();
logger.info('Connected to NATS');
} catch (err) {
logger.error('Failed to connect to NATS:', err);
throw err;
}
// Connect to Mongo DB (await)
try {
await mongoServer.connect();

View File

@ -1,9 +1,6 @@
import log4js from 'log4js';
import { loadConfig } from '../config.js';
import NodeCache from 'node-cache';
import { etcdServer } from '../database/etcd.js';
import { updateObjectCache } from '../database/database.js';
import { natsServer } from '../database/nats.js';
const config = loadConfig();
// Setup logger
@ -19,13 +16,28 @@ export class UpdateManager {
}
async subscribeToObjectNew(objectType) {
await etcdServer.onKeyPutEvent(
`/${objectType}s/new`,
await natsServer.subscribe(
`${objectType}s.new`,
this.socketClient.socketId,
(key, value) => {
logger.trace('Object new event:', value);
this.socketClient.socket.emit('objectNew', {
_id: value,
object: value,
objectType: objectType
});
}
);
return { success: true };
}
async subscribeToObjectDelete(objectType) {
await natsServer.subscribe(
`${objectType}s.delete`,
this.socketClient.socketId,
(key, value) => {
logger.trace('Object delete event:', value);
this.socketClient.socket.emit('objectDelete', {
object: value,
objectType: objectType
});
}
@ -34,8 +46,8 @@ export class UpdateManager {
}
async subscribeToObjectUpdate(id, objectType) {
await etcdServer.onKeyPutEvent(
`/${objectType}s/${id}/object`,
await natsServer.subscribe(
`${objectType}s.${id}.object`,
this.socketClient.socketId,
(key, value) => {
logger.trace('Object update event:', id);
@ -50,52 +62,26 @@ export class UpdateManager {
}
async removeObjectNewListener(objectType) {
await etcdServer.removeKeyWatcher(
`/${objectType}s/new`,
this.socketClient.socketId,
'put'
await natsServer.removeSubscription(
`${objectType}s.new`,
this.socketClient.socketId
);
return { success: true };
}
async removeObjectDeleteListener(objectType) {
await natsServer.removeSubscription(
`${objectType}s.delete`,
this.socketClient.socketId
);
return { success: true };
}
async removeObjectUpdateListener(id, objectType) {
await etcdServer.removeKeyWatcher(
`/${objectType}s/${id}/object`,
this.socketClient.socketId,
'put'
await natsServer.removeSubscription(
`${objectType}s.${id}.object`,
this.socketClient.socketId
);
return { success: true };
}
async getObjectUpdate(id, objectType) {
try {
const objectUpdate = {
_id: id,
objectType: objectType,
object: await etcdServer.get(`/${objectType}s/${id}/object`)
};
logger.trace(`Returning path: /${objectType}s/${id}/object`);
return objectUpdate;
} catch (error) {
logger.error(
`UpdateManager: Failed to get current value for /${objectType}s/${id}/object:`,
error
);
return { error: 'Not found' };
}
}
async setObjectUpdate(id, objectType, value) {
try {
await etcdServer.set(`/${objectType}s/${id}/object`, value);
logger.trace(`Set value for path: /${objectType}s/${id}/object`);
return true;
} catch (error) {
logger.error(
`Failed to set value for /${objectType}s/${id}/object:`,
error
);
return false;
}
}
}