feat(storage): persist siprouter data in smartdata and smartbucket
This commit is contained in:
+93
-45
@@ -1,6 +1,9 @@
|
||||
import fs from 'node:fs';
|
||||
import * as fsPromises from 'node:fs/promises';
|
||||
import path from 'node:path';
|
||||
|
||||
import type { SiprouterStorage } from './storage.ts';
|
||||
|
||||
export interface IFaxBoxConfig {
|
||||
id: string;
|
||||
enabled: boolean;
|
||||
@@ -13,6 +16,7 @@ export interface IFaxMessage {
|
||||
callerNumber?: string;
|
||||
timestamp: number;
|
||||
fileName: string;
|
||||
objectKey?: string;
|
||||
completionCode?: number | null;
|
||||
completionLabel?: string | null;
|
||||
pageCount?: number;
|
||||
@@ -21,24 +25,28 @@ export interface IFaxMessage {
|
||||
|
||||
export class FaxBoxManager {
|
||||
private boxes = new Map<string, IFaxBoxConfig>();
|
||||
private messagesByBox = new Map<string, IFaxMessage[]>();
|
||||
private readonly basePath: string;
|
||||
private readonly log: (msg: string) => void;
|
||||
private readonly storage: SiprouterStorage;
|
||||
|
||||
constructor(log: (msg: string) => void) {
|
||||
constructor(log: (msg: string) => void, storageArg: SiprouterStorage) {
|
||||
this.basePath = path.join(process.cwd(), '.nogit', 'fax', 'inboxes');
|
||||
this.log = log;
|
||||
this.storage = storageArg;
|
||||
}
|
||||
|
||||
init(faxBoxConfigs: IFaxBoxConfig[]): void {
|
||||
async init(faxBoxConfigs: IFaxBoxConfig[]): Promise<void> {
|
||||
this.boxes.clear();
|
||||
|
||||
for (const cfg of faxBoxConfigs) {
|
||||
cfg.enabled ??= true;
|
||||
cfg.maxMessages ??= 50;
|
||||
this.boxes.set(cfg.id, cfg);
|
||||
this.messagesByBox.set(cfg.id, await this.loadMessages(cfg.id));
|
||||
}
|
||||
|
||||
fs.mkdirSync(this.basePath, { recursive: true });
|
||||
await fsPromises.mkdir(this.basePath, { recursive: true });
|
||||
this.log(`[faxbox] initialized ${this.boxes.size} fax box(es)`);
|
||||
}
|
||||
|
||||
@@ -50,7 +58,13 @@ export class FaxBoxManager {
|
||||
return path.join(this.basePath, boxId);
|
||||
}
|
||||
|
||||
addMessage(
|
||||
async prepareOutboundFaxFile(filePathArg: string): Promise<string> {
|
||||
const localPath = path.isAbsolute(filePathArg) ? filePathArg : path.join(process.cwd(), filePathArg);
|
||||
await fsPromises.access(localPath);
|
||||
return localPath;
|
||||
}
|
||||
|
||||
async addMessage(
|
||||
boxId: string,
|
||||
info: {
|
||||
callerNumber?: string;
|
||||
@@ -60,90 +74,124 @@ export class FaxBoxManager {
|
||||
pageCount?: number;
|
||||
bitRate?: number;
|
||||
},
|
||||
): void {
|
||||
): Promise<void> {
|
||||
const id = crypto.randomUUID();
|
||||
const localPath = path.isAbsolute(info.fileName) ? info.fileName : path.join(process.cwd(), info.fileName);
|
||||
const objectKey = await this.storage.putFileObject(`fax/inboxes/${boxId}/${id}.tif`, localPath);
|
||||
|
||||
const msg: IFaxMessage = {
|
||||
id: crypto.randomUUID(),
|
||||
id,
|
||||
boxId,
|
||||
callerNumber: info.callerNumber,
|
||||
timestamp: Date.now(),
|
||||
fileName: path.basename(info.fileName),
|
||||
fileName: path.basename(localPath),
|
||||
objectKey,
|
||||
completionCode: info.completionCode ?? null,
|
||||
completionLabel: info.completionLabel ?? null,
|
||||
pageCount: info.pageCount,
|
||||
bitRate: info.bitRate,
|
||||
};
|
||||
this.saveMessage(msg);
|
||||
}
|
||||
|
||||
saveMessage(msg: IFaxMessage): void {
|
||||
const boxDir = this.getBoxDir(msg.boxId);
|
||||
fs.mkdirSync(boxDir, { recursive: true });
|
||||
|
||||
const messages = this.loadMessages(msg.boxId);
|
||||
const messages = this.getMessages(boxId);
|
||||
messages.unshift(msg);
|
||||
|
||||
const box = this.boxes.get(msg.boxId);
|
||||
const maxMessages = box?.maxMessages ?? 50;
|
||||
while (messages.length > maxMessages) {
|
||||
const old = messages.pop()!;
|
||||
const oldPath = path.join(boxDir, old.fileName);
|
||||
try {
|
||||
if (fs.existsSync(oldPath)) fs.unlinkSync(oldPath);
|
||||
} catch {}
|
||||
}
|
||||
|
||||
this.writeMessages(msg.boxId, messages);
|
||||
await this.enforceLimit(boxId, messages);
|
||||
await this.writeMessages(boxId, messages);
|
||||
await fsPromises.rm(localPath, { force: true }).catch(() => {});
|
||||
this.log(`[faxbox] saved fax ${msg.id} in box "${msg.boxId}" (${msg.fileName})`);
|
||||
}
|
||||
|
||||
getMessages(boxId: string): IFaxMessage[] {
|
||||
return this.loadMessages(boxId);
|
||||
return [...(this.messagesByBox.get(boxId) || [])];
|
||||
}
|
||||
|
||||
getMessage(boxId: string, messageId: string): IFaxMessage | null {
|
||||
return this.loadMessages(boxId).find((m) => m.id === messageId) ?? null;
|
||||
const messages = this.messagesByBox.get(boxId) || [];
|
||||
return messages.find((m) => m.id === messageId) ?? null;
|
||||
}
|
||||
|
||||
getMessageFilePath(boxId: string, messageId: string): string | null {
|
||||
async getMessageFilePath(boxId: string, messageId: string): Promise<string | null> {
|
||||
const msg = this.getMessage(boxId, messageId);
|
||||
if (!msg) return null;
|
||||
if (msg.objectKey) {
|
||||
return await this.storage.getObjectAsCachedFile(msg.objectKey, msg.fileName);
|
||||
}
|
||||
const filePath = path.join(this.getBoxDir(boxId), msg.fileName);
|
||||
return fs.existsSync(filePath) ? filePath : null;
|
||||
}
|
||||
|
||||
deleteMessage(boxId: string, messageId: string): boolean {
|
||||
const messages = this.loadMessages(boxId);
|
||||
async deleteMessage(boxId: string, messageId: string): Promise<boolean> {
|
||||
const messages = this.messagesByBox.get(boxId) || [];
|
||||
const idx = messages.findIndex((m) => m.id === messageId);
|
||||
if (idx === -1) return false;
|
||||
|
||||
const msg = messages[idx];
|
||||
const filePath = path.join(this.getBoxDir(boxId), msg.fileName);
|
||||
try {
|
||||
if (fs.existsSync(filePath)) fs.unlinkSync(filePath);
|
||||
} catch {}
|
||||
await this.storage.removeObject(msg.objectKey);
|
||||
if (!msg.objectKey) {
|
||||
await fsPromises.rm(path.join(this.getBoxDir(boxId), msg.fileName), { force: true }).catch(() => {});
|
||||
}
|
||||
|
||||
messages.splice(idx, 1);
|
||||
this.writeMessages(boxId, messages);
|
||||
await this.writeMessages(boxId, messages);
|
||||
return true;
|
||||
}
|
||||
|
||||
private messagesPath(boxId: string): string {
|
||||
return path.join(this.getBoxDir(boxId), 'messages.json');
|
||||
private async enforceLimit(boxId: string, messages: IFaxMessage[]): Promise<void> {
|
||||
const box = this.boxes.get(boxId);
|
||||
const maxMessages = box?.maxMessages ?? 50;
|
||||
while (messages.length > maxMessages) {
|
||||
const old = messages.pop()!;
|
||||
await this.storage.removeObject(old.objectKey);
|
||||
if (!old.objectKey) {
|
||||
await fsPromises.rm(path.join(this.getBoxDir(boxId), old.fileName), { force: true }).catch(() => {});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private loadMessages(boxId: string): IFaxMessage[] {
|
||||
const filePath = this.messagesPath(boxId);
|
||||
private async loadMessages(boxId: string): Promise<IFaxMessage[]> {
|
||||
const storedMessages = await this.storage.getFaxMessages(boxId);
|
||||
if (storedMessages.length) return await this.ensureMessageObjects(boxId, storedMessages);
|
||||
|
||||
const filePath = path.join(this.getBoxDir(boxId), 'messages.json');
|
||||
try {
|
||||
if (!fs.existsSync(filePath)) return [];
|
||||
return JSON.parse(fs.readFileSync(filePath, 'utf8')) as IFaxMessage[];
|
||||
const raw = await fsPromises.readFile(filePath, 'utf8');
|
||||
const legacyMessages = await this.ensureMessageObjects(boxId, JSON.parse(raw) as IFaxMessage[]);
|
||||
await this.storage.writeFaxMessages(boxId, legacyMessages);
|
||||
return legacyMessages;
|
||||
} catch {
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
private writeMessages(boxId: string, messages: IFaxMessage[]): void {
|
||||
const boxDir = this.getBoxDir(boxId);
|
||||
fs.mkdirSync(boxDir, { recursive: true });
|
||||
fs.writeFileSync(this.messagesPath(boxId), JSON.stringify(messages, null, 2), 'utf8');
|
||||
private async ensureMessageObjects(boxId: string, messages: IFaxMessage[]): Promise<IFaxMessage[]> {
|
||||
let changed = false;
|
||||
|
||||
for (const msg of messages) {
|
||||
if (!msg.id) {
|
||||
msg.id = crypto.randomUUID();
|
||||
changed = true;
|
||||
}
|
||||
if (msg.objectKey) continue;
|
||||
|
||||
const localPath = path.isAbsolute(msg.fileName) ? msg.fileName : path.join(this.getBoxDir(boxId), msg.fileName);
|
||||
if (!fs.existsSync(localPath)) continue;
|
||||
|
||||
const extension = path.extname(localPath) || '.tif';
|
||||
msg.objectKey = await this.storage.putFileObject(`fax/inboxes/${boxId}/${msg.id}${extension}`, localPath);
|
||||
msg.fileName = path.basename(localPath);
|
||||
changed = true;
|
||||
}
|
||||
|
||||
if (changed) {
|
||||
await this.storage.writeFaxMessages(boxId, messages);
|
||||
this.log(`[faxbox] migrated legacy messages for box "${boxId}" to smartbucket`);
|
||||
}
|
||||
|
||||
return messages;
|
||||
}
|
||||
|
||||
private async writeMessages(boxId: string, messages: IFaxMessage[]): Promise<void> {
|
||||
this.messagesByBox.set(boxId, [...messages]);
|
||||
await this.storage.writeFaxMessages(boxId, messages);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user