Pipe a stream to s3.upload()
I'm currently making use of a node.js plugin called s3-upload-stream to stream very large files to Amazon S3. It uses the multipart API and for the most part it works very well.
However, this module is showing its age and I've already had to make modifications to it (the author has deprecated it as well). Today I ran into another issue with Amazon, and I would really like to take the author's recommendation and start using the official aws-sdk to accomplish my uploads.
BUT.
The official SDK does not seem to support piping to s3.upload()
. The nature of s3.upload is that you have to pass the readable stream as an argument to the S3 constructor.
I have roughly 120+ user code modules that do various file processing, and they are agnostic to the final destination of their output. The engine hands them a pipeable writeable output stream, and they pipe to it. I cannot hand them an AWS.S3
object and ask them to call upload()
on it without adding code to all the modules. The reason I used s3-upload-stream
was because it supported piping.
Is there a way to make aws-sdk s3.upload()
something I can pipe the stream to?
Wrap the S3 upload()
function with the node.js stream.PassThrough()
stream.
Here's an example:
inputStream
.pipe(uploadFromStream(s3));
function uploadFromStream(s3) {
var pass = new stream.PassThrough();
var params = {Bucket: BUCKET, Key: KEY, Body: pass};
s3.upload(params, function(err, data) {
console.log(err, data);
});
return pass;
}
A bit late answer, it might help someone else hopefully. You can return both writeable stream and the promise, so you can get response data when the upload finishes.
const AWS = require('aws-sdk');
const stream = require('stream');
const uploadStream = ({ Bucket, Key }) => {
const s3 = new AWS.S3();
const pass = new stream.PassThrough();
return {
writeStream: pass,
promise: s3.upload({ Bucket, Key, Body: pass }).promise(),
};
}
And you can use the function as follows:
const { writeStream, promise } = uploadStream({Bucket: 'yourbucket', Key: 'yourfile.mp4'});
const readStream = fs.createReadStream('/path/to/yourfile.mp4');
const pipeline = readStream.pipe(writeStream);
Now you can either check promise:
promise.then(() => {
console.log('upload completed successfully');
}).catch((err) => {
console.log('upload failed.', err.message);
});
Or using async/await:
try {
await promise;
console.log('upload completed successfully');
} catch (error) {
console.log('upload failed.', error.message);
}
Or as stream.pipe()
returns stream.Writable, the destination (writeStream variable above), allowing for a chain of pipes, we can also use its events:
pipeline.on('close', () => {
console.log('upload successful');
});
pipeline.on('error', (err) => {
console.log('upload failed', err.message)
});
In the accepted answer, the function ends before the upload is complete, and thus, it's incorrect. The code below pipes correctly from a readable stream.
Upload reference
async function uploadReadableStream(stream) {
const params = {Bucket: bucket, Key: key, Body: stream};
return s3.upload(params).promise();
}
async function upload() {
const readable = getSomeReadableStream();
const results = await uploadReadableStream(readable);
console.log('upload complete', results);
}
You can also go a step further and output progress info using ManagedUpload
as such:
const manager = s3.upload(params);
manager.on('httpUploadProgress', (progress) => {
console.log('progress', progress) // { loaded: 4915, total: 192915, part: 1, key: 'foo.jpg' }
});
ManagedUpload reference
A list of available events
None of the answers worked for me because I wanted to:
- Pipe into
s3.upload()
- Pipe the result of
s3.upload()
into another stream
The accepted answer doesn't do the latter. The others rely on the promise api, which is cumbersome to work when working with stream pipes.
This is my modification of the accepted answer.
const s3 = new S3();
function writeToS3({Key, Bucket}) {
const Body = new stream.PassThrough();
s3.upload({
Body,
Key,
Bucket: process.env.adpBucket
})
.on('httpUploadProgress', progress => {
console.log('progress', progress);
})
.send((err, data) => {
if (err) {
Body.destroy(err);
} else {
console.log(`File uploaded and available at ${data.Location}`);
Body.destroy();
}
});
return Body;
}
const pipeline = myReadableStream.pipe(writeToS3({Key, Bucket});
pipeline.on('close', () => {
// upload finished, do something else
})
pipeline.on('error', () => {
// upload wasn't successful. Handle it
})