BREAKING CHANGE(db): replace StorageManager and CacheDb with a unified smartdata-backed database layer

This commit is contained in:
2026-03-31 15:31:16 +00:00
parent 193a4bb180
commit bb6c26484d
49 changed files with 1475 additions and 1687 deletions

View File

@@ -1,6 +1,6 @@
import * as plugins from '../plugins.js';
import { logger } from '../logger.js';
import type { StorageManager } from '../storage/index.js';
import { AccountingSessionDoc } from '../db/index.js';
/**
* RADIUS accounting session
@@ -84,8 +84,6 @@ export interface IAccountingSummary {
* Accounting manager configuration
*/
export interface IAccountingManagerConfig {
/** Storage key prefix */
storagePrefix?: string;
/** Session retention period in days (default: 30) */
retentionDays?: number;
/** Enable detailed session logging */
@@ -106,7 +104,6 @@ export interface IAccountingManagerConfig {
export class AccountingManager {
private activeSessions: Map<string, IAccountingSession> = new Map();
private config: Required<IAccountingManagerConfig>;
private storageManager?: StorageManager;
private staleSessionSweepTimer?: ReturnType<typeof setInterval>;
// Counters for statistics
@@ -118,24 +115,20 @@ export class AccountingManager {
interimUpdatesReceived: 0,
};
constructor(config?: IAccountingManagerConfig, storageManager?: StorageManager) {
constructor(config?: IAccountingManagerConfig) {
this.config = {
storagePrefix: config?.storagePrefix ?? '/radius/accounting',
retentionDays: config?.retentionDays ?? 30,
detailedLogging: config?.detailedLogging ?? false,
maxActiveSessions: config?.maxActiveSessions ?? 10000,
staleSessionTimeoutHours: config?.staleSessionTimeoutHours ?? 24,
};
this.storageManager = storageManager;
}
/**
* Initialize the accounting manager
*/
async initialize(): Promise<void> {
if (this.storageManager) {
await this.loadActiveSessions();
}
await this.loadActiveSessions();
// Start periodic sweep to evict stale sessions (every 15 minutes)
this.staleSessionSweepTimer = setInterval(() => {
@@ -176,9 +169,7 @@ export class AccountingManager {
session.endTime = Date.now();
session.sessionTime = Math.floor((session.endTime - session.startTime) / 1000);
if (this.storageManager) {
this.archiveSession(session).catch(() => {});
}
this.persistSession(session).catch(() => {});
this.activeSessions.delete(sessionId);
swept++;
@@ -250,9 +241,7 @@ export class AccountingManager {
}
// Persist session
if (this.storageManager) {
await this.persistSession(session);
}
await this.persistSession(session);
}
/**
@@ -298,9 +287,7 @@ export class AccountingManager {
}
// Update persisted session
if (this.storageManager) {
await this.persistSession(session);
}
await this.persistSession(session);
}
/**
@@ -353,10 +340,8 @@ export class AccountingManager {
logger.log('info', `Accounting Stop: session=${data.sessionId}, duration=${session.sessionTime}s, in=${session.inputOctets}, out=${session.outputOctets}`);
}
// Archive the session
if (this.storageManager) {
await this.archiveSession(session);
}
// Update status in the database (single collection, no active->archive move needed)
await this.persistSession(session);
// Remove from active sessions
this.activeSessions.delete(data.sessionId);
@@ -493,23 +478,16 @@ export class AccountingManager {
* Clean up old archived sessions based on retention policy
*/
async cleanupOldSessions(): Promise<number> {
if (!this.storageManager) {
return 0;
}
const cutoffTime = Date.now() - this.config.retentionDays * 24 * 60 * 60 * 1000;
let deletedCount = 0;
try {
const keys = await this.storageManager.list(`${this.config.storagePrefix}/archive/`);
const oldDocs = await AccountingSessionDoc.findStoppedBefore(cutoffTime);
for (const key of keys) {
for (const doc of oldDocs) {
try {
const session = await this.storageManager.getJSON<IAccountingSession>(key);
if (session && session.endTime > 0 && session.endTime < cutoffTime) {
await this.storageManager.delete(key);
deletedCount++;
}
await doc.delete();
deletedCount++;
} catch (error) {
// Ignore individual errors
}
@@ -552,9 +530,7 @@ export class AccountingManager {
session.terminateCause = 'SessionEvicted';
session.endTime = Date.now();
if (this.storageManager) {
await this.archiveSession(session);
}
await this.persistSession(session);
this.activeSessions.delete(sessionId);
logger.log('warn', `Evicted session ${sessionId} due to capacity limit`);
@@ -562,25 +538,38 @@ export class AccountingManager {
}
/**
* Load active sessions from storage
* Load active sessions from database
*/
private async loadActiveSessions(): Promise<void> {
if (!this.storageManager) {
return;
}
try {
const keys = await this.storageManager.list(`${this.config.storagePrefix}/active/`);
const docs = await AccountingSessionDoc.findActive();
for (const key of keys) {
try {
const session = await this.storageManager.getJSON<IAccountingSession>(key);
if (session && session.status === 'active') {
this.activeSessions.set(session.sessionId, session);
}
} catch (error) {
// Ignore individual errors
}
for (const doc of docs) {
const session: IAccountingSession = {
sessionId: doc.sessionId,
username: doc.username,
macAddress: doc.macAddress,
nasIpAddress: doc.nasIpAddress,
nasPort: doc.nasPort,
nasPortType: doc.nasPortType,
nasIdentifier: doc.nasIdentifier,
vlanId: doc.vlanId,
framedIpAddress: doc.framedIpAddress,
calledStationId: doc.calledStationId,
callingStationId: doc.callingStationId,
startTime: doc.startTime,
endTime: doc.endTime,
lastUpdateTime: doc.lastUpdateTime,
status: doc.status,
terminateCause: doc.terminateCause,
inputOctets: doc.inputOctets,
outputOctets: doc.outputOctets,
inputPackets: doc.inputPackets,
outputPackets: doc.outputPackets,
sessionTime: doc.sessionTime,
serviceType: doc.serviceType,
};
this.activeSessions.set(session.sessionId, session);
}
} catch (error: unknown) {
logger.log('warn', `Failed to load active sessions: ${(error as Error).message}`);
@@ -588,70 +577,59 @@ export class AccountingManager {
}
/**
* Persist a session to storage
* Persist a session to the database (create or update)
*/
private async persistSession(session: IAccountingSession): Promise<void> {
if (!this.storageManager) {
return;
}
const key = `${this.config.storagePrefix}/active/${session.sessionId}.json`;
try {
await this.storageManager.setJSON(key, session);
let doc = await AccountingSessionDoc.findBySessionId(session.sessionId);
if (!doc) {
doc = new AccountingSessionDoc();
}
Object.assign(doc, session);
await doc.save();
} catch (error: unknown) {
logger.log('error', `Failed to persist session ${session.sessionId}: ${(error as Error).message}`);
}
}
/**
* Archive a completed session
*/
private async archiveSession(session: IAccountingSession): Promise<void> {
if (!this.storageManager) {
return;
}
try {
// Remove from active
const activeKey = `${this.config.storagePrefix}/active/${session.sessionId}.json`;
await this.storageManager.delete(activeKey);
// Add to archive with date-based path
const date = new Date(session.endTime);
const archiveKey = `${this.config.storagePrefix}/archive/${date.getFullYear()}/${String(date.getMonth() + 1).padStart(2, '0')}/${String(date.getDate()).padStart(2, '0')}/${session.sessionId}.json`;
await this.storageManager.setJSON(archiveKey, session);
} catch (error: unknown) {
logger.log('error', `Failed to archive session ${session.sessionId}: ${(error as Error).message}`);
}
}
/**
* Get archived sessions for a time period
* Get archived (stopped/terminated) sessions for a time period
*/
private async getArchivedSessions(startTime: number, endTime: number): Promise<IAccountingSession[]> {
if (!this.storageManager) {
return [];
}
const sessions: IAccountingSession[] = [];
try {
const keys = await this.storageManager.list(`${this.config.storagePrefix}/archive/`);
const docs = await AccountingSessionDoc.getInstances({
status: { $in: ['stopped', 'terminated'] } as any,
endTime: { $gt: 0, $gte: startTime } as any,
startTime: { $lte: endTime } as any,
});
for (const key of keys) {
try {
const session = await this.storageManager.getJSON<IAccountingSession>(key);
if (
session &&
session.endTime > 0 &&
session.startTime <= endTime &&
session.endTime >= startTime
) {
sessions.push(session);
}
} catch (error) {
// Ignore individual errors
}
for (const doc of docs) {
sessions.push({
sessionId: doc.sessionId,
username: doc.username,
macAddress: doc.macAddress,
nasIpAddress: doc.nasIpAddress,
nasPort: doc.nasPort,
nasPortType: doc.nasPortType,
nasIdentifier: doc.nasIdentifier,
vlanId: doc.vlanId,
framedIpAddress: doc.framedIpAddress,
calledStationId: doc.calledStationId,
callingStationId: doc.callingStationId,
startTime: doc.startTime,
endTime: doc.endTime,
lastUpdateTime: doc.lastUpdateTime,
status: doc.status,
terminateCause: doc.terminateCause,
inputOctets: doc.inputOctets,
outputOctets: doc.outputOctets,
inputPackets: doc.inputPackets,
outputPackets: doc.outputPackets,
sessionTime: doc.sessionTime,
serviceType: doc.serviceType,
});
}
} catch (error: unknown) {
logger.log('warn', `Failed to get archived sessions: ${(error as Error).message}`);