Updated to update the DB and handle print commands.

This commit is contained in:
Tom Butcher 2025-05-09 22:20:37 +01:00
parent 92906e940d
commit 81196897fa
13 changed files with 2019 additions and 624 deletions

View File

@ -1,6 +1,6 @@
{
"server": {
"port": 8080,
"port": 8081,
"logLevel": "debug"
},
"auth": {
@ -9,7 +9,7 @@
"url": "https://auth.tombutcher.work",
"realm": "master",
"clientId": "farmcontrol-client",
"clientSecret": ""
"clientSecret": "GPyh59xctRX83yfKWb83ShK6VEwHIvLF"
},
"requiredRoles": []
},

View File

@ -7,7 +7,7 @@ import { loadConfig } from "../config.js";
const config = loadConfig();
const logger = log4js.getLogger("MongoDB");
const logger = log4js.getLogger("Auth");
logger.level = config.server.logLevel;
export class KeycloakAuth {

View File

@ -7,7 +7,7 @@ const moonrakerSchema = new Schema(
host: { type: String, required: true },
port: { type: Number, required: true },
protocol: { type: String, required: true },
apiKey: { type: String, default: null },
apiKey: { type: String, default: null, required: false },
},
{ _id: false },
);
@ -15,20 +15,24 @@ const moonrakerSchema = new Schema(
// Define the main printer schema
const printerSchema = new Schema(
{
printerId: { type: String, required: true, unique: true },
printerName: { type: String, required: true },
online: { type: Boolean, required: true, default: false },
state: {
type: { type: String, required: true, default: "Offline" },
percent: { type: Number, required: false },
progress: { required: false, type: Number, default: 0 },
},
connectedAt: { type: Date, default: null },
loadedFillament: {
loadedFilament: {
type: Schema.Types.ObjectId,
ref: "Fillament",
ref: "Filament",
default: null,
},
moonraker: { type: moonrakerSchema, required: true },
tags: [{ type: String }],
firmware: { type: String },
currentJob: { type: Schema.Types.ObjectId, ref: "PrintJob" },
currentSubJob: { type: Schema.Types.ObjectId, ref: "PrintSubJob" },
subJobs: [{ type: Schema.Types.ObjectId, ref: "PrintSubJob" }],
},
{ timestamps: true },
);

View File

@ -0,0 +1,35 @@
import mongoose from "mongoose";
const { Schema } = mongoose;
const printJobSchema = new mongoose.Schema({
state: {
type: { required: true, type: String },
progress: { required: false, type: Number, default: 0 },
},
printers: [{ type: Schema.Types.ObjectId, ref: "Printer", required: false }],
createdAt: { required: true, type: Date },
updatedAt: { required: true, type: Date },
startedAt: { required: true, type: Date },
gcodeFile: {
type: Schema.Types.ObjectId,
ref: "GCodeFile",
required: false,
},
quantity: {
type: Number,
required: true,
default: 1,
min: 1,
},
subJobs: [
{type: Schema.Types.ObjectId, ref: "PrintSubJob", required: false}
],
});
printJobSchema.virtual("id").get(function () {
return this._id.toHexString();
});
printJobSchema.set("toJSON", { virtuals: true });
export const printJobModel = mongoose.model("PrintJob", printJobSchema);

View File

@ -0,0 +1,48 @@
import mongoose from "mongoose";
const { Schema } = mongoose;
const printSubJobSchema = new mongoose.Schema({
printer: {
type: Schema.Types.ObjectId,
ref: "Printer",
required: true
},
printJob: {
type: Schema.Types.ObjectId,
ref: "PrintJob",
required: true
},
subJobId: {
type: String,
required: true
},
gcodeFile: {
type: Schema.Types.ObjectId,
ref: "GCodeFile",
required: true,
},
state: {
type: { required: true, type: String },
progress: { required: false, type: Number, default: 0 },
},
number: {
type: Number,
required: true
},
createdAt: {
type: Date,
default: Date.now
},
updatedAt: {
type: Date,
default: Date.now
}
});
printSubJobSchema.virtual("id").get(function () {
return this._id.toHexString();
});
printSubJobSchema.set("toJSON", { virtuals: true });
export const printSubJobModel = mongoose.model("PrintSubJob", printSubJobSchema);

View File

@ -2,16 +2,19 @@ import { loadConfig } from "./config.js";
import { dbConnect } from "./database/mongo.js";
import { PrinterManager } from "./printer/printermanager.js";
import { SocketManager } from "./socket/socketmanager.js";
import { KeycloakAuth } from "./auth/auth.js";
import express from "express";
import log4js from "log4js";
// Load configuration
const config = loadConfig();
const logger = log4js.getLogger("FarmControl Server");
logger.level = config.server.logLevel;
// Create Express app
const app = express();
// Connect to database
dbConnect();
@ -20,7 +23,13 @@ const keycloakAuth = new KeycloakAuth(config);
// Create printer manager
const printerManager = new PrinterManager(config);
const socketManager = new SocketManager(config, printerManager);
const socketManager = new SocketManager(config, printerManager, keycloakAuth);
printerManager.setSocketManager(socketManager);
// Start Express server
app.listen(config.server.port, () => {
logger.info(`Server listening on port ${config.server.port}`);
});
process.on("SIGINT", () => {
logger.info("Shutting down...");

View File

@ -0,0 +1,185 @@
import { EventEmitter } from 'events';
import os from 'os';
import { Worker } from 'worker_threads';
import path from 'path';
import { fileURLToPath } from 'url';
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
export class WebSocketScanner extends EventEmitter {
constructor(options = {}) {
super();
this.scanning = false;
this.workers = [];
// Default to number of CPU cores, but allow override
this.maxThreads = options.maxThreads || os.cpus().length;
this.totalIPs = 0;
this.scannedIPs = 0;
}
/**
* Scans the local network for websocket services on a specified port
* @param {number} port - Port number to scan
* @returns {Promise<Array>} Array of IP addresses where websocket service was found
*/
async scanNetwork(port, protocol) {
// Clean up any existing workers before starting a new scan
this.cleanupWorkers();
console.log("Cleaned up workers");
if (this.scanning) {
throw new Error('Scan already in progress');
}
this.scanning = true;
this.scannedIPs = 0;
const foundServices = [];
// Get local network range
const { startIP, endIP } = this.getLocalNetworkRange();
const start = this.ipToNumber(startIP);
const end = this.ipToNumber(endIP);
// Calculate IP ranges for each worker
this.totalIPs = end - start + 1;
const ipsPerWorker = Math.ceil(this.totalIPs / this.maxThreads);
const workerPromises = [];
for (let i = 0; i < this.maxThreads; i++) {
const workerStart = start + (i * ipsPerWorker);
const workerEnd = Math.min(workerStart + ipsPerWorker - 1, end);
if (workerStart > end) break;
const workerStartIP = this.numberToIP(workerStart);
const workerEndIP = this.numberToIP(workerEnd);
const worker = new Worker(path.join(__dirname, 'websocketScannerWorker.js'));
this.workers.push(worker);
console.log("Created worker", i);
const workerPromise = new Promise((resolve) => {
worker.on('message', (message) => {
switch (message.type) {
case 'serviceFound':
foundServices.push({ ip: message.ip, hostname: message.hostname });
this.emit('serviceFound', { ip: message.ip, hostname: message.hostname });
break;
case 'scanProgress':
this.scannedIPs += message.increment;
const totalProgress = (this.scannedIPs / this.totalIPs) * 100;
this.emit('scanProgress', {
currentIP: message.currentIP,
progress: totalProgress
});
break;
case 'scanComplete':
resolve(message.results);
break;
}
});
});
worker.postMessage({
type: 'scan',
startIP: workerStartIP,
endIP: workerEndIP,
port,
protocol
});
workerPromises.push(workerPromise);
}
await Promise.all(workerPromises);
this.cleanupWorkers();
this.scanning = false;
return foundServices;
}
cleanupWorkers() {
this.workers.forEach(worker => worker.terminate());
this.workers = [];
}
/**
* Gets the local network IP range
* @returns {Object} Object containing startIP and endIP
*/
getLocalNetworkRange() {
const interfaces = os.networkInterfaces();
let localIP = null;
let subnetMask = null;
// Find the first non-internal IPv4 address
for (const name of Object.keys(interfaces)) {
for (const iface of interfaces[name]) {
if (iface.family === 'IPv4' && !iface.internal) {
localIP = iface.address;
subnetMask = iface.netmask;
break;
}
}
if (localIP) break;
}
if (!localIP) {
throw new Error('Could not determine local network IP address');
}
// Convert IP and subnet mask to numbers
const ipNum = this.ipToNumber(localIP);
const maskNum = this.ipToNumber(subnetMask);
// Calculate network address
const networkNum = ipNum & maskNum;
// Calculate broadcast address
const broadcastNum = networkNum | (~maskNum >>> 0);
// Start IP is network address + 1
const startIP = this.numberToIP(networkNum + 1);
// End IP is broadcast address - 1
const endIP = this.numberToIP(broadcastNum - 1);
return { startIP, endIP };
}
/**
* Converts an IP address to a number
* @param {string} ip - IP address to convert
* @returns {number} Numeric representation of IP
*/
ipToNumber(ip) {
return ip.split('.')
.reduce((acc, octet) => (acc << 8) + parseInt(octet), 0) >>> 0;
}
/**
* Converts a number to an IP address
* @param {number} num - Number to convert
* @returns {string} IP address
*/
numberToIP(num) {
return [
(num >>> 24) & 255,
(num >>> 16) & 255,
(num >>> 8) & 255,
num & 255
].join('.');
}
/**
* Stops the current scan
*/
stopScan() {
this.scanning = false;
this.cleanupWorkers();
}
}
// To stop scanning at any time:
// scanner.stopScan();

View File

@ -0,0 +1,110 @@
import { parentPort, workerData } from 'worker_threads';
import WebSocket from 'ws';
import { loadConfig } from "../config.js";
import log4js from "log4js";
import dns from 'dns';
// Load configuration
const config = loadConfig();
class WebSocketScannerWorker {
constructor() {
this.randomId = Math.floor(Math.random() * 1000).toString().padStart(3, '0');
this.logger = log4js.getLogger(`WS Scanner #${this.randomId}`);
this.logger.level = config.server.logLevel;
this.timeout = 2000; // 2 second timeout for each connection attempt
}
async scanRange(startIP, endIP, port, protocol) {
const start = this.ipToNumber(startIP);
const end = this.ipToNumber(endIP);
const foundServices = [];
this.logger.info(`Scanning ${startIP} - ${endIP} on port: ${port} using: ${protocol}`);
for (let ip = start; ip <= end; ip++) {
const currentIP = this.numberToIP(ip);
const url = `${protocol}://${currentIP}:${port}/websocket`;
try {
this.logger.debug(`Checking ${currentIP} for websocket service on port ${port}`);
const isOpen = await this.checkWebSocket(url);
if (isOpen) {
const hostname = await this.resolveHostname(currentIP);
foundServices.push({ ip: currentIP, hostname });
this.logger.info(`WebSocket connection successful for ${currentIP}${hostname ? ` (${hostname})` : ''}`);
parentPort.postMessage({ type: 'serviceFound', ip: currentIP, hostname });
} else {
this.logger.debug(`WebSocket connection failed for ${currentIP}`);
}
} catch (error) {
// Connection failed, continue scanning
}
parentPort.postMessage({
type: 'scanProgress',
currentIP,
increment: 1
});
}
return foundServices;
}
async resolveHostname(ip) {
try {
const hostnames = await dns.promises.reverse(ip);
return hostnames[0] || null;
} catch (error) {
// Only log errors that aren't ENOTFOUND (which is expected for many IPs)
if (error.code !== 'ENOTFOUND') {
this.logger.warn(`Unexpected error resolving hostname for ${ip}: ${error.message}`);
}
return null;
}
}
checkWebSocket(url) {
return new Promise((resolve) => {
const ws = new WebSocket(url);
let timeout = setTimeout(() => {
ws.terminate();
resolve(false);
}, this.timeout);
ws.on('open', () => {
clearTimeout(timeout);
ws.close();
resolve(true);
});
ws.on('error', () => {
clearTimeout(timeout);
resolve(false);
});
});
}
ipToNumber(ip) {
return ip.split('.')
.reduce((acc, octet) => (acc << 8) + parseInt(octet), 0) >>> 0;
}
numberToIP(num) {
return [
(num >>> 24) & 255,
(num >>> 16) & 255,
(num >>> 8) & 255,
num & 255
].join('.');
}
}
// Handle messages from the main thread
parentPort.on('message', async (data) => {
if (data.type === 'scan') {
const scanner = new WebSocketScannerWorker();
const results = await scanner.scanRange(data.startIP, data.endIP, data.port, data.protocol);
parentPort.postMessage({ type: 'scanComplete', results });
}
});

View File

@ -1,50 +1,66 @@
// jsonrpc.js - Implementation of JSON-RPC 2.0 protocol for Moonraker communication
import { loadConfig } from "../config.js";
import log4js from "log4js";
// Load configuration
const config = loadConfig();
const logger = log4js.getLogger("JSON RPC");
logger.level = config.server.logLevel;
export class JsonRPC {
constructor() {
this.id_counter = 0;
this.idCounter = 0;
this.methods = {};
this.pending_requests = {};
this.pendingRequests = {};
}
// Generate a unique ID for RPC requests
generate_id() {
return this.id_counter++;
generateId() {
return this.idCounter++;
}
// Register a method to handle incoming notifications/responses
register_method(method_name, callback) {
this.methods[method_name] = callback;
registerMethod(methodName, callback) {
this.methods[methodName] = callback;
}
// Process incoming messages
process_message(message) {
processMessage(message) {
if (message.method && this.methods[message.method]) {
// Handle method call or notification
this.methods[message.method](message.params);
logger.trace(`JSON-RPC notification: ${message.method}`);
} else if (message.id !== undefined) {
// Handle response to a previous request
const rpc_promise = this.pending_requests[message.id];
if (rpc_promise) {
const rpcPromise = this.pendingRequests[message.id];
if (rpcPromise) {
if (message.error) {
rpc_promise.reject(message.error);
logger.error(`Error in JSON-RPC response: ${message.error}`);
rpcPromise.reject(message.error);
} else {
rpc_promise.resolve(message.result);
logger.debug(`JSON-RPC response: OK`);
logger.trace("Result:", message.result);
rpcPromise.resolve(message.result);
}
delete this.pending_requests[message.id];
delete this.pendingRequests[message.id];
}
}
// If it's a notification without a registered method, ignore it
}
// Call a method without parameters
call_method(method) {
return this.call_method_with_kwargs(method, {});
callMethod(method) {
return this.callMethodWithKwargs(method, {});
}
// Call a method with parameters
call_method_with_kwargs(method, params) {
const id = this.generate_id();
callMethodWithKwargs(method, params) {
logger.debug(`Calling method: ${method}`);
logger.trace("Params:", params);
const id = this.generateId();
const request = {
jsonrpc: "2.0",
method: method,
@ -53,9 +69,7 @@ export class JsonRPC {
};
return new Promise((resolve, reject) => {
this.pending_requests[id] = { resolve, reject };
console.log(`Sending JSON-RPC request: ${JSON.stringify(request)}`);
this.pendingRequests[id] = { resolve, reject };
// The actual sending of the message is done by the WebSocket connection
// This just prepares the message and returns a promise
if (this.socket) {
@ -63,20 +77,20 @@ export class JsonRPC {
} else {
// If socket is not directly attached to this instance, the caller
// is responsible for sending the serialized request
this.last_request = JSON.stringify(request);
this.lastRequest = JSON.stringify(request);
}
});
}
// For external socket handling
get_last_request() {
const req = this.last_request;
this.last_request = null;
getLastRequest() {
const req = this.lastRequest;
this.lastRequest = null;
return req;
}
// Associate a WebSocket with this RPC instance for direct communication
set_socket(socket) {
setSocket(socket) {
this.socket = socket;
}
}

View File

@ -1,53 +1,97 @@
// moonraker-connection.js - Handles connection to a single Moonraker instance
import { JsonRPC } from "./jsonrpc.js";
import { WebSocket } from "ws";
import { loadConfig } from "../config.js";
import { printerModel } from "../database/printer.schema.js"; // Import your printer model
import { printJobModel } from "../database/printjob.schema.js";
import { printSubJobModel } from "../database/printsubjob.schema.js";
import log4js from "log4js";
import axios from "axios";
import FormData from "form-data";
// Load configuration
const config = loadConfig();
const logger = log4js.getLogger("Printer Client");
logger.level = config.server.logLevel;
export class PrinterClient {
constructor(printerConfig) {
this.id = printerConfig.id;
this.printerName = printerConfig.printerName;
this.state = printerConfig.state;
this.online = printerConfig.online;
this.config = printerConfig.moonraker;
constructor(printer, printerManager, socketManager) {
this.id = printer.id;
this.name = printer.printerName;
this.printerManager = printerManager;
this.socketManager = socketManager;
this.state = printer.state;
this.klippyState = null;
this.config = printer.moonraker;
this.version = printer.version;
this.jsonRpc = new JsonRPC();
this.socket = null;
this.connectionId = null;
this.clients = new Set();
this.currentJobId = printer.currentJob;
this.currentJobState = { type: "unknown", progress: 0 };
this.currentSubJobId = printer.currentSubJob;
this.currentSubJobState = { type: "unknown", progress: 0 };
this.registerEventHandlers();
this.baseSubscription = {
print_stats: null,
display_status: null,
};
this.subscriptions = new Map();
this.queuedJobIds = [];
this.isOnline = printer.online;
this.subJobIsCancelling = false;
this.subJobCancelId = null;
}
registerEventHandlers() {
// Register event handlers for Moonraker notifications
this.jsonRpc.register_method(
this.jsonRpc.registerMethod(
"notify_gcode_response",
this.handleGcodeResponse.bind(this),
//this.handleGcodeResponse.bind(this),
);
this.jsonRpc.register_method(
this.jsonRpc.registerMethod(
"notify_status_update",
this.handleStatusUpdate.bind(this),
);
this.jsonRpc.register_method(
this.jsonRpc.registerMethod(
"notify_klippy_disconnected",
this.handleKlippyDisconnected.bind(this),
);
this.jsonRpc.register_method(
this.jsonRpc.registerMethod(
"notify_klippy_ready",
this.handleKlippyReady.bind(this),
);
this.jsonRpc.register_method(
this.jsonRpc.registerMethod(
"notify_filelist_changed",
this.handleFileListChanged.bind(this),
);
this.jsonRpc.register_method(
this.jsonRpc.registerMethod(
"notify_metadata_update",
this.handleMetadataUpdate.bind(this),
);
this.jsonRpc.register_method(
this.jsonRpc.registerMethod(
"notify_power_changed",
this.handlePowerChanged.bind(this),
);
}
connect() {
console.log(this.config);
async getPrinterConnectionConfig() {
try {
const printer = await printerModel.findOne({ _id: this.id });
this.config = printer.moonraker;
logger.info(`Reloaded connection config! (${this.name})`);
logger.debug(this.config);
} catch (error) {
logger.error(
`Failed to get printer connection config! (${this.name}):`,
error,
);
}
}
async connect() {
await this.getPrinterConnectionConfig();
const { protocol, host, port } = this.config;
const wsUrl = `${protocol}://${host}:${port}/websocket`;
@ -55,49 +99,30 @@ export class PrinterClient {
this.socket = new WebSocket(wsUrl);
this.jsonRpc.set_socket(this.socket);
this.jsonRpc.setSocket(this.socket);
this.socket.on("open", () => {
logger.info(`Connected to Moonraker (${this.printerName})`);
logger.info(`Connected to Moonraker (${this.name})`);
this.online = true;
this.identifyConnection();
});
this.socket.on("message", (data) => {
const message = data.toString();
logger.trace(
`Received message from Moonraker (${this.printerName}): ${message}`,
);
try {
const parsed = JSON.parse(message);
this.jsonRpc.process_message(parsed);
if (parsed.method != undefined) {
//console.log(parsed.params);
this.broadcastToClients(message);
}
} catch (e) {
logger.error(
`Error processing message for ${this.printerName}:`,
e,
);
}
this.jsonRpc.processMessage(JSON.parse(data));
});
this.socket.on("close", () => {
logger.info(`Disconnected from Moonraker (${this.printerName})`);
logger.info(`Disconnected from Moonraker (${this.name})`);
this.online = false;
this.state = { type: "offline" };
this.updatePrinterState();
this.connectionId = null;
// Attempt to reconnect after delay
setTimeout(() => this.connect(), 5000);
setTimeout(() => this.connect(), 10000);
});
this.socket.on("error", (error) => {
logger.error(
`Moonraker connection error (${this.printerName}):`,
error,
);
logger.error(`Moonraker connection error (${this.name}):`, error);
});
}
@ -113,243 +138,652 @@ export class PrinterClient {
args.api_key = this.config.apiKey;
}
logger.debug(`Identifying connection... (${this.name})`);
this.jsonRpc
.call_method_with_kwargs("server.connection.identify", args)
.then((result) => {
.callMethodWithKwargs("server.connection.identify", args)
.then(async (result) => {
this.connectionId = result.connection_id;
logger.info(
`Connection identified with ID: ${this.connectionId} (${this.printerName})`,
`Connection identified with ID: ${this.connectionId} (${this.name})`,
);
this.getServerInfo();
await this.initialize();
})
.catch((error) => {
logger.error(
`Error identifying connection (${this.printerName}):`,
error,
);
logger.error(`Error identifying connection (${this.name}):`, error);
});
}
getServerInfo() {
logger.trace("Getting server info");
this.jsonRpc
.call_method("server.info")
.then((result) => {
async initialize() {
logger.info("Running printer initialization...");
await this.getInfo();
await this.getPrinterState();
await this.getQueuedJobsInfo();
await this.updateSubscriptions();
}
async getInfo() {
logger.info("Getting printer info...");
try {
// Get server info
const serverResult = await this.jsonRpc.callMethod("server.info");
this.online = true;
this.state = { type: result.klippy_state };
this.klippyState = { type: serverResult.klippy_state };
logger.info(
"Server:",
`Moonraker ${result.moonraker_version} (${this.printerName})`,
`Moonraker ${serverResult.moonraker_version} (${this.name})`,
`State: ${this.klippyState.type}`,
);
if (result.klippy_state === "ready") {
this.getKlippyInfo();
this.subscribeToStateUpdates();
} else {
try {
const klippyResult = await this.jsonRpc.callMethod("printer.info");
logger.info(
`Waiting for Klippy to be ready. Current state: ${result.klippy_state} (${this.printerName})`,
`Klippy info for ${this.name}: ${klippyResult.hostname}, ${klippyResult.software_version}`,
);
}
})
.catch((error) => {
// Update firmware version in database
try {
await printerModel.findByIdAndUpdate(
this.id,
{ firmware: klippyResult.software_version },
{ new: true },
);
logger.info(
`Updated firmware version for ${this.name} to ${klippyResult.software_version}`,
);
} catch (error) {
logger.error(
`Error getting server info (${this.printerName}):`,
`Failed to update firmware version in database (${this.name}):`,
error,
);
});
}
getKlippyInfo() {
this.jsonRpc
.call_method("printer.info")
.then((result) => {
logger.info(
`Klippy info for ${this.printerName}: ${result.hostname}, ${result.software_version}`,
);
if (result.state === "error") {
if (klippyResult.state === "error") {
logger.error(
`Klippy error for ${this.printerName}: ${result.state_message}`,
`Klippy error for ${this.name}: ${klippyResult.state_message}`,
);
}
})
.catch((error) => {
logger.error(
`Error getting Klippy info (${this.printerName}):`,
error,
);
} catch (error) {
logger.error(`Error getting Klippy info (${this.name}):`, error);
}
} catch (error) {
logger.error(`Error getting server info (${this.name}):`, error);
}
}
async getQueuedJobsInfo() {
logger.info(`Getting queued jobs info for (${this.name})`);
const result = await this.sendPrinterCommand({
method: "server.job_queue.status",
});
this.queuedJobIds = result.queued_jobs.map((job) => job.job_id);
logger.debug(`Queued job IDs: ${this.queuedJobIds}`);
await this.updatePrinterSubJobs();
}
subscribeToAllUpdates() {
const subscriptions = {
objects: {
gcode_move: [
"gcode_position",
"speed",
"speed_factor",
"extrude_factor",
],
toolhead: ["position", "status"],
virtual_sdcard: null,
heater_bed: null,
extruder: null,
fan: null,
print_stats: null,
motion_report: null,
},
};
this.jsonRpc
.call_method_with_kwargs("printer.objects.subscribe", subscriptions)
.then((result) => {
logger.info(
`Subscribed to all printer updates (${this.printerName})`,
);
})
.catch((error) => {
logger.error(
`Error subscribing to all printer updates (${this.printerName}):`,
error,
);
});
}
subscribeToStateUpdates() {
const subscriptions = {
objects: {
toolhead: ["status"],
print_stats: null,
},
};
this.jsonRpc
.call_method_with_kwargs("printer.objects.subscribe", subscriptions)
.then((result) => {
logger.info(
`Subscribed to printer state updates (${this.printerName})`,
);
})
.catch((error) => {
logger.error(
`Error subscribing to printer state updates (${this.printerName}):`,
error,
);
});
}
sendMessage(message) {
async getPrinterState() {
logger.info(`Getting state of (${this.name})`);
if (!this.online) {
logger.error(
`Cannot send message: Not connected to Moonraker (${this.printerName})`,
`Cannot send command: Not connected to Moonraker (${this.name})`,
);
return false;
}
this.jsonRpc
.call_method_with_kwargs(message.method, message.params)
.then((result) => {
logger.info(`Message sent to (${this.printerName})`);
if (this.klippyState.type === "error") {
logger.error(`Klippy is reporting error for ${this.name}`);
this.state = this.klippyState;
this.updatePrinterState();
return;
}
if (this.klippyState.type === "shutdown") {
logger.error(`Klippy is reporting shutdown for ${this.name}`);
this.state = this.klippyState;
this.updatePrinterState();
return;
}
try {
const result = await this.jsonRpc.callMethodWithKwargs(
"printer.objects.query",
{ objects: this.baseSubscription },
);
logger.debug(`Command sent to (${this.name})`);
if (result.status != undefined) {
const resultStatusMessage = {
method: "notify_status_update",
params: [result.status],
};
this.broadcastToClients(
JSON.stringify(resultStatusMessage),
);
this.handleStatusUpdate([result.status]);
}
})
.catch((error) => {
return result;
} catch (error) {
logger.error(`Error sending command to (${this.name}):`, error);
return false;
}
}
async updateSubscriptions() {
logger.info(`Updating subscriptions for (${this.name})`);
// Start with base subscription content
const allSubscriptions = { ...this.baseSubscription };
// Add all subscription content from the Map
for (const [_, value] of this.subscriptions) {
Object.assign(allSubscriptions, value);
}
logger.debug("Combined subscriptions:", allSubscriptions);
if (!this.online) {
logger.error(
`Error sending message to (${this.printerName}):`,
error,
`Cannot send command: Not connected to Moonraker (${this.name})`,
);
return false;
}
try {
await this.jsonRpc.callMethodWithKwargs("printer.objects.subscribe", {
objects: allSubscriptions,
});
logger.debug(`Command sent to (${this.name})`);
return true;
} catch (error) {
logger.error(`Error sending command to (${this.name}):`, error);
return false;
}
}
addClient(client) {
this.clients.add(client);
logger.info(
`Client subscribed to ${this.printerName}. Total subscribers: ${this.clients.size}`,
async sendPrinterCommand(command) {
logger.info(`Sending ${command.method} command to (${this.name})`);
if (!this.online) {
logger.error(
`Cannot send command: Not connected to Moonraker (${this.name})`,
);
return false;
}
removeClient(client) {
this.clients.delete(client);
logger.info(
`Client unsubscribed from ${this.printerName}. Total subscribers: ${this.clients.size}`,
try {
const result = await this.jsonRpc.callMethodWithKwargs(
command.method,
command.params,
);
logger.debug(`Command sent to (${this.name})`);
if (result.status != undefined) {
if (command.method == "printer.objects.query") {
this.handleStatusUpdate([result.status]);
}
broadcastToClients(message) {
for (const client of this.clients) {
if (client.conn._readyState === "open") {
var jsonMessage = JSON.parse(message);
jsonMessage.params = {
printerId: this.id,
...jsonMessage.params,
};
client.emit(jsonMessage.method, jsonMessage.params);
}
return result;
} catch (error) {
logger.error(`Error sending command to (${this.name}):`, error);
return false;
}
}
getStatus() {
return {
id: this.printerId,
name: this.printerName,
connected: this.online,
connectionId: this.connectionId,
host: this.config.host,
port: this.config.port,
};
}
// Event handlers
handleGcodeResponse(response) {
logger.info(`GCode response (${this.printerName}): ${response}`);
}
handleStatusUpdate(status) {
async handleStatusUpdate(status) {
logger.trace("Status update:", status);
status = status[0];
// Process printer status updates
if (this.state.type != "deploying") {
if (status.print_stats && status.print_stats.state) {
logger.info(
`Print state for ${this.printerName}: ${status.print_stats.state}`,
`Print state for ${this.name}: ${status.print_stats.state}`,
);
const stateTypeChanged = status.print_stats.state != this.state.type;
// When status changes, update states
if (stateTypeChanged) {
this.state.type = status.print_stats.state;
await this.updatePrinterState();
await this.getQueuedJobsInfo();
}
}
handleKlippyDisconnected() {
this.online = false;
logger.info(`Klippy disconnected (${this.printerName})`);
if (status.display_status) {
console.log("Display status:", status.display_status);
this.state.progress = status.display_status.progress;
await this.updatePrinterState();
await this.updateCurrentJobAndSubJobState();
}
handleKlippyReady() {
logger.info(`Klippy ready (${this.printerName})`);
this.getKlippyInfo();
this.subscribeToStateUpdates();
this.socketManager.broadcastToSubscribers(this.id, {
method: "notify_status_update",
params: status,
});
}
}
async updatePrinterState() {
try {
if (this.state.type == "printing" && this.state.progress == undefined) {
this.state.progress = 0;
}
// Update the printer's state
const updatedPrinter = await printerModel.findByIdAndUpdate(
this.id,
{ state: this.state, online: this.online },
{ new: true },
);
if (!updatedPrinter) {
logger.error(
`Printer with ID ${this.id} not found when updating status`,
);
return;
}
// Notify clients of the update
this.socketManager.broadcast("notify_printer_update", {
id: this.id,
state: this.state,
});
logger.info(
`Updated printer state to ${this.state.type} and progress to ${this.state?.progress} for ${this.id}`,
);
} catch (error) {
logger.error(
`Failed to update job status in database (${this.name}):`,
error,
);
}
}
async removePrinterSubJob(subJobId) {
try {
const updatedPrinter = await printerModel.findByIdAndUpdate(
this.id,
{
$pull: { subJobs: subJobId },
},
{ new: true },
);
if (!updatedPrinter) {
logger.error(
`Printer with ID ${this.id} not found when removing failed subjob`,
);
return;
}
logger.info(`Removed subjob ${subJobId} from printer ${this.id}`);
} catch (error) {
logger.error(
`Failed to remove subjob ${subJobId} from printer ${this.id}:`,
error,
);
}
}
async updatePrinterSubJobs() {
try {
logger.debug("Updating Printer Subjobs...");
// Get all subjobs for this printer
const subJobs = await printSubJobModel.find({ printer: this.id });
for (const subJob of subJobs) {
if (
!this.queuedJobIds.includes(subJob.subJobId) &&
!(
subJob.state.type == "failed" ||
subJob.state.type == "complete" ||
subJob.state.type == "draft" ||
subJob.state.type == "cancelled"
)
) {
if (subJob.subJobId == this.subJobCancelId) {
const updatedSubJob = await printSubJobModel.findByIdAndUpdate(
subJob.id,
{ state: { type: "cancelled" } },
{ new: true },
);
await this.removePrinterSubJob(subJob.id);
// Notify clients of the update
this.socketManager.broadcast("notify_subjob_update", {
id: subJob.id,
state: { type: "cancelled" },
});
if (updatedSubJob) {
logger.debug(`Cancelled subjob ${updatedSubJob.id}`);
}
return;
} else {
logger.debug(
`Subjob ${subJob.id} is not in the queued job list. It must be printing or paused. Setting job and subjob for ${this.id}`,
);
this.currentSubJobId = subJob.id;
this.currentJobId = subJob.printJob;
await this.updateCurrentJobAndSubJobState();
const updatedPrinter = await printerModel.findByIdAndUpdate(
this.id,
{
currentSubJob: this.currentSubJobId,
currentJob: this.currentJobId,
},
{ new: true },
);
if (!updatedPrinter) {
logger.error(
`Printer with ID ${this.id} not found when setting job and subjob`,
);
return;
}
logger.info(`Set job and subjob for ${this.id}`);
// If the status is failed or completed, remove the subjob from the printer's subJobs array
}
}
if (
subJob.state.type === "failed" ||
subJob.state.type === "complete"
) {
await this.removePrinterSubJob(subJob.id);
}
}
} catch (error) {
logger.error(
`Failed to update job status in database (${this.name}):`,
error,
);
}
}
async updateCurrentJobAndSubJobState() {
try {
if (!this.currentJobId || !this.currentSubJobId) {
return;
}
this.currentJobState = { type: "unknown", progress: 0 };
this.currentSubJobState = { type: "unknown", progress: 0 };
// Get the current job
const currentJob = await printJobModel
.findById(this.currentJobId)
.populate("subJobs");
const jobLength = currentJob.subJobs.length;
let externalProgressSum = 0;
var printing = 0;
var paused = 0;
var complete = 0;
var failed = 0;
for (const subJob of currentJob.subJobs.filter(
(subJob) => subJob.id != this.currentSubJobId,
)) {
if (subJob.state.type === "printing") {
externalProgressSum = externalProgressSum + subJob.state.progress;
printing = printing + 1;
}
if (subJob.state.type === "paused") {
paused = paused + 1;
}
if (subJob.state.type === "complete") {
complete = complete + 1;
}
if (subJob.state.type === "failed") {
failed = failed + 1;
}
}
if (this.state.type === "printing") {
this.currentSubJobState.type = "printing";
this.currentSubJobState.progress = this.state.progress;
printing = printing + 1;
} else if (this.state.type === "paused") {
this.currentSubJobState.type = "paused";
paused = paused + 1;
} else if (this.state.type === "complete") {
this.currentSubJobState.type = "complete";
complete = complete + 1;
} else {
this.currentSubJobState.type = "failed";
failed = failed + 1;
}
if (paused > 0) {
this.currentJobState.type = "paused";
} else if (printing > 0) {
this.currentJobState.type = "printing";
} else if (failed > 0) {
this.currentJobState.type = "failed";
} else if (complete == jobLength) {
this.currentJobState.type = "complete";
} else {
this.currentJobState.type = "queued";
}
if (this.state.type === "printing") {
this.currentJobState.progress =
(externalProgressSum + complete + (this.state.progress || 0)) /
jobLength;
} else {
this.currentJobState.progress = externalProgressSum / jobLength;
}
currentJob.state = this.currentJobState;
await currentJob.save();
this.socketManager.broadcast("notify_job_update", {
id: this.currentJobId,
state: this.currentJobState,
});
logger.info(
`Updated job status to ${this.currentJobState.type} (Progress: ${this.currentJobState.progress}) for ${this.currentSubJobId}`,
);
const updatedSubJob = await printSubJobModel.findByIdAndUpdate(
this.currentSubJobId,
{ state: this.currentSubJobState },
{ new: true },
);
if (!updatedSubJob) {
logger.error(
`Sub job with ID ${this.currentSubJobId} not found when updating status`,
);
return;
}
// Notify clients of the update
this.socketManager.broadcast("notify_subjob_update", {
id: this.currentSubJobId,
state: this.currentSubJobState,
});
logger.info(
`Updated sub job status to ${this.currentSubJobState.type} (Progress: ${this.currentSubJobState.progress}) for ${this.currentSubJobId}`,
);
} catch (error) {
logger.error(`Error updating current job state:`, error);
}
}
async handleKlippyDisconnected() {
logger.info(`Klippy disconnected (${this.name})`);
this.state = { type: "offline" };
this.isOnline = false;
this.isPrinting = false;
this.isError = false;
this.isReady = false;
await this.updatePrinterState();
await this.updatePrinterSubJobs();
}
async handleKlippyReady() {
logger.info(`Klippy ready (${this.name})`);
await this.initialize();
}
handleFileListChanged(fileInfo) {
logger.info(
`File list changed for ${this.printerName}:`,
fileInfo.action,
);
logger.debug(`File list changed for ${this.name}:`, fileInfo);
}
handleMetadataUpdate(metadata) {
logger.info(
`Metadata updated for ${this.printerName}:`,
metadata.filename,
);
logger.info(`Metadata updated for ${this.name}:`, metadata.filename);
}
handlePowerChanged(powerStatus) {
logger.info(
`Power status changed for ${this.printerName}:`,
powerStatus,
logger.info(`Power status changed for ${this.name}:`, powerStatus);
}
async uploadGcodeFile(fileBlob, fileName) {
logger.info(`Uploading G-code file ${fileName} to ${this.name}`);
if (!this.online) {
logger.error(
`Cannot upload file: Not connected to Moonraker (${this.name})`,
);
return false;
}
try {
const { protocol, host, port } = this.config;
const httpUrl = `${protocol === "ws" ? "http" : "https"}://${host}:${port}/server/files/upload`;
// Convert Blob to Buffer
const arrayBuffer = await fileBlob.arrayBuffer();
const buffer = Buffer.from(arrayBuffer);
const formData = new FormData();
formData.append("file", buffer, {
filename: fileName,
contentType: fileBlob.type || "text/plain",
});
const headers = {
...formData.getHeaders(),
};
if (this.config.apiKey) {
headers["X-Api-Key"] = this.config.apiKey;
}
const response = await axios.post(httpUrl, formData, {
headers,
onUploadProgress: (progressEvent) => {
const percentCompleted = Math.round(
(progressEvent.loaded * 100) / progressEvent.total,
);
logger.debug(
`Uploading file to ${this.name}: ` +
fileName +
" " +
percentCompleted +
"%",
);
this.socketManager.broadcast("notify_printer_update", {
printerId: this.id,
state: { type: "deploying", progress: percentCompleted },
});
},
});
if (!response.result) {
this.socketManager.broadcast("notify_printer_update", {
printerId: this.id,
state: { type: "error" },
});
throw new Error("Failed to upload G-code file to printer");
}
logger.info(`Successfully uploaded file ${fileName} to ${this.name}`);
return true;
} catch (error) {
logger.error(`Error uploading file to ${this.name}:`, error);
return false;
}
}
async deploySubJobs(jobId) {
try {
const printSubJobs = await printSubJobModel
.find({ printJob: jobId })
.sort({ number: 1 });
// Deploy sub jobs
for (const subJob of printSubJobs) {
logger.info(
"Deploying sub job:",
subJob.subJobId,
"to printer:",
this.id,
"with files:",
`${jobId.id}.gcode`,
);
const result = await this.sendPrinterCommand({
method: "server.job_queue.post_job",
params: {
filenames: [`${jobId}.gcode`],
reset: false,
},
});
if (!result) {
throw new Error("Failed to deploy sub job to printer");
}
// Update the PrintSubJob model
const updatedSubJob = await printSubJobModel.findByIdAndUpdate(
subJob.id,
{
subJobId: result.queued_jobs[result.queued_jobs.length - 1].job_id,
state: { type: "queued" },
updatedAt: new Date(),
},
{ new: true },
);
if (!updatedSubJob) {
throw new Error(`Failed to update sub job ${subJob.id}`);
}
// Update the printer's subJobs array
const printer = await printerModel.findById(this.id);
if (printer) {
printer.subJobs.push(updatedSubJob._id);
await printer.save();
}
this.socketManager.broadcast("notify_subjob_update", {
id: subJob.id,
subJobId: result.queued_jobs[result.queued_jobs.length - 1].job_id,
state: { type: "queued" },
});
logger.info("Sub job deployed to printer:", this.id);
}
} catch (error) {
logger.error(`Error deploying sub jobs:`, error);
}
}
async cancelSubJob(subJobId) {
try {
this.subJobCancelId = subJobId;
const result = await this.sendPrinterCommand({
method: "server.job_queue.delete_job",
params: {
job_ids: [subJobId],
},
});
if (!result) {
throw new Error("Failed to cancel sub job");
}
await this.getQueuedJobsInfo();
this.subJobCancelId = null;
logger.info("Sub job canceled:", subJobId);
} catch (error) {
logger.error(`Error canceling sub job ${subJobId}:`, error);
}
}
}

View File

@ -1,9 +1,10 @@
// printer-manager.js - Manages multiple printer connections through MongoDB
import { PrinterClient } from "./printerclient.js";
import { saveConfig, loadConfig } from "../config.js";
import { printerModel } from "../database/printer.schema.js"; // Import your printer model
import { printSubJobModel } from "../database/printsubjob.schema.js"; // Import your subjob model
import { loadConfig } from "../config.js";
import log4js from "log4js";
import { printJobModel } from "../database/printjob.schema.js";
// Load configuration
const config = loadConfig();
@ -27,30 +28,25 @@ export class PrinterManager {
await this.connectToPrinter(printer);
}
logger.info(
`Initialized connections to ${printers.length} printers`,
);
// Set up periodic status checking
this.startStatusChecking();
logger.info(`Initialized connections to ${printers.length} printers`);
} catch (error) {
logger.error(
`Error initializing printer connections: ${error.message}`,
);
logger.error(`Error initializing printer connections: ${error.message}`);
}
}
async connectToPrinter(printer) {
// Create and store the connection
const printerClientConnection = new PrinterClient(printer);
this.moonrakerConnections.set(printer.id, printerClientConnection);
const printerClientConnection = new PrinterClient(
printer,
this,
this.socketManager,
);
this.printerClientConnections.set(printer.id, printerClientConnection);
// Connect to the printer
printerClientConnection.connect();
await printerClientConnection.connect();
logger.info(
`Connected to printer: ${printer.printerName} (${printer.id})`,
);
logger.info(`Connected to printer: ${printer.printerName} (${printer.id})`);
return true;
}
@ -63,7 +59,8 @@ export class PrinterManager {
}
// Process command for a specific printer
processCommand(printerId, command) {
async processPrinterCommand(command) {
const printerId = command.params.printerId;
const printerClientConnection =
this.printerClientConnections.get(printerId);
if (!printerClientConnection) {
@ -72,12 +69,22 @@ export class PrinterManager {
error: `Printer with ID ${printerId} not found`,
};
}
const success = connection.sendCommand(message);
return await printerClientConnection.sendPrinterCommand(command);
}
async updateSubscription(printerId, socketId, mergedSubscription) {
const printerClientConnection =
this.printerClientConnections.get(printerId);
if (!printerClientConnection) {
return {
success,
error: success ? null : "Printer not connected",
success: false,
error: `Printer with ID ${printerId} not found`,
};
}
printerClientConnection.subscriptions.set(socketId, mergedSubscription);
return await printerClientConnection.updateSubscriptions();
}
// Close all printer connections
closeAllConnections() {
@ -87,4 +94,93 @@ export class PrinterManager {
}
}
}
setSocketManager(socketManager) {
this.socketManager = socketManager;
}
async downloadGCODE(gcodeFileId) {
logger.info(`Downloading G-code file ${gcodeFileId}`);
try {
// Download the G-code file with authentication
const url = `http://localhost:8080/gcodefiles/${gcodeFileId}/content/`;
const response = await fetch(url, {
headers: {
Authorization: `Bearer ${this.socketManager.socketClientConnections.values().next().value.socket.handshake.auth.token}`,
},
});
if (!response.ok) {
throw new Error(
`Failed to download G-code file: ${response.statusText}`,
);
}
const gcodeContent = await response.blob();
logger.info(`G-code file ${gcodeFileId} downloaded!`);
return gcodeContent;
} catch (error) {
logger.error("Error in deployGcodeToAllPrinters:", error);
return {
success: false,
error: error.message,
};
}
}
async deployPrintJob(printJobId) {
logger.info(`Deploying print job ${printJobId}`);
const printJob = await printJobModel
.findById(printJobId)
.populate("printers")
.populate("subJobs");
if (!printJob) {
throw new Error("Print job not found");
}
if (!printJob.gcodeFile) {
throw new Error("No G-code file associated with this print job");
}
const gcodeFileId = printJob.gcodeFile.toString();
const fileName = `${printJob.id}.gcode`;
const gcodeFile = await this.downloadGCODE(gcodeFileId);
for (const printer of printJob.printers) {
const printerClient = this.getPrinterClient(printer.id);
if (!printerClient) {
throw new Error(`Printer with ID ${printer.id} not found`);
return false;
}
await printerClient.uploadGcodeFile(gcodeFile, fileName);
await printerClient.deploySubJobs(printJob.id);
}
printJob.state = { type: "queued" };
printJob.updatedAt = new Date();
await printJob.save();
this.socketManager.broadcast("notify_job_update", {
id: printJob.id,
state: { type: "queued" },
});
return true;
}
async cancelSubJob(subJobId) {
logger.info(`Canceling sub job ${subJobId}`);
const subJob = await printSubJobModel.findById(subJobId);
if (!subJob) {
throw new Error("Sub job not found");
}
const printerClient = this.getPrinterClient(subJob.printer.toString());
if (!printerClient) {
throw new Error(`Printer with ID ${printer.id} not found`);
return false;
}
await printerClient.cancelSubJob(subJob.subJobId);
return true;
}
}

View File

@ -1,62 +1,459 @@
import { WebSocket } from "ws";
import { JsonRPC } from "./jsonrpc.js";
import log4js from "log4js";
import { printJobModel } from "../database/printjob.schema.js";
import { printSubJobModel } from "../database/printsubjob.schema.js";
import { WebSocketScanner } from "../network/websocketScanner.js";
// Load configuration
import { loadConfig } from "./config.js";
import { loadConfig } from "../config.js";
import { printerModel } from "../database/printer.schema.js";
const config = loadConfig();
const logger = log4js.getLogger("Moonraker");
const logger = log4js.getLogger("Socket Client");
logger.level = config.server.logLevel;
export class SocketClient {
constructor(socket, printerManager) {
constructor(socket, socketManager, printerManager) {
this.socket = socket;
this.user = socket?.user;
this.socketManager = socketManager;
this.printerManager = printerManager;
this.activeSubscriptions = new Map();
this.scanner = new WebSocketScanner({ maxThreads: 50 });
this.socket.on("bridge.list_printers", (data) => {
this.socket.on("bridge.list_printers", (data) => {});
});
this.socket.on("bridge.add_printer", (data, callback) => {});
this.socket.on("bridge.add_printer", (data, callback) => {
this.socket.on("bridge.remove_printer", (data, callback) => {});
});
this.socket.on("bridge.update_printer", (data, callback) => {});
this.socket.on("bridge.remove_printer", (data, callback) => {
});
this.socket.on("bridge.update_printer", (data, callback) => {
});
// Handle printer commands
this.socket.on("printer_command", (data, callback) => {
this.socket.on("bridge.scan_network.start", (data, callback) => {
if (this.scanner.scanning == false) {
try {
if (data && data.params.printerId) {
const printerId = data.params.printerId;
// Remove the printer_id before forwarding
const cleanCommand = { ...data };
delete cleanCommand.params.printerId;
this.scanner = new WebSocketScanner({ maxThreads: 50 });
// Listen for found services
this.scanner.on("serviceFound", (data) => {
logger.info(
`Found websocket service at ${data.hostname} (${data.ip})`,
);
this.socket.emit("notify_scan_network_found", data);
});
const result = printerManager.processCommand(
// Listen for scan progress
this.scanner.on("scanProgress", ({ currentIP, progress }) => {
logger.info(
`Scanning ${currentIP} (${progress.toFixed(2)}% complete)`,
);
this.socket.emit("notify_scan_network_progress", {
currentIP: currentIP,
progress: progress,
});
});
// Start scanning on port
logger.info(
"Scanning network for websocket services on port:",
data?.port || 7125,
"using protocol:",
data?.protocol || "ws",
);
this.scanner
.scanNetwork(data?.port || 7125, data?.protocol || "ws")
.then((foundServices) => {
logger.info("Scan complete. Found services:", foundServices);
this.socket.emit("notify_scan_network_complete", foundServices);
})
.catch((error) => {
logger.error("Scan error:", error);
this.socket.emit("notify_scan_network_complete", false);
});
} catch (error) {
logger.error("Scan error:", error);
this.socket.emit("notify_scan_network_complete", false);
}
}
});
this.socket.on("bridge.scan_network.stop", (callback) => {
if (this.scanner.scanning == true) {
logger.info("Stopping network scan");
this.scanner.removeAllListeners("serviceFound");
this.scanner.removeAllListeners("scanProgress");
this.scanner.removeAllListeners("scanComplete");
this.scanner.stopScan();
callback(true);
} else {
logger.info("Scan not in progress");
callback(false);
}
});
// Handle printer object subscriptions
this.socket.on("printer.objects.subscribe", async (data, callback) => {
logger.debug("Received printer.objects.subscribe event:", data);
try {
if (data && data.printerId) {
const printerId = data.printerId;
// Get existing subscription or create new one
const existingSubscription =
this.activeSubscriptions.get(printerId) || {};
// Merge the new subscription data with existing data
const mergedSubscription = {
...existingSubscription.objects,
...data.objects,
};
this.activeSubscriptions.set(printerId, mergedSubscription);
logger.trace("Merged subscription:", mergedSubscription);
const result = await this.printerManager.updateSubscription(
printerId,
cleanCommand,
socket.id,
mergedSubscription,
);
if (callback) {
callback(result);
}
} else {
logger.error("Missing Printer ID");
logger.error("Missing Printer ID in subscription request");
if (callback) {
callback({ error: "Missing Printer ID" });
}
}
} catch (e) {
logger.error("Error processing client command:", e);
logger.error("Error processing subscription request:", e);
if (callback) {
callback({ error: e.message });
}
}
});
this.socket.on("printer.objects.unsubscribe", async (data, callback) => {
logger.debug("Received printer.objects.unsubscribe event:", data);
try {
if (data && data.printerId) {
const printerId = data.printerId;
const existingSubscription = this.activeSubscriptions.get(printerId);
if (existingSubscription) {
// Create a new objects object without the unsubscribed objects
const remainingObjects = { ...existingSubscription.objects };
if (data.objects) {
for (const key of Object.keys(data.objects)) {
delete remainingObjects[key];
}
}
// If there are no remaining objects, remove the entire subscription
if (Object.keys(remainingObjects).length === 0) {
this.activeSubscriptions.delete(printerId);
// Send subscribe command with updated subscription
const result = await this.printerManager.updateSubscription(
printerId,
socket.id,
{},
);
if (callback) {
callback(result);
}
} else {
// Update the subscription with remaining objects
const updatedSubscription = {
printerId: printerId,
objects: remainingObjects,
};
this.activeSubscriptions.set(printerId, updatedSubscription);
// Send subscribe command with updated subscription
const result = await this.printerManager.updateSubscription(
printerId,
socket.id,
updatedSubscription,
);
if (callback) {
callback(result);
}
}
} else {
logger.warn(
"No existing subscription found for printer:",
printerId,
);
if (callback) {
callback({ success: true, message: "No subscription found" });
}
}
} else {
logger.error("Missing Printer ID in unsubscribe request");
if (callback) {
callback({ error: "Missing Printer ID" });
}
}
} catch (e) {
logger.error("Error processing unsubscribe request:", e);
if (callback) {
callback({ error: e.message });
}
}
});
this.socket.on("printer.gcode.script", async (data, callback) => {
logger.debug("Received printer.gcode.script event:", data);
try {
const result = await this.printerManager.processPrinterCommand({
method: "printer.gcode.script",
params: data,
});
if (callback) {
callback(result);
}
} catch (e) {
logger.error("Error processing gcode script request:", e);
if (callback) {
callback({ error: e.message });
}
}
});
this.socket.on("printer.objects.query", async (data, callback) => {
logger.debug("Received printer.objects.query event:", data);
try {
const result = await this.printerManager.processPrinterCommand({
method: "printer.objects.query",
params: data,
});
if (callback) {
callback(result);
}
} catch (e) {
logger.error("Error processing printer objects query request:", e);
if (callback) {
callback({ error: e.message });
}
}
});
this.socket.on("printer.emergency_stop", async (data, callback) => {
logger.debug("Received printer.gcode.script event:", data);
try {
const result = await this.printerManager.processPrinterCommand({
method: "printer.emergency_stop",
params: data,
});
if (callback) {
callback(result);
}
} catch (e) {
logger.error("Error processing gcode script request:", e);
if (callback) {
callback({ error: e.message });
}
}
});
this.socket.on("printer.firmware_restart", async (data, callback) => {
logger.debug("Received printer.firmware_restart event:", data);
try {
const result = await this.printerManager.processPrinterCommand({
method: "printer.firmware_restart",
params: data,
});
if (callback) {
callback(result);
}
} catch (e) {
logger.error("Error processing firmware restart request:", e);
if (callback) {
callback({ error: e.message });
}
}
});
this.socket.on("printer.restart", async (data, callback) => {
logger.debug("Received printer.restart event:", data);
try {
const result = await this.printerManager.processPrinterCommand({
method: "printer.restart",
params: data,
});
if (callback) {
callback(result);
}
} catch (e) {
logger.error("Error processing printer restart request:", e);
if (callback) {
callback({ error: e.message });
}
}
});
this.socket.on("server.job_queue.status", async (data, callback) => {
logger.debug("Received server.job_queue.status event:", data);
try {
const result = await this.printerManager.processPrinterCommand({
method: "server.job_queue.status",
params: data,
});
if (callback) {
callback(result);
}
} catch (e) {
logger.error("Error processing job queue status request:", e);
if (callback) {
callback({ error: e.message });
}
}
});
this.socket.on("server.job_queue.deploy", async (data, callback) => {
logger.debug("Received server.job_queue.deploy event:", data);
try {
if (!data || !data.printJobId) {
throw new Error("Missing required print job ID");
}
// Deploy the print job to all printers
const result = await this.printerManager.deployPrintJob(
data.printJobId,
);
if (callback) {
callback(result);
}
} catch (e) {
logger.error("Error processing job queue deploy request:", e);
if (callback) {
callback({ error: e.message });
}
}
});
this.socket.on("printer.print.resume", async (data, callback) => {
logger.debug("Received printer.print.resume event:", data);
try {
const result = await this.printerManager.processPrinterCommand({
method: "printer.print.resume",
params: data,
});
if (callback) {
callback(result);
}
} catch (e) {
logger.error("Error processing print resume request:", e);
if (callback) {
callback({ error: e.message });
}
}
});
this.socket.on("server.job_queue.cancel", async (data, callback) => {
logger.debug("Received server.job_queue.cancel event:", data);
try {
if (!data || !data.subJobId) {
throw new Error("Missing required sub job ID");
}
const result = await this.printerManager.cancelSubJob(data.subJobId);
if (callback) {
callback(result);
}
} catch (e) {
logger.error("Error processing job queue delete job request:", e);
if (callback) {
callback({ error: e.message });
}
}
});
this.socket.on("printer.print.cancel", async (data, callback) => {
logger.debug("Received printer.print.cancel event:", data);
try {
const result = await this.printerManager.processPrinterCommand({
method: "printer.print.cancel",
params: data,
});
if (callback) {
callback(result);
}
} catch (e) {
logger.error("Error processing print cancel request:", e);
if (callback) {
callback({ error: e.message });
}
}
});
this.socket.on("printer.print.pause", async (data, callback) => {
logger.debug("Received printer.print.pause event:", data);
try {
const result = await this.printerManager.processPrinterCommand({
method: "printer.print.pause",
params: data,
});
if (callback) {
callback(result);
}
} catch (e) {
logger.error("Error processing print pause request:", e);
if (callback) {
callback({ error: e.message });
}
}
});
this.socket.on("server.job_queue.pause", async (data, callback) => {
logger.debug("Received server.job_queue.pause event:", data);
try {
const result = await this.printerManager.processPrinterCommand({
method: "server.job_queue.pause",
params: data,
});
if (callback) {
callback(result);
}
} catch (e) {
logger.error("Error processing job queue pause request:", e);
if (callback) {
callback({ error: e.message });
}
}
});
this.socket.on("server.job_queue.start", async (data, callback) => {
logger.debug("Received server.job_queue.start event:", data);
try {
const result = await this.printerManager.processPrinterCommand({
method: "server.job_queue.start",
params: data,
});
if (callback) {
callback(result);
}
} catch (e) {
logger.error("Error processing job queue start request:", e);
if (callback) {
callback({ error: e.message });
}
}
});
this.socket.on("disconnect", () => {
// Unsubscribe from all printers when client disconnects
printerManager.unsubscribeClientFromAll(socket);
logger.info("External client disconnected:", socket.user?.username);
});
}

View File

@ -1,19 +1,22 @@
// server.js - HTTP and Socket.IO server setup
import { Server } from "socket.io";
import http from "http";
import { createAuthMiddleware } from "./auth.js";
import { createAuthMiddleware } from "../auth/auth.js";
import log4js from "log4js";
// Load configuration
import { loadConfig } from "./config.js";
import { loadConfig } from "../config.js";
import { SocketClient } from "./socketclient.js";
const config = loadConfig();
const logger = log4js.getLogger("Server");
const logger = log4js.getLogger("Socket Manager");
logger.level = config.server.logLevel;
export class SocketServer {
export class SocketManager {
constructor(config, printerManager, auth) {
this.socketClientConnections = new Map();
this.printerManager = printerManager;
// Create HTTP server
const server = http.createServer((req, res) => {
res.writeHead(200, { "Content-Type": "text/plain" });
@ -34,7 +37,7 @@ export class SocketServer {
// Handle client connections
io.on("connection", (socket) => {
logger.info("External client connected:", socket.user?.username);
this.socketClientConnections.set(socket.id, socket);
this.addClient(socket);
});
// Start the server
@ -44,99 +47,131 @@ export class SocketServer {
);
});
return { server, io };
this.io = io;
this.server = server;
}
}
export function setupServer(config, printerManager, auth) {}
// Command handlers
function handleSubscribe(
socket,
data,
clientSubscriptions,
printerManager,
callback,
) {
logger.info("Handling subscribe command...");
const printerId = data.printerId;
if (printerManager.subscribeClient(printerId, socket)) {
clientSubscriptions.add(printerId);
logger.info(`Client subscribed to printer ${printerId}`);
addClient(socket) {
const client = new SocketClient(socket, this, this.printerManager);
this.socketClientConnections.set(socket.id, client);
logger.info("External client connected:", socket.user?.username);
// Handle disconnection
socket.on("disconnect", () => {
logger.info("External client disconnected:", socket.user?.username);
this.removeClient(socket.id);
});
}
removeClient(socketClientId) {
const socketClient = this.socketClientConnections.get(socketClientId);
if (socketClient) {
this.socketClientConnections.delete(socketClientId);
logger.info("External client disconnected:", socketClient.socket.user?.username);
}
}
getSocketClient(clientId) {
return this.socketClientConnections.get(clientId);
}
getAllSocketClients() {
return Array.from(this.socketClientConnections.values());
}
broadcast(event, data, excludeClientId = null) {
for (const [clientId, socketClient] of this.socketClientConnections) {
if (excludeClientId !== clientId) {
socketClient.socket.emit(event, data);
}
}
}
broadcastToSubscribers(printerId, command) {
const paramsObject = command.params;
for (const [clientId, socketClient] of this.socketClientConnections) {
// Check if this client has subscribed to the printer
if (socketClient.activeSubscriptions.has(printerId)) {
const subscription = socketClient.activeSubscriptions.get(printerId);
// Filter the message based on the subscription filters
const filteredMessage = this.filterMessage(paramsObject, subscription);
if (filteredMessage) {
logger.trace(`Broadcasting message to client ${clientId}: ${JSON.stringify(filteredMessage)}`);
socketClient.socket.emit(command.method, {
printerId,
...filteredMessage
});
}
}
}
}
filterMessage(message, subscription) {
if (!message || !subscription) return null;
const filtered = {};
// Handle both subscription formats
const subscriptionObjects = subscription.objects || subscription;
for (const [objectName, fields] of Object.entries(subscriptionObjects)) {
if (message[objectName]) {
if (fields === null) {
// If fields is null, include all fields
filtered[objectName] = message[objectName];
} else if (Array.isArray(fields)) {
// If fields is an array, only include specified fields
filtered[objectName] = {};
for (const field of fields) {
if (message[objectName][field] !== undefined) {
filtered[objectName][field] = message[objectName][field];
}
}
// Only include the object if it has any filtered fields
if (Object.keys(filtered[objectName]).length === 0) {
delete filtered[objectName];
}
}
}
}
return Object.keys(filtered).length > 0 ? filtered : null;
}
handleListPrinters(socket, data, callback) {
logger.info("handleListPrinters called with data:", data);
if (callback) {
callback({
printers: this.printerManager.getAllPrinters(),
});
}
}
handleListPrintersSubscribe(socket, data, callback) {
logger.info("handleListPrintersSubscribe called with data:", data);
if (callback) {
callback({
printers: this.printerManager.getAllPrinters(),
});
}
}
handleAddPrinter(socket, data, callback) {
logger.info("handleAddPrinter called with data:", data);
if (this.printerManager.addPrinter(data.printer_config)) {
if (callback) {
callback({
success: true,
printer_id: printerId,
});
}
} else {
logger.warn(`Client failed to subscribe to printer ${printerId}`);
if (callback) {
callback({
success: false,
error: {
code: -32001,
message: `Printer ${printerId} not found`,
},
});
}
}
}
function handleUnsubscribe(
socket,
data,
clientSubscriptions,
printerManager,
callback,
) {
const printerId = data.printer_id;
if (printerManager.unsubscribeClient(printerId, socket)) {
clientSubscriptions.delete(printerId);
if (callback)
callback({
success: true,
printer_id: printerId,
});
} else {
if (callback)
callback({
success: false,
error: {
code: -32001,
message: `Printer ${printerId} not found`,
},
});
}
}
function handleListPrinters(socket, data, printerManager, callback) {
if (callback)
callback({
printers: printerManager.getAllPrinters(),
});
}
function handleListPrintersSubscribe(socket, data, printerManager, callback) {
if (callback)
callback({
printers: printerManager.getAllPrinters(),
});
}
function handleAddPrinter(socket, data, printerManager, callback) {
if (printerManager.addPrinter(data.printer_config)) {
if (callback)
callback({
success: true,
printer: printerManager
printer: this.printerManager
.getPrinter(data.printer_config.id)
.getStatus(),
});
// Notify all clients about the new printer
broadcastPrinterList(printerManager, socket);
}
this.broadcastPrinterList(socket);
} else {
if (callback)
if (callback) {
callback({
success: false,
error: {
@ -145,20 +180,21 @@ function handleAddPrinter(socket, data, printerManager, callback) {
},
});
}
}
}
}
function handleRemovePrinter(socket, data, printerManager, callback) {
if (printerManager.removePrinter(data.printer_id)) {
if (callback)
handleRemovePrinter(socket, data, callback) {
logger.info("handleRemovePrinter called with data:", data);
if (this.printerManager.removePrinter(data.printer_id)) {
if (callback) {
callback({
success: true,
printer_id: data.printer_id,
});
// Notify all clients about the updated printer list
broadcastPrinterList(printerManager, socket);
}
this.broadcastPrinterList(socket);
} else {
if (callback)
if (callback) {
callback({
success: false,
error: {
@ -167,22 +203,23 @@ function handleRemovePrinter(socket, data, printerManager, callback) {
},
});
}
}
}
}
function handleUpdatePrinter(socket, data, printerManager, callback) {
if (printerManager.updatePrinter(data.printer_config)) {
if (callback)
handleUpdatePrinter(socket, data, callback) {
logger.info("handleUpdatePrinter called with data:", data);
if (this.printerManager.updatePrinter(data.printer_config)) {
if (callback) {
callback({
success: true,
printer: printerManager
printer: this.printerManager
.getPrinter(data.printer_config.id)
.getStatus(),
});
// Notify all clients about the updated printer
broadcastPrinterList(printerManager, socket);
}
this.broadcastPrinterList(socket);
} else {
if (callback)
if (callback) {
callback({
success: false,
error: {
@ -191,17 +228,43 @@ function handleUpdatePrinter(socket, data, printerManager, callback) {
},
});
}
}
}
}
function broadcastPrinterList(printerManager, excludeSocket = null) {
const printerList = printerManager.getAllPrinters();
const message = {
printers: printerList,
resubscribeAllClients(printerId) {
logger.info(`Resubscribing to all active subscriptions for printer ${printerId}`);
// Create a combined subscription object for the specified printer
const combinedSubscription = {
printerId: printerId,
objects: {}
};
for (const client of printerManager.getAllClients()) {
if (client !== excludeSocket && client.connected) {
client.emit("notify_printers_updated", message);
// Combine all subscription objects for the specified printer
for (const [clientId, socketClient] of this.socketClientConnections) {
if (socketClient.activeSubscriptions.has(printerId)) {
const subscription = socketClient.activeSubscriptions.get(printerId);
// Merge the objects from this subscription into the combined subscription
Object.assign(combinedSubscription.objects, subscription.objects);
logger.debug(`Adding subscription objects from client ${clientId} for printer ${printerId}`);
}
}
// Only send the subscription command if there are any objects to subscribe to
if (Object.keys(combinedSubscription.objects).length > 0) {
logger.debug(`Sending combined subscription and query for printer ${printerId}:`, combinedSubscription);
this.printerManager.processPrinterCommand({
method: "printer.objects.subscribe",
params: combinedSubscription
});
this.printerManager.processPrinterCommand({
method: "printer.objects.query",
params: combinedSubscription
});
} else {
logger.debug(`No active subscriptions found for printer ${printerId}`);
}
}
}