Add NATS integration: Implemented NATS server connection and messaging capabilities. Updated package.json and package-lock.json to include NATS dependencies. Enhanced index.js to connect to NATS and added documentJobs route for job management.
This commit is contained in:
parent
751c931e67
commit
bec46489d1
52
package-lock.json
generated
52
package-lock.json
generated
@ -9,6 +9,7 @@
|
||||
"version": "1.0.0",
|
||||
"license": "ISC",
|
||||
"dependencies": {
|
||||
"@nats-io/transport-node": "^3.1.0",
|
||||
"axios": "^1.11.0",
|
||||
"bcrypt": "^6.0.0",
|
||||
"body-parser": "^2.2.0",
|
||||
@ -2924,6 +2925,51 @@
|
||||
"sparse-bitfield": "^3.0.3"
|
||||
}
|
||||
},
|
||||
"node_modules/@nats-io/nats-core": {
|
||||
"version": "3.1.0",
|
||||
"resolved": "https://registry.npmjs.org/@nats-io/nats-core/-/nats-core-3.1.0.tgz",
|
||||
"integrity": "sha512-xsSkLEGGcqNF+Ru8dMjPmKtfbBeq/U4meuJJX4Zi+5TBHpjpjNjs4YkCBC/pGYWnEum1/vdNPizjE1RdNHCyBg==",
|
||||
"license": "Apache-2.0",
|
||||
"dependencies": {
|
||||
"@nats-io/nkeys": "2.0.3",
|
||||
"@nats-io/nuid": "2.0.3"
|
||||
}
|
||||
},
|
||||
"node_modules/@nats-io/nkeys": {
|
||||
"version": "2.0.3",
|
||||
"resolved": "https://registry.npmjs.org/@nats-io/nkeys/-/nkeys-2.0.3.tgz",
|
||||
"integrity": "sha512-JVt56GuE6Z89KUkI4TXUbSI9fmIfAmk6PMPknijmuL72GcD+UgIomTcRWiNvvJKxA01sBbmIPStqJs5cMRBC3A==",
|
||||
"license": "Apache-2.0",
|
||||
"dependencies": {
|
||||
"tweetnacl": "^1.0.3"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=18.0.0"
|
||||
}
|
||||
},
|
||||
"node_modules/@nats-io/nuid": {
|
||||
"version": "2.0.3",
|
||||
"resolved": "https://registry.npmjs.org/@nats-io/nuid/-/nuid-2.0.3.tgz",
|
||||
"integrity": "sha512-TpA3HEBna/qMVudy+3HZr5M3mo/L1JPofpVT4t0HkFGkz2Cn9wrlrQC8tvR8Md5Oa9//GtGG26eN0qEWF5Vqew==",
|
||||
"license": "Apache-2.0",
|
||||
"engines": {
|
||||
"node": ">= 18.x"
|
||||
}
|
||||
},
|
||||
"node_modules/@nats-io/transport-node": {
|
||||
"version": "3.1.0",
|
||||
"resolved": "https://registry.npmjs.org/@nats-io/transport-node/-/transport-node-3.1.0.tgz",
|
||||
"integrity": "sha512-k5pH7IOKUetwXOMraVgcB5zG0wibcHOwJJuyuY1/5Q4K0XfBJDnb/IbczP5/JJWwMYfxSL9O+46ojtdBHvHRSw==",
|
||||
"license": "Apache-2.0",
|
||||
"dependencies": {
|
||||
"@nats-io/nats-core": "3.1.0",
|
||||
"@nats-io/nkeys": "2.0.3",
|
||||
"@nats-io/nuid": "2.0.3"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">= 18.0.0"
|
||||
}
|
||||
},
|
||||
"node_modules/@nicolo-ribaudo/chokidar-2": {
|
||||
"version": "2.1.8-no-fsevents.3",
|
||||
"resolved": "https://registry.npmjs.org/@nicolo-ribaudo/chokidar-2/-/chokidar-2-2.1.8-no-fsevents.3.tgz",
|
||||
@ -11193,6 +11239,12 @@
|
||||
"license": "0BSD",
|
||||
"optional": true
|
||||
},
|
||||
"node_modules/tweetnacl": {
|
||||
"version": "1.0.3",
|
||||
"resolved": "https://registry.npmjs.org/tweetnacl/-/tweetnacl-1.0.3.tgz",
|
||||
"integrity": "sha512-6rt+RN7aOi1nGMyC4Xa5DdYiukl2UWCbcJft7YhxReBGQD7OAM8Pbxw6YMo4r2diNEA8FEmu32YOn9rhaiE5yw==",
|
||||
"license": "Unlicense"
|
||||
},
|
||||
"node_modules/type-check": {
|
||||
"version": "0.4.0",
|
||||
"resolved": "https://registry.npmjs.org/type-check/-/type-check-0.4.0.tgz",
|
||||
|
||||
@ -4,6 +4,7 @@
|
||||
"description": "",
|
||||
"main": "index.js",
|
||||
"dependencies": {
|
||||
"@nats-io/transport-node": "^3.1.0",
|
||||
"axios": "^1.11.0",
|
||||
"bcrypt": "^6.0.0",
|
||||
"body-parser": "^2.2.0",
|
||||
|
||||
313
src/database/nats.js
Normal file
313
src/database/nats.js
Normal file
@ -0,0 +1,313 @@
|
||||
import { connect } from '@nats-io/transport-node';
|
||||
import log4js from 'log4js';
|
||||
import dotenv from 'dotenv';
|
||||
|
||||
dotenv.config();
|
||||
|
||||
const NATS_HOST = process.env.NATS_HOST || 'localhost';
|
||||
const NATS_PORT = process.env.NATS_PORT || 4222;
|
||||
const LOG_LEVEL = process.env.LOG_LEVEL || 'info';
|
||||
|
||||
const logger = log4js.getLogger('Nats');
|
||||
logger.level = LOG_LEVEL;
|
||||
|
||||
class NatsServer {
|
||||
constructor() {
|
||||
this.client = null;
|
||||
this.subscriptions = new Map(); // subject → { subscription, callbacks }
|
||||
this.requestHandlers = new Map(); // subject → { handler, callbacks }
|
||||
this.queuedSubscriptions = new Map(); // subject → { subscription, callbacks, queue }
|
||||
this.servers = [`nats://${NATS_HOST}:${NATS_PORT}`];
|
||||
this.textEncoder = new TextEncoder();
|
||||
this.textDecoder = new TextDecoder();
|
||||
|
||||
logger.trace(`NatsServer: servers set to ${JSON.stringify(this.servers)}`);
|
||||
}
|
||||
|
||||
async connect() {
|
||||
if (!this.client) {
|
||||
logger.info('Connecting to NATS...');
|
||||
logger.trace(`Creating NATS client with servers ${JSON.stringify(this.servers)}`);
|
||||
|
||||
try {
|
||||
this.client = await connect({
|
||||
servers: this.servers,
|
||||
reconnect: true,
|
||||
maxReconnectAttempts: -1, // unlimited reconnects
|
||||
reconnectTimeWait: 1000,
|
||||
timeout: 20000,
|
||||
});
|
||||
|
||||
// Test connection by checking if client is connected
|
||||
try {
|
||||
if (this.client.isClosed()) {
|
||||
throw new Error('NATS client connection failed');
|
||||
}
|
||||
logger.trace('NATS client connected successfully.');
|
||||
} catch (error) {
|
||||
throw error;
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Failed to connect to NATS:', error);
|
||||
throw error;
|
||||
}
|
||||
} else {
|
||||
logger.trace('NATS client already exists, skipping connection.');
|
||||
}
|
||||
return this.client;
|
||||
}
|
||||
|
||||
async getClient() {
|
||||
if (!this.client) {
|
||||
logger.trace('No client found, calling connect().');
|
||||
await this.connect();
|
||||
}
|
||||
return this.client;
|
||||
}
|
||||
|
||||
async publish(subject, data) {
|
||||
const client = await this.getClient();
|
||||
const payload = typeof data === 'string' ? data : JSON.stringify(data);
|
||||
|
||||
try {
|
||||
client.publish(subject, this.textEncoder.encode(payload));
|
||||
logger.trace(`Published to subject: ${subject}, data: ${payload}`);
|
||||
return { success: true };
|
||||
} catch (error) {
|
||||
logger.error(`Failed to publish to subject ${subject}:`, error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async request(subject, data, timeout = 30000) {
|
||||
const client = await this.getClient();
|
||||
const payload = typeof data === 'string' ? data : JSON.stringify(data);
|
||||
|
||||
try {
|
||||
const response = await client.request(subject, this.textEncoder.encode(payload), {
|
||||
timeout: timeout,
|
||||
});
|
||||
|
||||
const responseData = this.textDecoder.decode(response.data);
|
||||
logger.trace(`Request to subject: ${subject}, response: ${responseData}`);
|
||||
|
||||
// Try to parse as JSON, fallback to string
|
||||
try {
|
||||
return JSON.parse(responseData);
|
||||
} catch {
|
||||
return responseData;
|
||||
}
|
||||
} catch (error) {
|
||||
if (error.code === 'TIMEOUT') {
|
||||
logger.trace(`Request timeout for subject: ${subject}`);
|
||||
return null;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async subscribe(subject, owner, callback) {
|
||||
const client = await this.getClient();
|
||||
const subscriptionKey = subject;
|
||||
|
||||
if (this.subscriptions.has(subscriptionKey)) {
|
||||
this.subscriptions.get(subscriptionKey).callbacks.set(owner, callback);
|
||||
logger.trace(`Added subscription callback for owner=${owner} on subject=${subject}`);
|
||||
return { success: true };
|
||||
}
|
||||
|
||||
logger.trace(`Creating new subscription for subject: ${subject}`);
|
||||
const subscription = client.subscribe(subject);
|
||||
const callbacks = new Map();
|
||||
callbacks.set(owner, callback);
|
||||
|
||||
(async () => {
|
||||
for await (const msg of subscription) {
|
||||
logger.trace(`Message received on subject: ${subject}`);
|
||||
const data = this.textDecoder.decode(msg.data);
|
||||
let parsedData;
|
||||
|
||||
try {
|
||||
parsedData = JSON.parse(data);
|
||||
} catch {
|
||||
parsedData = data;
|
||||
}
|
||||
|
||||
for (const [ownerId, cb] of callbacks) {
|
||||
try {
|
||||
cb(subject, parsedData, msg);
|
||||
} catch (err) {
|
||||
logger.error(
|
||||
`Error in subscription callback for owner=${ownerId}, subject=${subject}:`,
|
||||
err
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
})().catch((err) => {
|
||||
logger.error(`Subscription error for subject ${subject}:`, err);
|
||||
});
|
||||
|
||||
this.subscriptions.set(subscriptionKey, { subscription, callbacks });
|
||||
return { success: true };
|
||||
}
|
||||
|
||||
async setRequestHandler(subject, owner, handler) {
|
||||
const client = await this.getClient();
|
||||
const handlerKey = subject;
|
||||
|
||||
if (this.requestHandlers.has(handlerKey)) {
|
||||
this.requestHandlers.get(handlerKey).callbacks.set(owner, handler);
|
||||
logger.trace(`Added request handler for owner=${owner} on subject=${subject}`);
|
||||
return { success: true };
|
||||
}
|
||||
|
||||
logger.trace(`Creating new request handler for subject: ${subject}`);
|
||||
const subscription = client.subscribe(subject);
|
||||
const callbacks = new Map();
|
||||
callbacks.set(owner, handler);
|
||||
|
||||
(async () => {
|
||||
for await (const msg of subscription) {
|
||||
logger.trace(`Request received on subject: ${subject}`);
|
||||
const data = this.textDecoder.decode(msg.data);
|
||||
let parsedData;
|
||||
|
||||
try {
|
||||
parsedData = JSON.parse(data);
|
||||
} catch {
|
||||
parsedData = data;
|
||||
}
|
||||
|
||||
for (const [ownerId, cb] of callbacks) {
|
||||
try {
|
||||
const response = await cb(subject, parsedData, msg);
|
||||
const responsePayload =
|
||||
typeof response === 'string' ? response : JSON.stringify(response);
|
||||
msg.respond(this.textEncoder.encode(responsePayload));
|
||||
} catch (err) {
|
||||
logger.error(`Error in request handler for owner=${ownerId}, subject=${subject}:`, err);
|
||||
// Send error response
|
||||
msg.respond(this.textEncoder.encode(JSON.stringify({ error: err.message })));
|
||||
}
|
||||
}
|
||||
}
|
||||
})().catch((err) => {
|
||||
logger.error(`Request handler error for subject ${subject}:`, err);
|
||||
});
|
||||
|
||||
this.requestHandlers.set(handlerKey, { subscription, callbacks });
|
||||
return { success: true };
|
||||
}
|
||||
|
||||
async removeSubscription(subject, owner) {
|
||||
const entry = this.subscriptions.get(subject);
|
||||
|
||||
if (!entry) {
|
||||
logger.trace(`Subscription not found for subject: ${subject}`);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (entry.callbacks.delete(owner)) {
|
||||
logger.trace(`Removed subscription callback for owner: ${owner} on subject: ${subject}`);
|
||||
} else {
|
||||
logger.trace(`No subscription callback found for owner: ${owner} on subject: ${subject}`);
|
||||
}
|
||||
|
||||
if (entry.callbacks.size === 0) {
|
||||
logger.trace(`No callbacks left, stopping subscription for ${subject}`);
|
||||
entry.subscription.unsubscribe();
|
||||
this.subscriptions.delete(subject);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
async removeQueuedSubscription(subject, queue, owner) {
|
||||
const subscriptionKey = `${subject}:${queue}`;
|
||||
const entry = this.queuedSubscriptions.get(subscriptionKey);
|
||||
|
||||
if (!entry) {
|
||||
logger.trace(`Queued subscription not found for subject: ${subject}, queue: ${queue}`);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (entry.callbacks.delete(owner)) {
|
||||
logger.trace(
|
||||
`Removed queued subscription callback for owner: ${owner} on subject: ${subject}, queue: ${queue}`
|
||||
);
|
||||
} else {
|
||||
logger.trace(
|
||||
`No queued subscription callback found for owner: ${owner} on subject: ${subject}, queue: ${queue}`
|
||||
);
|
||||
}
|
||||
|
||||
if (entry.callbacks.size === 0) {
|
||||
logger.trace(
|
||||
`No callbacks left, stopping queued subscription for ${subject}, queue: ${queue}`
|
||||
);
|
||||
entry.subscription.unsubscribe();
|
||||
this.queuedSubscriptions.delete(subscriptionKey);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
async removeRequestHandler(subject, owner) {
|
||||
const entry = this.requestHandlers.get(subject);
|
||||
|
||||
if (!entry) {
|
||||
logger.trace(`Request handler not found for subject: ${subject}`);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (entry.callbacks.delete(owner)) {
|
||||
logger.trace(`Removed request handler for owner: ${owner} on subject: ${subject}`);
|
||||
} else {
|
||||
logger.trace(`No request handler found for owner: ${owner} on subject: ${subject}`);
|
||||
}
|
||||
|
||||
if (entry.callbacks.size === 0) {
|
||||
logger.trace(`No handlers left, stopping request handler for ${subject}`);
|
||||
entry.subscription.unsubscribe();
|
||||
this.requestHandlers.delete(subject);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
async disconnect() {
|
||||
logger.info('Disconnecting from NATS...');
|
||||
|
||||
// Stop all subscriptions
|
||||
for (const [subject, entry] of this.subscriptions) {
|
||||
logger.trace(`Stopping subscription: ${subject}`);
|
||||
entry.subscription.unsubscribe();
|
||||
}
|
||||
this.subscriptions.clear();
|
||||
|
||||
// Stop all queued subscriptions
|
||||
for (const [key, entry] of this.queuedSubscriptions) {
|
||||
logger.trace(`Stopping queued subscription: ${key}`);
|
||||
entry.subscription.unsubscribe();
|
||||
}
|
||||
this.queuedSubscriptions.clear();
|
||||
|
||||
// Stop all request handlers
|
||||
for (const [subject, entry] of this.requestHandlers) {
|
||||
logger.trace(`Stopping request handler: ${subject}`);
|
||||
entry.subscription.unsubscribe();
|
||||
}
|
||||
this.requestHandlers.clear();
|
||||
|
||||
if (this.client) {
|
||||
await this.client.close();
|
||||
this.client = null;
|
||||
logger.info('Disconnected from NATS');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const natsServer = new NatsServer();
|
||||
|
||||
export { NatsServer, natsServer };
|
||||
12
src/index.js
12
src/index.js
@ -28,6 +28,7 @@ import {
|
||||
documentSizesRoutes,
|
||||
documentTemplatesRoutes,
|
||||
documentPrintersRoutes,
|
||||
documentJobsRoutes,
|
||||
} from './routes/index.js';
|
||||
import path from 'path';
|
||||
import * as fs from 'fs';
|
||||
@ -36,6 +37,7 @@ import ReseedAction from './database/ReseedAction.js';
|
||||
import log4js from 'log4js';
|
||||
import { etcdServer } from './database/etcd.js';
|
||||
import { populateUserMiddleware } from './services/misc/auth.js';
|
||||
import { natsServer } from './database/nats.js';
|
||||
|
||||
dotenv.config();
|
||||
|
||||
@ -70,6 +72,15 @@ try {
|
||||
throw err;
|
||||
}
|
||||
|
||||
// Connect to NATS (await)
|
||||
try {
|
||||
natsServer.connect();
|
||||
logger.info('Connected to NATS');
|
||||
} catch (err) {
|
||||
logger.error('Failed to connect to NATS:', err);
|
||||
throw err;
|
||||
}
|
||||
|
||||
app.use(cors(corsOptions));
|
||||
app.use(bodyParser.json({ type: 'application/json', strict: false, limit: '50mb' }));
|
||||
app.use(express.json());
|
||||
@ -104,6 +115,7 @@ app.use('/notetypes', noteTypeRoutes);
|
||||
app.use('/documentsizes', documentSizesRoutes);
|
||||
app.use('/documenttemplates', documentTemplatesRoutes);
|
||||
app.use('/documentprinters', documentPrintersRoutes);
|
||||
app.use('/documentjobs', documentJobsRoutes);
|
||||
app.use('/notes', noteRoutes);
|
||||
|
||||
if (process.env.SCHEDULE_HOUR) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user