feat(stateprocess): add managed state processes with lifecycle controls, scheduled actions, and disposal safety

This commit is contained in:
2026-03-27 11:32:49 +00:00
parent b417e3d049
commit 034ae56536
16 changed files with 3338 additions and 2284 deletions

View File

@@ -1,7 +1,8 @@
import * as plugins from './smartstate.plugins.js';
import { Observable, shareReplay, takeUntil } from 'rxjs';
import { BehaviorSubject, Observable, shareReplay, takeUntil, distinctUntilChanged, interval } from 'rxjs';
import { StateAction, type IActionDef, type IActionContext } from './smartstate.classes.stateaction.js';
import type { Smartstate } from './smartstate.classes.smartstate.js';
import { StateProcess, type IProcessOptions, type IScheduledActionOptions } from './smartstate.classes.stateprocess.js';
export type TMiddleware<TPayload> = (
newState: TPayload,
@@ -31,19 +32,23 @@ export class StatePart<TStatePartName, TStatePayload> {
private static readonly MAX_NESTED_DISPATCH_DEPTH = 10;
public name: TStatePartName;
public state = new plugins.smartrx.rxjs.Subject<TStatePayload>();
public stateStore: TStatePayload | undefined;
private state = new BehaviorSubject<TStatePayload | undefined>(undefined);
private stateStore: TStatePayload | undefined;
public smartstateRef?: Smartstate<any>;
private disposed = false;
private cumulativeDeferred = plugins.smartpromise.cumulativeDefer();
private mutationQueue: Promise<any> = Promise.resolve();
private pendingCumulativeNotification: ReturnType<typeof setTimeout> | null = null;
private webStoreOptions: plugins.webstore.IWebStoreOptions;
private webStoreOptions: plugins.webstore.IWebStoreOptions | undefined;
private webStore: plugins.webstore.WebStore<TStatePayload> | null = null;
private middlewares: TMiddleware<TStatePayload>[] = [];
// Process tracking
private processes: StateProcess<TStatePartName, TStatePayload, any>[] = [];
// Selector memoization
private selectorCache = new WeakMap<Function, plugins.smartrx.rxjs.Observable<any>>();
private defaultSelectObservable: plugins.smartrx.rxjs.Observable<TStatePayload> | null = null;
@@ -97,6 +102,9 @@ export class StatePart<TStatePartName, TStatePayload> {
* sets the stateStore to the new state (serialized via mutation queue)
*/
public async setState(newStateArg: TStatePayload): Promise<TStatePayload> {
if (this.disposed) {
throw new Error(`StatePart '${this.name}' has been disposed`);
}
return this.mutationQueue = this.mutationQueue.then(
() => this.applyState(newStateArg),
() => this.applyState(newStateArg),
@@ -212,22 +220,24 @@ export class StatePart<TStatePartName, TStatePayload> {
}
const effectiveSelectorFn = selectorFn || ((state: TStatePayload) => <T>(<any>state));
const SELECTOR_ERROR: unique symbol = Symbol('selector-error');
let mapped = this.state.pipe(
plugins.smartrx.rxjs.ops.startWith(this.getState()),
plugins.smartrx.rxjs.ops.filter((stateArg): stateArg is TStatePayload => stateArg !== undefined),
plugins.smartrx.rxjs.ops.map((stateArg) => {
try {
return effectiveSelectorFn(stateArg);
} catch (e) {
console.error(`Selector error in state part '${this.name}':`, e);
return undefined;
return SELECTOR_ERROR as any;
}
})
}),
plugins.smartrx.rxjs.ops.filter((v: any) => v !== SELECTOR_ERROR),
distinctUntilChanged((a: any, b: any) => JSON.stringify(a) === JSON.stringify(b)),
);
if (hasSignal) {
mapped = mapped.pipe(takeUntil(fromAbortSignal(options.signal)));
mapped = mapped.pipe(takeUntil(fromAbortSignal(options.signal!)));
return mapped;
}
@@ -277,19 +287,16 @@ export class StatePart<TStatePartName, TStatePayload> {
* dispatches an action on the statepart level
*/
public async dispatchAction<T>(stateAction: StateAction<TStatePayload, T>, actionPayload: T): Promise<TStatePayload> {
if (this.disposed) {
throw new Error(`StatePart '${this.name}' has been disposed`);
}
await this.cumulativeDeferred.promise;
return this.mutationQueue = this.mutationQueue.then(
async () => {
const context = this.createActionContext(0);
const newState = await stateAction.actionDef(this, actionPayload, context);
return this.applyState(newState);
},
async () => {
const context = this.createActionContext(0);
const newState = await stateAction.actionDef(this, actionPayload, context);
return this.applyState(newState);
},
);
const execute = async () => {
const context = this.createActionContext(0);
const newState = await stateAction.actionDef(this, actionPayload, context);
return this.applyState(newState);
};
return this.mutationQueue = this.mutationQueue.then(execute, execute);
}
/**
@@ -374,11 +381,68 @@ export class StatePart<TStatePartName, TStatePayload> {
await this.setState(await resultPromise);
}
/**
* creates a managed, pausable process that ties an observable producer to state updates
*/
public createProcess<TProducerValue>(
options: IProcessOptions<TStatePayload, TProducerValue>
): StateProcess<TStatePartName, TStatePayload, TProducerValue> {
if (this.disposed) {
throw new Error(`StatePart '${this.name}' has been disposed`);
}
const process = new StateProcess<TStatePartName, TStatePayload, TProducerValue>(this, options);
this.processes.push(process);
if (options.autoStart) {
process.start();
}
return process;
}
/**
* creates a process that dispatches an action on a recurring interval
*/
public createScheduledAction<TActionPayload>(
options: IScheduledActionOptions<TStatePayload, TActionPayload>
): StateProcess<TStatePartName, TStatePayload, number> {
if (this.disposed) {
throw new Error(`StatePart '${this.name}' has been disposed`);
}
const process = new StateProcess<TStatePartName, TStatePayload, number>(this, {
producer: () => interval(options.intervalMs),
sideEffect: async () => {
await options.action.trigger(options.payload);
},
autoPause: options.autoPause ?? false,
});
this.processes.push(process);
process.start();
return process;
}
/** @internal — called by StateProcess.dispose() to remove itself */
public _removeProcess(process: StateProcess<any, any, any>): void {
const idx = this.processes.indexOf(process);
if (idx !== -1) {
this.processes.splice(idx, 1);
}
}
/**
* disposes the state part, completing the Subject and cleaning up resources
*/
public dispose(): void {
if (this.disposed) return;
this.disposed = true;
// Dispose all processes first
for (const process of [...this.processes]) {
process.dispose();
}
this.processes.length = 0;
this.state.complete();
this.mutationQueue = Promise.resolve() as any;
if (this.pendingCumulativeNotification) {
clearTimeout(this.pendingCumulativeNotification);
this.pendingCumulativeNotification = null;
@@ -388,5 +452,6 @@ export class StatePart<TStatePartName, TStatePayload> {
this.defaultSelectObservable = null;
this.webStore = null;
this.smartstateRef = undefined;
this.stateStore = undefined;
}
}