Compare commits

...

6 Commits

Author SHA1 Message Date
a66518bde8 v2.3.0
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-27 11:32:49 +00:00
034ae56536 feat(stateprocess): add managed state processes with lifecycle controls, scheduled actions, and disposal safety 2026-03-27 11:32:49 +00:00
b417e3d049 v2.2.1
Some checks failed
Default (tags) / security (push) Successful in 43s
Default (tags) / test (push) Failing after 1m20s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-04 00:03:01 +00:00
2b871402cc fix(smartstate): no changes detected; no version bump required 2026-03-04 00:03:01 +00:00
24c48d8e9b v2.2.0
Some checks failed
Default (tags) / security (push) Successful in 34s
Default (tags) / test (push) Failing after 1m22s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-02 19:11:44 +00:00
9ba75f6f98 feat(actions): add action context for safe nested dispatch with depth limit to prevent deadlocks 2026-03-02 19:11:44 +00:00
16 changed files with 3500 additions and 2285 deletions

View File

@@ -1,7 +1,7 @@
{ {
"json.schemas": [ "json.schemas": [
{ {
"fileMatch": ["/npmextra.json"], "fileMatch": ["/.smartconfig.json"],
"schema": { "schema": {
"type": "object", "type": "object",
"properties": { "properties": {

View File

@@ -1,5 +1,30 @@
# Changelog # Changelog
## 2026-03-27 - 2.3.0 - feat(stateprocess)
add managed state processes with lifecycle controls, scheduled actions, and disposal safety
- introduces StateProcess with start, pause, resume, dispose, status, and auto-pause support
- adds createProcess() and createScheduledAction() on StatePart for polling, streams, and recurring actions
- adds disposal guards and Smartstate.dispose() to clean up state parts and attached processes
- improves selector and computed observables with distinct-until-changed behavior and skipped selector error emissions
- renames npmextra.json to .smartconfig.json and updates package tooling dependencies
## 2026-03-04 - 2.2.1 - fix(smartstate)
no changes detected; no version bump required
- Git diff shows no changes
- package.json version is 2.2.0
- No files modified — no release needed
## 2026-03-02 - 2.2.0 - feat(actions)
add action context for safe nested dispatch with depth limit to prevent deadlocks
- Introduce IActionContext to allow actions to dispatch sub-actions inline via context.dispatch
- Update IActionDef signature to accept an optional context parameter for backward compatibility
- Add StatePart.createActionContext and MAX_NESTED_DISPATCH_DEPTH to track and limit nested dispatch depth (throws on circular dispatchs)
- Pass a created context into dispatchAction so actionDefs can safely perform nested dispatches without deadlocking the mutation queue
- Add tests covering re-entrancy, deeply nested dispatch, circular dispatch depth detection, backward compatibility with actions that omit context, and concurrent dispatch serialization
## 2026-02-28 - 2.1.1 - fix(core) ## 2026-02-28 - 2.1.1 - fix(core)
serialize state mutations, fix batch flushing/reentrancy, handle falsy initial values, dispose old StatePart on force, and improve notification/error handling serialize state mutations, fix batch flushing/reentrancy, handle falsy initial values, dispose old StatePart on force, and improve notification/error handling

View File

@@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/smartstate", "name": "@push.rocks/smartstate",
"version": "2.1.1", "version": "2.3.0",
"private": false, "private": false,
"description": "A TypeScript-first reactive state management library with middleware, computed state, batching, persistence, and Web Component Context Protocol support.", "description": "A TypeScript-first reactive state management library with middleware, computed state, batching, persistence, and Web Component Context Protocol support.",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",
@@ -14,19 +14,19 @@
"buildDocs": "tsdoc" "buildDocs": "tsdoc"
}, },
"devDependencies": { "devDependencies": {
"@git.zone/tsbuild": "^4.1.2", "@git.zone/tsbuild": "^4.4.0",
"@git.zone/tsbundle": "^2.9.0", "@git.zone/tsbundle": "^2.10.0",
"@git.zone/tsrun": "^2.0.1", "@git.zone/tsrun": "^2.0.2",
"@git.zone/tstest": "^3.1.8", "@git.zone/tstest": "^3.6.1",
"@push.rocks/tapbundle": "^6.0.3", "@push.rocks/tapbundle": "^6.0.3",
"@types/node": "^25.3.2" "@types/node": "^25.5.0"
}, },
"dependencies": { "dependencies": {
"@push.rocks/smarthash": "^3.2.6", "@push.rocks/smarthash": "^3.2.6",
"@push.rocks/smartjson": "^6.0.0", "@push.rocks/smartjson": "^6.0.0",
"@push.rocks/smartpromise": "^4.2.3", "@push.rocks/smartpromise": "^4.2.3",
"@push.rocks/smartrx": "^3.0.10", "@push.rocks/smartrx": "^3.0.10",
"@push.rocks/webstore": "^2.0.20" "@push.rocks/webstore": "^2.0.21"
}, },
"files": [ "files": [
"ts/**/*", "ts/**/*",
@@ -37,7 +37,7 @@
"dist_ts_web/**/*", "dist_ts_web/**/*",
"assets/**/*", "assets/**/*",
"cli.js", "cli.js",
"npmextra.json", ".smartconfig.json",
"readme.md" "readme.md"
], ],
"browserslist": [ "browserslist": [

4655
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

224
readme.md
View File

@@ -1,6 +1,6 @@
# @push.rocks/smartstate # @push.rocks/smartstate
A TypeScript-first reactive state management library with middleware, computed state, batching, persistence, and Web Component Context Protocol support 🚀 A TypeScript-first reactive state management library with processes, middleware, computed state, batching, persistence, and Web Component Context Protocol support 🚀
## Issue Reporting and Security ## Issue Reporting and Security
@@ -48,7 +48,7 @@ await userState.setState({ name: 'Alice', loggedIn: true });
### 🧩 State Parts & Init Modes ### 🧩 State Parts & Init Modes
State parts are isolated, typed units of state. They are the building blocks of your application's state tree. Create them via `getStatePart()`: State parts are isolated, typed units of state the building blocks of your application's state tree. Create them via `getStatePart()`:
```typescript ```typescript
const part = await state.getStatePart<IMyState>(name, initialState, initMode); const part = await state.getStatePart<IMyState>(name, initialState, initMode);
@@ -58,10 +58,10 @@ const part = await state.getStatePart<IMyState>(name, initialState, initMode);
|-----------|----------| |-----------|----------|
| `'soft'` (default) | Returns existing if found, creates new otherwise | | `'soft'` (default) | Returns existing if found, creates new otherwise |
| `'mandatory'` | Throws if state part already exists — useful for ensuring single-initialization | | `'mandatory'` | Throws if state part already exists — useful for ensuring single-initialization |
| `'force'` | Always creates a new state part, overwriting any existing one | | `'force'` | Always creates a new state part, disposing and overwriting any existing one |
| `'persistent'` | Like `'soft'` but automatically persists state to IndexedDB via WebStore | | `'persistent'` | Like `'soft'` but automatically persists state to IndexedDB via WebStore |
You can use either enums or string literal types for state part names: You can use either string literal union types or enums for state part names:
```typescript ```typescript
// String literal types (simpler) // String literal types (simpler)
@@ -82,12 +82,12 @@ const settings = await state.getStatePart('settings', { theme: 'dark', fontSize:
// ✅ Automatically saved to IndexedDB on every setState() // ✅ Automatically saved to IndexedDB on every setState()
// ✅ On next app load, persisted values override defaults // ✅ On next app load, persisted values override defaults
// ✅ Persistence writes complete before in-memory updates (atomic) // ✅ Persistence writes complete before in-memory updates
``` ```
### 🔭 Selecting State ### 🔭 Selecting State
`select()` returns an RxJS Observable that emits the current value immediately and on every subsequent change: `select()` returns an RxJS Observable that emits the current value immediately (via `BehaviorSubject`) and on every subsequent change:
```typescript ```typescript
// Full state // Full state
@@ -99,6 +99,8 @@ userState.select((s) => s.name).subscribe((name) => console.log(name));
Selectors are **memoized** — calling `select(fn)` with the same function reference returns the same cached Observable, shared across all subscribers via `shareReplay`. This means you can call `select(mySelector)` in multiple places without creating duplicate subscriptions. Selectors are **memoized** — calling `select(fn)` with the same function reference returns the same cached Observable, shared across all subscribers via `shareReplay`. This means you can call `select(mySelector)` in multiple places without creating duplicate subscriptions.
**Change detection** is built in: `select()` uses `distinctUntilChanged` with deep JSON comparison, so subscribers only fire when the selected value actually changes. Selecting `s => s.name` won't re-emit when only `s.count` changes.
#### ✂️ AbortSignal Support #### ✂️ AbortSignal Support
Clean up subscriptions without manual `.unsubscribe()` — the modern way: Clean up subscriptions without manual `.unsubscribe()` — the modern way:
@@ -125,7 +127,6 @@ interface ILoginPayload {
} }
const loginAction = userState.createAction<ILoginPayload>(async (statePart, payload) => { const loginAction = userState.createAction<ILoginPayload>(async (statePart, payload) => {
// You have access to the current state via statePart.getState()
const current = statePart.getState(); const current = statePart.getState();
return { ...current, name: payload.username, loggedIn: true }; return { ...current, name: payload.username, loggedIn: true };
}); });
@@ -136,7 +137,157 @@ await loginAction.trigger({ username: 'Alice', email: 'alice@example.com' });
await userState.dispatchAction(loginAction, { username: 'Alice', email: 'alice@example.com' }); await userState.dispatchAction(loginAction, { username: 'Alice', email: 'alice@example.com' });
``` ```
Both `trigger()` and `dispatchAction()` return a Promise with the new state. Both `trigger()` and `dispatchAction()` return a Promise with the new state. All dispatches are serialized through a mutation queue, so concurrent dispatches never cause lost updates.
#### 🔗 Nested Actions (Action Context)
When you need to dispatch sub-actions from within an action, use the `context` parameter. This is critical because calling `dispatchAction()` directly from inside an action would deadlock (it tries to acquire the mutation queue that's already held). The context's `dispatch()` bypasses the queue and executes inline:
```typescript
const incrementAction = userState.createAction<number>(async (statePart, amount) => {
const current = statePart.getState();
return { ...current, count: current.count + amount };
});
const doubleIncrementAction = userState.createAction<number>(async (statePart, amount, context) => {
// ✅ Safe: uses context.dispatch() which bypasses the mutation queue
await context.dispatch(incrementAction, amount);
const current = statePart.getState();
return { ...current, count: current.count + amount };
});
// ❌ DON'T do this inside an action — it will deadlock:
// await statePart.dispatchAction(someAction, payload);
```
A built-in depth limit (10 levels) prevents infinite circular dispatch chains, throwing a clear error if exceeded.
### 🔄 Processes (Polling, Streams & Scheduled Tasks)
Processes are managed, pausable observable-to-state bridges — the "side effects" layer. They tie an ongoing data source (polling, WebSockets, event streams) to state updates with full lifecycle control and optional auto-pause.
#### Basic Process: Polling an API
```typescript
import { interval, switchMap, from } from 'rxjs';
const metricsPoller = dashboard.createProcess<{ cpu: number; memory: number }>({
// Producer: an Observable factory — called on start and each resume
producer: () => interval(5000).pipe(
switchMap(() => from(fetch('/api/metrics').then(r => r.json()))),
),
// Reducer: folds each produced value into state (runs through middleware & validation)
reducer: (currentState, metrics) => ({
...currentState,
metrics,
lastUpdated: Date.now(),
}),
autoPause: 'visibility', // ⏸️ Stop polling when the tab is hidden
autoStart: true, // ▶️ Start immediately
});
// Full lifecycle control
metricsPoller.pause(); // Unsubscribes from producer
metricsPoller.resume(); // Re-subscribes (fresh subscription)
metricsPoller.dispose(); // Permanent cleanup
// Observe status reactively
metricsPoller.status; // 'idle' | 'running' | 'paused' | 'disposed'
metricsPoller.status$.subscribe(s => console.log('Process:', s));
```
#### Scheduled Actions
Dispatch an existing action on a recurring interval — syntactic sugar over `createProcess`:
```typescript
const refreshAction = dashboard.createAction<void>(async (sp) => {
const data = await fetch('/api/dashboard').then(r => r.json());
return { ...sp.getState()!, ...data, lastUpdated: Date.now() };
});
// Dispatches refreshAction every 30 seconds, auto-pauses when tab is hidden
const scheduled = dashboard.createScheduledAction({
action: refreshAction,
payload: undefined,
intervalMs: 30000,
autoPause: 'visibility',
});
// It's a full StateProcess — pause, resume, dispose all work
scheduled.dispose();
```
#### Custom Auto-Pause Signals
Pass any `Observable<boolean>` as the auto-pause signal — `true` means active, `false` means pause:
```typescript
import { fromEvent, map, startWith } from 'rxjs';
// Pause when offline, resume when online
const onlineSignal = fromEvent(window, 'online').pipe(
startWith(null),
map(() => navigator.onLine),
);
const syncProcess = userPart.createProcess<SyncPayload>({
producer: () => interval(10000).pipe(
switchMap(() => from(syncWithServer())),
),
reducer: (state, result) => ({ ...state, ...result }),
autoPause: onlineSignal,
});
syncProcess.start();
```
#### WebSocket / Live Streams
Pause disconnects; resume creates a fresh connection:
```typescript
const liveProcess = tickerPart.createProcess<TradeEvent>({
producer: () => new Observable<TradeEvent>(subscriber => {
const ws = new WebSocket('wss://trades.example.com');
ws.onmessage = (e) => subscriber.next(JSON.parse(e.data));
ws.onerror = (e) => subscriber.error(e);
ws.onclose = () => subscriber.complete();
return () => ws.close(); // Teardown: close WebSocket on unsubscribe
}),
reducer: (state, trade) => ({
...state,
lastPrice: trade.price,
trades: [...state.trades.slice(-99), trade],
}),
autoPause: 'visibility',
});
liveProcess.start();
```
#### Error Recovery
If a producer errors, the process gracefully transitions to `'paused'` instead of dying. Call `resume()` to retry with a fresh subscription:
```typescript
process.start();
// Producer errors → status becomes 'paused'
process.resume(); // Creates a fresh subscription — retry
```
#### Process Cleanup Cascades
Disposing a `StatePart` or `Smartstate` instance automatically disposes all attached processes:
```typescript
const p1 = part.createProcess({ ... });
const p2 = part.createProcess({ ... });
p1.start();
p2.start();
part.dispose();
console.log(p1.status); // 'disposed'
console.log(p2.status); // 'disposed'
```
### 🛡️ Middleware ### 🛡️ Middleware
@@ -171,7 +322,7 @@ const remove = userState.addMiddleware(myMiddleware);
remove(); // middleware no longer runs remove(); // middleware no longer runs
``` ```
Middleware runs **sequentially** in insertion order. If any middleware throws, the state remains unchanged — the operation is **atomic**. Middleware runs **sequentially** in insertion order. If any middleware throws, the state remains unchanged — the operation is **atomic**. Process-driven state updates go through middleware too.
### 🧮 Computed / Derived State ### 🧮 Computed / Derived State
@@ -199,7 +350,7 @@ const greeting2$ = state.computed(
); );
``` ```
Computed observables are **lazy** — they only subscribe to their sources when someone subscribes to them, and they automatically unsubscribe when all subscribers disconnect. Computed observables are **lazy** — they only subscribe to their sources when someone subscribes to them, and they automatically unsubscribe when all subscribers disconnect. They also use `distinctUntilChanged` to avoid redundant emissions when the derived value hasn't actually changed.
### 📦 Batch Updates ### 📦 Batch Updates
@@ -322,15 +473,31 @@ await userState.stateSetup(async (statePart) => {
// Any dispatchAction() calls will automatically wait for stateSetup() to finish // Any dispatchAction() calls will automatically wait for stateSetup() to finish
``` ```
### 🧹 Disposal & Cleanup
Both `Smartstate` and individual `StatePart` instances support disposal for proper cleanup:
```typescript
// Dispose a single state part — completes the BehaviorSubject, clears middleware, caches,
// and disposes all attached processes
userState.dispose();
// Dispose the entire Smartstate instance — disposes all state parts and clears internal maps
state.dispose();
```
After disposal, `setState()` and `dispatchAction()` will throw if called on a disposed `StatePart`. Calling `start()`, `pause()`, or `resume()` on a disposed `StateProcess` also throws.
### 🏎️ Performance ### 🏎️ Performance
Smartstate is built with performance in mind: Smartstate is built with performance in mind:
- **🔒 SHA256 Change Detection** — Uses content hashing to detect actual changes. Identical state values don't trigger notifications, even with different object references. - **🔒 SHA256 Change Detection** — Uses content hashing to detect actual changes. Identical state values don't trigger notifications, even with different object references.
- **🎯 distinctUntilChanged on Selectors** — Sub-selectors only fire when the selected slice actually changes. `select(s => s.name)` won't emit when `s.count` changes.
- **♻️ Selector Memoization** — `select(fn)` caches observables by function reference and shares them via `shareReplay({ refCount: true })`. Multiple subscribers share one upstream subscription. - **♻️ Selector Memoization** — `select(fn)` caches observables by function reference and shares them via `shareReplay({ refCount: true })`. Multiple subscribers share one upstream subscription.
- **📦 Cumulative Notifications** — `notifyChangeCumulative()` debounces rapid changes into a single notification at the end of the call stack. - **📦 Cumulative Notifications** — `notifyChangeCumulative()` debounces rapid changes into a single notification at the end of the call stack.
- **🔐 Concurrent Safety** — Simultaneous `getStatePart()` calls for the same name return the same promise, preventing duplicate creation or race conditions. - **🔐 Concurrent Safety** — Simultaneous `getStatePart()` calls for the same name return the same promise, preventing duplicate creation. All `setState()` and `dispatchAction()` calls are serialized through a mutation queue. Process values are serialized through their own internal queue.
- **💾 Atomic Persistence** — WebStore writes complete before in-memory state updates, ensuring consistency even if the process crashes mid-write. - **💾 Atomic Persistence** — WebStore writes complete before in-memory state updates, ensuring consistency.
- **⏸️ Batch Deferred Notifications** — `batch()` suppresses all subscriber notifications until every update in the batch completes. - **⏸️ Batch Deferred Notifications** — `batch()` suppresses all subscriber notifications until every update in the batch completes.
## API Reference ## API Reference
@@ -342,23 +509,26 @@ Smartstate is built with performance in mind:
| `getStatePart(name, initial?, initMode?)` | Get or create a typed state part | | `getStatePart(name, initial?, initMode?)` | Get or create a typed state part |
| `batch(fn)` | Batch state updates, defer all notifications until complete | | `batch(fn)` | Batch state updates, defer all notifications until complete |
| `computed(sources, fn)` | Create a computed observable from multiple state parts | | `computed(sources, fn)` | Create a computed observable from multiple state parts |
| `dispose()` | Dispose all state parts and clear internal state |
| `isBatching` | `boolean` — whether a batch is currently active | | `isBatching` | `boolean` — whether a batch is currently active |
| `statePartMap` | Registry of all created state parts |
### `StatePart<TName, TPayload>` ### `StatePart<TName, TPayload>`
| Method | Description | | Method | Description |
|--------|-------------| |--------|-------------|
| `getState()` | Get current state (returns `TPayload \| undefined`) | | `getState()` | Get current state synchronously (`TPayload \| undefined`) |
| `setState(newState)` | Set state — runs middleware → validates → persists → notifies | | `setState(newState)` | Set state — runs middleware → validates → persists → notifies |
| `select(selectorFn?, options?)` | Returns an Observable of state or derived values. Options: `{ signal?: AbortSignal }` | | `select(selectorFn?, options?)` | Observable of state or derived values. Options: `{ signal?: AbortSignal }` |
| `createAction(actionDef)` | Create a reusable, typed state action | | `createAction(actionDef)` | Create a reusable, typed state action |
| `dispatchAction(action, payload)` | Dispatch an action and return the new state | | `dispatchAction(action, payload)` | Dispatch an action and return the new state |
| `addMiddleware(fn)` | Add a middleware interceptor. Returns a removal function | | `addMiddleware(fn)` | Add a middleware interceptor. Returns a removal function |
| `waitUntilPresent(selectorFn?, opts?)` | Wait for a state condition. Opts: `number` (timeout) or `{ timeoutMs?, signal? }` | | `waitUntilPresent(selectorFn?, opts?)` | Wait for a state condition. Opts: `number` (timeout) or `{ timeoutMs?, signal? }` |
| `createProcess(options)` | Create a managed, pausable process tied to this state part |
| `createScheduledAction(options)` | Create a process that dispatches an action on a recurring interval |
| `notifyChange()` | Manually trigger a change notification (with hash dedup) | | `notifyChange()` | Manually trigger a change notification (with hash dedup) |
| `notifyChangeCumulative()` | Debounced notification — fires at end of call stack | | `notifyChangeCumulative()` | Debounced notification — fires at end of call stack |
| `stateSetup(fn)` | Async state initialization with action serialization | | `stateSetup(fn)` | Async state initialization with action serialization |
| `dispose()` | Complete the BehaviorSubject, dispose processes, clear middleware and caches |
### `StateAction<TState, TPayload>` ### `StateAction<TState, TPayload>`
@@ -366,6 +536,23 @@ Smartstate is built with performance in mind:
|--------|-------------| |--------|-------------|
| `trigger(payload)` | Dispatch the action on its associated state part | | `trigger(payload)` | Dispatch the action on its associated state part |
### `StateProcess<TName, TPayload, TProducerValue>`
| Method / Property | Description |
|-------------------|-------------|
| `start()` | Start the process (subscribes to producer, sets up auto-pause) |
| `pause()` | Pause the process (unsubscribes from producer) |
| `resume()` | Resume a paused process (fresh subscription to producer) |
| `dispose()` | Permanently stop the process and clean up |
| `status` | Current status: `'idle' \| 'running' \| 'paused' \| 'disposed'` |
| `status$` | Observable of status transitions |
### `IActionContext<TState>`
| Method | Description |
|--------|-------------|
| `dispatch(action, payload)` | Dispatch a sub-action inline (bypasses mutation queue). Available as the third argument to action definitions |
### Standalone Functions ### Standalone Functions
| Function | Description | | Function | Description |
@@ -379,8 +566,13 @@ Smartstate is built with performance in mind:
|------|-------------| |------|-------------|
| `TInitMode` | `'soft' \| 'mandatory' \| 'force' \| 'persistent'` | | `TInitMode` | `'soft' \| 'mandatory' \| 'force' \| 'persistent'` |
| `TMiddleware<TPayload>` | `(newState, oldState) => TPayload \| Promise<TPayload>` | | `TMiddleware<TPayload>` | `(newState, oldState) => TPayload \| Promise<TPayload>` |
| `IActionDef<TState, TPayload>` | Action definition function signature | | `IActionDef<TState, TPayload>` | Action definition function signature (receives statePart, payload, context?) |
| `IActionContext<TState>` | Context for safe nested dispatch within actions |
| `IContextProviderOptions<TPayload>` | Options for `attachContextProvider` | | `IContextProviderOptions<TPayload>` | Options for `attachContextProvider` |
| `IProcessOptions<TPayload, TValue>` | Options for `createProcess` (producer, reducer, autoPause, autoStart) |
| `IScheduledActionOptions<TPayload, TActionPayload>` | Options for `createScheduledAction` (action, payload, intervalMs, autoPause) |
| `TProcessStatus` | `'idle' \| 'running' \| 'paused' \| 'disposed'` |
| `TAutoPause` | `'visibility' \| Observable<boolean> \| false` |
## License and Legal Information ## License and Legal Information

View File

@@ -764,4 +764,232 @@ tap.test('getStatePart should accept 0 as initial value', async () => {
expect(part.getState()).toEqual(0); expect(part.getState()).toEqual(0);
}); });
// ============================
// Action context re-entrancy tests
// ============================
tap.test('context.dispatch should not deadlock on same StatePart', async () => {
type TParts = 'reentrantAction';
const state = new smartstate.Smartstate<TParts>();
const part = await state.getStatePart<{ count: number }>('reentrantAction', { count: 0 });
const innerIncrement = part.createAction<void>(async (sp) => {
return { count: sp.getState().count + 1 };
});
const outerAction = part.createAction<void>(async (sp, _payload, context) => {
// This would deadlock without the context.dispatch() mechanism
await context.dispatch(innerIncrement, undefined);
return sp.getState();
});
const result = await part.dispatchAction(outerAction, undefined);
expect(result.count).toEqual(1);
});
tap.test('deeply nested context.dispatch should work (3 levels)', async () => {
type TParts = 'deepNested';
const state = new smartstate.Smartstate<TParts>();
const part = await state.getStatePart<{ steps: string[] }>('deepNested', { steps: [] });
const appendStep = part.createAction<string>(async (sp, step) => {
return { steps: [...sp.getState().steps, step] };
});
const level2 = part.createAction<void>(async (sp, _payload, context) => {
await context.dispatch(appendStep, 'level-2');
return sp.getState();
});
const level1 = part.createAction<void>(async (sp, _payload, context) => {
await context.dispatch(appendStep, 'level-1');
await context.dispatch(level2, undefined);
await context.dispatch(appendStep, 'level-1-after');
return sp.getState();
});
await part.dispatchAction(level1, undefined);
expect(part.getState().steps).toEqual(['level-1', 'level-2', 'level-1-after']);
});
tap.test('circular context.dispatch should throw max depth error', async () => {
type TParts = 'circular';
const state = new smartstate.Smartstate<TParts>();
const part = await state.getStatePart<{ count: number }>('circular', { count: 0 });
// Create a self-referencing action that will loop forever
const circularAction: smartstate.StateAction<{ count: number }, void> = part.createAction<void>(
async (sp, _payload, context) => {
const current = sp.getState();
if (current.count < 100) {
// This should eventually hit the depth limit
await context.dispatch(circularAction, undefined);
}
return sp.getState();
}
);
let error: Error | null = null;
try {
await part.dispatchAction(circularAction, undefined);
} catch (e) {
error = e as Error;
}
expect(error).not.toBeNull();
expect(error?.message).toMatch(/Maximum nested action dispatch depth/);
});
tap.test('actions without context arg should still work (backward compat)', async () => {
type TParts = 'backwardCompat';
const state = new smartstate.Smartstate<TParts>();
const part = await state.getStatePart<{ value: number }>('backwardCompat', { value: 0 });
// Old-style action that doesn't use the context parameter
const simpleAction = part.createAction<number>(async (sp, payload) => {
return { value: payload };
});
await part.dispatchAction(simpleAction, 42);
expect(part.getState().value).toEqual(42);
});
tap.test('concurrent dispatches still serialize correctly with context feature', async () => {
type TParts = 'concurrentWithContext';
const state = new smartstate.Smartstate<TParts>();
const part = await state.getStatePart<{ count: number }>('concurrentWithContext', { count: 0 });
const increment = part.createAction<void>(async (sp) => {
const current = sp.getState();
return { count: current.count + 1 };
});
// Fire 10 concurrent dispatches — must still serialize
const promises: Promise<any>[] = [];
for (let i = 0; i < 10; i++) {
promises.push(part.dispatchAction(increment, undefined));
}
await Promise.all(promises);
expect(part.getState().count).toEqual(10);
});
// ── distinctUntilChanged on selectors ──────────────────────────────────
tap.test('select should not emit when selected sub-state has not changed', async () => {
type TParts = 'distinctSelector';
const state = new smartstate.Smartstate<TParts>();
const part = await state.getStatePart<{ name: string; count: number }>('distinctSelector', { name: 'alice', count: 0 });
const nameSelector = (s: { name: string; count: number }) => s.name;
const nameValues: string[] = [];
part.select(nameSelector).subscribe((v) => nameValues.push(v));
// Wait for initial emission
await new Promise((r) => setTimeout(r, 50));
expect(nameValues).toHaveLength(1);
expect(nameValues[0]).toEqual('alice');
// Change only count — name selector should NOT re-emit
await part.setState({ name: 'alice', count: 1 });
await new Promise((r) => setTimeout(r, 50));
expect(nameValues).toHaveLength(1);
// Change name — name selector SHOULD emit
await part.setState({ name: 'bob', count: 1 });
await new Promise((r) => setTimeout(r, 50));
expect(nameValues).toHaveLength(2);
expect(nameValues[1]).toEqual('bob');
});
// ── Selector error skipping ────────────────────────────────────────────
tap.test('selector errors should be skipped, not emit undefined', async () => {
type TParts = 'selectorError';
const state = new smartstate.Smartstate<TParts>();
const part = await state.getStatePart<{ value: number }>('selectorError', { value: 1 });
let callCount = 0;
const faultySelector = (s: { value: number }) => {
if (s.value === 2) throw new Error('selector boom');
return s.value;
};
const values: number[] = [];
part.select(faultySelector).subscribe((v) => {
callCount++;
values.push(v);
});
await new Promise((r) => setTimeout(r, 50));
expect(values).toEqual([1]);
// This setState triggers a selector error — should be skipped, no undefined emitted
await part.setState({ value: 2 });
await new Promise((r) => setTimeout(r, 50));
expect(values).toEqual([1]); // no new emission
expect(callCount).toEqual(1);
// Normal value again
await part.setState({ value: 3 });
await new Promise((r) => setTimeout(r, 50));
expect(values).toEqual([1, 3]);
});
// ── Smartstate.dispose() ───────────────────────────────────────────────
tap.test('Smartstate.dispose should dispose all state parts', async () => {
type TParts = 'partA' | 'partB';
const state = new smartstate.Smartstate<TParts>();
const partA = await state.getStatePart<{ x: number }>('partA', { x: 1 });
const partB = await state.getStatePart<{ y: number }>('partB', { y: 2 });
let aCompleted = false;
let bCompleted = false;
partA.select().subscribe({ complete: () => { aCompleted = true; } });
partB.select().subscribe({ complete: () => { bCompleted = true; } });
state.dispose();
expect(aCompleted).toBeTrue();
expect(bCompleted).toBeTrue();
});
// ── Post-dispose setState throws ───────────────────────────────────────
tap.test('setState on disposed StatePart should throw', async () => {
type TParts = 'disposedPart';
const state = new smartstate.Smartstate<TParts>();
const part = await state.getStatePart<{ v: number }>('disposedPart', { v: 0 });
part.dispose();
let threw = false;
try {
await part.setState({ v: 1 });
} catch (e) {
threw = true;
expect((e as Error).message).toInclude('disposed');
}
expect(threw).toBeTrue();
});
// ── Post-dispose dispatchAction throws ─────────────────────────────────
tap.test('dispatchAction on disposed StatePart should throw', async () => {
type TParts = 'disposedDispatch';
const state = new smartstate.Smartstate<TParts>();
const part = await state.getStatePart<{ v: number }>('disposedDispatch', { v: 0 });
const action = part.createAction<number>(async (sp, payload) => {
return { v: payload };
});
part.dispose();
let threw = false;
try {
await part.dispatchAction(action, 5);
} catch (e) {
threw = true;
expect((e as Error).message).toInclude('disposed');
}
expect(threw).toBeTrue();
});
export default tap.start(); export default tap.start();

278
test/test.stateprocess.ts Normal file
View File

@@ -0,0 +1,278 @@
import { tap, expect } from '@git.zone/tstest/tapbundle';
import * as smartstate from '../ts/index.js';
import { Subject, of, Observable, throwError, concat } from 'rxjs';
// ── Lifecycle ──────────────────────────────────────────────────────────
tap.test('process should start in idle status', async () => {
const state = new smartstate.Smartstate<'test'>();
const part = await state.getStatePart<{ v: number }>('test', { v: 0 });
const process = part.createProcess<number>({
producer: () => of(1),
reducer: (s, v) => ({ v: s.v + v }),
});
expect(process.status).toEqual('idle');
process.dispose();
});
tap.test('start/pause/resume/dispose lifecycle', async () => {
const state = new smartstate.Smartstate<'lifecycle'>();
const part = await state.getStatePart<{ v: number }>('lifecycle', { v: 0 });
const subject = new Subject<number>();
const process = part.createProcess<number>({
producer: () => subject.asObservable(),
reducer: (s, v) => ({ v: s.v + v }),
});
expect(process.status).toEqual('idle');
process.start();
expect(process.status).toEqual('running');
process.pause();
expect(process.status).toEqual('paused');
process.resume();
expect(process.status).toEqual('running');
process.dispose();
expect(process.status).toEqual('disposed');
});
// ── Producer → state integration ───────────────────────────────────────
tap.test('producer values should update state through reducer', async () => {
const state = new smartstate.Smartstate<'producer'>();
const part = await state.getStatePart<{ values: number[] }>('producer', { values: [] });
const subject = new Subject<number>();
const process = part.createProcess<number>({
producer: () => subject.asObservable(),
reducer: (s, v) => ({ values: [...s.values, v] }),
});
process.start();
subject.next(1);
subject.next(2);
subject.next(3);
await new Promise((r) => setTimeout(r, 100));
expect(part.getState()!.values).toEqual([1, 2, 3]);
process.dispose();
});
// ── Pause stops producer, resume restarts ──────────────────────────────
tap.test('pause should stop receiving values, resume should restart', async () => {
const state = new smartstate.Smartstate<'pauseResume'>();
const part = await state.getStatePart<{ count: number }>('pauseResume', { count: 0 });
const subject = new Subject<number>();
const process = part.createProcess<number>({
producer: () => subject.asObservable(),
reducer: (s, v) => ({ count: s.count + v }),
});
process.start();
subject.next(1);
await new Promise((r) => setTimeout(r, 50));
expect(part.getState()!.count).toEqual(1);
process.pause();
subject.next(1); // should be ignored — producer unsubscribed
await new Promise((r) => setTimeout(r, 50));
expect(part.getState()!.count).toEqual(1); // unchanged
process.resume();
subject.next(1);
await new Promise((r) => setTimeout(r, 50));
expect(part.getState()!.count).toEqual(2);
process.dispose();
});
// ── Auto-pause with custom Observable ──────────────────────────────────
tap.test('auto-pause with custom Observable<boolean> signal', async () => {
const state = new smartstate.Smartstate<'autoPause'>();
const part = await state.getStatePart<{ count: number }>('autoPause', { count: 0 });
const producer = new Subject<number>();
const pauseSignal = new Subject<boolean>();
const process = part.createProcess<number>({
producer: () => producer.asObservable(),
reducer: (s, v) => ({ count: s.count + v }),
autoPause: pauseSignal.asObservable(),
});
process.start();
producer.next(1);
await new Promise((r) => setTimeout(r, 50));
expect(part.getState()!.count).toEqual(1);
// Signal pause
pauseSignal.next(false);
await new Promise((r) => setTimeout(r, 50));
expect(process.status).toEqual('paused');
producer.next(1); // ignored
await new Promise((r) => setTimeout(r, 50));
expect(part.getState()!.count).toEqual(1);
// Signal resume
pauseSignal.next(true);
await new Promise((r) => setTimeout(r, 50));
expect(process.status).toEqual('running');
producer.next(1);
await new Promise((r) => setTimeout(r, 50));
expect(part.getState()!.count).toEqual(2);
process.dispose();
});
// ── Auto-pause 'visibility' in Node.js (no document) ──────────────────
tap.test('autoPause visibility should be always-active in Node.js', async () => {
const state = new smartstate.Smartstate<'vis'>();
const part = await state.getStatePart<{ v: number }>('vis', { v: 0 });
const subject = new Subject<number>();
const process = part.createProcess<number>({
producer: () => subject.asObservable(),
reducer: (s, v) => ({ v: v }),
autoPause: 'visibility',
});
process.start();
expect(process.status).toEqual('running');
subject.next(42);
await new Promise((r) => setTimeout(r, 50));
expect(part.getState()!.v).toEqual(42);
process.dispose();
});
// ── Scheduled action ───────────────────────────────────────────────────
tap.test('createScheduledAction should dispatch action on interval', async () => {
const state = new smartstate.Smartstate<'scheduled'>();
const part = await state.getStatePart<{ ticks: number }>('scheduled', { ticks: 0 });
const tickAction = part.createAction<void>(async (sp) => {
return { ticks: sp.getState()!.ticks + 1 };
});
const scheduled = part.createScheduledAction({
action: tickAction,
payload: undefined,
intervalMs: 50,
});
await new Promise((r) => setTimeout(r, 280));
scheduled.dispose();
// Should have ticked at least 3 times in ~280ms with 50ms interval
expect(part.getState()!.ticks).toBeGreaterThanOrEqual(3);
});
// ── StatePart.dispose cascades ─────────────────────────────────────────
tap.test('StatePart.dispose should dispose all processes', async () => {
const state = new smartstate.Smartstate<'cascade'>();
const part = await state.getStatePart<{ v: number }>('cascade', { v: 0 });
const p1 = part.createProcess<number>({
producer: () => new Subject<number>().asObservable(),
reducer: (s, v) => ({ v }),
});
const p2 = part.createProcess<number>({
producer: () => new Subject<number>().asObservable(),
reducer: (s, v) => ({ v }),
});
p1.start();
p2.start();
part.dispose();
expect(p1.status).toEqual('disposed');
expect(p2.status).toEqual('disposed');
});
// ── status$ observable ─────────────────────────────────────────────────
tap.test('status$ should emit lifecycle transitions', async () => {
const state = new smartstate.Smartstate<'status$'>();
const part = await state.getStatePart<{ v: number }>('status$', { v: 0 });
const subject = new Subject<number>();
const process = part.createProcess<number>({
producer: () => subject.asObservable(),
reducer: (s, v) => ({ v }),
});
const statuses: string[] = [];
process.status$.subscribe((s) => statuses.push(s));
process.start();
process.pause();
process.resume();
process.dispose();
expect(statuses).toEqual(['idle', 'running', 'paused', 'running', 'disposed']);
});
// ── Producer error → graceful pause ────────────────────────────────────
tap.test('producer error should pause process gracefully', async () => {
const state = new smartstate.Smartstate<'error'>();
const part = await state.getStatePart<{ v: number }>('error', { v: 0 });
let callCount = 0;
const process = part.createProcess<number>({
producer: () => {
callCount++;
if (callCount === 1) {
// First subscription: emit 1, then error
return concat(of(1), throwError(() => new Error('boom')));
}
// After resume: emit 2 successfully
return of(2);
},
reducer: (s, v) => ({ v }),
});
process.start();
await new Promise((r) => setTimeout(r, 50));
expect(process.status).toEqual('paused');
expect(part.getState()!.v).toEqual(1); // got the value before error
// Resume creates a fresh subscription
process.resume();
await new Promise((r) => setTimeout(r, 50));
expect(part.getState()!.v).toEqual(2);
process.dispose();
});
// ── Disposed guards ────────────────────────────────────────────────────
tap.test('start/pause/resume on disposed process should throw', async () => {
const state = new smartstate.Smartstate<'guards'>();
const part = await state.getStatePart<{ v: number }>('guards', { v: 0 });
const process = part.createProcess<number>({
producer: () => of(1),
reducer: (s, v) => ({ v }),
});
process.dispose();
let errors = 0;
try { process.start(); } catch { errors++; }
try { process.pause(); } catch { errors++; }
try { process.resume(); } catch { errors++; }
expect(errors).toEqual(3);
});
// ── autoStart option ───────────────────────────────────────────────────
tap.test('autoStart should start process immediately', async () => {
const state = new smartstate.Smartstate<'autoStart'>();
const part = await state.getStatePart<{ v: number }>('autoStart', { v: 0 });
const process = part.createProcess<number>({
producer: () => of(42),
reducer: (s, v) => ({ v }),
autoStart: true,
});
await new Promise((r) => setTimeout(r, 50));
expect(process.status).toEqual('running');
expect(part.getState()!.v).toEqual(42);
process.dispose();
});
export default tap.start();

View File

@@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartstate', name: '@push.rocks/smartstate',
version: '2.1.1', version: '2.3.0',
description: 'A TypeScript-first reactive state management library with middleware, computed state, batching, persistence, and Web Component Context Protocol support.' description: 'A TypeScript-first reactive state management library with middleware, computed state, batching, persistence, and Web Component Context Protocol support.'
} }

View File

@@ -3,3 +3,4 @@ export * from './smartstate.classes.statepart.js';
export * from './smartstate.classes.stateaction.js'; export * from './smartstate.classes.stateaction.js';
export * from './smartstate.classes.computed.js'; export * from './smartstate.classes.computed.js';
export * from './smartstate.contextprovider.js'; export * from './smartstate.contextprovider.js';
export * from './smartstate.classes.stateprocess.js';

View File

@@ -1,5 +1,5 @@
import * as plugins from './smartstate.plugins.js'; import * as plugins from './smartstate.plugins.js';
import { combineLatest, map } from 'rxjs'; import { combineLatest, map, distinctUntilChanged } from 'rxjs';
import type { StatePart } from './smartstate.classes.statepart.js'; import type { StatePart } from './smartstate.classes.statepart.js';
/** /**
@@ -12,5 +12,6 @@ export function computed<TResult>(
): plugins.smartrx.rxjs.Observable<TResult> { ): plugins.smartrx.rxjs.Observable<TResult> {
return combineLatest(sources.map((sp) => sp.select())).pipe( return combineLatest(sources.map((sp) => sp.select())).pipe(
map((states) => computeFn(...states)), map((states) => computeFn(...states)),
distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b)),
) as plugins.smartrx.rxjs.Observable<TResult>; ) as plugins.smartrx.rxjs.Observable<TResult>;
} }

View File

@@ -49,7 +49,11 @@ export class Smartstate<StatePartNameType extends string> {
const pending = [...this.pendingNotifications]; const pending = [...this.pendingNotifications];
this.pendingNotifications.clear(); this.pendingNotifications.clear();
for (const sp of pending) { for (const sp of pending) {
try {
await sp.notifyChange(); await sp.notifyChange();
} catch (err) {
console.error(`Error flushing notification for state part:`, err);
}
} }
} }
} finally { } finally {
@@ -69,6 +73,21 @@ export class Smartstate<StatePartNameType extends string> {
return computed(sources, computeFn); return computed(sources, computeFn);
} }
/**
* disposes all state parts and clears internal state
*/
public dispose(): void {
for (const key of Object.keys(this.statePartMap)) {
const part = this.statePartMap[key as StatePartNameType];
if (part) {
part.dispose();
}
}
this.statePartMap = {} as any;
this.pendingStatePartCreation.clear();
this.pendingNotifications.clear();
}
/** /**
* Allows getting and initializing a new statepart * Allows getting and initializing a new statepart
*/ */
@@ -107,7 +126,7 @@ export class Smartstate<StatePartNameType extends string> {
} }
} }
const creationPromise = this.createStatePart<PayloadType>(statePartNameArg, initialArg, initMode); const creationPromise = this.createStatePart<PayloadType>(statePartNameArg, initialArg!, initMode);
this.pendingStatePartCreation.set(statePartNameArg, creationPromise); this.pendingStatePartCreation.set(statePartNameArg, creationPromise);
try { try {
@@ -133,7 +152,7 @@ export class Smartstate<StatePartNameType extends string> {
dbName: 'smartstate', dbName: 'smartstate',
storeName: statePartName, storeName: statePartName,
} }
: null : undefined
); );
newState.smartstateRef = this; newState.smartstateRef = this;
await newState.init(); await newState.init();

View File

@@ -1,7 +1,16 @@
import { StatePart } from './smartstate.classes.statepart.js'; import { StatePart } from './smartstate.classes.statepart.js';
/**
* Context object passed to action definitions, enabling safe nested dispatch.
* Use `context.dispatch()` to dispatch sub-actions inline (bypasses the mutation queue).
* Direct `statePart.dispatchAction()` from within an action will deadlock.
*/
export interface IActionContext<TStateType> {
dispatch<T>(action: StateAction<TStateType, T>, payload: T): Promise<TStateType>;
}
export interface IActionDef<TStateType, TActionPayloadType> { export interface IActionDef<TStateType, TActionPayloadType> {
(stateArg: StatePart<any, TStateType>, actionPayload: TActionPayloadType): Promise<TStateType>; (stateArg: StatePart<any, TStateType>, actionPayload: TActionPayloadType, context?: IActionContext<TStateType>): Promise<TStateType>;
} }
/** /**
@@ -9,8 +18,8 @@ export interface IActionDef<TStateType, TActionPayloadType> {
*/ */
export class StateAction<TStateType, TActionPayloadType> { export class StateAction<TStateType, TActionPayloadType> {
constructor( constructor(
public statePartRef: StatePart<any, any>, public readonly statePartRef: StatePart<any, any>,
public actionDef: IActionDef<TStateType, TActionPayloadType> public readonly actionDef: IActionDef<TStateType, TActionPayloadType>
) {} ) {}
public trigger(payload: TActionPayloadType): Promise<TStateType> { public trigger(payload: TActionPayloadType): Promise<TStateType> {

View File

@@ -1,7 +1,8 @@
import * as plugins from './smartstate.plugins.js'; import * as plugins from './smartstate.plugins.js';
import { Observable, shareReplay, takeUntil } from 'rxjs'; import { BehaviorSubject, Observable, shareReplay, takeUntil, distinctUntilChanged, interval } from 'rxjs';
import { StateAction, type IActionDef } from './smartstate.classes.stateaction.js'; import { StateAction, type IActionDef, type IActionContext } from './smartstate.classes.stateaction.js';
import type { Smartstate } from './smartstate.classes.smartstate.js'; import type { Smartstate } from './smartstate.classes.smartstate.js';
import { StateProcess, type IProcessOptions, type IScheduledActionOptions } from './smartstate.classes.stateprocess.js';
export type TMiddleware<TPayload> = ( export type TMiddleware<TPayload> = (
newState: TPayload, newState: TPayload,
@@ -28,20 +29,26 @@ function fromAbortSignal(signal: AbortSignal): Observable<void> {
} }
export class StatePart<TStatePartName, TStatePayload> { export class StatePart<TStatePartName, TStatePayload> {
private static readonly MAX_NESTED_DISPATCH_DEPTH = 10;
public name: TStatePartName; public name: TStatePartName;
public state = new plugins.smartrx.rxjs.Subject<TStatePayload>(); private state = new BehaviorSubject<TStatePayload | undefined>(undefined);
public stateStore: TStatePayload | undefined; private stateStore: TStatePayload | undefined;
public smartstateRef?: Smartstate<any>; public smartstateRef?: Smartstate<any>;
private disposed = false;
private cumulativeDeferred = plugins.smartpromise.cumulativeDefer(); private cumulativeDeferred = plugins.smartpromise.cumulativeDefer();
private mutationQueue: Promise<any> = Promise.resolve(); private mutationQueue: Promise<any> = Promise.resolve();
private pendingCumulativeNotification: ReturnType<typeof setTimeout> | null = null; private pendingCumulativeNotification: ReturnType<typeof setTimeout> | null = null;
private webStoreOptions: plugins.webstore.IWebStoreOptions; private webStoreOptions: plugins.webstore.IWebStoreOptions | undefined;
private webStore: plugins.webstore.WebStore<TStatePayload> | null = null; private webStore: plugins.webstore.WebStore<TStatePayload> | null = null;
private middlewares: TMiddleware<TStatePayload>[] = []; private middlewares: TMiddleware<TStatePayload>[] = [];
// Process tracking
private processes: StateProcess<TStatePartName, TStatePayload, any>[] = [];
// Selector memoization // Selector memoization
private selectorCache = new WeakMap<Function, plugins.smartrx.rxjs.Observable<any>>(); private selectorCache = new WeakMap<Function, plugins.smartrx.rxjs.Observable<any>>();
private defaultSelectObservable: plugins.smartrx.rxjs.Observable<TStatePayload> | null = null; private defaultSelectObservable: plugins.smartrx.rxjs.Observable<TStatePayload> | null = null;
@@ -95,6 +102,9 @@ export class StatePart<TStatePartName, TStatePayload> {
* sets the stateStore to the new state (serialized via mutation queue) * sets the stateStore to the new state (serialized via mutation queue)
*/ */
public async setState(newStateArg: TStatePayload): Promise<TStatePayload> { public async setState(newStateArg: TStatePayload): Promise<TStatePayload> {
if (this.disposed) {
throw new Error(`StatePart '${this.name}' has been disposed`);
}
return this.mutationQueue = this.mutationQueue.then( return this.mutationQueue = this.mutationQueue.then(
() => this.applyState(newStateArg), () => this.applyState(newStateArg),
() => this.applyState(newStateArg), () => this.applyState(newStateArg),
@@ -210,22 +220,24 @@ export class StatePart<TStatePartName, TStatePayload> {
} }
const effectiveSelectorFn = selectorFn || ((state: TStatePayload) => <T>(<any>state)); const effectiveSelectorFn = selectorFn || ((state: TStatePayload) => <T>(<any>state));
const SELECTOR_ERROR: unique symbol = Symbol('selector-error');
let mapped = this.state.pipe( let mapped = this.state.pipe(
plugins.smartrx.rxjs.ops.startWith(this.getState()),
plugins.smartrx.rxjs.ops.filter((stateArg): stateArg is TStatePayload => stateArg !== undefined), plugins.smartrx.rxjs.ops.filter((stateArg): stateArg is TStatePayload => stateArg !== undefined),
plugins.smartrx.rxjs.ops.map((stateArg) => { plugins.smartrx.rxjs.ops.map((stateArg) => {
try { try {
return effectiveSelectorFn(stateArg); return effectiveSelectorFn(stateArg);
} catch (e) { } catch (e) {
console.error(`Selector error in state part '${this.name}':`, e); console.error(`Selector error in state part '${this.name}':`, e);
return undefined; return SELECTOR_ERROR as any;
} }
}) }),
plugins.smartrx.rxjs.ops.filter((v: any) => v !== SELECTOR_ERROR),
distinctUntilChanged((a: any, b: any) => JSON.stringify(a) === JSON.stringify(b)),
); );
if (hasSignal) { if (hasSignal) {
mapped = mapped.pipe(takeUntil(fromAbortSignal(options.signal))); mapped = mapped.pipe(takeUntil(fromAbortSignal(options.signal!)));
return mapped; return mapped;
} }
@@ -249,21 +261,42 @@ export class StatePart<TStatePartName, TStatePayload> {
return new StateAction(this, actionDef); return new StateAction(this, actionDef);
} }
/**
* creates a depth-tracked action context for safe nested dispatch.
* Using context.dispatch() within an actionDef bypasses the mutation queue
* and executes the sub-action inline, preventing deadlocks.
*/
private createActionContext(depth: number): IActionContext<TStatePayload> {
const self = this;
return {
dispatch: async <U>(action: StateAction<TStatePayload, U>, payload: U): Promise<TStatePayload> => {
if (depth >= StatePart.MAX_NESTED_DISPATCH_DEPTH) {
throw new Error(
`Maximum nested action dispatch depth (${StatePart.MAX_NESTED_DISPATCH_DEPTH}) exceeded. ` +
`Check for circular action dispatches.`
);
}
const innerContext = self.createActionContext(depth + 1);
const newState = await action.actionDef(self, payload, innerContext);
return self.applyState(newState);
},
};
}
/** /**
* dispatches an action on the statepart level * dispatches an action on the statepart level
*/ */
public async dispatchAction<T>(stateAction: StateAction<TStatePayload, T>, actionPayload: T): Promise<TStatePayload> { public async dispatchAction<T>(stateAction: StateAction<TStatePayload, T>, actionPayload: T): Promise<TStatePayload> {
if (this.disposed) {
throw new Error(`StatePart '${this.name}' has been disposed`);
}
await this.cumulativeDeferred.promise; await this.cumulativeDeferred.promise;
return this.mutationQueue = this.mutationQueue.then( const execute = async () => {
async () => { const context = this.createActionContext(0);
const newState = await stateAction.actionDef(this, actionPayload); const newState = await stateAction.actionDef(this, actionPayload, context);
return this.applyState(newState); return this.applyState(newState);
}, };
async () => { return this.mutationQueue = this.mutationQueue.then(execute, execute);
const newState = await stateAction.actionDef(this, actionPayload);
return this.applyState(newState);
},
);
} }
/** /**
@@ -348,11 +381,68 @@ export class StatePart<TStatePartName, TStatePayload> {
await this.setState(await resultPromise); await this.setState(await resultPromise);
} }
/**
* creates a managed, pausable process that ties an observable producer to state updates
*/
public createProcess<TProducerValue>(
options: IProcessOptions<TStatePayload, TProducerValue>
): StateProcess<TStatePartName, TStatePayload, TProducerValue> {
if (this.disposed) {
throw new Error(`StatePart '${this.name}' has been disposed`);
}
const process = new StateProcess<TStatePartName, TStatePayload, TProducerValue>(this, options);
this.processes.push(process);
if (options.autoStart) {
process.start();
}
return process;
}
/**
* creates a process that dispatches an action on a recurring interval
*/
public createScheduledAction<TActionPayload>(
options: IScheduledActionOptions<TStatePayload, TActionPayload>
): StateProcess<TStatePartName, TStatePayload, number> {
if (this.disposed) {
throw new Error(`StatePart '${this.name}' has been disposed`);
}
const process = new StateProcess<TStatePartName, TStatePayload, number>(this, {
producer: () => interval(options.intervalMs),
sideEffect: async () => {
await options.action.trigger(options.payload);
},
autoPause: options.autoPause ?? false,
});
this.processes.push(process);
process.start();
return process;
}
/** @internal — called by StateProcess.dispose() to remove itself */
public _removeProcess(process: StateProcess<any, any, any>): void {
const idx = this.processes.indexOf(process);
if (idx !== -1) {
this.processes.splice(idx, 1);
}
}
/** /**
* disposes the state part, completing the Subject and cleaning up resources * disposes the state part, completing the Subject and cleaning up resources
*/ */
public dispose(): void { public dispose(): void {
if (this.disposed) return;
this.disposed = true;
// Dispose all processes first
for (const process of [...this.processes]) {
process.dispose();
}
this.processes.length = 0;
this.state.complete(); this.state.complete();
this.mutationQueue = Promise.resolve() as any;
if (this.pendingCumulativeNotification) { if (this.pendingCumulativeNotification) {
clearTimeout(this.pendingCumulativeNotification); clearTimeout(this.pendingCumulativeNotification);
this.pendingCumulativeNotification = null; this.pendingCumulativeNotification = null;
@@ -362,5 +452,6 @@ export class StatePart<TStatePartName, TStatePayload> {
this.defaultSelectObservable = null; this.defaultSelectObservable = null;
this.webStore = null; this.webStore = null;
this.smartstateRef = undefined; this.smartstateRef = undefined;
this.stateStore = undefined;
} }
} }

View File

@@ -0,0 +1,177 @@
import { BehaviorSubject, Observable, Subscription, of } from 'rxjs';
import type { StatePart } from './smartstate.classes.statepart.js';
import type { StateAction } from './smartstate.classes.stateaction.js';
export type TProcessStatus = 'idle' | 'running' | 'paused' | 'disposed';
export type TAutoPause = 'visibility' | Observable<boolean> | false;
export interface IProcessOptions<TStatePayload, TProducerValue> {
producer: () => Observable<TProducerValue>;
reducer: (currentState: TStatePayload, value: TProducerValue) => TStatePayload;
autoPause?: TAutoPause;
autoStart?: boolean;
}
export interface IScheduledActionOptions<TStatePayload, TActionPayload> {
action: StateAction<TStatePayload, TActionPayload>;
payload: TActionPayload;
intervalMs: number;
autoPause?: TAutoPause;
}
/**
* creates an Observable<boolean> from the Page Visibility API.
* emits true when the page is visible, false when hidden.
* in Node.js (no document), returns an always-true observable.
*/
function createVisibilityObservable(): Observable<boolean> {
if (typeof document === 'undefined') {
return of(true);
}
return new Observable<boolean>((subscriber) => {
subscriber.next(!document.hidden);
const handler = () => subscriber.next(!document.hidden);
document.addEventListener('visibilitychange', handler);
return () => document.removeEventListener('visibilitychange', handler);
});
}
/**
* a managed, pausable process that ties an observable producer to state updates.
* supports lifecycle management (start/pause/resume/dispose) and auto-pause signals.
*/
export class StateProcess<TStatePartName, TStatePayload, TProducerValue> {
private readonly statePartRef: StatePart<TStatePartName, TStatePayload>;
private readonly producerFn: () => Observable<TProducerValue>;
private readonly reducer?: (currentState: TStatePayload, value: TProducerValue) => TStatePayload;
private readonly sideEffect?: (value: TProducerValue) => Promise<void> | void;
private readonly autoPauseOption: TAutoPause;
private statusSubject = new BehaviorSubject<TProcessStatus>('idle');
private producerSubscription: Subscription | null = null;
private autoPauseSubscription: Subscription | null = null;
private processingQueue: Promise<void> = Promise.resolve();
constructor(
statePartRef: StatePart<TStatePartName, TStatePayload>,
options: {
producer: () => Observable<TProducerValue>;
reducer?: (currentState: TStatePayload, value: TProducerValue) => TStatePayload;
sideEffect?: (value: TProducerValue) => Promise<void> | void;
autoPause?: TAutoPause;
}
) {
this.statePartRef = statePartRef;
this.producerFn = options.producer;
this.reducer = options.reducer;
this.sideEffect = options.sideEffect;
this.autoPauseOption = options.autoPause ?? false;
}
public get status(): TProcessStatus {
return this.statusSubject.getValue();
}
public get status$(): Observable<TProcessStatus> {
return this.statusSubject.asObservable();
}
public start(): void {
if (this.status === 'disposed') {
throw new Error('Cannot start a disposed process');
}
if (this.status === 'running') return;
this.statusSubject.next('running');
this.subscribeProducer();
this.setupAutoPause();
}
public pause(): void {
if (this.status === 'disposed') {
throw new Error('Cannot pause a disposed process');
}
if (this.status !== 'running') return;
this.statusSubject.next('paused');
this.unsubscribeProducer();
}
public resume(): void {
if (this.status === 'disposed') {
throw new Error('Cannot resume a disposed process');
}
if (this.status !== 'paused') return;
this.statusSubject.next('running');
this.subscribeProducer();
}
public dispose(): void {
if (this.status === 'disposed') return;
this.unsubscribeProducer();
this.teardownAutoPause();
this.statusSubject.next('disposed');
this.statusSubject.complete();
this.statePartRef._removeProcess(this);
}
private subscribeProducer(): void {
this.unsubscribeProducer();
const source = this.producerFn();
this.producerSubscription = source.subscribe({
next: (value) => {
// Queue value processing to ensure each reads fresh state after the previous completes
this.processingQueue = this.processingQueue.then(async () => {
try {
if (this.sideEffect) {
await this.sideEffect(value);
} else if (this.reducer) {
const currentState = this.statePartRef.getState();
if (currentState !== undefined) {
await this.statePartRef.setState(this.reducer(currentState, value));
}
}
} catch (err) {
console.error('StateProcess value handling error:', err);
}
});
},
error: (err) => {
console.error('StateProcess producer error:', err);
if (this.status === 'running') {
this.statusSubject.next('paused');
this.unsubscribeProducer();
}
},
});
}
private unsubscribeProducer(): void {
if (this.producerSubscription) {
this.producerSubscription.unsubscribe();
this.producerSubscription = null;
}
}
private setupAutoPause(): void {
this.teardownAutoPause();
if (!this.autoPauseOption) return;
const signal$ = this.autoPauseOption === 'visibility'
? createVisibilityObservable()
: this.autoPauseOption;
this.autoPauseSubscription = signal$.subscribe((active) => {
if (!active && this.status === 'running') {
this.pause();
} else if (active && this.status === 'paused') {
this.resume();
}
});
}
private teardownAutoPause(): void {
if (this.autoPauseSubscription) {
this.autoPauseSubscription.unsubscribe();
this.autoPauseSubscription = null;
}
}
}

View File

@@ -6,7 +6,8 @@
"module": "NodeNext", "module": "NodeNext",
"moduleResolution": "NodeNext", "moduleResolution": "NodeNext",
"esModuleInterop": true, "esModuleInterop": true,
"verbatimModuleSyntax": true "verbatimModuleSyntax": true,
"types": ["node"]
}, },
"exclude": [ "exclude": [
"dist_*/**/*.d.ts" "dist_*/**/*.d.ts"