7 Commits
v1.4.0 ... main

Author SHA1 Message Date
81d7ff0722 fix(build): update bundled UI after rebuild 2026-01-26 12:49:23 +00:00
856f13f2ad v1.6.1
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 14m47s
Default (tags) / release (push) Has been cancelled
Default (tags) / metadata (push) Has been cancelled
2026-01-26 12:48:50 +00:00
f7cd43933f fix(ci): add Gitea CI workflows, documentation updates, and packaging metadata tweaks 2026-01-26 12:48:50 +00:00
4269058ab5 v1.6.0 2026-01-25 22:04:07 +00:00
321e3e89a4 feat(readme): document real-time change streaming and expand README with features, architecture, and configuration updates 2026-01-25 22:04:07 +00:00
75edb510e8 v1.5.0 2026-01-25 21:41:55 +00:00
20e08d123f feat(streaming): add real-time streaming (MongoDB change streams & S3 bucket watchers) with WebSocket subscriptions and activity stream UI 2026-01-25 21:41:55 +00:00
30 changed files with 2878 additions and 145 deletions

View File

@@ -0,0 +1,66 @@
name: Default (not tags)
on:
push:
tags-ignore:
- '**'
env:
IMAGE: code.foss.global/host.today/ht-docker-node:npmci
NPMCI_COMPUTED_REPOURL: https://${-{gitea.repository_owner}-}:${-{secrets.GITEA_TOKEN}-}@{{module.githost}}/${-{gitea.repository}-}.git
NPMCI_TOKEN_NPM: ${-{secrets.NPMCI_TOKEN_NPM}-}
NPMCI_TOKEN_NPM2: ${-{secrets.NPMCI_TOKEN_NPM2}-}
NPMCI_GIT_GITHUBTOKEN: ${-{secrets.NPMCI_GIT_GITHUBTOKEN}-}
NPMCI_URL_CLOUDLY: ${-{secrets.NPMCI_URL_CLOUDLY}-}
jobs:
security:
runs-on: ubuntu-latest
continue-on-error: true
container:
image: ${-{ env.IMAGE }-}
steps:
- uses: actions/checkout@v3
- name: Install pnpm and npmci
run: |
pnpm install -g pnpm
pnpm install -g @ship.zone/npmci
- name: Run npm prepare
run: npmci npm prepare
- name: Audit production dependencies
run: |
npmci command npm config set registry https://registry.npmjs.org
npmci command pnpm audit --audit-level=high --prod
continue-on-error: true
- name: Audit development dependencies
run: |
npmci command npm config set registry https://registry.npmjs.org
npmci command pnpm audit --audit-level=high --dev
continue-on-error: true
test:
if: ${-{ always() }-}
needs: security
runs-on: ubuntu-latest
container:
image: ${-{ env.IMAGE }-}
steps:
- uses: actions/checkout@v3
- name: Test stable
run: |
npmci node install stable
npmci npm install
npmci npm test
- name: Test build
run: |
npmci node install stable
npmci npm install
npmci npm build

View File

@@ -0,0 +1,124 @@
name: Default (tags)
on:
push:
tags:
- '*'
env:
IMAGE: code.foss.global/host.today/ht-docker-node:npmci
NPMCI_COMPUTED_REPOURL: https://${-{gitea.repository_owner}-}:${-{secrets.GITEA_TOKEN}-}@{{module.githost}}/${-{gitea.repository}-}.git
NPMCI_TOKEN_NPM: ${-{secrets.NPMCI_TOKEN_NPM}-}
NPMCI_TOKEN_NPM2: ${-{secrets.NPMCI_TOKEN_NPM2}-}
NPMCI_GIT_GITHUBTOKEN: ${-{secrets.NPMCI_GIT_GITHUBTOKEN}-}
NPMCI_URL_CLOUDLY: ${-{secrets.NPMCI_URL_CLOUDLY}-}
jobs:
security:
runs-on: ubuntu-latest
continue-on-error: true
container:
image: ${-{ env.IMAGE }-}
steps:
- uses: actions/checkout@v3
- name: Prepare
run: |
pnpm install -g pnpm
pnpm install -g @ship.zone/npmci
npmci npm prepare
- name: Audit production dependencies
run: |
npmci command npm config set registry https://registry.npmjs.org
npmci command pnpm audit --audit-level=high --prod
continue-on-error: true
- name: Audit development dependencies
run: |
npmci command npm config set registry https://registry.npmjs.org
npmci command pnpm audit --audit-level=high --dev
continue-on-error: true
test:
if: ${-{ always() }-}
needs: security
runs-on: ubuntu-latest
container:
image: ${-{ env.IMAGE }-}
steps:
- uses: actions/checkout@v3
- name: Prepare
run: |
pnpm install -g pnpm
pnpm install -g @ship.zone/npmci
npmci npm prepare
- name: Test stable
run: |
npmci node install stable
npmci npm install
npmci npm test
- name: Test build
run: |
npmci node install stable
npmci npm install
npmci npm build
release:
needs: test
if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags/')
runs-on: ubuntu-latest
container:
image: ${-{ env.IMAGE }-}
steps:
- uses: actions/checkout@v3
- name: Prepare
run: |
pnpm install -g pnpm
pnpm install -g @ship.zone/npmci
npmci npm prepare
- name: Release
run: |
npmci node install stable
npmci npm publish
metadata:
needs: test
if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags/')
runs-on: ubuntu-latest
container:
image: ${-{ env.IMAGE }-}
continue-on-error: true
steps:
- uses: actions/checkout@v3
- name: Prepare
run: |
pnpm install -g pnpm
pnpm install -g @ship.zone/npmci
npmci npm prepare
- name: Code quality
run: |
npmci command npm install -g typescript
npmci npm install
- name: Trigger
run: npmci trigger
- name: Build docs and upload artifacts
run: |
npmci node install stable
npmci npm install
pnpm install -g @git.zone/tsdoc
npmci command tsdoc
continue-on-error: true

11
.vscode/launch.json vendored Normal file
View File

@@ -0,0 +1,11 @@
{
"version": "0.2.0",
"configurations": [
{
"command": "npm test",
"name": "Run npm test",
"request": "launch",
"type": "node-terminal"
}
]
}

26
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,26 @@
{
"json.schemas": [
{
"fileMatch": ["/npmextra.json"],
"schema": {
"type": "object",
"properties": {
"npmci": {
"type": "object",
"description": "settings for npmci"
},
"gitzone": {
"type": "object",
"description": "settings for gitzone",
"properties": {
"projectType": {
"type": "string",
"enum": ["website", "element", "service", "npm", "wcc"]
}
}
}
}
}
}
]
}

View File

@@ -1,6 +1,38 @@
# Changelog # Changelog
## 2026-01-26 - 1.6.1 - fix(ci)
add Gitea CI workflows, documentation updates, and packaging metadata tweaks
- Add .gitea workflow files for tag and non-tag pushes (security, test, release, metadata jobs)
- Add buildDocs script (tsdoc) and add bugs/homepage/pnpm.overrides entries to package.json
- Update README and readme.hints.md with real-time streaming docs, formatting/table fixes, and examples punctuation fixes
- Tidy npmextra.json formatting and array inline style changes
- Update changelog.md entries and correct trailing newline/formatting
- Minor .gitignore whitespace fix
## 2026-01-25 - 1.6.0 - feat(readme)
document real-time change streaming and expand README with features, architecture, and configuration updates
- Add Real-Time Change Streaming section: MongoDB change streams, S3 polling (ETag), activity stream, WebSocket subscriptions, and auto-reconnect behavior
- Expand S3 and MongoDB feature lists: in-place text editing, enhanced previews (code), show/hide system databases, live indicators, context menus
- Reintroduce project-level npmextra.json example and clarify environment variable names and priority order
- Add Architecture and How It Works sections with tree layout and streaming design details
- Minor wording, formatting, and installation clarifications (prefer pnpm examples, fix LICENSE filename case)
## 2026-01-25 - 1.5.0 - feat(streaming)
add real-time streaming (MongoDB change streams & S3 bucket watchers) with WebSocket subscriptions and activity stream UI
- Server: add ChangeStreamManager to manage MongoDB change streams and S3 BucketWatcher subscriptions, handle subscription lifecycle, activity ring buffer and push events via TypedSocket.
- API: introduce streaming TypedRequest interfaces and register new handlers (subscribeMongo/unsubscribeMongo, subscribeS3/unsubscribeS3, subscribeActivity/unsubscribeActivity, getRecentActivity, push events).
- Client: add ChangeStreamService (TypedSocket connection, reconnection, RxJS subjects) and integrate into tsview-app, mongo and S3 browsers to show live status, change counters and refresh content on updates.
- UI: add new Activity Stream component (tsview-activity-stream), Activity tab and navigation, plus visual indicators (Live/Offline, change badges, auto-scroll and filters).
- Dependencies & plugins: add @api.global/typedsocket and @push.rocks/smartrx, bump @push.rocks/smartbucket; expose smartrx/typedsocket in plugins for server and web bundles.
- Docs: update readme.hints.md with Real-Time Streaming architecture, interfaces and dependency notes.
## 2026-01-25 - 1.4.0 - feat(web) ## 2026-01-25 - 1.4.0 - feat(web)
add database overview panel, collection overview and resizable panels; show/hide system databases; use code editor with change-tracking in document view; add getDatabaseStats API and typings; enable overwrite for S3 uploads add database overview panel, collection overview and resizable panels; show/hide system databases; use code editor with change-tracking in document view; add getDatabaseStats API and typings; enable overwrite for S3 uploads
- Add backend handler getDatabaseStats + request/response typings (IReq_GetDatabaseStats, IDatabaseStats) and ApiService.getDatabaseStats() - Add backend handler getDatabaseStats + request/response typings (IReq_GetDatabaseStats, IDatabaseStats) and ApiService.getDatabaseStats()
@@ -13,6 +45,7 @@ add database overview panel, collection overview and resizable panels; show/hide
- Minor dependency bumps: @git.zone/tstest and @design.estate/dees-catalog - Minor dependency bumps: @git.zone/tstest and @design.estate/dees-catalog
## 2026-01-25 - 1.3.0 - feat(s3) ## 2026-01-25 - 1.3.0 - feat(s3)
add S3 create file/folder dialogs and in-place text editor; export mongodb plugin add S3 create file/folder dialogs and in-place text editor; export mongodb plugin
- Add mongodb dependency and export mongodb in ts/plugins.ts so ObjectId can be reused from plugins. - Add mongodb dependency and export mongodb in ts/plugins.ts so ObjectId can be reused from plugins.
@@ -23,6 +56,7 @@ add S3 create file/folder dialogs and in-place text editor; export mongodb plugi
- Various styling and UX improvements for dialogs, buttons, and editor states. - Various styling and UX improvements for dialogs, buttons, and editor states.
## 2026-01-25 - 1.2.0 - feat(s3,web-ui) ## 2026-01-25 - 1.2.0 - feat(s3,web-ui)
add S3 deletePrefix and getObjectUrl endpoints and add context menus in UI for S3 and Mongo views add S3 deletePrefix and getObjectUrl endpoints and add context menus in UI for S3 and Mongo views
- Add server-side TypedHandlers: deletePrefix and getObjectUrl (ts/api/handlers.s3.ts) - Add server-side TypedHandlers: deletePrefix and getObjectUrl (ts/api/handlers.s3.ts)
@@ -32,24 +66,28 @@ add S3 deletePrefix and getObjectUrl endpoints and add context menus in UI for S
- Switch from inline delete buttons to contextual menus for safer UX; implement downloads via data URLs returned by getObjectUrl and deletion of S3 prefixes (folders) - Switch from inline delete buttons to contextual menus for safer UX; implement downloads via data URLs returned by getObjectUrl and deletion of S3 prefixes (folders)
## 2026-01-25 - 1.1.3 - fix(package) ## 2026-01-25 - 1.1.3 - fix(package)
update package metadata update package metadata
- metadata-only change; no source code changes - metadata-only change; no source code changes
- current version 1.1.2 → recommended patch bump to 1.1.3 - current version 1.1.2 → recommended patch bump to 1.1.3
## 2026-01-25 - 1.1.2 - fix(package) ## 2026-01-25 - 1.1.2 - fix(package)
apply minor metadata-only change (one-line edit) apply minor metadata-only change (one-line edit)
- Change affects 1 file with a +1 -1 (metadata-only) — no behavioral changes - Change affects 1 file with a +1 -1 (metadata-only) — no behavioral changes
- Recommended bump of patch version from 1.1.1 to 1.1.2 - Recommended bump of patch version from 1.1.1 to 1.1.2
## 2026-01-25 - 1.1.1 - fix(tsview) ## 2026-01-25 - 1.1.1 - fix(tsview)
fix bad build commit - remove accidental include fix bad build commit - remove accidental include
- Removed an accidental include that caused a bad build and unintended files to be part of the commit - Removed an accidental include that caused a bad build and unintended files to be part of the commit
- Patch release recommended from 1.1.0 to 1.1.1 - Patch release recommended from 1.1.0 to 1.1.1
## 2026-01-25 - 1.1.0 - feat(tsview) ## 2026-01-25 - 1.1.0 - feat(tsview)
add database and S3 handlers, tswatch/watch scripts, web utilities, assets and release config add database and S3 handlers, tswatch/watch scripts, web utilities, assets and release config
- Add MongoDB management handlers: createDatabase, dropDatabase, dropCollection (ts/api/handlers.mongodb.ts) - Add MongoDB management handlers: createDatabase, dropDatabase, dropCollection (ts/api/handlers.mongodb.ts)
@@ -61,6 +99,7 @@ add database and S3 handlers, tswatch/watch scripts, web utilities, assets and r
- Add release/registry and project metadata in npmextra.json for publishing - Add release/registry and project metadata in npmextra.json for publishing
## 2026-01-23 - 1.0.0 - initial release: column view UI, S3 integration, and API fixes ## 2026-01-23 - 1.0.0 - initial release: column view UI, S3 integration, and API fixes
Initial public release introducing the new column-based UI with resizable columns and horizontal navigation, plus backend fixes for S3 bucket listing and API endpoint handling. Initial public release introducing the new column-based UI with resizable columns and horizontal navigation, plus backend fixes for S3 bucket listing and API endpoint handling.
- feat: Add resizable columns and horizontal scrolling - feat: Add resizable columns and horizontal scrolling

View File

@@ -6,9 +6,7 @@
"to": "./ts/bundled_ui.ts", "to": "./ts/bundled_ui.ts",
"outputMode": "base64ts", "outputMode": "base64ts",
"bundler": "esbuild", "bundler": "esbuild",
"includeFiles": [ "includeFiles": ["html/**/*"]
"html/**/*"
]
} }
] ]
}, },
@@ -34,14 +32,9 @@
"openBrowser": false "openBrowser": false
}, },
"@git.zone/cli": { "@git.zone/cli": {
"services": [ "services": ["mongodb", "minio"],
"mongodb",
"minio"
],
"release": { "release": {
"registries": [ "registries": ["https://verdaccio.lossless.digital"],
"https://verdaccio.lossless.digital"
],
"accessLevel": "public" "accessLevel": "public"
}, },
"projectType": "npm", "projectType": "npm",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@git.zone/tsview", "name": "@git.zone/tsview",
"version": "1.4.0", "version": "1.6.1",
"private": false, "private": false,
"description": "A CLI tool for viewing S3 and MongoDB data with a web UI", "description": "A CLI tool for viewing S3 and MongoDB data with a web UI",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",
@@ -13,17 +13,20 @@
"build": "pnpm run bundle && tsbuild --allowimplicitany", "build": "pnpm run bundle && tsbuild --allowimplicitany",
"bundle": "tsbundle", "bundle": "tsbundle",
"startTs": "node cli.ts.js", "startTs": "node cli.ts.js",
"watch": "tswatch" "watch": "tswatch",
"buildDocs": "tsdoc"
}, },
"bin": { "bin": {
"tsview": "cli.js" "tsview": "cli.js"
}, },
"devDependencies": { "devDependencies": {
"@api.global/typedsocket": "^4.1.0",
"@git.zone/tsbuild": "^4.1.2", "@git.zone/tsbuild": "^4.1.2",
"@git.zone/tsbundle": "^2.8.3", "@git.zone/tsbundle": "^2.8.3",
"@git.zone/tsrun": "^2.0.1", "@git.zone/tsrun": "^2.0.1",
"@git.zone/tstest": "^3.1.7", "@git.zone/tstest": "^3.1.7",
"@git.zone/tswatch": "3.0.1", "@git.zone/tswatch": "3.0.1",
"@push.rocks/smartrx": "^3.0.10",
"@types/node": "^25.0.10" "@types/node": "^25.0.10"
}, },
"dependencies": { "dependencies": {
@@ -35,7 +38,7 @@
"@design.estate/dees-element": "^2.1.5", "@design.estate/dees-element": "^2.1.5",
"@push.rocks/early": "^4.0.4", "@push.rocks/early": "^4.0.4",
"@push.rocks/npmextra": "^5.3.3", "@push.rocks/npmextra": "^5.3.3",
"@push.rocks/smartbucket": "^4.3.1", "@push.rocks/smartbucket": "^4.4.1",
"@push.rocks/smartcli": "^4.0.20", "@push.rocks/smartcli": "^4.0.20",
"@push.rocks/smartdata": "^7.0.15", "@push.rocks/smartdata": "^7.0.15",
"@push.rocks/smartfile": "^13.1.2", "@push.rocks/smartfile": "^13.1.2",
@@ -65,5 +68,12 @@
"repository": { "repository": {
"type": "git", "type": "git",
"url": "https://code.foss.global/git.zone/tsview.git" "url": "https://code.foss.global/git.zone/tsview.git"
},
"bugs": {
"url": "https://code.foss.global/git.zone/tsview/issues"
},
"homepage": "https://code.foss.global/git.zone/tsview#readme",
"pnpm": {
"overrides": {}
} }
} }

18
pnpm-lock.yaml generated
View File

@@ -33,8 +33,8 @@ importers:
specifier: ^5.3.3 specifier: ^5.3.3
version: 5.3.3 version: 5.3.3
'@push.rocks/smartbucket': '@push.rocks/smartbucket':
specifier: ^4.3.1 specifier: ^4.4.1
version: 4.3.1 version: 4.4.1
'@push.rocks/smartcli': '@push.rocks/smartcli':
specifier: ^4.0.20 specifier: ^4.0.20
version: 4.0.20 version: 4.0.20
@@ -66,6 +66,9 @@ importers:
specifier: ^7.0.0 specifier: ^7.0.0
version: 7.0.0(socks@2.8.7) version: 7.0.0(socks@2.8.7)
devDependencies: devDependencies:
'@api.global/typedsocket':
specifier: ^4.1.0
version: 4.1.0(@push.rocks/smartserve@2.0.1)
'@git.zone/tsbuild': '@git.zone/tsbuild':
specifier: ^4.1.2 specifier: ^4.1.2
version: 4.1.2 version: 4.1.2
@@ -81,6 +84,9 @@ importers:
'@git.zone/tswatch': '@git.zone/tswatch':
specifier: 3.0.1 specifier: 3.0.1
version: 3.0.1(@tiptap/pm@2.27.2) version: 3.0.1(@tiptap/pm@2.27.2)
'@push.rocks/smartrx':
specifier: ^3.0.10
version: 3.0.10
'@types/node': '@types/node':
specifier: ^25.0.10 specifier: ^25.0.10
version: 25.0.10 version: 25.0.10
@@ -826,8 +832,8 @@ packages:
'@push.rocks/smartbucket@3.3.10': '@push.rocks/smartbucket@3.3.10':
resolution: {integrity: sha512-0H2MioALspC8Aj0Q1FPCs2w4k2u9oJg7Q5yM8+1TZo7aRfrdxgM5HQ7z3apUaqC3ZEDewW6vSlttjHFHhMEC3A==} resolution: {integrity: sha512-0H2MioALspC8Aj0Q1FPCs2w4k2u9oJg7Q5yM8+1TZo7aRfrdxgM5HQ7z3apUaqC3ZEDewW6vSlttjHFHhMEC3A==}
'@push.rocks/smartbucket@4.3.1': '@push.rocks/smartbucket@4.4.1':
resolution: {integrity: sha512-fMA8w98/E+usaaLkLm6wDj1XSpR0shTtG8AxTdwWIlH1YemQj/aCf4wReezDxUFVoUpC3HMzzV2RTFtQvHndeQ==} resolution: {integrity: sha512-68GFLgJKW+LXvuN+yuV8O/FozGMecraoT+PkI5whdRPFe7N3u2iYIHWAUjvQvVU4ygpdJv0kih2JDf5k3PYycw==}
'@push.rocks/smartbuffer@3.0.5': '@push.rocks/smartbuffer@3.0.5':
resolution: {integrity: sha512-pWYF08Mn8s/KF/9nHRk7pZPzuMjmYVQay2c5gGexdayxn1W4eCSYYhWH73vR2JBfGeGq/izbRNuUuEaIEeTIKA==} resolution: {integrity: sha512-pWYF08Mn8s/KF/9nHRk7pZPzuMjmYVQay2c5gGexdayxn1W4eCSYYhWH73vR2JBfGeGq/izbRNuUuEaIEeTIKA==}
@@ -5527,7 +5533,7 @@ snapshots:
transitivePeerDependencies: transitivePeerDependencies:
- aws-crt - aws-crt
'@push.rocks/smartbucket@4.3.1': '@push.rocks/smartbucket@4.4.1':
dependencies: dependencies:
'@aws-sdk/client-s3': 3.975.0 '@aws-sdk/client-s3': 3.975.0
'@push.rocks/smartmime': 2.0.4 '@push.rocks/smartmime': 2.0.4
@@ -5983,7 +5989,7 @@ snapshots:
'@push.rocks/smarts3@3.0.3': '@push.rocks/smarts3@3.0.3':
dependencies: dependencies:
'@push.rocks/smartbucket': 4.3.1 '@push.rocks/smartbucket': 4.4.1
'@push.rocks/smartfs': 1.3.1 '@push.rocks/smartfs': 1.3.1
'@push.rocks/smartpath': 6.0.0 '@push.rocks/smartpath': 6.0.0
'@push.rocks/smartxml': 2.0.0 '@push.rocks/smartxml': 2.0.0

View File

@@ -1,37 +1,44 @@
# tsview - Project Hints # tsview - Project Hints
## Overview ## Overview
tsview is a CLI tool for viewing S3 and MongoDB data through a web UI. tsview is a CLI tool for viewing S3 and MongoDB data through a web UI.
## Key Patterns ## Key Patterns
### Configuration ### Configuration
- Reads from `.nogit/env.json` (created by `gitzone service`) - Reads from `.nogit/env.json` (created by `gitzone service`)
- Environment variables: MONGODB_URL, S3_HOST, S3_ACCESSKEY, etc. - Environment variables: MONGODB_URL, S3_HOST, S3_ACCESSKEY, etc.
### CLI Commands ### CLI Commands
- `tsview` - Start viewer (auto-finds free port from 3010+) - `tsview` - Start viewer (auto-finds free port from 3010+)
- `tsview --port 3000` - Force specific port - `tsview --port 3000` - Force specific port
- `tsview s3` - S3 viewer only - `tsview s3` - S3 viewer only
- `tsview mongo` - MongoDB viewer only - `tsview mongo` - MongoDB viewer only
### Dependencies ### Dependencies
- Uses `@push.rocks/smartbucket` for S3 operations - Uses `@push.rocks/smartbucket` for S3 operations
- Uses `@push.rocks/smartdata` for MongoDB operations - Uses `@push.rocks/smartdata` for MongoDB operations
- Uses `@api.global/typedserver` + `@api.global/typedrequest` for API - Uses `@api.global/typedserver` + `@api.global/typedrequest` for API
- Uses `@design.estate/dees-catalog` for UI components - Uses `@design.estate/dees-catalog` for UI components
### Build Process ### Build Process
- Run `pnpm build` to compile TypeScript and bundle web UI - Run `pnpm build` to compile TypeScript and bundle web UI
- UI is bundled from `ts_web/` to `ts/bundled_ui.ts` as base64 - UI is bundled from `ts_web/` to `ts/bundled_ui.ts` as base64
### Web UI Structure ### Web UI Structure
- `ts_web/elements/` - Web components (LitElement-based) - `ts_web/elements/` - Web components (LitElement-based)
- `ts_web/services/` - API service for backend communication - `ts_web/services/` - API service for backend communication
- `ts_web/utilities/` - Shared formatting functions (formatSize, formatCount, getFileName) - `ts_web/utilities/` - Shared formatting functions (formatSize, formatCount, getFileName)
- `ts_web/styles/` - Shared CSS custom properties (themeStyles) - `ts_web/styles/` - Shared CSS custom properties (themeStyles)
### TypedRequest Pattern ### TypedRequest Pattern
```typescript ```typescript
// Interface definition // Interface definition
export interface IReq_ListBuckets extends plugins.typedrequest.implementsTR< export interface IReq_ListBuckets extends plugins.typedrequest.implementsTR<
@@ -53,3 +60,53 @@ typedrouter.addTypedHandler(
) )
); );
``` ```
## Real-Time Streaming (v1.5.0+)
### Architecture
- `ts/streaming/` - Backend streaming infrastructure
- `classes.changestream-manager.ts` - Manages MongoDB and S3 watchers
- `interfaces.streaming.ts` - TypedRequest interfaces for subscriptions
- `ts_web/services/changestream.service.ts` - Frontend WebSocket client
### MongoDB Change Streams
- Uses native MongoDB Change Streams via `SmartdataDb.mongoDbClient`
- Subscription per collection: `db/collection`
- Events: insert, update, delete, replace, drop
### S3 Change Detection
- Uses `@push.rocks/smartbucket` BucketWatcher (polling-based)
- Polling interval: 5 seconds
- Events: add, modify, delete
### Frontend Components
- `tsview-activity-stream.ts` - Combined activity view with filtering
- MongoDB/S3 browsers auto-subscribe to current resource
- Visual indicators for "Live" status and recent change count
### Key Streaming Interfaces
```typescript
// Subscribe to collection changes
IReq_SubscribeMongo: { database, collection } -> { success, subscriptionId }
// Subscribe to bucket changes
IReq_SubscribeS3: { bucket, prefix? } -> { success, subscriptionId }
// Subscribe to activity stream (all changes)
IReq_SubscribeActivity: {} -> { success, subscriptionId }
// Server pushes changes to subscribed clients
IReq_PushMongoChange: { event: IMongoChangeEvent } -> { received }
IReq_PushS3Change: { event: IS3ChangeEvent } -> { received }
IReq_PushActivityEvent: { event: IActivityEvent } -> { received }
```
### Dependencies Added
- `@api.global/typedsocket` - WebSocket client/server
- `@push.rocks/smartrx` - RxJS utilities

200
readme.md
View File

@@ -1,6 +1,6 @@
# @git.zone/tsview # @git.zone/tsview
A powerful developer tool for browsing and managing S3-compatible storage and MongoDB databases through a sleek web UI. Built with TypeScript, designed for developers who need quick, visual access to their data stores during development. 🚀 A powerful developer tool for browsing and managing S3-compatible storage and MongoDB databases through a sleek web UI — with real-time change streaming baked in. Built with TypeScript, designed for developers who need quick, visual access to their data stores. 🚀
## Issue Reporting and Security ## Issue Reporting and Security
@@ -10,41 +10,51 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
```bash ```bash
# Global installation (recommended for CLI usage) # Global installation (recommended for CLI usage)
npm install -g @git.zone/tsview
# or
pnpm add -g @git.zone/tsview pnpm add -g @git.zone/tsview
# Local installation (for programmatic usage) # Local installation (for programmatic usage)
npm install @git.zone/tsview
# or
pnpm add @git.zone/tsview pnpm add @git.zone/tsview
``` ```
## Features ✨ ## Features ✨
### 🗄️ S3 Storage Browser ### 🗄️ S3 Storage Browser
- **Column View Navigation** - Mac Finder-style interface with resizable columns for intuitive file browsing
- **List View** - Traditional key-based view with hierarchical navigation - **Column View Navigation** — Mac Finder-style interface with resizable columns
- **Real-time Preview** - View images, JSON, text files, and more directly in the browser - **List View** — Traditional key-based view with hierarchical navigation
- **Bucket Management** - Create, delete, and switch between buckets - **Real-time Preview** — View images, JSON, text files, code, and more directly in the browser
- **File Operations** - Upload, download, delete objects with ease - **Bucket Management** — Create, delete, and switch between buckets
- **Smart Content Type Detection** - Automatic content type recognition for 20+ file types - **File Operations** — Upload, download, delete objects
- **Breadcrumb Navigation** - Easy path traversal with clickable breadcrumbs - **In-place Text Editing** Edit text files directly in the browser with change tracking
- **Smart Content Type Detection** — Automatic recognition for 20+ file types
- **Breadcrumb Navigation** — Clickable path traversal
### 🍃 MongoDB Browser ### 🍃 MongoDB Browser
- **Database Explorer** - Hierarchical navigation through databases and collections
- **Document Viewer** - Paginated table view with sorting and filtering - **Database Explorer** — Hierarchical navigation through databases and collections
- **Document Editor** - Full CRUD operations with JSON syntax highlighting - **Database Overview** — Collection counts, data sizes, index stats at a glance
- **Index Management** - View, create, and drop indexes - **Document Viewer** — Paginated table view with JSON filter support
- **Aggregation Pipeline** - Run custom aggregation queries (coming soon) - **Document Editor** — Full CRUD with syntax-highlighted code editor and change tracking
- **Collection Stats** - View document counts, sizes, and storage metrics - **Index Management** View, create, and drop indexes
- **Server Status** - Monitor connection info and server health - **Collection Stats** — Document counts, sizes, storage metrics
- **Server Status** — Connection info, version, uptime
- **Show/Hide System Databases** — Toggle visibility of `admin`, `local`, `config`
### ⚡ Real-Time Change Streaming
- **MongoDB Change Streams** — Live updates via native MongoDB change streams
- **S3 Change Detection** — Polling-based bucket monitoring with ETag comparison (5s intervals)
- **Activity Stream** — Combined timeline of all changes from both sources, filterable by type
- **Live Indicators** — Green dot + change count badges on active views
- **WebSocket Subscriptions** — Per-collection, per-bucket, or global activity feed
- **Auto-Reconnect** — Subscriptions automatically restored after connection loss
### 🎨 Modern Web UI ### 🎨 Modern Web UI
- 🌙 Dark theme designed for developer comfort - 🌙 Dark theme designed for developer comfort
- 📱 Responsive layout with resizable panels - 📱 Responsive layout with resizable panels
- ⌨️ Keyboard-friendly navigation - ⌨️ Context menus for quick actions
- 🔌 Zero external runtime dependencies in the browser - 🔌 Everything bundled — zero external runtime dependencies in the browser
## Quick Start 🚀 ## Quick Start 🚀
@@ -90,28 +100,6 @@ tsview mongo
tsview mongodb tsview mongodb
``` ```
## Configuration via npmextra.json
For project-level configuration, add a `@git.zone/tsview` section to your `npmextra.json`:
```json
{
"@git.zone/tsview": {
"port": 3015,
"killIfBusy": true,
"openBrowser": false
}
}
```
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `port` | `number` | auto | Fixed port to use (auto-finds from 3010 if not set) |
| `killIfBusy` | `boolean` | `false` | Kill existing process if port is busy |
| `openBrowser` | `boolean` | `true` | Automatically open browser on start |
**Priority order:** CLI `--port` flag > `npmextra.json` config > auto-detect
## Programmatic API ## Programmatic API
Use tsview as a library in your own tools: Use tsview as a library in your own tools:
@@ -124,32 +112,32 @@ const viewer = new TsView();
// Option 1: Load from .nogit/env.json (gitzone service format) // Option 1: Load from .nogit/env.json (gitzone service format)
await viewer.loadConfigFromEnv(); await viewer.loadConfigFromEnv();
// Option 2: Configure programmatically for local development // Option 2: Configure programmatically
viewer.setS3Config({ viewer.setS3Config({
endpoint: 'localhost', endpoint: 'localhost',
port: 9000, port: 9000,
accessKey: 'minioadmin', accessKey: 'minioadmin',
accessSecret: 'minioadmin', accessSecret: 'minioadmin',
useSsl: false useSsl: false,
}); });
viewer.setMongoConfig({ viewer.setMongoConfig({
mongoDbUrl: 'mongodb://localhost:27017', mongoDbUrl: 'mongodb://localhost:27017',
mongoDbName: 'mydb' mongoDbName: 'mydb',
}); });
// Option 3: Configure for cloud services // Option 3: Cloud services
viewer.setS3Config({ viewer.setS3Config({
endpoint: 's3.amazonaws.com', endpoint: 's3.amazonaws.com',
accessKey: 'AKIAXXXXXXX', accessKey: 'AKIAXXXXXXX',
accessSecret: 'your-secret-key', accessSecret: 'your-secret-key',
useSsl: true, useSsl: true,
region: 'us-east-1' region: 'us-east-1',
}); });
viewer.setMongoConfig({ viewer.setMongoConfig({
mongoDbUrl: 'mongodb+srv://user:pass@cluster.mongodb.net', mongoDbUrl: 'mongodb+srv://user:pass@cluster.mongodb.net',
mongoDbName: 'production' mongoDbName: 'production',
}); });
// Start the server // Start the server
@@ -163,30 +151,53 @@ await viewer.start(3500);
await viewer.stop(); await viewer.stop();
``` ```
## Environment Variables ## Configuration
The following environment variables are supported in `.nogit/env.json`: ### Project-level via `npmextra.json`
```json
{
"@git.zone/tsview": {
"port": 3015,
"killIfBusy": true,
"openBrowser": false
}
}
```
| Option | Type | Default | Description |
| ------------- | --------- | ------- | -------------------------------------------- |
| `port` | `number` | auto | Fixed port (auto-finds from 3010 if not set) |
| `killIfBusy` | `boolean` | `false` | Kill existing process if port is busy |
| `openBrowser` | `boolean` | `true` | Automatically open browser on start |
**Port priority:** CLI `--port` flag → `npmextra.json` → auto-detect
### Environment Variables (`.nogit/env.json`)
#### S3
### S3 Configuration
| Variable | Description | | Variable | Description |
|----------|-------------| | -------------- | ----------------------------- |
| `S3_ENDPOINT` | S3 server hostname | | `S3_ENDPOINT` | S3-compatible server hostname |
| `S3_PORT` | S3 server port (optional) | | `S3_PORT` | Server port (optional) |
| `S3_ACCESSKEY` | Access key ID | | `S3_ACCESSKEY` | Access key ID |
| `S3_SECRETKEY` | Secret access key | | `S3_SECRETKEY` | Secret access key |
| `S3_USESSL` | Use HTTPS (`true`/`false`) | | `S3_USESSL` | Use HTTPS (`true`/`false`) |
### MongoDB Configuration #### MongoDB
| Variable | Description | | Variable | Description |
|----------|-------------| | -------------- | ---------------------------------- |
| `MONGODB_URL` | Full MongoDB connection string | | `MONGODB_URL` | Full connection string (preferred) |
| `MONGODB_NAME` | Default database name | | `MONGODB_NAME` | Default database name |
Or use individual MongoDB variables: Or use individual variables:
| Variable | Description | | Variable | Description |
|----------|-------------| | -------------- | ------------- |
| `MONGODB_HOST` | MongoDB hostname | | `MONGODB_HOST` | Hostname |
| `MONGODB_PORT` | MongoDB port | | `MONGODB_PORT` | Port |
| `MONGODB_USER` | Username | | `MONGODB_USER` | Username |
| `MONGODB_PASS` | Password | | `MONGODB_PASS` | Password |
| `MONGODB_NAME` | Database name | | `MONGODB_NAME` | Database name |
@@ -196,8 +207,8 @@ Or use individual MongoDB variables:
tsview works with any S3-compatible storage: tsview works with any S3-compatible storage:
| Provider | Status | | Provider | Status |
|----------|--------| | ----------------------- | --------------------------- |
| **MinIO** | ✅ Perfect for local development | | **MinIO** | ✅ Perfect for local dev |
| **AWS S3** | ✅ Amazon's object storage | | **AWS S3** | ✅ Amazon's object storage |
| **DigitalOcean Spaces** | ✅ Simple object storage | | **DigitalOcean Spaces** | ✅ Simple object storage |
| **Backblaze B2** | ✅ S3-compatible API | | **Backblaze B2** | ✅ S3-compatible API |
@@ -208,17 +219,52 @@ tsview works with any S3-compatible storage:
## Supported File Types for Preview ## Supported File Types for Preview
| Category | Extensions | | Category | Extensions |
|----------|------------| | ------------- | ------------------------------------------------------ |
| **Images** | `.png`, `.jpg`, `.jpeg`, `.gif`, `.webp`, `.svg` | | **Images** | `.png`, `.jpg`, `.jpeg`, `.gif`, `.webp`, `.svg` |
| **Text** | `.txt`, `.md`, `.log`, `.sh`, `.env` | | **Text** | `.txt`, `.md`, `.log`, `.sh`, `.env` |
| **Code** | `.json`, `.js`, `.ts`, `.tsx`, `.jsx`, `.html`, `.css` | | **Code** | `.json`, `.js`, `.ts`, `.tsx`, `.jsx`, `.html`, `.css` |
| **Data** | `.csv`, `.xml`, `.yaml`, `.yml` | | **Data** | `.csv`, `.xml`, `.yaml`, `.yml` |
| **Documents** | `.pdf` | | **Documents** | `.pdf` |
## Architecture
```
tsview/
├── ts/ # Backend
│ ├── api/ # TypedRequest API handlers
│ │ ├── handlers.s3.ts # S3 bucket & object operations
│ │ └── handlers.mongodb.ts # MongoDB CRUD & admin operations
│ ├── config/ # Configuration management
│ ├── server/ # Web server (TypedServer + TypedSocket)
│ ├── streaming/ # Real-time change streaming
│ │ ├── classes.changestream-manager.ts # MongoDB + S3 watchers
│ │ └── interfaces.streaming.ts # Subscription interfaces
│ ├── interfaces/ # Shared TypeScript interfaces
│ └── tsview.classes.tsview.ts # Main class
├── ts_web/ # Frontend
│ ├── elements/ # Web components (LitElement)
│ │ ├── tsview-app.ts # App shell + navigation
│ │ ├── tsview-s3-*.ts # S3 browser components
│ │ ├── tsview-mongo-*.ts # MongoDB browser components
│ │ └── tsview-activity-stream.ts # Real-time activity feed
│ ├── services/ # API + WebSocket clients
│ ├── styles/ # Dark theme
│ └── utilities/ # Formatting helpers
└── cli.ts.js # CLI entry point
```
### How It Works
1. **Backend** — A `TypedServer` serves the bundled web UI and exposes a typed API via `TypedRequest` over HTTP. A `TypedSocket` WebSocket layer handles real-time streaming subscriptions.
2. **Frontend** — LitElement-based web components communicate with the backend via `TypedRequest`. The `ChangeStreamService` connects over WebSocket and distributes real-time events to active views via RxJS Subjects.
3. **Streaming** — The `ChangeStreamManager` creates MongoDB Change Streams and S3 BucketWatchers on demand (one per subscribed resource). Changes are pushed to subscribed clients and accumulated in a 1000-event ring buffer for the Activity Stream view.
## Development ## Development
```bash ```bash
# Clone the repository # Clone
git clone https://code.foss.global/git.zone/tsview.git git clone https://code.foss.global/git.zone/tsview.git
cd tsview cd tsview
@@ -235,29 +281,9 @@ pnpm run watch
pnpm test pnpm test
``` ```
## Architecture
```
tsview/
├── ts/ # Backend TypeScript source
│ ├── api/ # TypedRequest API handlers
│ │ ├── handlers.s3.ts
│ │ └── handlers.mongodb.ts
│ ├── config/ # Configuration management
│ ├── server/ # Web server (TypedServer)
│ ├── interfaces/ # Shared TypeScript interfaces
│ └── tsview.classes.tsview.ts # Main class
├── ts_web/ # Frontend TypeScript source
│ ├── elements/ # Web components (LitElement)
│ ├── services/ # API client service
│ ├── styles/ # Shared theme styles
│ └── utilities/ # Helper functions
└── cli.ts.js # CLI entry point
```
## License and Legal Information ## License and Legal Information
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [license](./license) file. This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file.
**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file. **Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.

View File

@@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@git.zone/tsview', name: '@git.zone/tsview',
version: '1.4.0', version: '1.6.1',
description: 'A CLI tool for viewing S3 and MongoDB data with a web UI' description: 'A CLI tool for viewing S3 and MongoDB data with a web UI'
} }

File diff suppressed because one or more lines are too long

View File

@@ -535,3 +535,8 @@ export interface IReq_GetDatabaseStats extends plugins.typedrequestInterfaces.im
stats: IDatabaseStats | null; stats: IDatabaseStats | null;
}; };
} }
// ===========================================
// Re-export streaming interfaces
// ===========================================
export * from '../streaming/interfaces.streaming.js';

View File

@@ -17,6 +17,7 @@ import * as smartnetwork from '@push.rocks/smartnetwork';
import * as smartopen from '@push.rocks/smartopen'; import * as smartopen from '@push.rocks/smartopen';
import * as smartpath from '@push.rocks/smartpath'; import * as smartpath from '@push.rocks/smartpath';
import * as smartpromise from '@push.rocks/smartpromise'; import * as smartpromise from '@push.rocks/smartpromise';
import * as smartrx from '@push.rocks/smartrx';
export { export {
early, early,
@@ -31,6 +32,7 @@ export {
smartopen, smartopen,
smartpath, smartpath,
smartpromise, smartpromise,
smartrx,
}; };
// AWS S3 SDK for direct S3 operations // AWS S3 SDK for direct S3 operations

View File

@@ -1,7 +1,9 @@
import * as plugins from '../plugins.js'; import * as plugins from '../plugins.js';
import type { TsView } from '../tsview.classes.tsview.js'; import type { TsView } from '../tsview.classes.tsview.js';
import type * as interfaces from '../interfaces/index.js';
import { registerS3Handlers } from '../api/handlers.s3.js'; import { registerS3Handlers } from '../api/handlers.s3.js';
import { registerMongoHandlers } from '../api/handlers.mongodb.js'; import { registerMongoHandlers } from '../api/handlers.mongodb.js';
import { ChangeStreamManager } from '../streaming/index.js';
import { files as bundledUiFiles } from '../bundled_ui.js'; import { files as bundledUiFiles } from '../bundled_ui.js';
/** /**
@@ -11,6 +13,7 @@ export class ViewServer {
private tsview: TsView; private tsview: TsView;
private port: number; private port: number;
private typedServer: plugins.typedserver.TypedServer | null = null; private typedServer: plugins.typedserver.TypedServer | null = null;
private changeStreamManager: ChangeStreamManager | null = null;
public typedrouter: plugins.typedrequest.TypedRouter; public typedrouter: plugins.typedrequest.TypedRouter;
constructor(tsview: TsView, port: number) { constructor(tsview: TsView, port: number) {
@@ -41,14 +44,179 @@ export class ViewServer {
await registerMongoHandlers(this.typedServer.typedrouter, this.tsview); await registerMongoHandlers(this.typedServer.typedrouter, this.tsview);
} }
// Initialize ChangeStreamManager for real-time updates
this.changeStreamManager = new ChangeStreamManager(this.tsview);
// Register streaming handlers
await this.registerStreamingHandlers();
// Start server // Start server
await this.typedServer.start(); await this.typedServer.start();
// Set TypedSocket reference after server starts
if (this.typedServer.typedsocket) {
this.changeStreamManager.setTypedSocket(this.typedServer.typedsocket);
}
}
/**
* Register WebSocket streaming handlers
*/
private async registerStreamingHandlers(): Promise<void> {
if (!this.typedServer || !this.changeStreamManager) return;
const typedrouter = this.typedServer.typedrouter;
// Subscribe to MongoDB collection changes
typedrouter.addTypedHandler(
new plugins.typedrequest.TypedHandler<interfaces.IReq_SubscribeMongo>(
'subscribeMongo',
async (reqData, context) => {
const connectionId = this.getConnectionId(context);
if (!connectionId) {
return { success: false, subscriptionId: '' };
}
const result = await this.changeStreamManager!.subscribeToMongo(
connectionId,
reqData.database,
reqData.collection
);
return result;
}
)
);
// Unsubscribe from MongoDB collection changes
typedrouter.addTypedHandler(
new plugins.typedrequest.TypedHandler<interfaces.IReq_UnsubscribeMongo>(
'unsubscribeMongo',
async (reqData, context) => {
const connectionId = this.getConnectionId(context);
if (!connectionId) {
return { success: false };
}
const success = await this.changeStreamManager!.unsubscribeFromMongo(
connectionId,
reqData.database,
reqData.collection
);
return { success };
}
)
);
// Subscribe to S3 bucket changes
typedrouter.addTypedHandler(
new plugins.typedrequest.TypedHandler<interfaces.IReq_SubscribeS3>(
'subscribeS3',
async (reqData, context) => {
const connectionId = this.getConnectionId(context);
if (!connectionId) {
return { success: false, subscriptionId: '' };
}
const result = await this.changeStreamManager!.subscribeToS3(
connectionId,
reqData.bucket,
reqData.prefix
);
return result;
}
)
);
// Unsubscribe from S3 bucket changes
typedrouter.addTypedHandler(
new plugins.typedrequest.TypedHandler<interfaces.IReq_UnsubscribeS3>(
'unsubscribeS3',
async (reqData, context) => {
const connectionId = this.getConnectionId(context);
if (!connectionId) {
return { success: false };
}
const success = await this.changeStreamManager!.unsubscribeFromS3(
connectionId,
reqData.bucket,
reqData.prefix
);
return { success };
}
)
);
// Subscribe to activity stream
typedrouter.addTypedHandler(
new plugins.typedrequest.TypedHandler<interfaces.IReq_SubscribeActivity>(
'subscribeActivity',
async (reqData, context) => {
const connectionId = this.getConnectionId(context);
if (!connectionId) {
return { success: false, subscriptionId: '' };
}
const result = await this.changeStreamManager!.subscribeToActivity(connectionId);
return result;
}
)
);
// Unsubscribe from activity stream
typedrouter.addTypedHandler(
new plugins.typedrequest.TypedHandler<interfaces.IReq_UnsubscribeActivity>(
'unsubscribeActivity',
async (reqData, context) => {
const connectionId = this.getConnectionId(context);
if (!connectionId) {
return { success: false };
}
const success = await this.changeStreamManager!.unsubscribeFromActivity(connectionId);
return { success };
}
)
);
// Get recent activity events
typedrouter.addTypedHandler(
new plugins.typedrequest.TypedHandler<interfaces.IReq_GetRecentActivity>(
'getRecentActivity',
async (reqData) => {
const events = this.changeStreamManager!.getRecentActivity(reqData.limit || 100);
return { events };
}
)
);
}
/**
* Extract connection ID from request context
*/
private getConnectionId(context: any): string | null {
// Try to get connection ID from WebSocket context
if (context?.socketConnection?.socketId) {
return context.socketConnection.socketId;
}
if (context?.socketConnection?.alias) {
return context.socketConnection.alias;
}
// Fallback: generate a unique ID for HTTP requests
// Note: Real-time streaming requires WebSocket connection
return null;
} }
/** /**
* Stop the server * Stop the server
*/ */
public async stop(): Promise<void> { public async stop(): Promise<void> {
// Stop change stream manager first
if (this.changeStreamManager) {
await this.changeStreamManager.stop();
this.changeStreamManager = null;
}
if (this.typedServer) { if (this.typedServer) {
await this.typedServer.stop(); await this.typedServer.stop();
this.typedServer = null; this.typedServer = null;

View File

@@ -0,0 +1,591 @@
import * as plugins from '../plugins.js';
import type { TsView } from '../tsview.classes.tsview.js';
import type * as interfaces from './interfaces.streaming.js';
import type { IS3ChangeEvent } from '@push.rocks/smartbucket';
/**
* Subscription entry tracking a client's subscription to a resource
*/
interface ISubscriptionEntry {
connectionId: string;
subscriptionId: string;
createdAt: Date;
}
/**
* MongoDB watcher entry
*/
interface IMongoWatcherEntry {
watcher: plugins.mongodb.ChangeStream;
subscriptions: Map<string, ISubscriptionEntry>; // connectionId -> subscription
}
/**
* S3 watcher entry
*/
interface IS3WatcherEntry {
watcher: plugins.smartbucket.BucketWatcher;
subscriptions: Map<string, ISubscriptionEntry>; // connectionId -> subscription
}
/**
* ChangeStreamManager manages real-time change streaming for both MongoDB and S3.
*
* Features:
* - MongoDB Change Streams for real-time database updates
* - S3 BucketWatcher for polling-based S3 change detection
* - Subscription management per WebSocket client
* - Activity stream with ring buffer for recent events
* - Automatic cleanup on client disconnect
*/
export class ChangeStreamManager {
private tsview: TsView;
private typedSocket: plugins.typedserver.TypedServer['typedsocket'] | null = null;
// MongoDB watchers: "db/collection" -> watcher entry
private mongoWatchers: Map<string, IMongoWatcherEntry> = new Map();
// S3 watchers: "bucket/prefix" -> watcher entry
private s3Watchers: Map<string, IS3WatcherEntry> = new Map();
// Activity subscribers: connectionId -> subscription entry
private activitySubscribers: Map<string, ISubscriptionEntry> = new Map();
// Activity ring buffer (max 1000 events)
private activityBuffer: interfaces.IActivityEvent[] = [];
private readonly ACTIVITY_BUFFER_SIZE = 1000;
// Counter for generating unique subscription IDs
private subscriptionCounter = 0;
constructor(tsview: TsView) {
this.tsview = tsview;
}
/**
* Initialize the manager with a TypedSocket instance
*/
public setTypedSocket(typedSocket: plugins.typedserver.TypedServer['typedsocket']): void {
this.typedSocket = typedSocket;
}
/**
* Generate a unique subscription ID
*/
private generateSubscriptionId(): string {
return `sub_${Date.now()}_${++this.subscriptionCounter}`;
}
/**
* Get the MongoDB key for a database/collection pair
*/
private getMongoKey(database: string, collection: string): string {
return `${database}/${collection}`;
}
/**
* Get the S3 key for a bucket/prefix pair
*/
private getS3Key(bucket: string, prefix?: string): string {
return prefix ? `${bucket}/${prefix}` : bucket;
}
// ===========================================
// MongoDB Change Streams
// ===========================================
/**
* Subscribe a client to MongoDB collection changes
*/
public async subscribeToMongo(
connectionId: string,
database: string,
collection: string
): Promise<{ success: boolean; subscriptionId: string }> {
const key = this.getMongoKey(database, collection);
let entry = this.mongoWatchers.get(key);
// Create watcher if it doesn't exist
if (!entry) {
const watcher = await this.createMongoWatcher(database, collection);
if (!watcher) {
return { success: false, subscriptionId: '' };
}
entry = {
watcher,
subscriptions: new Map(),
};
this.mongoWatchers.set(key, entry);
}
// Add subscription
const subscriptionId = this.generateSubscriptionId();
entry.subscriptions.set(connectionId, {
connectionId,
subscriptionId,
createdAt: new Date(),
});
console.log(`[ChangeStream] MongoDB subscription added: ${key} for connection ${connectionId}`);
return { success: true, subscriptionId };
}
/**
* Unsubscribe a client from MongoDB collection changes
*/
public async unsubscribeFromMongo(
connectionId: string,
database: string,
collection: string
): Promise<boolean> {
const key = this.getMongoKey(database, collection);
const entry = this.mongoWatchers.get(key);
if (!entry) {
return false;
}
entry.subscriptions.delete(connectionId);
console.log(`[ChangeStream] MongoDB subscription removed: ${key} for connection ${connectionId}`);
// Close watcher if no more subscribers
if (entry.subscriptions.size === 0) {
await this.closeMongoWatcher(key);
}
return true;
}
/**
* Create a MongoDB change stream for a collection
*/
private async createMongoWatcher(
database: string,
collection: string
): Promise<plugins.mongodb.ChangeStream | null> {
try {
const db = await this.tsview.getMongoDb();
if (!db) {
console.error('[ChangeStream] MongoDB not configured');
return null;
}
const client = (db as any).mongoDbClient;
const mongoDb = client.db(database);
const mongoCollection = mongoDb.collection(collection);
// Create change stream
const changeStream = mongoCollection.watch([], {
fullDocument: 'updateLookup',
});
// Handle change events
changeStream.on('change', (change: any) => {
this.handleMongoChange(database, collection, change);
});
changeStream.on('error', (error: Error) => {
console.error(`[ChangeStream] MongoDB error for ${database}/${collection}:`, error);
});
console.log(`[ChangeStream] MongoDB watcher created for ${database}/${collection}`);
return changeStream;
} catch (error) {
console.error(`[ChangeStream] Failed to create MongoDB watcher for ${database}/${collection}:`, error);
return null;
}
}
/**
* Handle a MongoDB change event
*/
private handleMongoChange(database: string, collection: string, change: any): void {
const key = this.getMongoKey(database, collection);
const entry = this.mongoWatchers.get(key);
if (!entry) return;
// Convert MongoDB change event to our interface
const event: interfaces.IMongoChangeEvent = {
type: change.operationType as interfaces.IMongoChangeEvent['type'],
database,
collection,
documentId: change.documentKey?._id?.toString(),
document: change.fullDocument,
updateDescription: change.updateDescription,
timestamp: new Date().toISOString(),
};
// Add to activity buffer
this.addToActivityBuffer('mongodb', event);
// Push to all subscribed clients
this.pushMongoChangeToClients(key, event);
}
/**
* Push MongoDB change to subscribed clients
*/
private async pushMongoChangeToClients(
key: string,
event: interfaces.IMongoChangeEvent
): Promise<void> {
const entry = this.mongoWatchers.get(key);
if (!entry || !this.typedSocket) return;
for (const [connectionId, _sub] of entry.subscriptions) {
try {
// Find the connection and push the event
const connection = await this.typedSocket.findTargetConnection(async (conn: any) => {
return conn.alias === connectionId || conn.socketId === connectionId;
});
if (connection) {
const request = this.typedSocket.createTypedRequest<interfaces.IReq_PushMongoChange>(
'pushMongoChange',
connection
);
await request.fire({ event });
}
} catch (error) {
console.error(`[ChangeStream] Failed to push MongoDB change to ${connectionId}:`, error);
}
}
}
/**
* Close a MongoDB change stream
*/
private async closeMongoWatcher(key: string): Promise<void> {
const entry = this.mongoWatchers.get(key);
if (!entry) return;
try {
await entry.watcher.close();
this.mongoWatchers.delete(key);
console.log(`[ChangeStream] MongoDB watcher closed for ${key}`);
} catch (error) {
console.error(`[ChangeStream] Error closing MongoDB watcher for ${key}:`, error);
}
}
// ===========================================
// S3 Change Watching
// ===========================================
/**
* Subscribe a client to S3 bucket/prefix changes
*/
public async subscribeToS3(
connectionId: string,
bucket: string,
prefix?: string
): Promise<{ success: boolean; subscriptionId: string }> {
const key = this.getS3Key(bucket, prefix);
let entry = this.s3Watchers.get(key);
// Create watcher if it doesn't exist
if (!entry) {
const watcher = await this.createS3Watcher(bucket, prefix);
if (!watcher) {
return { success: false, subscriptionId: '' };
}
entry = {
watcher,
subscriptions: new Map(),
};
this.s3Watchers.set(key, entry);
}
// Add subscription
const subscriptionId = this.generateSubscriptionId();
entry.subscriptions.set(connectionId, {
connectionId,
subscriptionId,
createdAt: new Date(),
});
console.log(`[ChangeStream] S3 subscription added: ${key} for connection ${connectionId}`);
return { success: true, subscriptionId };
}
/**
* Unsubscribe a client from S3 bucket/prefix changes
*/
public async unsubscribeFromS3(
connectionId: string,
bucket: string,
prefix?: string
): Promise<boolean> {
const key = this.getS3Key(bucket, prefix);
const entry = this.s3Watchers.get(key);
if (!entry) {
return false;
}
entry.subscriptions.delete(connectionId);
console.log(`[ChangeStream] S3 subscription removed: ${key} for connection ${connectionId}`);
// Close watcher if no more subscribers
if (entry.subscriptions.size === 0) {
await this.closeS3Watcher(key);
}
return true;
}
/**
* Create an S3 bucket watcher
*/
private async createS3Watcher(
bucket: string,
prefix?: string
): Promise<plugins.smartbucket.BucketWatcher | null> {
try {
const smartbucket = await this.tsview.getSmartBucket();
if (!smartbucket) {
console.error('[ChangeStream] S3 not configured');
return null;
}
const bucketInstance = await smartbucket.getBucketByName(bucket);
// Create watcher using smartbucket's BucketWatcher
const watcher = bucketInstance.createWatcher({
prefix: prefix || '',
pollIntervalMs: 5000,
bufferTimeMs: 500,
});
// Subscribe to change events
watcher.changeSubject.subscribe((eventOrEvents: IS3ChangeEvent | IS3ChangeEvent[]) => {
const events = Array.isArray(eventOrEvents) ? eventOrEvents : [eventOrEvents];
for (const event of events) {
this.handleS3Change(bucket, prefix, event);
}
});
// Start the watcher
await watcher.start();
await watcher.readyDeferred.promise;
console.log(`[ChangeStream] S3 watcher created for ${bucket}${prefix ? '/' + prefix : ''}`);
return watcher;
} catch (error) {
console.error(`[ChangeStream] Failed to create S3 watcher for ${bucket}:`, error);
return null;
}
}
/**
* Handle an S3 change event
*/
private handleS3Change(bucket: string, prefix: string | undefined, event: IS3ChangeEvent): void {
const key = this.getS3Key(bucket, prefix);
const entry = this.s3Watchers.get(key);
if (!entry) return;
// Add to activity buffer
this.addToActivityBuffer('s3', event);
// Push to all subscribed clients
this.pushS3ChangeToClients(key, event);
}
/**
* Push S3 change to subscribed clients
*/
private async pushS3ChangeToClients(
key: string,
event: IS3ChangeEvent
): Promise<void> {
const entry = this.s3Watchers.get(key);
if (!entry || !this.typedSocket) return;
for (const [connectionId, _sub] of entry.subscriptions) {
try {
const connection = await this.typedSocket.findTargetConnection(async (conn: any) => {
return conn.alias === connectionId || conn.socketId === connectionId;
});
if (connection) {
const request = this.typedSocket.createTypedRequest<interfaces.IReq_PushS3Change>(
'pushS3Change',
connection
);
await request.fire({ event });
}
} catch (error) {
console.error(`[ChangeStream] Failed to push S3 change to ${connectionId}:`, error);
}
}
}
/**
* Close an S3 bucket watcher
*/
private async closeS3Watcher(key: string): Promise<void> {
const entry = this.s3Watchers.get(key);
if (!entry) return;
try {
await entry.watcher.stop();
this.s3Watchers.delete(key);
console.log(`[ChangeStream] S3 watcher closed for ${key}`);
} catch (error) {
console.error(`[ChangeStream] Error closing S3 watcher for ${key}:`, error);
}
}
// ===========================================
// Activity Stream
// ===========================================
/**
* Subscribe a client to the activity stream
*/
public async subscribeToActivity(connectionId: string): Promise<{ success: boolean; subscriptionId: string }> {
const subscriptionId = this.generateSubscriptionId();
this.activitySubscribers.set(connectionId, {
connectionId,
subscriptionId,
createdAt: new Date(),
});
console.log(`[ChangeStream] Activity subscription added for connection ${connectionId}`);
return { success: true, subscriptionId };
}
/**
* Unsubscribe a client from the activity stream
*/
public async unsubscribeFromActivity(connectionId: string): Promise<boolean> {
const result = this.activitySubscribers.delete(connectionId);
if (result) {
console.log(`[ChangeStream] Activity subscription removed for connection ${connectionId}`);
}
return result;
}
/**
* Get recent activity events
*/
public getRecentActivity(limit: number = 100): interfaces.IActivityEvent[] {
const count = Math.min(limit, this.activityBuffer.length);
return this.activityBuffer.slice(-count);
}
/**
* Add an event to the activity buffer
*/
private addToActivityBuffer(
source: 'mongodb' | 's3',
event: interfaces.IMongoChangeEvent | IS3ChangeEvent
): void {
const activityEvent: interfaces.IActivityEvent = {
id: `evt_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`,
source,
event,
timestamp: new Date().toISOString(),
};
this.activityBuffer.push(activityEvent);
// Trim buffer if it exceeds max size
if (this.activityBuffer.length > this.ACTIVITY_BUFFER_SIZE) {
this.activityBuffer = this.activityBuffer.slice(-this.ACTIVITY_BUFFER_SIZE);
}
// Push to activity subscribers
this.pushActivityToClients(activityEvent);
}
/**
* Push activity event to subscribed clients
*/
private async pushActivityToClients(event: interfaces.IActivityEvent): Promise<void> {
if (!this.typedSocket || this.activitySubscribers.size === 0) return;
for (const [connectionId, _sub] of this.activitySubscribers) {
try {
const connection = await this.typedSocket.findTargetConnection(async (conn: any) => {
return conn.alias === connectionId || conn.socketId === connectionId;
});
if (connection) {
const request = this.typedSocket.createTypedRequest<interfaces.IReq_PushActivityEvent>(
'pushActivityEvent',
connection
);
await request.fire({ event });
}
} catch (error) {
console.error(`[ChangeStream] Failed to push activity to ${connectionId}:`, error);
}
}
}
// ===========================================
// Connection Management
// ===========================================
/**
* Handle client disconnect - clean up all subscriptions
*/
public async handleDisconnect(connectionId: string): Promise<void> {
console.log(`[ChangeStream] Cleaning up subscriptions for disconnected connection ${connectionId}`);
// Clean up MongoDB subscriptions
for (const [key, entry] of this.mongoWatchers) {
if (entry.subscriptions.has(connectionId)) {
entry.subscriptions.delete(connectionId);
if (entry.subscriptions.size === 0) {
await this.closeMongoWatcher(key);
}
}
}
// Clean up S3 subscriptions
for (const [key, entry] of this.s3Watchers) {
if (entry.subscriptions.has(connectionId)) {
entry.subscriptions.delete(connectionId);
if (entry.subscriptions.size === 0) {
await this.closeS3Watcher(key);
}
}
}
// Clean up activity subscription
this.activitySubscribers.delete(connectionId);
}
/**
* Stop all watchers and clean up
*/
public async stop(): Promise<void> {
console.log('[ChangeStream] Stopping all watchers...');
// Close all MongoDB watchers
for (const key of this.mongoWatchers.keys()) {
await this.closeMongoWatcher(key);
}
// Close all S3 watchers
for (const key of this.s3Watchers.keys()) {
await this.closeS3Watcher(key);
}
// Clear activity buffer and subscribers
this.activityBuffer = [];
this.activitySubscribers.clear();
console.log('[ChangeStream] All watchers stopped');
}
}

2
ts/streaming/index.ts Normal file
View File

@@ -0,0 +1,2 @@
export * from './interfaces.streaming.js';
export * from './classes.changestream-manager.js';

View File

@@ -0,0 +1,212 @@
import type * as plugins from '../plugins.js';
// Re-export S3 change event from smartbucket
export type { IS3ChangeEvent } from '@push.rocks/smartbucket';
/**
* MongoDB change event - wraps smartdata watcher output
*/
export interface IMongoChangeEvent {
type: 'insert' | 'update' | 'delete' | 'replace' | 'drop' | 'invalidate';
database: string;
collection: string;
documentId?: string;
document?: Record<string, unknown>;
updateDescription?: {
updatedFields?: Record<string, unknown>;
removedFields?: string[];
};
timestamp: string;
}
/**
* Combined activity event for the activity stream
*/
export interface IActivityEvent {
id: string;
source: 'mongodb' | 's3';
event: IMongoChangeEvent | import('@push.rocks/smartbucket').IS3ChangeEvent;
timestamp: string;
}
// ===========================================
// TypedRequest interfaces for streaming subscriptions
// ===========================================
/**
* Subscribe to MongoDB collection changes
*/
export interface IReq_SubscribeMongo extends plugins.typedrequestInterfaces.implementsTR<
plugins.typedrequestInterfaces.ITypedRequest,
IReq_SubscribeMongo
> {
method: 'subscribeMongo';
request: {
database: string;
collection: string;
};
response: {
success: boolean;
subscriptionId: string;
};
}
/**
* Unsubscribe from MongoDB collection changes
*/
export interface IReq_UnsubscribeMongo extends plugins.typedrequestInterfaces.implementsTR<
plugins.typedrequestInterfaces.ITypedRequest,
IReq_UnsubscribeMongo
> {
method: 'unsubscribeMongo';
request: {
database: string;
collection: string;
};
response: {
success: boolean;
};
}
/**
* Subscribe to S3 bucket/prefix changes
*/
export interface IReq_SubscribeS3 extends plugins.typedrequestInterfaces.implementsTR<
plugins.typedrequestInterfaces.ITypedRequest,
IReq_SubscribeS3
> {
method: 'subscribeS3';
request: {
bucket: string;
prefix?: string;
};
response: {
success: boolean;
subscriptionId: string;
};
}
/**
* Unsubscribe from S3 bucket/prefix changes
*/
export interface IReq_UnsubscribeS3 extends plugins.typedrequestInterfaces.implementsTR<
plugins.typedrequestInterfaces.ITypedRequest,
IReq_UnsubscribeS3
> {
method: 'unsubscribeS3';
request: {
bucket: string;
prefix?: string;
};
response: {
success: boolean;
};
}
/**
* Subscribe to activity stream (all changes from MongoDB and S3)
*/
export interface IReq_SubscribeActivity extends plugins.typedrequestInterfaces.implementsTR<
plugins.typedrequestInterfaces.ITypedRequest,
IReq_SubscribeActivity
> {
method: 'subscribeActivity';
request: {};
response: {
success: boolean;
subscriptionId: string;
};
}
/**
* Unsubscribe from activity stream
*/
export interface IReq_UnsubscribeActivity extends plugins.typedrequestInterfaces.implementsTR<
plugins.typedrequestInterfaces.ITypedRequest,
IReq_UnsubscribeActivity
> {
method: 'unsubscribeActivity';
request: {};
response: {
success: boolean;
};
}
/**
* Get recent activity events (for initial load or reconnection)
*/
export interface IReq_GetRecentActivity extends plugins.typedrequestInterfaces.implementsTR<
plugins.typedrequestInterfaces.ITypedRequest,
IReq_GetRecentActivity
> {
method: 'getRecentActivity';
request: {
limit?: number; // Default: 100
};
response: {
events: IActivityEvent[];
};
}
// ===========================================
// Server-to-client push event interfaces
// ===========================================
/**
* Server pushes MongoDB change to client
*/
export interface IReq_PushMongoChange extends plugins.typedrequestInterfaces.implementsTR<
plugins.typedrequestInterfaces.ITypedRequest,
IReq_PushMongoChange
> {
method: 'pushMongoChange';
request: {
event: IMongoChangeEvent;
};
response: {
received: boolean;
};
}
/**
* Server pushes S3 change to client
*/
export interface IReq_PushS3Change extends plugins.typedrequestInterfaces.implementsTR<
plugins.typedrequestInterfaces.ITypedRequest,
IReq_PushS3Change
> {
method: 'pushS3Change';
request: {
event: import('@push.rocks/smartbucket').IS3ChangeEvent;
};
response: {
received: boolean;
};
}
/**
* Server pushes activity event to client
*/
export interface IReq_PushActivityEvent extends plugins.typedrequestInterfaces.implementsTR<
plugins.typedrequestInterfaces.ITypedRequest,
IReq_PushActivityEvent
> {
method: 'pushActivityEvent';
request: {
event: IActivityEvent;
};
response: {
received: boolean;
};
}
/**
* Connection tag for tracking subscriptions
*/
export interface ISubscriptionTag extends plugins.typedrequestInterfaces.ITag {
name: 'subscription';
payload: {
type: 'mongo' | 's3' | 'activity';
key: string; // e.g., "db/collection" or "bucket/prefix" or "activity"
};
}

View File

@@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@git.zone/tsview', name: '@git.zone/tsview',
version: '1.4.0', version: '1.6.1',
description: 'A CLI tool for viewing S3 and MongoDB data with a web UI' description: 'A CLI tool for viewing S3 and MongoDB data with a web UI'
} }

View File

@@ -14,3 +14,6 @@ export * from './tsview-mongo-documents.js';
export * from './tsview-mongo-document.js'; export * from './tsview-mongo-document.js';
export * from './tsview-mongo-indexes.js'; export * from './tsview-mongo-indexes.js';
export * from './tsview-mongo-db-overview.js'; export * from './tsview-mongo-db-overview.js';
// Activity stream component
export * from './tsview-activity-stream.js';

View File

@@ -0,0 +1,561 @@
import * as plugins from '../plugins.js';
import { changeStreamService, type IActivityEvent, type IMongoChangeEvent, type IS3ChangeEvent } from '../services/index.js';
import { themeStyles } from '../styles/index.js';
const { html, css, cssManager, customElement, property, state, DeesElement } = plugins;
type TFilterMode = 'all' | 'mongodb' | 's3';
@customElement('tsview-activity-stream')
export class TsviewActivityStream extends DeesElement {
@state()
private accessor events: IActivityEvent[] = [];
@state()
private accessor filterMode: TFilterMode = 'all';
@state()
private accessor isConnected: boolean = false;
@state()
private accessor isLoading: boolean = true;
@state()
private accessor autoScroll: boolean = true;
private subscription: plugins.smartrx.rxjs.Subscription | null = null;
private connectionSubscription: plugins.smartrx.rxjs.Subscription | null = null;
public static styles = [
cssManager.defaultStyles,
themeStyles,
css`
:host {
display: block;
height: 100%;
overflow: hidden;
}
.activity-container {
display: flex;
flex-direction: column;
height: 100%;
}
.header {
display: flex;
align-items: center;
justify-content: space-between;
padding: 16px;
border-bottom: 1px solid #333;
}
.header-title {
font-size: 18px;
font-weight: 600;
color: #fff;
display: flex;
align-items: center;
gap: 12px;
}
.connection-status {
display: flex;
align-items: center;
gap: 6px;
font-size: 12px;
font-weight: 400;
}
.status-dot {
width: 8px;
height: 8px;
border-radius: 50%;
}
.status-dot.connected {
background: #22c55e;
box-shadow: 0 0 6px rgba(34, 197, 94, 0.5);
}
.status-dot.disconnected {
background: #ef4444;
}
.status-dot.connecting {
background: #f59e0b;
animation: pulse 1s infinite;
}
@keyframes pulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.5; }
}
.header-controls {
display: flex;
align-items: center;
gap: 12px;
}
.filter-tabs {
display: flex;
gap: 4px;
}
.filter-tab {
padding: 6px 12px;
background: transparent;
border: 1px solid #444;
color: #888;
border-radius: 4px;
cursor: pointer;
font-size: 12px;
transition: all 0.2s;
}
.filter-tab:hover {
border-color: #666;
color: #aaa;
}
.filter-tab.active {
background: rgba(255, 255, 255, 0.1);
border-color: #666;
color: #e0e0e0;
}
.auto-scroll-toggle {
display: flex;
align-items: center;
gap: 6px;
font-size: 12px;
color: #888;
cursor: pointer;
}
.auto-scroll-toggle input {
cursor: pointer;
}
.events-list {
flex: 1;
overflow-y: auto;
padding: 12px;
}
.event-item {
display: flex;
gap: 12px;
padding: 12px;
margin-bottom: 8px;
background: rgba(0, 0, 0, 0.2);
border-radius: 8px;
cursor: pointer;
transition: background 0.1s;
}
.event-item:hover {
background: rgba(255, 255, 255, 0.05);
}
.event-icon {
width: 36px;
height: 36px;
border-radius: 8px;
display: flex;
align-items: center;
justify-content: center;
flex-shrink: 0;
}
.event-icon.mongodb {
background: rgba(16, 185, 129, 0.2);
color: #10b981;
}
.event-icon.s3 {
background: rgba(245, 158, 11, 0.2);
color: #f59e0b;
}
.event-icon svg {
width: 18px;
height: 18px;
}
.event-content {
flex: 1;
min-width: 0;
}
.event-header {
display: flex;
align-items: center;
justify-content: space-between;
margin-bottom: 4px;
}
.event-title {
font-size: 13px;
font-weight: 500;
color: #e0e0e0;
}
.event-time {
font-size: 11px;
color: #666;
}
.event-details {
font-size: 12px;
color: #888;
font-family: monospace;
}
.event-type {
display: inline-block;
padding: 2px 6px;
border-radius: 4px;
font-size: 10px;
font-weight: 500;
text-transform: uppercase;
margin-right: 8px;
}
.event-type.insert, .event-type.add {
background: rgba(34, 197, 94, 0.2);
color: #4ade80;
}
.event-type.update, .event-type.modify {
background: rgba(59, 130, 246, 0.2);
color: #60a5fa;
}
.event-type.delete {
background: rgba(239, 68, 68, 0.2);
color: #f87171;
}
.event-type.replace {
background: rgba(168, 85, 247, 0.2);
color: #c084fc;
}
.event-type.drop, .event-type.invalidate {
background: rgba(239, 68, 68, 0.3);
color: #f87171;
}
.empty-state {
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
height: 100%;
color: #666;
gap: 16px;
}
.empty-state svg {
width: 64px;
height: 64px;
opacity: 0.5;
}
.empty-state p {
font-size: 14px;
}
.loading-state {
display: flex;
align-items: center;
justify-content: center;
height: 100%;
color: #888;
}
.event-path {
color: #aaa;
word-break: break-all;
}
`,
];
async connectedCallback() {
super.connectedCallback();
await this.initializeStreaming();
}
disconnectedCallback() {
super.disconnectedCallback();
this.cleanup();
}
private async initializeStreaming() {
this.isLoading = true;
try {
// Connect to WebSocket if not connected
await changeStreamService.connect();
// Subscribe to connection status
this.connectionSubscription = changeStreamService.connectionStatus$.subscribe((status) => {
this.isConnected = status === 'connected';
});
// Subscribe to activity stream
await changeStreamService.subscribeToActivity();
// Load recent events
const recentEvents = await changeStreamService.getRecentActivity(100);
this.events = recentEvents;
// Subscribe to new events
this.subscription = changeStreamService.getActivityStream().subscribe((event) => {
this.events = [...this.events, event].slice(-500); // Keep last 500 events
// Auto-scroll if enabled
if (this.autoScroll) {
this.scrollToBottom();
}
});
this.isConnected = true;
} catch (error) {
console.error('Failed to initialize activity stream:', error);
this.isConnected = false;
}
this.isLoading = false;
}
private cleanup() {
if (this.subscription) {
this.subscription.unsubscribe();
this.subscription = null;
}
if (this.connectionSubscription) {
this.connectionSubscription.unsubscribe();
this.connectionSubscription = null;
}
changeStreamService.unsubscribeFromActivity();
}
private scrollToBottom() {
requestAnimationFrame(() => {
const list = this.shadowRoot?.querySelector('.events-list');
if (list) {
list.scrollTop = list.scrollHeight;
}
});
}
private setFilterMode(mode: TFilterMode) {
this.filterMode = mode;
}
private toggleAutoScroll() {
this.autoScroll = !this.autoScroll;
}
private get filteredEvents(): IActivityEvent[] {
if (this.filterMode === 'all') {
return this.events;
}
return this.events.filter((e) => e.source === this.filterMode);
}
private formatTime(timestamp: string): string {
const date = new Date(timestamp);
return date.toLocaleTimeString('en-US', {
hour: '2-digit',
minute: '2-digit',
second: '2-digit',
});
}
private formatRelativeTime(timestamp: string): string {
const date = new Date(timestamp);
const now = new Date();
const diff = now.getTime() - date.getTime();
if (diff < 60000) {
return 'just now';
} else if (diff < 3600000) {
const mins = Math.floor(diff / 60000);
return `${mins}m ago`;
} else if (diff < 86400000) {
const hours = Math.floor(diff / 3600000);
return `${hours}h ago`;
} else {
return date.toLocaleDateString();
}
}
private getEventTitle(event: IActivityEvent): string {
if (event.source === 'mongodb') {
const mongoEvent = event.event as IMongoChangeEvent;
return `${mongoEvent.database}.${mongoEvent.collection}`;
} else {
const s3Event = event.event as IS3ChangeEvent;
return s3Event.bucket;
}
}
private getEventDetails(event: IActivityEvent): string {
if (event.source === 'mongodb') {
const mongoEvent = event.event as IMongoChangeEvent;
if (mongoEvent.documentId) {
return `Document: ${mongoEvent.documentId}`;
}
return '';
} else {
const s3Event = event.event as IS3ChangeEvent;
return s3Event.key;
}
}
private getEventType(event: IActivityEvent): string {
return event.event.type;
}
private handleEventClick(event: IActivityEvent) {
// Dispatch navigation event
if (event.source === 'mongodb') {
const mongoEvent = event.event as IMongoChangeEvent;
this.dispatchEvent(
new CustomEvent('navigate-to-mongo', {
detail: {
database: mongoEvent.database,
collection: mongoEvent.collection,
documentId: mongoEvent.documentId,
},
bubbles: true,
composed: true,
})
);
} else {
const s3Event = event.event as IS3ChangeEvent;
this.dispatchEvent(
new CustomEvent('navigate-to-s3', {
detail: {
bucket: s3Event.bucket,
key: s3Event.key,
},
bubbles: true,
composed: true,
})
);
}
}
private renderMongoIcon() {
return html`
<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2">
<ellipse cx="12" cy="5" rx="9" ry="3"></ellipse>
<path d="M21 12c0 1.66-4 3-9 3s-9-1.34-9-3"></path>
<path d="M3 5v14c0 1.66 4 3 9 3s9-1.34 9-3V5"></path>
</svg>
`;
}
private renderS3Icon() {
return html`
<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2">
<path d="M22 19a2 2 0 0 1-2 2H4a2 2 0 0 1-2-2V5a2 2 0 0 1 2-2h5l2 3h9a2 2 0 0 1 2 2z"></path>
</svg>
`;
}
private getConnectionStatusText(): string {
if (this.isConnected) {
return 'Live';
}
return 'Disconnected';
}
render() {
return html`
<div class="activity-container">
<div class="header">
<div class="header-title">
Activity Stream
<div class="connection-status">
<span class="status-dot ${this.isConnected ? 'connected' : 'disconnected'}"></span>
${this.getConnectionStatusText()}
</div>
</div>
<div class="header-controls">
<div class="filter-tabs">
<button
class="filter-tab ${this.filterMode === 'all' ? 'active' : ''}"
@click=${() => this.setFilterMode('all')}
>
All
</button>
<button
class="filter-tab ${this.filterMode === 'mongodb' ? 'active' : ''}"
@click=${() => this.setFilterMode('mongodb')}
>
MongoDB
</button>
<button
class="filter-tab ${this.filterMode === 's3' ? 'active' : ''}"
@click=${() => this.setFilterMode('s3')}
>
S3
</button>
</div>
<label class="auto-scroll-toggle">
<input
type="checkbox"
.checked=${this.autoScroll}
@change=${this.toggleAutoScroll}
/>
Auto-scroll
</label>
</div>
</div>
<div class="events-list">
${this.isLoading
? html`<div class="loading-state">Connecting to activity stream...</div>`
: this.filteredEvents.length === 0
? html`
<div class="empty-state">
<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2">
<path d="M22 12h-4l-3 9L9 3l-3 9H2"></path>
</svg>
<p>No activity yet. Changes will appear here in real-time.</p>
</div>
`
: this.filteredEvents.map(
(event) => html`
<div class="event-item" @click=${() => this.handleEventClick(event)}>
<div class="event-icon ${event.source}">
${event.source === 'mongodb' ? this.renderMongoIcon() : this.renderS3Icon()}
</div>
<div class="event-content">
<div class="event-header">
<div class="event-title">
<span class="event-type ${this.getEventType(event)}">${this.getEventType(event)}</span>
${this.getEventTitle(event)}
</div>
<div class="event-time" title=${this.formatTime(event.timestamp)}>
${this.formatRelativeTime(event.timestamp)}
</div>
</div>
<div class="event-details">
<span class="event-path">${this.getEventDetails(event)}</span>
</div>
</div>
</div>
`
)}
</div>
</div>
`;
}
}

View File

@@ -1,11 +1,11 @@
import * as plugins from '../plugins.js'; import * as plugins from '../plugins.js';
import { apiService } from '../services/index.js'; import { apiService, changeStreamService } from '../services/index.js';
import { themeStyles } from '../styles/index.js'; import { themeStyles } from '../styles/index.js';
const { html, css, cssManager, customElement, state, DeesElement } = plugins; const { html, css, cssManager, customElement, state, DeesElement } = plugins;
const { DeesContextmenu } = plugins.deesCatalog; const { DeesContextmenu } = plugins.deesCatalog;
type TViewMode = 's3' | 'mongo' | 'settings'; type TViewMode = 's3' | 'mongo' | 'activity' | 'settings';
@customElement('tsview-app') @customElement('tsview-app')
export class TsviewApp extends DeesElement { export class TsviewApp extends DeesElement {
@@ -422,6 +422,17 @@ export class TsviewApp extends DeesElement {
async connectedCallback() { async connectedCallback() {
super.connectedCallback(); super.connectedCallback();
await this.loadData(); await this.loadData();
// Initialize WebSocket connection for real-time updates
this.initializeChangeStream();
}
private async initializeChangeStream() {
try {
await changeStreamService.connect();
console.log('[TsviewApp] ChangeStream connected');
} catch (error) {
console.warn('[TsviewApp] Failed to connect to ChangeStream:', error);
}
} }
private async loadData() { private async loadData() {
@@ -729,6 +740,12 @@ export class TsviewApp extends DeesElement {
> >
MongoDB MongoDB
</button> </button>
<button
class="nav-tab ${this.viewMode === 'activity' ? 'active' : ''}"
@click=${() => this.setViewMode('activity')}
>
Activity
</button>
<button <button
class="nav-tab ${this.viewMode === 'settings' ? 'active' : ''}" class="nav-tab ${this.viewMode === 'settings' ? 'active' : ''}"
@click=${() => this.setViewMode('settings')} @click=${() => this.setViewMode('settings')}
@@ -940,6 +957,19 @@ export class TsviewApp extends DeesElement {
`; `;
} }
if (this.viewMode === 'activity') {
return html`
<aside class="sidebar">
<div class="sidebar-header">Activity Stream</div>
<div class="sidebar-list">
<div class="sidebar-item" style="color: #888; font-size: 12px; cursor: default;">
Real-time changes from MongoDB and S3 appear here automatically.
</div>
</div>
</aside>
`;
}
return html` return html`
<aside class="sidebar"> <aside class="sidebar">
<div class="sidebar-header">Settings</div> <div class="sidebar-header">Settings</div>
@@ -1048,6 +1078,17 @@ export class TsviewApp extends DeesElement {
`; `;
} }
if (this.viewMode === 'activity') {
return html`
<div class="content-area" style="padding: 0;">
<tsview-activity-stream
@navigate-to-mongo=${this.handleNavigateToMongo}
@navigate-to-s3=${this.handleNavigateToS3}
></tsview-activity-stream>
</div>
`;
}
return html` return html`
<div class="content-area"> <div class="content-area">
<h2>Settings</h2> <h2>Settings</h2>
@@ -1055,4 +1096,21 @@ export class TsviewApp extends DeesElement {
</div> </div>
`; `;
} }
private handleNavigateToMongo(e: CustomEvent) {
const { database, collection, documentId } = e.detail;
this.viewMode = 'mongo';
this.selectedDatabase = database;
this.selectedCollection = collection;
// If documentId is provided, we could potentially scroll to or highlight that document
// For now, just navigate to the collection
}
private handleNavigateToS3(e: CustomEvent) {
const { bucket, key } = e.detail;
this.viewMode = 's3';
this.selectedBucket = bucket;
// The S3 browser will need to be updated to navigate to the specific key
// For now, just navigate to the bucket
}
} }

View File

@@ -1,5 +1,5 @@
import * as plugins from '../plugins.js'; import * as plugins from '../plugins.js';
import { apiService, type ICollectionStats } from '../services/index.js'; import { apiService, changeStreamService, type ICollectionStats, type IMongoChangeEvent } from '../services/index.js';
import { formatSize, formatCount } from '../utilities/index.js'; import { formatSize, formatCount } from '../utilities/index.js';
import { themeStyles } from '../styles/index.js'; import { themeStyles } from '../styles/index.js';
@@ -30,6 +30,14 @@ export class TsviewMongoBrowser extends DeesElement {
@state() @state()
private accessor isResizingEditor: boolean = false; private accessor isResizingEditor: boolean = false;
@state()
private accessor recentChangeCount: number = 0;
@state()
private accessor isStreamConnected: boolean = false;
private changeSubscription: plugins.smartrx.rxjs.Subscription | null = null;
public static styles = [ public static styles = [
cssManager.defaultStyles, cssManager.defaultStyles,
themeStyles, themeStyles,
@@ -148,18 +156,114 @@ export class TsviewMongoBrowser extends DeesElement {
display: none; display: none;
} }
} }
.change-indicator {
display: flex;
align-items: center;
gap: 6px;
padding: 4px 8px;
background: rgba(34, 197, 94, 0.2);
border-radius: 4px;
font-size: 11px;
color: #4ade80;
}
.change-indicator.pulse {
animation: pulse-green 1s ease-in-out;
}
@keyframes pulse-green {
0% { background: rgba(34, 197, 94, 0.4); }
100% { background: rgba(34, 197, 94, 0.2); }
}
.stream-status {
display: flex;
align-items: center;
gap: 4px;
font-size: 11px;
color: #888;
}
.stream-dot {
width: 6px;
height: 6px;
border-radius: 50%;
background: #888;
}
.stream-dot.connected {
background: #22c55e;
}
`, `,
]; ];
async connectedCallback() { async connectedCallback() {
super.connectedCallback(); super.connectedCallback();
await this.loadStats(); await this.loadStats();
this.subscribeToChanges();
}
disconnectedCallback() {
super.disconnectedCallback();
this.unsubscribeFromChanges();
} }
updated(changedProperties: Map<string, unknown>) { updated(changedProperties: Map<string, unknown>) {
if (changedProperties.has('databaseName') || changedProperties.has('collectionName')) { if (changedProperties.has('databaseName') || changedProperties.has('collectionName')) {
this.loadStats(); this.loadStats();
this.selectedDocumentId = ''; this.selectedDocumentId = '';
this.recentChangeCount = 0;
// Re-subscribe to the new collection
this.unsubscribeFromChanges();
this.subscribeToChanges();
}
}
private async subscribeToChanges() {
if (!this.databaseName || !this.collectionName) return;
try {
// Subscribe to collection changes
const success = await changeStreamService.subscribeToCollection(this.databaseName, this.collectionName);
this.isStreamConnected = success;
if (success) {
// Listen for changes
this.changeSubscription = changeStreamService
.getCollectionChanges(this.databaseName, this.collectionName)
.subscribe((event) => {
this.handleChange(event);
});
}
} catch (error) {
console.warn('[MongoBrowser] Failed to subscribe to changes:', error);
this.isStreamConnected = false;
}
}
private unsubscribeFromChanges() {
if (this.changeSubscription) {
this.changeSubscription.unsubscribe();
this.changeSubscription = null;
}
if (this.databaseName && this.collectionName) {
changeStreamService.unsubscribeFromCollection(this.databaseName, this.collectionName);
}
this.isStreamConnected = false;
}
private handleChange(event: IMongoChangeEvent) {
console.log('[MongoBrowser] Received change:', event);
this.recentChangeCount++;
// Refresh stats to reflect changes
this.loadStats();
// Notify the documents component to refresh
const documentsEl = this.shadowRoot?.querySelector('tsview-mongo-documents') as any;
if (documentsEl?.refresh) {
documentsEl.refresh();
} }
} }
@@ -219,6 +323,17 @@ export class TsviewMongoBrowser extends DeesElement {
</div> </div>
` `
: ''} : ''}
<div class="stream-status">
<span class="stream-dot ${this.isStreamConnected ? 'connected' : ''}"></span>
${this.isStreamConnected ? 'Live' : 'Offline'}
</div>
${this.recentChangeCount > 0
? html`
<div class="change-indicator pulse">
${this.recentChangeCount} change${this.recentChangeCount > 1 ? 's' : ''}
</div>
`
: ''}
</div> </div>
<div class="tabs"> <div class="tabs">

View File

@@ -259,6 +259,13 @@ export class TsviewMongoDocuments extends DeesElement {
this.loading = false; this.loading = false;
} }
/**
* Public method to refresh documents (called by parent on change events)
*/
public async refresh() {
await this.loadDocuments();
}
private handleFilterInput(e: Event) { private handleFilterInput(e: Event) {
this.filterText = (e.target as HTMLInputElement).value; this.filterText = (e.target as HTMLInputElement).value;
} }

View File

@@ -1,5 +1,5 @@
import * as plugins from '../plugins.js'; import * as plugins from '../plugins.js';
import { apiService } from '../services/index.js'; import { apiService, changeStreamService, type IS3ChangeEvent } from '../services/index.js';
import { themeStyles } from '../styles/index.js'; import { themeStyles } from '../styles/index.js';
const { html, css, cssManager, customElement, property, state, DeesElement } = plugins; const { html, css, cssManager, customElement, property, state, DeesElement } = plugins;
@@ -29,6 +29,14 @@ export class TsviewS3Browser extends DeesElement {
@state() @state()
private accessor isResizingPreview: boolean = false; private accessor isResizingPreview: boolean = false;
@state()
private accessor recentChangeCount: number = 0;
@state()
private accessor isStreamConnected: boolean = false;
private changeSubscription: plugins.smartrx.rxjs.Subscription | null = null;
public static styles = [ public static styles = [
cssManager.defaultStyles, cssManager.defaultStyles,
themeStyles, themeStyles,
@@ -154,9 +162,61 @@ export class TsviewS3Browser extends DeesElement {
display: none; display: none;
} }
} }
.stream-status {
display: flex;
align-items: center;
gap: 4px;
font-size: 11px;
color: #888;
margin-left: auto;
margin-right: 12px;
}
.stream-dot {
width: 6px;
height: 6px;
border-radius: 50%;
background: #888;
}
.stream-dot.connected {
background: #22c55e;
}
.change-indicator {
display: flex;
align-items: center;
gap: 6px;
padding: 4px 8px;
background: rgba(245, 158, 11, 0.2);
border-radius: 4px;
font-size: 11px;
color: #f59e0b;
margin-right: 12px;
}
.change-indicator.pulse {
animation: pulse-orange 1s ease-in-out;
}
@keyframes pulse-orange {
0% { background: rgba(245, 158, 11, 0.4); }
100% { background: rgba(245, 158, 11, 0.2); }
}
`, `,
]; ];
async connectedCallback() {
super.connectedCallback();
this.subscribeToChanges();
}
disconnectedCallback() {
super.disconnectedCallback();
this.unsubscribeFromChanges();
}
private setViewType(type: TViewType) { private setViewType(type: TViewType) {
this.viewType = type; this.viewType = type;
} }
@@ -185,9 +245,54 @@ export class TsviewS3Browser extends DeesElement {
// Clear selection when bucket changes // Clear selection when bucket changes
this.selectedKey = ''; this.selectedKey = '';
this.currentPrefix = ''; this.currentPrefix = '';
this.recentChangeCount = 0;
// Re-subscribe to the new bucket
this.unsubscribeFromChanges();
this.subscribeToChanges();
} }
} }
private async subscribeToChanges() {
if (!this.bucketName) return;
try {
// Subscribe to bucket changes (with optional prefix)
const success = await changeStreamService.subscribeToBucket(this.bucketName, this.currentPrefix || undefined);
this.isStreamConnected = success;
if (success) {
// Listen for changes
this.changeSubscription = changeStreamService
.getBucketChanges(this.bucketName, this.currentPrefix || undefined)
.subscribe((event) => {
this.handleChange(event);
});
}
} catch (error) {
console.warn('[S3Browser] Failed to subscribe to changes:', error);
this.isStreamConnected = false;
}
}
private unsubscribeFromChanges() {
if (this.changeSubscription) {
this.changeSubscription.unsubscribe();
this.changeSubscription = null;
}
if (this.bucketName) {
changeStreamService.unsubscribeFromBucket(this.bucketName, this.currentPrefix || undefined);
}
this.isStreamConnected = false;
}
private handleChange(event: IS3ChangeEvent) {
console.log('[S3Browser] Received change:', event);
this.recentChangeCount++;
// Trigger refresh of child components
this.refreshKey++;
}
private startPreviewResize = (e: MouseEvent) => { private startPreviewResize = (e: MouseEvent) => {
e.preventDefault(); e.preventDefault();
this.isResizingPreview = true; this.isResizingPreview = true;
@@ -239,6 +344,17 @@ export class TsviewS3Browser extends DeesElement {
})} })}
</div> </div>
<div class="stream-status">
<span class="stream-dot ${this.isStreamConnected ? 'connected' : ''}"></span>
${this.isStreamConnected ? 'Live' : 'Offline'}
</div>
${this.recentChangeCount > 0
? html`
<div class="change-indicator pulse">
${this.recentChangeCount} change${this.recentChangeCount > 1 ? 's' : ''}
</div>
`
: ''}
<div class="view-toggle"> <div class="view-toggle">
<button <button
class="view-btn ${this.viewType === 'columns' ? 'active' : ''}" class="view-btn ${this.viewType === 'columns' ? 'active' : ''}"

View File

@@ -256,10 +256,15 @@ export class TsviewS3Preview extends DeesElement {
try { try {
const result = await apiService.getObject(this.bucketName, this.objectKey); const result = await apiService.getObject(this.bucketName, this.objectKey);
this.content = result.content; if (!result) {
this.contentType = result.contentType; this.error = 'Object not found';
this.size = result.size; this.loading = false;
this.lastModified = result.lastModified; return;
}
this.content = result.content || '';
this.contentType = result.contentType || '';
this.size = result.size || 0;
this.lastModified = result.lastModified || '';
// For text files, decode and store original content // For text files, decode and store original content
if (this.isText()) { if (this.isText()) {

View File

@@ -18,4 +18,9 @@ export const DeesElement = deesElement.DeesElement;
// @api.global scope // @api.global scope
import * as typedrequest from '@api.global/typedrequest'; import * as typedrequest from '@api.global/typedrequest';
export { typedrequest }; import * as typedsocket from '@api.global/typedsocket';
export { typedrequest, typedsocket };
// @push.rocks scope
import * as smartrx from '@push.rocks/smartrx';
export { smartrx };

View File

@@ -0,0 +1,521 @@
import * as plugins from '../plugins.js';
/**
* MongoDB change event
*/
export interface IMongoChangeEvent {
type: 'insert' | 'update' | 'delete' | 'replace' | 'drop' | 'invalidate';
database: string;
collection: string;
documentId?: string;
document?: Record<string, unknown>;
updateDescription?: {
updatedFields?: Record<string, unknown>;
removedFields?: string[];
};
timestamp: string;
}
/**
* S3 change event
*/
export interface IS3ChangeEvent {
type: 'add' | 'modify' | 'delete';
key: string;
size?: number;
etag?: string;
lastModified?: Date;
bucket: string;
}
/**
* Combined activity event
*/
export interface IActivityEvent {
id: string;
source: 'mongodb' | 's3';
event: IMongoChangeEvent | IS3ChangeEvent;
timestamp: string;
}
/**
* Subscription info tracked by the service
*/
interface ISubscription {
type: 'mongo' | 's3' | 'activity';
key: string; // "db/collection" or "bucket/prefix" or "activity"
subscriptionId: string;
}
/**
* ChangeStreamService manages real-time change subscriptions from the browser.
*
* Features:
* - WebSocket connection via TypedSocket
* - Automatic reconnection with subscription restoration
* - RxJS Subjects for reactive UI updates
* - Subscription lifecycle management
*/
export class ChangeStreamService {
private typedSocket: plugins.typedsocket.TypedSocket | null = null;
private isConnected = false;
private isConnecting = false;
private subscriptions: Map<string, ISubscription> = new Map();
// RxJS Subjects for UI components
public readonly mongoChanges$ = new plugins.smartrx.rxjs.Subject<IMongoChangeEvent>();
public readonly s3Changes$ = new plugins.smartrx.rxjs.Subject<IS3ChangeEvent>();
public readonly activityEvents$ = new plugins.smartrx.rxjs.Subject<IActivityEvent>();
public readonly connectionStatus$ = new plugins.smartrx.rxjs.ReplaySubject<'connected' | 'disconnected' | 'connecting'>(1);
constructor() {
// Emit initial disconnected status
this.connectionStatus$.next('disconnected');
}
/**
* Connect to the WebSocket server
*/
public async connect(): Promise<void> {
if (this.isConnected || this.isConnecting) {
return;
}
this.isConnecting = true;
this.connectionStatus$.next('connecting');
try {
// Create client router to handle server-initiated pushes
const clientRouter = new plugins.typedrequest.TypedRouter();
// Register handlers for server push events
this.registerPushHandlers(clientRouter);
// Connect to WebSocket server using current origin
this.typedSocket = await plugins.typedsocket.TypedSocket.createClient(
clientRouter,
plugins.typedsocket.TypedSocket.useWindowLocationOriginUrl()
);
this.isConnected = true;
this.isConnecting = false;
this.connectionStatus$.next('connected');
console.log('[ChangeStream] WebSocket connected');
// Handle reconnection events via statusSubject
this.typedSocket.statusSubject.subscribe((status) => {
if (status === 'disconnected') {
this.handleDisconnect();
} else if (status === 'connected') {
this.handleReconnect();
}
});
} catch (error) {
this.isConnecting = false;
this.connectionStatus$.next('disconnected');
console.error('[ChangeStream] Failed to connect:', error);
throw error;
}
}
/**
* Disconnect from the WebSocket server
*/
public async disconnect(): Promise<void> {
if (!this.typedSocket) {
return;
}
try {
await this.typedSocket.stop();
} catch (error) {
console.error('[ChangeStream] Error during disconnect:', error);
}
this.typedSocket = null;
this.isConnected = false;
this.subscriptions.clear();
this.connectionStatus$.next('disconnected');
console.log('[ChangeStream] WebSocket disconnected');
}
/**
* Register handlers for server push events
*/
private registerPushHandlers(router: plugins.typedrequest.TypedRouter): void {
// Handle MongoDB change push
router.addTypedHandler(
new plugins.typedrequest.TypedHandler<any>(
'pushMongoChange',
async (data: { event: IMongoChangeEvent }) => {
this.mongoChanges$.next(data.event);
return { received: true };
}
)
);
// Handle S3 change push
router.addTypedHandler(
new plugins.typedrequest.TypedHandler<any>(
'pushS3Change',
async (data: { event: IS3ChangeEvent }) => {
this.s3Changes$.next(data.event);
return { received: true };
}
)
);
// Handle activity event push
router.addTypedHandler(
new plugins.typedrequest.TypedHandler<any>(
'pushActivityEvent',
async (data: { event: IActivityEvent }) => {
this.activityEvents$.next(data.event);
return { received: true };
}
)
);
}
/**
* Handle WebSocket disconnection
*/
private handleDisconnect(): void {
this.isConnected = false;
this.connectionStatus$.next('disconnected');
console.log('[ChangeStream] WebSocket disconnected, waiting for reconnect...');
}
/**
* Handle WebSocket reconnection - restore all subscriptions
*/
private async handleReconnect(): Promise<void> {
this.isConnected = true;
this.connectionStatus$.next('connected');
console.log('[ChangeStream] WebSocket reconnected, restoring subscriptions...');
// Restore all subscriptions
const subscriptionsToRestore = Array.from(this.subscriptions.values());
this.subscriptions.clear();
for (const sub of subscriptionsToRestore) {
try {
if (sub.type === 'mongo') {
const [database, collection] = sub.key.split('/');
await this.subscribeToCollection(database, collection);
} else if (sub.type === 's3') {
const parts = sub.key.split('/');
const bucket = parts[0];
const prefix = parts.slice(1).join('/') || undefined;
await this.subscribeToBucket(bucket, prefix);
} else if (sub.type === 'activity') {
await this.subscribeToActivity();
}
} catch (error) {
console.error(`[ChangeStream] Failed to restore subscription ${sub.key}:`, error);
}
}
}
// ===========================================
// MongoDB Subscriptions
// ===========================================
/**
* Subscribe to changes in a MongoDB collection
*/
public async subscribeToCollection(database: string, collection: string): Promise<boolean> {
if (!this.typedSocket || !this.isConnected) {
console.warn('[ChangeStream] Not connected, cannot subscribe');
return false;
}
const key = `${database}/${collection}`;
// Check if already subscribed
if (this.subscriptions.has(`mongo:${key}`)) {
return true;
}
try {
const request = this.typedSocket.createTypedRequest<any>('subscribeMongo');
const response = await request.fire({ database, collection });
if (response.success) {
this.subscriptions.set(`mongo:${key}`, {
type: 'mongo',
key,
subscriptionId: response.subscriptionId,
});
console.log(`[ChangeStream] Subscribed to MongoDB ${key}`);
return true;
}
return false;
} catch (error) {
console.error(`[ChangeStream] Failed to subscribe to MongoDB ${key}:`, error);
return false;
}
}
/**
* Unsubscribe from changes in a MongoDB collection
*/
public async unsubscribeFromCollection(database: string, collection: string): Promise<boolean> {
if (!this.typedSocket || !this.isConnected) {
return false;
}
const key = `${database}/${collection}`;
const subKey = `mongo:${key}`;
if (!this.subscriptions.has(subKey)) {
return true;
}
try {
const request = this.typedSocket.createTypedRequest<any>('unsubscribeMongo');
const response = await request.fire({ database, collection });
if (response.success) {
this.subscriptions.delete(subKey);
console.log(`[ChangeStream] Unsubscribed from MongoDB ${key}`);
}
return response.success;
} catch (error) {
console.error(`[ChangeStream] Failed to unsubscribe from MongoDB ${key}:`, error);
return false;
}
}
/**
* Check if subscribed to a MongoDB collection
*/
public isSubscribedToCollection(database: string, collection: string): boolean {
return this.subscriptions.has(`mongo:${database}/${collection}`);
}
// ===========================================
// S3 Subscriptions
// ===========================================
/**
* Subscribe to changes in an S3 bucket/prefix
*/
public async subscribeToBucket(bucket: string, prefix?: string): Promise<boolean> {
if (!this.typedSocket || !this.isConnected) {
console.warn('[ChangeStream] Not connected, cannot subscribe');
return false;
}
const key = prefix ? `${bucket}/${prefix}` : bucket;
// Check if already subscribed
if (this.subscriptions.has(`s3:${key}`)) {
return true;
}
try {
const request = this.typedSocket.createTypedRequest<any>('subscribeS3');
const response = await request.fire({ bucket, prefix });
if (response.success) {
this.subscriptions.set(`s3:${key}`, {
type: 's3',
key,
subscriptionId: response.subscriptionId,
});
console.log(`[ChangeStream] Subscribed to S3 ${key}`);
return true;
}
return false;
} catch (error) {
console.error(`[ChangeStream] Failed to subscribe to S3 ${key}:`, error);
return false;
}
}
/**
* Unsubscribe from changes in an S3 bucket/prefix
*/
public async unsubscribeFromBucket(bucket: string, prefix?: string): Promise<boolean> {
if (!this.typedSocket || !this.isConnected) {
return false;
}
const key = prefix ? `${bucket}/${prefix}` : bucket;
const subKey = `s3:${key}`;
if (!this.subscriptions.has(subKey)) {
return true;
}
try {
const request = this.typedSocket.createTypedRequest<any>('unsubscribeS3');
const response = await request.fire({ bucket, prefix });
if (response.success) {
this.subscriptions.delete(subKey);
console.log(`[ChangeStream] Unsubscribed from S3 ${key}`);
}
return response.success;
} catch (error) {
console.error(`[ChangeStream] Failed to unsubscribe from S3 ${key}:`, error);
return false;
}
}
/**
* Check if subscribed to an S3 bucket/prefix
*/
public isSubscribedToBucket(bucket: string, prefix?: string): boolean {
const key = prefix ? `${bucket}/${prefix}` : bucket;
return this.subscriptions.has(`s3:${key}`);
}
// ===========================================
// Activity Stream Subscriptions
// ===========================================
/**
* Subscribe to the activity stream
*/
public async subscribeToActivity(): Promise<boolean> {
if (!this.typedSocket || !this.isConnected) {
console.warn('[ChangeStream] Not connected, cannot subscribe');
return false;
}
// Check if already subscribed
if (this.subscriptions.has('activity:activity')) {
return true;
}
try {
const request = this.typedSocket.createTypedRequest<any>('subscribeActivity');
const response = await request.fire({});
if (response.success) {
this.subscriptions.set('activity:activity', {
type: 'activity',
key: 'activity',
subscriptionId: response.subscriptionId,
});
console.log('[ChangeStream] Subscribed to activity stream');
return true;
}
return false;
} catch (error) {
console.error('[ChangeStream] Failed to subscribe to activity stream:', error);
return false;
}
}
/**
* Unsubscribe from the activity stream
*/
public async unsubscribeFromActivity(): Promise<boolean> {
if (!this.typedSocket || !this.isConnected) {
return false;
}
if (!this.subscriptions.has('activity:activity')) {
return true;
}
try {
const request = this.typedSocket.createTypedRequest<any>('unsubscribeActivity');
const response = await request.fire({});
if (response.success) {
this.subscriptions.delete('activity:activity');
console.log('[ChangeStream] Unsubscribed from activity stream');
}
return response.success;
} catch (error) {
console.error('[ChangeStream] Failed to unsubscribe from activity stream:', error);
return false;
}
}
/**
* Get recent activity events
*/
public async getRecentActivity(limit: number = 100): Promise<IActivityEvent[]> {
if (!this.typedSocket || !this.isConnected) {
return [];
}
try {
const request = this.typedSocket.createTypedRequest<any>('getRecentActivity');
const response = await request.fire({ limit });
return response.events || [];
} catch (error) {
console.error('[ChangeStream] Failed to get recent activity:', error);
return [];
}
}
/**
* Check if subscribed to activity stream
*/
public isSubscribedToActivity(): boolean {
return this.subscriptions.has('activity:activity');
}
// ===========================================
// Observables for UI Components
// ===========================================
/**
* Get MongoDB changes as an Observable
*/
public getMongoChanges(): plugins.smartrx.rxjs.Observable<IMongoChangeEvent> {
return this.mongoChanges$.asObservable();
}
/**
* Get S3 changes as an Observable
*/
public getS3Changes(): plugins.smartrx.rxjs.Observable<IS3ChangeEvent> {
return this.s3Changes$.asObservable();
}
/**
* Get activity events as an Observable
*/
public getActivityStream(): plugins.smartrx.rxjs.Observable<IActivityEvent> {
return this.activityEvents$.asObservable();
}
/**
* Get filtered MongoDB changes for a specific collection
*/
public getCollectionChanges(database: string, collection: string): plugins.smartrx.rxjs.Observable<IMongoChangeEvent> {
return this.mongoChanges$.pipe(
plugins.smartrx.rxjs.ops.filter(
(event) => event.database === database && event.collection === collection
)
);
}
/**
* Get filtered S3 changes for a specific bucket/prefix
*/
public getBucketChanges(bucket: string, prefix?: string): plugins.smartrx.rxjs.Observable<IS3ChangeEvent> {
return this.s3Changes$.pipe(
plugins.smartrx.rxjs.ops.filter((event) => {
if (event.bucket !== bucket) return false;
if (prefix && !event.key.startsWith(prefix)) return false;
return true;
})
);
}
}

View File

@@ -1,4 +1,8 @@
export * from './api.service.js'; export * from './api.service.js';
export * from './changestream.service.js';
import { ApiService } from './api.service.js'; import { ApiService } from './api.service.js';
import { ChangeStreamService } from './changestream.service.js';
export const apiService = new ApiService(); export const apiService = new ApiService();
export const changeStreamService = new ChangeStreamService();