Tom Butcher 7ccc4bb993 Add NATS messaging support
- Introduced a new NatsServer class for managing NATS connections and messaging.
- Implemented connection handling, publishing, requesting, and subscription management.
- Updated index.js to connect to NATS on application startup and handle connection errors.
- Enhanced logging for NATS operations to improve traceability and error management.
2025-09-05 23:29:14 +01:00

308 lines
8.9 KiB
JavaScript

import { connect } from '@nats-io/transport-node';
import log4js from 'log4js';
import { loadConfig } from '../config.js';
const config = loadConfig();
const logger = log4js.getLogger('Nats');
logger.level = config.server.logLevel;
class NatsServer {
constructor() {
this.client = null;
this.subscriptions = new Map(); // subject → { subscription, callbacks }
this.requestHandlers = new Map(); // subject → { handler, callbacks }
this.queuedSubscriptions = new Map(); // subject → { subscription, callbacks, queue }
const natsConfig = config.database?.nats || config.database; // fallback for production config
const host = natsConfig.host || 'localhost';
const port = natsConfig.port || 4222;
this.servers = [`nats://${host}:${port}`];
this.textEncoder = new TextEncoder();
this.textDecoder = new TextDecoder();
logger.trace(`NatsServer: servers set to ${JSON.stringify(this.servers)}`);
}
async connect() {
if (!this.client) {
logger.info('Connecting to NATS...');
logger.trace(
`Creating NATS client with servers ${JSON.stringify(this.servers)}`
);
try {
this.client = await connect({
servers: this.servers,
reconnect: true,
maxReconnectAttempts: -1, // unlimited reconnects
reconnectTimeWait: 1000,
timeout: 20000
});
// Test connection by checking if client is connected
try {
if (this.client.isClosed()) {
throw new Error('NATS client connection failed');
}
logger.trace('NATS client connected successfully.');
} catch (error) {
throw error;
}
} catch (error) {
logger.error('Failed to connect to NATS:', error);
throw error;
}
} else {
logger.trace('NATS client already exists, skipping connection.');
}
return this.client;
}
async getClient() {
if (!this.client) {
logger.trace('No client found, calling connect().');
await this.connect();
}
return this.client;
}
async publish(subject, data) {
const client = await this.getClient();
const payload = typeof data === 'string' ? data : JSON.stringify(data);
try {
client.publish(subject, this.textEncoder.encode(payload));
logger.trace(`Published to subject: ${subject}, data: ${payload}`);
return { success: true };
} catch (error) {
logger.error(`Failed to publish to subject ${subject}:`, error);
throw error;
}
}
async request(subject, data, timeout = 30000) {
const client = await this.getClient();
const payload = typeof data === 'string' ? data : JSON.stringify(data);
try {
const response = await client.request(
subject,
this.textEncoder.encode(payload),
{
timeout: timeout
}
);
const responseData = this.textDecoder.decode(response.data);
logger.trace(`Request to subject: ${subject}, response: ${responseData}`);
// Try to parse as JSON, fallback to string
try {
return JSON.parse(responseData);
} catch {
return responseData;
}
} catch (error) {
if (error.code === 'TIMEOUT') {
logger.trace(`Request timeout for subject: ${subject}`);
return null;
}
throw error;
}
}
async subscribe(subject, owner, callback) {
const client = await this.getClient();
const subscriptionKey = subject;
if (this.subscriptions.has(subscriptionKey)) {
this.subscriptions.get(subscriptionKey).callbacks.set(owner, callback);
logger.trace(
`Added subscription callback for owner=${owner} on subject=${subject}`
);
return { success: true };
}
logger.trace(`Creating new subscription for subject: ${subject}`);
const subscription = client.subscribe(subject);
const callbacks = new Map();
callbacks.set(owner, callback);
(async () => {
for await (const msg of subscription) {
logger.trace(`Message received on subject: ${subject}`);
const data = this.textDecoder.decode(msg.data);
let parsedData;
try {
parsedData = JSON.parse(data);
} catch {
parsedData = data;
}
for (const [ownerId, cb] of callbacks) {
try {
cb(subject, parsedData, msg);
} catch (err) {
logger.error(
`Error in subscription callback for owner=${ownerId}, subject=${subject}:`,
err
);
}
}
}
})().catch(err => {
logger.error(`Subscription error for subject ${subject}:`, err);
});
this.subscriptions.set(subscriptionKey, { subscription, callbacks });
return { success: true };
}
async setRequestHandler(subject, owner, handler) {
const client = await this.getClient();
const handlerKey = subject;
if (this.requestHandlers.has(handlerKey)) {
this.requestHandlers.get(handlerKey).callbacks.set(owner, handler);
logger.trace(
`Added request handler for owner=${owner} on subject=${subject}`
);
return { success: true };
}
logger.trace(`Creating new request handler for subject: ${subject}`);
const subscription = client.subscribe(subject);
const callbacks = new Map();
callbacks.set(owner, handler);
(async () => {
for await (const msg of subscription) {
logger.trace(`Request received on subject: ${subject}`);
const data = this.textDecoder.decode(msg.data);
let parsedData;
try {
parsedData = JSON.parse(data);
} catch {
parsedData = data;
}
for (const [ownerId, cb] of callbacks) {
try {
const response = await cb(subject, parsedData, msg);
const responsePayload =
typeof response === 'string'
? response
: JSON.stringify(response);
msg.respond(this.textEncoder.encode(responsePayload));
} catch (err) {
logger.error(
`Error in request handler for owner=${ownerId}, subject=${subject}:`,
err
);
// Send error response
msg.respond(
this.textEncoder.encode(JSON.stringify({ error: err.message }))
);
}
}
}
})().catch(err => {
logger.error(`Request handler error for subject ${subject}:`, err);
});
this.requestHandlers.set(handlerKey, { subscription, callbacks });
return { success: true };
}
async removeSubscription(subject, owner) {
const entry = this.subscriptions.get(subject);
if (!entry) {
logger.trace(`Subscription not found for subject: ${subject}`);
return false;
}
if (entry.callbacks.delete(owner)) {
logger.trace(
`Removed subscription callback for owner: ${owner} on subject: ${subject}`
);
} else {
logger.trace(
`No subscription callback found for owner: ${owner} on subject: ${subject}`
);
}
if (entry.callbacks.size === 0) {
logger.trace(`No callbacks left, stopping subscription for ${subject}`);
entry.subscription.unsubscribe();
this.subscriptions.delete(subject);
}
return true;
}
async removeRequestHandler(subject, owner) {
const entry = this.requestHandlers.get(subject);
if (!entry) {
logger.trace(`Request handler not found for subject: ${subject}`);
return false;
}
if (entry.callbacks.delete(owner)) {
logger.trace(
`Removed request handler for owner: ${owner} on subject: ${subject}`
);
} else {
logger.trace(
`No request handler found for owner: ${owner} on subject: ${subject}`
);
}
if (entry.callbacks.size === 0) {
logger.trace(`No handlers left, stopping request handler for ${subject}`);
entry.subscription.unsubscribe();
this.requestHandlers.delete(subject);
}
return true;
}
async disconnect() {
logger.info('Disconnecting from NATS...');
// Stop all subscriptions
for (const [subject, entry] of this.subscriptions) {
logger.trace(`Stopping subscription: ${subject}`);
entry.subscription.unsubscribe();
}
this.subscriptions.clear();
// Stop all queued subscriptions
for (const [key, entry] of this.queuedSubscriptions) {
logger.trace(`Stopping queued subscription: ${key}`);
entry.subscription.unsubscribe();
}
this.queuedSubscriptions.clear();
// Stop all request handlers
for (const [subject, entry] of this.requestHandlers) {
logger.trace(`Stopping request handler: ${subject}`);
entry.subscription.unsubscribe();
}
this.requestHandlers.clear();
if (this.client) {
await this.client.close();
this.client = null;
logger.info('Disconnected from NATS');
}
}
}
const natsServer = new NatsServer();
export { NatsServer, natsServer };