R2 - not compatible with AWS API

Hi,

The Airbyte S3 destination uses this library: https://github.com/alexmojaki/s3-stream-upload/blob/master/src/main/java/alex/mojaki/s3upload/StreamTransferManager.java#L544

The Airbyte S3 destination runs a testMultipartUpload check and catches the exception below:

    Could not connect to the S3 bucket with the provided configuration. Invalid base 16 character: 'T'
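
For context, the check does roughly the following. This is a minimal sketch against the s3-stream-upload 2.x API, not a copy of Airbyte's actual S3Destination.testMultipartUpload; bucket and key are placeholders:

    import alex.mojaki.s3upload.MultiPartOutputStream;
    import alex.mojaki.s3upload.StreamTransferManager;
    import com.amazonaws.services.s3.AmazonS3;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class MultipartUploadCheck {

        // Sketch of a multipart-upload connectivity check (approximation of
        // Airbyte's testMultipartUpload, names are placeholders).
        public static void check(AmazonS3 s3Client, String bucket, String key) throws IOException {
            StreamTransferManager manager =
                    new StreamTransferManager(bucket, key, s3Client).numStreams(1);

            MultiPartOutputStream out = manager.getMultiPartOutputStreams().get(0);
            out.write("test data".getBytes(StandardCharsets.UTF_8));
            out.close();

            // complete() uploads the leftover part via uploadStreamPart (shown
            // further down); against R2 this is where the AWS SDK's hex decode
            // of the returned ETag fails.
            manager.complete();
        }
    }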

Detailed logs are provided below:

29: "2022-08-08 10:31:50 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-08 10:31:50 \u001b[32mINFO\u001b[m i.a.i.d.s.S3Destination(testMultipartUpload):95 - Started testing if all required credentials assigned to user for multipart upload"
30: "2022-08-08 10:31:50 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-08 10:31:50 \u001b[32mINFO\u001b[m a.m.s.StreamTransferManager(getMultiPartOutputStreams):329 - Initiated multipart upload to xxx/test/example10/test_1659954710376 with full ID AIT9jzFZ5QLRhguHVZmsBaQFN77bJBM/uf6AYYBVlgYA7wok3hDbutZp/OxsG/IB+/HB5vdVWEfY+w9LXHky8COjhg/SiwPLuRAdtKYUv0zirR6OHwQI912nFUV3y+SOL+fQwzEv0B7Hp6Zjowgm4Kg5IDuYw8EQQq+4NtJ1mQDJpUb81UavOOLTlAn+JH2X6b+vRPNFwk5QzJNktY8yLsXblPckGHdu3RskcY6XGmrDR6lZxgYhfyrC7+w8NZXvb903ZdtvkZiOU7prxd7NifLf2tq2UHoOawL4n+vit0grRRHIGxA1lNv7ZZ0aekHyAA=="
31: "2022-08-08 10:31:50 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-08 10:31:50 \u001b[32mINFO\u001b[m a.m.s.MultiPartOutputStream(close):158 - Called close() on [MultipartOutputStream for parts 1 - 10000]"
32: "2022-08-08 10:31:50 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-08 10:31:50 \u001b[32mINFO\u001b[m a.m.s.MultiPartOutputStream(close):158 - Called close() on [MultipartOutputStream for parts 1 - 10000]"
33: "2022-08-08 10:31:50 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-08 10:31:50 \u001b[33mWARN\u001b[m a.m.s.MultiPartOutputStream(close):160 - [MultipartOutputStream for parts 1 - 10000] is already closed"
34: "2022-08-08 10:31:50 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-08 10:31:50 \u001b[32mINFO\u001b[m a.m.s.StreamTransferManager(complete):367 - [Manager uploading to xxx/test/example10/test_1659954710376 with id AIT9jzFZ5...aekHyAA==]: Uploading leftover stream [Part number 1 containing 3.34 MB]"
35: "2022-08-08 10:31:53 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-08 10:31:53 \u001b[1;31mERROR\u001b[m a.m.s.StreamTransferManager(abort):432 - Aborting [Manager uploading to xxx/test/example10/test_1659954710376 with id AIT9jzFZ5...aekHyAA==] due to error: java.lang.IllegalArgumentException: Invalid base 16 character: 'M'"
36: "2022-08-08 10:31:53 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-08 10:31:53 \u001b[32mINFO\u001b[m a.m.s.StreamTransferManager(abort):470 - [Manager uploading to xxx/test/example10/test_1659954710376 with id AIT9jzFZ5...aekHyAA==]: Aborted"
37: "2022-08-08 10:31:53 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-08 10:31:53 \u001b[1;31mERROR\u001b[m i.a.i.d.s.S3Destination(check):72 - Exception attempting to access the S3 bucket: "
38: "2022-08-08 10:31:53 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - java.lang.IllegalArgumentException: Invalid base 16 character: 'M'"
39: "2022-08-08 10:31:53 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - \tat com.amazonaws.util.Base16Codec.pos(Base16Codec.java:100) ~[aws-java-sdk-core-1.12.6.jar:?]"
40: "2022-08-08 10:31:53 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - \tat com.amazonaws.util.Base16Codec.decode(Base16Codec.java:87) ~[aws-java-sdk-core-1.12.6.jar:?]"
41: "2022-08-08 10:31:53 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - \tat com.amazonaws.util.Base16Lower.decode(Base16Lower.java:53) ~[aws-java-sdk-core-1.12.6.jar:?]"
42: "2022-08-08 10:31:53 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - \tat com.amazonaws.util.BinaryUtils.fromHex(BinaryUtils.java:48) ~[aws-java-sdk-core-1.12.6.jar:?]"
43: "2022-08-08 10:31:53 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - \tat com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3881) ~[aws-java-sdk-s3-1.12.6.jar:?]"
44: "2022-08-08 10:31:53 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - \tat com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3860) ~[aws-java-sdk-s3-1.12.6.jar:?]"
45: "2022-08-08 10:31:53 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - \tat alex.mojaki.s3upload.StreamTransferManager.uploadStreamPart(StreamTransferManager.java:555) ~[s3-stream-upload-2.2.2.jar:?]"
46: "2022-08-08 10:31:53 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - \tat alex.mojaki.s3upload.StreamTransferManager.complete(StreamTransferManager.java:368) ~[s3-stream-upload-2.2.2.jar:?]"
47: "2022-08-08 10:31:53 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - \tat io.airbyte.integrations.destination.s3.S3Destination.testMultipartUpload(S3Destination.java:113) ~[io.airbyte.airbyte-integrations.connectors-destination-s3-0.39.37-alpha.jar:?]"
48: "2022-08-08 10:31:53 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - \tat io.airbyte.integrations.destination.s3.S3Destination.check(S3Destination.java:68) [io.airbyte.airbyte-integrations.connectors-destination-s3-0.39.37-alpha.jar:?]"
49: "2022-08-08 10:31:53 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - \tat io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:140) [io.airbyte.airbyte-integrations.bases-base-java-0.39.37-alpha.jar:?]"
50: "2022-08-08 10:31:53 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - \tat io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:107) [io.airbyte.airbyte-integrations.bases-base-java-0.39.37-alpha.jar:?]"
51: "2022-08-08 10:31:53 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - \tat io.airbyte.integrations.destination.s3.S3Destination.main(S3Destination.java:51) [io.airbyte.airbyte-integrations.connectors-destination-s3-0.39.37-alpha.jar:?]"
52: "2022-08-08 10:31:53 \u001b[32mINFO\u001b[m i.a.w.i.DefaultAirbyteStreamFactory(lambda$create$0):61 - 2022-08-08 10:31:53 \u001b[32mINFO\u001b[m i.a.i.b.IntegrationRunner(runInternal):171 - Completed integration: io.airbyte.integrations.destination.s3.S3Destination"
53: "2022-08-08 10:31:54 \u001b[32mINFO\u001b[m i.a.w.t.TemporalAttemptExecution(get):131 - Stopping cancellation check scheduling..."
succeeded: true

Below is a copy of the code that executes the part upload.

Could someone check this on the R2 server side?

    private void uploadStreamPart(StreamPart part) {
        log.debug("{}: Uploading {}", this, part);

        UploadPartRequest uploadRequest = new UploadPartRequest()
                .withBucketName(bucketName).withKey(putKey)
                .withUploadId(uploadId).withPartNumber(part.getPartNumber())
                .withInputStream(part.getInputStream())
                .withPartSize(part.size());
        if (checkIntegrity) {
            // Sets the Content-MD5 header so S3 can verify the part server-side.
            uploadRequest.setMd5Digest(part.getMD5Digest());
        }
        customiseUploadPartRequest(uploadRequest);

        // The AWS SDK performs its own client-side MD5/ETag comparison inside
        // uploadPart when no Content-MD5 was supplied (see doUploadPart below).
        UploadPartResult uploadPartResult = s3Client.uploadPart(uploadRequest);
        PartETag partETag = uploadPartResult.getPartETag();
        partETags.add(partETag);
        log.info("{}: Finished uploading {}", this, part);
    }
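
Note that the SDK's client-side comparison appears to run only when no Content-MD5 was set on the request, i.e. when checkIntegrity above is false. The AWS SDK v1 also documents a system property for skipping that client-side validation; a hedged workaround sketch, assuming aws-java-sdk 1.12.x honors it for UploadPart via its SkipMd5CheckStrategy:

    // Possible client-side workaround (a sketch, assuming aws-java-sdk 1.12.x):
    // this property makes the SDK skip client-side MD5 validation for uploads,
    // so BinaryUtils.fromHex(etag) is never reached. It must be set before the
    // S3 client performs the upload; whether Airbyte can be configured to set
    // it is an assumption.
    System.setProperty("com.amazonaws.services.s3.disablePutObjectMD5Validation", "true");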

I noticed that Cloudflare returns a very long value in the ETag header compared with S3:

    AMClbuttcYqB7BF87QMg3AoksQpTl1irPgMPSw1tRzTVHdOG0Lg5IsXKRfsyI5XIf7BpGvF7qu5JcpeytpWjNsOLT8l8LNhf2r86HO06zTouao5TXV0NqZ2EBsCP13WiC+g1B2xIJy4I3NhcPEw5COEreGlzt9pHTg7NK1+H4rpzcS54uyZ3CmRZI5V3ksG6t+6maCEbVAROkPIn6/61xOqQvYPWy9pF9jOwREBsvsEtwzqKKEaGIz9n8V2ckJTX2w==

AWS returns a value like the one below:

    7e1699ce7c4480424036fefdfa9dd378

Could someone from Cloudflare check this?

AmazonS3Client does an internal check in AmazonS3Client#doUploadPart:

    private UploadPartResult doUploadPart(final String bucketName,
            final String key, final String uploadId, final int partNumber,
            final long partSize, Request<UploadPartRequest> request,
            InputStream inputStream,
            MD5DigestCalculatingInputStream md5DigestStream,
            final ProgressListener listener) {
        try {
            request.setContent(inputStream);
            ObjectMetadata metadata = invoke(request, new S3MetadataResponseHandler(), bucketName, key);
            final String etag = metadata.getETag();

            if (md5DigestStream != null
                    && !skipMd5CheckStrategy.skipClientSideValidationPerUploadPartResponse(metadata)) {
                byte[] clientSideHash = md5DigestStream.getMd5Digest();
                // fromHex assumes the ETag is a hex-encoded MD5 digest; R2's
                // ETag is not, so this is the line that throws
                // "Invalid base 16 character".
                byte[] serverSideHash = BinaryUtils.fromHex(etag);
Cloudflare R2 returns an ETag value that is not compatible with the S3 API.

Correct. The ETag returned from UploadPart does not represent the MD5 of the part. We might improve that compatibility in the future, but this is literally the first report of it (official AWS SDKs work fine, as do most third-party clients people have tried). I'll add it to the backlog, but this involves deeper surgery in R2 to walk back, so I wouldn't expect it to be solved quickly. You may have better luck getting Airbyte to adjust their S3 implementation to handle this more gracefully.
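
For reference, one way a client could handle this more gracefully (a hypothetical guard, not Airbyte's or the SDK's actual code) is to only attempt the MD5 comparison when the ETag actually looks like a hex-encoded MD5:

    // Hypothetical guard: skip the client-side comparison for ETags that are
    // not 32 hex characters (e.g. R2's base64-style ETag) instead of throwing.
    private static boolean looksLikeHexMd5(String etag) {
        return etag != null && etag.matches("(?i)[0-9a-f]{32}");
    }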

