367 lines
11 KiB
TypeScript
367 lines
11 KiB
TypeScript
import * as plugins from './smartstate.plugins.js';
|
|
import { Observable, shareReplay, takeUntil } from 'rxjs';
|
|
import { StateAction, type IActionDef } from './smartstate.classes.stateaction.js';
|
|
import type { Smartstate } from './smartstate.classes.smartstate.js';
|
|
|
|
/**
 * Signature of a state middleware.
 *
 * A middleware receives the candidate state together with the previously
 * stored state (`undefined` when nothing has been stored yet) and returns the
 * state that should be used instead, either directly or via a Promise.
 */
export type TMiddleware<TPayload> = (
  newState: TPayload,
  oldState: TPayload | undefined,
) => TPayload | Promise<TPayload>;
|
|
|
|
/**
 * Adapts an AbortSignal to an Observable.
 *
 * The returned Observable emits a single `void` value and then completes when
 * the signal aborts. If the signal is already aborted at subscription time it
 * emits and completes immediately. Unsubscribing removes the `abort` listener.
 */
function fromAbortSignal(signal: AbortSignal): Observable<void> {
  return new Observable<void>((subscriber) => {
    const emitAndFinish = (): void => {
      subscriber.next();
      subscriber.complete();
    };

    if (!signal.aborted) {
      signal.addEventListener('abort', emitAndFinish);
      // teardown: detach the listener when the subscriber goes away
      return () => signal.removeEventListener('abort', emitAndFinish);
    }

    // already aborted: fire right away, nothing to tear down
    emitAndFinish();
    return undefined;
  });
}
|
|
|
|
/**
 * A named part of the application state.
 *
 * Keeps the current payload in memory (`stateStore`), publishes changes through
 * the `state` Subject, optionally persists the payload to a WebStore, and runs
 * all mutations (`setState`, `dispatchAction`) one after another through an
 * internal promise queue.
 */
export class StatePart<TStatePartName, TStatePayload> {
  public name: TStatePartName;
  /** Subject that receives every state snapshot passed on by `notifyChange`. */
  public state = new plugins.smartrx.rxjs.Subject<TStatePayload>();
  /** The current in-memory state; `undefined` until a state has been set or restored. */
  public stateStore: TStatePayload | undefined;
  /** Optional back-reference to the owning Smartstate, consulted for batching in `notifyChange`. */
  public smartstateRef?: Smartstate<any>;
  private cumulativeDeferred = plugins.smartpromise.cumulativeDefer();

  /** Tail of the mutation chain; every mutation is appended to it so they run sequentially. */
  private mutationQueue: Promise<any> = Promise.resolve();
  private pendingCumulativeNotification: ReturnType<typeof setTimeout> | null = null;

  // Only assigned when options are passed to the constructor.
  private webStoreOptions: plugins.webstore.IWebStoreOptions | undefined;
  private webStore: plugins.webstore.WebStore<TStatePayload> | null = null;

  private middlewares: TMiddleware<TStatePayload>[] = [];

  // Selector memoization
  private selectorCache = new WeakMap<Function, plugins.smartrx.rxjs.Observable<any>>();
  private defaultSelectObservable: plugins.smartrx.rxjs.Observable<TStatePayload> | null = null;

  /** Hash of the last snapshot that was emitted; used by `notifyChange` to skip duplicates. */
  private lastStateNotificationPayloadHash: any;

  /**
   * @param nameArg name of this state part (also used, stringified, as the WebStore key)
   * @param webStoreOptionsArg when given, `init()` sets up a WebStore with these options
   */
  constructor(nameArg: TStatePartName, webStoreOptionsArg?: plugins.webstore.IWebStoreOptions) {
    this.name = nameArg;

    if (webStoreOptionsArg) {
      this.webStoreOptions = webStoreOptionsArg;
    }
  }

  /**
   * initializes the webstore (if options were supplied) and restores a
   * previously stored state when one exists and passes `validateState`
   */
  public async init() {
    if (this.webStoreOptions) {
      this.webStore = new plugins.webstore.WebStore<TStatePayload>(this.webStoreOptions);
      await this.webStore.init();
      const storedState = await this.webStore.get(String(this.name));
      if (storedState && this.validateState(storedState)) {
        this.stateStore = storedState;
        await this.notifyChange();
      }
    }
  }

  /**
   * gets the state from the state store
   * @returns the current state, or `undefined` if none has been set yet
   */
  public getState(): TStatePayload | undefined {
    return this.stateStore;
  }

  /**
   * adds a middleware that intercepts setState calls.
   * middleware can transform the state or throw to reject it.
   * @returns a removal function that unregisters the middleware again
   */
  public addMiddleware(middleware: TMiddleware<TStatePayload>): () => void {
    this.middlewares.push(middleware);
    return () => {
      const idx = this.middlewares.indexOf(middleware);
      if (idx !== -1) {
        this.middlewares.splice(idx, 1);
      }
    };
  }

  /**
   * sets the stateStore to the new state (serialized via mutation queue)
   * @returns a promise for the state that was actually stored (after middleware)
   */
  public async setState(newStateArg: TStatePayload): Promise<TStatePayload> {
    const apply = () => this.applyState(newStateArg);
    // Run regardless of whether the previous queued mutation fulfilled or
    // rejected, so one failed mutation does not block the ones after it.
    this.mutationQueue = this.mutationQueue.then(apply, apply);
    return this.mutationQueue;
  }

  /**
   * applies the state change (middleware → validate → persist → notify)
   * @throws when a middleware throws or when `validateState` rejects the result
   */
  private async applyState(newStateArg: TStatePayload): Promise<TStatePayload> {
    // Run middleware chain; each middleware sees the output of the previous one
    let processedState = newStateArg;
    for (const mw of this.middlewares) {
      processedState = await mw(processedState, this.stateStore);
    }

    // Validate state structure
    if (!this.validateState(processedState)) {
      throw new Error(`Invalid state structure for state part '${this.name}'`);
    }

    // Persist first: if the WebStore write fails, the in-memory state stays untouched
    if (this.webStore) {
      await this.webStore.set(String(this.name), processedState);
    }

    // Update in-memory state after successful persistence
    this.stateStore = processedState;
    await this.notifyChange();

    return this.stateStore;
  }

  /**
   * Validates state structure - can be overridden for custom validation.
   * The default implementation only rejects `null` and `undefined`.
   */
  protected validateState(stateArg: any): stateArg is TStatePayload {
    return stateArg !== null && stateArg !== undefined;
  }

  /**
   * notifies of a change on the state.
   * Does nothing while the state is `undefined`. While the owning Smartstate is
   * batching, the notification is registered there instead of being emitted.
   * Otherwise the snapshot is emitted on `state` unless its hash equals the
   * hash of the previously emitted snapshot.
   */
  public async notifyChange() {
    const snapshot = this.stateStore;
    if (snapshot === undefined) {
      return;
    }

    // If inside a batch, defer the notification
    if (this.smartstateRef?.isBatching) {
      this.smartstateRef.registerPendingNotification(this);
      return;
    }

    const createStateHash = async (stateArg: any) => {
      return await plugins.smarthashWeb.sha256FromString(plugins.smartjson.stableOneWayStringify(stateArg));
    };
    try {
      const currentHash = await createStateHash(snapshot);
      if (
        this.lastStateNotificationPayloadHash &&
        currentHash === this.lastStateNotificationPayloadHash
      ) {
        return;
      }
      this.lastStateNotificationPayloadHash = currentHash;
    } catch (err) {
      // Hashing is only used for de-duplication; on failure still emit the snapshot.
      console.error(`State hash computation failed for '${this.name}':`, err);
    }
    this.state.next(snapshot);
  }

  /**
   * creates a cumulative notification by adding a change notification at the end of the call stack.
   * Repeated calls before the timer fires replace the pending timer, so only one notification runs.
   */
  public notifyChangeCumulative() {
    if (this.pendingCumulativeNotification) {
      clearTimeout(this.pendingCumulativeNotification);
    }

    this.pendingCumulativeNotification = setTimeout(() => {
      this.pendingCumulativeNotification = null;
      if (this.stateStore !== undefined) {
        this.notifyChange().catch((err) => {
          console.error(`notifyChangeCumulative failed for '${this.name}':`, err);
        });
      }
    }, 0);
  }

  /**
   * selects a state or a substate.
   * supports an optional AbortSignal for automatic unsubscription.
   * memoizes observables by selector function reference (only when no signal is given).
   *
   * Every new subscription to the underlying pipeline starts with the state
   * that is current at subscription time. `undefined` states are filtered out
   * before the selector runs. If the selector throws, the error is logged and
   * `undefined` is emitted for that state.
   *
   * @param selectorFn maps the full state to the value of interest; defaults to the identity
   * @param options.signal when given, the returned observable completes once the signal aborts
   */
  public select<T = TStatePayload>(
    selectorFn?: (state: TStatePayload) => T,
    options?: { signal?: AbortSignal }
  ): plugins.smartrx.rxjs.Observable<T> {
    const hasSignal = options?.signal != null;

    // Check memoization cache (only for non-signal selects)
    if (!hasSignal) {
      if (!selectorFn) {
        if (this.defaultSelectObservable) {
          return this.defaultSelectObservable as unknown as plugins.smartrx.rxjs.Observable<T>;
        }
      } else if (this.selectorCache.has(selectorFn)) {
        return this.selectorCache.get(selectorFn)!;
      }
    }

    const effectiveSelectorFn = selectorFn || ((state: TStatePayload) => <T>(<any>state));

    // Build the seeded pipeline lazily, once per subscription. Reading
    // getState() eagerly here would freeze the seed at select() time; a
    // memoized observable that is re-subscribed after shareReplay's refCount
    // dropped to zero would then replay that stale seed (or nothing at all)
    // instead of the current state.
    let mapped = new Observable<T | undefined>((subscriber) =>
      this.state
        .pipe(
          plugins.smartrx.rxjs.ops.startWith(this.getState()),
          plugins.smartrx.rxjs.ops.filter((stateArg): stateArg is TStatePayload => stateArg !== undefined),
          plugins.smartrx.rxjs.ops.map((stateArg) => {
            try {
              return effectiveSelectorFn(stateArg);
            } catch (e) {
              console.error(`Selector error in state part '${this.name}':`, e);
              return undefined;
            }
          })
        )
        .subscribe(subscriber)
    );

    if (hasSignal) {
      // Signal-bound observables are never cached: they end with their signal.
      mapped = mapped.pipe(takeUntil(fromAbortSignal(options.signal)));
      return mapped;
    }

    // Apply shareReplay for caching and store in memo cache
    const shared = mapped.pipe(shareReplay({ bufferSize: 1, refCount: true }));
    if (!selectorFn) {
      this.defaultSelectObservable = shared as unknown as plugins.smartrx.rxjs.Observable<TStatePayload>;
    } else {
      this.selectorCache.set(selectorFn, shared);
    }

    return shared;
  }

  /**
   * creates an action capable of modifying the state
   */
  public createAction<TActionPayload>(
    actionDef: IActionDef<TStatePayload, TActionPayload>
  ): StateAction<TStatePayload, TActionPayload> {
    return new StateAction(this, actionDef);
  }

  /**
   * dispatches an action on the statepart level.
   * Waits for the cumulative deferred (fed by `stateSetup`) first, then runs
   * the action definition and applies its result inside the mutation queue.
   * @returns a promise for the state stored as a result of the action
   */
  public async dispatchAction<T>(stateAction: StateAction<TStatePayload, T>, actionPayload: T): Promise<TStatePayload> {
    await this.cumulativeDeferred.promise;
    const runAction = async (): Promise<TStatePayload> => {
      const newState = await stateAction.actionDef(this, actionPayload);
      return this.applyState(newState);
    };
    // Same handler for both outcomes: a failed predecessor must not block this action.
    this.mutationQueue = this.mutationQueue.then(runAction, runAction);
    return this.mutationQueue;
  }

  /**
   * waits until a certain part of the state becomes available.
   * supports optional timeout and AbortSignal.
   *
   * Resolves with the first selected value that is neither `undefined` nor `null`.
   *
   * @param selectorFn selects the value to wait for; defaults to the whole state
   * @param optionsOrTimeout either a timeout in milliseconds, or an options
   *   object with `timeoutMs` and/or `signal`; a falsy timeout means no timeout
   * @throws Error `'Aborted'` when the signal is (or becomes) aborted
   * @throws Error when the timeout elapses before a value is present
   */
  public async waitUntilPresent<T = TStatePayload>(
    selectorFn?: (state: TStatePayload) => T,
    optionsOrTimeout?: number | { timeoutMs?: number; signal?: AbortSignal }
  ): Promise<T> {
    // Parse backward-compatible args
    let timeoutMs: number | undefined;
    let signal: AbortSignal | undefined;
    if (typeof optionsOrTimeout === 'number') {
      timeoutMs = optionsOrTimeout;
    } else if (optionsOrTimeout) {
      timeoutMs = optionsOrTimeout.timeoutMs;
      signal = optionsOrTimeout.signal;
    }

    // Check if already aborted, before any subscription or timer is set up
    if (signal?.aborted) {
      throw new Error('Aborted');
    }

    const done = plugins.smartpromise.defer<T>();
    const selectedObservable = this.select(selectorFn);
    // Guards against settling the deferred more than once (value vs. timeout vs. abort)
    let resolved = false;

    const subscription = selectedObservable.subscribe((value) => {
      if (value !== undefined && value !== null && !resolved) {
        resolved = true;
        done.resolve(value);
      }
    });

    let timeoutId: ReturnType<typeof setTimeout> | undefined;
    if (timeoutMs) {
      timeoutId = setTimeout(() => {
        if (!resolved) {
          resolved = true;
          subscription.unsubscribe();
          done.reject(new Error(`waitUntilPresent timed out after ${timeoutMs}ms`));
        }
      }, timeoutMs);
    }

    // Handle abort signal
    const abortHandler = signal ? () => {
      if (!resolved) {
        resolved = true;
        subscription.unsubscribe();
        if (timeoutId) clearTimeout(timeoutId);
        done.reject(new Error('Aborted'));
      }
    } : undefined;

    if (signal && abortHandler) {
      signal.addEventListener('abort', abortHandler);
    }

    try {
      const result = await done.promise;
      return result;
    } finally {
      // Release subscription, timer and listener on every exit path
      subscription.unsubscribe();
      if (timeoutId) clearTimeout(timeoutId);
      if (signal && abortHandler) {
        signal.removeEventListener('abort', abortHandler);
      }
    }
  }

  /**
   * runs a setup function and stores its result as the state.
   * The function's promise is added to the cumulative deferred, which
   * `dispatchAction` awaits before running.
   * @param funcArg async function that receives this state part and returns the state to set
   */
  public async stateSetup(
    funcArg: (statePartArg?: StatePart<any, TStatePayload>) => Promise<TStatePayload>
  ) {
    const resultPromise = funcArg(this);
    this.cumulativeDeferred.addPromise(resultPromise);
    await this.setState(await resultPromise);
  }

  /**
   * disposes the state part, completing the Subject and cleaning up resources
   */
  public dispose(): void {
    this.state.complete();
    if (this.pendingCumulativeNotification) {
      clearTimeout(this.pendingCumulativeNotification);
      this.pendingCumulativeNotification = null;
    }
    this.middlewares.length = 0;
    this.selectorCache = new WeakMap();
    this.defaultSelectObservable = null;
    this.webStore = null;
    this.smartstateRef = undefined;
  }
}
|