Add native MQTT integration

This commit is contained in:
2026-05-05 12:56:38 +00:00
parent c052a5aec2
commit 97cba0a9a1
13 changed files with 1398 additions and 38 deletions
+17
View File
@@ -0,0 +1,17 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import { createMqttDiscoveryDescriptor } from '../../ts/integrations/mqtt/index.js';
tap.test('matches Home Assistant MQTT discovery config topics', async () => {
const descriptor = createMqttDiscoveryDescriptor();
const matcher = descriptor.getMatchers()[1];
const result = await matcher.matches({
topic: 'homeassistant/sensor/kitchen/temperature/config',
payload: '{"name":"Kitchen temperature","state_topic":"kitchen/temperature"}',
}, {});
expect(result.matched).toBeTrue();
expect(result.candidate?.metadata?.component).toEqual('sensor');
expect(result.candidate?.metadata?.nodeId).toEqual('kitchen');
expect(result.normalizedDeviceId).toEqual('temperature');
});
export default tap.start();
+34
View File
@@ -0,0 +1,34 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import { MqttMapper } from '../../ts/integrations/mqtt/index.js';
const config = {
host: 'mqtt.local',
discoveryMessages: [{
topic: 'homeassistant/device/kitchen_sensor/config',
payload: JSON.stringify({
dev: { ids: 'kitchen-sensor-1', name: 'Kitchen Sensor', mf: 'Example', mdl: 'MQTT Multi Sensor', sw: '1.0' },
o: { name: 'example2mqtt', sw: '2.0' },
'~': 'kitchen/sensor',
stat_t: '~/state',
cmps: {
temperature: { p: 'sensor', name: 'Temperature', uniq_id: 'kitchen_temperature', dev_cla: 'temperature', unit_of_meas: 'C', val_tpl: '{{ value_json.temperature }}' },
relay: { p: 'switch', name: 'Relay', uniq_id: 'kitchen_relay', cmd_t: 'kitchen/relay/set', stat_t: 'kitchen/relay/state' },
},
}),
}],
retainedMessages: [
{ topic: 'kitchen/sensor/state', payload: '{"temperature":21.5}' },
{ topic: 'kitchen/relay/state', payload: 'ON' },
],
};
tap.test('maps MQTT discovery payloads to devices and entities', async () => {
const snapshot = MqttMapper.toSnapshot(config);
const devices = MqttMapper.toDevices(snapshot);
const entities = MqttMapper.toEntities(snapshot);
expect(devices.some((deviceArg) => deviceArg.id === 'mqtt.device.kitchen_sensor_1')).toBeTrue();
expect(entities.some((entityArg) => entityArg.uniqueId === 'kitchen_temperature' && entityArg.state === 21.5)).toBeTrue();
expect(entities.some((entityArg) => entityArg.uniqueId === 'kitchen_relay' && entityArg.state === 'on')).toBeTrue();
});
export default tap.start();
+2
View File
@@ -4,6 +4,7 @@ export * from './integrations/index.js';
import { HueIntegration } from './integrations/hue/index.js';
import { CastIntegration } from './integrations/cast/index.js';
import { MqttIntegration } from './integrations/mqtt/index.js';
import { RokuIntegration } from './integrations/roku/index.js';
import { ShellyIntegration } from './integrations/shelly/index.js';
import { SonosIntegration } from './integrations/sonos/index.js';
@@ -14,6 +15,7 @@ import { IntegrationRegistry } from './core/index.js';
export const integrations = [
new CastIntegration(),
new HueIntegration(),
new MqttIntegration(),
new RokuIntegration(),
new ShellyIntegration(),
new SonosIntegration(),
+2 -3
View File
@@ -785,7 +785,6 @@ import { HomeAssistantMotionblindsBleIntegration } from '../motionblinds_ble/ind
import { HomeAssistantMotioneyeIntegration } from '../motioneye/index.js';
import { HomeAssistantMotionmountIntegration } from '../motionmount/index.js';
import { HomeAssistantMpdIntegration } from '../mpd/index.js';
import { HomeAssistantMqttIntegration } from '../mqtt/index.js';
import { HomeAssistantMqttEventstreamIntegration } from '../mqtt_eventstream/index.js';
import { HomeAssistantMqttJsonIntegration } from '../mqtt_json/index.js';
import { HomeAssistantMqttRoomIntegration } from '../mqtt_room/index.js';
@@ -2240,7 +2239,6 @@ generatedHomeAssistantPortIntegrations.push(new HomeAssistantMotionblindsBleInte
generatedHomeAssistantPortIntegrations.push(new HomeAssistantMotioneyeIntegration());
generatedHomeAssistantPortIntegrations.push(new HomeAssistantMotionmountIntegration());
generatedHomeAssistantPortIntegrations.push(new HomeAssistantMpdIntegration());
generatedHomeAssistantPortIntegrations.push(new HomeAssistantMqttIntegration());
generatedHomeAssistantPortIntegrations.push(new HomeAssistantMqttEventstreamIntegration());
generatedHomeAssistantPortIntegrations.push(new HomeAssistantMqttJsonIntegration());
generatedHomeAssistantPortIntegrations.push(new HomeAssistantMqttRoomIntegration());
@@ -2910,10 +2908,11 @@ generatedHomeAssistantPortIntegrations.push(new HomeAssistantZoneminderIntegrati
generatedHomeAssistantPortIntegrations.push(new HomeAssistantZwaveJsIntegration());
generatedHomeAssistantPortIntegrations.push(new HomeAssistantZwaveMeIntegration());
export const generatedHomeAssistantPortCount = 1453;
export const generatedHomeAssistantPortCount = 1452;
export const handwrittenHomeAssistantPortDomains = [
"cast",
"hue",
"mqtt",
"roku",
"shelly",
"sonos"
@@ -1 +0,0 @@
This folder is generated from Home Assistant component metadata. Replace it with a handwritten TypeScript port when implementing runtime support.
+4
View File
@@ -1,2 +1,6 @@
export * from './mqtt.classes.integration.js';
export * from './mqtt.classes.client.js';
export * from './mqtt.classes.configflow.js';
export * from './mqtt.discovery.js';
export * from './mqtt.mapper.js';
export * from './mqtt.types.js';
+437
View File
@@ -0,0 +1,437 @@
import * as plugins from '../../plugins.js';
import type { IMqttConfig, IMqttMessage, IMqttPayload, IMqttPublishOptions, IMqttSnapshot } from './mqtt.types.js';
import { MqttMapper } from './mqtt.mapper.js';
const packetConnect = 1;
const packetConnack = 2;
const packetPublish = 3;
const packetPuback = 4;
const packetSubscribe = 8;
const packetSuback = 9;
const packetPingreq = 12;
const packetPingresp = 13;
const packetDisconnect = 14;
const defaultDiscoveryPrefix = 'homeassistant';
type TMqttMessageHandler = (messageArg: IMqttMessage) => void;
interface IPendingPacket {
resolve(valueArg?: unknown): void;
reject(errorArg: Error): void;
timer: ReturnType<typeof setTimeout>;
}
export class MqttClient {
private connection?: MqttTcpConnection;
private readonly receivedMessages: IMqttMessage[] = [];
private readonly messageHandlers = new Set<TMqttMessageHandler>();
private started = false;
constructor(private readonly config: IMqttConfig) {}
public async getSnapshot(): Promise<IMqttSnapshot> {
if (this.config.host && !this.started) {
await this.start();
}
return MqttMapper.toSnapshot(this.config, Boolean(this.connection?.connected), this.receivedMessages);
}
public async start(): Promise<void> {
if (this.started || !this.config.host) {
this.started = true;
return;
}
const connection = new MqttTcpConnection(this.config, (messageArg) => this.handleMessage(messageArg));
await connection.connect();
this.connection = connection;
this.started = true;
if (this.config.birthMessage !== false) {
const birth = this.config.birthMessage || { topic: `${this.config.discoveryPrefix || defaultDiscoveryPrefix}/status`, payload: 'online', qos: 0, retain: false };
await this.publish(birth);
}
const subscriptions = new Set(this.config.subscriptions || []);
if (this.config.discovery !== false) {
subscriptions.add(`${this.config.discoveryPrefix || defaultDiscoveryPrefix}/#`);
}
for (const topic of subscriptions) {
await this.subscribe(topic);
}
}
public async publish(optionsArg: IMqttPublishOptions): Promise<void> {
await this.ensureConnection();
await this.connection?.publish(optionsArg);
}
public async subscribe(topicArg: string, handlerArg?: TMqttMessageHandler): Promise<() => Promise<void>> {
if (handlerArg) {
this.messageHandlers.add(handlerArg);
}
await this.ensureConnection();
await this.connection?.subscribe(topicArg);
return async () => {
if (handlerArg) {
this.messageHandlers.delete(handlerArg);
}
};
}
public async destroy(): Promise<void> {
if (this.config.birthMessage !== false && this.connection?.connected) {
await this.publish({ topic: `${this.config.discoveryPrefix || defaultDiscoveryPrefix}/status`, payload: 'offline', qos: 0, retain: false });
}
this.connection?.destroy();
this.connection = undefined;
this.started = false;
this.messageHandlers.clear();
}
private async ensureConnection(): Promise<void> {
if (!this.connection?.connected) {
await this.start();
}
if (!this.connection?.connected) {
throw new Error('MQTT broker connection is not available.');
}
}
private handleMessage(messageArg: IMqttMessage): void {
this.receivedMessages.push(messageArg);
for (const handler of this.messageHandlers) {
handler(messageArg);
}
}
}
class MqttTcpConnection {
private socket?: plugins.net.Socket;
private buffer = Buffer.alloc(0);
private nextPacketId = 1;
private pingTimer?: ReturnType<typeof setInterval>;
private connectPending?: IPendingPacket;
private readonly pendingPackets = new Map<number, IPendingPacket>();
private readonly subscriptions = new Set<string>();
public connected = false;
constructor(private readonly config: IMqttConfig, private readonly messageHandler: TMqttMessageHandler) {}
public async connect(): Promise<void> {
if (this.connected) {
return;
}
if (!this.config.host) {
throw new Error('MQTT host is required when fixture data is not provided.');
}
this.socket = await this.createSocket();
this.socket.on('data', (chunkArg) => this.handleData(chunkArg));
this.socket.on('error', (errorArg) => this.rejectAll(errorArg));
this.socket.on('close', () => {
this.connected = false;
this.rejectAll(new Error('MQTT socket closed.'));
});
await this.sendConnect();
const keepalive = this.config.keepalive || 60;
this.pingTimer = setInterval(() => {
if (this.connected) {
void this.writePacket(packetPingreq, Buffer.alloc(0));
}
}, Math.max(15, keepalive) * 1000);
}
public async publish(optionsArg: IMqttPublishOptions): Promise<void> {
const topic = this.encodeString(optionsArg.topic);
const payload = this.payloadBuffer(optionsArg.payload);
const qos = optionsArg.qos || 0;
if (qos !== 0) {
throw new Error('MQTT publish currently supports QoS 0.');
}
await this.writePacket(packetPublish, Buffer.concat([topic, payload]), optionsArg.retain ? 0x01 : 0);
}
public async subscribe(topicArg: string): Promise<void> {
if (this.subscriptions.has(topicArg)) {
return;
}
const packetId = this.nextId();
const variableHeader = Buffer.alloc(2);
variableHeader.writeUInt16BE(packetId, 0);
const payload = Buffer.concat([this.encodeString(topicArg), Buffer.from([0])]);
const promise = this.waitForPacket(packetId, `MQTT subscribe ${topicArg}`);
await this.writePacket(packetSubscribe, Buffer.concat([variableHeader, payload]), 0x02);
await promise;
this.subscriptions.add(topicArg);
}
public destroy(): void {
if (this.connected) {
void this.writePacket(packetDisconnect, Buffer.alloc(0));
}
if (this.pingTimer) {
clearInterval(this.pingTimer);
this.pingTimer = undefined;
}
this.rejectAll(new Error('MQTT connection destroyed.'));
this.socket?.destroy();
this.socket = undefined;
this.connected = false;
}
private async createSocket(): Promise<plugins.net.Socket> {
const port = this.config.port || (this.config.protocol === 'mqtts' ? 8883 : 1883);
if (this.config.protocol === 'mqtts') {
return new Promise<plugins.net.Socket>((resolve, reject) => {
const socket = plugins.tls.connect({
host: this.config.host,
port,
rejectUnauthorized: this.config.rejectUnauthorized !== false,
servername: this.config.host,
});
socket.once('secureConnect', () => resolve(socket));
socket.once('error', reject);
});
}
return new Promise<plugins.net.Socket>((resolve, reject) => {
const socket = plugins.net.connect({ host: this.config.host, port });
socket.once('connect', () => resolve(socket));
socket.once('error', reject);
});
}
private async sendConnect(): Promise<void> {
const keepalive = this.config.keepalive || 60;
const clientId = this.config.clientId || `smarthome-exchange-${plugins.crypto.randomBytes(4).toString('hex')}`;
const will = this.config.willMessage === false ? undefined : this.config.willMessage || { topic: `${this.config.discoveryPrefix || defaultDiscoveryPrefix}/status`, payload: 'offline', qos: 0, retain: false };
let flags = this.config.clean === false ? 0 : 0x02;
if (will) {
flags |= 0x04 | ((will.qos || 0) << 3);
if (will.retain) {
flags |= 0x20;
}
}
if (this.config.password) {
flags |= 0x40;
}
if (this.config.username) {
flags |= 0x80;
}
const variableHeader = Buffer.concat([
this.encodeString('MQTT'),
Buffer.from([4, flags]),
this.uint16(keepalive),
]);
const payloadParts = [this.encodeString(clientId)];
if (will) {
payloadParts.push(this.encodeString(will.topic), this.payloadWithLength(will.payload));
}
if (this.config.username) {
payloadParts.push(this.encodeString(this.config.username));
}
if (this.config.password) {
payloadParts.push(this.encodeString(this.config.password));
}
const promise = new Promise<void>((resolve, reject) => {
const timer = setTimeout(() => {
this.connectPending = undefined;
reject(new Error('MQTT connect timed out.'));
}, 7000);
this.connectPending = { resolve: () => resolve(), reject, timer };
});
await this.writePacket(packetConnect, Buffer.concat([variableHeader, ...payloadParts]));
await promise;
}
private handleData(chunkArg: Buffer | string): void {
const chunk = Buffer.isBuffer(chunkArg) ? chunkArg : Buffer.from(chunkArg);
this.buffer = Buffer.concat([this.buffer, chunk]);
while (this.buffer.length >= 2) {
const remainingLength = this.decodeRemainingLength(this.buffer, 1);
if (!remainingLength || this.buffer.length < 1 + remainingLength.bytes + remainingLength.value) {
return;
}
const header = this.buffer[0];
const payloadStart = 1 + remainingLength.bytes;
const payloadEnd = payloadStart + remainingLength.value;
const payload = this.buffer.subarray(payloadStart, payloadEnd);
this.buffer = this.buffer.subarray(payloadEnd);
this.handlePacket(header, payload);
}
}
private handlePacket(headerArg: number, payloadArg: Buffer): void {
const packetType = headerArg >> 4;
if (packetType === packetConnack) {
const returnCode = payloadArg[1];
const pending = this.connectPending;
this.connectPending = undefined;
if (!pending) {
return;
}
clearTimeout(pending.timer);
if (returnCode === 0) {
this.connected = true;
pending.resolve();
} else {
pending.reject(new Error(`MQTT broker rejected connection with code ${returnCode}.`));
}
return;
}
if (packetType === packetSuback) {
const packetId = payloadArg.readUInt16BE(0);
const pending = this.pendingPackets.get(packetId);
if (pending) {
clearTimeout(pending.timer);
this.pendingPackets.delete(packetId);
const failure = payloadArg.subarray(2).includes(0x80);
failure ? pending.reject(new Error(`MQTT subscribe ${packetId} was rejected.`)) : pending.resolve();
}
return;
}
if (packetType === packetPublish) {
this.handlePublish(headerArg, payloadArg);
return;
}
if (packetType === packetPingresp) {
return;
}
}
private handlePublish(headerArg: number, payloadArg: Buffer): void {
const retain = Boolean(headerArg & 0x01);
const qos = ((headerArg >> 1) & 0x03) as 0 | 1 | 2;
const topicLength = payloadArg.readUInt16BE(0);
const topic = payloadArg.subarray(2, 2 + topicLength).toString('utf8');
let offset = 2 + topicLength;
if (qos > 0) {
const packetId = payloadArg.readUInt16BE(offset);
offset += 2;
if (qos === 1) {
void this.writePacket(packetPuback, this.uint16(packetId));
}
}
const payload = payloadArg.subarray(offset).toString('utf8');
const subscribedTopic = [...this.subscriptions].find((topicArg) => this.topicMatches(topicArg, topic)) || topic;
this.messageHandler({ topic, payload, qos, retain, subscribedTopic, timestamp: Date.now() } as IMqttMessage & { subscribedTopic: string });
}
private waitForPacket(packetIdArg: number, descriptionArg: string): Promise<void> {
return new Promise<void>((resolve, reject) => {
const timer = setTimeout(() => {
this.pendingPackets.delete(packetIdArg);
reject(new Error(`${descriptionArg} timed out.`));
}, 7000);
this.pendingPackets.set(packetIdArg, { resolve: () => resolve(), reject, timer });
});
}
private async writePacket(packetTypeArg: number, payloadArg: Buffer, flagsArg = 0): Promise<void> {
if (!this.socket || this.socket.destroyed) {
throw new Error('MQTT socket is not connected.');
}
const fixedHeader = Buffer.concat([Buffer.from([(packetTypeArg << 4) | flagsArg]), this.encodeRemainingLength(payloadArg.length)]);
const packet = Buffer.concat([fixedHeader, payloadArg]);
await new Promise<void>((resolve, reject) => {
this.socket?.write(packet, (errorArg) => errorArg ? reject(errorArg) : resolve());
});
}
private encodeRemainingLength(valueArg: number): Buffer {
const bytes: number[] = [];
let value = valueArg;
do {
let encodedByte = value % 128;
value = Math.floor(value / 128);
if (value > 0) {
encodedByte |= 128;
}
bytes.push(encodedByte);
} while (value > 0);
return Buffer.from(bytes);
}
private decodeRemainingLength(bufferArg: Buffer, offsetArg: number): { value: number; bytes: number } | undefined {
let multiplier = 1;
let value = 0;
let bytes = 0;
let offset = offsetArg;
while (offset < bufferArg.length) {
const encodedByte = bufferArg[offset++];
bytes++;
value += (encodedByte & 127) * multiplier;
if ((encodedByte & 128) === 0) {
return { value, bytes };
}
multiplier *= 128;
if (bytes > 4) {
throw new Error('Invalid MQTT remaining length.');
}
}
return undefined;
}
private payloadBuffer(payloadArg: IMqttPayload | undefined): Buffer {
if (payloadArg === undefined || payloadArg === null) {
return Buffer.alloc(0);
}
if (typeof payloadArg === 'object') {
return Buffer.from(JSON.stringify(payloadArg), 'utf8');
}
return Buffer.from(String(payloadArg), 'utf8');
}
private payloadWithLength(payloadArg: IMqttPayload | undefined): Buffer {
const payload = this.payloadBuffer(payloadArg);
return Buffer.concat([this.uint16(payload.length), payload]);
}
private encodeString(valueArg: string): Buffer {
const value = Buffer.from(valueArg, 'utf8');
return Buffer.concat([this.uint16(value.length), value]);
}
private uint16(valueArg: number): Buffer {
const buffer = Buffer.alloc(2);
buffer.writeUInt16BE(valueArg, 0);
return buffer;
}
private nextId(): number {
const id = this.nextPacketId++;
if (this.nextPacketId > 0xffff) {
this.nextPacketId = 1;
}
return id;
}
private topicMatches(filterArg: string, topicArg: string): boolean {
const filterParts = filterArg.split('/');
const topicParts = topicArg.split('/');
for (let index = 0; index < filterParts.length; index++) {
const filterPart = filterParts[index];
if (filterPart === '#') {
return index === filterParts.length - 1;
}
if (filterPart === '+') {
continue;
}
if (filterPart !== topicParts[index]) {
return false;
}
}
return filterParts.length === topicParts.length;
}
private rejectAll(errorArg: Error): void {
if (this.connectPending) {
clearTimeout(this.connectPending.timer);
this.connectPending.reject(errorArg);
this.connectPending = undefined;
}
for (const [packetId, pending] of this.pendingPackets) {
clearTimeout(pending.timer);
pending.reject(errorArg);
this.pendingPackets.delete(packetId);
}
}
}
@@ -0,0 +1,44 @@
import type { IConfigFlow, IConfigFlowContext, IConfigFlowStep, IDiscoveryCandidate } from '../../core/types.js';
import type { IMqttConfig } from './mqtt.types.js';
export class MqttConfigFlow implements IConfigFlow<IMqttConfig> {
public async start(candidateArg: IDiscoveryCandidate, contextArg: IConfigFlowContext): Promise<IConfigFlowStep<IMqttConfig>> {
void contextArg;
return {
kind: 'form',
title: 'Connect MQTT broker',
description: 'Configure a local MQTT 3.1.1 broker and Home Assistant-style MQTT discovery.',
fields: [
{ name: 'host', label: 'Broker host', type: 'text', required: true },
{ name: 'port', label: 'Broker port', type: 'number' },
{ name: 'protocol', label: 'Protocol', type: 'select', options: [{ label: 'MQTT', value: 'mqtt' }, { label: 'MQTTS', value: 'mqtts' }] },
{ name: 'username', label: 'Username', type: 'text' },
{ name: 'password', label: 'Password', type: 'password' },
{ name: 'clientId', label: 'Client ID', type: 'text' },
{ name: 'discoveryPrefix', label: 'Discovery prefix', type: 'text' },
],
submit: async (valuesArg) => ({
kind: 'done',
title: 'MQTT configured',
config: {
host: this.stringValue(valuesArg.host) || candidateArg.host || '',
port: this.numberValue(valuesArg.port) || candidateArg.port || 1883,
protocol: valuesArg.protocol === 'mqtts' ? 'mqtts' : 'mqtt',
username: this.stringValue(valuesArg.username),
password: this.stringValue(valuesArg.password),
clientId: this.stringValue(valuesArg.clientId),
discovery: true,
discoveryPrefix: this.stringValue(valuesArg.discoveryPrefix) || 'homeassistant',
},
}),
};
}
private stringValue(valueArg: unknown): string | undefined {
return typeof valueArg === 'string' && valueArg.trim() ? valueArg.trim() : undefined;
}
private numberValue(valueArg: unknown): number | undefined {
return typeof valueArg === 'number' && Number.isFinite(valueArg) ? valueArg : undefined;
}
}
@@ -1,34 +1,97 @@
import { DescriptorOnlyIntegration } from '../../core/classes.descriptoronlyintegration.js';
import type * as shxInterfaces from '@smarthome.exchange/interfaces';
import { BaseIntegration } from '../../core/classes.baseintegration.js';
import type { IIntegrationEntity, IIntegrationEvent, IIntegrationRuntime, IIntegrationSetupContext, IServiceCallRequest, IServiceCallResult } from '../../core/types.js';
import { MqttClient } from './mqtt.classes.client.js';
import { MqttConfigFlow } from './mqtt.classes.configflow.js';
import { createMqttDiscoveryDescriptor } from './mqtt.discovery.js';
import { MqttMapper } from './mqtt.mapper.js';
import type { IMqttConfig, IMqttPublishOptions } from './mqtt.types.js';
export class HomeAssistantMqttIntegration extends DescriptorOnlyIntegration {
constructor() {
super({
domain: "mqtt",
displayName: "MQTT",
status: 'descriptor-only',
metadata: {
"source": "home-assistant/core",
"upstreamPath": "homeassistant/components/mqtt",
"upstreamDomain": "mqtt",
"integrationType": "service",
"iotClass": "local_push",
"qualityScale": "platinum",
"requirements": [
"paho-mqtt==2.1.0"
],
"dependencies": [
"file_upload",
"http"
],
"afterDependencies": [
"hassio"
],
"codeowners": [
"@emontnemery",
"@jbouwh",
"@bdraco"
]
},
export class MqttIntegration extends BaseIntegration<IMqttConfig> {
public readonly domain = 'mqtt';
public readonly displayName = 'MQTT';
public readonly status = 'control-runtime' as const;
public readonly discoveryDescriptor = createMqttDiscoveryDescriptor();
public readonly configFlow = new MqttConfigFlow();
public readonly metadata = {
source: 'home-assistant/core',
upstreamPath: 'homeassistant/components/mqtt',
upstreamDomain: 'mqtt',
integrationType: 'service',
iotClass: 'local_push',
qualityScale: 'platinum',
};
public async setup(configArg: IMqttConfig, contextArg: IIntegrationSetupContext): Promise<IIntegrationRuntime> {
void contextArg;
return new MqttRuntime(new MqttClient(configArg));
}
public async destroy(): Promise<void> {}
}
export class HomeAssistantMqttIntegration extends MqttIntegration {}
class MqttRuntime implements IIntegrationRuntime {
public domain = 'mqtt';
constructor(private readonly client: MqttClient) {}
public async devices(): Promise<shxInterfaces.data.IDeviceDefinition[]> {
return MqttMapper.toDevices(await this.client.getSnapshot());
}
public async entities(): Promise<IIntegrationEntity[]> {
return MqttMapper.toEntities(await this.client.getSnapshot());
}
public async subscribe(handlerArg: (eventArg: IIntegrationEvent) => void): Promise<() => Promise<void>> {
return this.client.subscribe('#', (messageArg) => {
handlerArg({
type: 'state_changed',
integrationDomain: 'mqtt',
data: messageArg,
timestamp: messageArg.timestamp || Date.now(),
});
});
}
public async callService(requestArg: IServiceCallRequest): Promise<IServiceCallResult> {
if (requestArg.domain === 'mqtt') {
if (requestArg.service !== 'publish') {
return { success: false, error: `Unsupported MQTT service: ${requestArg.service}` };
}
const publishOptions = this.publishOptionsFromRequest(requestArg);
if (!publishOptions) {
return { success: false, error: 'MQTT publish requires data.topic.' };
}
await this.client.publish(publishOptions);
return { success: true };
}
const publishOptions = MqttMapper.commandForService(await this.client.getSnapshot(), requestArg);
if (!publishOptions) {
return { success: false, error: `MQTT entity service ${requestArg.domain}.${requestArg.service} has no command topic mapping.` };
}
await this.client.publish(publishOptions);
return { success: true };
}
public async destroy(): Promise<void> {
await this.client.destroy();
}
private publishOptionsFromRequest(requestArg: IServiceCallRequest): IMqttPublishOptions | undefined {
const topic = requestArg.data?.topic;
if (typeof topic !== 'string' || !topic) {
return undefined;
}
const qos = requestArg.data?.qos;
return {
topic,
payload: requestArg.data?.payload as IMqttPublishOptions['payload'],
qos: qos === 1 || qos === 2 ? qos : 0,
retain: requestArg.data?.retain === true,
};
}
}
+114
View File
@@ -0,0 +1,114 @@
import { DiscoveryDescriptor } from '../../core/classes.discoverydescriptor.js';
import type { IDiscoveryCandidate, IDiscoveryMatch, IDiscoveryMatcher, IDiscoveryValidator } from '../../core/types.js';
import type { IMqttDiscoveryMessage, IMqttManualEntry } from './mqtt.types.js';
const defaultDiscoveryPrefix = 'homeassistant';
export class MqttManualMatcher implements IDiscoveryMatcher<IMqttManualEntry> {
public id = 'mqtt-manual-match';
public source = 'manual' as const;
public description = 'Recognize manual MQTT broker setup entries.';
public async matches(inputArg: IMqttManualEntry): Promise<IDiscoveryMatch> {
const model = inputArg.model?.toLowerCase() || '';
const matched = Boolean(inputArg.host || inputArg.metadata?.mqtt || model.includes('mqtt') || model.includes('mosquitto'));
if (!matched) {
return { matched: false, confidence: 'low', reason: 'Manual entry does not contain MQTT broker setup hints.' };
}
return {
matched: true,
confidence: inputArg.host ? 'high' : 'medium',
reason: 'Manual entry can start MQTT broker setup.',
normalizedDeviceId: inputArg.id || inputArg.host,
candidate: {
source: 'manual',
integrationDomain: 'mqtt',
id: inputArg.id || inputArg.host,
host: inputArg.host,
port: inputArg.port || (inputArg.protocol === 'mqtts' ? 8883 : 1883),
name: inputArg.name || 'MQTT broker',
manufacturer: 'MQTT',
model: inputArg.model || 'Broker',
metadata: {
...inputArg.metadata,
protocol: inputArg.protocol || 'mqtt',
username: inputArg.username,
},
},
};
}
}
export class MqttDiscoveryTopicMatcher implements IDiscoveryMatcher<IMqttDiscoveryMessage> {
public id = 'mqtt-discovery-topic-match';
public source = 'mqtt' as const;
public description = 'Recognize Home Assistant MQTT discovery config topics.';
public async matches(inputArg: IMqttDiscoveryMessage): Promise<IDiscoveryMatch> {
const topic = inputArg.topic || '';
const match = this.discoveryMatch(topic);
if (!match) {
return { matched: false, confidence: 'low', reason: 'MQTT message is not on a discovery config topic.' };
}
return {
matched: true,
confidence: 'certain',
reason: 'MQTT topic matches Home Assistant discovery format.',
normalizedDeviceId: match.objectId,
candidate: {
source: 'mqtt',
integrationDomain: 'mqtt',
id: `${match.component}:${match.nodeId || ''}:${match.objectId}`,
name: match.objectId,
metadata: {
discoveryPrefix: match.prefix,
component: match.component,
nodeId: match.nodeId,
objectId: match.objectId,
topic,
payload: inputArg.payload,
},
},
};
}
private discoveryMatch(topicArg: string): { prefix: string; component: string; nodeId?: string; objectId: string } | undefined {
const parts = topicArg.split('/').filter(Boolean);
if (parts.length < 4 || parts[parts.length - 1] !== 'config') {
return undefined;
}
const prefix = parts[0] || defaultDiscoveryPrefix;
const component = parts[1];
if (parts.length === 4) {
return { prefix, component, objectId: parts[2] };
}
if (parts.length === 5) {
return { prefix, component, nodeId: parts[2], objectId: parts[3] };
}
return undefined;
}
}
export class MqttCandidateValidator implements IDiscoveryValidator {
public id = 'mqtt-candidate-validator';
public description = 'Validate MQTT broker or discovery candidate metadata.';
public async validate(candidateArg: IDiscoveryCandidate): Promise<IDiscoveryMatch> {
const metadataProtocol = candidateArg.metadata?.protocol;
const matched = candidateArg.integrationDomain === 'mqtt' || candidateArg.port === 1883 || candidateArg.port === 8883 || candidateArg.metadata?.mqtt === true || metadataProtocol === 'mqtt' || metadataProtocol === 'mqtts';
return {
matched,
confidence: matched && candidateArg.host ? 'high' : matched ? 'medium' : 'low',
reason: matched ? 'Candidate has MQTT broker metadata.' : 'Candidate is not MQTT.',
candidate: matched ? candidateArg : undefined,
normalizedDeviceId: candidateArg.id || candidateArg.host,
};
}
}
export const createMqttDiscoveryDescriptor = (): DiscoveryDescriptor => {
return new DiscoveryDescriptor({ integrationDomain: 'mqtt', displayName: 'MQTT' })
.addMatcher(new MqttManualMatcher())
.addMatcher(new MqttDiscoveryTopicMatcher())
.addValidator(new MqttCandidateValidator());
};
+532
View File
@@ -0,0 +1,532 @@
import * as plugins from '../../plugins.js';
import type { IIntegrationEntity, IServiceCallRequest, TEntityPlatform } from '../../core/types.js';
import type { IMqttConfig, IMqttDeviceInfo, IMqttEntityConfig, IMqttMessage, IMqttPayload, IMqttPublishOptions, IMqttSnapshot } from './mqtt.types.js';
const defaultDiscoveryPrefix = 'homeassistant';
const defaultPayloadOn = 'ON';
const defaultPayloadOff = 'OFF';
const defaultPayloadAvailable = 'online';
const defaultPayloadNotAvailable = 'offline';
const abbreviations: Record<string, string> = {
avty: 'availability',
avty_t: 'availability_topic',
cmd_t: 'command_topic',
cmps: 'components',
def_ent_id: 'default_entity_id',
dev: 'device',
dev_cla: 'device_class',
json_attr_t: 'json_attributes_topic',
name: 'name',
o: 'origin',
p: 'platform',
pl_avail: 'payload_available',
pl_not_avail: 'payload_not_available',
pl_off: 'payload_off',
pl_on: 'payload_on',
pl_prs: 'payload_press',
pl_cls: 'payload_close',
pl_open: 'payload_open',
pl_stop: 'payload_stop',
qos: 'qos',
ret: 'retain',
stat_t: 'state_topic',
stat_val_tpl: 'state_value_template',
t: 'topic',
uniq_id: 'unique_id',
unit_of_meas: 'unit_of_measurement',
val_tpl: 'value_template',
};
const deviceAbbreviations: Record<string, string> = {
cns: 'connections',
cu: 'configuration_url',
hw: 'hw_version',
ids: 'identifiers',
mf: 'manufacturer',
mdl: 'model',
mdl_id: 'model_id',
name: 'name',
sa: 'suggested_area',
sn: 'serial_number',
sw: 'sw_version',
};
const originAbbreviations: Record<string, string> = {
name: 'name',
sw: 'sw_version',
url: 'support_url',
};
export class MqttMapper {
public static toSnapshot(configArg: IMqttConfig, connectedArg = false, receivedMessagesArg: IMqttMessage[] = []): IMqttSnapshot {
const messages = [...(configArg.retainedMessages || []), ...(configArg.discoveryMessages || []), ...receivedMessagesArg];
const entities = [
...(configArg.entities || []),
...this.entitiesFromDiscoveryMessages(configArg.discoveryMessages || [], configArg.discoveryPrefix || defaultDiscoveryPrefix),
];
return {
brokerInfo: {
host: configArg.host,
port: configArg.port || (configArg.protocol === 'mqtts' ? 8883 : 1883),
protocol: configArg.protocol || 'mqtt',
clientId: configArg.clientId,
connected: connectedArg,
discoveryPrefix: configArg.discoveryPrefix || defaultDiscoveryPrefix,
},
entities: this.applyStates(entities, messages),
messages,
};
}
public static toDevices(snapshotArg: IMqttSnapshot): plugins.shxInterfaces.data.IDeviceDefinition[] {
const updatedAt = new Date().toISOString();
const devices = new Map<string, plugins.shxInterfaces.data.IDeviceDefinition>();
const brokerId = this.brokerDeviceId(snapshotArg);
devices.set(brokerId, {
id: brokerId,
integrationDomain: 'mqtt',
name: snapshotArg.brokerInfo.host ? `MQTT broker ${snapshotArg.brokerInfo.host}` : 'MQTT broker',
protocol: 'mqtt',
manufacturer: 'MQTT',
model: 'Broker',
online: snapshotArg.brokerInfo.connected || Boolean(snapshotArg.messages.length || snapshotArg.entities.length),
features: [
{ id: 'connection', capability: 'sensor', name: 'Connection', readable: true, writable: false },
{ id: 'discovery_prefix', capability: 'sensor', name: 'Discovery prefix', readable: true, writable: false },
],
state: [
{ featureId: 'connection', value: snapshotArg.brokerInfo.connected ? 'connected' : 'configured', updatedAt },
{ featureId: 'discovery_prefix', value: snapshotArg.brokerInfo.discoveryPrefix, updatedAt },
],
metadata: {
host: snapshotArg.brokerInfo.host,
port: snapshotArg.brokerInfo.port,
protocol: snapshotArg.brokerInfo.protocol,
},
});
for (const entity of snapshotArg.entities) {
const deviceId = this.deviceIdForEntity(entity, snapshotArg);
const existing = devices.get(deviceId);
const feature = this.featureForEntity(entity);
const state = { featureId: feature.id, value: this.deviceStateValue(entity.state ?? null), updatedAt };
if (existing) {
existing.features.push(feature);
existing.state.push(state);
continue;
}
devices.set(deviceId, {
id: deviceId,
integrationDomain: 'mqtt',
name: this.deviceName(entity.device, entity.name || entity.objectId || 'MQTT device'),
protocol: 'mqtt',
manufacturer: entity.device?.manufacturer,
model: entity.device?.model,
online: this.available(entity, snapshotArg.messages),
features: [feature],
state: [state],
metadata: {
identifiers: entity.device?.identifiers,
connections: entity.device?.connections,
serialNumber: entity.device?.serialNumber,
swVersion: entity.device?.swVersion,
hwVersion: entity.device?.hwVersion,
origin: entity.origin,
},
});
}
return [...devices.values()];
}
public static toEntities(snapshotArg: IMqttSnapshot): IIntegrationEntity[] {
return snapshotArg.entities.map((entityArg) => ({
id: entityArg.defaultEntityId || `${this.toEntityPlatform(entityArg.platform)}.${this.slug(entityArg.name || entityArg.objectId || entityArg.uniqueId || 'mqtt')}`,
uniqueId: entityArg.uniqueId || `mqtt_${this.slug(entityArg.objectId || entityArg.name || entityArg.stateTopic || entityArg.commandTopic || 'entity')}`,
integrationDomain: 'mqtt',
deviceId: this.deviceIdForEntity(entityArg, snapshotArg),
platform: this.toEntityPlatform(entityArg.platform),
name: entityArg.name || entityArg.objectId || 'MQTT entity',
state: this.entityState(entityArg),
attributes: {
...entityArg.attributes,
mqttPlatform: entityArg.platform,
stateTopic: entityArg.stateTopic,
commandTopic: entityArg.commandTopic,
availabilityTopic: entityArg.availabilityTopic,
deviceClass: entityArg.deviceClass,
unit: entityArg.unitOfMeasurement,
discoveryTopic: entityArg.discoveryTopic,
},
available: this.available(entityArg, snapshotArg.messages),
}));
}
public static commandForService(snapshotArg: IMqttSnapshot, requestArg: IServiceCallRequest): IMqttPublishOptions | undefined {
const entity = this.findTargetEntity(snapshotArg, requestArg);
if (!entity?.commandTopic) {
return undefined;
}
const raw = entity.rawConfig || {};
let payload: IMqttPayload | undefined;
if (requestArg.service === 'turn_on') {
payload = entity.payloadOn || defaultPayloadOn;
} else if (requestArg.service === 'turn_off') {
payload = entity.payloadOff || defaultPayloadOff;
} else if (requestArg.service === 'press') {
payload = typeof raw.payload_press === 'string' ? raw.payload_press : 'PRESS';
} else if (requestArg.service === 'open_cover') {
payload = typeof raw.payload_open === 'string' ? raw.payload_open : 'OPEN';
} else if (requestArg.service === 'close_cover') {
payload = typeof raw.payload_close === 'string' ? raw.payload_close : 'CLOSE';
} else if (requestArg.service === 'stop_cover') {
payload = typeof raw.payload_stop === 'string' ? raw.payload_stop : 'STOP';
} else if (requestArg.service === 'set_value') {
payload = requestArg.data?.value as IMqttPayload;
} else if (requestArg.service === 'select_option') {
payload = requestArg.data?.option as IMqttPayload;
} else if (requestArg.service === 'set_temperature') {
payload = requestArg.data?.temperature as IMqttPayload;
}
if (payload === undefined) {
return undefined;
}
return {
topic: entity.commandTopic,
payload,
qos: entity.qos || 0,
retain: entity.retain || false,
};
}
public static entitiesFromDiscoveryMessages(messagesArg: IMqttMessage[], prefixArg = defaultDiscoveryPrefix): IMqttEntityConfig[] {
const entities: IMqttEntityConfig[] = [];
for (const message of messagesArg) {
const parsedTopic = this.parseDiscoveryTopic(message.topic, prefixArg);
if (!parsedTopic || message.payload === '' || message.payload === null) {
continue;
}
const payload = this.discoveryPayload(message.payload);
if (!payload) {
continue;
}
this.replaceAllAbbreviations(payload);
this.replaceTopicBase(payload);
if (parsedTopic.component === 'device') {
const components = this.asRecord(payload.components);
for (const [componentId, componentConfig] of Object.entries(components)) {
const componentPayload = { ...payload, ...this.asRecord(componentConfig) };
delete componentPayload.components;
entities.push(this.entityFromPayload(componentPayload, String(componentPayload.platform || 'sensor'), parsedTopic.objectId, parsedTopic.nodeId, message.topic, componentId));
}
continue;
}
entities.push(this.entityFromPayload(payload, parsedTopic.component, parsedTopic.objectId, parsedTopic.nodeId, message.topic));
}
return entities;
}
private static entityFromPayload(payloadArg: Record<string, unknown>, platformArg: string, objectIdArg: string, nodeIdArg: string | undefined, topicArg: string, componentIdArg?: string): IMqttEntityConfig {
const device = this.deviceFromPayload(this.asRecord(payloadArg.device));
const origin = this.originFromPayload(this.asRecord(payloadArg.origin));
return {
platform: platformArg,
objectId: componentIdArg || objectIdArg,
nodeId: nodeIdArg,
uniqueId: this.stringValue(payloadArg.unique_id),
defaultEntityId: this.stringValue(payloadArg.default_entity_id),
name: payloadArg.name === null ? null : this.stringValue(payloadArg.name),
device,
origin,
stateTopic: this.stringValue(payloadArg.state_topic),
commandTopic: this.stringValue(payloadArg.command_topic),
availabilityTopic: this.stringValue(payloadArg.availability_topic),
jsonAttributesTopic: this.stringValue(payloadArg.json_attributes_topic),
valueTemplate: this.stringValue(payloadArg.value_template),
stateValueTemplate: this.stringValue(payloadArg.state_value_template),
deviceClass: this.stringValue(payloadArg.device_class),
unitOfMeasurement: this.stringValue(payloadArg.unit_of_measurement),
payloadOn: this.stringValue(payloadArg.payload_on),
payloadOff: this.stringValue(payloadArg.payload_off),
payloadAvailable: this.stringValue(payloadArg.payload_available),
payloadNotAvailable: this.stringValue(payloadArg.payload_not_available),
qos: this.qosValue(payloadArg.qos),
retain: typeof payloadArg.retain === 'boolean' ? payloadArg.retain : undefined,
discoveryTopic: topicArg,
rawConfig: payloadArg,
};
}
private static applyStates(entitiesArg: IMqttEntityConfig[], messagesArg: IMqttMessage[]): IMqttEntityConfig[] {
return entitiesArg.map((entityArg) => {
const stateMessage = entityArg.stateTopic ? this.latestMessage(messagesArg, entityArg.stateTopic) : undefined;
const attributesMessage = entityArg.jsonAttributesTopic ? this.latestMessage(messagesArg, entityArg.jsonAttributesTopic) : undefined;
return {
...entityArg,
state: stateMessage ? this.renderValue(stateMessage.payload, entityArg.valueTemplate || entityArg.stateValueTemplate) : entityArg.state,
attributes: attributesMessage ? this.attributesPayload(attributesMessage.payload) : entityArg.attributes,
};
});
}
private static parseDiscoveryTopic(topicArg: string, prefixArg: string): { component: string; nodeId?: string; objectId: string } | undefined {
const parts = topicArg.split('/').filter(Boolean);
if (parts[0] !== prefixArg || parts[parts.length - 1] !== 'config') {
return undefined;
}
if (parts.length === 4) {
return { component: parts[1], objectId: parts[2] };
}
if (parts.length === 5) {
return { component: parts[1], nodeId: parts[2], objectId: parts[3] };
}
return undefined;
}
private static discoveryPayload(payloadArg: IMqttPayload): Record<string, unknown> | undefined {
if (typeof payloadArg === 'string') {
try {
const parsed = JSON.parse(payloadArg) as unknown;
return this.asRecord(parsed);
} catch {
return undefined;
}
}
return this.asRecord(payloadArg);
}
private static replaceAllAbbreviations(payloadArg: Record<string, unknown>): void {
this.replaceAbbreviations(payloadArg, abbreviations);
if (this.isRecord(payloadArg.device)) {
this.replaceAbbreviations(payloadArg.device, deviceAbbreviations);
}
if (this.isRecord(payloadArg.origin)) {
this.replaceAbbreviations(payloadArg.origin, originAbbreviations);
}
if (this.isRecord(payloadArg.components)) {
for (const component of Object.values(payloadArg.components)) {
if (this.isRecord(component)) {
this.replaceAbbreviations(component, abbreviations);
}
}
}
}
private static replaceAbbreviations(payloadArg: Record<string, unknown>, mapArg: Record<string, string>): void {
for (const [shortKey, longKey] of Object.entries(mapArg)) {
if (shortKey in payloadArg && !(longKey in payloadArg)) {
payloadArg[longKey] = payloadArg[shortKey];
}
}
}
private static replaceTopicBase(payloadArg: Record<string, unknown>): void {
const topicBase = this.stringValue(payloadArg['~']);
if (!topicBase) {
return;
}
for (const [key, value] of Object.entries(payloadArg)) {
if (!key.endsWith('topic') || typeof value !== 'string') {
continue;
}
if (value.startsWith('~')) {
payloadArg[key] = `${topicBase}${value.slice(1)}`;
} else if (value.endsWith('~')) {
payloadArg[key] = `${value.slice(0, -1)}${topicBase}`;
}
}
}
private static deviceFromPayload(payloadArg: Record<string, unknown>): IMqttDeviceInfo | undefined {
if (!Object.keys(payloadArg).length) {
return undefined;
}
return {
identifiers: this.stringOrStringArray(payloadArg.identifiers),
connections: Array.isArray(payloadArg.connections) ? payloadArg.connections as string[][] : undefined,
name: this.stringValue(payloadArg.name),
manufacturer: this.stringValue(payloadArg.manufacturer),
model: this.stringValue(payloadArg.model),
modelId: this.stringValue(payloadArg.model_id),
swVersion: this.stringValue(payloadArg.sw_version),
hwVersion: this.stringValue(payloadArg.hw_version),
serialNumber: this.stringValue(payloadArg.serial_number),
configurationUrl: this.stringValue(payloadArg.configuration_url),
suggestedArea: this.stringValue(payloadArg.suggested_area),
};
}
private static originFromPayload(payloadArg: Record<string, unknown>) {
if (!Object.keys(payloadArg).length) {
return undefined;
}
return {
name: this.stringValue(payloadArg.name),
swVersion: this.stringValue(payloadArg.sw_version),
supportUrl: this.stringValue(payloadArg.support_url),
};
}
private static latestMessage(messagesArg: IMqttMessage[], topicArg: string): IMqttMessage | undefined {
const matches = messagesArg.filter((messageArg) => messageArg.topic === topicArg);
return matches.sort((leftArg, rightArg) => (rightArg.timestamp || 0) - (leftArg.timestamp || 0))[0] || matches[matches.length - 1];
}
private static renderValue(payloadArg: IMqttPayload, templateArg?: string): IMqttPayload {
if (!templateArg) {
return payloadArg;
}
if (templateArg.includes('value_json')) {
const parsed = typeof payloadArg === 'string' ? this.safeJson(payloadArg) : payloadArg;
const pathMatch = /value_json(?:\.([a-zA-Z0-9_]+)|\[['"]([^'"]+)['"]\])/.exec(templateArg);
const key = pathMatch?.[1] || pathMatch?.[2];
if (key && this.isRecord(parsed)) {
return parsed[key] as IMqttPayload;
}
}
if (templateArg.includes('value')) {
return payloadArg;
}
return payloadArg;
}
private static attributesPayload(payloadArg: IMqttPayload): Record<string, unknown> {
if (this.isRecord(payloadArg)) {
return payloadArg;
}
if (typeof payloadArg === 'string') {
return this.asRecord(this.safeJson(payloadArg));
}
return {};
}
private static entityState(entityArg: IMqttEntityConfig): unknown {
const state = entityArg.state;
if (entityArg.platform === 'switch' || entityArg.platform === 'light' || entityArg.platform === 'fan') {
if (state === (entityArg.payloadOn || defaultPayloadOn)) {
return 'on';
}
if (state === (entityArg.payloadOff || defaultPayloadOff)) {
return 'off';
}
}
return state ?? 'unknown';
}
private static available(entityArg: IMqttEntityConfig, messagesArg: IMqttMessage[]): boolean {
if (!entityArg.availabilityTopic) {
return true;
}
const availabilityMessage = this.latestMessage(messagesArg, entityArg.availabilityTopic);
if (!availabilityMessage) {
return false;
}
return availabilityMessage.payload !== (entityArg.payloadNotAvailable || defaultPayloadNotAvailable) && availabilityMessage.payload === (entityArg.payloadAvailable || defaultPayloadAvailable);
}
private static featureForEntity(entityArg: IMqttEntityConfig): plugins.shxInterfaces.data.IDeviceFeature {
const platform = this.toEntityPlatform(entityArg.platform);
return {
id: this.slug(entityArg.uniqueId || entityArg.objectId || entityArg.name || platform),
capability: this.capabilityForPlatform(platform),
name: entityArg.name || entityArg.objectId || 'MQTT entity',
readable: Boolean(entityArg.stateTopic),
writable: Boolean(entityArg.commandTopic),
unit: entityArg.unitOfMeasurement,
};
}
private static capabilityForPlatform(platformArg: TEntityPlatform): plugins.shxInterfaces.data.TDeviceCapability {
if (platformArg === 'light') {
return 'light';
}
if (platformArg === 'switch' || platformArg === 'button' || platformArg === 'cover' || platformArg === 'select' || platformArg === 'number' || platformArg === 'text') {
return 'switch';
}
if (platformArg === 'climate') {
return 'climate';
}
if (platformArg === 'fan') {
return 'fan';
}
if (platformArg === 'media_player') {
return 'media';
}
return 'sensor';
}
private static toEntityPlatform(platformArg: string): TEntityPlatform {
const allowed: TEntityPlatform[] = ['light', 'switch', 'sensor', 'binary_sensor', 'button', 'climate', 'cover', 'fan', 'media_player', 'number', 'select', 'text', 'update'];
return allowed.includes(platformArg as TEntityPlatform) ? platformArg as TEntityPlatform : 'sensor';
}
private static findTargetEntity(snapshotArg: IMqttSnapshot, requestArg: IServiceCallRequest): IMqttEntityConfig | undefined {
if (requestArg.target.entityId) {
const entities = this.toEntities(snapshotArg);
const entity = entities.find((entityArg) => entityArg.id === requestArg.target.entityId || entityArg.uniqueId === requestArg.target.entityId);
return entity ? snapshotArg.entities.find((entityArg) => (entityArg.uniqueId || `mqtt_${this.slug(entityArg.objectId || entityArg.name || entityArg.stateTopic || entityArg.commandTopic || 'entity')}`) === entity.uniqueId) : undefined;
}
if (requestArg.target.deviceId) {
return snapshotArg.entities.find((entityArg) => this.deviceIdForEntity(entityArg, snapshotArg) === requestArg.target.deviceId && entityArg.commandTopic);
}
return snapshotArg.entities.find((entityArg) => entityArg.commandTopic);
}
private static deviceIdForEntity(entityArg: IMqttEntityConfig, snapshotArg: IMqttSnapshot): string {
const identifiers = entityArg.device?.identifiers;
const identifier = Array.isArray(identifiers) ? identifiers[0] : identifiers;
const connection = entityArg.device?.connections?.[0]?.join('_');
return `mqtt.device.${this.slug(identifier || connection || entityArg.uniqueId || entityArg.objectId || snapshotArg.brokerInfo.host || 'broker')}`;
}
private static brokerDeviceId(snapshotArg: IMqttSnapshot): string {
return `mqtt.broker.${this.slug(snapshotArg.brokerInfo.host || 'configured')}`;
}
private static deviceName(deviceArg: IMqttDeviceInfo | undefined, fallbackArg: string): string {
return deviceArg?.name || fallbackArg;
}
private static deviceStateValue(valueArg: IMqttPayload): plugins.shxInterfaces.data.TDeviceStateValue {
return Array.isArray(valueArg) ? JSON.stringify(valueArg) : valueArg as plugins.shxInterfaces.data.TDeviceStateValue;
}
private static stringValue(valueArg: unknown): string | undefined {
return typeof valueArg === 'string' && valueArg ? valueArg : undefined;
}
private static stringOrStringArray(valueArg: unknown): string | string[] | undefined {
if (typeof valueArg === 'string') {
return valueArg;
}
if (Array.isArray(valueArg)) {
return valueArg.filter((itemArg): itemArg is string => typeof itemArg === 'string');
}
return undefined;
}
private static qosValue(valueArg: unknown): 0 | 1 | 2 | undefined {
return valueArg === 0 || valueArg === 1 || valueArg === 2 ? valueArg : undefined;
}
private static safeJson(valueArg: string): unknown {
try {
return JSON.parse(valueArg) as unknown;
} catch {
return valueArg;
}
}
private static asRecord(valueArg: unknown): Record<string, unknown> {
return this.isRecord(valueArg) ? valueArg : {};
}
private static isRecord(valueArg: unknown): valueArg is Record<string, unknown> {
return typeof valueArg === 'object' && valueArg !== null && !Array.isArray(valueArg);
}
private static slug(valueArg: string): string {
return valueArg.toLowerCase().replace(/[^a-z0-9]+/g, '_').replace(/^_+|_+$/g, '') || 'mqtt';
}
}
+117 -3
View File
@@ -1,4 +1,118 @@
export interface IHomeAssistantMqttConfig {
// TODO: replace with the TypeScript-native config for mqtt.
[key: string]: unknown;
import type { TEntityPlatform } from '../../core/types.js';
export interface IMqttConfig {
host?: string;
port?: number;
protocol?: 'mqtt' | 'mqtts';
username?: string;
password?: string;
clientId?: string;
keepalive?: number;
clean?: boolean;
rejectUnauthorized?: boolean;
discovery?: boolean;
discoveryPrefix?: string;
birthMessage?: IMqttPublishOptions | false;
willMessage?: IMqttPublishOptions | false;
subscriptions?: string[];
discoveryMessages?: IMqttMessage[];
retainedMessages?: IMqttMessage[];
entities?: IMqttEntityConfig[];
}
export interface IMqttPublishOptions {
topic: string;
payload?: IMqttPayload;
qos?: 0 | 1 | 2;
retain?: boolean;
}
export type IMqttPayload = string | number | boolean | null | Record<string, unknown> | unknown[];
export interface IMqttMessage {
topic: string;
payload: IMqttPayload;
qos?: 0 | 1 | 2;
retain?: boolean;
timestamp?: number;
}
export interface IMqttBrokerInfo {
host?: string;
port: number;
protocol: 'mqtt' | 'mqtts';
clientId?: string;
connected: boolean;
discoveryPrefix: string;
}
export interface IMqttSnapshot {
brokerInfo: IMqttBrokerInfo;
entities: IMqttEntityConfig[];
messages: IMqttMessage[];
}
export interface IMqttDeviceInfo {
identifiers?: string | string[];
connections?: string[][];
name?: string;
manufacturer?: string;
model?: string;
modelId?: string;
swVersion?: string;
hwVersion?: string;
serialNumber?: string;
configurationUrl?: string;
suggestedArea?: string;
}
export interface IMqttOriginInfo {
name?: string;
swVersion?: string;
supportUrl?: string;
}
export interface IMqttEntityConfig {
platform: TEntityPlatform | string;
objectId?: string;
nodeId?: string;
uniqueId?: string;
defaultEntityId?: string;
name?: string | null;
device?: IMqttDeviceInfo;
origin?: IMqttOriginInfo;
stateTopic?: string;
commandTopic?: string;
availabilityTopic?: string;
jsonAttributesTopic?: string;
valueTemplate?: string;
stateValueTemplate?: string;
deviceClass?: string;
unitOfMeasurement?: string;
payloadOn?: string;
payloadOff?: string;
payloadAvailable?: string;
payloadNotAvailable?: string;
qos?: 0 | 1 | 2;
retain?: boolean;
state?: IMqttPayload;
attributes?: Record<string, unknown>;
discoveryTopic?: string;
rawConfig?: Record<string, unknown>;
}
export interface IMqttManualEntry {
host?: string;
port?: number;
protocol?: 'mqtt' | 'mqtts';
username?: string;
id?: string;
name?: string;
model?: string;
metadata?: Record<string, unknown>;
}
export interface IMqttDiscoveryMessage {
topic?: string;
payload?: IMqttPayload;
}
+2 -1
View File
@@ -2,9 +2,10 @@
import * as fs from 'node:fs/promises';
import * as path from 'node:path';
import * as crypto from 'node:crypto';
import * as net from 'node:net';
import * as tls from 'node:tls';
export { crypto, fs, path, tls };
export { crypto, fs, net, path, tls };
// Project scope
import * as shxInterfaces from '@smarthome.exchange/interfaces';