fix(streaming): Convert smartrequest v5 web ReadableStreams to Node.js streams and update deps for streaming compatibility
This commit is contained in:
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@apiclient.xyz/docker',
|
||||
version: '1.3.5',
|
||||
version: '1.3.6',
|
||||
description: 'Provides easy communication with Docker remote API from Node.js, with TypeScript support.'
|
||||
}
|
||||
|
||||
@@ -182,8 +182,12 @@ export class DockerHost {
|
||||
*/
|
||||
public async getEventObservable(): Promise<plugins.rxjs.Observable<any>> {
|
||||
const response = await this.requestStreaming('GET', '/events');
|
||||
|
||||
// requestStreaming now returns Node.js stream, not web stream
|
||||
const nodeStream = response as plugins.smartstream.stream.Readable;
|
||||
|
||||
return plugins.rxjs.Observable.create((observer) => {
|
||||
response.on('data', (data) => {
|
||||
nodeStream.on('data', (data) => {
|
||||
const eventString = data.toString();
|
||||
try {
|
||||
const eventObject = JSON.parse(eventString);
|
||||
@@ -193,7 +197,7 @@ export class DockerHost {
|
||||
}
|
||||
});
|
||||
return () => {
|
||||
response.emit('end');
|
||||
nodeStream.emit('end');
|
||||
};
|
||||
});
|
||||
}
|
||||
@@ -347,7 +351,7 @@ export class DockerHost {
|
||||
}
|
||||
|
||||
// Execute the request based on method
|
||||
let response;
|
||||
let response: plugins.smartrequest.ICoreResponse;
|
||||
switch (methodArg.toUpperCase()) {
|
||||
case 'GET':
|
||||
response = await smartRequest.get();
|
||||
@@ -367,10 +371,10 @@ export class DockerHost {
|
||||
|
||||
console.log(response.status);
|
||||
|
||||
// For streaming responses, get the Node.js stream
|
||||
const nodeStream = response.streamNode();
|
||||
// For streaming responses, get the web stream
|
||||
const webStream = response.stream();
|
||||
|
||||
if (!nodeStream) {
|
||||
if (!webStream) {
|
||||
// If no stream is available, consume the body as text
|
||||
const body = await response.text();
|
||||
console.log(body);
|
||||
@@ -383,7 +387,10 @@ export class DockerHost {
|
||||
};
|
||||
}
|
||||
|
||||
// For streaming responses, return the stream with added properties
|
||||
// Convert web ReadableStream to Node.js stream for backward compatibility
|
||||
const nodeStream = plugins.smartstream.nodewebhelpers.convertWebReadableToNodeReadable(webStream);
|
||||
|
||||
// Add properties for compatibility
|
||||
(nodeStream as any).statusCode = response.status;
|
||||
(nodeStream as any).body = ''; // For compatibility
|
||||
|
||||
|
||||
@@ -105,6 +105,9 @@ export class DockerImage {
|
||||
optionsArg.tarStream,
|
||||
);
|
||||
|
||||
// requestStreaming now returns Node.js stream
|
||||
const nodeStream = response as plugins.smartstream.stream.Readable;
|
||||
|
||||
/**
|
||||
* Docker typically returns lines like:
|
||||
* {"stream":"Loaded image: myrepo/myimage:latest"}
|
||||
@@ -112,16 +115,16 @@ export class DockerImage {
|
||||
* So we will collect those lines and parse out the final image name.
|
||||
*/
|
||||
let rawOutput = '';
|
||||
response.on('data', (chunk) => {
|
||||
nodeStream.on('data', (chunk) => {
|
||||
rawOutput += chunk.toString();
|
||||
});
|
||||
|
||||
// Wrap the end event in a Promise for easier async/await usage
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
response.on('end', () => {
|
||||
nodeStream.on('end', () => {
|
||||
resolve();
|
||||
});
|
||||
response.on('error', (err) => {
|
||||
nodeStream.on('error', (err) => {
|
||||
reject(err);
|
||||
});
|
||||
});
|
||||
@@ -260,10 +263,8 @@ export class DockerImage {
|
||||
`/images/${encodeURIComponent(this.RepoTags[0])}/get`,
|
||||
);
|
||||
|
||||
// Check if response is a Node.js stream
|
||||
if (!response || typeof response.on !== 'function') {
|
||||
throw new Error('Failed to get streaming response for image export');
|
||||
}
|
||||
// requestStreaming now returns Node.js stream
|
||||
const nodeStream = response as plugins.smartstream.stream.Readable;
|
||||
|
||||
let counter = 0;
|
||||
const webduplexStream = new plugins.smartstream.SmartDuplex({
|
||||
@@ -274,20 +275,20 @@ export class DockerImage {
|
||||
},
|
||||
});
|
||||
|
||||
response.on('data', (chunk) => {
|
||||
nodeStream.on('data', (chunk) => {
|
||||
if (!webduplexStream.write(chunk)) {
|
||||
response.pause();
|
||||
nodeStream.pause();
|
||||
webduplexStream.once('drain', () => {
|
||||
response.resume();
|
||||
nodeStream.resume();
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
response.on('end', () => {
|
||||
nodeStream.on('end', () => {
|
||||
webduplexStream.end();
|
||||
});
|
||||
|
||||
response.on('error', (error) => {
|
||||
nodeStream.on('error', (error) => {
|
||||
logger.log('error', `Error during image export: ${error.message}`);
|
||||
webduplexStream.destroy(error);
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user