Add NATS messaging for updates and deletes: Replaced Etcd operations with NATS publish calls in utils.js for distributing updates, new entries, and deletions. Updated logging in etcd.js to use trace level for improved verbosity during connection and operation handling.
This commit is contained in:
parent
4685cac563
commit
695ff8efc7
@ -16,13 +16,13 @@ class EtcdServer {
|
|||||||
this.client = null;
|
this.client = null;
|
||||||
this.watchers = new Map();
|
this.watchers = new Map();
|
||||||
this.hosts = [`${ETCD_HOST}:${ETCD_PORT}`];
|
this.hosts = [`${ETCD_HOST}:${ETCD_PORT}`];
|
||||||
logger.debug(`EtcdServer constructor: hosts set to ${JSON.stringify(this.hosts)}`);
|
logger.trace(`EtcdServer constructor: hosts set to ${JSON.stringify(this.hosts)}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
async connect() {
|
async connect() {
|
||||||
if (!this.client) {
|
if (!this.client) {
|
||||||
logger.info('Connecting to Etcd...');
|
logger.info('Connecting to Etcd...');
|
||||||
logger.debug(`Creating Etcd client with hosts ${JSON.stringify(this.hosts)}`);
|
logger.trace(`Creating Etcd client with hosts ${JSON.stringify(this.hosts)}`);
|
||||||
this.client = new Etcd3({
|
this.client = new Etcd3({
|
||||||
hosts: this.hosts,
|
hosts: this.hosts,
|
||||||
});
|
});
|
||||||
@ -30,16 +30,16 @@ class EtcdServer {
|
|||||||
// Test connection
|
// Test connection
|
||||||
try {
|
try {
|
||||||
await this.client.get('test-connection').string();
|
await this.client.get('test-connection').string();
|
||||||
logger.debug('Etcd client connected successfully.');
|
logger.trace('Etcd client connected successfully.');
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
if (error.code === 'NOT_FOUND') {
|
if (error.code === 'NOT_FOUND') {
|
||||||
logger.debug('Etcd client connected successfully (test key not found as expected).');
|
logger.trace('Etcd client connected successfully (test key not found as expected).');
|
||||||
} else {
|
} else {
|
||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logger.debug('Etcd client already exists, skipping connection.');
|
logger.trace('Etcd client already exists, skipping connection.');
|
||||||
}
|
}
|
||||||
return this.client;
|
return this.client;
|
||||||
}
|
}
|
||||||
@ -47,7 +47,7 @@ class EtcdServer {
|
|||||||
async getClient() {
|
async getClient() {
|
||||||
logger.trace('Checking if Etcd client exists.');
|
logger.trace('Checking if Etcd client exists.');
|
||||||
if (!this.client) {
|
if (!this.client) {
|
||||||
logger.debug('No client found, calling connect().');
|
logger.trace('No client found, calling connect().');
|
||||||
await this.connect();
|
await this.connect();
|
||||||
}
|
}
|
||||||
logger.trace('Returning Etcd client.');
|
logger.trace('Returning Etcd client.');
|
||||||
@ -60,7 +60,7 @@ class EtcdServer {
|
|||||||
const stringValue = typeof value === 'string' ? value : JSON.stringify(value);
|
const stringValue = typeof value === 'string' ? value : JSON.stringify(value);
|
||||||
|
|
||||||
await client.put(key).value(stringValue);
|
await client.put(key).value(stringValue);
|
||||||
logger.debug(`Set key: ${key}, value: ${stringValue}`);
|
logger.trace(`Set key: ${key}, value: ${stringValue}`);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -69,7 +69,7 @@ class EtcdServer {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
const value = await client.get(key).string();
|
const value = await client.get(key).string();
|
||||||
logger.debug(`Retrieved key: ${key}, value: ${value}`);
|
logger.trace(`Retrieved key: ${key}, value: ${value}`);
|
||||||
|
|
||||||
// Try to parse as JSON, fallback to string
|
// Try to parse as JSON, fallback to string
|
||||||
try {
|
try {
|
||||||
@ -79,7 +79,7 @@ class EtcdServer {
|
|||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
if (error.code === 'NOT_FOUND') {
|
if (error.code === 'NOT_FOUND') {
|
||||||
logger.debug(`Key not found: ${key}`);
|
logger.trace(`Key not found: ${key}`);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
throw error;
|
throw error;
|
||||||
@ -91,7 +91,7 @@ class EtcdServer {
|
|||||||
|
|
||||||
// Stop all watchers
|
// Stop all watchers
|
||||||
for (const [key, watcher] of this.watchers) {
|
for (const [key, watcher] of this.watchers) {
|
||||||
logger.debug(`Stopping watcher: ${key}`);
|
logger.trace(`Stopping watcher: ${key}`);
|
||||||
watcher.removeAllListeners();
|
watcher.removeAllListeners();
|
||||||
await watcher.close();
|
await watcher.close();
|
||||||
}
|
}
|
||||||
|
|||||||
12
src/utils.js
12
src/utils.js
@ -1,6 +1,7 @@
|
|||||||
import { ObjectId } from 'mongodb';
|
import { ObjectId } from 'mongodb';
|
||||||
import { auditLogModel } from './schemas/management/auditlog.schema.js';
|
import { auditLogModel } from './schemas/management/auditlog.schema.js';
|
||||||
import { etcdServer } from './database/etcd.js';
|
import { etcdServer } from './database/etcd.js';
|
||||||
|
import { natsServer } from './database/nats.js';
|
||||||
|
|
||||||
function parseFilter(property, value) {
|
function parseFilter(property, value) {
|
||||||
if (typeof value === 'string') {
|
if (typeof value === 'string') {
|
||||||
@ -383,11 +384,15 @@ async function getAuditLogs(idOrIds) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async function distributeUpdate(value, id, type) {
|
async function distributeUpdate(value, id, type) {
|
||||||
await etcdServer.setKey(`/${type}s/${id}/object`, value);
|
await natsServer.publish(`${type}s.${id}.object`, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
async function distributeNew(id, type) {
|
async function distributeNew(value, type) {
|
||||||
await etcdServer.setKey(`/${type}s/new`, id);
|
await natsServer.publish(`${type}s.new`, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function distributeDelete(value, type) {
|
||||||
|
await natsServer.publish(`${type}s.delete`, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
function flatternObjectIds(object) {
|
function flatternObjectIds(object) {
|
||||||
@ -487,6 +492,7 @@ export {
|
|||||||
expandObjectIds,
|
expandObjectIds,
|
||||||
distributeUpdate,
|
distributeUpdate,
|
||||||
distributeNew,
|
distributeNew,
|
||||||
|
distributeDelete,
|
||||||
getFilter, // <-- add here
|
getFilter, // <-- add here
|
||||||
convertPropertiesString,
|
convertPropertiesString,
|
||||||
};
|
};
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user