From 81196897fa49484131783e4ca4c274297b005d90 Mon Sep 17 00:00:00 2001 From: Tom Butcher Date: Fri, 9 May 2025 22:20:37 +0100 Subject: [PATCH] Updated to update the DB and handle print commands. --- config.json | 4 +- src/auth/auth.js | 2 +- src/database/printer.schema.js | 52 +- src/database/printjob.schema.js | 35 + src/database/printsubjob.schema.js | 48 ++ src/index.js | 15 +- src/network/websocketScanner.js | 185 +++++ src/network/websocketScannerWorker.js | 110 +++ src/printer/jsonrpc.js | 62 +- src/printer/printerclient.js | 1078 +++++++++++++++++-------- src/printer/printermanager.js | 220 +++-- src/socket/socketclient.js | 483 ++++++++++- src/socket/socketmanager.js | 349 ++++---- 13 files changed, 2019 insertions(+), 624 deletions(-) create mode 100644 src/database/printjob.schema.js create mode 100644 src/database/printsubjob.schema.js create mode 100644 src/network/websocketScanner.js create mode 100644 src/network/websocketScannerWorker.js diff --git a/config.json b/config.json index 681ef26..31c3250 100644 --- a/config.json +++ b/config.json @@ -1,6 +1,6 @@ { "server": { - "port": 8080, + "port": 8081, "logLevel": "debug" }, "auth": { @@ -9,7 +9,7 @@ "url": "https://auth.tombutcher.work", "realm": "master", "clientId": "farmcontrol-client", - "clientSecret": "" + "clientSecret": "GPyh59xctRX83yfKWb83ShK6VEwHIvLF" }, "requiredRoles": [] }, diff --git a/src/auth/auth.js b/src/auth/auth.js index bcf1c08..e445211 100644 --- a/src/auth/auth.js +++ b/src/auth/auth.js @@ -7,7 +7,7 @@ import { loadConfig } from "../config.js"; const config = loadConfig(); -const logger = log4js.getLogger("MongoDB"); +const logger = log4js.getLogger("Auth"); logger.level = config.server.logLevel; export class KeycloakAuth { diff --git a/src/database/printer.schema.js b/src/database/printer.schema.js index e798524..5667689 100644 --- a/src/database/printer.schema.js +++ b/src/database/printer.schema.js @@ -3,39 +3,43 @@ const { Schema } = mongoose; // Define the moonraker connection schema const moonrakerSchema = new Schema( - { - host: { type: String, required: true }, - port: { type: Number, required: true }, - protocol: { type: String, required: true }, - apiKey: { type: String, default: null }, - }, - { _id: false }, + { + host: { type: String, required: true }, + port: { type: Number, required: true }, + protocol: { type: String, required: true }, + apiKey: { type: String, default: null, required: false }, + }, + { _id: false }, ); // Define the main printer schema const printerSchema = new Schema( - { - printerId: { type: String, required: true, unique: true }, - printerName: { type: String, required: true }, - online: { type: Boolean, required: true, default: false }, - state: { - type: { type: String, required: true, default: "Offline" }, - percent: { type: Number, required: false }, - }, - connectedAt: { type: Date, default: null }, - loadedFillament: { - type: Schema.Types.ObjectId, - ref: "Fillament", - default: null, - }, - moonraker: { type: moonrakerSchema, required: true }, + { + printerName: { type: String, required: true }, + online: { type: Boolean, required: true, default: false }, + state: { + type: { type: String, required: true, default: "Offline" }, + progress: { required: false, type: Number, default: 0 }, }, - { timestamps: true }, + connectedAt: { type: Date, default: null }, + loadedFilament: { + type: Schema.Types.ObjectId, + ref: "Filament", + default: null, + }, + moonraker: { type: moonrakerSchema, required: true }, + tags: [{ type: String }], + firmware: { type: String }, + currentJob: { type: Schema.Types.ObjectId, ref: "PrintJob" }, + currentSubJob: { type: Schema.Types.ObjectId, ref: "PrintSubJob" }, + subJobs: [{ type: Schema.Types.ObjectId, ref: "PrintSubJob" }], + }, + { timestamps: true }, ); // Add virtual id getter printerSchema.virtual("id").get(function () { - return this._id.toHexString(); + return this._id.toHexString(); }); // Configure JSON serialization to include virtuals diff --git a/src/database/printjob.schema.js b/src/database/printjob.schema.js new file mode 100644 index 0000000..ac74d56 --- /dev/null +++ b/src/database/printjob.schema.js @@ -0,0 +1,35 @@ +import mongoose from "mongoose"; +const { Schema } = mongoose; + +const printJobSchema = new mongoose.Schema({ + state: { + type: { required: true, type: String }, + progress: { required: false, type: Number, default: 0 }, + }, + printers: [{ type: Schema.Types.ObjectId, ref: "Printer", required: false }], + createdAt: { required: true, type: Date }, + updatedAt: { required: true, type: Date }, + startedAt: { required: true, type: Date }, + gcodeFile: { + type: Schema.Types.ObjectId, + ref: "GCodeFile", + required: false, + }, + quantity: { + type: Number, + required: true, + default: 1, + min: 1, + }, + subJobs: [ + {type: Schema.Types.ObjectId, ref: "PrintSubJob", required: false} + ], +}); + +printJobSchema.virtual("id").get(function () { + return this._id.toHexString(); +}); + +printJobSchema.set("toJSON", { virtuals: true }); + +export const printJobModel = mongoose.model("PrintJob", printJobSchema); diff --git a/src/database/printsubjob.schema.js b/src/database/printsubjob.schema.js new file mode 100644 index 0000000..cf10d19 --- /dev/null +++ b/src/database/printsubjob.schema.js @@ -0,0 +1,48 @@ +import mongoose from "mongoose"; +const { Schema } = mongoose; + +const printSubJobSchema = new mongoose.Schema({ + printer: { + type: Schema.Types.ObjectId, + ref: "Printer", + required: true + }, + printJob: { + type: Schema.Types.ObjectId, + ref: "PrintJob", + required: true + }, + subJobId: { + type: String, + required: true + }, + gcodeFile: { + type: Schema.Types.ObjectId, + ref: "GCodeFile", + required: true, + }, + state: { + type: { required: true, type: String }, + progress: { required: false, type: Number, default: 0 }, + }, + number: { + type: Number, + required: true + }, + createdAt: { + type: Date, + default: Date.now + }, + updatedAt: { + type: Date, + default: Date.now + } +}); + +printSubJobSchema.virtual("id").get(function () { + return this._id.toHexString(); +}); + +printSubJobSchema.set("toJSON", { virtuals: true }); + +export const printSubJobModel = mongoose.model("PrintSubJob", printSubJobSchema); \ No newline at end of file diff --git a/src/index.js b/src/index.js index ee559d7..310ba67 100644 --- a/src/index.js +++ b/src/index.js @@ -2,16 +2,19 @@ import { loadConfig } from "./config.js"; import { dbConnect } from "./database/mongo.js"; import { PrinterManager } from "./printer/printermanager.js"; import { SocketManager } from "./socket/socketmanager.js"; - import { KeycloakAuth } from "./auth/auth.js"; - +import express from "express"; import log4js from "log4js"; + // Load configuration const config = loadConfig(); const logger = log4js.getLogger("FarmControl Server"); logger.level = config.server.logLevel; +// Create Express app +const app = express(); + // Connect to database dbConnect(); @@ -20,7 +23,13 @@ const keycloakAuth = new KeycloakAuth(config); // Create printer manager const printerManager = new PrinterManager(config); -const socketManager = new SocketManager(config, printerManager); +const socketManager = new SocketManager(config, printerManager, keycloakAuth); +printerManager.setSocketManager(socketManager); + +// Start Express server +app.listen(config.server.port, () => { + logger.info(`Server listening on port ${config.server.port}`); +}); process.on("SIGINT", () => { logger.info("Shutting down..."); diff --git a/src/network/websocketScanner.js b/src/network/websocketScanner.js new file mode 100644 index 0000000..671b8a8 --- /dev/null +++ b/src/network/websocketScanner.js @@ -0,0 +1,185 @@ +import { EventEmitter } from 'events'; +import os from 'os'; +import { Worker } from 'worker_threads'; +import path from 'path'; +import { fileURLToPath } from 'url'; + +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); + +export class WebSocketScanner extends EventEmitter { + constructor(options = {}) { + super(); + this.scanning = false; + this.workers = []; + // Default to number of CPU cores, but allow override + this.maxThreads = options.maxThreads || os.cpus().length; + this.totalIPs = 0; + this.scannedIPs = 0; + } + + /** + * Scans the local network for websocket services on a specified port + * @param {number} port - Port number to scan + * @returns {Promise} Array of IP addresses where websocket service was found + */ + async scanNetwork(port, protocol) { + // Clean up any existing workers before starting a new scan + this.cleanupWorkers(); + console.log("Cleaned up workers"); + + if (this.scanning) { + throw new Error('Scan already in progress'); + } + + this.scanning = true; + this.scannedIPs = 0; + const foundServices = []; + + // Get local network range + const { startIP, endIP } = this.getLocalNetworkRange(); + const start = this.ipToNumber(startIP); + const end = this.ipToNumber(endIP); + + // Calculate IP ranges for each worker + this.totalIPs = end - start + 1; + const ipsPerWorker = Math.ceil(this.totalIPs / this.maxThreads); + const workerPromises = []; + + for (let i = 0; i < this.maxThreads; i++) { + const workerStart = start + (i * ipsPerWorker); + const workerEnd = Math.min(workerStart + ipsPerWorker - 1, end); + + if (workerStart > end) break; + + const workerStartIP = this.numberToIP(workerStart); + const workerEndIP = this.numberToIP(workerEnd); + + const worker = new Worker(path.join(__dirname, 'websocketScannerWorker.js')); + this.workers.push(worker); + console.log("Created worker", i); + + const workerPromise = new Promise((resolve) => { + worker.on('message', (message) => { + switch (message.type) { + case 'serviceFound': + foundServices.push({ ip: message.ip, hostname: message.hostname }); + this.emit('serviceFound', { ip: message.ip, hostname: message.hostname }); + break; + case 'scanProgress': + this.scannedIPs += message.increment; + const totalProgress = (this.scannedIPs / this.totalIPs) * 100; + this.emit('scanProgress', { + currentIP: message.currentIP, + progress: totalProgress + }); + break; + case 'scanComplete': + resolve(message.results); + break; + } + }); + }); + + worker.postMessage({ + type: 'scan', + startIP: workerStartIP, + endIP: workerEndIP, + port, + protocol + }); + + workerPromises.push(workerPromise); + } + + await Promise.all(workerPromises); + this.cleanupWorkers(); + this.scanning = false; + return foundServices; + } + + cleanupWorkers() { + this.workers.forEach(worker => worker.terminate()); + this.workers = []; + } + + /** + * Gets the local network IP range + * @returns {Object} Object containing startIP and endIP + */ + getLocalNetworkRange() { + const interfaces = os.networkInterfaces(); + let localIP = null; + let subnetMask = null; + + // Find the first non-internal IPv4 address + for (const name of Object.keys(interfaces)) { + for (const iface of interfaces[name]) { + if (iface.family === 'IPv4' && !iface.internal) { + localIP = iface.address; + subnetMask = iface.netmask; + break; + } + } + if (localIP) break; + } + + if (!localIP) { + throw new Error('Could not determine local network IP address'); + } + + // Convert IP and subnet mask to numbers + const ipNum = this.ipToNumber(localIP); + const maskNum = this.ipToNumber(subnetMask); + + // Calculate network address + const networkNum = ipNum & maskNum; + + // Calculate broadcast address + const broadcastNum = networkNum | (~maskNum >>> 0); + + // Start IP is network address + 1 + const startIP = this.numberToIP(networkNum + 1); + // End IP is broadcast address - 1 + const endIP = this.numberToIP(broadcastNum - 1); + + return { startIP, endIP }; + } + + /** + * Converts an IP address to a number + * @param {string} ip - IP address to convert + * @returns {number} Numeric representation of IP + */ + ipToNumber(ip) { + return ip.split('.') + .reduce((acc, octet) => (acc << 8) + parseInt(octet), 0) >>> 0; + } + + /** + * Converts a number to an IP address + * @param {number} num - Number to convert + * @returns {string} IP address + */ + numberToIP(num) { + return [ + (num >>> 24) & 255, + (num >>> 16) & 255, + (num >>> 8) & 255, + num & 255 + ].join('.'); + } + + /** + * Stops the current scan + */ + stopScan() { + this.scanning = false; + this.cleanupWorkers(); + } +} + + + +// To stop scanning at any time: +// scanner.stopScan(); \ No newline at end of file diff --git a/src/network/websocketScannerWorker.js b/src/network/websocketScannerWorker.js new file mode 100644 index 0000000..9ff0a1f --- /dev/null +++ b/src/network/websocketScannerWorker.js @@ -0,0 +1,110 @@ +import { parentPort, workerData } from 'worker_threads'; +import WebSocket from 'ws'; +import { loadConfig } from "../config.js"; +import log4js from "log4js"; +import dns from 'dns'; + +// Load configuration +const config = loadConfig(); + +class WebSocketScannerWorker { + constructor() { + this.randomId = Math.floor(Math.random() * 1000).toString().padStart(3, '0'); + this.logger = log4js.getLogger(`WS Scanner #${this.randomId}`); + this.logger.level = config.server.logLevel; + this.timeout = 2000; // 2 second timeout for each connection attempt + } + + async scanRange(startIP, endIP, port, protocol) { + const start = this.ipToNumber(startIP); + const end = this.ipToNumber(endIP); + const foundServices = []; + + this.logger.info(`Scanning ${startIP} - ${endIP} on port: ${port} using: ${protocol}`); + + for (let ip = start; ip <= end; ip++) { + const currentIP = this.numberToIP(ip); + const url = `${protocol}://${currentIP}:${port}/websocket`; + + try { + this.logger.debug(`Checking ${currentIP} for websocket service on port ${port}`); + const isOpen = await this.checkWebSocket(url); + if (isOpen) { + const hostname = await this.resolveHostname(currentIP); + foundServices.push({ ip: currentIP, hostname }); + this.logger.info(`WebSocket connection successful for ${currentIP}${hostname ? ` (${hostname})` : ''}`); + parentPort.postMessage({ type: 'serviceFound', ip: currentIP, hostname }); + } else { + this.logger.debug(`WebSocket connection failed for ${currentIP}`); + } + } catch (error) { + // Connection failed, continue scanning + } + + parentPort.postMessage({ + type: 'scanProgress', + currentIP, + increment: 1 + }); + } + + return foundServices; + } + + async resolveHostname(ip) { + try { + const hostnames = await dns.promises.reverse(ip); + return hostnames[0] || null; + } catch (error) { + // Only log errors that aren't ENOTFOUND (which is expected for many IPs) + if (error.code !== 'ENOTFOUND') { + this.logger.warn(`Unexpected error resolving hostname for ${ip}: ${error.message}`); + } + return null; + } + } + + checkWebSocket(url) { + return new Promise((resolve) => { + const ws = new WebSocket(url); + let timeout = setTimeout(() => { + ws.terminate(); + resolve(false); + }, this.timeout); + + ws.on('open', () => { + clearTimeout(timeout); + ws.close(); + resolve(true); + }); + + ws.on('error', () => { + clearTimeout(timeout); + resolve(false); + }); + }); + } + + ipToNumber(ip) { + return ip.split('.') + .reduce((acc, octet) => (acc << 8) + parseInt(octet), 0) >>> 0; + } + + numberToIP(num) { + return [ + (num >>> 24) & 255, + (num >>> 16) & 255, + (num >>> 8) & 255, + num & 255 + ].join('.'); + } +} + +// Handle messages from the main thread +parentPort.on('message', async (data) => { + if (data.type === 'scan') { + const scanner = new WebSocketScannerWorker(); + const results = await scanner.scanRange(data.startIP, data.endIP, data.port, data.protocol); + parentPort.postMessage({ type: 'scanComplete', results }); + } +}); \ No newline at end of file diff --git a/src/printer/jsonrpc.js b/src/printer/jsonrpc.js index 9403aca..8f14a93 100644 --- a/src/printer/jsonrpc.js +++ b/src/printer/jsonrpc.js @@ -1,50 +1,66 @@ // jsonrpc.js - Implementation of JSON-RPC 2.0 protocol for Moonraker communication +import { loadConfig } from "../config.js"; +import log4js from "log4js"; + +// Load configuration +const config = loadConfig(); + +const logger = log4js.getLogger("JSON RPC"); +logger.level = config.server.logLevel; export class JsonRPC { constructor() { - this.id_counter = 0; + this.idCounter = 0; this.methods = {}; - this.pending_requests = {}; + this.pendingRequests = {}; } // Generate a unique ID for RPC requests - generate_id() { - return this.id_counter++; + generateId() { + return this.idCounter++; } // Register a method to handle incoming notifications/responses - register_method(method_name, callback) { - this.methods[method_name] = callback; + registerMethod(methodName, callback) { + this.methods[methodName] = callback; } // Process incoming messages - process_message(message) { + processMessage(message) { + if (message.method && this.methods[message.method]) { // Handle method call or notification this.methods[message.method](message.params); + logger.trace(`JSON-RPC notification: ${message.method}`); } else if (message.id !== undefined) { // Handle response to a previous request - const rpc_promise = this.pending_requests[message.id]; - if (rpc_promise) { + const rpcPromise = this.pendingRequests[message.id]; + if (rpcPromise) { if (message.error) { - rpc_promise.reject(message.error); + logger.error(`Error in JSON-RPC response: ${message.error}`); + rpcPromise.reject(message.error); } else { - rpc_promise.resolve(message.result); + logger.debug(`JSON-RPC response: OK`); + logger.trace("Result:", message.result); + + rpcPromise.resolve(message.result); } - delete this.pending_requests[message.id]; + delete this.pendingRequests[message.id]; } } // If it's a notification without a registered method, ignore it } // Call a method without parameters - call_method(method) { - return this.call_method_with_kwargs(method, {}); + callMethod(method) { + return this.callMethodWithKwargs(method, {}); } // Call a method with parameters - call_method_with_kwargs(method, params) { - const id = this.generate_id(); + callMethodWithKwargs(method, params) { + logger.debug(`Calling method: ${method}`); + logger.trace("Params:", params); + const id = this.generateId(); const request = { jsonrpc: "2.0", method: method, @@ -53,9 +69,7 @@ export class JsonRPC { }; return new Promise((resolve, reject) => { - this.pending_requests[id] = { resolve, reject }; - - console.log(`Sending JSON-RPC request: ${JSON.stringify(request)}`); + this.pendingRequests[id] = { resolve, reject }; // The actual sending of the message is done by the WebSocket connection // This just prepares the message and returns a promise if (this.socket) { @@ -63,20 +77,20 @@ export class JsonRPC { } else { // If socket is not directly attached to this instance, the caller // is responsible for sending the serialized request - this.last_request = JSON.stringify(request); + this.lastRequest = JSON.stringify(request); } }); } // For external socket handling - get_last_request() { - const req = this.last_request; - this.last_request = null; + getLastRequest() { + const req = this.lastRequest; + this.lastRequest = null; return req; } // Associate a WebSocket with this RPC instance for direct communication - set_socket(socket) { + setSocket(socket) { this.socket = socket; } } diff --git a/src/printer/printerclient.js b/src/printer/printerclient.js index 3578f35..dae5a3a 100644 --- a/src/printer/printerclient.js +++ b/src/printer/printerclient.js @@ -1,355 +1,789 @@ // moonraker-connection.js - Handles connection to a single Moonraker instance +import { JsonRPC } from "./jsonrpc.js"; +import { WebSocket } from "ws"; +import { loadConfig } from "../config.js"; +import { printerModel } from "../database/printer.schema.js"; // Import your printer model +import { printJobModel } from "../database/printjob.schema.js"; +import { printSubJobModel } from "../database/printsubjob.schema.js"; +import log4js from "log4js"; +import axios from "axios"; +import FormData from "form-data"; + +// Load configuration +const config = loadConfig(); + +const logger = log4js.getLogger("Printer Client"); +logger.level = config.server.logLevel; export class PrinterClient { - constructor(printerConfig) { - this.id = printerConfig.id; - this.printerName = printerConfig.printerName; - this.state = printerConfig.state; - this.online = printerConfig.online; - this.config = printerConfig.moonraker; - this.jsonRpc = new JsonRPC(); - this.socket = null; - this.connectionId = null; - this.clients = new Set(); - this.registerEventHandlers(); + constructor(printer, printerManager, socketManager) { + this.id = printer.id; + this.name = printer.printerName; + this.printerManager = printerManager; + this.socketManager = socketManager; + this.state = printer.state; + this.klippyState = null; + this.config = printer.moonraker; + this.version = printer.version; + this.jsonRpc = new JsonRPC(); + this.socket = null; + this.connectionId = null; + this.currentJobId = printer.currentJob; + this.currentJobState = { type: "unknown", progress: 0 }; + this.currentSubJobId = printer.currentSubJob; + this.currentSubJobState = { type: "unknown", progress: 0 }; + this.registerEventHandlers(); + this.baseSubscription = { + print_stats: null, + display_status: null, + }; + this.subscriptions = new Map(); + this.queuedJobIds = []; + this.isOnline = printer.online; + this.subJobIsCancelling = false; + this.subJobCancelId = null; + } + + registerEventHandlers() { + // Register event handlers for Moonraker notifications + this.jsonRpc.registerMethod( + "notify_gcode_response", + //this.handleGcodeResponse.bind(this), + ); + this.jsonRpc.registerMethod( + "notify_status_update", + this.handleStatusUpdate.bind(this), + ); + this.jsonRpc.registerMethod( + "notify_klippy_disconnected", + this.handleKlippyDisconnected.bind(this), + ); + this.jsonRpc.registerMethod( + "notify_klippy_ready", + this.handleKlippyReady.bind(this), + ); + this.jsonRpc.registerMethod( + "notify_filelist_changed", + this.handleFileListChanged.bind(this), + ); + this.jsonRpc.registerMethod( + "notify_metadata_update", + this.handleMetadataUpdate.bind(this), + ); + this.jsonRpc.registerMethod( + "notify_power_changed", + this.handlePowerChanged.bind(this), + ); + } + + async getPrinterConnectionConfig() { + try { + const printer = await printerModel.findOne({ _id: this.id }); + this.config = printer.moonraker; + logger.info(`Reloaded connection config! (${this.name})`); + logger.debug(this.config); + } catch (error) { + logger.error( + `Failed to get printer connection config! (${this.name}):`, + error, + ); + } + } + + async connect() { + await this.getPrinterConnectionConfig(); + const { protocol, host, port } = this.config; + const wsUrl = `${protocol}://${host}:${port}/websocket`; + + logger.info(`Connecting to Moonraker at ${wsUrl} (${this.id})`); + + this.socket = new WebSocket(wsUrl); + + this.jsonRpc.setSocket(this.socket); + + this.socket.on("open", () => { + logger.info(`Connected to Moonraker (${this.name})`); + this.online = true; + this.identifyConnection(); + }); + + this.socket.on("message", (data) => { + this.jsonRpc.processMessage(JSON.parse(data)); + }); + + this.socket.on("close", () => { + logger.info(`Disconnected from Moonraker (${this.name})`); + this.online = false; + this.state = { type: "offline" }; + this.updatePrinterState(); + this.connectionId = null; + // Attempt to reconnect after delay + setTimeout(() => this.connect(), 10000); + }); + + this.socket.on("error", (error) => { + logger.error(`Moonraker connection error (${this.name}):`, error); + }); + } + + identifyConnection() { + const args = { + client_name: "farmcontrol-server", + version: "0.1.0", + type: "web", + url: "https://github.com/printer-bridge", + }; + + if (this.config.apiKey) { + args.api_key = this.config.apiKey; } - registerEventHandlers() { - // Register event handlers for Moonraker notifications - this.jsonRpc.register_method( - "notify_gcode_response", - this.handleGcodeResponse.bind(this), + logger.debug(`Identifying connection... (${this.name})`); + + this.jsonRpc + .callMethodWithKwargs("server.connection.identify", args) + .then(async (result) => { + this.connectionId = result.connection_id; + logger.info( + `Connection identified with ID: ${this.connectionId} (${this.name})`, ); - this.jsonRpc.register_method( - "notify_status_update", - this.handleStatusUpdate.bind(this), - ); - this.jsonRpc.register_method( - "notify_klippy_disconnected", - this.handleKlippyDisconnected.bind(this), - ); - this.jsonRpc.register_method( - "notify_klippy_ready", - this.handleKlippyReady.bind(this), - ); - this.jsonRpc.register_method( - "notify_filelist_changed", - this.handleFileListChanged.bind(this), - ); - this.jsonRpc.register_method( - "notify_metadata_update", - this.handleMetadataUpdate.bind(this), - ); - this.jsonRpc.register_method( - "notify_power_changed", - this.handlePowerChanged.bind(this), + await this.initialize(); + }) + .catch((error) => { + logger.error(`Error identifying connection (${this.name}):`, error); + }); + } + + async initialize() { + logger.info("Running printer initialization..."); + await this.getInfo(); + await this.getPrinterState(); + await this.getQueuedJobsInfo(); + await this.updateSubscriptions(); + } + + async getInfo() { + logger.info("Getting printer info..."); + try { + // Get server info + const serverResult = await this.jsonRpc.callMethod("server.info"); + this.online = true; + this.klippyState = { type: serverResult.klippy_state }; + logger.info( + "Server:", + `Moonraker ${serverResult.moonraker_version} (${this.name})`, + `State: ${this.klippyState.type}`, + ); + + try { + const klippyResult = await this.jsonRpc.callMethod("printer.info"); + logger.info( + `Klippy info for ${this.name}: ${klippyResult.hostname}, ${klippyResult.software_version}`, ); + // Update firmware version in database + try { + await printerModel.findByIdAndUpdate( + this.id, + { firmware: klippyResult.software_version }, + { new: true }, + ); + logger.info( + `Updated firmware version for ${this.name} to ${klippyResult.software_version}`, + ); + } catch (error) { + logger.error( + `Failed to update firmware version in database (${this.name}):`, + error, + ); + } + + if (klippyResult.state === "error") { + logger.error( + `Klippy error for ${this.name}: ${klippyResult.state_message}`, + ); + } + } catch (error) { + logger.error(`Error getting Klippy info (${this.name}):`, error); + } + } catch (error) { + logger.error(`Error getting server info (${this.name}):`, error); + } + } + + async getQueuedJobsInfo() { + logger.info(`Getting queued jobs info for (${this.name})`); + const result = await this.sendPrinterCommand({ + method: "server.job_queue.status", + }); + this.queuedJobIds = result.queued_jobs.map((job) => job.job_id); + logger.debug(`Queued job IDs: ${this.queuedJobIds}`); + await this.updatePrinterSubJobs(); + } + + async getPrinterState() { + logger.info(`Getting state of (${this.name})`); + if (!this.online) { + logger.error( + `Cannot send command: Not connected to Moonraker (${this.name})`, + ); + return false; } - connect() { - console.log(this.config); - const { protocol, host, port } = this.config; - const wsUrl = `${protocol}://${host}:${port}/websocket`; + if (this.klippyState.type === "error") { + logger.error(`Klippy is reporting error for ${this.name}`); + this.state = this.klippyState; + this.updatePrinterState(); + return; + } - logger.info(`Connecting to Moonraker at ${wsUrl} (${this.id})`); + if (this.klippyState.type === "shutdown") { + logger.error(`Klippy is reporting shutdown for ${this.name}`); + this.state = this.klippyState; + this.updatePrinterState(); + return; + } - this.socket = new WebSocket(wsUrl); + try { + const result = await this.jsonRpc.callMethodWithKwargs( + "printer.objects.query", + { objects: this.baseSubscription }, + ); + logger.debug(`Command sent to (${this.name})`); + if (result.status != undefined) { + this.handleStatusUpdate([result.status]); + } + return result; + } catch (error) { + logger.error(`Error sending command to (${this.name}):`, error); + return false; + } + } - this.jsonRpc.set_socket(this.socket); + async updateSubscriptions() { + logger.info(`Updating subscriptions for (${this.name})`); - this.socket.on("open", () => { - logger.info(`Connected to Moonraker (${this.printerName})`); - this.online = true; - this.identifyConnection(); - }); + // Start with base subscription content + const allSubscriptions = { ...this.baseSubscription }; - this.socket.on("message", (data) => { - const message = data.toString(); - logger.trace( - `Received message from Moonraker (${this.printerName}): ${message}`, + // Add all subscription content from the Map + for (const [_, value] of this.subscriptions) { + Object.assign(allSubscriptions, value); + } + + logger.debug("Combined subscriptions:", allSubscriptions); + + if (!this.online) { + logger.error( + `Cannot send command: Not connected to Moonraker (${this.name})`, + ); + return false; + } + + try { + await this.jsonRpc.callMethodWithKwargs("printer.objects.subscribe", { + objects: allSubscriptions, + }); + logger.debug(`Command sent to (${this.name})`); + return true; + } catch (error) { + logger.error(`Error sending command to (${this.name}):`, error); + return false; + } + } + + async sendPrinterCommand(command) { + logger.info(`Sending ${command.method} command to (${this.name})`); + if (!this.online) { + logger.error( + `Cannot send command: Not connected to Moonraker (${this.name})`, + ); + return false; + } + + try { + const result = await this.jsonRpc.callMethodWithKwargs( + command.method, + command.params, + ); + logger.debug(`Command sent to (${this.name})`); + if (result.status != undefined) { + if (command.method == "printer.objects.query") { + this.handleStatusUpdate([result.status]); + } + } + return result; + } catch (error) { + logger.error(`Error sending command to (${this.name}):`, error); + return false; + } + } + + async handleStatusUpdate(status) { + logger.trace("Status update:", status); + status = status[0]; + // Process printer status updates + if (this.state.type != "deploying") { + if (status.print_stats && status.print_stats.state) { + logger.info( + `Print state for ${this.name}: ${status.print_stats.state}`, + ); + + const stateTypeChanged = status.print_stats.state != this.state.type; + + // When status changes, update states + if (stateTypeChanged) { + this.state.type = status.print_stats.state; + await this.updatePrinterState(); + await this.getQueuedJobsInfo(); + } + } + + if (status.display_status) { + console.log("Display status:", status.display_status); + this.state.progress = status.display_status.progress; + await this.updatePrinterState(); + await this.updateCurrentJobAndSubJobState(); + } + + this.socketManager.broadcastToSubscribers(this.id, { + method: "notify_status_update", + params: status, + }); + } + } + + async updatePrinterState() { + try { + if (this.state.type == "printing" && this.state.progress == undefined) { + this.state.progress = 0; + } + // Update the printer's state + const updatedPrinter = await printerModel.findByIdAndUpdate( + this.id, + { state: this.state, online: this.online }, + { new: true }, + ); + + if (!updatedPrinter) { + logger.error( + `Printer with ID ${this.id} not found when updating status`, + ); + return; + } + + // Notify clients of the update + this.socketManager.broadcast("notify_printer_update", { + id: this.id, + state: this.state, + }); + + logger.info( + `Updated printer state to ${this.state.type} and progress to ${this.state?.progress} for ${this.id}`, + ); + } catch (error) { + logger.error( + `Failed to update job status in database (${this.name}):`, + error, + ); + } + } + + async removePrinterSubJob(subJobId) { + try { + const updatedPrinter = await printerModel.findByIdAndUpdate( + this.id, + { + $pull: { subJobs: subJobId }, + }, + { new: true }, + ); + + if (!updatedPrinter) { + logger.error( + `Printer with ID ${this.id} not found when removing failed subjob`, + ); + return; + } + + logger.info(`Removed subjob ${subJobId} from printer ${this.id}`); + } catch (error) { + logger.error( + `Failed to remove subjob ${subJobId} from printer ${this.id}:`, + error, + ); + } + } + + async updatePrinterSubJobs() { + try { + logger.debug("Updating Printer Subjobs..."); + // Get all subjobs for this printer + const subJobs = await printSubJobModel.find({ printer: this.id }); + + for (const subJob of subJobs) { + if ( + !this.queuedJobIds.includes(subJob.subJobId) && + !( + subJob.state.type == "failed" || + subJob.state.type == "complete" || + subJob.state.type == "draft" || + subJob.state.type == "cancelled" + ) + ) { + if (subJob.subJobId == this.subJobCancelId) { + const updatedSubJob = await printSubJobModel.findByIdAndUpdate( + subJob.id, + { state: { type: "cancelled" } }, + { new: true }, ); - try { - const parsed = JSON.parse(message); - this.jsonRpc.process_message(parsed); - if (parsed.method != undefined) { - //console.log(parsed.params); - this.broadcastToClients(message); - } - } catch (e) { - logger.error( - `Error processing message for ${this.printerName}:`, - e, - ); + await this.removePrinterSubJob(subJob.id); + // Notify clients of the update + this.socketManager.broadcast("notify_subjob_update", { + id: subJob.id, + state: { type: "cancelled" }, + }); + if (updatedSubJob) { + logger.debug(`Cancelled subjob ${updatedSubJob.id}`); } - }); - - this.socket.on("close", () => { - logger.info(`Disconnected from Moonraker (${this.printerName})`); - this.online = false; - this.state = { type: "offline" }; - this.connectionId = null; - // Attempt to reconnect after delay - setTimeout(() => this.connect(), 5000); - }); - - this.socket.on("error", (error) => { - logger.error( - `Moonraker connection error (${this.printerName}):`, - error, + return; + } else { + logger.debug( + `Subjob ${subJob.id} is not in the queued job list. It must be printing or paused. Setting job and subjob for ${this.id}`, ); - }); - } - - identifyConnection() { - const args = { - client_name: "farmcontrol-server", - version: "0.1.0", - type: "web", - url: "https://github.com/printer-bridge", - }; - - if (this.config.apiKey) { - args.api_key = this.config.apiKey; - } - - this.jsonRpc - .call_method_with_kwargs("server.connection.identify", args) - .then((result) => { - this.connectionId = result.connection_id; - logger.info( - `Connection identified with ID: ${this.connectionId} (${this.printerName})`, - ); - this.getServerInfo(); - }) - .catch((error) => { - logger.error( - `Error identifying connection (${this.printerName}):`, - error, - ); - }); - } - - getServerInfo() { - logger.trace("Getting server info"); - this.jsonRpc - .call_method("server.info") - .then((result) => { - this.online = true; - this.state = { type: result.klippy_state }; - logger.info( - "Server:", - `Moonraker ${result.moonraker_version} (${this.printerName})`, - ); - if (result.klippy_state === "ready") { - this.getKlippyInfo(); - this.subscribeToStateUpdates(); - } else { - logger.info( - `Waiting for Klippy to be ready. Current state: ${result.klippy_state} (${this.printerName})`, - ); - } - }) - .catch((error) => { - logger.error( - `Error getting server info (${this.printerName}):`, - error, - ); - }); - } - - getKlippyInfo() { - this.jsonRpc - .call_method("printer.info") - .then((result) => { - logger.info( - `Klippy info for ${this.printerName}: ${result.hostname}, ${result.software_version}`, - ); - if (result.state === "error") { - logger.error( - `Klippy error for ${this.printerName}: ${result.state_message}`, - ); - } - }) - .catch((error) => { - logger.error( - `Error getting Klippy info (${this.printerName}):`, - error, - ); - }); - } - - subscribeToAllUpdates() { - const subscriptions = { - objects: { - gcode_move: [ - "gcode_position", - "speed", - "speed_factor", - "extrude_factor", - ], - toolhead: ["position", "status"], - virtual_sdcard: null, - heater_bed: null, - extruder: null, - fan: null, - print_stats: null, - motion_report: null, - }, - }; - - this.jsonRpc - .call_method_with_kwargs("printer.objects.subscribe", subscriptions) - .then((result) => { - logger.info( - `Subscribed to all printer updates (${this.printerName})`, - ); - }) - .catch((error) => { - logger.error( - `Error subscribing to all printer updates (${this.printerName}):`, - error, - ); - }); - } - - subscribeToStateUpdates() { - const subscriptions = { - objects: { - toolhead: ["status"], - print_stats: null, - }, - }; - - this.jsonRpc - .call_method_with_kwargs("printer.objects.subscribe", subscriptions) - .then((result) => { - logger.info( - `Subscribed to printer state updates (${this.printerName})`, - ); - }) - .catch((error) => { - logger.error( - `Error subscribing to printer state updates (${this.printerName}):`, - error, - ); - }); - } - - sendMessage(message) { - if (!this.online) { - logger.error( - `Cannot send message: Not connected to Moonraker (${this.printerName})`, + this.currentSubJobId = subJob.id; + this.currentJobId = subJob.printJob; + await this.updateCurrentJobAndSubJobState(); + const updatedPrinter = await printerModel.findByIdAndUpdate( + this.id, + { + currentSubJob: this.currentSubJobId, + currentJob: this.currentJobId, + }, + { new: true }, ); - return false; - } - this.jsonRpc - .call_method_with_kwargs(message.method, message.params) - .then((result) => { - logger.info(`Message sent to (${this.printerName})`); - if (result.status != undefined) { - const resultStatusMessage = { - method: "notify_status_update", - params: [result.status], - }; - this.broadcastToClients( - JSON.stringify(resultStatusMessage), - ); - } - }) - .catch((error) => { - logger.error( - `Error sending message to (${this.printerName}):`, - error, - ); - }); - - return true; - } - - addClient(client) { - this.clients.add(client); - logger.info( - `Client subscribed to ${this.printerName}. Total subscribers: ${this.clients.size}`, - ); - } - - removeClient(client) { - this.clients.delete(client); - logger.info( - `Client unsubscribed from ${this.printerName}. Total subscribers: ${this.clients.size}`, - ); - } - - broadcastToClients(message) { - for (const client of this.clients) { - if (client.conn._readyState === "open") { - var jsonMessage = JSON.parse(message); - jsonMessage.params = { - printerId: this.id, - ...jsonMessage.params, - }; - client.emit(jsonMessage.method, jsonMessage.params); + if (!updatedPrinter) { + logger.error( + `Printer with ID ${this.id} not found when setting job and subjob`, + ); + return; } + + logger.info(`Set job and subjob for ${this.id}`); + // If the status is failed or completed, remove the subjob from the printer's subJobs array + } } - } - - getStatus() { - return { - id: this.printerId, - name: this.printerName, - connected: this.online, - connectionId: this.connectionId, - host: this.config.host, - port: this.config.port, - }; - } - - // Event handlers - handleGcodeResponse(response) { - logger.info(`GCode response (${this.printerName}): ${response}`); - } - - handleStatusUpdate(status) { - // Process printer status updates - if (status.print_stats && status.print_stats.state) { - logger.info( - `Print state for ${this.printerName}: ${status.print_stats.state}`, - ); - this.state.type = status.print_stats.state; + if ( + subJob.state.type === "failed" || + subJob.state.type === "complete" + ) { + await this.removePrinterSubJob(subJob.id); } + } + } catch (error) { + logger.error( + `Failed to update job status in database (${this.name}):`, + error, + ); } + } - handleKlippyDisconnected() { - this.online = false; - logger.info(`Klippy disconnected (${this.printerName})`); - } + async updateCurrentJobAndSubJobState() { + try { + if (!this.currentJobId || !this.currentSubJobId) { + return; + } - handleKlippyReady() { - logger.info(`Klippy ready (${this.printerName})`); - this.getKlippyInfo(); - this.subscribeToStateUpdates(); - } + this.currentJobState = { type: "unknown", progress: 0 }; + this.currentSubJobState = { type: "unknown", progress: 0 }; - handleFileListChanged(fileInfo) { - logger.info( - `File list changed for ${this.printerName}:`, - fileInfo.action, + // Get the current job + const currentJob = await printJobModel + .findById(this.currentJobId) + .populate("subJobs"); + + const jobLength = currentJob.subJobs.length; + + let externalProgressSum = 0; + + var printing = 0; + var paused = 0; + var complete = 0; + var failed = 0; + + for (const subJob of currentJob.subJobs.filter( + (subJob) => subJob.id != this.currentSubJobId, + )) { + if (subJob.state.type === "printing") { + externalProgressSum = externalProgressSum + subJob.state.progress; + printing = printing + 1; + } + if (subJob.state.type === "paused") { + paused = paused + 1; + } + if (subJob.state.type === "complete") { + complete = complete + 1; + } + if (subJob.state.type === "failed") { + failed = failed + 1; + } + } + + if (this.state.type === "printing") { + this.currentSubJobState.type = "printing"; + this.currentSubJobState.progress = this.state.progress; + printing = printing + 1; + } else if (this.state.type === "paused") { + this.currentSubJobState.type = "paused"; + paused = paused + 1; + } else if (this.state.type === "complete") { + this.currentSubJobState.type = "complete"; + complete = complete + 1; + } else { + this.currentSubJobState.type = "failed"; + failed = failed + 1; + } + + if (paused > 0) { + this.currentJobState.type = "paused"; + } else if (printing > 0) { + this.currentJobState.type = "printing"; + } else if (failed > 0) { + this.currentJobState.type = "failed"; + } else if (complete == jobLength) { + this.currentJobState.type = "complete"; + } else { + this.currentJobState.type = "queued"; + } + + if (this.state.type === "printing") { + this.currentJobState.progress = + (externalProgressSum + complete + (this.state.progress || 0)) / + jobLength; + } else { + this.currentJobState.progress = externalProgressSum / jobLength; + } + + currentJob.state = this.currentJobState; + + await currentJob.save(); + + this.socketManager.broadcast("notify_job_update", { + id: this.currentJobId, + state: this.currentJobState, + }); + + logger.info( + `Updated job status to ${this.currentJobState.type} (Progress: ${this.currentJobState.progress}) for ${this.currentSubJobId}`, + ); + + const updatedSubJob = await printSubJobModel.findByIdAndUpdate( + this.currentSubJobId, + { state: this.currentSubJobState }, + { new: true }, + ); + + if (!updatedSubJob) { + logger.error( + `Sub job with ID ${this.currentSubJobId} not found when updating status`, ); + return; + } + + // Notify clients of the update + this.socketManager.broadcast("notify_subjob_update", { + id: this.currentSubJobId, + state: this.currentSubJobState, + }); + + logger.info( + `Updated sub job status to ${this.currentSubJobState.type} (Progress: ${this.currentSubJobState.progress}) for ${this.currentSubJobId}`, + ); + } catch (error) { + logger.error(`Error updating current job state:`, error); + } + } + + async handleKlippyDisconnected() { + logger.info(`Klippy disconnected (${this.name})`); + this.state = { type: "offline" }; + this.isOnline = false; + this.isPrinting = false; + this.isError = false; + this.isReady = false; + await this.updatePrinterState(); + await this.updatePrinterSubJobs(); + } + + async handleKlippyReady() { + logger.info(`Klippy ready (${this.name})`); + await this.initialize(); + } + + handleFileListChanged(fileInfo) { + logger.debug(`File list changed for ${this.name}:`, fileInfo); + } + + handleMetadataUpdate(metadata) { + logger.info(`Metadata updated for ${this.name}:`, metadata.filename); + } + + handlePowerChanged(powerStatus) { + logger.info(`Power status changed for ${this.name}:`, powerStatus); + } + + async uploadGcodeFile(fileBlob, fileName) { + logger.info(`Uploading G-code file ${fileName} to ${this.name}`); + if (!this.online) { + logger.error( + `Cannot upload file: Not connected to Moonraker (${this.name})`, + ); + return false; } - handleMetadataUpdate(metadata) { - logger.info( - `Metadata updated for ${this.printerName}:`, - metadata.filename, - ); - } + try { + const { protocol, host, port } = this.config; + const httpUrl = `${protocol === "ws" ? "http" : "https"}://${host}:${port}/server/files/upload`; - handlePowerChanged(powerStatus) { - logger.info( - `Power status changed for ${this.printerName}:`, - powerStatus, - ); + // Convert Blob to Buffer + const arrayBuffer = await fileBlob.arrayBuffer(); + const buffer = Buffer.from(arrayBuffer); + + const formData = new FormData(); + formData.append("file", buffer, { + filename: fileName, + contentType: fileBlob.type || "text/plain", + }); + + const headers = { + ...formData.getHeaders(), + }; + + if (this.config.apiKey) { + headers["X-Api-Key"] = this.config.apiKey; + } + + const response = await axios.post(httpUrl, formData, { + headers, + onUploadProgress: (progressEvent) => { + const percentCompleted = Math.round( + (progressEvent.loaded * 100) / progressEvent.total, + ); + logger.debug( + `Uploading file to ${this.name}: ` + + fileName + + " " + + percentCompleted + + "%", + ); + this.socketManager.broadcast("notify_printer_update", { + printerId: this.id, + state: { type: "deploying", progress: percentCompleted }, + }); + }, + }); + + if (!response.result) { + this.socketManager.broadcast("notify_printer_update", { + printerId: this.id, + state: { type: "error" }, + }); + throw new Error("Failed to upload G-code file to printer"); + } + + logger.info(`Successfully uploaded file ${fileName} to ${this.name}`); + return true; + } catch (error) { + logger.error(`Error uploading file to ${this.name}:`, error); + return false; } + } + + async deploySubJobs(jobId) { + try { + const printSubJobs = await printSubJobModel + .find({ printJob: jobId }) + .sort({ number: 1 }); + + // Deploy sub jobs + for (const subJob of printSubJobs) { + logger.info( + "Deploying sub job:", + subJob.subJobId, + "to printer:", + this.id, + "with files:", + `${jobId.id}.gcode`, + ); + + const result = await this.sendPrinterCommand({ + method: "server.job_queue.post_job", + params: { + filenames: [`${jobId}.gcode`], + reset: false, + }, + }); + + if (!result) { + throw new Error("Failed to deploy sub job to printer"); + } + + // Update the PrintSubJob model + const updatedSubJob = await printSubJobModel.findByIdAndUpdate( + subJob.id, + { + subJobId: result.queued_jobs[result.queued_jobs.length - 1].job_id, + state: { type: "queued" }, + updatedAt: new Date(), + }, + { new: true }, + ); + + if (!updatedSubJob) { + throw new Error(`Failed to update sub job ${subJob.id}`); + } + + // Update the printer's subJobs array + const printer = await printerModel.findById(this.id); + if (printer) { + printer.subJobs.push(updatedSubJob._id); + await printer.save(); + } + + this.socketManager.broadcast("notify_subjob_update", { + id: subJob.id, + subJobId: result.queued_jobs[result.queued_jobs.length - 1].job_id, + state: { type: "queued" }, + }); + + logger.info("Sub job deployed to printer:", this.id); + } + } catch (error) { + logger.error(`Error deploying sub jobs:`, error); + } + } + + async cancelSubJob(subJobId) { + try { + this.subJobCancelId = subJobId; + const result = await this.sendPrinterCommand({ + method: "server.job_queue.delete_job", + params: { + job_ids: [subJobId], + }, + }); + + if (!result) { + throw new Error("Failed to cancel sub job"); + } + + await this.getQueuedJobsInfo(); + this.subJobCancelId = null; + + logger.info("Sub job canceled:", subJobId); + } catch (error) { + logger.error(`Error canceling sub job ${subJobId}:`, error); + } + } } diff --git a/src/printer/printermanager.js b/src/printer/printermanager.js index 40164cb..3d36cdf 100644 --- a/src/printer/printermanager.js +++ b/src/printer/printermanager.js @@ -1,9 +1,10 @@ // printer-manager.js - Manages multiple printer connections through MongoDB import { PrinterClient } from "./printerclient.js"; -import { saveConfig, loadConfig } from "../config.js"; import { printerModel } from "../database/printer.schema.js"; // Import your printer model +import { printSubJobModel } from "../database/printsubjob.schema.js"; // Import your subjob model +import { loadConfig } from "../config.js"; import log4js from "log4js"; - +import { printJobModel } from "../database/printjob.schema.js"; // Load configuration const config = loadConfig(); @@ -11,80 +12,175 @@ const logger = log4js.getLogger("Printer Manager"); logger.level = config.server.logLevel; export class PrinterManager { - constructor(config) { - this.config = config; - this.printerClientConnections = new Map(); - this.statusCheckInterval = null; - this.initializePrinterConnections(); + constructor(config) { + this.config = config; + this.printerClientConnections = new Map(); + this.statusCheckInterval = null; + this.initializePrinterConnections(); + } + + async initializePrinterConnections() { + try { + // Get all printers from the database + const printers = await printerModel.find({}); + + for (const printer of printers) { + await this.connectToPrinter(printer); + } + + logger.info(`Initialized connections to ${printers.length} printers`); + } catch (error) { + logger.error(`Error initializing printer connections: ${error.message}`); + } + } + + async connectToPrinter(printer) { + // Create and store the connection + const printerClientConnection = new PrinterClient( + printer, + this, + this.socketManager, + ); + this.printerClientConnections.set(printer.id, printerClientConnection); + + // Connect to the printer + await printerClientConnection.connect(); + + logger.info(`Connected to printer: ${printer.printerName} (${printer.id})`); + return true; + } + + getPrinterClient(printerId) { + return this.printerClientConnections.get(printerId); + } + + getAllPrinterClients() { + return this.printerClientConnections.values(); + } + + // Process command for a specific printer + async processPrinterCommand(command) { + const printerId = command.params.printerId; + const printerClientConnection = + this.printerClientConnections.get(printerId); + if (!printerClientConnection) { + return { + success: false, + error: `Printer with ID ${printerId} not found`, + }; } - async initializePrinterConnections() { - try { - // Get all printers from the database - const printers = await printerModel.find({}); + return await printerClientConnection.sendPrinterCommand(command); + } - for (const printer of printers) { - await this.connectToPrinter(printer); - } - - logger.info( - `Initialized connections to ${printers.length} printers`, - ); - - // Set up periodic status checking - this.startStatusChecking(); - } catch (error) { - logger.error( - `Error initializing printer connections: ${error.message}`, - ); - } + async updateSubscription(printerId, socketId, mergedSubscription) { + const printerClientConnection = + this.printerClientConnections.get(printerId); + if (!printerClientConnection) { + return { + success: false, + error: `Printer with ID ${printerId} not found`, + }; } + printerClientConnection.subscriptions.set(socketId, mergedSubscription); + return await printerClientConnection.updateSubscriptions(); + } - async connectToPrinter(printer) { - // Create and store the connection - const printerClientConnection = new PrinterClient(printer); - this.moonrakerConnections.set(printer.id, printerClientConnection); + // Close all printer connections + closeAllConnections() { + for (const printerClientConnection of this.printerClientConnections.values()) { + if (printerClientConnection.socket) { + printerClientConnection.socket.close(); + } + } + } - // Connect to the printer - printerClientConnection.connect(); + setSocketManager(socketManager) { + this.socketManager = socketManager; + } - logger.info( - `Connected to printer: ${printer.printerName} (${printer.id})`, + async downloadGCODE(gcodeFileId) { + logger.info(`Downloading G-code file ${gcodeFileId}`); + try { + // Download the G-code file with authentication + const url = `http://localhost:8080/gcodefiles/${gcodeFileId}/content/`; + const response = await fetch(url, { + headers: { + Authorization: `Bearer ${this.socketManager.socketClientConnections.values().next().value.socket.handshake.auth.token}`, + }, + }); + if (!response.ok) { + throw new Error( + `Failed to download G-code file: ${response.statusText}`, ); - return true; + } + const gcodeContent = await response.blob(); + + logger.info(`G-code file ${gcodeFileId} downloaded!`); + + return gcodeContent; + } catch (error) { + logger.error("Error in deployGcodeToAllPrinters:", error); + return { + success: false, + error: error.message, + }; + } + } + + async deployPrintJob(printJobId) { + logger.info(`Deploying print job ${printJobId}`); + const printJob = await printJobModel + .findById(printJobId) + .populate("printers") + .populate("subJobs"); + if (!printJob) { + throw new Error("Print job not found"); } - getPrinterClient(printerId) { - return this.printerClientConnections.get(printerId); + if (!printJob.gcodeFile) { + throw new Error("No G-code file associated with this print job"); } - getAllPrinterClients() { - return this.printerClientConnections.values(); + const gcodeFileId = printJob.gcodeFile.toString(); + const fileName = `${printJob.id}.gcode`; + + const gcodeFile = await this.downloadGCODE(gcodeFileId); + + for (const printer of printJob.printers) { + const printerClient = this.getPrinterClient(printer.id); + if (!printerClient) { + throw new Error(`Printer with ID ${printer.id} not found`); + return false; + } + await printerClient.uploadGcodeFile(gcodeFile, fileName); + await printerClient.deploySubJobs(printJob.id); } - // Process command for a specific printer - processCommand(printerId, command) { - const printerClientConnection = - this.printerClientConnections.get(printerId); - if (!printerClientConnection) { - return { - success: false, - error: `Printer with ID ${printerId} not found`, - }; - } - const success = connection.sendCommand(message); - return { - success, - error: success ? null : "Printer not connected", - }; - } + printJob.state = { type: "queued" }; + printJob.updatedAt = new Date(); + await printJob.save(); - // Close all printer connections - closeAllConnections() { - for (const printerClientConnection of this.printerClientConnections.values()) { - if (printerClientConnection.socket) { - printerClientConnection.socket.close(); - } - } + this.socketManager.broadcast("notify_job_update", { + id: printJob.id, + state: { type: "queued" }, + }); + + return true; + } + + async cancelSubJob(subJobId) { + logger.info(`Canceling sub job ${subJobId}`); + const subJob = await printSubJobModel.findById(subJobId); + if (!subJob) { + throw new Error("Sub job not found"); } + const printerClient = this.getPrinterClient(subJob.printer.toString()); + if (!printerClient) { + throw new Error(`Printer with ID ${printer.id} not found`); + return false; + } + await printerClient.cancelSubJob(subJob.subJobId); + return true; + } } diff --git a/src/socket/socketclient.js b/src/socket/socketclient.js index c949fdf..2486b59 100644 --- a/src/socket/socketclient.js +++ b/src/socket/socketclient.js @@ -1,63 +1,460 @@ -import { WebSocket } from "ws"; -import { JsonRPC } from "./jsonrpc.js"; import log4js from "log4js"; +import { printJobModel } from "../database/printjob.schema.js"; +import { printSubJobModel } from "../database/printsubjob.schema.js"; +import { WebSocketScanner } from "../network/websocketScanner.js"; // Load configuration -import { loadConfig } from "./config.js"; +import { loadConfig } from "../config.js"; +import { printerModel } from "../database/printer.schema.js"; const config = loadConfig(); -const logger = log4js.getLogger("Moonraker"); +const logger = log4js.getLogger("Socket Client"); logger.level = config.server.logLevel; export class SocketClient { - constructor(socket, printerManager) { - this.socket = socket; - this.user = socket?.user; - this.printerManager = printerManager; + constructor(socket, socketManager, printerManager) { + this.socket = socket; + this.user = socket?.user; + this.socketManager = socketManager; + this.printerManager = printerManager; + this.activeSubscriptions = new Map(); + this.scanner = new WebSocketScanner({ maxThreads: 50 }); - this.socket.on("bridge.list_printers", (data) => { - - }); + this.socket.on("bridge.list_printers", (data) => {}); - this.socket.on("bridge.add_printer", (data, callback) => { - - }); + this.socket.on("bridge.add_printer", (data, callback) => {}); - this.socket.on("bridge.remove_printer", (data, callback) => { - - }); + this.socket.on("bridge.remove_printer", (data, callback) => {}); - this.socket.on("bridge.update_printer", (data, callback) => { - - }); + this.socket.on("bridge.update_printer", (data, callback) => {}); - // Handle printer commands - this.socket.on("printer_command", (data, callback) => { - try { - if (data && data.params.printerId) { - const printerId = data.params.printerId; - // Remove the printer_id before forwarding - const cleanCommand = { ...data }; - delete cleanCommand.params.printerId; + this.socket.on("bridge.scan_network.start", (data, callback) => { + if (this.scanner.scanning == false) { + try { + this.scanner = new WebSocketScanner({ maxThreads: 50 }); + // Listen for found services + this.scanner.on("serviceFound", (data) => { + logger.info( + `Found websocket service at ${data.hostname} (${data.ip})`, + ); + this.socket.emit("notify_scan_network_found", data); + }); - const result = printerManager.processCommand( - printerId, - cleanCommand, - ); + // Listen for scan progress + this.scanner.on("scanProgress", ({ currentIP, progress }) => { + logger.info( + `Scanning ${currentIP} (${progress.toFixed(2)}% complete)`, + ); + this.socket.emit("notify_scan_network_progress", { + currentIP: currentIP, + progress: progress, + }); + }); - } else { - logger.error("Missing Printer ID"); - } - } catch (e) { - logger.error("Error processing client command:", e); - + // Start scanning on port + logger.info( + "Scanning network for websocket services on port:", + data?.port || 7125, + "using protocol:", + data?.protocol || "ws", + ); + this.scanner + .scanNetwork(data?.port || 7125, data?.protocol || "ws") + .then((foundServices) => { + logger.info("Scan complete. Found services:", foundServices); + this.socket.emit("notify_scan_network_complete", foundServices); + }) + .catch((error) => { + logger.error("Scan error:", error); + this.socket.emit("notify_scan_network_complete", false); + }); + } catch (error) { + logger.error("Scan error:", error); + this.socket.emit("notify_scan_network_complete", false); + } + } + }); + + this.socket.on("bridge.scan_network.stop", (callback) => { + if (this.scanner.scanning == true) { + logger.info("Stopping network scan"); + this.scanner.removeAllListeners("serviceFound"); + this.scanner.removeAllListeners("scanProgress"); + this.scanner.removeAllListeners("scanComplete"); + this.scanner.stopScan(); + callback(true); + } else { + logger.info("Scan not in progress"); + callback(false); + } + }); + + // Handle printer object subscriptions + this.socket.on("printer.objects.subscribe", async (data, callback) => { + logger.debug("Received printer.objects.subscribe event:", data); + try { + if (data && data.printerId) { + const printerId = data.printerId; + + // Get existing subscription or create new one + const existingSubscription = + this.activeSubscriptions.get(printerId) || {}; + + // Merge the new subscription data with existing data + const mergedSubscription = { + ...existingSubscription.objects, + ...data.objects, + }; + + this.activeSubscriptions.set(printerId, mergedSubscription); + + logger.trace("Merged subscription:", mergedSubscription); + const result = await this.printerManager.updateSubscription( + printerId, + socket.id, + mergedSubscription, + ); + + if (callback) { + callback(result); + } + } else { + logger.error("Missing Printer ID in subscription request"); + if (callback) { + callback({ error: "Missing Printer ID" }); + } + } + } catch (e) { + logger.error("Error processing subscription request:", e); + if (callback) { + callback({ error: e.message }); + } + } + }); + + this.socket.on("printer.objects.unsubscribe", async (data, callback) => { + logger.debug("Received printer.objects.unsubscribe event:", data); + try { + if (data && data.printerId) { + const printerId = data.printerId; + const existingSubscription = this.activeSubscriptions.get(printerId); + + if (existingSubscription) { + // Create a new objects object without the unsubscribed objects + const remainingObjects = { ...existingSubscription.objects }; + if (data.objects) { + for (const key of Object.keys(data.objects)) { + delete remainingObjects[key]; + } } + + // If there are no remaining objects, remove the entire subscription + if (Object.keys(remainingObjects).length === 0) { + this.activeSubscriptions.delete(printerId); + + // Send subscribe command with updated subscription + const result = await this.printerManager.updateSubscription( + printerId, + socket.id, + {}, + ); + if (callback) { + callback(result); + } + } else { + // Update the subscription with remaining objects + const updatedSubscription = { + printerId: printerId, + objects: remainingObjects, + }; + this.activeSubscriptions.set(printerId, updatedSubscription); + + // Send subscribe command with updated subscription + const result = await this.printerManager.updateSubscription( + printerId, + socket.id, + updatedSubscription, + ); + + if (callback) { + callback(result); + } + } + } else { + logger.warn( + "No existing subscription found for printer:", + printerId, + ); + if (callback) { + callback({ success: true, message: "No subscription found" }); + } + } + } else { + logger.error("Missing Printer ID in unsubscribe request"); + if (callback) { + callback({ error: "Missing Printer ID" }); + } + } + } catch (e) { + logger.error("Error processing unsubscribe request:", e); + if (callback) { + callback({ error: e.message }); + } + } + }); + + this.socket.on("printer.gcode.script", async (data, callback) => { + logger.debug("Received printer.gcode.script event:", data); + try { + const result = await this.printerManager.processPrinterCommand({ + method: "printer.gcode.script", + params: data, }); - this.socket.on("disconnect", () => { - // Unsubscribe from all printers when client disconnects - printerManager.unsubscribeClientFromAll(socket); - logger.info("External client disconnected:", socket.user?.username); + if (callback) { + callback(result); + } + } catch (e) { + logger.error("Error processing gcode script request:", e); + if (callback) { + callback({ error: e.message }); + } + } + }); + + this.socket.on("printer.objects.query", async (data, callback) => { + logger.debug("Received printer.objects.query event:", data); + try { + const result = await this.printerManager.processPrinterCommand({ + method: "printer.objects.query", + params: data, }); - } + + if (callback) { + callback(result); + } + } catch (e) { + logger.error("Error processing printer objects query request:", e); + if (callback) { + callback({ error: e.message }); + } + } + }); + + this.socket.on("printer.emergency_stop", async (data, callback) => { + logger.debug("Received printer.gcode.script event:", data); + try { + const result = await this.printerManager.processPrinterCommand({ + method: "printer.emergency_stop", + params: data, + }); + + if (callback) { + callback(result); + } + } catch (e) { + logger.error("Error processing gcode script request:", e); + if (callback) { + callback({ error: e.message }); + } + } + }); + + this.socket.on("printer.firmware_restart", async (data, callback) => { + logger.debug("Received printer.firmware_restart event:", data); + try { + const result = await this.printerManager.processPrinterCommand({ + method: "printer.firmware_restart", + params: data, + }); + + if (callback) { + callback(result); + } + } catch (e) { + logger.error("Error processing firmware restart request:", e); + if (callback) { + callback({ error: e.message }); + } + } + }); + + this.socket.on("printer.restart", async (data, callback) => { + logger.debug("Received printer.restart event:", data); + try { + const result = await this.printerManager.processPrinterCommand({ + method: "printer.restart", + params: data, + }); + + if (callback) { + callback(result); + } + } catch (e) { + logger.error("Error processing printer restart request:", e); + if (callback) { + callback({ error: e.message }); + } + } + }); + + this.socket.on("server.job_queue.status", async (data, callback) => { + logger.debug("Received server.job_queue.status event:", data); + try { + const result = await this.printerManager.processPrinterCommand({ + method: "server.job_queue.status", + params: data, + }); + + if (callback) { + callback(result); + } + } catch (e) { + logger.error("Error processing job queue status request:", e); + if (callback) { + callback({ error: e.message }); + } + } + }); + + this.socket.on("server.job_queue.deploy", async (data, callback) => { + logger.debug("Received server.job_queue.deploy event:", data); + try { + if (!data || !data.printJobId) { + throw new Error("Missing required print job ID"); + } + // Deploy the print job to all printers + const result = await this.printerManager.deployPrintJob( + data.printJobId, + ); + + if (callback) { + callback(result); + } + } catch (e) { + logger.error("Error processing job queue deploy request:", e); + if (callback) { + callback({ error: e.message }); + } + } + }); + + this.socket.on("printer.print.resume", async (data, callback) => { + logger.debug("Received printer.print.resume event:", data); + try { + const result = await this.printerManager.processPrinterCommand({ + method: "printer.print.resume", + params: data, + }); + + if (callback) { + callback(result); + } + } catch (e) { + logger.error("Error processing print resume request:", e); + if (callback) { + callback({ error: e.message }); + } + } + }); + + this.socket.on("server.job_queue.cancel", async (data, callback) => { + logger.debug("Received server.job_queue.cancel event:", data); + try { + if (!data || !data.subJobId) { + throw new Error("Missing required sub job ID"); + } + const result = await this.printerManager.cancelSubJob(data.subJobId); + + if (callback) { + callback(result); + } + } catch (e) { + logger.error("Error processing job queue delete job request:", e); + if (callback) { + callback({ error: e.message }); + } + } + }); + + this.socket.on("printer.print.cancel", async (data, callback) => { + logger.debug("Received printer.print.cancel event:", data); + try { + const result = await this.printerManager.processPrinterCommand({ + method: "printer.print.cancel", + params: data, + }); + + if (callback) { + callback(result); + } + } catch (e) { + logger.error("Error processing print cancel request:", e); + if (callback) { + callback({ error: e.message }); + } + } + }); + + this.socket.on("printer.print.pause", async (data, callback) => { + logger.debug("Received printer.print.pause event:", data); + try { + const result = await this.printerManager.processPrinterCommand({ + method: "printer.print.pause", + params: data, + }); + + if (callback) { + callback(result); + } + } catch (e) { + logger.error("Error processing print pause request:", e); + if (callback) { + callback({ error: e.message }); + } + } + }); + + this.socket.on("server.job_queue.pause", async (data, callback) => { + logger.debug("Received server.job_queue.pause event:", data); + try { + const result = await this.printerManager.processPrinterCommand({ + method: "server.job_queue.pause", + params: data, + }); + + if (callback) { + callback(result); + } + } catch (e) { + logger.error("Error processing job queue pause request:", e); + if (callback) { + callback({ error: e.message }); + } + } + }); + + this.socket.on("server.job_queue.start", async (data, callback) => { + logger.debug("Received server.job_queue.start event:", data); + try { + const result = await this.printerManager.processPrinterCommand({ + method: "server.job_queue.start", + params: data, + }); + + if (callback) { + callback(result); + } + } catch (e) { + logger.error("Error processing job queue start request:", e); + if (callback) { + callback({ error: e.message }); + } + } + }); + + this.socket.on("disconnect", () => { + logger.info("External client disconnected:", socket.user?.username); + }); + } } diff --git a/src/socket/socketmanager.js b/src/socket/socketmanager.js index 07bad9d..0b67a71 100644 --- a/src/socket/socketmanager.js +++ b/src/socket/socketmanager.js @@ -1,19 +1,22 @@ // server.js - HTTP and Socket.IO server setup import { Server } from "socket.io"; import http from "http"; -import { createAuthMiddleware } from "./auth.js"; +import { createAuthMiddleware } from "../auth/auth.js"; import log4js from "log4js"; // Load configuration -import { loadConfig } from "./config.js"; +import { loadConfig } from "../config.js"; +import { SocketClient } from "./socketclient.js"; const config = loadConfig(); -const logger = log4js.getLogger("Server"); +const logger = log4js.getLogger("Socket Manager"); logger.level = config.server.logLevel; -export class SocketServer { +export class SocketManager { constructor(config, printerManager, auth) { this.socketClientConnections = new Map(); + this.printerManager = printerManager; + // Create HTTP server const server = http.createServer((req, res) => { res.writeHead(200, { "Content-Type": "text/plain" }); @@ -34,7 +37,7 @@ export class SocketServer { // Handle client connections io.on("connection", (socket) => { logger.info("External client connected:", socket.user?.username); - this.socketClientConnections.set(socket.id, socket); + this.addClient(socket); }); // Start the server @@ -44,164 +47,224 @@ export class SocketServer { ); }); - return { server, io }; + this.io = io; + this.server = server; } -} -export function setupServer(config, printerManager, auth) {} -// Command handlers -function handleSubscribe( - socket, - data, - clientSubscriptions, - printerManager, - callback, -) { - logger.info("Handling subscribe command..."); - const printerId = data.printerId; - if (printerManager.subscribeClient(printerId, socket)) { - clientSubscriptions.add(printerId); - logger.info(`Client subscribed to printer ${printerId}`); + addClient(socket) { + const client = new SocketClient(socket, this, this.printerManager); + this.socketClientConnections.set(socket.id, client); + logger.info("External client connected:", socket.user?.username); + // Handle disconnection + socket.on("disconnect", () => { + logger.info("External client disconnected:", socket.user?.username); + this.removeClient(socket.id); + }); + + } + + removeClient(socketClientId) { + const socketClient = this.socketClientConnections.get(socketClientId); + if (socketClient) { + this.socketClientConnections.delete(socketClientId); + logger.info("External client disconnected:", socketClient.socket.user?.username); + } + } + + getSocketClient(clientId) { + return this.socketClientConnections.get(clientId); + } + + getAllSocketClients() { + return Array.from(this.socketClientConnections.values()); + } + + broadcast(event, data, excludeClientId = null) { + for (const [clientId, socketClient] of this.socketClientConnections) { + if (excludeClientId !== clientId) { + socketClient.socket.emit(event, data); + } + } + } + + broadcastToSubscribers(printerId, command) { + const paramsObject = command.params; + for (const [clientId, socketClient] of this.socketClientConnections) { + + // Check if this client has subscribed to the printer + if (socketClient.activeSubscriptions.has(printerId)) { + + const subscription = socketClient.activeSubscriptions.get(printerId); + + // Filter the message based on the subscription filters + const filteredMessage = this.filterMessage(paramsObject, subscription); + if (filteredMessage) { + logger.trace(`Broadcasting message to client ${clientId}: ${JSON.stringify(filteredMessage)}`); + socketClient.socket.emit(command.method, { + printerId, + ...filteredMessage + }); + } + } + } + } + + + filterMessage(message, subscription) { + if (!message || !subscription) return null; + + const filtered = {}; + // Handle both subscription formats + const subscriptionObjects = subscription.objects || subscription; + + for (const [objectName, fields] of Object.entries(subscriptionObjects)) { + if (message[objectName]) { + if (fields === null) { + // If fields is null, include all fields + filtered[objectName] = message[objectName]; + } else if (Array.isArray(fields)) { + // If fields is an array, only include specified fields + filtered[objectName] = {}; + for (const field of fields) { + if (message[objectName][field] !== undefined) { + filtered[objectName][field] = message[objectName][field]; + } + } + // Only include the object if it has any filtered fields + if (Object.keys(filtered[objectName]).length === 0) { + delete filtered[objectName]; + } + } + } + } + return Object.keys(filtered).length > 0 ? filtered : null; + } + + handleListPrinters(socket, data, callback) { + logger.info("handleListPrinters called with data:", data); if (callback) { callback({ - success: true, - printer_id: printerId, + printers: this.printerManager.getAllPrinters(), }); } - } else { - logger.warn(`Client failed to subscribe to printer ${printerId}`); + } + + handleListPrintersSubscribe(socket, data, callback) { + logger.info("handleListPrintersSubscribe called with data:", data); if (callback) { callback({ - success: false, - error: { - code: -32001, - message: `Printer ${printerId} not found`, - }, + printers: this.printerManager.getAllPrinters(), }); } } -} -function handleUnsubscribe( - socket, - data, - clientSubscriptions, - printerManager, - callback, -) { - const printerId = data.printer_id; - if (printerManager.unsubscribeClient(printerId, socket)) { - clientSubscriptions.delete(printerId); - if (callback) - callback({ - success: true, - printer_id: printerId, - }); - } else { - if (callback) - callback({ - success: false, - error: { - code: -32001, - message: `Printer ${printerId} not found`, - }, - }); + handleAddPrinter(socket, data, callback) { + logger.info("handleAddPrinter called with data:", data); + if (this.printerManager.addPrinter(data.printer_config)) { + if (callback) { + callback({ + success: true, + printer: this.printerManager + .getPrinter(data.printer_config.id) + .getStatus(), + }); + } + this.broadcastPrinterList(socket); + } else { + if (callback) { + callback({ + success: false, + error: { + code: -32003, + message: `Failed to add printer with ID ${data.printer_config.id}`, + }, + }); + } + } } -} -function handleListPrinters(socket, data, printerManager, callback) { - if (callback) - callback({ - printers: printerManager.getAllPrinters(), - }); -} - -function handleListPrintersSubscribe(socket, data, printerManager, callback) { - if (callback) - callback({ - printers: printerManager.getAllPrinters(), - }); -} - -function handleAddPrinter(socket, data, printerManager, callback) { - if (printerManager.addPrinter(data.printer_config)) { - if (callback) - callback({ - success: true, - printer: printerManager - .getPrinter(data.printer_config.id) - .getStatus(), - }); - - // Notify all clients about the new printer - broadcastPrinterList(printerManager, socket); - } else { - if (callback) - callback({ - success: false, - error: { - code: -32003, - message: `Failed to add printer with ID ${data.printer_config.id}`, - }, - }); + handleRemovePrinter(socket, data, callback) { + logger.info("handleRemovePrinter called with data:", data); + if (this.printerManager.removePrinter(data.printer_id)) { + if (callback) { + callback({ + success: true, + printer_id: data.printer_id, + }); + } + this.broadcastPrinterList(socket); + } else { + if (callback) { + callback({ + success: false, + error: { + code: -32001, + message: `Printer ${data.printer_id} not found`, + }, + }); + } + } } -} -function handleRemovePrinter(socket, data, printerManager, callback) { - if (printerManager.removePrinter(data.printer_id)) { - if (callback) - callback({ - success: true, - printer_id: data.printer_id, - }); - - // Notify all clients about the updated printer list - broadcastPrinterList(printerManager, socket); - } else { - if (callback) - callback({ - success: false, - error: { - code: -32001, - message: `Printer ${data.printer_id} not found`, - }, - }); + handleUpdatePrinter(socket, data, callback) { + logger.info("handleUpdatePrinter called with data:", data); + if (this.printerManager.updatePrinter(data.printer_config)) { + if (callback) { + callback({ + success: true, + printer: this.printerManager + .getPrinter(data.printer_config.id) + .getStatus(), + }); + } + this.broadcastPrinterList(socket); + } else { + if (callback) { + callback({ + success: false, + error: { + code: -32001, + message: `Failed to update printer ${data.printer_config.id}`, + }, + }); + } + } } -} -function handleUpdatePrinter(socket, data, printerManager, callback) { - if (printerManager.updatePrinter(data.printer_config)) { - if (callback) - callback({ - success: true, - printer: printerManager - .getPrinter(data.printer_config.id) - .getStatus(), + + resubscribeAllClients(printerId) { + logger.info(`Resubscribing to all active subscriptions for printer ${printerId}`); + + // Create a combined subscription object for the specified printer + const combinedSubscription = { + printerId: printerId, + objects: {} + }; + + // Combine all subscription objects for the specified printer + for (const [clientId, socketClient] of this.socketClientConnections) { + if (socketClient.activeSubscriptions.has(printerId)) { + const subscription = socketClient.activeSubscriptions.get(printerId); + + // Merge the objects from this subscription into the combined subscription + Object.assign(combinedSubscription.objects, subscription.objects); + logger.debug(`Adding subscription objects from client ${clientId} for printer ${printerId}`); + } + } + + // Only send the subscription command if there are any objects to subscribe to + if (Object.keys(combinedSubscription.objects).length > 0) { + logger.debug(`Sending combined subscription and query for printer ${printerId}:`, combinedSubscription); + this.printerManager.processPrinterCommand({ + method: "printer.objects.subscribe", + params: combinedSubscription }); - - // Notify all clients about the updated printer - broadcastPrinterList(printerManager, socket); - } else { - if (callback) - callback({ - success: false, - error: { - code: -32001, - message: `Failed to update printer ${data.printer_config.id}`, - }, + this.printerManager.processPrinterCommand({ + method: "printer.objects.query", + params: combinedSubscription }); - } -} - -function broadcastPrinterList(printerManager, excludeSocket = null) { - const printerList = printerManager.getAllPrinters(); - const message = { - printers: printerList, - }; - - for (const client of printerManager.getAllClients()) { - if (client !== excludeSocket && client.connected) { - client.emit("notify_printers_updated", message); + } else { + logger.debug(`No active subscriptions found for printer ${printerId}`); } } }