Compare commits

..

4 Commits
v2.2.0 ... main

Author SHA1 Message Date
a66518bde8 v2.3.0
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-27 11:32:49 +00:00
034ae56536 feat(stateprocess): add managed state processes with lifecycle controls, scheduled actions, and disposal safety 2026-03-27 11:32:49 +00:00
b417e3d049 v2.2.1
Some checks failed
Default (tags) / security (push) Successful in 43s
Default (tags) / test (push) Failing after 1m20s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-04 00:03:01 +00:00
2b871402cc fix(smartstate): no changes detected; no version bump required 2026-03-04 00:03:01 +00:00
16 changed files with 3346 additions and 2285 deletions

View File

@@ -1,7 +1,7 @@
{
"json.schemas": [
{
"fileMatch": ["/npmextra.json"],
"fileMatch": ["/.smartconfig.json"],
"schema": {
"type": "object",
"properties": {

View File

@@ -1,5 +1,21 @@
# Changelog
## 2026-03-27 - 2.3.0 - feat(stateprocess)
add managed state processes with lifecycle controls, scheduled actions, and disposal safety
- introduces StateProcess with start, pause, resume, dispose, status, and auto-pause support
- adds createProcess() and createScheduledAction() on StatePart for polling, streams, and recurring actions
- adds disposal guards and Smartstate.dispose() to clean up state parts and attached processes
- improves selector and computed observables with distinct-until-changed behavior and skipped selector error emissions
- renames npmextra.json to .smartconfig.json and updates package tooling dependencies
## 2026-03-04 - 2.2.1 - fix(smartstate)
no changes detected; no version bump required
- Git diff shows no changes
- package.json version is 2.2.0
- No files modified — no release needed
## 2026-03-02 - 2.2.0 - feat(actions)
add action context for safe nested dispatch with depth limit to prevent deadlocks

View File

@@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartstate",
"version": "2.2.0",
"version": "2.3.0",
"private": false,
"description": "A TypeScript-first reactive state management library with middleware, computed state, batching, persistence, and Web Component Context Protocol support.",
"main": "dist_ts/index.js",
@@ -14,19 +14,19 @@
"buildDocs": "tsdoc"
},
"devDependencies": {
"@git.zone/tsbuild": "^4.1.2",
"@git.zone/tsbundle": "^2.9.0",
"@git.zone/tsrun": "^2.0.1",
"@git.zone/tstest": "^3.1.8",
"@git.zone/tsbuild": "^4.4.0",
"@git.zone/tsbundle": "^2.10.0",
"@git.zone/tsrun": "^2.0.2",
"@git.zone/tstest": "^3.6.1",
"@push.rocks/tapbundle": "^6.0.3",
"@types/node": "^25.3.2"
"@types/node": "^25.5.0"
},
"dependencies": {
"@push.rocks/smarthash": "^3.2.6",
"@push.rocks/smartjson": "^6.0.0",
"@push.rocks/smartpromise": "^4.2.3",
"@push.rocks/smartrx": "^3.0.10",
"@push.rocks/webstore": "^2.0.20"
"@push.rocks/webstore": "^2.0.21"
},
"files": [
"ts/**/*",
@@ -37,7 +37,7 @@
"dist_ts_web/**/*",
"assets/**/*",
"cli.js",
"npmextra.json",
".smartconfig.json",
"readme.md"
],
"browserslist": [

4655
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

224
readme.md
View File

@@ -1,6 +1,6 @@
# @push.rocks/smartstate
A TypeScript-first reactive state management library with middleware, computed state, batching, persistence, and Web Component Context Protocol support 🚀
A TypeScript-first reactive state management library with processes, middleware, computed state, batching, persistence, and Web Component Context Protocol support 🚀
## Issue Reporting and Security
@@ -48,7 +48,7 @@ await userState.setState({ name: 'Alice', loggedIn: true });
### 🧩 State Parts & Init Modes
State parts are isolated, typed units of state. They are the building blocks of your application's state tree. Create them via `getStatePart()`:
State parts are isolated, typed units of state the building blocks of your application's state tree. Create them via `getStatePart()`:
```typescript
const part = await state.getStatePart<IMyState>(name, initialState, initMode);
@@ -58,10 +58,10 @@ const part = await state.getStatePart<IMyState>(name, initialState, initMode);
|-----------|----------|
| `'soft'` (default) | Returns existing if found, creates new otherwise |
| `'mandatory'` | Throws if state part already exists — useful for ensuring single-initialization |
| `'force'` | Always creates a new state part, overwriting any existing one |
| `'force'` | Always creates a new state part, disposing and overwriting any existing one |
| `'persistent'` | Like `'soft'` but automatically persists state to IndexedDB via WebStore |
You can use either enums or string literal types for state part names:
You can use either string literal union types or enums for state part names:
```typescript
// String literal types (simpler)
@@ -82,12 +82,12 @@ const settings = await state.getStatePart('settings', { theme: 'dark', fontSize:
// ✅ Automatically saved to IndexedDB on every setState()
// ✅ On next app load, persisted values override defaults
// ✅ Persistence writes complete before in-memory updates (atomic)
// ✅ Persistence writes complete before in-memory updates
```
### 🔭 Selecting State
`select()` returns an RxJS Observable that emits the current value immediately and on every subsequent change:
`select()` returns an RxJS Observable that emits the current value immediately (via `BehaviorSubject`) and on every subsequent change:
```typescript
// Full state
@@ -99,6 +99,8 @@ userState.select((s) => s.name).subscribe((name) => console.log(name));
Selectors are **memoized** — calling `select(fn)` with the same function reference returns the same cached Observable, shared across all subscribers via `shareReplay`. This means you can call `select(mySelector)` in multiple places without creating duplicate subscriptions.
**Change detection** is built in: `select()` uses `distinctUntilChanged` with deep JSON comparison, so subscribers only fire when the selected value actually changes. Selecting `s => s.name` won't re-emit when only `s.count` changes.
#### ✂️ AbortSignal Support
Clean up subscriptions without manual `.unsubscribe()` — the modern way:
@@ -125,7 +127,6 @@ interface ILoginPayload {
}
const loginAction = userState.createAction<ILoginPayload>(async (statePart, payload) => {
// You have access to the current state via statePart.getState()
const current = statePart.getState();
return { ...current, name: payload.username, loggedIn: true };
});
@@ -136,7 +137,157 @@ await loginAction.trigger({ username: 'Alice', email: 'alice@example.com' });
await userState.dispatchAction(loginAction, { username: 'Alice', email: 'alice@example.com' });
```
Both `trigger()` and `dispatchAction()` return a Promise with the new state.
Both `trigger()` and `dispatchAction()` return a Promise with the new state. All dispatches are serialized through a mutation queue, so concurrent dispatches never cause lost updates.
#### 🔗 Nested Actions (Action Context)
When you need to dispatch sub-actions from within an action, use the `context` parameter. This is critical because calling `dispatchAction()` directly from inside an action would deadlock (it tries to acquire the mutation queue that's already held). The context's `dispatch()` bypasses the queue and executes inline:
```typescript
const incrementAction = userState.createAction<number>(async (statePart, amount) => {
const current = statePart.getState();
return { ...current, count: current.count + amount };
});
const doubleIncrementAction = userState.createAction<number>(async (statePart, amount, context) => {
// ✅ Safe: uses context.dispatch() which bypasses the mutation queue
await context.dispatch(incrementAction, amount);
const current = statePart.getState();
return { ...current, count: current.count + amount };
});
// ❌ DON'T do this inside an action — it will deadlock:
// await statePart.dispatchAction(someAction, payload);
```
A built-in depth limit (10 levels) prevents infinite circular dispatch chains, throwing a clear error if exceeded.
### 🔄 Processes (Polling, Streams & Scheduled Tasks)
Processes are managed, pausable observable-to-state bridges — the "side effects" layer. They tie an ongoing data source (polling, WebSockets, event streams) to state updates with full lifecycle control and optional auto-pause.
#### Basic Process: Polling an API
```typescript
import { interval, switchMap, from } from 'rxjs';
const metricsPoller = dashboard.createProcess<{ cpu: number; memory: number }>({
// Producer: an Observable factory — called on start and each resume
producer: () => interval(5000).pipe(
switchMap(() => from(fetch('/api/metrics').then(r => r.json()))),
),
// Reducer: folds each produced value into state (runs through middleware & validation)
reducer: (currentState, metrics) => ({
...currentState,
metrics,
lastUpdated: Date.now(),
}),
autoPause: 'visibility', // ⏸️ Stop polling when the tab is hidden
autoStart: true, // ▶️ Start immediately
});
// Full lifecycle control
metricsPoller.pause(); // Unsubscribes from producer
metricsPoller.resume(); // Re-subscribes (fresh subscription)
metricsPoller.dispose(); // Permanent cleanup
// Observe status reactively
metricsPoller.status; // 'idle' | 'running' | 'paused' | 'disposed'
metricsPoller.status$.subscribe(s => console.log('Process:', s));
```
#### Scheduled Actions
Dispatch an existing action on a recurring interval — syntactic sugar over `createProcess`:
```typescript
const refreshAction = dashboard.createAction<void>(async (sp) => {
const data = await fetch('/api/dashboard').then(r => r.json());
return { ...sp.getState()!, ...data, lastUpdated: Date.now() };
});
// Dispatches refreshAction every 30 seconds, auto-pauses when tab is hidden
const scheduled = dashboard.createScheduledAction({
action: refreshAction,
payload: undefined,
intervalMs: 30000,
autoPause: 'visibility',
});
// It's a full StateProcess — pause, resume, dispose all work
scheduled.dispose();
```
#### Custom Auto-Pause Signals
Pass any `Observable<boolean>` as the auto-pause signal — `true` means active, `false` means pause:
```typescript
import { fromEvent, map, startWith } from 'rxjs';
// Pause when offline, resume when online
const onlineSignal = fromEvent(window, 'online').pipe(
startWith(null),
map(() => navigator.onLine),
);
const syncProcess = userPart.createProcess<SyncPayload>({
producer: () => interval(10000).pipe(
switchMap(() => from(syncWithServer())),
),
reducer: (state, result) => ({ ...state, ...result }),
autoPause: onlineSignal,
});
syncProcess.start();
```
#### WebSocket / Live Streams
Pause disconnects; resume creates a fresh connection:
```typescript
const liveProcess = tickerPart.createProcess<TradeEvent>({
producer: () => new Observable<TradeEvent>(subscriber => {
const ws = new WebSocket('wss://trades.example.com');
ws.onmessage = (e) => subscriber.next(JSON.parse(e.data));
ws.onerror = (e) => subscriber.error(e);
ws.onclose = () => subscriber.complete();
return () => ws.close(); // Teardown: close WebSocket on unsubscribe
}),
reducer: (state, trade) => ({
...state,
lastPrice: trade.price,
trades: [...state.trades.slice(-99), trade],
}),
autoPause: 'visibility',
});
liveProcess.start();
```
#### Error Recovery
If a producer errors, the process gracefully transitions to `'paused'` instead of dying. Call `resume()` to retry with a fresh subscription:
```typescript
process.start();
// Producer errors → status becomes 'paused'
process.resume(); // Creates a fresh subscription — retry
```
#### Process Cleanup Cascades
Disposing a `StatePart` or `Smartstate` instance automatically disposes all attached processes:
```typescript
const p1 = part.createProcess({ ... });
const p2 = part.createProcess({ ... });
p1.start();
p2.start();
part.dispose();
console.log(p1.status); // 'disposed'
console.log(p2.status); // 'disposed'
```
### 🛡️ Middleware
@@ -171,7 +322,7 @@ const remove = userState.addMiddleware(myMiddleware);
remove(); // middleware no longer runs
```
Middleware runs **sequentially** in insertion order. If any middleware throws, the state remains unchanged — the operation is **atomic**.
Middleware runs **sequentially** in insertion order. If any middleware throws, the state remains unchanged — the operation is **atomic**. Process-driven state updates go through middleware too.
### 🧮 Computed / Derived State
@@ -199,7 +350,7 @@ const greeting2$ = state.computed(
);
```
Computed observables are **lazy** — they only subscribe to their sources when someone subscribes to them, and they automatically unsubscribe when all subscribers disconnect.
Computed observables are **lazy** — they only subscribe to their sources when someone subscribes to them, and they automatically unsubscribe when all subscribers disconnect. They also use `distinctUntilChanged` to avoid redundant emissions when the derived value hasn't actually changed.
### 📦 Batch Updates
@@ -322,15 +473,31 @@ await userState.stateSetup(async (statePart) => {
// Any dispatchAction() calls will automatically wait for stateSetup() to finish
```
### 🧹 Disposal & Cleanup
Both `Smartstate` and individual `StatePart` instances support disposal for proper cleanup:
```typescript
// Dispose a single state part — completes the BehaviorSubject, clears middleware, caches,
// and disposes all attached processes
userState.dispose();
// Dispose the entire Smartstate instance — disposes all state parts and clears internal maps
state.dispose();
```
After disposal, `setState()` and `dispatchAction()` will throw if called on a disposed `StatePart`. Calling `start()`, `pause()`, or `resume()` on a disposed `StateProcess` also throws.
### 🏎️ Performance
Smartstate is built with performance in mind:
- **🔒 SHA256 Change Detection** — Uses content hashing to detect actual changes. Identical state values don't trigger notifications, even with different object references.
- **🎯 distinctUntilChanged on Selectors** — Sub-selectors only fire when the selected slice actually changes. `select(s => s.name)` won't emit when `s.count` changes.
- **♻️ Selector Memoization** — `select(fn)` caches observables by function reference and shares them via `shareReplay({ refCount: true })`. Multiple subscribers share one upstream subscription.
- **📦 Cumulative Notifications** — `notifyChangeCumulative()` debounces rapid changes into a single notification at the end of the call stack.
- **🔐 Concurrent Safety** — Simultaneous `getStatePart()` calls for the same name return the same promise, preventing duplicate creation or race conditions.
- **💾 Atomic Persistence** — WebStore writes complete before in-memory state updates, ensuring consistency even if the process crashes mid-write.
- **🔐 Concurrent Safety** — Simultaneous `getStatePart()` calls for the same name return the same promise, preventing duplicate creation. All `setState()` and `dispatchAction()` calls are serialized through a mutation queue. Process values are serialized through their own internal queue.
- **💾 Atomic Persistence** — WebStore writes complete before in-memory state updates, ensuring consistency.
- **⏸️ Batch Deferred Notifications** — `batch()` suppresses all subscriber notifications until every update in the batch completes.
## API Reference
@@ -342,23 +509,26 @@ Smartstate is built with performance in mind:
| `getStatePart(name, initial?, initMode?)` | Get or create a typed state part |
| `batch(fn)` | Batch state updates, defer all notifications until complete |
| `computed(sources, fn)` | Create a computed observable from multiple state parts |
| `dispose()` | Dispose all state parts and clear internal state |
| `isBatching` | `boolean` — whether a batch is currently active |
| `statePartMap` | Registry of all created state parts |
### `StatePart<TName, TPayload>`
| Method | Description |
|--------|-------------|
| `getState()` | Get current state (returns `TPayload \| undefined`) |
| `getState()` | Get current state synchronously (`TPayload \| undefined`) |
| `setState(newState)` | Set state — runs middleware → validates → persists → notifies |
| `select(selectorFn?, options?)` | Returns an Observable of state or derived values. Options: `{ signal?: AbortSignal }` |
| `select(selectorFn?, options?)` | Observable of state or derived values. Options: `{ signal?: AbortSignal }` |
| `createAction(actionDef)` | Create a reusable, typed state action |
| `dispatchAction(action, payload)` | Dispatch an action and return the new state |
| `addMiddleware(fn)` | Add a middleware interceptor. Returns a removal function |
| `waitUntilPresent(selectorFn?, opts?)` | Wait for a state condition. Opts: `number` (timeout) or `{ timeoutMs?, signal? }` |
| `createProcess(options)` | Create a managed, pausable process tied to this state part |
| `createScheduledAction(options)` | Create a process that dispatches an action on a recurring interval |
| `notifyChange()` | Manually trigger a change notification (with hash dedup) |
| `notifyChangeCumulative()` | Debounced notification — fires at end of call stack |
| `stateSetup(fn)` | Async state initialization with action serialization |
| `dispose()` | Complete the BehaviorSubject, dispose processes, clear middleware and caches |
### `StateAction<TState, TPayload>`
@@ -366,6 +536,23 @@ Smartstate is built with performance in mind:
|--------|-------------|
| `trigger(payload)` | Dispatch the action on its associated state part |
### `StateProcess<TName, TPayload, TProducerValue>`
| Method / Property | Description |
|-------------------|-------------|
| `start()` | Start the process (subscribes to producer, sets up auto-pause) |
| `pause()` | Pause the process (unsubscribes from producer) |
| `resume()` | Resume a paused process (fresh subscription to producer) |
| `dispose()` | Permanently stop the process and clean up |
| `status` | Current status: `'idle' \| 'running' \| 'paused' \| 'disposed'` |
| `status$` | Observable of status transitions |
### `IActionContext<TState>`
| Method | Description |
|--------|-------------|
| `dispatch(action, payload)` | Dispatch a sub-action inline (bypasses mutation queue). Available as the third argument to action definitions |
### Standalone Functions
| Function | Description |
@@ -379,8 +566,13 @@ Smartstate is built with performance in mind:
|------|-------------|
| `TInitMode` | `'soft' \| 'mandatory' \| 'force' \| 'persistent'` |
| `TMiddleware<TPayload>` | `(newState, oldState) => TPayload \| Promise<TPayload>` |
| `IActionDef<TState, TPayload>` | Action definition function signature |
| `IActionDef<TState, TPayload>` | Action definition function signature (receives statePart, payload, context?) |
| `IActionContext<TState>` | Context for safe nested dispatch within actions |
| `IContextProviderOptions<TPayload>` | Options for `attachContextProvider` |
| `IProcessOptions<TPayload, TValue>` | Options for `createProcess` (producer, reducer, autoPause, autoStart) |
| `IScheduledActionOptions<TPayload, TActionPayload>` | Options for `createScheduledAction` (action, payload, intervalMs, autoPause) |
| `TProcessStatus` | `'idle' \| 'running' \| 'paused' \| 'disposed'` |
| `TAutoPause` | `'visibility' \| Observable<boolean> \| false` |
## License and Legal Information

View File

@@ -874,4 +874,122 @@ tap.test('concurrent dispatches still serialize correctly with context feature',
expect(part.getState().count).toEqual(10);
});
// ── distinctUntilChanged on selectors ──────────────────────────────────
tap.test('select should not emit when selected sub-state has not changed', async () => {
type TParts = 'distinctSelector';
const state = new smartstate.Smartstate<TParts>();
const part = await state.getStatePart<{ name: string; count: number }>('distinctSelector', { name: 'alice', count: 0 });
const nameSelector = (s: { name: string; count: number }) => s.name;
const nameValues: string[] = [];
part.select(nameSelector).subscribe((v) => nameValues.push(v));
// Wait for initial emission
await new Promise((r) => setTimeout(r, 50));
expect(nameValues).toHaveLength(1);
expect(nameValues[0]).toEqual('alice');
// Change only count — name selector should NOT re-emit
await part.setState({ name: 'alice', count: 1 });
await new Promise((r) => setTimeout(r, 50));
expect(nameValues).toHaveLength(1);
// Change name — name selector SHOULD emit
await part.setState({ name: 'bob', count: 1 });
await new Promise((r) => setTimeout(r, 50));
expect(nameValues).toHaveLength(2);
expect(nameValues[1]).toEqual('bob');
});
// ── Selector error skipping ────────────────────────────────────────────
tap.test('selector errors should be skipped, not emit undefined', async () => {
type TParts = 'selectorError';
const state = new smartstate.Smartstate<TParts>();
const part = await state.getStatePart<{ value: number }>('selectorError', { value: 1 });
let callCount = 0;
const faultySelector = (s: { value: number }) => {
if (s.value === 2) throw new Error('selector boom');
return s.value;
};
const values: number[] = [];
part.select(faultySelector).subscribe((v) => {
callCount++;
values.push(v);
});
await new Promise((r) => setTimeout(r, 50));
expect(values).toEqual([1]);
// This setState triggers a selector error — should be skipped, no undefined emitted
await part.setState({ value: 2 });
await new Promise((r) => setTimeout(r, 50));
expect(values).toEqual([1]); // no new emission
expect(callCount).toEqual(1);
// Normal value again
await part.setState({ value: 3 });
await new Promise((r) => setTimeout(r, 50));
expect(values).toEqual([1, 3]);
});
// ── Smartstate.dispose() ───────────────────────────────────────────────
tap.test('Smartstate.dispose should dispose all state parts', async () => {
type TParts = 'partA' | 'partB';
const state = new smartstate.Smartstate<TParts>();
const partA = await state.getStatePart<{ x: number }>('partA', { x: 1 });
const partB = await state.getStatePart<{ y: number }>('partB', { y: 2 });
let aCompleted = false;
let bCompleted = false;
partA.select().subscribe({ complete: () => { aCompleted = true; } });
partB.select().subscribe({ complete: () => { bCompleted = true; } });
state.dispose();
expect(aCompleted).toBeTrue();
expect(bCompleted).toBeTrue();
});
// ── Post-dispose setState throws ───────────────────────────────────────
tap.test('setState on disposed StatePart should throw', async () => {
type TParts = 'disposedPart';
const state = new smartstate.Smartstate<TParts>();
const part = await state.getStatePart<{ v: number }>('disposedPart', { v: 0 });
part.dispose();
let threw = false;
try {
await part.setState({ v: 1 });
} catch (e) {
threw = true;
expect((e as Error).message).toInclude('disposed');
}
expect(threw).toBeTrue();
});
// ── Post-dispose dispatchAction throws ─────────────────────────────────
tap.test('dispatchAction on disposed StatePart should throw', async () => {
type TParts = 'disposedDispatch';
const state = new smartstate.Smartstate<TParts>();
const part = await state.getStatePart<{ v: number }>('disposedDispatch', { v: 0 });
const action = part.createAction<number>(async (sp, payload) => {
return { v: payload };
});
part.dispose();
let threw = false;
try {
await part.dispatchAction(action, 5);
} catch (e) {
threw = true;
expect((e as Error).message).toInclude('disposed');
}
expect(threw).toBeTrue();
});
export default tap.start();

278
test/test.stateprocess.ts Normal file
View File

@@ -0,0 +1,278 @@
import { tap, expect } from '@git.zone/tstest/tapbundle';
import * as smartstate from '../ts/index.js';
import { Subject, of, Observable, throwError, concat } from 'rxjs';
// ── Lifecycle ──────────────────────────────────────────────────────────
tap.test('process should start in idle status', async () => {
const state = new smartstate.Smartstate<'test'>();
const part = await state.getStatePart<{ v: number }>('test', { v: 0 });
const process = part.createProcess<number>({
producer: () => of(1),
reducer: (s, v) => ({ v: s.v + v }),
});
expect(process.status).toEqual('idle');
process.dispose();
});
tap.test('start/pause/resume/dispose lifecycle', async () => {
const state = new smartstate.Smartstate<'lifecycle'>();
const part = await state.getStatePart<{ v: number }>('lifecycle', { v: 0 });
const subject = new Subject<number>();
const process = part.createProcess<number>({
producer: () => subject.asObservable(),
reducer: (s, v) => ({ v: s.v + v }),
});
expect(process.status).toEqual('idle');
process.start();
expect(process.status).toEqual('running');
process.pause();
expect(process.status).toEqual('paused');
process.resume();
expect(process.status).toEqual('running');
process.dispose();
expect(process.status).toEqual('disposed');
});
// ── Producer → state integration ───────────────────────────────────────
tap.test('producer values should update state through reducer', async () => {
const state = new smartstate.Smartstate<'producer'>();
const part = await state.getStatePart<{ values: number[] }>('producer', { values: [] });
const subject = new Subject<number>();
const process = part.createProcess<number>({
producer: () => subject.asObservable(),
reducer: (s, v) => ({ values: [...s.values, v] }),
});
process.start();
subject.next(1);
subject.next(2);
subject.next(3);
await new Promise((r) => setTimeout(r, 100));
expect(part.getState()!.values).toEqual([1, 2, 3]);
process.dispose();
});
// ── Pause stops producer, resume restarts ──────────────────────────────
tap.test('pause should stop receiving values, resume should restart', async () => {
const state = new smartstate.Smartstate<'pauseResume'>();
const part = await state.getStatePart<{ count: number }>('pauseResume', { count: 0 });
const subject = new Subject<number>();
const process = part.createProcess<number>({
producer: () => subject.asObservable(),
reducer: (s, v) => ({ count: s.count + v }),
});
process.start();
subject.next(1);
await new Promise((r) => setTimeout(r, 50));
expect(part.getState()!.count).toEqual(1);
process.pause();
subject.next(1); // should be ignored — producer unsubscribed
await new Promise((r) => setTimeout(r, 50));
expect(part.getState()!.count).toEqual(1); // unchanged
process.resume();
subject.next(1);
await new Promise((r) => setTimeout(r, 50));
expect(part.getState()!.count).toEqual(2);
process.dispose();
});
// ── Auto-pause with custom Observable ──────────────────────────────────
tap.test('auto-pause with custom Observable<boolean> signal', async () => {
const state = new smartstate.Smartstate<'autoPause'>();
const part = await state.getStatePart<{ count: number }>('autoPause', { count: 0 });
const producer = new Subject<number>();
const pauseSignal = new Subject<boolean>();
const process = part.createProcess<number>({
producer: () => producer.asObservable(),
reducer: (s, v) => ({ count: s.count + v }),
autoPause: pauseSignal.asObservable(),
});
process.start();
producer.next(1);
await new Promise((r) => setTimeout(r, 50));
expect(part.getState()!.count).toEqual(1);
// Signal pause
pauseSignal.next(false);
await new Promise((r) => setTimeout(r, 50));
expect(process.status).toEqual('paused');
producer.next(1); // ignored
await new Promise((r) => setTimeout(r, 50));
expect(part.getState()!.count).toEqual(1);
// Signal resume
pauseSignal.next(true);
await new Promise((r) => setTimeout(r, 50));
expect(process.status).toEqual('running');
producer.next(1);
await new Promise((r) => setTimeout(r, 50));
expect(part.getState()!.count).toEqual(2);
process.dispose();
});
// ── Auto-pause 'visibility' in Node.js (no document) ──────────────────
tap.test('autoPause visibility should be always-active in Node.js', async () => {
const state = new smartstate.Smartstate<'vis'>();
const part = await state.getStatePart<{ v: number }>('vis', { v: 0 });
const subject = new Subject<number>();
const process = part.createProcess<number>({
producer: () => subject.asObservable(),
reducer: (s, v) => ({ v: v }),
autoPause: 'visibility',
});
process.start();
expect(process.status).toEqual('running');
subject.next(42);
await new Promise((r) => setTimeout(r, 50));
expect(part.getState()!.v).toEqual(42);
process.dispose();
});
// ── Scheduled action ───────────────────────────────────────────────────
tap.test('createScheduledAction should dispatch action on interval', async () => {
const state = new smartstate.Smartstate<'scheduled'>();
const part = await state.getStatePart<{ ticks: number }>('scheduled', { ticks: 0 });
const tickAction = part.createAction<void>(async (sp) => {
return { ticks: sp.getState()!.ticks + 1 };
});
const scheduled = part.createScheduledAction({
action: tickAction,
payload: undefined,
intervalMs: 50,
});
await new Promise((r) => setTimeout(r, 280));
scheduled.dispose();
// Should have ticked at least 3 times in ~280ms with 50ms interval
expect(part.getState()!.ticks).toBeGreaterThanOrEqual(3);
});
// ── StatePart.dispose cascades ─────────────────────────────────────────
tap.test('StatePart.dispose should dispose all processes', async () => {
const state = new smartstate.Smartstate<'cascade'>();
const part = await state.getStatePart<{ v: number }>('cascade', { v: 0 });
const p1 = part.createProcess<number>({
producer: () => new Subject<number>().asObservable(),
reducer: (s, v) => ({ v }),
});
const p2 = part.createProcess<number>({
producer: () => new Subject<number>().asObservable(),
reducer: (s, v) => ({ v }),
});
p1.start();
p2.start();
part.dispose();
expect(p1.status).toEqual('disposed');
expect(p2.status).toEqual('disposed');
});
// ── status$ observable ─────────────────────────────────────────────────
tap.test('status$ should emit lifecycle transitions', async () => {
const state = new smartstate.Smartstate<'status$'>();
const part = await state.getStatePart<{ v: number }>('status$', { v: 0 });
const subject = new Subject<number>();
const process = part.createProcess<number>({
producer: () => subject.asObservable(),
reducer: (s, v) => ({ v }),
});
const statuses: string[] = [];
process.status$.subscribe((s) => statuses.push(s));
process.start();
process.pause();
process.resume();
process.dispose();
expect(statuses).toEqual(['idle', 'running', 'paused', 'running', 'disposed']);
});
// ── Producer error → graceful pause ────────────────────────────────────
tap.test('producer error should pause process gracefully', async () => {
const state = new smartstate.Smartstate<'error'>();
const part = await state.getStatePart<{ v: number }>('error', { v: 0 });
let callCount = 0;
const process = part.createProcess<number>({
producer: () => {
callCount++;
if (callCount === 1) {
// First subscription: emit 1, then error
return concat(of(1), throwError(() => new Error('boom')));
}
// After resume: emit 2 successfully
return of(2);
},
reducer: (s, v) => ({ v }),
});
process.start();
await new Promise((r) => setTimeout(r, 50));
expect(process.status).toEqual('paused');
expect(part.getState()!.v).toEqual(1); // got the value before error
// Resume creates a fresh subscription
process.resume();
await new Promise((r) => setTimeout(r, 50));
expect(part.getState()!.v).toEqual(2);
process.dispose();
});
// ── Disposed guards ────────────────────────────────────────────────────
tap.test('start/pause/resume on disposed process should throw', async () => {
const state = new smartstate.Smartstate<'guards'>();
const part = await state.getStatePart<{ v: number }>('guards', { v: 0 });
const process = part.createProcess<number>({
producer: () => of(1),
reducer: (s, v) => ({ v }),
});
process.dispose();
let errors = 0;
try { process.start(); } catch { errors++; }
try { process.pause(); } catch { errors++; }
try { process.resume(); } catch { errors++; }
expect(errors).toEqual(3);
});
// ── autoStart option ───────────────────────────────────────────────────
tap.test('autoStart should start process immediately', async () => {
const state = new smartstate.Smartstate<'autoStart'>();
const part = await state.getStatePart<{ v: number }>('autoStart', { v: 0 });
const process = part.createProcess<number>({
producer: () => of(42),
reducer: (s, v) => ({ v }),
autoStart: true,
});
await new Promise((r) => setTimeout(r, 50));
expect(process.status).toEqual('running');
expect(part.getState()!.v).toEqual(42);
process.dispose();
});
export default tap.start();

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartstate',
version: '2.2.0',
version: '2.3.0',
description: 'A TypeScript-first reactive state management library with middleware, computed state, batching, persistence, and Web Component Context Protocol support.'
}

View File

@@ -3,3 +3,4 @@ export * from './smartstate.classes.statepart.js';
export * from './smartstate.classes.stateaction.js';
export * from './smartstate.classes.computed.js';
export * from './smartstate.contextprovider.js';
export * from './smartstate.classes.stateprocess.js';

View File

@@ -1,5 +1,5 @@
import * as plugins from './smartstate.plugins.js';
import { combineLatest, map } from 'rxjs';
import { combineLatest, map, distinctUntilChanged } from 'rxjs';
import type { StatePart } from './smartstate.classes.statepart.js';
/**
@@ -12,5 +12,6 @@ export function computed<TResult>(
): plugins.smartrx.rxjs.Observable<TResult> {
return combineLatest(sources.map((sp) => sp.select())).pipe(
map((states) => computeFn(...states)),
distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b)),
) as plugins.smartrx.rxjs.Observable<TResult>;
}

View File

@@ -49,7 +49,11 @@ export class Smartstate<StatePartNameType extends string> {
const pending = [...this.pendingNotifications];
this.pendingNotifications.clear();
for (const sp of pending) {
try {
await sp.notifyChange();
} catch (err) {
console.error(`Error flushing notification for state part:`, err);
}
}
}
} finally {
@@ -69,6 +73,21 @@ export class Smartstate<StatePartNameType extends string> {
return computed(sources, computeFn);
}
/**
* disposes all state parts and clears internal state
*/
public dispose(): void {
for (const key of Object.keys(this.statePartMap)) {
const part = this.statePartMap[key as StatePartNameType];
if (part) {
part.dispose();
}
}
this.statePartMap = {} as any;
this.pendingStatePartCreation.clear();
this.pendingNotifications.clear();
}
/**
* Allows getting and initializing a new statepart
*/
@@ -107,7 +126,7 @@ export class Smartstate<StatePartNameType extends string> {
}
}
const creationPromise = this.createStatePart<PayloadType>(statePartNameArg, initialArg, initMode);
const creationPromise = this.createStatePart<PayloadType>(statePartNameArg, initialArg!, initMode);
this.pendingStatePartCreation.set(statePartNameArg, creationPromise);
try {
@@ -133,7 +152,7 @@ export class Smartstate<StatePartNameType extends string> {
dbName: 'smartstate',
storeName: statePartName,
}
: null
: undefined
);
newState.smartstateRef = this;
await newState.init();

View File

@@ -18,8 +18,8 @@ export interface IActionDef<TStateType, TActionPayloadType> {
*/
export class StateAction<TStateType, TActionPayloadType> {
constructor(
public statePartRef: StatePart<any, any>,
public actionDef: IActionDef<TStateType, TActionPayloadType>
public readonly statePartRef: StatePart<any, any>,
public readonly actionDef: IActionDef<TStateType, TActionPayloadType>
) {}
public trigger(payload: TActionPayloadType): Promise<TStateType> {

View File

@@ -1,7 +1,8 @@
import * as plugins from './smartstate.plugins.js';
import { Observable, shareReplay, takeUntil } from 'rxjs';
import { BehaviorSubject, Observable, shareReplay, takeUntil, distinctUntilChanged, interval } from 'rxjs';
import { StateAction, type IActionDef, type IActionContext } from './smartstate.classes.stateaction.js';
import type { Smartstate } from './smartstate.classes.smartstate.js';
import { StateProcess, type IProcessOptions, type IScheduledActionOptions } from './smartstate.classes.stateprocess.js';
export type TMiddleware<TPayload> = (
newState: TPayload,
@@ -31,19 +32,23 @@ export class StatePart<TStatePartName, TStatePayload> {
private static readonly MAX_NESTED_DISPATCH_DEPTH = 10;
public name: TStatePartName;
public state = new plugins.smartrx.rxjs.Subject<TStatePayload>();
public stateStore: TStatePayload | undefined;
private state = new BehaviorSubject<TStatePayload | undefined>(undefined);
private stateStore: TStatePayload | undefined;
public smartstateRef?: Smartstate<any>;
private disposed = false;
private cumulativeDeferred = plugins.smartpromise.cumulativeDefer();
private mutationQueue: Promise<any> = Promise.resolve();
private pendingCumulativeNotification: ReturnType<typeof setTimeout> | null = null;
private webStoreOptions: plugins.webstore.IWebStoreOptions;
private webStoreOptions: plugins.webstore.IWebStoreOptions | undefined;
private webStore: plugins.webstore.WebStore<TStatePayload> | null = null;
private middlewares: TMiddleware<TStatePayload>[] = [];
// Process tracking
private processes: StateProcess<TStatePartName, TStatePayload, any>[] = [];
// Selector memoization
private selectorCache = new WeakMap<Function, plugins.smartrx.rxjs.Observable<any>>();
private defaultSelectObservable: plugins.smartrx.rxjs.Observable<TStatePayload> | null = null;
@@ -97,6 +102,9 @@ export class StatePart<TStatePartName, TStatePayload> {
* sets the stateStore to the new state (serialized via mutation queue)
*/
public async setState(newStateArg: TStatePayload): Promise<TStatePayload> {
if (this.disposed) {
throw new Error(`StatePart '${this.name}' has been disposed`);
}
return this.mutationQueue = this.mutationQueue.then(
() => this.applyState(newStateArg),
() => this.applyState(newStateArg),
@@ -212,22 +220,24 @@ export class StatePart<TStatePartName, TStatePayload> {
}
const effectiveSelectorFn = selectorFn || ((state: TStatePayload) => <T>(<any>state));
const SELECTOR_ERROR: unique symbol = Symbol('selector-error');
let mapped = this.state.pipe(
plugins.smartrx.rxjs.ops.startWith(this.getState()),
plugins.smartrx.rxjs.ops.filter((stateArg): stateArg is TStatePayload => stateArg !== undefined),
plugins.smartrx.rxjs.ops.map((stateArg) => {
try {
return effectiveSelectorFn(stateArg);
} catch (e) {
console.error(`Selector error in state part '${this.name}':`, e);
return undefined;
return SELECTOR_ERROR as any;
}
})
}),
plugins.smartrx.rxjs.ops.filter((v: any) => v !== SELECTOR_ERROR),
distinctUntilChanged((a: any, b: any) => JSON.stringify(a) === JSON.stringify(b)),
);
if (hasSignal) {
mapped = mapped.pipe(takeUntil(fromAbortSignal(options.signal)));
mapped = mapped.pipe(takeUntil(fromAbortSignal(options.signal!)));
return mapped;
}
@@ -277,19 +287,16 @@ export class StatePart<TStatePartName, TStatePayload> {
* dispatches an action on the statepart level
*/
public async dispatchAction<T>(stateAction: StateAction<TStatePayload, T>, actionPayload: T): Promise<TStatePayload> {
if (this.disposed) {
throw new Error(`StatePart '${this.name}' has been disposed`);
}
await this.cumulativeDeferred.promise;
return this.mutationQueue = this.mutationQueue.then(
async () => {
const execute = async () => {
const context = this.createActionContext(0);
const newState = await stateAction.actionDef(this, actionPayload, context);
return this.applyState(newState);
},
async () => {
const context = this.createActionContext(0);
const newState = await stateAction.actionDef(this, actionPayload, context);
return this.applyState(newState);
},
);
};
return this.mutationQueue = this.mutationQueue.then(execute, execute);
}
/**
@@ -374,11 +381,68 @@ export class StatePart<TStatePartName, TStatePayload> {
await this.setState(await resultPromise);
}
/**
* creates a managed, pausable process that ties an observable producer to state updates
*/
public createProcess<TProducerValue>(
options: IProcessOptions<TStatePayload, TProducerValue>
): StateProcess<TStatePartName, TStatePayload, TProducerValue> {
if (this.disposed) {
throw new Error(`StatePart '${this.name}' has been disposed`);
}
const process = new StateProcess<TStatePartName, TStatePayload, TProducerValue>(this, options);
this.processes.push(process);
if (options.autoStart) {
process.start();
}
return process;
}
/**
* creates a process that dispatches an action on a recurring interval
*/
public createScheduledAction<TActionPayload>(
options: IScheduledActionOptions<TStatePayload, TActionPayload>
): StateProcess<TStatePartName, TStatePayload, number> {
if (this.disposed) {
throw new Error(`StatePart '${this.name}' has been disposed`);
}
const process = new StateProcess<TStatePartName, TStatePayload, number>(this, {
producer: () => interval(options.intervalMs),
sideEffect: async () => {
await options.action.trigger(options.payload);
},
autoPause: options.autoPause ?? false,
});
this.processes.push(process);
process.start();
return process;
}
/** @internal — called by StateProcess.dispose() to remove itself */
public _removeProcess(process: StateProcess<any, any, any>): void {
const idx = this.processes.indexOf(process);
if (idx !== -1) {
this.processes.splice(idx, 1);
}
}
/**
* disposes the state part, completing the Subject and cleaning up resources
*/
public dispose(): void {
if (this.disposed) return;
this.disposed = true;
// Dispose all processes first
for (const process of [...this.processes]) {
process.dispose();
}
this.processes.length = 0;
this.state.complete();
this.mutationQueue = Promise.resolve() as any;
if (this.pendingCumulativeNotification) {
clearTimeout(this.pendingCumulativeNotification);
this.pendingCumulativeNotification = null;
@@ -388,5 +452,6 @@ export class StatePart<TStatePartName, TStatePayload> {
this.defaultSelectObservable = null;
this.webStore = null;
this.smartstateRef = undefined;
this.stateStore = undefined;
}
}

View File

@@ -0,0 +1,177 @@
import { BehaviorSubject, Observable, Subscription, of } from 'rxjs';
import type { StatePart } from './smartstate.classes.statepart.js';
import type { StateAction } from './smartstate.classes.stateaction.js';
export type TProcessStatus = 'idle' | 'running' | 'paused' | 'disposed';
export type TAutoPause = 'visibility' | Observable<boolean> | false;
export interface IProcessOptions<TStatePayload, TProducerValue> {
producer: () => Observable<TProducerValue>;
reducer: (currentState: TStatePayload, value: TProducerValue) => TStatePayload;
autoPause?: TAutoPause;
autoStart?: boolean;
}
export interface IScheduledActionOptions<TStatePayload, TActionPayload> {
action: StateAction<TStatePayload, TActionPayload>;
payload: TActionPayload;
intervalMs: number;
autoPause?: TAutoPause;
}
/**
* creates an Observable<boolean> from the Page Visibility API.
* emits true when the page is visible, false when hidden.
* in Node.js (no document), returns an always-true observable.
*/
function createVisibilityObservable(): Observable<boolean> {
if (typeof document === 'undefined') {
return of(true);
}
return new Observable<boolean>((subscriber) => {
subscriber.next(!document.hidden);
const handler = () => subscriber.next(!document.hidden);
document.addEventListener('visibilitychange', handler);
return () => document.removeEventListener('visibilitychange', handler);
});
}
/**
* a managed, pausable process that ties an observable producer to state updates.
* supports lifecycle management (start/pause/resume/dispose) and auto-pause signals.
*/
export class StateProcess<TStatePartName, TStatePayload, TProducerValue> {
private readonly statePartRef: StatePart<TStatePartName, TStatePayload>;
private readonly producerFn: () => Observable<TProducerValue>;
private readonly reducer?: (currentState: TStatePayload, value: TProducerValue) => TStatePayload;
private readonly sideEffect?: (value: TProducerValue) => Promise<void> | void;
private readonly autoPauseOption: TAutoPause;
private statusSubject = new BehaviorSubject<TProcessStatus>('idle');
private producerSubscription: Subscription | null = null;
private autoPauseSubscription: Subscription | null = null;
private processingQueue: Promise<void> = Promise.resolve();
constructor(
statePartRef: StatePart<TStatePartName, TStatePayload>,
options: {
producer: () => Observable<TProducerValue>;
reducer?: (currentState: TStatePayload, value: TProducerValue) => TStatePayload;
sideEffect?: (value: TProducerValue) => Promise<void> | void;
autoPause?: TAutoPause;
}
) {
this.statePartRef = statePartRef;
this.producerFn = options.producer;
this.reducer = options.reducer;
this.sideEffect = options.sideEffect;
this.autoPauseOption = options.autoPause ?? false;
}
public get status(): TProcessStatus {
return this.statusSubject.getValue();
}
public get status$(): Observable<TProcessStatus> {
return this.statusSubject.asObservable();
}
public start(): void {
if (this.status === 'disposed') {
throw new Error('Cannot start a disposed process');
}
if (this.status === 'running') return;
this.statusSubject.next('running');
this.subscribeProducer();
this.setupAutoPause();
}
public pause(): void {
if (this.status === 'disposed') {
throw new Error('Cannot pause a disposed process');
}
if (this.status !== 'running') return;
this.statusSubject.next('paused');
this.unsubscribeProducer();
}
public resume(): void {
if (this.status === 'disposed') {
throw new Error('Cannot resume a disposed process');
}
if (this.status !== 'paused') return;
this.statusSubject.next('running');
this.subscribeProducer();
}
public dispose(): void {
if (this.status === 'disposed') return;
this.unsubscribeProducer();
this.teardownAutoPause();
this.statusSubject.next('disposed');
this.statusSubject.complete();
this.statePartRef._removeProcess(this);
}
private subscribeProducer(): void {
this.unsubscribeProducer();
const source = this.producerFn();
this.producerSubscription = source.subscribe({
next: (value) => {
// Queue value processing to ensure each reads fresh state after the previous completes
this.processingQueue = this.processingQueue.then(async () => {
try {
if (this.sideEffect) {
await this.sideEffect(value);
} else if (this.reducer) {
const currentState = this.statePartRef.getState();
if (currentState !== undefined) {
await this.statePartRef.setState(this.reducer(currentState, value));
}
}
} catch (err) {
console.error('StateProcess value handling error:', err);
}
});
},
error: (err) => {
console.error('StateProcess producer error:', err);
if (this.status === 'running') {
this.statusSubject.next('paused');
this.unsubscribeProducer();
}
},
});
}
private unsubscribeProducer(): void {
if (this.producerSubscription) {
this.producerSubscription.unsubscribe();
this.producerSubscription = null;
}
}
private setupAutoPause(): void {
this.teardownAutoPause();
if (!this.autoPauseOption) return;
const signal$ = this.autoPauseOption === 'visibility'
? createVisibilityObservable()
: this.autoPauseOption;
this.autoPauseSubscription = signal$.subscribe((active) => {
if (!active && this.status === 'running') {
this.pause();
} else if (active && this.status === 'paused') {
this.resume();
}
});
}
private teardownAutoPause(): void {
if (this.autoPauseSubscription) {
this.autoPauseSubscription.unsubscribe();
this.autoPauseSubscription = null;
}
}
}

View File

@@ -6,7 +6,8 @@
"module": "NodeNext",
"moduleResolution": "NodeNext",
"esModuleInterop": true,
"verbatimModuleSyntax": true
"verbatimModuleSyntax": true,
"types": ["node"]
},
"exclude": [
"dist_*/**/*.d.ts"