- 大容量のInputStreamやファイルをパート分けしてアップロードする。
- バッファの容量(PART_BUFFER)や、パート単位の容量(PART_LIMIT)は同時使用ユーザ数等と考慮して設定しよう。
- 参考サイト(ほぼまんま)
AWS SDK Java で S3 に少しずつ来たデータをバッファリングしつつ一つのファイルとしてアップロードするサンプルコード
- 例外は各システムで適宜ハンドリングしましょう。
©okkez氏
/** * Multipartアップロード * * @param logger * @param executorService * @param bucketName * @param dest * @param is * @throws AppException */ public static void multipartUpload(LambdaLogger logger, ExecutorService executorService, String bucketName, String dest, InputStream is) throws IOException, ExecutionException, InterruptedException { final S3Client s3 = getS3Client(); final CreateMultipartUploadResponse response = s3 .createMultipartUpload(builder -> builder.bucket(bucketName).key(dest)); final String uploadId = response.uploadId(); final ByteBuffer buffer = ByteBuffer.allocate(PART_BUFFER); try { final ReadableByteChannel channel = Channels.newChannel(is); List<Future<CompletedPart>> futures = new ArrayList<>(); int partNumber = 0; while (channel.read(buffer) != -1) { if (buffer.position() > PART_LIMIT) { logger.log("パートアップロード...パート番号:" + partNumber); long contentLength = buffer.position(); buffer.flip(); partNumber++; Future<CompletedPart> future = uploadMultipartUploadPart(executorService, s3, bucketName, dest, uploadId, partNumber, contentLength, buffer.array()); futures.add(future); buffer.clear(); } } if (buffer.position() > 0) { logger.log("最終パートアップロード...パート番号:" + partNumber); long contentLength = buffer.position(); buffer.flip(); partNumber++; Future<CompletedPart> future = uploadMultipartUploadPart(executorService, s3, bucketName, dest, uploadId, partNumber, contentLength, buffer.array()); futures.add(future); buffer.clear(); } // 全部のパートをアップロードするまで待つ while (!futures.stream().allMatch(Future::isDone)) { try { TimeUnit.MILLISECONDS.sleep((100)); } catch (InterruptedException ignore) { // NOP logger.log("スレッドが途中で終了:MultipartUploadの待機時"); } } // CompletedPart を集める List<CompletedPart> parts = new ArrayList<>(); for (Future<CompletedPart> future : futures) { try { parts.add(future.get()); } catch (ExecutionException | InterruptedException e) { // NOP logger.log("スレッドが途中で終了:CompletedPart集約時"); } } // マルチパートアップロードの完了処理を行う final var completedMultipartUpload = CompletedMultipartUpload.builder().parts(parts).build(); s3.completeMultipartUpload( builder -> builder .bucket(bucketName) .key(dest) .uploadId(uploadId) .multipartUpload(completedMultipartUpload)); } catch (IOException ioe) { ioe.printStackTrace(); logger.log("大容量ファイルのアップロードに失敗"); throw ioe; } finally { executorService.shutdown(); } } /** * MultipartUploadを実行 * * @param executorService * @param s3 * @param bucket * @param key * @param uploadId * @param partNumber * @param contentLength * @param bytes * @return */ private static Future<CompletedPart> uploadMultipartUploadPart( ExecutorService executorService, S3Client s3, final String bucket, final String key, final String uploadId, final int partNumber, final long contentLength, final byte[] bytes) { return executorService.submit( () -> { UploadPartRequest request = UploadPartRequest.builder() .bucket(bucket) .key(key) .uploadId(uploadId) .contentLength(contentLength) .partNumber(partNumber) .build(); UploadPartResponse response = s3.uploadPart(request, RequestBody.fromBytes(bytes)); CompletedPart part = CompletedPart.builder().partNumber(partNumber).eTag(response.eTag()).build(); return part; }); }