I can confirm that I’ve done exactly this flow with Storj using JS + Elixir, so it does work.
import * as React from "react";
import * as Sentry from "@sentry/browser";
import axios from "axios";
import { filesize } from "filesize";
import axiosRetry from "axios-retry";
import log from "loglevel";
import BusyIndicator from "./BusyIndicator";
import PQueue from "p-queue";
import {
stubTrue,
constant,
cond,
isEmpty,
sumBy,
reduce,
map,
values,
orderBy,
flow,
keyBy,
each,
reject,
} from "lodash/fp";
import produce from "immer";
const UPLOAD_CONCURRENCY = 3;
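/**
 * Two axios instances with separate retry policies: `uploadAxios` handles the
 * raw part uploads to Storj, while `uploadProgressAxios` handles the small
 * progress/completion calls to the FileYeet API, which are cheap enough to
 * retry more aggressively.
 */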
const uploadAxios = axios.create();
axiosRetry(uploadAxios, { retries: 3, retryDelay: axiosRetry.exponentialDelay });
const uploadProgressAxios = axios.create();
axiosRetry(uploadProgressAxios, { retries: 10, retryDelay: axiosRetry.exponentialDelay });
/**
 * The queue ensures we limit the number of uploads to Storj that are
 * occurring at the same time. Too many and we simply overload the user's
 * connection.
 *
 * We reject any parts that already have a completed_at timestamp; there is
 * no need to upload them again. They will, however, still be marked as
 * completed in the UI.
 */
const queue = new PQueue({ concurrency: UPLOAD_CONCURRENCY, autoStart: true });
const logUploadPart = (uploadPart, level, message) => {
log[level](`Part number [${uploadPart.partNumber}]: ${message}`);
};
export default function UploadPerform({ userDetails, uploadDetails, onError, onComplete }) {
const partProgress = React.useRef(null);
const [lastProgressAt, setLastProgressAt] = React.useState(null);
/**
 * Creates the upload job for a part before it is placed in the queue.
 */
const createUploadJob = React.useCallback((uploadPart, data) => {
return async () => {
/**
* Perform the upload to Storj. We embed this code in an anonymous
* function so it can be executed by the queue system.
*/
// record the start time of the upload
const nextPartProgress = produce(partProgress.current, (draftPartProgress) => {
draftPartProgress[uploadPart.id] = {
...draftPartProgress[uploadPart.id],
startedAt: Date.now(),
};
});
partProgress.current = nextPartProgress;
try {
logUploadPart(uploadPart, "info", "Starting upload");
const uploadResponse = await uploadAxios({
method: "put",
url: uploadPart.uploadUrl,
data,
headers: {
"Content-Type": uploadDetails.mimeType,
},
onUploadProgress: async (progressEvent) => {
/**
* Calculate the progress of the upload to Storj and store it in
* the partProgress ref. We also store the last progress event as
* setting this will trigger a rerender of the React component.
*
* Finally, we report the progress of this piece to the server so
* that the dashboard can display the upload progress. If this
* reporting fails, it is not fatal to the app.
*/
// Guard on not being able to calculate progress yet
if (!progressEvent.loaded || !progressEvent.total) {
return;
}
const progress = progressEvent.loaded / progressEvent.total;
// If we cannot calculate progress as part of this upload, bail out
if (!progress) {
return;
}
const elapsedSeconds = (Date.now() - partProgress.current[uploadPart.id].startedAt) / 1000;
const bytesCompleted = data.size * progress;
const bytesPerSecond = bytesCompleted / elapsedSeconds;
const nextPartProgress = produce(partProgress.current, (draftPartProgress) => {
draftPartProgress[uploadPart.id] = {
...draftPartProgress[uploadPart.id],
progress,
bytesPerSecond,
};
});
partProgress.current = nextPartProgress;
setLastProgressAt(Date.now());
try {
logUploadPart(uploadPart, "info", `Reporting progress to the server: ${progress}`);
await uploadProgressAxios({
data: {
progress,
},
method: "put",
url: `/api/upload_parts/${uploadPart.id}/progress`,
});
} catch (e) {
logUploadPart(uploadPart, "error", `Failed to report progress to server, this is not fatal`);
Sentry.captureException(e);
}
},
});
/**
* Once the upload to Storj has completed, we then inform the FileYeet
* backend that this piece has been completed. We pass the etag that was
* given to us by Storj as this is needed to join the multipart upload
* later.
*/
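// The ETag header value comes back wrapped in double quotes; JSON.parse is a
// cheap way to strip them so we send the bare value to the backend.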
const etag = JSON.parse(uploadResponse.headers.etag);
try {
logUploadPart(uploadPart, "info", "Notifying server that part upload was completed");
const { data: completedPart } = await uploadProgressAxios({
data: {
etag,
},
method: "put",
url: `/api/upload_parts/${uploadPart.id}`,
});
const nextPartProgress = produce(partProgress.current, (draftPartProgress) => {
draftPartProgress[uploadPart.id] = {
...draftPartProgress[uploadPart.id],
completedAt: completedPart.completedAt,
progress: completedPart.progress,
};
});
partProgress.current = nextPartProgress;
setLastProgressAt(Date.now());
} catch (e) {
logUploadPart(uploadPart, "error", "Failed to report to server that upload part completed");
Sentry.captureException(e);
onError(e, "We had a problem performing the upload, please try again");
}
} catch (e) {
logUploadPart(uploadPart, "error", "Failed to upload part to S3");
Sentry.captureException(e);
onError(e, "We had a problem performing the upload, please try again");
}
};
}, [uploadDetails, onError]);
/**
 * Start a job for each part that still needs to be uploaded to Storj.
 */
React.useEffect(() => {
if (userDetails === null || uploadDetails === null) {
return;
}
(async () => {
/**
 * Initialize a map of progress keyed by part id; this is updated as we
 * upload to Storj. We use a ref here because the queued jobs would
 * otherwise close over stale state.
 */
const initProgress = flow(
orderBy(["partNumber"], ["asc"]),
map((uploadPart) => {
return {
...uploadPart,
progress: 0.0,
startedAt: null,
bytesPerSecond: null,
};
}),
keyBy("id")
)(uploadDetails.uploadParts);
partProgress.current = initProgress;
flow(
reject("completedAt"),
each((uploadPart) => {
const file = userDetails.file.slice(uploadPart.partStart, uploadPart.partStart + uploadPart.partSize);
const job = createUploadJob(uploadPart, file);
queue.add(job);
})
)(uploadDetails.uploadParts);
setLastProgressAt(Date.now());
await queue.onIdle();
onComplete(null);
})();
}, [userDetails, uploadDetails]);
if (lastProgressAt === null) {
return <BusyIndicator message="Getting ready to upload..." />;
}
const uploadParts = values(partProgress.current);
const renderedProgress = flow(
orderBy(["partNumber"], ["asc"]),
map((uploadPart) => {
const partProgressPercentage = uploadPart.progress * 100;
return (
<div key={uploadPart.id}>
<div className="w-full flex items-center">
<div className="w-full h-6 bg-slate-900 relative border-slate-700 border">
<div className="h-6 bg-slate-600" style={{ width: `${partProgressPercentage}%` }} />
</div>
</div>
</div>
);
})
)(uploadParts);
const amountUploaded = reduce((uploaded, uploadPart) => {
return uploaded + uploadPart.partSize * uploadPart.progress;
}, 0)(uploadParts);
const totalProgress = amountUploaded / userDetails.file.size;
// Sum the speed of all non-complete parts (i.e. those that are being uploaded currently)
const sumBytesPerSecond = flow(
reject(({ bytesPerSecond, completedAt }) => {
return bytesPerSecond === null || completedAt !== null;
}),
cond([
[isEmpty, constant(null)],
[stubTrue, sumBy("bytesPerSecond")],
])
)(uploadParts);
const gridCols = (() => {
switch (uploadParts.length) {
case 1:
return "grid-cols-1";
case 2:
return "grid-cols-2";
case 3:
return "grid-cols-3";
case 4:
return "grid-cols-4";
default:
return "grid-cols-5";
}
})();
return (
<div className="text-center border border-white/20 p-8">
<div className="text-center mb-4">
<span className="text-white/80 block mb-2 text-3xl">{userDetails.uploaderFilename}</span>
</div>
<span className="text-white/90 text-2xl block mb-4">
<span className="block mb-2 text-2xl">{Math.floor(totalProgress * 100)}% done</span>
<span className="block mb-2 text-base">
{sumBytesPerSecond === null ? "Calculating speed..." : `${filesize(sumBytesPerSecond)}/s`}
</span>
</span>
<div className="mb-4">
<div className={`grid border border-white/20 ${gridCols}`}>{renderedProgress}</div>
</div>
</div>
);
}
It’s a big chunk of code, but you can see I’m being given an ETag back in the response header after each part upload.
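The backend side of this is Elixir in my case, but just to make the ETag handling concrete, here's a rough sketch of the equivalent completion call against Storj's S3-compatible gateway using the AWS SDK for JavaScript. The bucket, key, upload id and endpoint below are placeholders rather than values from the real app.
// Illustrative sketch only: in FileYeet this step is done by the Elixir
// backend. The endpoint, bucket, key and uploadId are placeholders.
import { S3Client, CompleteMultipartUploadCommand } from "@aws-sdk/client-s3";
const s3 = new S3Client({
  region: "us-east-1",
  endpoint: "https://gateway.storjshare.io", // Storj's S3-compatible gateway
  credentials: {
    accessKeyId: process.env.STORJ_ACCESS_KEY_ID,
    secretAccessKey: process.env.STORJ_SECRET_ACCESS_KEY,
  },
});
// `parts` is the list of upload_parts records the component reported as
// completed: each one carries the part number and the ETag Storj returned.
const completeMultipartUpload = (bucket, key, uploadId, parts) =>
  s3.send(
    new CompleteMultipartUploadCommand({
      Bucket: bucket,
      Key: key,
      UploadId: uploadId,
      MultipartUpload: {
        Parts: parts.map(({ partNumber, etag }) => ({
          PartNumber: partNumber,
          ETag: etag,
        })),
      },
    })
  );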