Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/common/plugin/GeneratedFiles.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ define([
const objectHashes = {};
for (let i = userAssets.length; i--;) {
const [filepath, dataInfo] = userAssets[i];
const contents = await Storage.getFile(dataInfo);
const contentsStream = await Storage.getFileStream(dataInfo);
const filename = filepath.split('/').pop();
const hash = await this.blobClient.putFile(filename, contents);
const hash = await this.blobClient.putFile(filename, contentsStream);
objectHashes[filepath] = hash;
}
await artifact.addObjectHashes(objectHashes);
Expand Down
6 changes: 3 additions & 3 deletions src/common/plugin/LocalExecutor.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ define([
const srcStorage = this.isPipelineInput(dataNodes[i]) ?
await this.getStorageClientForInputData(originalData)
: dstStorage;
const content = await srcStorage.getFile(originalData);
const userAsset = await dstStorage.putFile(saveDir + name, content);
const contentStream = await srcStorage.getFileStream(originalData);
const userAsset = await dstStorage.putFileStream(saveDir + name, contentStream);

this.core.setAttribute(artifact, 'data', JSON.stringify(userAsset));
this.core.setAttribute(artifact, 'name', name);
Expand Down Expand Up @@ -125,7 +125,7 @@ define([
LocalExecutor.OPERATIONS = Object.keys(LocalExecutor.prototype)
.filter(name => name.indexOf('_') !== 0)
.filter(name => name !== 'isLocalOperation' && name !== 'getLocalOperationType');

class JobLogger{
constructor(core, node) {
this.core = core;
Expand Down
14 changes: 11 additions & 3 deletions src/plugins/GenerateJob/templates/run-debug.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const fs = require('fs');
const path = require('path');
const {promisify} = require('util');
const mkdir = promisify(fs.mkdir);
const writeFile = promisify(fs.writeFile);
const pipeline = promisify(require('stream').pipeline);
const {spawn} = require('child_process');

const Config = require('./config.json');
Expand Down Expand Up @@ -47,9 +47,9 @@ requirejs([
async function fetchInputData(filename, dataInfo, config) {
const {backend} = dataInfo;
const client = await Storage.getClient(backend, null, config);
const buffer = await client.getFile(dataInfo);
const stream = await client.getFileStream(dataInfo);
filename = fromRelative(filename);
await writeFile(filename, buffer);
await writeFile(filename, stream);
}

function fromRelative(filename) {
Expand All @@ -59,4 +59,12 @@ requirejs([
async function tryMkdir(filename) {
await mkdir(filename).catch(nop);
}

async function writeFile(path, readStream) {
const dstStream = fs.createWriteStream(path, {
encoding: readStream.readableEncoding
});
await pipeline(readStream, dstStream);
}

});
17 changes: 12 additions & 5 deletions src/plugins/GenerateJob/templates/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const {spawn} = childProcess;
const fs = require('fs');
const {promisify} = require('util');
const mkdir = promisify(fs.mkdir);
const writeFile = promisify(fs.writeFile);
const pipeline = promisify(require('stream').pipeline);
const lstat = promisify(fs.lstat);
const exec = promisify(childProcess.exec);
const rm_rf = require('rimraf');
Expand Down Expand Up @@ -172,8 +172,8 @@ requirejs([
for (let i = outputNames.length; i--;) {
const filename = outputNames[i];
const storagePath = `${storageDir}/${filename}`;
const contents = fs.readFileSync(`outputs/${filename}`);
const dataInfo = await client.putFile(storagePath, contents);
const contentsStream = fs.createReadStream(`outputs/${filename}`);
const dataInfo = await client.putFileStream(storagePath, contentsStream);
const type = results[filename];
results[filename] = {type, dataInfo};
}
Expand Down Expand Up @@ -278,8 +278,8 @@ requirejs([
if (!exists) {
await createCacheDir(cachePath);
const client = await Storage.getClient(dataInfo.backend, null, config);
const buffer = await client.getFile(dataInfo);
await writeFile(cachePath, buffer);
const stream = await client.getFileStream(dataInfo);
await writeFile(cachePath, stream);
} else {
logger.info(`${inputName} already cached. Skipping retrieval from blob`);
}
Expand Down Expand Up @@ -418,5 +418,12 @@ requirejs([
return path.join(cacheDir, relPath);
}

async function writeFile(path, readStream) {
const dstStream = fs.createWriteStream(path, {
encoding: readStream.readableEncoding
});
await pipeline(readStream, dstStream);
}

function nop() {}
});
6 changes: 4 additions & 2 deletions src/routers/InteractiveCompute/job-files/start.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
const {spawn} = require('child_process');
const WebSocket = require('ws');
const fs = require('fs').promises;
const { promisify } = require('util');
const pipeline = promisify(require('stream').pipeline);
const path = require('path');
const requirejs = require('requirejs');
let Message;
Expand Down Expand Up @@ -43,8 +45,8 @@ class InteractiveClient {
async function saveArtifact() {
const client = await Storage.getClient(dataInfo.backend, null, config);
const dataPath = path.join(...dirs.concat('data'));
const buffer = await client.getFile(dataInfo);
await fs.writeFile(dataPath, buffer);
const stream = await client.getFileStream(dataInfo);
await pipeline(stream, fs.createWriteStream(dataPath));
const filePath = path.join(...dirs.concat('__init__.py'));
await fs.writeFile(filePath, initFile(name, type));
}
Expand Down
14 changes: 13 additions & 1 deletion utils/build-job-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,25 @@ const babel = require('@babel/core');
const requirejs = require('requirejs');
const path = require('path');
const fs = require('fs');
const webgmeEngineSrc = path.join(path.dirname(require.resolve('webgme-engine')), 'src');
const webgmeEngineSrc = getWebGMEEngineSrcPath();
const JOB_FILES_DIR = `${__dirname}/../src/plugins/GenerateJob/templates/`;
const os = require('os');
const gmeConfig = require('../config');
const includeFile = path.join(__dirname, 'build-includes.js');
const _ = require('underscore');

function getWebGMEEngineSrcPath() {
let webGMEEngine;

try{
webGMEEngine = require.resolve('webgme/node_modules/webgme-engine');
} catch (e) {
webGMEEngine = require.resolve('webgme-engine');
}

return path.join(path.dirname(webGMEEngine), 'src');
}

function getFiles(dirname) {
return fs.readdirSync(dirname)
.map(name => {
Expand Down