// Source file: integrations/ts/integrations/dsmr/dsmr.classes.client.ts
// TypeScript · 184 lines · 7.8 KiB

import * as plugins from '../../plugins.js';
import { DsmrTelegramParser } from './dsmr.parser.js';
import type { IDsmrConfig, IDsmrEvent, IDsmrRefreshResult, IDsmrSnapshot, IDsmrStatusSnapshot } from './dsmr.types.js';
import { dsmrDefaultNetworkPort, dsmrDefaultTimeoutMs } from './dsmr.types.js';
/** Signature of a callback that receives each event emitted by a `DsmrClient`. */
type TDsmrEventHandler = (eventArg: IDsmrEvent) => void;
/**
 * Keeps the most recent DSMR snapshot and refreshes it from one of three
 * sources: a `telegramProvider` callback, data embedded in the config
 * (snapshot / status / telegram / telegrams), or a live TCP read.
 *
 * Snapshots returned to callers and passed to event handlers are JSON deep
 * copies of the cached state, so mutating them does not alter the cache.
 */
export class DsmrClient {
  private currentSnapshot?: IDsmrSnapshot;
  private readonly eventHandlers = new Set<TDsmrEventHandler>();

  constructor(private readonly config: IDsmrConfig) {}

  /**
   * Returns a copy of the cached snapshot, building it from the config on
   * first use.
   */
  public async getSnapshot(): Promise<IDsmrSnapshot> {
    if (!this.currentSnapshot) {
      this.currentSnapshot = await this.initialSnapshot();
    }
    return this.cloneSnapshot(this.currentSnapshot);
  }

  /**
   * Registers an event handler.
   *
   * @returns A function that unregisters the handler again.
   */
  public onEvent(handlerArg: TDsmrEventHandler): () => void {
    this.eventHandlers.add(handlerArg);
    return () => this.eventHandlers.delete(handlerArg);
  }

  /**
   * Refreshes the cached snapshot.
   *
   * Sources are tried in this order: provider/config data, then a live
   * network read (only when `liveRead` is enabled for a network endpoint).
   * When neither is available the result carries `success: false` and the
   * current snapshot. Errors raised while reading or parsing are caught,
   * the cached snapshot is marked as disconnected, and the error message is
   * reported in the result.
   */
  public async refresh(): Promise<IDsmrRefreshResult> {
    try {
      const provided = await this.providedTelegramSnapshot();
      if (provided) {
        this.currentSnapshot = provided;
        this.emit({ type: 'snapshot_refreshed', snapshot: this.cloneSnapshot(provided), timestamp: Date.now() });
        return { success: true, snapshot: this.cloneSnapshot(provided), data: { source: provided.source } };
      }

      if (this.canReadNetwork()) {
        const telegram = await this.readNetworkTelegram();
        const snapshot = DsmrTelegramParser.parseTelegram(telegram, { config: this.config, source: 'network', connected: true });
        this.currentSnapshot = snapshot;
        this.emit({ type: 'telegram_received', snapshot: this.cloneSnapshot(snapshot), timestamp: Date.now() });
        return { success: true, snapshot: this.cloneSnapshot(snapshot), data: { source: 'network' } };
      }

      // No usable source: explain why, distinguishing the serial case.
      const connection = DsmrTelegramParser.connectionInfo(this.config);
      const reason = connection.connectionType === 'serial'
        ? 'Native DSMR serial reading requires a telegramProvider or snapshot; no serial transport dependency is configured.'
        : 'No DSMR telegram source is configured. Provide a snapshot, telegram, telegramProvider, or enable liveRead for a network endpoint.';
      const snapshot = await this.getSnapshot();
      this.emit({ type: 'refresh_failed', snapshot, error: reason, timestamp: Date.now() });
      return { success: false, snapshot, error: reason };
    } catch (errorArg) {
      // Fall back to an empty snapshot if even the cached one cannot be built.
      const snapshot = await this.getSnapshot().catch(() => DsmrTelegramParser.emptySnapshot(this.config));
      const error = errorArg instanceof Error ? errorArg.message : String(errorArg);
      this.currentSnapshot = { ...snapshot, connected: false, updatedAt: new Date().toISOString() };
      this.emit({ type: 'refresh_failed', snapshot: this.cloneSnapshot(this.currentSnapshot), error, timestamp: Date.now() });
      return { success: false, snapshot: this.cloneSnapshot(this.currentSnapshot), error };
    }
  }

  /**
   * Opens a TCP connection to the configured endpoint and resolves with the
   * first complete telegram received; the socket is destroyed afterwards.
   *
   * While data is streaming, a telegram only counts as complete once its
   * end marker is fully received (see `extractTelegram`). If the peer closes
   * the connection or the timeout elapses, whatever follows the `!` marker
   * so far is accepted as a last resort.
   *
   * @throws Error when the connection is not a network connection or has no host.
   * @returns The telegram text. The promise rejects on socket error, on
   * timeout, or when the connection closes without a telegram end marker.
   */
  public async readNetworkTelegram(): Promise<string> {
    const connection = DsmrTelegramParser.connectionInfo(this.config);
    if (connection.connectionType !== 'network' || !connection.host) {
      throw new Error('DSMR network telegram reading requires config.host and network connection type.');
    }
    const port = typeof connection.port === 'number' ? connection.port : dsmrDefaultNetworkPort;
    const timeoutMs = this.config.timeoutMs || dsmrDefaultTimeoutMs;

    return await new Promise<string>((resolve, reject) => {
      const socket = plugins.net.createConnection({ host: connection.host, port });
      let buffer = '';
      let settled = false;

      // Settles the promise exactly once and releases timer and socket.
      const finish = (errorArg?: Error, telegramArg?: string) => {
        if (settled) {
          return;
        }
        settled = true;
        clearTimeout(timeout);
        socket.removeAllListeners();
        socket.destroy();
        if (errorArg) {
          reject(errorArg);
          return;
        }
        resolve(telegramArg || '');
      };

      // Resolves when the buffer holds a telegram; `finalArg` enables the
      // lenient end-marker match used once no more data can be expected.
      const processBuffer = (finalArg: boolean): boolean => {
        const telegram = this.extractTelegram(buffer, finalArg);
        if (!telegram) {
          return false;
        }
        finish(undefined, telegram);
        return true;
      };

      const timeout = setTimeout(() => {
        if (!processBuffer(true)) {
          finish(new Error(`DSMR network telegram read timed out after ${timeoutMs}ms.`));
        }
      }, timeoutMs);

      socket.on('data', (dataArg) => {
        buffer += dataArg.toString('utf8');
        // A chunk may end in the middle of the checksum, so only a fully
        // terminated telegram is accepted here.
        processBuffer(false);
      });
      socket.on('error', (errorArg) => finish(errorArg));
      socket.on('close', () => {
        if (!processBuffer(true)) {
          finish(new Error('DSMR network connection closed before a complete telegram was received.'));
        }
      });
    });
  }

  /**
   * Removes all registered event handlers. A network read that is already
   * in progress is not cancelled by this method.
   */
  public async destroy(): Promise<void> {
    this.eventHandlers.clear();
  }

  /**
   * Builds a snapshot from the config. Precedence: `snapshot`, `status`,
   * string `telegram`, snapshot-shaped `telegram`, last entry of
   * `telegrams`, and finally an empty snapshot.
   */
  private async initialSnapshot(): Promise<IDsmrSnapshot> {
    if (this.config.snapshot) {
      return DsmrTelegramParser.normalizeSnapshot(this.cloneSnapshot(this.config.snapshot), this.config);
    }
    if (this.config.status) {
      return DsmrTelegramParser.snapshotFromStatus(this.config.status, this.config);
    }
    if (typeof this.config.telegram === 'string') {
      return DsmrTelegramParser.parseTelegram(this.config.telegram, { config: this.config, source: 'telegram' });
    }
    if (this.isSnapshot(this.config.telegram)) {
      return DsmrTelegramParser.normalizeSnapshot(this.cloneSnapshot(this.config.telegram), this.config);
    }
    if (Array.isArray(this.config.telegrams) && this.config.telegrams.length) {
      return DsmrTelegramParser.parseTelegram(this.config.telegrams[this.config.telegrams.length - 1], { config: this.config, source: 'telegram' });
    }
    return DsmrTelegramParser.emptySnapshot(this.config, this.config.connected ?? false);
  }

  /**
   * Produces a snapshot without touching the network: from the
   * `telegramProvider` when one is configured (its result is used even if
   * it yields nothing), otherwise from config-embedded data.
   *
   * @returns The snapshot, or `undefined` when no such source yields data.
   */
  private async providedTelegramSnapshot(): Promise<IDsmrSnapshot | undefined> {
    if (this.config.telegramProvider) {
      const provided = await this.config.telegramProvider();
      return this.snapshotFromProvided(provided);
    }
    if (this.config.snapshot || this.config.status || this.config.telegram || this.config.telegrams?.length) {
      return await this.initialSnapshot();
    }
    return undefined;
  }

  /**
   * Converts a provider result into a snapshot: strings are parsed as
   * telegrams, snapshot-shaped objects are normalized, any other object is
   * treated as a status snapshot.
   */
  private snapshotFromProvided(providedArg: string | IDsmrSnapshot | IDsmrStatusSnapshot | undefined): IDsmrSnapshot | undefined {
    if (typeof providedArg === 'string') {
      return DsmrTelegramParser.parseTelegram(providedArg, { config: this.config, source: 'telegram' });
    }
    if (this.isSnapshot(providedArg)) {
      return DsmrTelegramParser.normalizeSnapshot(this.cloneSnapshot(providedArg), this.config);
    }
    if (providedArg && typeof providedArg === 'object') {
      return DsmrTelegramParser.snapshotFromStatus(providedArg as IDsmrStatusSnapshot, this.config);
    }
    return undefined;
  }

  /** True when `liveRead` is enabled and a network endpoint with a host is configured. */
  private canReadNetwork(): boolean {
    const connection = DsmrTelegramParser.connectionInfo(this.config);
    return this.config.liveRead === true && connection.connectionType === 'network' && Boolean(connection.host);
  }

  /**
   * Cuts the first telegram out of the receive buffer. The telegram starts
   * at the first `/` and ends at the `!` end marker plus its checksum.
   *
   * @param bufferArg Text received so far.
   * @param finalArg When `false` (default) the end marker must be complete:
   * `!` followed by four hex digits, or `!` with fewer digits followed by a
   * line break. This prevents returning a telegram whose checksum is still
   * in flight. When `true`, `!` followed by zero to four hex digits is
   * accepted as-is, for use when no further data will arrive.
   * @returns The telegram text up to and including the checksum, or
   * `undefined` when no (complete) telegram is present.
   */
  private extractTelegram(bufferArg: string, finalArg = false): string | undefined {
    const start = bufferArg.indexOf('/');
    if (start < 0) {
      return undefined;
    }
    const body = bufferArg.slice(start);
    const endMarker = finalArg
      ? /![0-9a-fA-F]{0,4}/
      : /![0-9a-fA-F]{4}|![0-9a-fA-F]{0,3}(?=[\r\n])/;
    const checksumMatch = endMarker.exec(body);
    if (!checksumMatch) {
      return undefined;
    }
    return body.slice(0, checksumMatch.index + checksumMatch[0].length);
  }

  /**
   * Calls every registered handler synchronously with the event. An
   * exception thrown by a handler is not caught here and propagates to the
   * emitting method.
   */
  private emit(eventArg: IDsmrEvent): void {
    for (const handler of this.eventHandlers) {
      handler(eventArg);
    }
  }

  /** Structural check: an object with a `meter` property and a `sensors` array. */
  private isSnapshot(valueArg: unknown): valueArg is IDsmrSnapshot {
    return Boolean(valueArg && typeof valueArg === 'object' && 'meter' in valueArg && Array.isArray((valueArg as IDsmrSnapshot).sensors));
  }

  /** Deep-copies a snapshot via a JSON round trip. */
  private cloneSnapshot<T extends IDsmrSnapshot>(snapshotArg: T): T {
    return JSON.parse(JSON.stringify(snapshotArg)) as T;
  }
}