Merge remote-tracking branch 'origin/master' into 2.0.X

# Conflicts:
#	docker/debian-base.dockerfile
#	package-lock.json
#	package.json
#	server/database.js
#	src/router.js
Louis Lam
2023-07-30 19:15:09 +08:00
115 changed files with 2008 additions and 890 deletions


@@ -31,8 +31,11 @@ const readline = require("readline");
const rl = readline.createInterface({ input: process.stdin,
output: process.stdout });
// SASLOptions used in JSDoc
// eslint-disable-next-line no-unused-vars
const { Kafka, SASLOptions } = require("kafkajs");
const isWindows = process.platform === /^win/.test(process.platform);
/**
* Init or reset JWT secret
* @returns {Promise<Bean>}
@@ -199,6 +202,94 @@ exports.mqttAsync = function (hostname, topic, okMessage, options = {}) {
});
};
/**
 * Monitor Kafka using a Producer
 * @param {string[]} brokers List of Kafka brokers to connect to, each given as "host:port"
 * @param {string} topic Topic name to produce into
 * @param {string} message Message to produce
 * @param {Object} [options={}] Kafka client options. Contains ssl, clientId,
 * allowAutoTopicCreation and interval (defaults: interval = 20,
 * allowAutoTopicCreation = false, ssl = false, clientId = "Uptime-Kuma")
 * @param {SASLOptions} [saslOptions={}] Options for Kafka client authentication (SASL)
*/
exports.kafkaProducerAsync = function (brokers, topic, message, options = {}, saslOptions = {}) {
    return new Promise((resolve, reject) => {
        const { interval = 20, allowAutoTopicCreation = false, ssl = false, clientId = "Uptime-Kuma" } = options;

        // Guards against settling the promise twice, e.g. when the producer emits
        // "producer.disconnect" after the check has already finished
        let connectedToKafka = false;

        // Abort the check at 80% of the monitor interval (interval is given in seconds)
        const timeoutID = setTimeout(() => {
            log.debug("kafkaProducer", "KafkaProducer timeout triggered");
            connectedToKafka = true;
            reject(new Error("Timeout"));
        }, interval * 1000 * 0.8);

        // kafkajs expects `sasl` to be undefined when no authentication is used,
        // so the "None" mechanism is mapped to undefined here
        if (saslOptions.mechanism === "None") {
            saslOptions = undefined;
        }

        let client = new Kafka({
            brokers: brokers,
            clientId: clientId,
            sasl: saslOptions,
            retry: {
                retries: 0,
            },
            ssl: ssl,
        });

        let producer = client.producer({
            allowAutoTopicCreation: allowAutoTopicCreation,
            retry: {
                retries: 0,
            }
        });

        producer.connect().then(
            () => {
                try {
                    producer.send({
                        topic: topic,
                        messages: [{
                            value: message,
                        }],
                    });
                    connectedToKafka = true;
                    clearTimeout(timeoutID);
                    resolve("Message sent successfully");
                } catch (e) {
                    connectedToKafka = true;
                    producer.disconnect();
                    clearTimeout(timeoutID);
                    reject(new Error("Error sending message: " + e.message));
                }
            }
        ).catch(
            (e) => {
                connectedToKafka = true;
                producer.disconnect();
                clearTimeout(timeoutID);
                reject(new Error("Error in producer connection: " + e.message));
            }
        );

        // Treat a kafkajs request timeout or an early disconnect as a failed check
        producer.on("producer.network.request_timeout", (_) => {
            clearTimeout(timeoutID);
            reject(new Error("producer.network.request_timeout"));
        });

        producer.on("producer.disconnect", (_) => {
            if (!connectedToKafka) {
                clearTimeout(timeoutID);
                reject(new Error("producer.disconnect"));
            }
        });
    });
};
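// Usage sketch (illustrative only): how a Kafka Producer check might call the helper
// above. The broker address, topic and payload are placeholder values, and
// exampleKafkaCheck itself is hypothetical, not part of the codebase.
async function exampleKafkaCheck() {
    try {
        const status = await exports.kafkaProducerAsync(
            [ "localhost:9092" ],   // brokers, each given as "host:port"
            "uptime-kuma",          // topic to produce into
            "ping",                 // message payload
            { interval: 20, allowAutoTopicCreation: false, ssl: false, clientId: "Uptime-Kuma" },
            { mechanism: "None" }   // treated as "no SASL" by the helper
        );
        console.log(status);        // resolves with "Message sent successfully"
    } catch (e) {
        console.error("Kafka check failed:", e.message);
    }
}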
/**
 * Use NTLM Auth for an HTTP request.
* @param {Object} options The http request options
@@ -381,6 +472,7 @@ exports.mongodbPing = async function (connectionString) {
* @param {string} callingStationId ID of calling station
* @param {string} secret Secret to use
* @param {number} [port=1812] Port to contact radius server on
 * @param {number} [timeout=2500] Timeout for the connection, in milliseconds
* @returns {Promise<any>}
*/
exports.radius = function (
@@ -391,10 +483,12 @@ exports.radius = function (
    callingStationId,
    secret,
    port = 1812,
    timeout = 2500,
) {
    const client = new radiusClient({
        host: hostname,
        hostPort: port,
        timeout: timeout,
        dictionaries: [ file ],
    });
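    // Usage sketch (illustrative only; host, credentials and shared secret below are
    // placeholder values): the new parameter lets a caller bound how long the check
    // waits for the RADIUS server, e.g.
    //     exports.radius("radius.example.com", "user", "pass",
    //         "11-22-33-44-55-66", "00-00-00-00-00-00", "sharedsecret", 1812, 5000)
    // which overrides the default timeout of 2500 ms with 5000 ms.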