Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a66518bde8 | |||
| 034ae56536 | |||
| b417e3d049 | |||
| 2b871402cc |
2
.vscode/settings.json
vendored
2
.vscode/settings.json
vendored
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"json.schemas": [
|
||||
{
|
||||
"fileMatch": ["/npmextra.json"],
|
||||
"fileMatch": ["/.smartconfig.json"],
|
||||
"schema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
||||
16
changelog.md
16
changelog.md
@@ -1,5 +1,21 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-03-27 - 2.3.0 - feat(stateprocess)
|
||||
add managed state processes with lifecycle controls, scheduled actions, and disposal safety
|
||||
|
||||
- introduces StateProcess with start, pause, resume, dispose, status, and auto-pause support
|
||||
- adds createProcess() and createScheduledAction() on StatePart for polling, streams, and recurring actions
|
||||
- adds disposal guards and Smartstate.dispose() to clean up state parts and attached processes
|
||||
- improves selector and computed observables with distinct-until-changed behavior and skipped selector error emissions
|
||||
- renames npmextra.json to .smartconfig.json and updates package tooling dependencies
|
||||
|
||||
## 2026-03-04 - 2.2.1 - fix(smartstate)
|
||||
no changes detected; no version bump required
|
||||
|
||||
- Git diff shows no changes
|
||||
- package.json version is 2.2.0
|
||||
- No files modified — no release needed
|
||||
|
||||
## 2026-03-02 - 2.2.0 - feat(actions)
|
||||
add action context for safe nested dispatch with depth limit to prevent deadlocks
|
||||
|
||||
|
||||
16
package.json
16
package.json
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/smartstate",
|
||||
"version": "2.2.0",
|
||||
"version": "2.3.0",
|
||||
"private": false,
|
||||
"description": "A TypeScript-first reactive state management library with middleware, computed state, batching, persistence, and Web Component Context Protocol support.",
|
||||
"main": "dist_ts/index.js",
|
||||
@@ -14,19 +14,19 @@
|
||||
"buildDocs": "tsdoc"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@git.zone/tsbuild": "^4.1.2",
|
||||
"@git.zone/tsbundle": "^2.9.0",
|
||||
"@git.zone/tsrun": "^2.0.1",
|
||||
"@git.zone/tstest": "^3.1.8",
|
||||
"@git.zone/tsbuild": "^4.4.0",
|
||||
"@git.zone/tsbundle": "^2.10.0",
|
||||
"@git.zone/tsrun": "^2.0.2",
|
||||
"@git.zone/tstest": "^3.6.1",
|
||||
"@push.rocks/tapbundle": "^6.0.3",
|
||||
"@types/node": "^25.3.2"
|
||||
"@types/node": "^25.5.0"
|
||||
},
|
||||
"dependencies": {
|
||||
"@push.rocks/smarthash": "^3.2.6",
|
||||
"@push.rocks/smartjson": "^6.0.0",
|
||||
"@push.rocks/smartpromise": "^4.2.3",
|
||||
"@push.rocks/smartrx": "^3.0.10",
|
||||
"@push.rocks/webstore": "^2.0.20"
|
||||
"@push.rocks/webstore": "^2.0.21"
|
||||
},
|
||||
"files": [
|
||||
"ts/**/*",
|
||||
@@ -37,7 +37,7 @@
|
||||
"dist_ts_web/**/*",
|
||||
"assets/**/*",
|
||||
"cli.js",
|
||||
"npmextra.json",
|
||||
".smartconfig.json",
|
||||
"readme.md"
|
||||
],
|
||||
"browserslist": [
|
||||
|
||||
4655
pnpm-lock.yaml
generated
4655
pnpm-lock.yaml
generated
File diff suppressed because it is too large
Load Diff
224
readme.md
224
readme.md
@@ -1,6 +1,6 @@
|
||||
# @push.rocks/smartstate
|
||||
|
||||
A TypeScript-first reactive state management library with middleware, computed state, batching, persistence, and Web Component Context Protocol support 🚀
|
||||
A TypeScript-first reactive state management library with processes, middleware, computed state, batching, persistence, and Web Component Context Protocol support 🚀
|
||||
|
||||
## Issue Reporting and Security
|
||||
|
||||
@@ -48,7 +48,7 @@ await userState.setState({ name: 'Alice', loggedIn: true });
|
||||
|
||||
### 🧩 State Parts & Init Modes
|
||||
|
||||
State parts are isolated, typed units of state. They are the building blocks of your application's state tree. Create them via `getStatePart()`:
|
||||
State parts are isolated, typed units of state — the building blocks of your application's state tree. Create them via `getStatePart()`:
|
||||
|
||||
```typescript
|
||||
const part = await state.getStatePart<IMyState>(name, initialState, initMode);
|
||||
@@ -58,10 +58,10 @@ const part = await state.getStatePart<IMyState>(name, initialState, initMode);
|
||||
|-----------|----------|
|
||||
| `'soft'` (default) | Returns existing if found, creates new otherwise |
|
||||
| `'mandatory'` | Throws if state part already exists — useful for ensuring single-initialization |
|
||||
| `'force'` | Always creates a new state part, overwriting any existing one |
|
||||
| `'force'` | Always creates a new state part, disposing and overwriting any existing one |
|
||||
| `'persistent'` | Like `'soft'` but automatically persists state to IndexedDB via WebStore |
|
||||
|
||||
You can use either enums or string literal types for state part names:
|
||||
You can use either string literal union types or enums for state part names:
|
||||
|
||||
```typescript
|
||||
// String literal types (simpler)
|
||||
@@ -82,12 +82,12 @@ const settings = await state.getStatePart('settings', { theme: 'dark', fontSize:
|
||||
|
||||
// ✅ Automatically saved to IndexedDB on every setState()
|
||||
// ✅ On next app load, persisted values override defaults
|
||||
// ✅ Persistence writes complete before in-memory updates (atomic)
|
||||
// ✅ Persistence writes complete before in-memory updates
|
||||
```
|
||||
|
||||
### 🔭 Selecting State
|
||||
|
||||
`select()` returns an RxJS Observable that emits the current value immediately and on every subsequent change:
|
||||
`select()` returns an RxJS Observable that emits the current value immediately (via `BehaviorSubject`) and on every subsequent change:
|
||||
|
||||
```typescript
|
||||
// Full state
|
||||
@@ -99,6 +99,8 @@ userState.select((s) => s.name).subscribe((name) => console.log(name));
|
||||
|
||||
Selectors are **memoized** — calling `select(fn)` with the same function reference returns the same cached Observable, shared across all subscribers via `shareReplay`. This means you can call `select(mySelector)` in multiple places without creating duplicate subscriptions.
|
||||
|
||||
**Change detection** is built in: `select()` uses `distinctUntilChanged` with deep JSON comparison, so subscribers only fire when the selected value actually changes. Selecting `s => s.name` won't re-emit when only `s.count` changes.
|
||||
|
||||
#### ✂️ AbortSignal Support
|
||||
|
||||
Clean up subscriptions without manual `.unsubscribe()` — the modern way:
|
||||
@@ -125,7 +127,6 @@ interface ILoginPayload {
|
||||
}
|
||||
|
||||
const loginAction = userState.createAction<ILoginPayload>(async (statePart, payload) => {
|
||||
// You have access to the current state via statePart.getState()
|
||||
const current = statePart.getState();
|
||||
return { ...current, name: payload.username, loggedIn: true };
|
||||
});
|
||||
@@ -136,7 +137,157 @@ await loginAction.trigger({ username: 'Alice', email: 'alice@example.com' });
|
||||
await userState.dispatchAction(loginAction, { username: 'Alice', email: 'alice@example.com' });
|
||||
```
|
||||
|
||||
Both `trigger()` and `dispatchAction()` return a Promise with the new state.
|
||||
Both `trigger()` and `dispatchAction()` return a Promise with the new state. All dispatches are serialized through a mutation queue, so concurrent dispatches never cause lost updates.
|
||||
|
||||
#### 🔗 Nested Actions (Action Context)
|
||||
|
||||
When you need to dispatch sub-actions from within an action, use the `context` parameter. This is critical because calling `dispatchAction()` directly from inside an action would deadlock (it tries to acquire the mutation queue that's already held). The context's `dispatch()` bypasses the queue and executes inline:
|
||||
|
||||
```typescript
|
||||
const incrementAction = userState.createAction<number>(async (statePart, amount) => {
|
||||
const current = statePart.getState();
|
||||
return { ...current, count: current.count + amount };
|
||||
});
|
||||
|
||||
const doubleIncrementAction = userState.createAction<number>(async (statePart, amount, context) => {
|
||||
// ✅ Safe: uses context.dispatch() which bypasses the mutation queue
|
||||
await context.dispatch(incrementAction, amount);
|
||||
const current = statePart.getState();
|
||||
return { ...current, count: current.count + amount };
|
||||
});
|
||||
|
||||
// ❌ DON'T do this inside an action — it will deadlock:
|
||||
// await statePart.dispatchAction(someAction, payload);
|
||||
```
|
||||
|
||||
A built-in depth limit (10 levels) prevents infinite circular dispatch chains, throwing a clear error if exceeded.
|
||||
|
||||
### 🔄 Processes (Polling, Streams & Scheduled Tasks)
|
||||
|
||||
Processes are managed, pausable observable-to-state bridges — the "side effects" layer. They tie an ongoing data source (polling, WebSockets, event streams) to state updates with full lifecycle control and optional auto-pause.
|
||||
|
||||
#### Basic Process: Polling an API
|
||||
|
||||
```typescript
|
||||
import { interval, switchMap, from } from 'rxjs';
|
||||
|
||||
const metricsPoller = dashboard.createProcess<{ cpu: number; memory: number }>({
|
||||
// Producer: an Observable factory — called on start and each resume
|
||||
producer: () => interval(5000).pipe(
|
||||
switchMap(() => from(fetch('/api/metrics').then(r => r.json()))),
|
||||
),
|
||||
// Reducer: folds each produced value into state (runs through middleware & validation)
|
||||
reducer: (currentState, metrics) => ({
|
||||
...currentState,
|
||||
metrics,
|
||||
lastUpdated: Date.now(),
|
||||
}),
|
||||
autoPause: 'visibility', // ⏸️ Stop polling when the tab is hidden
|
||||
autoStart: true, // ▶️ Start immediately
|
||||
});
|
||||
|
||||
// Full lifecycle control
|
||||
metricsPoller.pause(); // Unsubscribes from producer
|
||||
metricsPoller.resume(); // Re-subscribes (fresh subscription)
|
||||
metricsPoller.dispose(); // Permanent cleanup
|
||||
|
||||
// Observe status reactively
|
||||
metricsPoller.status; // 'idle' | 'running' | 'paused' | 'disposed'
|
||||
metricsPoller.status$.subscribe(s => console.log('Process:', s));
|
||||
```
|
||||
|
||||
#### Scheduled Actions
|
||||
|
||||
Dispatch an existing action on a recurring interval — syntactic sugar over `createProcess`:
|
||||
|
||||
```typescript
|
||||
const refreshAction = dashboard.createAction<void>(async (sp) => {
|
||||
const data = await fetch('/api/dashboard').then(r => r.json());
|
||||
return { ...sp.getState()!, ...data, lastUpdated: Date.now() };
|
||||
});
|
||||
|
||||
// Dispatches refreshAction every 30 seconds, auto-pauses when tab is hidden
|
||||
const scheduled = dashboard.createScheduledAction({
|
||||
action: refreshAction,
|
||||
payload: undefined,
|
||||
intervalMs: 30000,
|
||||
autoPause: 'visibility',
|
||||
});
|
||||
|
||||
// It's a full StateProcess — pause, resume, dispose all work
|
||||
scheduled.dispose();
|
||||
```
|
||||
|
||||
#### Custom Auto-Pause Signals
|
||||
|
||||
Pass any `Observable<boolean>` as the auto-pause signal — `true` means active, `false` means pause:
|
||||
|
||||
```typescript
|
||||
import { fromEvent, map, startWith } from 'rxjs';
|
||||
|
||||
// Pause when offline, resume when online
|
||||
const onlineSignal = fromEvent(window, 'online').pipe(
|
||||
startWith(null),
|
||||
map(() => navigator.onLine),
|
||||
);
|
||||
|
||||
const syncProcess = userPart.createProcess<SyncPayload>({
|
||||
producer: () => interval(10000).pipe(
|
||||
switchMap(() => from(syncWithServer())),
|
||||
),
|
||||
reducer: (state, result) => ({ ...state, ...result }),
|
||||
autoPause: onlineSignal,
|
||||
});
|
||||
syncProcess.start();
|
||||
```
|
||||
|
||||
#### WebSocket / Live Streams
|
||||
|
||||
Pause disconnects; resume creates a fresh connection:
|
||||
|
||||
```typescript
|
||||
const liveProcess = tickerPart.createProcess<TradeEvent>({
|
||||
producer: () => new Observable<TradeEvent>(subscriber => {
|
||||
const ws = new WebSocket('wss://trades.example.com');
|
||||
ws.onmessage = (e) => subscriber.next(JSON.parse(e.data));
|
||||
ws.onerror = (e) => subscriber.error(e);
|
||||
ws.onclose = () => subscriber.complete();
|
||||
return () => ws.close(); // Teardown: close WebSocket on unsubscribe
|
||||
}),
|
||||
reducer: (state, trade) => ({
|
||||
...state,
|
||||
lastPrice: trade.price,
|
||||
trades: [...state.trades.slice(-99), trade],
|
||||
}),
|
||||
autoPause: 'visibility',
|
||||
});
|
||||
liveProcess.start();
|
||||
```
|
||||
|
||||
#### Error Recovery
|
||||
|
||||
If a producer errors, the process gracefully transitions to `'paused'` instead of dying. Call `resume()` to retry with a fresh subscription:
|
||||
|
||||
```typescript
|
||||
process.start();
|
||||
// Producer errors → status becomes 'paused'
|
||||
process.resume(); // Creates a fresh subscription — retry
|
||||
```
|
||||
|
||||
#### Process Cleanup Cascades
|
||||
|
||||
Disposing a `StatePart` or `Smartstate` instance automatically disposes all attached processes:
|
||||
|
||||
```typescript
|
||||
const p1 = part.createProcess({ ... });
|
||||
const p2 = part.createProcess({ ... });
|
||||
p1.start();
|
||||
p2.start();
|
||||
|
||||
part.dispose();
|
||||
console.log(p1.status); // 'disposed'
|
||||
console.log(p2.status); // 'disposed'
|
||||
```
|
||||
|
||||
### 🛡️ Middleware
|
||||
|
||||
@@ -171,7 +322,7 @@ const remove = userState.addMiddleware(myMiddleware);
|
||||
remove(); // middleware no longer runs
|
||||
```
|
||||
|
||||
Middleware runs **sequentially** in insertion order. If any middleware throws, the state remains unchanged — the operation is **atomic**.
|
||||
Middleware runs **sequentially** in insertion order. If any middleware throws, the state remains unchanged — the operation is **atomic**. Process-driven state updates go through middleware too.
|
||||
|
||||
### 🧮 Computed / Derived State
|
||||
|
||||
@@ -199,7 +350,7 @@ const greeting2$ = state.computed(
|
||||
);
|
||||
```
|
||||
|
||||
Computed observables are **lazy** — they only subscribe to their sources when someone subscribes to them, and they automatically unsubscribe when all subscribers disconnect.
|
||||
Computed observables are **lazy** — they only subscribe to their sources when someone subscribes to them, and they automatically unsubscribe when all subscribers disconnect. They also use `distinctUntilChanged` to avoid redundant emissions when the derived value hasn't actually changed.
|
||||
|
||||
### 📦 Batch Updates
|
||||
|
||||
@@ -322,15 +473,31 @@ await userState.stateSetup(async (statePart) => {
|
||||
// Any dispatchAction() calls will automatically wait for stateSetup() to finish
|
||||
```
|
||||
|
||||
### 🧹 Disposal & Cleanup
|
||||
|
||||
Both `Smartstate` and individual `StatePart` instances support disposal for proper cleanup:
|
||||
|
||||
```typescript
|
||||
// Dispose a single state part — completes the BehaviorSubject, clears middleware, caches,
|
||||
// and disposes all attached processes
|
||||
userState.dispose();
|
||||
|
||||
// Dispose the entire Smartstate instance — disposes all state parts and clears internal maps
|
||||
state.dispose();
|
||||
```
|
||||
|
||||
After disposal, `setState()` and `dispatchAction()` will throw if called on a disposed `StatePart`. Calling `start()`, `pause()`, or `resume()` on a disposed `StateProcess` also throws.
|
||||
|
||||
### 🏎️ Performance
|
||||
|
||||
Smartstate is built with performance in mind:
|
||||
|
||||
- **🔒 SHA256 Change Detection** — Uses content hashing to detect actual changes. Identical state values don't trigger notifications, even with different object references.
|
||||
- **🎯 distinctUntilChanged on Selectors** — Sub-selectors only fire when the selected slice actually changes. `select(s => s.name)` won't emit when `s.count` changes.
|
||||
- **♻️ Selector Memoization** — `select(fn)` caches observables by function reference and shares them via `shareReplay({ refCount: true })`. Multiple subscribers share one upstream subscription.
|
||||
- **📦 Cumulative Notifications** — `notifyChangeCumulative()` debounces rapid changes into a single notification at the end of the call stack.
|
||||
- **🔐 Concurrent Safety** — Simultaneous `getStatePart()` calls for the same name return the same promise, preventing duplicate creation or race conditions.
|
||||
- **💾 Atomic Persistence** — WebStore writes complete before in-memory state updates, ensuring consistency even if the process crashes mid-write.
|
||||
- **🔐 Concurrent Safety** — Simultaneous `getStatePart()` calls for the same name return the same promise, preventing duplicate creation. All `setState()` and `dispatchAction()` calls are serialized through a mutation queue. Process values are serialized through their own internal queue.
|
||||
- **💾 Atomic Persistence** — WebStore writes complete before in-memory state updates, ensuring consistency.
|
||||
- **⏸️ Batch Deferred Notifications** — `batch()` suppresses all subscriber notifications until every update in the batch completes.
|
||||
|
||||
## API Reference
|
||||
@@ -342,23 +509,26 @@ Smartstate is built with performance in mind:
|
||||
| `getStatePart(name, initial?, initMode?)` | Get or create a typed state part |
|
||||
| `batch(fn)` | Batch state updates, defer all notifications until complete |
|
||||
| `computed(sources, fn)` | Create a computed observable from multiple state parts |
|
||||
| `dispose()` | Dispose all state parts and clear internal state |
|
||||
| `isBatching` | `boolean` — whether a batch is currently active |
|
||||
| `statePartMap` | Registry of all created state parts |
|
||||
|
||||
### `StatePart<TName, TPayload>`
|
||||
|
||||
| Method | Description |
|
||||
|--------|-------------|
|
||||
| `getState()` | Get current state (returns `TPayload \| undefined`) |
|
||||
| `getState()` | Get current state synchronously (`TPayload \| undefined`) |
|
||||
| `setState(newState)` | Set state — runs middleware → validates → persists → notifies |
|
||||
| `select(selectorFn?, options?)` | Returns an Observable of state or derived values. Options: `{ signal?: AbortSignal }` |
|
||||
| `select(selectorFn?, options?)` | Observable of state or derived values. Options: `{ signal?: AbortSignal }` |
|
||||
| `createAction(actionDef)` | Create a reusable, typed state action |
|
||||
| `dispatchAction(action, payload)` | Dispatch an action and return the new state |
|
||||
| `addMiddleware(fn)` | Add a middleware interceptor. Returns a removal function |
|
||||
| `waitUntilPresent(selectorFn?, opts?)` | Wait for a state condition. Opts: `number` (timeout) or `{ timeoutMs?, signal? }` |
|
||||
| `createProcess(options)` | Create a managed, pausable process tied to this state part |
|
||||
| `createScheduledAction(options)` | Create a process that dispatches an action on a recurring interval |
|
||||
| `notifyChange()` | Manually trigger a change notification (with hash dedup) |
|
||||
| `notifyChangeCumulative()` | Debounced notification — fires at end of call stack |
|
||||
| `stateSetup(fn)` | Async state initialization with action serialization |
|
||||
| `dispose()` | Complete the BehaviorSubject, dispose processes, clear middleware and caches |
|
||||
|
||||
### `StateAction<TState, TPayload>`
|
||||
|
||||
@@ -366,6 +536,23 @@ Smartstate is built with performance in mind:
|
||||
|--------|-------------|
|
||||
| `trigger(payload)` | Dispatch the action on its associated state part |
|
||||
|
||||
### `StateProcess<TName, TPayload, TProducerValue>`
|
||||
|
||||
| Method / Property | Description |
|
||||
|-------------------|-------------|
|
||||
| `start()` | Start the process (subscribes to producer, sets up auto-pause) |
|
||||
| `pause()` | Pause the process (unsubscribes from producer) |
|
||||
| `resume()` | Resume a paused process (fresh subscription to producer) |
|
||||
| `dispose()` | Permanently stop the process and clean up |
|
||||
| `status` | Current status: `'idle' \| 'running' \| 'paused' \| 'disposed'` |
|
||||
| `status$` | Observable of status transitions |
|
||||
|
||||
### `IActionContext<TState>`
|
||||
|
||||
| Method | Description |
|
||||
|--------|-------------|
|
||||
| `dispatch(action, payload)` | Dispatch a sub-action inline (bypasses mutation queue). Available as the third argument to action definitions |
|
||||
|
||||
### Standalone Functions
|
||||
|
||||
| Function | Description |
|
||||
@@ -379,8 +566,13 @@ Smartstate is built with performance in mind:
|
||||
|------|-------------|
|
||||
| `TInitMode` | `'soft' \| 'mandatory' \| 'force' \| 'persistent'` |
|
||||
| `TMiddleware<TPayload>` | `(newState, oldState) => TPayload \| Promise<TPayload>` |
|
||||
| `IActionDef<TState, TPayload>` | Action definition function signature |
|
||||
| `IActionDef<TState, TPayload>` | Action definition function signature (receives statePart, payload, context?) |
|
||||
| `IActionContext<TState>` | Context for safe nested dispatch within actions |
|
||||
| `IContextProviderOptions<TPayload>` | Options for `attachContextProvider` |
|
||||
| `IProcessOptions<TPayload, TValue>` | Options for `createProcess` (producer, reducer, autoPause, autoStart) |
|
||||
| `IScheduledActionOptions<TPayload, TActionPayload>` | Options for `createScheduledAction` (action, payload, intervalMs, autoPause) |
|
||||
| `TProcessStatus` | `'idle' \| 'running' \| 'paused' \| 'disposed'` |
|
||||
| `TAutoPause` | `'visibility' \| Observable<boolean> \| false` |
|
||||
|
||||
## License and Legal Information
|
||||
|
||||
|
||||
@@ -874,4 +874,122 @@ tap.test('concurrent dispatches still serialize correctly with context feature',
|
||||
expect(part.getState().count).toEqual(10);
|
||||
});
|
||||
|
||||
// ── distinctUntilChanged on selectors ──────────────────────────────────
|
||||
tap.test('select should not emit when selected sub-state has not changed', async () => {
|
||||
type TParts = 'distinctSelector';
|
||||
const state = new smartstate.Smartstate<TParts>();
|
||||
const part = await state.getStatePart<{ name: string; count: number }>('distinctSelector', { name: 'alice', count: 0 });
|
||||
|
||||
const nameSelector = (s: { name: string; count: number }) => s.name;
|
||||
const nameValues: string[] = [];
|
||||
part.select(nameSelector).subscribe((v) => nameValues.push(v));
|
||||
|
||||
// Wait for initial emission
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
expect(nameValues).toHaveLength(1);
|
||||
expect(nameValues[0]).toEqual('alice');
|
||||
|
||||
// Change only count — name selector should NOT re-emit
|
||||
await part.setState({ name: 'alice', count: 1 });
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
expect(nameValues).toHaveLength(1);
|
||||
|
||||
// Change name — name selector SHOULD emit
|
||||
await part.setState({ name: 'bob', count: 1 });
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
expect(nameValues).toHaveLength(2);
|
||||
expect(nameValues[1]).toEqual('bob');
|
||||
});
|
||||
|
||||
// ── Selector error skipping ────────────────────────────────────────────
|
||||
tap.test('selector errors should be skipped, not emit undefined', async () => {
|
||||
type TParts = 'selectorError';
|
||||
const state = new smartstate.Smartstate<TParts>();
|
||||
const part = await state.getStatePart<{ value: number }>('selectorError', { value: 1 });
|
||||
|
||||
let callCount = 0;
|
||||
const faultySelector = (s: { value: number }) => {
|
||||
if (s.value === 2) throw new Error('selector boom');
|
||||
return s.value;
|
||||
};
|
||||
|
||||
const values: number[] = [];
|
||||
part.select(faultySelector).subscribe((v) => {
|
||||
callCount++;
|
||||
values.push(v);
|
||||
});
|
||||
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
expect(values).toEqual([1]);
|
||||
|
||||
// This setState triggers a selector error — should be skipped, no undefined emitted
|
||||
await part.setState({ value: 2 });
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
expect(values).toEqual([1]); // no new emission
|
||||
expect(callCount).toEqual(1);
|
||||
|
||||
// Normal value again
|
||||
await part.setState({ value: 3 });
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
expect(values).toEqual([1, 3]);
|
||||
});
|
||||
|
||||
// ── Smartstate.dispose() ───────────────────────────────────────────────
|
||||
tap.test('Smartstate.dispose should dispose all state parts', async () => {
|
||||
type TParts = 'partA' | 'partB';
|
||||
const state = new smartstate.Smartstate<TParts>();
|
||||
const partA = await state.getStatePart<{ x: number }>('partA', { x: 1 });
|
||||
const partB = await state.getStatePart<{ y: number }>('partB', { y: 2 });
|
||||
|
||||
let aCompleted = false;
|
||||
let bCompleted = false;
|
||||
partA.select().subscribe({ complete: () => { aCompleted = true; } });
|
||||
partB.select().subscribe({ complete: () => { bCompleted = true; } });
|
||||
|
||||
state.dispose();
|
||||
|
||||
expect(aCompleted).toBeTrue();
|
||||
expect(bCompleted).toBeTrue();
|
||||
});
|
||||
|
||||
// ── Post-dispose setState throws ───────────────────────────────────────
|
||||
tap.test('setState on disposed StatePart should throw', async () => {
|
||||
type TParts = 'disposedPart';
|
||||
const state = new smartstate.Smartstate<TParts>();
|
||||
const part = await state.getStatePart<{ v: number }>('disposedPart', { v: 0 });
|
||||
|
||||
part.dispose();
|
||||
|
||||
let threw = false;
|
||||
try {
|
||||
await part.setState({ v: 1 });
|
||||
} catch (e) {
|
||||
threw = true;
|
||||
expect((e as Error).message).toInclude('disposed');
|
||||
}
|
||||
expect(threw).toBeTrue();
|
||||
});
|
||||
|
||||
// ── Post-dispose dispatchAction throws ─────────────────────────────────
|
||||
tap.test('dispatchAction on disposed StatePart should throw', async () => {
|
||||
type TParts = 'disposedDispatch';
|
||||
const state = new smartstate.Smartstate<TParts>();
|
||||
const part = await state.getStatePart<{ v: number }>('disposedDispatch', { v: 0 });
|
||||
|
||||
const action = part.createAction<number>(async (sp, payload) => {
|
||||
return { v: payload };
|
||||
});
|
||||
|
||||
part.dispose();
|
||||
|
||||
let threw = false;
|
||||
try {
|
||||
await part.dispatchAction(action, 5);
|
||||
} catch (e) {
|
||||
threw = true;
|
||||
expect((e as Error).message).toInclude('disposed');
|
||||
}
|
||||
expect(threw).toBeTrue();
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
|
||||
278
test/test.stateprocess.ts
Normal file
278
test/test.stateprocess.ts
Normal file
@@ -0,0 +1,278 @@
|
||||
import { tap, expect } from '@git.zone/tstest/tapbundle';
|
||||
import * as smartstate from '../ts/index.js';
|
||||
import { Subject, of, Observable, throwError, concat } from 'rxjs';
|
||||
|
||||
// ── Lifecycle ──────────────────────────────────────────────────────────
|
||||
tap.test('process should start in idle status', async () => {
|
||||
const state = new smartstate.Smartstate<'test'>();
|
||||
const part = await state.getStatePart<{ v: number }>('test', { v: 0 });
|
||||
const process = part.createProcess<number>({
|
||||
producer: () => of(1),
|
||||
reducer: (s, v) => ({ v: s.v + v }),
|
||||
});
|
||||
expect(process.status).toEqual('idle');
|
||||
process.dispose();
|
||||
});
|
||||
|
||||
tap.test('start/pause/resume/dispose lifecycle', async () => {
|
||||
const state = new smartstate.Smartstate<'lifecycle'>();
|
||||
const part = await state.getStatePart<{ v: number }>('lifecycle', { v: 0 });
|
||||
const subject = new Subject<number>();
|
||||
const process = part.createProcess<number>({
|
||||
producer: () => subject.asObservable(),
|
||||
reducer: (s, v) => ({ v: s.v + v }),
|
||||
});
|
||||
|
||||
expect(process.status).toEqual('idle');
|
||||
process.start();
|
||||
expect(process.status).toEqual('running');
|
||||
process.pause();
|
||||
expect(process.status).toEqual('paused');
|
||||
process.resume();
|
||||
expect(process.status).toEqual('running');
|
||||
process.dispose();
|
||||
expect(process.status).toEqual('disposed');
|
||||
});
|
||||
|
||||
// ── Producer → state integration ───────────────────────────────────────
|
||||
tap.test('producer values should update state through reducer', async () => {
|
||||
const state = new smartstate.Smartstate<'producer'>();
|
||||
const part = await state.getStatePart<{ values: number[] }>('producer', { values: [] });
|
||||
const subject = new Subject<number>();
|
||||
|
||||
const process = part.createProcess<number>({
|
||||
producer: () => subject.asObservable(),
|
||||
reducer: (s, v) => ({ values: [...s.values, v] }),
|
||||
});
|
||||
process.start();
|
||||
|
||||
subject.next(1);
|
||||
subject.next(2);
|
||||
subject.next(3);
|
||||
await new Promise((r) => setTimeout(r, 100));
|
||||
|
||||
expect(part.getState()!.values).toEqual([1, 2, 3]);
|
||||
process.dispose();
|
||||
});
|
||||
|
||||
// ── Pause stops producer, resume restarts ──────────────────────────────
|
||||
tap.test('pause should stop receiving values, resume should restart', async () => {
|
||||
const state = new smartstate.Smartstate<'pauseResume'>();
|
||||
const part = await state.getStatePart<{ count: number }>('pauseResume', { count: 0 });
|
||||
const subject = new Subject<number>();
|
||||
|
||||
const process = part.createProcess<number>({
|
||||
producer: () => subject.asObservable(),
|
||||
reducer: (s, v) => ({ count: s.count + v }),
|
||||
});
|
||||
process.start();
|
||||
|
||||
subject.next(1);
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
expect(part.getState()!.count).toEqual(1);
|
||||
|
||||
process.pause();
|
||||
subject.next(1); // should be ignored — producer unsubscribed
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
expect(part.getState()!.count).toEqual(1); // unchanged
|
||||
|
||||
process.resume();
|
||||
subject.next(1);
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
expect(part.getState()!.count).toEqual(2);
|
||||
|
||||
process.dispose();
|
||||
});
|
||||
|
||||
// ── Auto-pause with custom Observable ──────────────────────────────────
|
||||
tap.test('auto-pause with custom Observable<boolean> signal', async () => {
|
||||
const state = new smartstate.Smartstate<'autoPause'>();
|
||||
const part = await state.getStatePart<{ count: number }>('autoPause', { count: 0 });
|
||||
const producer = new Subject<number>();
|
||||
const pauseSignal = new Subject<boolean>();
|
||||
|
||||
const process = part.createProcess<number>({
|
||||
producer: () => producer.asObservable(),
|
||||
reducer: (s, v) => ({ count: s.count + v }),
|
||||
autoPause: pauseSignal.asObservable(),
|
||||
});
|
||||
process.start();
|
||||
|
||||
producer.next(1);
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
expect(part.getState()!.count).toEqual(1);
|
||||
|
||||
// Signal pause
|
||||
pauseSignal.next(false);
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
expect(process.status).toEqual('paused');
|
||||
|
||||
producer.next(1); // ignored
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
expect(part.getState()!.count).toEqual(1);
|
||||
|
||||
// Signal resume
|
||||
pauseSignal.next(true);
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
expect(process.status).toEqual('running');
|
||||
|
||||
producer.next(1);
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
expect(part.getState()!.count).toEqual(2);
|
||||
|
||||
process.dispose();
|
||||
});
|
||||
|
||||
// ── Auto-pause 'visibility' in Node.js (no document) ──────────────────
|
||||
tap.test('autoPause visibility should be always-active in Node.js', async () => {
|
||||
const state = new smartstate.Smartstate<'vis'>();
|
||||
const part = await state.getStatePart<{ v: number }>('vis', { v: 0 });
|
||||
const subject = new Subject<number>();
|
||||
|
||||
const process = part.createProcess<number>({
|
||||
producer: () => subject.asObservable(),
|
||||
reducer: (s, v) => ({ v: v }),
|
||||
autoPause: 'visibility',
|
||||
});
|
||||
process.start();
|
||||
expect(process.status).toEqual('running');
|
||||
|
||||
subject.next(42);
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
expect(part.getState()!.v).toEqual(42);
|
||||
process.dispose();
|
||||
});
|
||||
|
||||
// ── Scheduled action ───────────────────────────────────────────────────
|
||||
tap.test('createScheduledAction should dispatch action on interval', async () => {
|
||||
const state = new smartstate.Smartstate<'scheduled'>();
|
||||
const part = await state.getStatePart<{ ticks: number }>('scheduled', { ticks: 0 });
|
||||
|
||||
const tickAction = part.createAction<void>(async (sp) => {
|
||||
return { ticks: sp.getState()!.ticks + 1 };
|
||||
});
|
||||
|
||||
const scheduled = part.createScheduledAction({
|
||||
action: tickAction,
|
||||
payload: undefined,
|
||||
intervalMs: 50,
|
||||
});
|
||||
|
||||
await new Promise((r) => setTimeout(r, 280));
|
||||
scheduled.dispose();
|
||||
|
||||
// Should have ticked at least 3 times in ~280ms with 50ms interval
|
||||
expect(part.getState()!.ticks).toBeGreaterThanOrEqual(3);
|
||||
});
|
||||
|
||||
// ── StatePart.dispose cascades ─────────────────────────────────────────
|
||||
tap.test('StatePart.dispose should dispose all processes', async () => {
|
||||
const state = new smartstate.Smartstate<'cascade'>();
|
||||
const part = await state.getStatePart<{ v: number }>('cascade', { v: 0 });
|
||||
|
||||
const p1 = part.createProcess<number>({
|
||||
producer: () => new Subject<number>().asObservable(),
|
||||
reducer: (s, v) => ({ v }),
|
||||
});
|
||||
const p2 = part.createProcess<number>({
|
||||
producer: () => new Subject<number>().asObservable(),
|
||||
reducer: (s, v) => ({ v }),
|
||||
});
|
||||
p1.start();
|
||||
p2.start();
|
||||
|
||||
part.dispose();
|
||||
expect(p1.status).toEqual('disposed');
|
||||
expect(p2.status).toEqual('disposed');
|
||||
});
|
||||
|
||||
// ── status$ observable ─────────────────────────────────────────────────
|
||||
tap.test('status$ should emit lifecycle transitions', async () => {
|
||||
const state = new smartstate.Smartstate<'status$'>();
|
||||
const part = await state.getStatePart<{ v: number }>('status$', { v: 0 });
|
||||
const subject = new Subject<number>();
|
||||
|
||||
const process = part.createProcess<number>({
|
||||
producer: () => subject.asObservable(),
|
||||
reducer: (s, v) => ({ v }),
|
||||
});
|
||||
|
||||
const statuses: string[] = [];
|
||||
process.status$.subscribe((s) => statuses.push(s));
|
||||
|
||||
process.start();
|
||||
process.pause();
|
||||
process.resume();
|
||||
process.dispose();
|
||||
|
||||
expect(statuses).toEqual(['idle', 'running', 'paused', 'running', 'disposed']);
|
||||
});
|
||||
|
||||
// ── Producer error → graceful pause ────────────────────────────────────
|
||||
tap.test('producer error should pause process gracefully', async () => {
|
||||
const state = new smartstate.Smartstate<'error'>();
|
||||
const part = await state.getStatePart<{ v: number }>('error', { v: 0 });
|
||||
|
||||
let callCount = 0;
|
||||
const process = part.createProcess<number>({
|
||||
producer: () => {
|
||||
callCount++;
|
||||
if (callCount === 1) {
|
||||
// First subscription: emit 1, then error
|
||||
return concat(of(1), throwError(() => new Error('boom')));
|
||||
}
|
||||
// After resume: emit 2 successfully
|
||||
return of(2);
|
||||
},
|
||||
reducer: (s, v) => ({ v }),
|
||||
});
|
||||
process.start();
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
expect(process.status).toEqual('paused');
|
||||
expect(part.getState()!.v).toEqual(1); // got the value before error
|
||||
|
||||
// Resume creates a fresh subscription
|
||||
process.resume();
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
expect(part.getState()!.v).toEqual(2);
|
||||
|
||||
process.dispose();
|
||||
});
|
||||
|
||||
// ── Disposed guards ────────────────────────────────────────────────────
|
||||
tap.test('start/pause/resume on disposed process should throw', async () => {
|
||||
const state = new smartstate.Smartstate<'guards'>();
|
||||
const part = await state.getStatePart<{ v: number }>('guards', { v: 0 });
|
||||
|
||||
const process = part.createProcess<number>({
|
||||
producer: () => of(1),
|
||||
reducer: (s, v) => ({ v }),
|
||||
});
|
||||
process.dispose();
|
||||
|
||||
let errors = 0;
|
||||
try { process.start(); } catch { errors++; }
|
||||
try { process.pause(); } catch { errors++; }
|
||||
try { process.resume(); } catch { errors++; }
|
||||
expect(errors).toEqual(3);
|
||||
});
|
||||
|
||||
// ── autoStart option ───────────────────────────────────────────────────
|
||||
tap.test('autoStart should start process immediately', async () => {
|
||||
const state = new smartstate.Smartstate<'autoStart'>();
|
||||
const part = await state.getStatePart<{ v: number }>('autoStart', { v: 0 });
|
||||
|
||||
const process = part.createProcess<number>({
|
||||
producer: () => of(42),
|
||||
reducer: (s, v) => ({ v }),
|
||||
autoStart: true,
|
||||
});
|
||||
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
expect(process.status).toEqual('running');
|
||||
expect(part.getState()!.v).toEqual(42);
|
||||
process.dispose();
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartstate',
|
||||
version: '2.2.0',
|
||||
version: '2.3.0',
|
||||
description: 'A TypeScript-first reactive state management library with middleware, computed state, batching, persistence, and Web Component Context Protocol support.'
|
||||
}
|
||||
|
||||
@@ -3,3 +3,4 @@ export * from './smartstate.classes.statepart.js';
|
||||
export * from './smartstate.classes.stateaction.js';
|
||||
export * from './smartstate.classes.computed.js';
|
||||
export * from './smartstate.contextprovider.js';
|
||||
export * from './smartstate.classes.stateprocess.js';
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import * as plugins from './smartstate.plugins.js';
|
||||
import { combineLatest, map } from 'rxjs';
|
||||
import { combineLatest, map, distinctUntilChanged } from 'rxjs';
|
||||
import type { StatePart } from './smartstate.classes.statepart.js';
|
||||
|
||||
/**
|
||||
@@ -12,5 +12,6 @@ export function computed<TResult>(
|
||||
): plugins.smartrx.rxjs.Observable<TResult> {
|
||||
return combineLatest(sources.map((sp) => sp.select())).pipe(
|
||||
map((states) => computeFn(...states)),
|
||||
distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b)),
|
||||
) as plugins.smartrx.rxjs.Observable<TResult>;
|
||||
}
|
||||
|
||||
@@ -49,7 +49,11 @@ export class Smartstate<StatePartNameType extends string> {
|
||||
const pending = [...this.pendingNotifications];
|
||||
this.pendingNotifications.clear();
|
||||
for (const sp of pending) {
|
||||
try {
|
||||
await sp.notifyChange();
|
||||
} catch (err) {
|
||||
console.error(`Error flushing notification for state part:`, err);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
@@ -69,6 +73,21 @@ export class Smartstate<StatePartNameType extends string> {
|
||||
return computed(sources, computeFn);
|
||||
}
|
||||
|
||||
/**
|
||||
* disposes all state parts and clears internal state
|
||||
*/
|
||||
public dispose(): void {
|
||||
for (const key of Object.keys(this.statePartMap)) {
|
||||
const part = this.statePartMap[key as StatePartNameType];
|
||||
if (part) {
|
||||
part.dispose();
|
||||
}
|
||||
}
|
||||
this.statePartMap = {} as any;
|
||||
this.pendingStatePartCreation.clear();
|
||||
this.pendingNotifications.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows getting and initializing a new statepart
|
||||
*/
|
||||
@@ -107,7 +126,7 @@ export class Smartstate<StatePartNameType extends string> {
|
||||
}
|
||||
}
|
||||
|
||||
const creationPromise = this.createStatePart<PayloadType>(statePartNameArg, initialArg, initMode);
|
||||
const creationPromise = this.createStatePart<PayloadType>(statePartNameArg, initialArg!, initMode);
|
||||
this.pendingStatePartCreation.set(statePartNameArg, creationPromise);
|
||||
|
||||
try {
|
||||
@@ -133,7 +152,7 @@ export class Smartstate<StatePartNameType extends string> {
|
||||
dbName: 'smartstate',
|
||||
storeName: statePartName,
|
||||
}
|
||||
: null
|
||||
: undefined
|
||||
);
|
||||
newState.smartstateRef = this;
|
||||
await newState.init();
|
||||
|
||||
@@ -18,8 +18,8 @@ export interface IActionDef<TStateType, TActionPayloadType> {
|
||||
*/
|
||||
export class StateAction<TStateType, TActionPayloadType> {
|
||||
constructor(
|
||||
public statePartRef: StatePart<any, any>,
|
||||
public actionDef: IActionDef<TStateType, TActionPayloadType>
|
||||
public readonly statePartRef: StatePart<any, any>,
|
||||
public readonly actionDef: IActionDef<TStateType, TActionPayloadType>
|
||||
) {}
|
||||
|
||||
public trigger(payload: TActionPayloadType): Promise<TStateType> {
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
import * as plugins from './smartstate.plugins.js';
|
||||
import { Observable, shareReplay, takeUntil } from 'rxjs';
|
||||
import { BehaviorSubject, Observable, shareReplay, takeUntil, distinctUntilChanged, interval } from 'rxjs';
|
||||
import { StateAction, type IActionDef, type IActionContext } from './smartstate.classes.stateaction.js';
|
||||
import type { Smartstate } from './smartstate.classes.smartstate.js';
|
||||
import { StateProcess, type IProcessOptions, type IScheduledActionOptions } from './smartstate.classes.stateprocess.js';
|
||||
|
||||
export type TMiddleware<TPayload> = (
|
||||
newState: TPayload,
|
||||
@@ -31,19 +32,23 @@ export class StatePart<TStatePartName, TStatePayload> {
|
||||
private static readonly MAX_NESTED_DISPATCH_DEPTH = 10;
|
||||
|
||||
public name: TStatePartName;
|
||||
public state = new plugins.smartrx.rxjs.Subject<TStatePayload>();
|
||||
public stateStore: TStatePayload | undefined;
|
||||
private state = new BehaviorSubject<TStatePayload | undefined>(undefined);
|
||||
private stateStore: TStatePayload | undefined;
|
||||
public smartstateRef?: Smartstate<any>;
|
||||
private disposed = false;
|
||||
private cumulativeDeferred = plugins.smartpromise.cumulativeDefer();
|
||||
|
||||
private mutationQueue: Promise<any> = Promise.resolve();
|
||||
private pendingCumulativeNotification: ReturnType<typeof setTimeout> | null = null;
|
||||
|
||||
private webStoreOptions: plugins.webstore.IWebStoreOptions;
|
||||
private webStoreOptions: plugins.webstore.IWebStoreOptions | undefined;
|
||||
private webStore: plugins.webstore.WebStore<TStatePayload> | null = null;
|
||||
|
||||
private middlewares: TMiddleware<TStatePayload>[] = [];
|
||||
|
||||
// Process tracking
|
||||
private processes: StateProcess<TStatePartName, TStatePayload, any>[] = [];
|
||||
|
||||
// Selector memoization
|
||||
private selectorCache = new WeakMap<Function, plugins.smartrx.rxjs.Observable<any>>();
|
||||
private defaultSelectObservable: plugins.smartrx.rxjs.Observable<TStatePayload> | null = null;
|
||||
@@ -97,6 +102,9 @@ export class StatePart<TStatePartName, TStatePayload> {
|
||||
* sets the stateStore to the new state (serialized via mutation queue)
|
||||
*/
|
||||
public async setState(newStateArg: TStatePayload): Promise<TStatePayload> {
|
||||
if (this.disposed) {
|
||||
throw new Error(`StatePart '${this.name}' has been disposed`);
|
||||
}
|
||||
return this.mutationQueue = this.mutationQueue.then(
|
||||
() => this.applyState(newStateArg),
|
||||
() => this.applyState(newStateArg),
|
||||
@@ -212,22 +220,24 @@ export class StatePart<TStatePartName, TStatePayload> {
|
||||
}
|
||||
|
||||
const effectiveSelectorFn = selectorFn || ((state: TStatePayload) => <T>(<any>state));
|
||||
const SELECTOR_ERROR: unique symbol = Symbol('selector-error');
|
||||
|
||||
let mapped = this.state.pipe(
|
||||
plugins.smartrx.rxjs.ops.startWith(this.getState()),
|
||||
plugins.smartrx.rxjs.ops.filter((stateArg): stateArg is TStatePayload => stateArg !== undefined),
|
||||
plugins.smartrx.rxjs.ops.map((stateArg) => {
|
||||
try {
|
||||
return effectiveSelectorFn(stateArg);
|
||||
} catch (e) {
|
||||
console.error(`Selector error in state part '${this.name}':`, e);
|
||||
return undefined;
|
||||
return SELECTOR_ERROR as any;
|
||||
}
|
||||
})
|
||||
}),
|
||||
plugins.smartrx.rxjs.ops.filter((v: any) => v !== SELECTOR_ERROR),
|
||||
distinctUntilChanged((a: any, b: any) => JSON.stringify(a) === JSON.stringify(b)),
|
||||
);
|
||||
|
||||
if (hasSignal) {
|
||||
mapped = mapped.pipe(takeUntil(fromAbortSignal(options.signal)));
|
||||
mapped = mapped.pipe(takeUntil(fromAbortSignal(options.signal!)));
|
||||
return mapped;
|
||||
}
|
||||
|
||||
@@ -277,19 +287,16 @@ export class StatePart<TStatePartName, TStatePayload> {
|
||||
* dispatches an action on the statepart level
|
||||
*/
|
||||
public async dispatchAction<T>(stateAction: StateAction<TStatePayload, T>, actionPayload: T): Promise<TStatePayload> {
|
||||
if (this.disposed) {
|
||||
throw new Error(`StatePart '${this.name}' has been disposed`);
|
||||
}
|
||||
await this.cumulativeDeferred.promise;
|
||||
return this.mutationQueue = this.mutationQueue.then(
|
||||
async () => {
|
||||
const execute = async () => {
|
||||
const context = this.createActionContext(0);
|
||||
const newState = await stateAction.actionDef(this, actionPayload, context);
|
||||
return this.applyState(newState);
|
||||
},
|
||||
async () => {
|
||||
const context = this.createActionContext(0);
|
||||
const newState = await stateAction.actionDef(this, actionPayload, context);
|
||||
return this.applyState(newState);
|
||||
},
|
||||
);
|
||||
};
|
||||
return this.mutationQueue = this.mutationQueue.then(execute, execute);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -374,11 +381,68 @@ export class StatePart<TStatePartName, TStatePayload> {
|
||||
await this.setState(await resultPromise);
|
||||
}
|
||||
|
||||
/**
|
||||
* creates a managed, pausable process that ties an observable producer to state updates
|
||||
*/
|
||||
public createProcess<TProducerValue>(
|
||||
options: IProcessOptions<TStatePayload, TProducerValue>
|
||||
): StateProcess<TStatePartName, TStatePayload, TProducerValue> {
|
||||
if (this.disposed) {
|
||||
throw new Error(`StatePart '${this.name}' has been disposed`);
|
||||
}
|
||||
const process = new StateProcess<TStatePartName, TStatePayload, TProducerValue>(this, options);
|
||||
this.processes.push(process);
|
||||
if (options.autoStart) {
|
||||
process.start();
|
||||
}
|
||||
return process;
|
||||
}
|
||||
|
||||
/**
|
||||
* creates a process that dispatches an action on a recurring interval
|
||||
*/
|
||||
public createScheduledAction<TActionPayload>(
|
||||
options: IScheduledActionOptions<TStatePayload, TActionPayload>
|
||||
): StateProcess<TStatePartName, TStatePayload, number> {
|
||||
if (this.disposed) {
|
||||
throw new Error(`StatePart '${this.name}' has been disposed`);
|
||||
}
|
||||
const process = new StateProcess<TStatePartName, TStatePayload, number>(this, {
|
||||
producer: () => interval(options.intervalMs),
|
||||
sideEffect: async () => {
|
||||
await options.action.trigger(options.payload);
|
||||
},
|
||||
autoPause: options.autoPause ?? false,
|
||||
});
|
||||
this.processes.push(process);
|
||||
process.start();
|
||||
return process;
|
||||
}
|
||||
|
||||
/** @internal — called by StateProcess.dispose() to remove itself */
|
||||
public _removeProcess(process: StateProcess<any, any, any>): void {
|
||||
const idx = this.processes.indexOf(process);
|
||||
if (idx !== -1) {
|
||||
this.processes.splice(idx, 1);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* disposes the state part, completing the Subject and cleaning up resources
|
||||
*/
|
||||
public dispose(): void {
|
||||
if (this.disposed) return;
|
||||
this.disposed = true;
|
||||
|
||||
// Dispose all processes first
|
||||
for (const process of [...this.processes]) {
|
||||
process.dispose();
|
||||
}
|
||||
this.processes.length = 0;
|
||||
|
||||
this.state.complete();
|
||||
this.mutationQueue = Promise.resolve() as any;
|
||||
|
||||
if (this.pendingCumulativeNotification) {
|
||||
clearTimeout(this.pendingCumulativeNotification);
|
||||
this.pendingCumulativeNotification = null;
|
||||
@@ -388,5 +452,6 @@ export class StatePart<TStatePartName, TStatePayload> {
|
||||
this.defaultSelectObservable = null;
|
||||
this.webStore = null;
|
||||
this.smartstateRef = undefined;
|
||||
this.stateStore = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
177
ts/smartstate.classes.stateprocess.ts
Normal file
177
ts/smartstate.classes.stateprocess.ts
Normal file
@@ -0,0 +1,177 @@
|
||||
import { BehaviorSubject, Observable, Subscription, of } from 'rxjs';
|
||||
import type { StatePart } from './smartstate.classes.statepart.js';
|
||||
import type { StateAction } from './smartstate.classes.stateaction.js';
|
||||
|
||||
export type TProcessStatus = 'idle' | 'running' | 'paused' | 'disposed';
|
||||
export type TAutoPause = 'visibility' | Observable<boolean> | false;
|
||||
|
||||
export interface IProcessOptions<TStatePayload, TProducerValue> {
|
||||
producer: () => Observable<TProducerValue>;
|
||||
reducer: (currentState: TStatePayload, value: TProducerValue) => TStatePayload;
|
||||
autoPause?: TAutoPause;
|
||||
autoStart?: boolean;
|
||||
}
|
||||
|
||||
export interface IScheduledActionOptions<TStatePayload, TActionPayload> {
|
||||
action: StateAction<TStatePayload, TActionPayload>;
|
||||
payload: TActionPayload;
|
||||
intervalMs: number;
|
||||
autoPause?: TAutoPause;
|
||||
}
|
||||
|
||||
/**
|
||||
* creates an Observable<boolean> from the Page Visibility API.
|
||||
* emits true when the page is visible, false when hidden.
|
||||
* in Node.js (no document), returns an always-true observable.
|
||||
*/
|
||||
function createVisibilityObservable(): Observable<boolean> {
|
||||
if (typeof document === 'undefined') {
|
||||
return of(true);
|
||||
}
|
||||
return new Observable<boolean>((subscriber) => {
|
||||
subscriber.next(!document.hidden);
|
||||
const handler = () => subscriber.next(!document.hidden);
|
||||
document.addEventListener('visibilitychange', handler);
|
||||
return () => document.removeEventListener('visibilitychange', handler);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* a managed, pausable process that ties an observable producer to state updates.
|
||||
* supports lifecycle management (start/pause/resume/dispose) and auto-pause signals.
|
||||
*/
|
||||
export class StateProcess<TStatePartName, TStatePayload, TProducerValue> {
|
||||
private readonly statePartRef: StatePart<TStatePartName, TStatePayload>;
|
||||
private readonly producerFn: () => Observable<TProducerValue>;
|
||||
private readonly reducer?: (currentState: TStatePayload, value: TProducerValue) => TStatePayload;
|
||||
private readonly sideEffect?: (value: TProducerValue) => Promise<void> | void;
|
||||
private readonly autoPauseOption: TAutoPause;
|
||||
|
||||
private statusSubject = new BehaviorSubject<TProcessStatus>('idle');
|
||||
private producerSubscription: Subscription | null = null;
|
||||
private autoPauseSubscription: Subscription | null = null;
|
||||
private processingQueue: Promise<void> = Promise.resolve();
|
||||
|
||||
constructor(
|
||||
statePartRef: StatePart<TStatePartName, TStatePayload>,
|
||||
options: {
|
||||
producer: () => Observable<TProducerValue>;
|
||||
reducer?: (currentState: TStatePayload, value: TProducerValue) => TStatePayload;
|
||||
sideEffect?: (value: TProducerValue) => Promise<void> | void;
|
||||
autoPause?: TAutoPause;
|
||||
}
|
||||
) {
|
||||
this.statePartRef = statePartRef;
|
||||
this.producerFn = options.producer;
|
||||
this.reducer = options.reducer;
|
||||
this.sideEffect = options.sideEffect;
|
||||
this.autoPauseOption = options.autoPause ?? false;
|
||||
}
|
||||
|
||||
public get status(): TProcessStatus {
|
||||
return this.statusSubject.getValue();
|
||||
}
|
||||
|
||||
public get status$(): Observable<TProcessStatus> {
|
||||
return this.statusSubject.asObservable();
|
||||
}
|
||||
|
||||
public start(): void {
|
||||
if (this.status === 'disposed') {
|
||||
throw new Error('Cannot start a disposed process');
|
||||
}
|
||||
if (this.status === 'running') return;
|
||||
this.statusSubject.next('running');
|
||||
this.subscribeProducer();
|
||||
this.setupAutoPause();
|
||||
}
|
||||
|
||||
public pause(): void {
|
||||
if (this.status === 'disposed') {
|
||||
throw new Error('Cannot pause a disposed process');
|
||||
}
|
||||
if (this.status !== 'running') return;
|
||||
this.statusSubject.next('paused');
|
||||
this.unsubscribeProducer();
|
||||
}
|
||||
|
||||
public resume(): void {
|
||||
if (this.status === 'disposed') {
|
||||
throw new Error('Cannot resume a disposed process');
|
||||
}
|
||||
if (this.status !== 'paused') return;
|
||||
this.statusSubject.next('running');
|
||||
this.subscribeProducer();
|
||||
}
|
||||
|
||||
public dispose(): void {
|
||||
if (this.status === 'disposed') return;
|
||||
this.unsubscribeProducer();
|
||||
this.teardownAutoPause();
|
||||
this.statusSubject.next('disposed');
|
||||
this.statusSubject.complete();
|
||||
this.statePartRef._removeProcess(this);
|
||||
}
|
||||
|
||||
private subscribeProducer(): void {
|
||||
this.unsubscribeProducer();
|
||||
const source = this.producerFn();
|
||||
this.producerSubscription = source.subscribe({
|
||||
next: (value) => {
|
||||
// Queue value processing to ensure each reads fresh state after the previous completes
|
||||
this.processingQueue = this.processingQueue.then(async () => {
|
||||
try {
|
||||
if (this.sideEffect) {
|
||||
await this.sideEffect(value);
|
||||
} else if (this.reducer) {
|
||||
const currentState = this.statePartRef.getState();
|
||||
if (currentState !== undefined) {
|
||||
await this.statePartRef.setState(this.reducer(currentState, value));
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('StateProcess value handling error:', err);
|
||||
}
|
||||
});
|
||||
},
|
||||
error: (err) => {
|
||||
console.error('StateProcess producer error:', err);
|
||||
if (this.status === 'running') {
|
||||
this.statusSubject.next('paused');
|
||||
this.unsubscribeProducer();
|
||||
}
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
private unsubscribeProducer(): void {
|
||||
if (this.producerSubscription) {
|
||||
this.producerSubscription.unsubscribe();
|
||||
this.producerSubscription = null;
|
||||
}
|
||||
}
|
||||
|
||||
private setupAutoPause(): void {
|
||||
this.teardownAutoPause();
|
||||
if (!this.autoPauseOption) return;
|
||||
|
||||
const signal$ = this.autoPauseOption === 'visibility'
|
||||
? createVisibilityObservable()
|
||||
: this.autoPauseOption;
|
||||
|
||||
this.autoPauseSubscription = signal$.subscribe((active) => {
|
||||
if (!active && this.status === 'running') {
|
||||
this.pause();
|
||||
} else if (active && this.status === 'paused') {
|
||||
this.resume();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private teardownAutoPause(): void {
|
||||
if (this.autoPauseSubscription) {
|
||||
this.autoPauseSubscription.unsubscribe();
|
||||
this.autoPauseSubscription = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6,7 +6,8 @@
|
||||
"module": "NodeNext",
|
||||
"moduleResolution": "NodeNext",
|
||||
"esModuleInterop": true,
|
||||
"verbatimModuleSyntax": true
|
||||
"verbatimModuleSyntax": true,
|
||||
"types": ["node"]
|
||||
},
|
||||
"exclude": [
|
||||
"dist_*/**/*.d.ts"
|
||||
|
||||
Reference in New Issue
Block a user