From 97cba0a9a190842383fc2b67ba2be3dacafb1f50 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Tue, 5 May 2026 12:56:38 +0000 Subject: [PATCH] Add native MQTT integration --- test/mqtt/test.mqtt.discovery.node.ts | 17 + test/mqtt/test.mqtt.mapper.node.ts | 34 ++ ts/index.ts | 2 + ts/integrations/generated/index.ts | 5 +- .../mqtt/.generated-by-smarthome-exchange | 1 - ts/integrations/mqtt/index.ts | 4 + ts/integrations/mqtt/mqtt.classes.client.ts | 437 ++++++++++++++ .../mqtt/mqtt.classes.configflow.ts | 44 ++ .../mqtt/mqtt.classes.integration.ts | 123 +++- ts/integrations/mqtt/mqtt.discovery.ts | 114 ++++ ts/integrations/mqtt/mqtt.mapper.ts | 532 ++++++++++++++++++ ts/integrations/mqtt/mqtt.types.ts | 120 +++- ts/plugins.ts | 3 +- 13 files changed, 1398 insertions(+), 38 deletions(-) create mode 100644 test/mqtt/test.mqtt.discovery.node.ts create mode 100644 test/mqtt/test.mqtt.mapper.node.ts delete mode 100644 ts/integrations/mqtt/.generated-by-smarthome-exchange create mode 100644 ts/integrations/mqtt/mqtt.classes.client.ts create mode 100644 ts/integrations/mqtt/mqtt.classes.configflow.ts create mode 100644 ts/integrations/mqtt/mqtt.discovery.ts create mode 100644 ts/integrations/mqtt/mqtt.mapper.ts diff --git a/test/mqtt/test.mqtt.discovery.node.ts b/test/mqtt/test.mqtt.discovery.node.ts new file mode 100644 index 0000000..a0b0433 --- /dev/null +++ b/test/mqtt/test.mqtt.discovery.node.ts @@ -0,0 +1,17 @@ +import { expect, tap } from '@git.zone/tstest/tapbundle'; +import { createMqttDiscoveryDescriptor } from '../../ts/integrations/mqtt/index.js'; + +tap.test('matches Home Assistant MQTT discovery config topics', async () => { + const descriptor = createMqttDiscoveryDescriptor(); + const matcher = descriptor.getMatchers()[1]; + const result = await matcher.matches({ + topic: 'homeassistant/sensor/kitchen/temperature/config', + payload: '{"name":"Kitchen temperature","state_topic":"kitchen/temperature"}', + }, {}); + expect(result.matched).toBeTrue(); + expect(result.candidate?.metadata?.component).toEqual('sensor'); + expect(result.candidate?.metadata?.nodeId).toEqual('kitchen'); + expect(result.normalizedDeviceId).toEqual('temperature'); +}); + +export default tap.start(); diff --git a/test/mqtt/test.mqtt.mapper.node.ts b/test/mqtt/test.mqtt.mapper.node.ts new file mode 100644 index 0000000..a3e258a --- /dev/null +++ b/test/mqtt/test.mqtt.mapper.node.ts @@ -0,0 +1,34 @@ +import { expect, tap } from '@git.zone/tstest/tapbundle'; +import { MqttMapper } from '../../ts/integrations/mqtt/index.js'; + +const config = { + host: 'mqtt.local', + discoveryMessages: [{ + topic: 'homeassistant/device/kitchen_sensor/config', + payload: JSON.stringify({ + dev: { ids: 'kitchen-sensor-1', name: 'Kitchen Sensor', mf: 'Example', mdl: 'MQTT Multi Sensor', sw: '1.0' }, + o: { name: 'example2mqtt', sw: '2.0' }, + '~': 'kitchen/sensor', + stat_t: '~/state', + cmps: { + temperature: { p: 'sensor', name: 'Temperature', uniq_id: 'kitchen_temperature', dev_cla: 'temperature', unit_of_meas: 'C', val_tpl: '{{ value_json.temperature }}' }, + relay: { p: 'switch', name: 'Relay', uniq_id: 'kitchen_relay', cmd_t: 'kitchen/relay/set', stat_t: 'kitchen/relay/state' }, + }, + }), + }], + retainedMessages: [ + { topic: 'kitchen/sensor/state', payload: '{"temperature":21.5}' }, + { topic: 'kitchen/relay/state', payload: 'ON' }, + ], +}; + +tap.test('maps MQTT discovery payloads to devices and entities', async () => { + const snapshot = MqttMapper.toSnapshot(config); + const devices = MqttMapper.toDevices(snapshot); + const entities = MqttMapper.toEntities(snapshot); + expect(devices.some((deviceArg) => deviceArg.id === 'mqtt.device.kitchen_sensor_1')).toBeTrue(); + expect(entities.some((entityArg) => entityArg.uniqueId === 'kitchen_temperature' && entityArg.state === 21.5)).toBeTrue(); + expect(entities.some((entityArg) => entityArg.uniqueId === 'kitchen_relay' && entityArg.state === 'on')).toBeTrue(); +}); + +export default tap.start(); diff --git a/ts/index.ts b/ts/index.ts index f9d29e1..981d510 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -4,6 +4,7 @@ export * from './integrations/index.js'; import { HueIntegration } from './integrations/hue/index.js'; import { CastIntegration } from './integrations/cast/index.js'; +import { MqttIntegration } from './integrations/mqtt/index.js'; import { RokuIntegration } from './integrations/roku/index.js'; import { ShellyIntegration } from './integrations/shelly/index.js'; import { SonosIntegration } from './integrations/sonos/index.js'; @@ -14,6 +15,7 @@ import { IntegrationRegistry } from './core/index.js'; export const integrations = [ new CastIntegration(), new HueIntegration(), + new MqttIntegration(), new RokuIntegration(), new ShellyIntegration(), new SonosIntegration(), diff --git a/ts/integrations/generated/index.ts b/ts/integrations/generated/index.ts index 69a259e..80ab053 100644 --- a/ts/integrations/generated/index.ts +++ b/ts/integrations/generated/index.ts @@ -785,7 +785,6 @@ import { HomeAssistantMotionblindsBleIntegration } from '../motionblinds_ble/ind import { HomeAssistantMotioneyeIntegration } from '../motioneye/index.js'; import { HomeAssistantMotionmountIntegration } from '../motionmount/index.js'; import { HomeAssistantMpdIntegration } from '../mpd/index.js'; -import { HomeAssistantMqttIntegration } from '../mqtt/index.js'; import { HomeAssistantMqttEventstreamIntegration } from '../mqtt_eventstream/index.js'; import { HomeAssistantMqttJsonIntegration } from '../mqtt_json/index.js'; import { HomeAssistantMqttRoomIntegration } from '../mqtt_room/index.js'; @@ -2240,7 +2239,6 @@ generatedHomeAssistantPortIntegrations.push(new HomeAssistantMotionblindsBleInte generatedHomeAssistantPortIntegrations.push(new HomeAssistantMotioneyeIntegration()); generatedHomeAssistantPortIntegrations.push(new HomeAssistantMotionmountIntegration()); generatedHomeAssistantPortIntegrations.push(new HomeAssistantMpdIntegration()); -generatedHomeAssistantPortIntegrations.push(new HomeAssistantMqttIntegration()); generatedHomeAssistantPortIntegrations.push(new HomeAssistantMqttEventstreamIntegration()); generatedHomeAssistantPortIntegrations.push(new HomeAssistantMqttJsonIntegration()); generatedHomeAssistantPortIntegrations.push(new HomeAssistantMqttRoomIntegration()); @@ -2910,10 +2908,11 @@ generatedHomeAssistantPortIntegrations.push(new HomeAssistantZoneminderIntegrati generatedHomeAssistantPortIntegrations.push(new HomeAssistantZwaveJsIntegration()); generatedHomeAssistantPortIntegrations.push(new HomeAssistantZwaveMeIntegration()); -export const generatedHomeAssistantPortCount = 1453; +export const generatedHomeAssistantPortCount = 1452; export const handwrittenHomeAssistantPortDomains = [ "cast", "hue", + "mqtt", "roku", "shelly", "sonos" diff --git a/ts/integrations/mqtt/.generated-by-smarthome-exchange b/ts/integrations/mqtt/.generated-by-smarthome-exchange deleted file mode 100644 index d35eb12..0000000 --- a/ts/integrations/mqtt/.generated-by-smarthome-exchange +++ /dev/null @@ -1 +0,0 @@ -This folder is generated from Home Assistant component metadata. Replace it with a handwritten TypeScript port when implementing runtime support. diff --git a/ts/integrations/mqtt/index.ts b/ts/integrations/mqtt/index.ts index 6846eff..eb759be 100644 --- a/ts/integrations/mqtt/index.ts +++ b/ts/integrations/mqtt/index.ts @@ -1,2 +1,6 @@ export * from './mqtt.classes.integration.js'; +export * from './mqtt.classes.client.js'; +export * from './mqtt.classes.configflow.js'; +export * from './mqtt.discovery.js'; +export * from './mqtt.mapper.js'; export * from './mqtt.types.js'; diff --git a/ts/integrations/mqtt/mqtt.classes.client.ts b/ts/integrations/mqtt/mqtt.classes.client.ts new file mode 100644 index 0000000..bce939e --- /dev/null +++ b/ts/integrations/mqtt/mqtt.classes.client.ts @@ -0,0 +1,437 @@ +import * as plugins from '../../plugins.js'; +import type { IMqttConfig, IMqttMessage, IMqttPayload, IMqttPublishOptions, IMqttSnapshot } from './mqtt.types.js'; +import { MqttMapper } from './mqtt.mapper.js'; + +const packetConnect = 1; +const packetConnack = 2; +const packetPublish = 3; +const packetPuback = 4; +const packetSubscribe = 8; +const packetSuback = 9; +const packetPingreq = 12; +const packetPingresp = 13; +const packetDisconnect = 14; +const defaultDiscoveryPrefix = 'homeassistant'; + +type TMqttMessageHandler = (messageArg: IMqttMessage) => void; + +interface IPendingPacket { + resolve(valueArg?: unknown): void; + reject(errorArg: Error): void; + timer: ReturnType; +} + +export class MqttClient { + private connection?: MqttTcpConnection; + private readonly receivedMessages: IMqttMessage[] = []; + private readonly messageHandlers = new Set(); + private started = false; + + constructor(private readonly config: IMqttConfig) {} + + public async getSnapshot(): Promise { + if (this.config.host && !this.started) { + await this.start(); + } + return MqttMapper.toSnapshot(this.config, Boolean(this.connection?.connected), this.receivedMessages); + } + + public async start(): Promise { + if (this.started || !this.config.host) { + this.started = true; + return; + } + const connection = new MqttTcpConnection(this.config, (messageArg) => this.handleMessage(messageArg)); + await connection.connect(); + this.connection = connection; + this.started = true; + + if (this.config.birthMessage !== false) { + const birth = this.config.birthMessage || { topic: `${this.config.discoveryPrefix || defaultDiscoveryPrefix}/status`, payload: 'online', qos: 0, retain: false }; + await this.publish(birth); + } + + const subscriptions = new Set(this.config.subscriptions || []); + if (this.config.discovery !== false) { + subscriptions.add(`${this.config.discoveryPrefix || defaultDiscoveryPrefix}/#`); + } + for (const topic of subscriptions) { + await this.subscribe(topic); + } + } + + public async publish(optionsArg: IMqttPublishOptions): Promise { + await this.ensureConnection(); + await this.connection?.publish(optionsArg); + } + + public async subscribe(topicArg: string, handlerArg?: TMqttMessageHandler): Promise<() => Promise> { + if (handlerArg) { + this.messageHandlers.add(handlerArg); + } + await this.ensureConnection(); + await this.connection?.subscribe(topicArg); + return async () => { + if (handlerArg) { + this.messageHandlers.delete(handlerArg); + } + }; + } + + public async destroy(): Promise { + if (this.config.birthMessage !== false && this.connection?.connected) { + await this.publish({ topic: `${this.config.discoveryPrefix || defaultDiscoveryPrefix}/status`, payload: 'offline', qos: 0, retain: false }); + } + this.connection?.destroy(); + this.connection = undefined; + this.started = false; + this.messageHandlers.clear(); + } + + private async ensureConnection(): Promise { + if (!this.connection?.connected) { + await this.start(); + } + if (!this.connection?.connected) { + throw new Error('MQTT broker connection is not available.'); + } + } + + private handleMessage(messageArg: IMqttMessage): void { + this.receivedMessages.push(messageArg); + for (const handler of this.messageHandlers) { + handler(messageArg); + } + } +} + +class MqttTcpConnection { + private socket?: plugins.net.Socket; + private buffer = Buffer.alloc(0); + private nextPacketId = 1; + private pingTimer?: ReturnType; + private connectPending?: IPendingPacket; + private readonly pendingPackets = new Map(); + private readonly subscriptions = new Set(); + public connected = false; + + constructor(private readonly config: IMqttConfig, private readonly messageHandler: TMqttMessageHandler) {} + + public async connect(): Promise { + if (this.connected) { + return; + } + if (!this.config.host) { + throw new Error('MQTT host is required when fixture data is not provided.'); + } + this.socket = await this.createSocket(); + this.socket.on('data', (chunkArg) => this.handleData(chunkArg)); + this.socket.on('error', (errorArg) => this.rejectAll(errorArg)); + this.socket.on('close', () => { + this.connected = false; + this.rejectAll(new Error('MQTT socket closed.')); + }); + await this.sendConnect(); + const keepalive = this.config.keepalive || 60; + this.pingTimer = setInterval(() => { + if (this.connected) { + void this.writePacket(packetPingreq, Buffer.alloc(0)); + } + }, Math.max(15, keepalive) * 1000); + } + + public async publish(optionsArg: IMqttPublishOptions): Promise { + const topic = this.encodeString(optionsArg.topic); + const payload = this.payloadBuffer(optionsArg.payload); + const qos = optionsArg.qos || 0; + if (qos !== 0) { + throw new Error('MQTT publish currently supports QoS 0.'); + } + await this.writePacket(packetPublish, Buffer.concat([topic, payload]), optionsArg.retain ? 0x01 : 0); + } + + public async subscribe(topicArg: string): Promise { + if (this.subscriptions.has(topicArg)) { + return; + } + const packetId = this.nextId(); + const variableHeader = Buffer.alloc(2); + variableHeader.writeUInt16BE(packetId, 0); + const payload = Buffer.concat([this.encodeString(topicArg), Buffer.from([0])]); + const promise = this.waitForPacket(packetId, `MQTT subscribe ${topicArg}`); + await this.writePacket(packetSubscribe, Buffer.concat([variableHeader, payload]), 0x02); + await promise; + this.subscriptions.add(topicArg); + } + + public destroy(): void { + if (this.connected) { + void this.writePacket(packetDisconnect, Buffer.alloc(0)); + } + if (this.pingTimer) { + clearInterval(this.pingTimer); + this.pingTimer = undefined; + } + this.rejectAll(new Error('MQTT connection destroyed.')); + this.socket?.destroy(); + this.socket = undefined; + this.connected = false; + } + + private async createSocket(): Promise { + const port = this.config.port || (this.config.protocol === 'mqtts' ? 8883 : 1883); + if (this.config.protocol === 'mqtts') { + return new Promise((resolve, reject) => { + const socket = plugins.tls.connect({ + host: this.config.host, + port, + rejectUnauthorized: this.config.rejectUnauthorized !== false, + servername: this.config.host, + }); + socket.once('secureConnect', () => resolve(socket)); + socket.once('error', reject); + }); + } + return new Promise((resolve, reject) => { + const socket = plugins.net.connect({ host: this.config.host, port }); + socket.once('connect', () => resolve(socket)); + socket.once('error', reject); + }); + } + + private async sendConnect(): Promise { + const keepalive = this.config.keepalive || 60; + const clientId = this.config.clientId || `smarthome-exchange-${plugins.crypto.randomBytes(4).toString('hex')}`; + const will = this.config.willMessage === false ? undefined : this.config.willMessage || { topic: `${this.config.discoveryPrefix || defaultDiscoveryPrefix}/status`, payload: 'offline', qos: 0, retain: false }; + let flags = this.config.clean === false ? 0 : 0x02; + if (will) { + flags |= 0x04 | ((will.qos || 0) << 3); + if (will.retain) { + flags |= 0x20; + } + } + if (this.config.password) { + flags |= 0x40; + } + if (this.config.username) { + flags |= 0x80; + } + const variableHeader = Buffer.concat([ + this.encodeString('MQTT'), + Buffer.from([4, flags]), + this.uint16(keepalive), + ]); + const payloadParts = [this.encodeString(clientId)]; + if (will) { + payloadParts.push(this.encodeString(will.topic), this.payloadWithLength(will.payload)); + } + if (this.config.username) { + payloadParts.push(this.encodeString(this.config.username)); + } + if (this.config.password) { + payloadParts.push(this.encodeString(this.config.password)); + } + const promise = new Promise((resolve, reject) => { + const timer = setTimeout(() => { + this.connectPending = undefined; + reject(new Error('MQTT connect timed out.')); + }, 7000); + this.connectPending = { resolve: () => resolve(), reject, timer }; + }); + await this.writePacket(packetConnect, Buffer.concat([variableHeader, ...payloadParts])); + await promise; + } + + private handleData(chunkArg: Buffer | string): void { + const chunk = Buffer.isBuffer(chunkArg) ? chunkArg : Buffer.from(chunkArg); + this.buffer = Buffer.concat([this.buffer, chunk]); + while (this.buffer.length >= 2) { + const remainingLength = this.decodeRemainingLength(this.buffer, 1); + if (!remainingLength || this.buffer.length < 1 + remainingLength.bytes + remainingLength.value) { + return; + } + const header = this.buffer[0]; + const payloadStart = 1 + remainingLength.bytes; + const payloadEnd = payloadStart + remainingLength.value; + const payload = this.buffer.subarray(payloadStart, payloadEnd); + this.buffer = this.buffer.subarray(payloadEnd); + this.handlePacket(header, payload); + } + } + + private handlePacket(headerArg: number, payloadArg: Buffer): void { + const packetType = headerArg >> 4; + if (packetType === packetConnack) { + const returnCode = payloadArg[1]; + const pending = this.connectPending; + this.connectPending = undefined; + if (!pending) { + return; + } + clearTimeout(pending.timer); + if (returnCode === 0) { + this.connected = true; + pending.resolve(); + } else { + pending.reject(new Error(`MQTT broker rejected connection with code ${returnCode}.`)); + } + return; + } + if (packetType === packetSuback) { + const packetId = payloadArg.readUInt16BE(0); + const pending = this.pendingPackets.get(packetId); + if (pending) { + clearTimeout(pending.timer); + this.pendingPackets.delete(packetId); + const failure = payloadArg.subarray(2).includes(0x80); + failure ? pending.reject(new Error(`MQTT subscribe ${packetId} was rejected.`)) : pending.resolve(); + } + return; + } + if (packetType === packetPublish) { + this.handlePublish(headerArg, payloadArg); + return; + } + if (packetType === packetPingresp) { + return; + } + } + + private handlePublish(headerArg: number, payloadArg: Buffer): void { + const retain = Boolean(headerArg & 0x01); + const qos = ((headerArg >> 1) & 0x03) as 0 | 1 | 2; + const topicLength = payloadArg.readUInt16BE(0); + const topic = payloadArg.subarray(2, 2 + topicLength).toString('utf8'); + let offset = 2 + topicLength; + if (qos > 0) { + const packetId = payloadArg.readUInt16BE(offset); + offset += 2; + if (qos === 1) { + void this.writePacket(packetPuback, this.uint16(packetId)); + } + } + const payload = payloadArg.subarray(offset).toString('utf8'); + const subscribedTopic = [...this.subscriptions].find((topicArg) => this.topicMatches(topicArg, topic)) || topic; + this.messageHandler({ topic, payload, qos, retain, subscribedTopic, timestamp: Date.now() } as IMqttMessage & { subscribedTopic: string }); + } + + private waitForPacket(packetIdArg: number, descriptionArg: string): Promise { + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + this.pendingPackets.delete(packetIdArg); + reject(new Error(`${descriptionArg} timed out.`)); + }, 7000); + this.pendingPackets.set(packetIdArg, { resolve: () => resolve(), reject, timer }); + }); + } + + private async writePacket(packetTypeArg: number, payloadArg: Buffer, flagsArg = 0): Promise { + if (!this.socket || this.socket.destroyed) { + throw new Error('MQTT socket is not connected.'); + } + const fixedHeader = Buffer.concat([Buffer.from([(packetTypeArg << 4) | flagsArg]), this.encodeRemainingLength(payloadArg.length)]); + const packet = Buffer.concat([fixedHeader, payloadArg]); + await new Promise((resolve, reject) => { + this.socket?.write(packet, (errorArg) => errorArg ? reject(errorArg) : resolve()); + }); + } + + private encodeRemainingLength(valueArg: number): Buffer { + const bytes: number[] = []; + let value = valueArg; + do { + let encodedByte = value % 128; + value = Math.floor(value / 128); + if (value > 0) { + encodedByte |= 128; + } + bytes.push(encodedByte); + } while (value > 0); + return Buffer.from(bytes); + } + + private decodeRemainingLength(bufferArg: Buffer, offsetArg: number): { value: number; bytes: number } | undefined { + let multiplier = 1; + let value = 0; + let bytes = 0; + let offset = offsetArg; + while (offset < bufferArg.length) { + const encodedByte = bufferArg[offset++]; + bytes++; + value += (encodedByte & 127) * multiplier; + if ((encodedByte & 128) === 0) { + return { value, bytes }; + } + multiplier *= 128; + if (bytes > 4) { + throw new Error('Invalid MQTT remaining length.'); + } + } + return undefined; + } + + private payloadBuffer(payloadArg: IMqttPayload | undefined): Buffer { + if (payloadArg === undefined || payloadArg === null) { + return Buffer.alloc(0); + } + if (typeof payloadArg === 'object') { + return Buffer.from(JSON.stringify(payloadArg), 'utf8'); + } + return Buffer.from(String(payloadArg), 'utf8'); + } + + private payloadWithLength(payloadArg: IMqttPayload | undefined): Buffer { + const payload = this.payloadBuffer(payloadArg); + return Buffer.concat([this.uint16(payload.length), payload]); + } + + private encodeString(valueArg: string): Buffer { + const value = Buffer.from(valueArg, 'utf8'); + return Buffer.concat([this.uint16(value.length), value]); + } + + private uint16(valueArg: number): Buffer { + const buffer = Buffer.alloc(2); + buffer.writeUInt16BE(valueArg, 0); + return buffer; + } + + private nextId(): number { + const id = this.nextPacketId++; + if (this.nextPacketId > 0xffff) { + this.nextPacketId = 1; + } + return id; + } + + private topicMatches(filterArg: string, topicArg: string): boolean { + const filterParts = filterArg.split('/'); + const topicParts = topicArg.split('/'); + for (let index = 0; index < filterParts.length; index++) { + const filterPart = filterParts[index]; + if (filterPart === '#') { + return index === filterParts.length - 1; + } + if (filterPart === '+') { + continue; + } + if (filterPart !== topicParts[index]) { + return false; + } + } + return filterParts.length === topicParts.length; + } + + private rejectAll(errorArg: Error): void { + if (this.connectPending) { + clearTimeout(this.connectPending.timer); + this.connectPending.reject(errorArg); + this.connectPending = undefined; + } + for (const [packetId, pending] of this.pendingPackets) { + clearTimeout(pending.timer); + pending.reject(errorArg); + this.pendingPackets.delete(packetId); + } + } +} diff --git a/ts/integrations/mqtt/mqtt.classes.configflow.ts b/ts/integrations/mqtt/mqtt.classes.configflow.ts new file mode 100644 index 0000000..aa05e42 --- /dev/null +++ b/ts/integrations/mqtt/mqtt.classes.configflow.ts @@ -0,0 +1,44 @@ +import type { IConfigFlow, IConfigFlowContext, IConfigFlowStep, IDiscoveryCandidate } from '../../core/types.js'; +import type { IMqttConfig } from './mqtt.types.js'; + +export class MqttConfigFlow implements IConfigFlow { + public async start(candidateArg: IDiscoveryCandidate, contextArg: IConfigFlowContext): Promise> { + void contextArg; + return { + kind: 'form', + title: 'Connect MQTT broker', + description: 'Configure a local MQTT 3.1.1 broker and Home Assistant-style MQTT discovery.', + fields: [ + { name: 'host', label: 'Broker host', type: 'text', required: true }, + { name: 'port', label: 'Broker port', type: 'number' }, + { name: 'protocol', label: 'Protocol', type: 'select', options: [{ label: 'MQTT', value: 'mqtt' }, { label: 'MQTTS', value: 'mqtts' }] }, + { name: 'username', label: 'Username', type: 'text' }, + { name: 'password', label: 'Password', type: 'password' }, + { name: 'clientId', label: 'Client ID', type: 'text' }, + { name: 'discoveryPrefix', label: 'Discovery prefix', type: 'text' }, + ], + submit: async (valuesArg) => ({ + kind: 'done', + title: 'MQTT configured', + config: { + host: this.stringValue(valuesArg.host) || candidateArg.host || '', + port: this.numberValue(valuesArg.port) || candidateArg.port || 1883, + protocol: valuesArg.protocol === 'mqtts' ? 'mqtts' : 'mqtt', + username: this.stringValue(valuesArg.username), + password: this.stringValue(valuesArg.password), + clientId: this.stringValue(valuesArg.clientId), + discovery: true, + discoveryPrefix: this.stringValue(valuesArg.discoveryPrefix) || 'homeassistant', + }, + }), + }; + } + + private stringValue(valueArg: unknown): string | undefined { + return typeof valueArg === 'string' && valueArg.trim() ? valueArg.trim() : undefined; + } + + private numberValue(valueArg: unknown): number | undefined { + return typeof valueArg === 'number' && Number.isFinite(valueArg) ? valueArg : undefined; + } +} diff --git a/ts/integrations/mqtt/mqtt.classes.integration.ts b/ts/integrations/mqtt/mqtt.classes.integration.ts index 87b6bf8..9f0d65e 100644 --- a/ts/integrations/mqtt/mqtt.classes.integration.ts +++ b/ts/integrations/mqtt/mqtt.classes.integration.ts @@ -1,34 +1,97 @@ -import { DescriptorOnlyIntegration } from '../../core/classes.descriptoronlyintegration.js'; +import type * as shxInterfaces from '@smarthome.exchange/interfaces'; +import { BaseIntegration } from '../../core/classes.baseintegration.js'; +import type { IIntegrationEntity, IIntegrationEvent, IIntegrationRuntime, IIntegrationSetupContext, IServiceCallRequest, IServiceCallResult } from '../../core/types.js'; +import { MqttClient } from './mqtt.classes.client.js'; +import { MqttConfigFlow } from './mqtt.classes.configflow.js'; +import { createMqttDiscoveryDescriptor } from './mqtt.discovery.js'; +import { MqttMapper } from './mqtt.mapper.js'; +import type { IMqttConfig, IMqttPublishOptions } from './mqtt.types.js'; -export class HomeAssistantMqttIntegration extends DescriptorOnlyIntegration { - constructor() { - super({ - domain: "mqtt", - displayName: "MQTT", - status: 'descriptor-only', - metadata: { - "source": "home-assistant/core", - "upstreamPath": "homeassistant/components/mqtt", - "upstreamDomain": "mqtt", - "integrationType": "service", - "iotClass": "local_push", - "qualityScale": "platinum", - "requirements": [ - "paho-mqtt==2.1.0" - ], - "dependencies": [ - "file_upload", - "http" - ], - "afterDependencies": [ - "hassio" - ], - "codeowners": [ - "@emontnemery", - "@jbouwh", - "@bdraco" - ] -}, +export class MqttIntegration extends BaseIntegration { + public readonly domain = 'mqtt'; + public readonly displayName = 'MQTT'; + public readonly status = 'control-runtime' as const; + public readonly discoveryDescriptor = createMqttDiscoveryDescriptor(); + public readonly configFlow = new MqttConfigFlow(); + public readonly metadata = { + source: 'home-assistant/core', + upstreamPath: 'homeassistant/components/mqtt', + upstreamDomain: 'mqtt', + integrationType: 'service', + iotClass: 'local_push', + qualityScale: 'platinum', + }; + + public async setup(configArg: IMqttConfig, contextArg: IIntegrationSetupContext): Promise { + void contextArg; + return new MqttRuntime(new MqttClient(configArg)); + } + + public async destroy(): Promise {} +} + +export class HomeAssistantMqttIntegration extends MqttIntegration {} + +class MqttRuntime implements IIntegrationRuntime { + public domain = 'mqtt'; + + constructor(private readonly client: MqttClient) {} + + public async devices(): Promise { + return MqttMapper.toDevices(await this.client.getSnapshot()); + } + + public async entities(): Promise { + return MqttMapper.toEntities(await this.client.getSnapshot()); + } + + public async subscribe(handlerArg: (eventArg: IIntegrationEvent) => void): Promise<() => Promise> { + return this.client.subscribe('#', (messageArg) => { + handlerArg({ + type: 'state_changed', + integrationDomain: 'mqtt', + data: messageArg, + timestamp: messageArg.timestamp || Date.now(), + }); }); } + + public async callService(requestArg: IServiceCallRequest): Promise { + if (requestArg.domain === 'mqtt') { + if (requestArg.service !== 'publish') { + return { success: false, error: `Unsupported MQTT service: ${requestArg.service}` }; + } + const publishOptions = this.publishOptionsFromRequest(requestArg); + if (!publishOptions) { + return { success: false, error: 'MQTT publish requires data.topic.' }; + } + await this.client.publish(publishOptions); + return { success: true }; + } + + const publishOptions = MqttMapper.commandForService(await this.client.getSnapshot(), requestArg); + if (!publishOptions) { + return { success: false, error: `MQTT entity service ${requestArg.domain}.${requestArg.service} has no command topic mapping.` }; + } + await this.client.publish(publishOptions); + return { success: true }; + } + + public async destroy(): Promise { + await this.client.destroy(); + } + + private publishOptionsFromRequest(requestArg: IServiceCallRequest): IMqttPublishOptions | undefined { + const topic = requestArg.data?.topic; + if (typeof topic !== 'string' || !topic) { + return undefined; + } + const qos = requestArg.data?.qos; + return { + topic, + payload: requestArg.data?.payload as IMqttPublishOptions['payload'], + qos: qos === 1 || qos === 2 ? qos : 0, + retain: requestArg.data?.retain === true, + }; + } } diff --git a/ts/integrations/mqtt/mqtt.discovery.ts b/ts/integrations/mqtt/mqtt.discovery.ts new file mode 100644 index 0000000..e5bf1c9 --- /dev/null +++ b/ts/integrations/mqtt/mqtt.discovery.ts @@ -0,0 +1,114 @@ +import { DiscoveryDescriptor } from '../../core/classes.discoverydescriptor.js'; +import type { IDiscoveryCandidate, IDiscoveryMatch, IDiscoveryMatcher, IDiscoveryValidator } from '../../core/types.js'; +import type { IMqttDiscoveryMessage, IMqttManualEntry } from './mqtt.types.js'; + +const defaultDiscoveryPrefix = 'homeassistant'; + +export class MqttManualMatcher implements IDiscoveryMatcher { + public id = 'mqtt-manual-match'; + public source = 'manual' as const; + public description = 'Recognize manual MQTT broker setup entries.'; + + public async matches(inputArg: IMqttManualEntry): Promise { + const model = inputArg.model?.toLowerCase() || ''; + const matched = Boolean(inputArg.host || inputArg.metadata?.mqtt || model.includes('mqtt') || model.includes('mosquitto')); + if (!matched) { + return { matched: false, confidence: 'low', reason: 'Manual entry does not contain MQTT broker setup hints.' }; + } + return { + matched: true, + confidence: inputArg.host ? 'high' : 'medium', + reason: 'Manual entry can start MQTT broker setup.', + normalizedDeviceId: inputArg.id || inputArg.host, + candidate: { + source: 'manual', + integrationDomain: 'mqtt', + id: inputArg.id || inputArg.host, + host: inputArg.host, + port: inputArg.port || (inputArg.protocol === 'mqtts' ? 8883 : 1883), + name: inputArg.name || 'MQTT broker', + manufacturer: 'MQTT', + model: inputArg.model || 'Broker', + metadata: { + ...inputArg.metadata, + protocol: inputArg.protocol || 'mqtt', + username: inputArg.username, + }, + }, + }; + } +} + +export class MqttDiscoveryTopicMatcher implements IDiscoveryMatcher { + public id = 'mqtt-discovery-topic-match'; + public source = 'mqtt' as const; + public description = 'Recognize Home Assistant MQTT discovery config topics.'; + + public async matches(inputArg: IMqttDiscoveryMessage): Promise { + const topic = inputArg.topic || ''; + const match = this.discoveryMatch(topic); + if (!match) { + return { matched: false, confidence: 'low', reason: 'MQTT message is not on a discovery config topic.' }; + } + return { + matched: true, + confidence: 'certain', + reason: 'MQTT topic matches Home Assistant discovery format.', + normalizedDeviceId: match.objectId, + candidate: { + source: 'mqtt', + integrationDomain: 'mqtt', + id: `${match.component}:${match.nodeId || ''}:${match.objectId}`, + name: match.objectId, + metadata: { + discoveryPrefix: match.prefix, + component: match.component, + nodeId: match.nodeId, + objectId: match.objectId, + topic, + payload: inputArg.payload, + }, + }, + }; + } + + private discoveryMatch(topicArg: string): { prefix: string; component: string; nodeId?: string; objectId: string } | undefined { + const parts = topicArg.split('/').filter(Boolean); + if (parts.length < 4 || parts[parts.length - 1] !== 'config') { + return undefined; + } + const prefix = parts[0] || defaultDiscoveryPrefix; + const component = parts[1]; + if (parts.length === 4) { + return { prefix, component, objectId: parts[2] }; + } + if (parts.length === 5) { + return { prefix, component, nodeId: parts[2], objectId: parts[3] }; + } + return undefined; + } +} + +export class MqttCandidateValidator implements IDiscoveryValidator { + public id = 'mqtt-candidate-validator'; + public description = 'Validate MQTT broker or discovery candidate metadata.'; + + public async validate(candidateArg: IDiscoveryCandidate): Promise { + const metadataProtocol = candidateArg.metadata?.protocol; + const matched = candidateArg.integrationDomain === 'mqtt' || candidateArg.port === 1883 || candidateArg.port === 8883 || candidateArg.metadata?.mqtt === true || metadataProtocol === 'mqtt' || metadataProtocol === 'mqtts'; + return { + matched, + confidence: matched && candidateArg.host ? 'high' : matched ? 'medium' : 'low', + reason: matched ? 'Candidate has MQTT broker metadata.' : 'Candidate is not MQTT.', + candidate: matched ? candidateArg : undefined, + normalizedDeviceId: candidateArg.id || candidateArg.host, + }; + } +} + +export const createMqttDiscoveryDescriptor = (): DiscoveryDescriptor => { + return new DiscoveryDescriptor({ integrationDomain: 'mqtt', displayName: 'MQTT' }) + .addMatcher(new MqttManualMatcher()) + .addMatcher(new MqttDiscoveryTopicMatcher()) + .addValidator(new MqttCandidateValidator()); +}; diff --git a/ts/integrations/mqtt/mqtt.mapper.ts b/ts/integrations/mqtt/mqtt.mapper.ts new file mode 100644 index 0000000..73bae3c --- /dev/null +++ b/ts/integrations/mqtt/mqtt.mapper.ts @@ -0,0 +1,532 @@ +import * as plugins from '../../plugins.js'; +import type { IIntegrationEntity, IServiceCallRequest, TEntityPlatform } from '../../core/types.js'; +import type { IMqttConfig, IMqttDeviceInfo, IMqttEntityConfig, IMqttMessage, IMqttPayload, IMqttPublishOptions, IMqttSnapshot } from './mqtt.types.js'; + +const defaultDiscoveryPrefix = 'homeassistant'; +const defaultPayloadOn = 'ON'; +const defaultPayloadOff = 'OFF'; +const defaultPayloadAvailable = 'online'; +const defaultPayloadNotAvailable = 'offline'; + +const abbreviations: Record = { + avty: 'availability', + avty_t: 'availability_topic', + cmd_t: 'command_topic', + cmps: 'components', + def_ent_id: 'default_entity_id', + dev: 'device', + dev_cla: 'device_class', + json_attr_t: 'json_attributes_topic', + name: 'name', + o: 'origin', + p: 'platform', + pl_avail: 'payload_available', + pl_not_avail: 'payload_not_available', + pl_off: 'payload_off', + pl_on: 'payload_on', + pl_prs: 'payload_press', + pl_cls: 'payload_close', + pl_open: 'payload_open', + pl_stop: 'payload_stop', + qos: 'qos', + ret: 'retain', + stat_t: 'state_topic', + stat_val_tpl: 'state_value_template', + t: 'topic', + uniq_id: 'unique_id', + unit_of_meas: 'unit_of_measurement', + val_tpl: 'value_template', +}; + +const deviceAbbreviations: Record = { + cns: 'connections', + cu: 'configuration_url', + hw: 'hw_version', + ids: 'identifiers', + mf: 'manufacturer', + mdl: 'model', + mdl_id: 'model_id', + name: 'name', + sa: 'suggested_area', + sn: 'serial_number', + sw: 'sw_version', +}; + +const originAbbreviations: Record = { + name: 'name', + sw: 'sw_version', + url: 'support_url', +}; + +export class MqttMapper { + public static toSnapshot(configArg: IMqttConfig, connectedArg = false, receivedMessagesArg: IMqttMessage[] = []): IMqttSnapshot { + const messages = [...(configArg.retainedMessages || []), ...(configArg.discoveryMessages || []), ...receivedMessagesArg]; + const entities = [ + ...(configArg.entities || []), + ...this.entitiesFromDiscoveryMessages(configArg.discoveryMessages || [], configArg.discoveryPrefix || defaultDiscoveryPrefix), + ]; + return { + brokerInfo: { + host: configArg.host, + port: configArg.port || (configArg.protocol === 'mqtts' ? 8883 : 1883), + protocol: configArg.protocol || 'mqtt', + clientId: configArg.clientId, + connected: connectedArg, + discoveryPrefix: configArg.discoveryPrefix || defaultDiscoveryPrefix, + }, + entities: this.applyStates(entities, messages), + messages, + }; + } + + public static toDevices(snapshotArg: IMqttSnapshot): plugins.shxInterfaces.data.IDeviceDefinition[] { + const updatedAt = new Date().toISOString(); + const devices = new Map(); + const brokerId = this.brokerDeviceId(snapshotArg); + devices.set(brokerId, { + id: brokerId, + integrationDomain: 'mqtt', + name: snapshotArg.brokerInfo.host ? `MQTT broker ${snapshotArg.brokerInfo.host}` : 'MQTT broker', + protocol: 'mqtt', + manufacturer: 'MQTT', + model: 'Broker', + online: snapshotArg.brokerInfo.connected || Boolean(snapshotArg.messages.length || snapshotArg.entities.length), + features: [ + { id: 'connection', capability: 'sensor', name: 'Connection', readable: true, writable: false }, + { id: 'discovery_prefix', capability: 'sensor', name: 'Discovery prefix', readable: true, writable: false }, + ], + state: [ + { featureId: 'connection', value: snapshotArg.brokerInfo.connected ? 'connected' : 'configured', updatedAt }, + { featureId: 'discovery_prefix', value: snapshotArg.brokerInfo.discoveryPrefix, updatedAt }, + ], + metadata: { + host: snapshotArg.brokerInfo.host, + port: snapshotArg.brokerInfo.port, + protocol: snapshotArg.brokerInfo.protocol, + }, + }); + + for (const entity of snapshotArg.entities) { + const deviceId = this.deviceIdForEntity(entity, snapshotArg); + const existing = devices.get(deviceId); + const feature = this.featureForEntity(entity); + const state = { featureId: feature.id, value: this.deviceStateValue(entity.state ?? null), updatedAt }; + if (existing) { + existing.features.push(feature); + existing.state.push(state); + continue; + } + devices.set(deviceId, { + id: deviceId, + integrationDomain: 'mqtt', + name: this.deviceName(entity.device, entity.name || entity.objectId || 'MQTT device'), + protocol: 'mqtt', + manufacturer: entity.device?.manufacturer, + model: entity.device?.model, + online: this.available(entity, snapshotArg.messages), + features: [feature], + state: [state], + metadata: { + identifiers: entity.device?.identifiers, + connections: entity.device?.connections, + serialNumber: entity.device?.serialNumber, + swVersion: entity.device?.swVersion, + hwVersion: entity.device?.hwVersion, + origin: entity.origin, + }, + }); + } + + return [...devices.values()]; + } + + public static toEntities(snapshotArg: IMqttSnapshot): IIntegrationEntity[] { + return snapshotArg.entities.map((entityArg) => ({ + id: entityArg.defaultEntityId || `${this.toEntityPlatform(entityArg.platform)}.${this.slug(entityArg.name || entityArg.objectId || entityArg.uniqueId || 'mqtt')}`, + uniqueId: entityArg.uniqueId || `mqtt_${this.slug(entityArg.objectId || entityArg.name || entityArg.stateTopic || entityArg.commandTopic || 'entity')}`, + integrationDomain: 'mqtt', + deviceId: this.deviceIdForEntity(entityArg, snapshotArg), + platform: this.toEntityPlatform(entityArg.platform), + name: entityArg.name || entityArg.objectId || 'MQTT entity', + state: this.entityState(entityArg), + attributes: { + ...entityArg.attributes, + mqttPlatform: entityArg.platform, + stateTopic: entityArg.stateTopic, + commandTopic: entityArg.commandTopic, + availabilityTopic: entityArg.availabilityTopic, + deviceClass: entityArg.deviceClass, + unit: entityArg.unitOfMeasurement, + discoveryTopic: entityArg.discoveryTopic, + }, + available: this.available(entityArg, snapshotArg.messages), + })); + } + + public static commandForService(snapshotArg: IMqttSnapshot, requestArg: IServiceCallRequest): IMqttPublishOptions | undefined { + const entity = this.findTargetEntity(snapshotArg, requestArg); + if (!entity?.commandTopic) { + return undefined; + } + const raw = entity.rawConfig || {}; + let payload: IMqttPayload | undefined; + if (requestArg.service === 'turn_on') { + payload = entity.payloadOn || defaultPayloadOn; + } else if (requestArg.service === 'turn_off') { + payload = entity.payloadOff || defaultPayloadOff; + } else if (requestArg.service === 'press') { + payload = typeof raw.payload_press === 'string' ? raw.payload_press : 'PRESS'; + } else if (requestArg.service === 'open_cover') { + payload = typeof raw.payload_open === 'string' ? raw.payload_open : 'OPEN'; + } else if (requestArg.service === 'close_cover') { + payload = typeof raw.payload_close === 'string' ? raw.payload_close : 'CLOSE'; + } else if (requestArg.service === 'stop_cover') { + payload = typeof raw.payload_stop === 'string' ? raw.payload_stop : 'STOP'; + } else if (requestArg.service === 'set_value') { + payload = requestArg.data?.value as IMqttPayload; + } else if (requestArg.service === 'select_option') { + payload = requestArg.data?.option as IMqttPayload; + } else if (requestArg.service === 'set_temperature') { + payload = requestArg.data?.temperature as IMqttPayload; + } + if (payload === undefined) { + return undefined; + } + return { + topic: entity.commandTopic, + payload, + qos: entity.qos || 0, + retain: entity.retain || false, + }; + } + + public static entitiesFromDiscoveryMessages(messagesArg: IMqttMessage[], prefixArg = defaultDiscoveryPrefix): IMqttEntityConfig[] { + const entities: IMqttEntityConfig[] = []; + for (const message of messagesArg) { + const parsedTopic = this.parseDiscoveryTopic(message.topic, prefixArg); + if (!parsedTopic || message.payload === '' || message.payload === null) { + continue; + } + const payload = this.discoveryPayload(message.payload); + if (!payload) { + continue; + } + this.replaceAllAbbreviations(payload); + this.replaceTopicBase(payload); + if (parsedTopic.component === 'device') { + const components = this.asRecord(payload.components); + for (const [componentId, componentConfig] of Object.entries(components)) { + const componentPayload = { ...payload, ...this.asRecord(componentConfig) }; + delete componentPayload.components; + entities.push(this.entityFromPayload(componentPayload, String(componentPayload.platform || 'sensor'), parsedTopic.objectId, parsedTopic.nodeId, message.topic, componentId)); + } + continue; + } + entities.push(this.entityFromPayload(payload, parsedTopic.component, parsedTopic.objectId, parsedTopic.nodeId, message.topic)); + } + return entities; + } + + private static entityFromPayload(payloadArg: Record, platformArg: string, objectIdArg: string, nodeIdArg: string | undefined, topicArg: string, componentIdArg?: string): IMqttEntityConfig { + const device = this.deviceFromPayload(this.asRecord(payloadArg.device)); + const origin = this.originFromPayload(this.asRecord(payloadArg.origin)); + return { + platform: platformArg, + objectId: componentIdArg || objectIdArg, + nodeId: nodeIdArg, + uniqueId: this.stringValue(payloadArg.unique_id), + defaultEntityId: this.stringValue(payloadArg.default_entity_id), + name: payloadArg.name === null ? null : this.stringValue(payloadArg.name), + device, + origin, + stateTopic: this.stringValue(payloadArg.state_topic), + commandTopic: this.stringValue(payloadArg.command_topic), + availabilityTopic: this.stringValue(payloadArg.availability_topic), + jsonAttributesTopic: this.stringValue(payloadArg.json_attributes_topic), + valueTemplate: this.stringValue(payloadArg.value_template), + stateValueTemplate: this.stringValue(payloadArg.state_value_template), + deviceClass: this.stringValue(payloadArg.device_class), + unitOfMeasurement: this.stringValue(payloadArg.unit_of_measurement), + payloadOn: this.stringValue(payloadArg.payload_on), + payloadOff: this.stringValue(payloadArg.payload_off), + payloadAvailable: this.stringValue(payloadArg.payload_available), + payloadNotAvailable: this.stringValue(payloadArg.payload_not_available), + qos: this.qosValue(payloadArg.qos), + retain: typeof payloadArg.retain === 'boolean' ? payloadArg.retain : undefined, + discoveryTopic: topicArg, + rawConfig: payloadArg, + }; + } + + private static applyStates(entitiesArg: IMqttEntityConfig[], messagesArg: IMqttMessage[]): IMqttEntityConfig[] { + return entitiesArg.map((entityArg) => { + const stateMessage = entityArg.stateTopic ? this.latestMessage(messagesArg, entityArg.stateTopic) : undefined; + const attributesMessage = entityArg.jsonAttributesTopic ? this.latestMessage(messagesArg, entityArg.jsonAttributesTopic) : undefined; + return { + ...entityArg, + state: stateMessage ? this.renderValue(stateMessage.payload, entityArg.valueTemplate || entityArg.stateValueTemplate) : entityArg.state, + attributes: attributesMessage ? this.attributesPayload(attributesMessage.payload) : entityArg.attributes, + }; + }); + } + + private static parseDiscoveryTopic(topicArg: string, prefixArg: string): { component: string; nodeId?: string; objectId: string } | undefined { + const parts = topicArg.split('/').filter(Boolean); + if (parts[0] !== prefixArg || parts[parts.length - 1] !== 'config') { + return undefined; + } + if (parts.length === 4) { + return { component: parts[1], objectId: parts[2] }; + } + if (parts.length === 5) { + return { component: parts[1], nodeId: parts[2], objectId: parts[3] }; + } + return undefined; + } + + private static discoveryPayload(payloadArg: IMqttPayload): Record | undefined { + if (typeof payloadArg === 'string') { + try { + const parsed = JSON.parse(payloadArg) as unknown; + return this.asRecord(parsed); + } catch { + return undefined; + } + } + return this.asRecord(payloadArg); + } + + private static replaceAllAbbreviations(payloadArg: Record): void { + this.replaceAbbreviations(payloadArg, abbreviations); + if (this.isRecord(payloadArg.device)) { + this.replaceAbbreviations(payloadArg.device, deviceAbbreviations); + } + if (this.isRecord(payloadArg.origin)) { + this.replaceAbbreviations(payloadArg.origin, originAbbreviations); + } + if (this.isRecord(payloadArg.components)) { + for (const component of Object.values(payloadArg.components)) { + if (this.isRecord(component)) { + this.replaceAbbreviations(component, abbreviations); + } + } + } + } + + private static replaceAbbreviations(payloadArg: Record, mapArg: Record): void { + for (const [shortKey, longKey] of Object.entries(mapArg)) { + if (shortKey in payloadArg && !(longKey in payloadArg)) { + payloadArg[longKey] = payloadArg[shortKey]; + } + } + } + + private static replaceTopicBase(payloadArg: Record): void { + const topicBase = this.stringValue(payloadArg['~']); + if (!topicBase) { + return; + } + for (const [key, value] of Object.entries(payloadArg)) { + if (!key.endsWith('topic') || typeof value !== 'string') { + continue; + } + if (value.startsWith('~')) { + payloadArg[key] = `${topicBase}${value.slice(1)}`; + } else if (value.endsWith('~')) { + payloadArg[key] = `${value.slice(0, -1)}${topicBase}`; + } + } + } + + private static deviceFromPayload(payloadArg: Record): IMqttDeviceInfo | undefined { + if (!Object.keys(payloadArg).length) { + return undefined; + } + return { + identifiers: this.stringOrStringArray(payloadArg.identifiers), + connections: Array.isArray(payloadArg.connections) ? payloadArg.connections as string[][] : undefined, + name: this.stringValue(payloadArg.name), + manufacturer: this.stringValue(payloadArg.manufacturer), + model: this.stringValue(payloadArg.model), + modelId: this.stringValue(payloadArg.model_id), + swVersion: this.stringValue(payloadArg.sw_version), + hwVersion: this.stringValue(payloadArg.hw_version), + serialNumber: this.stringValue(payloadArg.serial_number), + configurationUrl: this.stringValue(payloadArg.configuration_url), + suggestedArea: this.stringValue(payloadArg.suggested_area), + }; + } + + private static originFromPayload(payloadArg: Record) { + if (!Object.keys(payloadArg).length) { + return undefined; + } + return { + name: this.stringValue(payloadArg.name), + swVersion: this.stringValue(payloadArg.sw_version), + supportUrl: this.stringValue(payloadArg.support_url), + }; + } + + private static latestMessage(messagesArg: IMqttMessage[], topicArg: string): IMqttMessage | undefined { + const matches = messagesArg.filter((messageArg) => messageArg.topic === topicArg); + return matches.sort((leftArg, rightArg) => (rightArg.timestamp || 0) - (leftArg.timestamp || 0))[0] || matches[matches.length - 1]; + } + + private static renderValue(payloadArg: IMqttPayload, templateArg?: string): IMqttPayload { + if (!templateArg) { + return payloadArg; + } + if (templateArg.includes('value_json')) { + const parsed = typeof payloadArg === 'string' ? this.safeJson(payloadArg) : payloadArg; + const pathMatch = /value_json(?:\.([a-zA-Z0-9_]+)|\[['"]([^'"]+)['"]\])/.exec(templateArg); + const key = pathMatch?.[1] || pathMatch?.[2]; + if (key && this.isRecord(parsed)) { + return parsed[key] as IMqttPayload; + } + } + if (templateArg.includes('value')) { + return payloadArg; + } + return payloadArg; + } + + private static attributesPayload(payloadArg: IMqttPayload): Record { + if (this.isRecord(payloadArg)) { + return payloadArg; + } + if (typeof payloadArg === 'string') { + return this.asRecord(this.safeJson(payloadArg)); + } + return {}; + } + + private static entityState(entityArg: IMqttEntityConfig): unknown { + const state = entityArg.state; + if (entityArg.platform === 'switch' || entityArg.platform === 'light' || entityArg.platform === 'fan') { + if (state === (entityArg.payloadOn || defaultPayloadOn)) { + return 'on'; + } + if (state === (entityArg.payloadOff || defaultPayloadOff)) { + return 'off'; + } + } + return state ?? 'unknown'; + } + + private static available(entityArg: IMqttEntityConfig, messagesArg: IMqttMessage[]): boolean { + if (!entityArg.availabilityTopic) { + return true; + } + const availabilityMessage = this.latestMessage(messagesArg, entityArg.availabilityTopic); + if (!availabilityMessage) { + return false; + } + return availabilityMessage.payload !== (entityArg.payloadNotAvailable || defaultPayloadNotAvailable) && availabilityMessage.payload === (entityArg.payloadAvailable || defaultPayloadAvailable); + } + + private static featureForEntity(entityArg: IMqttEntityConfig): plugins.shxInterfaces.data.IDeviceFeature { + const platform = this.toEntityPlatform(entityArg.platform); + return { + id: this.slug(entityArg.uniqueId || entityArg.objectId || entityArg.name || platform), + capability: this.capabilityForPlatform(platform), + name: entityArg.name || entityArg.objectId || 'MQTT entity', + readable: Boolean(entityArg.stateTopic), + writable: Boolean(entityArg.commandTopic), + unit: entityArg.unitOfMeasurement, + }; + } + + private static capabilityForPlatform(platformArg: TEntityPlatform): plugins.shxInterfaces.data.TDeviceCapability { + if (platformArg === 'light') { + return 'light'; + } + if (platformArg === 'switch' || platformArg === 'button' || platformArg === 'cover' || platformArg === 'select' || platformArg === 'number' || platformArg === 'text') { + return 'switch'; + } + if (platformArg === 'climate') { + return 'climate'; + } + if (platformArg === 'fan') { + return 'fan'; + } + if (platformArg === 'media_player') { + return 'media'; + } + return 'sensor'; + } + + private static toEntityPlatform(platformArg: string): TEntityPlatform { + const allowed: TEntityPlatform[] = ['light', 'switch', 'sensor', 'binary_sensor', 'button', 'climate', 'cover', 'fan', 'media_player', 'number', 'select', 'text', 'update']; + return allowed.includes(platformArg as TEntityPlatform) ? platformArg as TEntityPlatform : 'sensor'; + } + + private static findTargetEntity(snapshotArg: IMqttSnapshot, requestArg: IServiceCallRequest): IMqttEntityConfig | undefined { + if (requestArg.target.entityId) { + const entities = this.toEntities(snapshotArg); + const entity = entities.find((entityArg) => entityArg.id === requestArg.target.entityId || entityArg.uniqueId === requestArg.target.entityId); + return entity ? snapshotArg.entities.find((entityArg) => (entityArg.uniqueId || `mqtt_${this.slug(entityArg.objectId || entityArg.name || entityArg.stateTopic || entityArg.commandTopic || 'entity')}`) === entity.uniqueId) : undefined; + } + if (requestArg.target.deviceId) { + return snapshotArg.entities.find((entityArg) => this.deviceIdForEntity(entityArg, snapshotArg) === requestArg.target.deviceId && entityArg.commandTopic); + } + return snapshotArg.entities.find((entityArg) => entityArg.commandTopic); + } + + private static deviceIdForEntity(entityArg: IMqttEntityConfig, snapshotArg: IMqttSnapshot): string { + const identifiers = entityArg.device?.identifiers; + const identifier = Array.isArray(identifiers) ? identifiers[0] : identifiers; + const connection = entityArg.device?.connections?.[0]?.join('_'); + return `mqtt.device.${this.slug(identifier || connection || entityArg.uniqueId || entityArg.objectId || snapshotArg.brokerInfo.host || 'broker')}`; + } + + private static brokerDeviceId(snapshotArg: IMqttSnapshot): string { + return `mqtt.broker.${this.slug(snapshotArg.brokerInfo.host || 'configured')}`; + } + + private static deviceName(deviceArg: IMqttDeviceInfo | undefined, fallbackArg: string): string { + return deviceArg?.name || fallbackArg; + } + + private static deviceStateValue(valueArg: IMqttPayload): plugins.shxInterfaces.data.TDeviceStateValue { + return Array.isArray(valueArg) ? JSON.stringify(valueArg) : valueArg as plugins.shxInterfaces.data.TDeviceStateValue; + } + + private static stringValue(valueArg: unknown): string | undefined { + return typeof valueArg === 'string' && valueArg ? valueArg : undefined; + } + + private static stringOrStringArray(valueArg: unknown): string | string[] | undefined { + if (typeof valueArg === 'string') { + return valueArg; + } + if (Array.isArray(valueArg)) { + return valueArg.filter((itemArg): itemArg is string => typeof itemArg === 'string'); + } + return undefined; + } + + private static qosValue(valueArg: unknown): 0 | 1 | 2 | undefined { + return valueArg === 0 || valueArg === 1 || valueArg === 2 ? valueArg : undefined; + } + + private static safeJson(valueArg: string): unknown { + try { + return JSON.parse(valueArg) as unknown; + } catch { + return valueArg; + } + } + + private static asRecord(valueArg: unknown): Record { + return this.isRecord(valueArg) ? valueArg : {}; + } + + private static isRecord(valueArg: unknown): valueArg is Record { + return typeof valueArg === 'object' && valueArg !== null && !Array.isArray(valueArg); + } + + private static slug(valueArg: string): string { + return valueArg.toLowerCase().replace(/[^a-z0-9]+/g, '_').replace(/^_+|_+$/g, '') || 'mqtt'; + } +} diff --git a/ts/integrations/mqtt/mqtt.types.ts b/ts/integrations/mqtt/mqtt.types.ts index a5d6f6a..b89c6d2 100644 --- a/ts/integrations/mqtt/mqtt.types.ts +++ b/ts/integrations/mqtt/mqtt.types.ts @@ -1,4 +1,118 @@ -export interface IHomeAssistantMqttConfig { - // TODO: replace with the TypeScript-native config for mqtt. - [key: string]: unknown; +import type { TEntityPlatform } from '../../core/types.js'; + +export interface IMqttConfig { + host?: string; + port?: number; + protocol?: 'mqtt' | 'mqtts'; + username?: string; + password?: string; + clientId?: string; + keepalive?: number; + clean?: boolean; + rejectUnauthorized?: boolean; + discovery?: boolean; + discoveryPrefix?: string; + birthMessage?: IMqttPublishOptions | false; + willMessage?: IMqttPublishOptions | false; + subscriptions?: string[]; + discoveryMessages?: IMqttMessage[]; + retainedMessages?: IMqttMessage[]; + entities?: IMqttEntityConfig[]; +} + +export interface IMqttPublishOptions { + topic: string; + payload?: IMqttPayload; + qos?: 0 | 1 | 2; + retain?: boolean; +} + +export type IMqttPayload = string | number | boolean | null | Record | unknown[]; + +export interface IMqttMessage { + topic: string; + payload: IMqttPayload; + qos?: 0 | 1 | 2; + retain?: boolean; + timestamp?: number; +} + +export interface IMqttBrokerInfo { + host?: string; + port: number; + protocol: 'mqtt' | 'mqtts'; + clientId?: string; + connected: boolean; + discoveryPrefix: string; +} + +export interface IMqttSnapshot { + brokerInfo: IMqttBrokerInfo; + entities: IMqttEntityConfig[]; + messages: IMqttMessage[]; +} + +export interface IMqttDeviceInfo { + identifiers?: string | string[]; + connections?: string[][]; + name?: string; + manufacturer?: string; + model?: string; + modelId?: string; + swVersion?: string; + hwVersion?: string; + serialNumber?: string; + configurationUrl?: string; + suggestedArea?: string; +} + +export interface IMqttOriginInfo { + name?: string; + swVersion?: string; + supportUrl?: string; +} + +export interface IMqttEntityConfig { + platform: TEntityPlatform | string; + objectId?: string; + nodeId?: string; + uniqueId?: string; + defaultEntityId?: string; + name?: string | null; + device?: IMqttDeviceInfo; + origin?: IMqttOriginInfo; + stateTopic?: string; + commandTopic?: string; + availabilityTopic?: string; + jsonAttributesTopic?: string; + valueTemplate?: string; + stateValueTemplate?: string; + deviceClass?: string; + unitOfMeasurement?: string; + payloadOn?: string; + payloadOff?: string; + payloadAvailable?: string; + payloadNotAvailable?: string; + qos?: 0 | 1 | 2; + retain?: boolean; + state?: IMqttPayload; + attributes?: Record; + discoveryTopic?: string; + rawConfig?: Record; +} + +export interface IMqttManualEntry { + host?: string; + port?: number; + protocol?: 'mqtt' | 'mqtts'; + username?: string; + id?: string; + name?: string; + model?: string; + metadata?: Record; +} + +export interface IMqttDiscoveryMessage { + topic?: string; + payload?: IMqttPayload; } diff --git a/ts/plugins.ts b/ts/plugins.ts index adadcd2..ad8d373 100644 --- a/ts/plugins.ts +++ b/ts/plugins.ts @@ -2,9 +2,10 @@ import * as fs from 'node:fs/promises'; import * as path from 'node:path'; import * as crypto from 'node:crypto'; +import * as net from 'node:net'; import * as tls from 'node:tls'; -export { crypto, fs, path, tls }; +export { crypto, fs, net, path, tls }; // Project scope import * as shxInterfaces from '@smarthome.exchange/interfaces';