Files
smartstate/ts/smartstate.classes.statepart.ts

329 lines
9.8 KiB
TypeScript

import * as plugins from './smartstate.plugins.js';
import { Observable, shareReplay, takeUntil } from 'rxjs';
import { StateAction, type IActionDef } from './smartstate.classes.stateaction.js';
import type { Smartstate } from './smartstate.classes.smartstate.js';
/**
 * Signature of a state middleware.
 * It receives the candidate new state together with the previously known state
 * (`undefined` when there is none) and returns the state to continue with,
 * either synchronously or as a Promise.
 */
export type TMiddleware<TPayload> = (
newState: TPayload,
oldState: TPayload | undefined,
) => TPayload | Promise<TPayload>;
/**
 * Wraps an AbortSignal in an Observable.
 * The Observable emits exactly one `void` notification and then completes:
 * immediately on subscription when the signal is already aborted, otherwise
 * as soon as the signal dispatches its 'abort' event. Unsubscribing before
 * that point detaches the event listener again.
 */
function fromAbortSignal(signal: AbortSignal): Observable<void> {
  return new Observable<void>((observer) => {
    const emitAndFinish = () => {
      observer.next();
      observer.complete();
    };

    if (!signal.aborted) {
      // still pending: wait for the abort event and clean up on unsubscribe
      signal.addEventListener('abort', emitAndFinish);
      return () => signal.removeEventListener('abort', emitAndFinish);
    }

    // already aborted: nothing to listen for, so no teardown is needed
    emitAndFinish();
    return;
  });
}
/**
 * A single named part of the application state.
 *
 * A StatePart keeps its current payload in memory (`stateStore`), optionally
 * persists it through a WebStore, runs registered middlewares on every
 * `setState()` call and publishes changes on the `state` subject. Consumers
 * normally read it through `select()` or `waitUntilPresent()`.
 *
 * Only `null` and `undefined` count as "no state"; falsy payloads such as
 * `0`, `''` or `false` are valid states (see `validateState()`).
 */
export class StatePart<TStatePartName, TStatePayload> {
  /** name of this state part; its string form is used as the WebStore key */
  public name: TStatePartName;
  /** subject on which `notifyChange()` publishes the current state */
  public state = new plugins.smartrx.rxjs.Subject<TStatePayload>();
  /** the current in-memory state; `undefined` until a state was set or restored */
  public stateStore: TStatePayload | undefined;
  /** optional Smartstate reference; `notifyChange()` consults it for batching */
  public smartstateRef?: Smartstate<any>;

  // promises added via stateSetup(); dispatchAction() awaits its combined promise
  private cumulativeDeferred = plugins.smartpromise.cumulativeDefer();
  // timer handle of a scheduled cumulative notification, null when none is pending
  private pendingCumulativeNotification: ReturnType<typeof setTimeout> | null = null;
  // set when notifyChange() deferred a notification because a batch is active
  private pendingBatchNotification = false;

  // only assigned when options were passed to the constructor
  private webStoreOptions?: plugins.webstore.IWebStoreOptions;
  private webStore: plugins.webstore.WebStore<TStatePayload> | null = null;

  private middlewares: TMiddleware<TStatePayload>[] = [];

  // Selector memoization: one shared observable per selector function reference
  private selectorCache = new WeakMap<
    (state: TStatePayload) => unknown,
    plugins.smartrx.rxjs.Observable<any>
  >();
  private defaultSelectObservable: plugins.smartrx.rxjs.Observable<TStatePayload> | null = null;

  // hash of the payload that was last published, used to suppress duplicate notifications
  private lastStateNotificationPayloadHash: any;

  /**
   * @param nameArg name of the state part
   * @param webStoreOptionsArg when given, `init()` sets up a WebStore with these
   *   options and `setState()` persists every accepted state to it
   */
  constructor(nameArg: TStatePartName, webStoreOptionsArg?: plugins.webstore.IWebStoreOptions) {
    this.name = nameArg;
    if (webStoreOptionsArg) {
      this.webStoreOptions = webStoreOptionsArg;
    }
  }

  /**
   * initializes the webstore (only when webstore options were supplied) and
   * restores a previously persisted state if one exists and passes validation
   */
  public async init() {
    if (!this.webStoreOptions) {
      return;
    }
    this.webStore = new plugins.webstore.WebStore<TStatePayload>(this.webStoreOptions);
    await this.webStore.init();
    const storedState = await this.webStore.get(String(this.name));
    // only null/undefined mean "nothing stored"; falsy payloads (0, '', false) are restored too
    if (storedState != null && this.validateState(storedState)) {
      this.stateStore = storedState;
      await this.notifyChange();
    }
  }

  /**
   * gets the state from the state store
   * @returns the current state, or `undefined` when none has been set yet
   */
  public getState(): TStatePayload | undefined {
    return this.stateStore;
  }

  /**
   * adds a middleware that intercepts setState calls.
   * middleware can transform the state or throw to reject it.
   * @returns a removal function; calling it more than once is harmless
   */
  public addMiddleware(middleware: TMiddleware<TStatePayload>): () => void {
    this.middlewares.push(middleware);
    return () => {
      const idx = this.middlewares.indexOf(middleware);
      if (idx !== -1) {
        this.middlewares.splice(idx, 1);
      }
    };
  }

  /**
   * sets the stateStore to the new state
   *
   * The middlewares run in registration order, each one receiving the output
   * of the previous one plus the currently stored state. The result is
   * validated, persisted (when a WebStore is active), stored in memory and
   * finally announced through `notifyChange()`.
   *
   * @returns the state that is stored after the update
   * @throws Error when the processed state fails `validateState()`; errors
   *   thrown by a middleware or by the WebStore propagate unchanged
   */
  public async setState(newStateArg: TStatePayload) {
    // Run middleware chain
    let processedState = newStateArg;
    for (const mw of this.middlewares) {
      processedState = await mw(processedState, this.stateStore);
    }

    // Validate state structure
    if (!this.validateState(processedState)) {
      throw new Error(`Invalid state structure for state part '${this.name}'`);
    }

    // Persist first: if the write fails, the in-memory state stays untouched
    if (this.webStore) {
      await this.webStore.set(String(this.name), processedState);
    }

    // Update in-memory state after successful persistence
    this.stateStore = processedState;
    await this.notifyChange();
    return this.stateStore;
  }

  /**
   * Validates state structure - can be overridden for custom validation.
   * The default implementation accepts everything except `null` and `undefined`.
   */
  protected validateState(stateArg: any): stateArg is TStatePayload {
    return stateArg !== null && stateArg !== undefined;
  }

  /**
   * notifies of a change on the state
   *
   * Does nothing while no state is stored. While the referenced Smartstate is
   * batching, the notification is registered there instead of being emitted.
   * Otherwise the state is published on `state`, unless its hash equals the
   * hash of the payload that was published last.
   */
  public async notifyChange() {
    // only null/undefined mean "no state"; falsy payloads such as 0 or '' must still be published
    if (this.stateStore == null) {
      return;
    }

    // If inside a batch, defer the notification
    if (this.smartstateRef?.isBatching) {
      this.pendingBatchNotification = true;
      this.smartstateRef.registerPendingNotification(this);
      return;
    }

    const currentHash = await plugins.smarthashWeb.sha256FromString(
      plugins.smartjson.stableOneWayStringify(this.stateStore)
    );
    if (
      this.lastStateNotificationPayloadHash &&
      currentHash === this.lastStateNotificationPayloadHash
    ) {
      // identical payload was already announced
      return;
    }
    this.lastStateNotificationPayloadHash = currentHash;
    this.state.next(this.stateStore);
  }

  /**
   * creates a cumulative notification: schedules a single change notification
   * with a zero-delay timer and replaces any notification that is still pending,
   * so several calls in the same tick result in one notification
   */
  public notifyChangeCumulative() {
    if (this.pendingCumulativeNotification) {
      clearTimeout(this.pendingCumulativeNotification);
    }
    this.pendingCumulativeNotification = setTimeout(async () => {
      this.pendingCumulativeNotification = null;
      // same "no state" definition as notifyChange(): falsy payloads still notify
      if (this.stateStore != null) {
        await this.notifyChange();
      }
    }, 0);
  }

  /**
   * selects a state or a substate.
   * supports an optional AbortSignal for automatic unsubscription.
   * memoizes observables by selector function reference.
   *
   * Every subscription starts with the state that is current at the moment of
   * subscribing (if there is one) and then follows the `state` subject. A
   * selector that throws is logged and yields `undefined` for that emission.
   *
   * @param selectorFn maps the full state to the value of interest; when
   *   omitted the full state is emitted
   * @param options.signal when given, the returned observable completes once
   *   the signal aborts; such observables are never memoized
   */
  public select<T = TStatePayload>(
    selectorFn?: (state: TStatePayload) => T,
    options?: { signal?: AbortSignal }
  ): plugins.smartrx.rxjs.Observable<T> {
    const abortSignal = options?.signal;

    // Check memoization cache (only for non-signal selects)
    if (abortSignal == null) {
      if (!selectorFn) {
        if (this.defaultSelectObservable) {
          return this.defaultSelectObservable as unknown as plugins.smartrx.rxjs.Observable<T>;
        }
      } else if (this.selectorCache.has(selectorFn)) {
        return this.selectorCache.get(selectorFn)!;
      }
    }

    const effectiveSelectorFn = selectorFn || ((state: TStatePayload) => <T>(<any>state));

    // Read the initial value when a subscriber arrives, not when select() is called.
    // The memoized observable is re-subscribed after its ref count dropped to zero,
    // and a value captured here at call time would then be replayed although stale.
    const currentThenUpdates = new Observable<TStatePayload | undefined>((subscriber) => {
      subscriber.next(this.getState());
      return this.state.subscribe(subscriber);
    });

    let mapped = currentThenUpdates.pipe(
      plugins.smartrx.rxjs.ops.filter((stateArg): stateArg is TStatePayload => stateArg !== undefined),
      plugins.smartrx.rxjs.ops.map((stateArg) => {
        try {
          return effectiveSelectorFn(stateArg);
        } catch (e) {
          console.error(`Selector error in state part '${this.name}':`, e);
          return undefined;
        }
      })
    );

    if (abortSignal != null) {
      mapped = mapped.pipe(takeUntil(fromAbortSignal(abortSignal)));
      return mapped;
    }

    // Apply shareReplay for caching and store in memo cache
    const shared = mapped.pipe(shareReplay({ bufferSize: 1, refCount: true }));
    if (!selectorFn) {
      this.defaultSelectObservable = shared as unknown as plugins.smartrx.rxjs.Observable<TStatePayload>;
    } else {
      this.selectorCache.set(selectorFn, shared);
    }
    return shared;
  }

  /**
   * creates an action capable of modifying the state
   */
  public createAction<TActionPayload>(
    actionDef: IActionDef<TStatePayload, TActionPayload>
  ): StateAction<TStatePayload, TActionPayload> {
    return new StateAction(this, actionDef);
  }

  /**
   * dispatches an action on the statepart level
   *
   * Waits for the promises registered through `stateSetup()`, runs the action
   * definition with this state part and the payload, and stores its result
   * via `setState()`.
   *
   * @returns the state held after the action has been applied
   */
  public async dispatchAction<T>(stateAction: StateAction<TStatePayload, T>, actionPayload: T): Promise<TStatePayload> {
    await this.cumulativeDeferred.promise;
    const newState = await stateAction.actionDef(this, actionPayload);
    await this.setState(newState);
    return this.getState();
  }

  /**
   * waits until a certain part of the state becomes available.
   * supports optional timeout and AbortSignal.
   *
   * "Available" means the selected value is truthy; a selector that yields
   * `0`, `''`, `false`, `null` or `undefined` keeps the call waiting.
   *
   * @param selectorFn selects the value to wait for; defaults to the full state
   * @param optionsOrTimeout either a timeout in milliseconds or an options
   *   object; a missing or zero timeout means "wait indefinitely"
   * @returns the first truthy selected value
   * @throws Error `waitUntilPresent timed out after <n>ms` when the timeout
   *   elapses first, or `Aborted` when the signal is or becomes aborted
   */
  public async waitUntilPresent<T = TStatePayload>(
    selectorFn?: (state: TStatePayload) => T,
    optionsOrTimeout?: number | { timeoutMs?: number; signal?: AbortSignal }
  ): Promise<T> {
    // Parse backward-compatible args
    let timeoutMs: number | undefined;
    let signal: AbortSignal | undefined;
    if (typeof optionsOrTimeout === 'number') {
      timeoutMs = optionsOrTimeout;
    } else if (optionsOrTimeout) {
      timeoutMs = optionsOrTimeout.timeoutMs;
      signal = optionsOrTimeout.signal;
    }

    const done = plugins.smartpromise.defer<T>();
    const selectedObservable = this.select(selectorFn);
    // guards against settling the deferred more than once
    let resolved = false;

    // Check if already aborted
    if (signal?.aborted) {
      throw new Error('Aborted');
    }

    const subscription = selectedObservable.subscribe((value) => {
      if (value && !resolved) {
        resolved = true;
        done.resolve(value);
      }
    });

    let timeoutId: ReturnType<typeof setTimeout> | undefined;
    if (timeoutMs) {
      timeoutId = setTimeout(() => {
        if (!resolved) {
          resolved = true;
          subscription.unsubscribe();
          done.reject(new Error(`waitUntilPresent timed out after ${timeoutMs}ms`));
        }
      }, timeoutMs);
    }

    // Handle abort signal
    const abortHandler = signal
      ? () => {
          if (!resolved) {
            resolved = true;
            subscription.unsubscribe();
            if (timeoutId) clearTimeout(timeoutId);
            done.reject(new Error('Aborted'));
          }
        }
      : undefined;
    if (signal && abortHandler) {
      signal.addEventListener('abort', abortHandler);
    }

    try {
      const result = await done.promise;
      return result;
    } finally {
      // release subscription, timer and listener regardless of the outcome
      subscription.unsubscribe();
      if (timeoutId) clearTimeout(timeoutId);
      if (signal && abortHandler) {
        signal.removeEventListener('abort', abortHandler);
      }
    }
  }

  /**
   * runs an asynchronous setup function and stores its result as the state.
   * The setup promise is added to the cumulative deferred that
   * `dispatchAction()` awaits before it runs an action.
   *
   * @param funcArg receives this state part and resolves to the initial state
   */
  public async stateSetup(
    funcArg: (statePartArg?: StatePart<any, TStatePayload>) => Promise<TStatePayload>
  ) {
    const resultPromise = funcArg(this);
    this.cumulativeDeferred.addPromise(resultPromise);
    await this.setState(await resultPromise);
  }
}