Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
4d275f1
Add stream support in storage clients (closes #1702)
umesh-timalsina May 18, 2020
f44a357
WIP- Update aws-sdk and remove promisify in S3Client
umesh-timalsina May 18, 2020
6a1ddc9
WIP- Skip unit test for gme client
umesh-timalsina May 18, 2020
15f8176
Merge branch master into 1702-streams-in-storage-clients
umesh-timalsina May 20, 2020
c3a7f80
Merge remote-tracking branch 'origin/master' into 1702-streams-in-sto…
umesh-timalsina May 21, 2020
ce88fb2
Refactor stream support files and tests
umesh-timalsina May 22, 2020
73efc2f
WIP- Fix getBlobClientParameters signature
umesh-timalsina May 22, 2020
692ee5a
Merge remote-tracking branch 'origin/master' into 1702-streams-in-sto…
umesh-timalsina Jul 16, 2020
625bbf0
WIP-Remove StreamBlobClient from src/common
umesh-timalsina Jul 16, 2020
7877464
Refactor StorageHelpers.js back to StorageClient.js
umesh-timalsina Jul 16, 2020
97f9d08
pin webgme to v2.42.0
umesh-timalsina Jul 16, 2020
a940226
WIP- Add Readable stream check before putStream
umesh-timalsina Jul 17, 2020
18087c8
Merge remote-tracking branch 'origin/master' into 1702-streams-in-sto…
umesh-timalsina Jul 20, 2020
f732e21
WIP- refactor getFile->getFileStream, putFile->putFileStream storage
umesh-timalsina Jul 20, 2020
1ee8613
Merge remote-tracking branch 'origin/master' into 1702-streams-in-sto…
umesh-timalsina Jul 20, 2020
6808996
WIP- add uniform error message in StorageClient.js
umesh-timalsina Jul 20, 2020
2ed9819
WIP- Change Storage.getStream to Storage.getFileStream
umesh-timalsina Jul 20, 2020
64c8715
Merge remote-tracking branch 'origin/master' into 1702-streams-in-sto…
umesh-timalsina Jul 22, 2020
eae92e2
Merge remote-tracking branch 'origin/master' into 1702-streams-in-sto…
umesh-timalsina Jul 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion src/common/storage/backends/StorageClient.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
/* globals define*/
define([
'client/logger',
'deepforge/gmeConfig'
'deepforge/gmeConfig',
], function(
Logger,
gmeConfig
) {
const fetch = require.isBrowser ? window.fetch :
require.nodeRequire('node-fetch');
const Headers = require.isBrowser ? window.Headers : fetch.Headers;
const stream = require.isBrowser ? null : require.nodeRequire('stream');
const StorageClient = function(id, name, logger) {
this.id = id;
this.name = name;
Expand Down Expand Up @@ -48,10 +49,18 @@ define([
throw new Error(`File download not implemented for ${this.name}`);
};

StorageClient.prototype.getFileStream = async function(/*dataInfo*/) {
    // Default stub: concrete storage backends are expected to override this.
    const message = `Stream download not implemented for ${this.name}`;
    throw new Error(message);
};

StorageClient.prototype.putFile = async function(/*filename, content*/) {
    // Default stub: backends that allow uploads override this method.
    const message = `File upload not supported by ${this.name}`;
    throw new Error(message);
};

StorageClient.prototype.putFileStream = async function(/*filename, stream*/) {
    // Default stub: backends with stream upload support override this method.
    const message = `Stream upload not supported by ${this.name}`;
    throw new Error(message);
};

StorageClient.prototype.deleteFile = async function(/*dataInfo*/) {
    // Default stub: backends that allow deletion override this method.
    const message = `File deletion not supported by ${this.name}`;
    throw new Error(message);
};
Expand Down Expand Up @@ -87,5 +96,17 @@ define([
throw new Error(`stat not implemented for ${this.name}`);
};

StorageClient.prototype.ensureStreamSupport = function() {
    // Node streams are unavailable in the browser build (the `stream`
    // module is only loaded via nodeRequire), so fail loudly up front.
    if (!require.isBrowser) {
        return;
    }
    throw new Error('Streams are not supported in browser');
};

StorageClient.prototype.ensureReadableStream = function (obj) {
    // `stream` is null in the browser, so this check only runs under node;
    // browser callers are rejected earlier by ensureStreamSupport.
    const canCheck = Boolean(stream);
    const isReadable = canCheck && obj instanceof stream.Readable;
    if (canCheck && !isReadable) {
        throw new Error(`${obj} should be an instance of a readable stream`);
    }
};

return StorageClient;
});
13 changes: 13 additions & 0 deletions src/common/storage/backends/gme/Client.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,24 @@ define([
return await this.blobClient.getObject(data);
};

GMEStorage.prototype.getFileStream = async function(dataInfo) {
    // Resolve the blob's download URL, then hand back the response body
    // (a readable stream) rather than buffering the content.
    const downloadUrl = await this.getDownloadURL(dataInfo);
    const opts = {method: 'GET'};
    const response = await this.fetch(downloadUrl, opts);
    return response.body;
};

GMEStorage.prototype.putFile = async function(filename, content) {
    // Upload the content to the blob storage and wrap the resulting
    // hash in this backend's dataInfo record.
    const blobHash = await this.blobClient.putFile(filename, content);
    const dataInfo = this.createDataInfo(blobHash);
    return dataInfo;
};

GMEStorage.prototype.putFileStream = async function(filename, stream) {
    // Streams only work server-side and must be readable; the base-class
    // guards throw descriptive errors otherwise.
    this.ensureStreamSupport();
    this.ensureReadableStream(stream);
    const blobHash = await this.blobClient.putFile(filename, stream);
    const dataInfo = this.createDataInfo(blobHash);
    return dataInfo;
};

// NOTE(review): deletion is a deliberate no-op for this backend —
// presumably WebGME blob storage is content-addressed/immutable, so
// there is nothing to remove; confirm against the blob client docs.
GMEStorage.prototype.deleteDir =
GMEStorage.prototype.deleteFile = async function() {};

Expand Down
36 changes: 35 additions & 1 deletion src/common/storage/backends/s3/Client.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ define([
this.logger.error(`Failed to create bucket ${this.bucketName} in S3 server.`);
throw err;
}

}
};

Expand All @@ -78,6 +77,16 @@ define([
return data.Body;
};

S3Storage.prototype.getFileStream = async function(dataInfo) {
    // Guard against browser use, consistent with putFileStream below:
    // `request.createReadStream()` yields a node Readable and is not
    // available in the browser build.
    this.ensureStreamSupport();
    // Use the endpoint/credentials recorded in the dataInfo so the file
    // can be fetched even when the current config points elsewhere.
    const {endpoint, bucketName, filename} = dataInfo.data;
    const {accessKeyId, secretAccessKey} = this.config;
    const s3Client = await this.getS3Client({endpoint, accessKeyId, secretAccessKey});
    return s3Client.getObject({
        Bucket: bucketName,
        Key: filename
    }).createReadStream();
};

S3Storage.prototype.putFile = async function (filename, content) {
const s3Client = await this.getS3Client();
await this.createBucketIfNeeded(s3Client);
Expand All @@ -98,6 +107,31 @@ define([
return dataInfo;
};

S3Storage.prototype.putFileStream = async function(filename, stream) {
    // Upload `stream` to S3 under `filename` and return the resulting
    // dataInfo (obtained via stat, which knows the stored byte length).
    this.ensureStreamSupport();
    this.ensureReadableStream(stream);
    const s3Client = await this.getS3Client();
    const params = await this.getUploadParams(s3Client, filename, stream);
    try {
        await s3Client.upload(params).promise();
    } catch (err) {
        // Report the filename — interpolating the stream object itself
        // would only print "[object Object]".
        throw new Error(`Unable to upload ${filename}: ${err.message}`);
    }
    const dataInfo = await this.stat(filename);
    this.logger.debug(`Successfully uploaded file ${filename} to the S3 server using stream`);
    return dataInfo;
};

S3Storage.prototype.getUploadParams = async function(s3Client, filename, body) {
    // Make sure the target bucket exists, then build the parameter object
    // for s3Client.upload().
    await this.createBucketIfNeeded(s3Client);
    this.logger.debug(`Created bucket ${this.bucketName}`);
    // In the browser the body must be wrapped in a Blob; node accepts
    // buffers and streams directly.
    const uploadBody = require.isBrowser ? new Blob([body]) : body;
    return {
        Body: uploadBody,
        Bucket: this.bucketName,
        Key: filename,
    };
};

S3Storage.prototype.deleteDir = async function (dirname) {
const s3Client = await this.getS3Client();
const {Contents} = await s3Client.listObjectsV2({
Expand Down
23 changes: 20 additions & 3 deletions src/common/storage/backends/sciserver-files/Client.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@ define([
SciServerFiles.prototype = Object.create(StorageClient.prototype);

SciServerFiles.prototype.getFile = async function (dataInfo) {
    // Download the file contents; the diff-artifact duplicate `response`
    // declarations are collapsed into the single shared-helper call.
    const response = await this.getDownloadResponse(dataInfo);
    if (require.isBrowser) {
        return await response.arrayBuffer();
    } else {
        // Node callers expect a Buffer rather than an ArrayBuffer.
        return Buffer.from(await response.arrayBuffer());
    }
};

SciServerFiles.prototype.getFileStream = async function(dataInfo) {
    // Reuse the shared download helper and expose the response body
    // as a readable stream instead of buffering it.
    const {body} = await this.getDownloadResponse(dataInfo);
    return body;
};

SciServerFiles.prototype.putFile = async function (filename, content) {
if (!this.volume) {
throw new Error('Cannot upload file to SciServer. No volume specified.');
Expand All @@ -49,6 +52,14 @@ define([
return this.createDataInfo(metadata);
};

SciServerFiles.prototype.putFileStream = async function(filename, stream) {
    // Validate stream preconditions, then delegate the actual upload.
    this.ensureStreamSupport();
    this.ensureReadableStream(stream);
    await this.putFile(filename, stream);
    // stat necessary because of byteLength
    const dataInfo = await this.stat(filename);
    return dataInfo;
};

SciServerFiles.prototype.deleteDir = async function (dirname) {
const url = `data/${this.volumePool}/${this.volume}/${dirname}`;
const opts = {method: 'DELETE'};
Expand Down Expand Up @@ -120,5 +131,11 @@ define([
return this.createDataInfo(metadata);
};

SciServerFiles.prototype.getDownloadResponse = async function (dataInfo) {
    // Build the download URL from the dataInfo record and fetch it.
    // The path segment was garbled to `$(unknown)` in the scraped diff;
    // it interpolates the destructured `filename`.
    const {volume, filename, volumePool = 'Storage'} = dataInfo.data;
    const url = `file/${volumePool}/${volume}/${filename}`;
    return await this.fetch('download', url);
};

return SciServerFiles;
});
5 changes: 5 additions & 0 deletions src/common/storage/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ define([
return client.getFile(dataInfo);
};

Storage.getFileStream = async function(dataInfo, logger, configs) {
    // Look up the backend client responsible for this dataInfo and
    // delegate the stream download to it.
    const client = await this.getClientForDataInfo(dataInfo, logger, configs);
    const stream = client.getFileStream(dataInfo);
    return stream;
};

Storage.deleteFile = async function(dataInfo, logger, configs) {
const client = await this.getClientForDataInfo(dataInfo, logger, configs);
return client.deleteFile(dataInfo);
Expand Down
76 changes: 62 additions & 14 deletions test/integration/StorageBackends.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,35 @@
describe('Storage Features Test', function () {
this.timeout(5000);
const assert = require('assert');
const fs = require('fs');
const testFixture = require('../globals');
const {promisify} = require('util');
let {Writable, pipeline} = require('stream');
pipeline = promisify(pipeline);
const {requirejs} = testFixture;
const TEST_STORAGE = 'storageFeaturesSpec';
const TEST_PATH = `${TEST_STORAGE}/dummyFile`;
const TEST_FILE_NAME = 'TestFile';
const CONTENT = 'A Quick Brown Fox Jumped over a lazy Dog';
const logger = testFixture.logger.fork('StorageTests');
const Storage = requirejs('deepforge/storage/index');
const gmeConfig = testFixture.getGmeConfig();
const server = new testFixture.WebGME.standaloneServer(gmeConfig);
server.start = promisify(server.start);
server.stop = promisify(server.stop);
const { StringDecoder } =require('string_decoder');


const storageBackends = Storage.getAvailableBackends();
let StorageConfigs,
client,
clients = {},
dataInfo;
dataInfoBuffer,
dataInfoStream;

before(async function () {
await server.start();
fs.writeFileSync(TEST_FILE_NAME, CONTENT);
StorageConfigs = await testFixture.getStorageConfigs();
for (const backend of storageBackends) {
client = await Storage.getClient(backend, logger, StorageConfigs[backend]);
Expand All @@ -35,37 +43,52 @@ describe('Storage Features Test', function () {
for (const backend of storageBackends) {
it(`should putFile using ${backend}`, async function() {
    this.retries(maxRetries(backend));
    // Single upload of the shared fixture content; the diff-artifact
    // duplicate call using the old `dataInfo` variable is removed.
    dataInfoBuffer = await clients[backend].putFile(TEST_PATH,
        Buffer.from(CONTENT));
});

it(`should getFile using ${backend}`, async function() {
    this.retries(maxRetries(backend));
    // Diff-artifact duplicate call on the old `dataInfo` variable removed.
    await clients[backend].getFile(dataInfoBuffer);
});

it(`should getCachePath using ${backend}`, async () => {
    // Diff-artifact duplicate call on the old `dataInfo` variable removed.
    await clients[backend].getCachePath(dataInfoBuffer);
});

it(`should stat file using ${backend}`, async () => {
    // Diff collapse duplicated this whole if/else (whitespace-only diff);
    // keep a single copy.
    if (backend !== 'gme') {
        await clients[backend].stat(TEST_PATH);
    } else {
        // Await the rejection check — an un-awaited assert.rejects is a
        // floating promise and would never fail the test.
        await assert.rejects(clients[backend].stat(TEST_PATH), {
            name: 'Error',
            message: 'stat not implemented for WebGME Blob Storage'
        });
    }
});

it(`should putFileStream using ${backend}`, async function () {
    // maxRetries expects the backend name string (as the sibling tests
    // pass), not a one-element array.
    this.retries(maxRetries(backend));
    const stream = fs.createReadStream(TEST_FILE_NAME);
    const pathInStorageBackend = `${TEST_STORAGE}/${TEST_FILE_NAME}`;
    dataInfoStream = await clients[backend].putFileStream(pathInStorageBackend, stream);
});

it(`should getFileStream using ${backend}`, async function () {
    // maxRetries expects the backend name string (as the sibling tests
    // pass), not a one-element array.
    this.retries(maxRetries(backend));
    const inputStream = await clients[backend].getFileStream(dataInfoStream);
    await verifyStreamContent(inputStream);
});

it(`should deleteFile using ${backend}`, async function() {
    this.retries(maxRetries(backend));
    // Remove both uploads (buffer-based and stream-based) sequentially.
    for (const info of [dataInfoBuffer, dataInfoStream]) {
        await clients[backend].deleteFile(info);
    }
});
}

after(async function () {
fs.unlinkSync(TEST_FILE_NAME);
await server.stop();
});

Expand All @@ -75,4 +98,29 @@ describe('Storage Features Test', function () {
}
return 1;
}

// Drain `inputStream` into a string sink and check it matches the fixture.
async function verifyStreamContent(inputStream) {
    const sink = new StringWritable();
    await pipeline(inputStream, sink);
    assert(sink.data === CONTENT);
}

// Writable sink that decodes incoming chunks and accumulates them as a
// string in `this.data`, completing the decode on stream end.
class StringWritable extends Writable {
    constructor(options) {
        super(options);
        this.data = '';
        // StringDecoder handles multi-byte characters split across chunks.
        this._decoder = new StringDecoder(options && options.defaultEncoding);
    }

    _write(chunk, encoding, callback) {
        // Buffers go through the decoder; already-decoded strings pass through.
        const text = encoding === 'buffer' ? this._decoder.write(chunk) : chunk;
        this.data += text;
        callback();
    }

    _final(callback) {
        // Flush any trailing partial character held by the decoder.
        this.data += this._decoder.end();
        callback();
    }
}
});