以前紹介した以下のコード
AWS SDK Java で S3 に少しずつ来たデータをバッファリングしつつ一つのファイルとしてアップロードするサンプルコード
©okkez氏
/**
* Multipartアップロード
*
* @param logger
* @param executorService
* @param bucketName
* @param dest
* @param is
* @throws AppException
*/
public static void multipartUpload(LambdaLogger logger, ExecutorService executorService, String bucketName, String dest, InputStream is)
throws IOException, ExecutionException, InterruptedException {
final S3Client s3 = getS3Client();
final CreateMultipartUploadResponse response = s3
.createMultipartUpload(builder -> builder.bucket(bucketName).key(dest));
final String uploadId = response.uploadId();
final ByteBuffer buffer = ByteBuffer.allocate(PART_BUFFER);
try {
final ReadableByteChannel channel = Channels.newChannel(is);
List<Future<CompletedPart>> futures = new ArrayList<>();
int partNumber = 0;
while (channel.read(buffer) != -1) {
if (buffer.position() > PART_LIMIT) {
long contentLength = buffer.position();
buffer.flip();
partNumber++;
Future<CompletedPart> future = uploadMultipartUploadPart(executorService, s3, bucketName, dest,
uploadId, partNumber, contentLength, buffer.array());
futures.add(future);
buffer.clear();
}
}
if (buffer.position() > 0) {
long contentLength = buffer.position();
buffer.flip();
partNumber++;
Future<CompletedPart> future = uploadMultipartUploadPart(executorService, s3, bucketName, dest,
uploadId, partNumber, contentLength, buffer.array());
futures.add(future);
buffer.clear();
}
// 全部のパートをアップロードするまで待つ
while (!futures.stream().allMatch(Future::isDone)) {
try {
TimeUnit.MILLISECONDS.sleep((100));
} catch (InterruptedException ignore) {
// NOP
logger.log("スレッドが途中で終了:MultipartUploadの待機時");
}
}
// CompletedPart を集める
List<CompletedPart> parts = new ArrayList<>();
for (Future<CompletedPart> future : futures) {
try {
parts.add(future.get());
} catch (ExecutionException | InterruptedException e) {
// NOP
logger.log("スレッドが途中で終了:CompletedPart集約時");
}
}
// マルチパートアップロードの完了処理を行う
final var completedMultipartUpload = CompletedMultipartUpload.builder().parts(parts).build();
s3.completeMultipartUpload(
builder -> builder
.bucket(bucketName)
.key(dest)
.uploadId(uploadId)
.multipartUpload(completedMultipartUpload));
} catch (IOException ioe) {
ioe.printStackTrace();
logger.log("大容量ファイルのアップロードに失敗");
throw ioe;
} finally {
executorService.shutdown();
}
}
/**
* MultipartUploadを実行
*
* @param executorService
* @param s3
* @param bucket
* @param key
* @param uploadId
* @param partNumber
* @param contentLength
* @param bytes
* @return
*/
private static Future<CompletedPart> uploadMultipartUploadPart(
ExecutorService executorService,
S3Client s3,
final String bucket,
final String key,
final String uploadId,
final int partNumber,
final long contentLength,
final byte[] bytes) {
return executorService.submit(
() -> {
UploadPartRequest request = UploadPartRequest.builder()
.bucket(bucket)
.key(key)
.uploadId(uploadId)
.contentLength(contentLength)
.partNumber(partNumber)
.build();
UploadPartResponse response = s3.uploadPart(request, RequestBody.fromBytes(bytes));
CompletedPart part = CompletedPart.builder().partNumber(partNumber).eTag(response.eTag()).build();
return part;
});
}
で、ZIPファイルをアップロードすると、一部解凍できない場合が発生した。
スレッドの消失まで待つように変更したりしてみたが、改善はしたものの完全ではなかった。
リクエストを追っかけて、ログを出せば原因解明ができるとは思うが一旦は実現性重視として、同期処理に変更した。
©okkez氏
/**
* 大容量ファイルのアップロード
*
* @param context
* @param bucketName
* @param dest
* @param is
* @throws AppException
*/
public static void multipartUpload(Context context, String bucketName, String dest, InputStream is)
throws IOException {
/** S3のクライアント */
final S3Client s3 = getS3Client();
final CreateMultipartUploadResponse response = s3
.createMultipartUpload(
builder -> builder.bucket(bucketName).key(dest));
final String uploadId = response.uploadId();
final ByteBuffer buffer = ByteBuffer.allocate(PART_BUFFER);
try {
final ReadableByteChannel channel = Channels.newChannel(is);
List<CompletedPart> parts = new ArrayList<>();
int partNumber = 0;
while (channel.read(buffer) != -1) {
if (buffer.position() > PART_LIMIT) {
long contentLength = buffer.position();
buffer.flip();
partNumber++;
CompletedPart part = uploadMultipartUploadPart(s3, bucketName, dest, uploadId, partNumber,
contentLength, buffer.array());
parts.add(part);
buffer.clear();
}
}
if (buffer.position() > 0) {
long contentLength = buffer.position();
buffer.flip();
partNumber++;
CompletedPart part = uploadMultipartUploadPart(s3, bucketName, dest, uploadId, partNumber,
contentLength, buffer.array());
parts.add(part);
buffer.clear();
}
// マルチパートアップロードの完了処理を行う
final var completedMultipartUpload = CompletedMultipartUpload.builder().parts(parts).build();
s3.completeMultipartUpload(
builder -> builder
.bucket(bucketName)
.key(dest)
.uploadId(uploadId)
.multipartUpload(completedMultipartUpload));
} catch (IOException ioe) {
ioe.printStackTrace();
throw ioe;
}
}
/**
* MultipartUploadを実行
*
* @param s3
* @param bucket
* @param key
* @param uploadId
* @param partNumber
* @param contentLength
* @param bytes
* @return
*/
private static CompletedPart uploadMultipartUploadPart(
S3Client s3,
final String bucket,
final String key,
final String uploadId,
final int partNumber,
final long contentLength,
final byte[] bytes) {
UploadPartRequest request = UploadPartRequest.builder()
.bucket(bucket)
.key(key)
.uploadId(uploadId)
.contentLength(contentLength)
.partNumber(partNumber)
.build();
UploadPartResponse response = s3.uploadPart(request, RequestBody.fromBytes(bytes));
CompletedPart part = CompletedPart.builder().partNumber(partNumber).eTag(response.eTag()).build();
return part;
}
ServletのPartから取れるInputStreamからアップロードする、という目的はいったん達成したため、許容しました。
くやしい。