Files
smartipc/test/test.streaming.ts

219 lines
7.0 KiB
TypeScript

import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as smartipc from '../ts/index.js';
import * as smartdelay from '@push.rocks/smartdelay';
import * as plugins from '../ts/smartipc.plugins.js';
import * as fs from 'fs';
import * as path from 'path';
// Shared IPC endpoints for this file: assigned by the setup test, reused by
// the streaming tests, and shut down again by the final cleanup test.
let server: smartipc.IpcServer;
let client: smartipc.IpcClient;
tap.test('setup TCP server and client (streaming)', async () => {
  // Server and client share the same channel id and loopback TCP endpoint;
  // heartbeats are switched off on both sides.
  const endpoint = {
    id: 'stream-test-server',
    host: '127.0.0.1',
    port: 19876,
    heartbeat: false,
  };

  server = smartipc.SmartIpc.createServer({ ...endpoint });
  await server.start();

  client = smartipc.SmartIpc.createClient({ ...endpoint, clientId: 'stream-client-1' });
  await client.connect();

  // The remaining tests rely on this connection being up.
  const isConnected = client.getIsConnected();
  expect(isConnected).toBeTrue();
});
tap.test('client -> server streaming large payload', async () => {
  // Roughly 5 MiB plus a few extra bytes, so the payload length is not a
  // multiple of the chunk size used below.
  const payloadLength = 5 * 1024 * 1024 + 123;
  const payload = Buffer.alloc(payloadLength);

  // Fill with a deterministic byte pattern that the receiver side can verify.
  let offset = 0;
  while (offset < payloadLength) {
    payload[offset] = offset % 251;
    offset += 1;
  }

  const chunks: Buffer[] = [];
  const streamFinished = new Promise<void>((resolve, reject) => {
    const onStream = (info: any, incoming: plugins.stream.Readable) => {
      // Skip any stream that was not started by this test.
      if (info?.meta?.direction !== 'client-to-server') {
        return;
      }
      incoming
        .on('data', (chunk) => chunks.push(Buffer.from(chunk)))
        .on('end', resolve)
        .on('error', reject);
    };
    server.on('stream', onStream);
  });

  // Stream the payload from the client, requesting 64 KiB chunks.
  const source = plugins.stream.Readable.from(payload);
  const sendOptions = { meta: { direction: 'client-to-server' }, chunkSize: 64 * 1024 };
  await client.sendStream(source, sendOptions);
  await streamFinished;

  // The reassembled bytes must match the original payload exactly.
  const reassembled = Buffer.concat(chunks);
  expect(reassembled.length).toEqual(payload.length);
  expect(reassembled.equals(payload)).toBeTrue();
});
tap.test('server -> client streaming large payload', async () => {
  // Roughly 6 MiB plus a few extra bytes, filled with a deterministic pattern.
  const payloadLength = 6 * 1024 * 1024 + 7;
  const payload = Buffer.alloc(payloadLength);
  let offset = 0;
  while (offset < payloadLength) {
    payload[offset] = (offset * 7) % 255;
    offset += 1;
  }

  const chunks: Buffer[] = [];
  const streamFinished = new Promise<void>((resolve, reject) => {
    const onStream = (info: any, incoming: plugins.stream.Readable) => {
      // Skip any stream that was not started by this test.
      if (info?.meta?.direction !== 'server-to-client') {
        return;
      }
      incoming
        .on('data', (chunk) => chunks.push(Buffer.from(chunk)))
        .on('end', resolve)
        .on('error', reject);
    };
    client.on('stream', onStream);
  });

  // Stream the payload from the server to the connected test client,
  // requesting 64 KiB chunks.
  const source = plugins.stream.Readable.from(payload);
  const sendOptions = { meta: { direction: 'server-to-client' }, chunkSize: 64 * 1024 };
  await server.sendStreamToClient('stream-client-1', source, sendOptions);
  await streamFinished;

  // The reassembled bytes must match the original payload exactly.
  const reassembled = Buffer.concat(chunks);
  expect(reassembled.length).toEqual(payload.length);
  expect(reassembled.equals(payload)).toBeTrue();
});
tap.test('client -> server file transfer to disk', async () => {
  // Work inside a fresh scratch directory under <cwd>/.nogit/tmp.
  const baseTmp1 = path.join(process.cwd(), '.nogit', 'tmp');
  fs.mkdirSync(baseTmp1, { recursive: true });
  const tmpDir = fs.mkdtempSync(path.join(baseTmp1, 'tmp-'));
  try {
    const srcPath = path.join(tmpDir, 'src.bin');
    const dstPath = path.join(tmpDir, 'dst.bin');

    // Prepare a source file of roughly 1 MiB with a deterministic byte pattern.
    const size = 1024 * 1024 + 333;
    const buf = Buffer.alloc(size);
    for (let i = 0; i < size; i++) buf[i] = (i * 11) % 255;
    fs.writeFileSync(srcPath, buf);

    // Settles once the server has written the incoming stream to dstPath,
    // or rejects if writing fails.
    const done = new Promise<void>((resolve, reject) => {
      server.on('stream', async (info: any, readable: plugins.stream.Readable) => {
        // Only react to the file stream of this test. NOTE(review): this relies
        // on sendFile tagging the stream with meta.type 'file' and the source
        // basename — confirm against the sendFile implementation.
        if (info?.meta?.type === 'file' && info?.meta?.basename === 'src.bin') {
          try {
            await smartipc.pipeStreamToFile(readable, dstPath);
            resolve();
          } catch (e) {
            reject(e);
          }
        }
      });
    });

    await client.sendFile(srcPath);
    await done;

    // The file on disk must be byte-identical to what was sent.
    const out = fs.readFileSync(dstPath);
    expect(out.equals(buf)).toBeTrue();
  } finally {
    // Always remove the scratch directory so repeated runs do not pile up
    // temporary files, even when an assertion above fails.
    fs.rmSync(tmpDir, { recursive: true, force: true });
  }
});
tap.test('server -> client file transfer to disk', async () => {
  // Work inside a fresh scratch directory under <cwd>/.nogit/tmp.
  const baseTmp2 = path.join(process.cwd(), '.nogit', 'tmp');
  fs.mkdirSync(baseTmp2, { recursive: true });
  const tmpDir = fs.mkdtempSync(path.join(baseTmp2, 'tmp-'));
  try {
    const srcPath = path.join(tmpDir, 'serverfile.bin');
    const dstPath = path.join(tmpDir, 'clientfile.bin');

    // Prepare a source file of roughly 512 KiB with a deterministic byte pattern.
    const size = 512 * 1024 + 77;
    const buf = Buffer.alloc(size);
    for (let i = 0; i < size; i++) buf[i] = (i * 17) % 251;
    fs.writeFileSync(srcPath, buf);

    // Settles once the client has written the incoming stream to dstPath,
    // or rejects if writing fails.
    const done = new Promise<void>((resolve, reject) => {
      client.on('stream', async (info: any, readable: plugins.stream.Readable) => {
        // Only react to the file stream of this test. NOTE(review): this relies
        // on sendFileToClient tagging the stream with meta.type 'file' and the
        // source basename — confirm against its implementation.
        if (info?.meta?.type === 'file' && info?.meta?.basename === 'serverfile.bin') {
          try {
            await smartipc.pipeStreamToFile(readable, dstPath);
            resolve();
          } catch (e) {
            reject(e);
          }
        }
      });
    });

    await server.sendFileToClient('stream-client-1', srcPath);
    await done;

    // The file on disk must be byte-identical to what was sent.
    const out = fs.readFileSync(dstPath);
    expect(out.equals(buf)).toBeTrue();
  } finally {
    // Always remove the scratch directory so repeated runs do not pile up
    // temporary files, even when an assertion above fails.
    fs.rmSync(tmpDir, { recursive: true, force: true });
  }
});
tap.test('receiver cancels an incoming stream', async () => {
  // Slow source: each read() schedules one 128 KiB chunk after 5 ms, for up to
  // 201 chunks, so the transfer should still be running when the receiver cancels.
  const bigChunk = Buffer.alloc(128 * 1024, 1);
  let pushed = 0;
  const readable = new plugins.stream.Readable({
    read() {
      setTimeout(() => {
        if (pushed > 200) {
          this.push(null);
        } else {
          this.push(bigChunk);
          pushed++;
        }
      }, 5);
    }
  });

  let cancelled = false;
  const cancelPromise = new Promise<void>((resolve, reject) => {
    server.on('stream', (info: any, r: plugins.stream.Readable) => {
      if (info?.meta?.direction === 'client-to-server-cancel') {
        // Cancel as soon as the first chunk arrives.
        r.once('data', async () => {
          try {
            // Ask the sender to stop. This reaches into the server's internal
            // primaryChannel, hence the `any` cast.
            await (server as any).primaryChannel.cancelIncomingStream(info.streamId, { clientId: info.clientId });
            // Only flag success once the cancel request has actually gone through.
            cancelled = true;
            resolve();
          } catch (err) {
            // Surface a failed cancel as a test failure instead of leaving
            // cancelPromise pending forever with an unhandled rejection.
            reject(err);
          }
        });
        r.on('error', () => { /* ignore cancellation error */ });
        // Switch the stream into flowing mode so 'data' fires.
        r.resume();
      }
    });
  });

  // The send may reject once the receiver cancels; that outcome is expected.
  const sendPromise = client
    .sendStream(readable, { meta: { direction: 'client-to-server-cancel' } })
    .catch(() => { /* expected due to cancel */ });

  await cancelPromise;
  expect(cancelled).toBeTrue();
  await sendPromise;
});
tap.test('enforce maxConcurrentStreams option', async () => {
  // Dedicated server/client pair on its own port, both limited to a single
  // concurrent stream.
  const srv = smartipc.SmartIpc.createServer({ id: 'limit-srv', host: '127.0.0.1', port: 19999, heartbeat: false, maxConcurrentStreams: 1 });
  await srv.start();
  try {
    const cli = smartipc.SmartIpc.createClient({ id: 'limit-srv', host: '127.0.0.1', port: 19999, clientId: 'limit-client', heartbeat: false, maxConcurrentStreams: 1 });
    await cli.connect();
    try {
      const r1 = plugins.stream.Readable.from(Buffer.alloc(256 * 1024));
      const r2 = plugins.stream.Readable.from(Buffer.alloc(256 * 1024));

      // Start the first stream without awaiting it, then try to open a second
      // one while the first is still in flight.
      const p1 = cli.sendStream(r1, { meta: { n: 1 } });
      let threw = false;
      try {
        await cli.sendStream(r2, { meta: { n: 2 } });
      } catch {
        threw = true;
      }

      // Let the first stream settle before asserting, so it is never left
      // pending (and possibly rejecting unobserved) if the assertion fails.
      await p1;
      expect(threw).toBeTrue();
    } finally {
      // Disconnect even when the assertions above fail.
      await cli.disconnect();
    }
  } finally {
    // Stop the server in every case so it is not left running after a failure.
    await srv.stop();
  }
});
tap.test('cleanup streaming test', async () => {
  try {
    await client.disconnect();
  } finally {
    // Stop the shared server even if the client failed to disconnect, so it
    // is not left running after the test file finishes.
    await server.stop();
  }
  // Short pause after shutdown; presumably gives sockets time to close
  // before the runner exits — TODO confirm.
  await smartdelay.delayFor(50);
});
// Start the tap runner for the tests registered above; the value returned by
// tap.start() is exported as this module's default export.
export default tap.start();