-
Notifications
You must be signed in to change notification settings - Fork 216
Add extstore s3 driver #2907
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: extstore-foundation
Are you sure you want to change the base?
Add extstore s3 driver #2907
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
// Build script for the Temporal external-storage driver backed by AWS S3.
description = 'Temporal Java SDK External Storage Driver for AWS S3'

// Version of the AWS SDK for Java v2 BOM imported in the dependencies block.
ext.awsSdkVersion = '2.31.0'

dependencies {
    // Temporal modules are on the compile classpath only; they are not exported by this module.
    compileOnly project(':temporal-serviceclient')
    compileOnly project(':temporal-sdk')

    // The BOM pins the version of every AWS SDK artifact, so "s3" is declared without one.
    api platform("software.amazon.awssdk:bom:${awsSdkVersion}")
    api 'software.amazon.awssdk:s3'

    // Tests need the Temporal modules at runtime as well as compile time.
    testImplementation project(':temporal-serviceclient')
    testImplementation project(':temporal-sdk')
    testImplementation "junit:junit:${junitVersion}"
    testRuntimeOnly group: 'ch.qos.logback', name: 'logback-classic', version: "${logbackVersion}"
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| package io.temporal.payload.storage.s3; | ||
|
|
||
| import io.temporal.api.common.v1.Payload; | ||
| import io.temporal.common.Experimental; | ||
| import io.temporal.payload.storage.StorageDriverStoreContext; | ||
| import javax.annotation.Nonnull; | ||
|
|
||
| /** | ||
| * Resolves the target S3 bucket for a payload. Use {@link | ||
| * S3StorageDriver.Builder#setBucket(String)} for a fixed bucket, or supply a resolver via {@link | ||
| * S3StorageDriver.Builder#setBucketResolver(BucketResolver)} to choose a bucket per payload. | ||
| * | ||
| * <p>This is a functional interface whose single method, {@code resolveBucket}, receives the | ||
| * {@link StorageDriverStoreContext} and the {@link Payload} and returns the bucket name. Both | ||
| * arguments and the returned name are declared {@code @Nonnull}. | ||
| */ | ||
| @Experimental | ||
| @FunctionalInterface | ||
| public interface BucketResolver { | ||
| @Nonnull | ||
| String resolveBucket(@Nonnull StorageDriverStoreContext context, @Nonnull Payload payload); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| package io.temporal.payload.storage.s3; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.concurrent.CompletableFuture; | ||
|
|
||
/** Static helpers for composing {@link CompletableFuture}s. Not instantiable. */
final class CompletableFutures {
  private CompletableFutures() {}

  /**
   * Combines a list of futures into a single future of their results.
   *
   * <p>The returned future completes only after <em>every</em> input future has completed (it is
   * not fail-fast). On success it yields the results in the same order as the input list. If any
   * input future completes exceptionally, the returned future completes exceptionally with a
   * {@link java.util.concurrent.CompletionException} whose cause is that failure, as specified by
   * {@link CompletableFuture#allOf(CompletableFuture[])}. If the input list is empty, the returned
   * future is already completed with an empty list.
   *
   * <p>The input list is copied when this method is called, so adding to or removing from it
   * afterwards does not change which futures are awaited or collected.
   *
   * @param <T> result type of the individual futures
   * @param futures the futures to wait for; must not be {@code null}
   * @return a future yielding a new mutable list with one result per input future, in input order
   * @throws NullPointerException if {@code futures} is {@code null}
   */
  static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
    // Snapshot the caller's list: the callback below runs later, and reading a list that was
    // mutated in the meantime could join() a future that allOf never waited for.
    List<CompletableFuture<T>> snapshot = new ArrayList<>(futures);
    return CompletableFuture.allOf(snapshot.toArray(new CompletableFuture<?>[0]))
        .thenApply(
            ignored -> {
              List<T> results = new ArrayList<>(snapshot.size());
              for (CompletableFuture<T> future : snapshot) {
                // Every future in the snapshot is complete here, so join() never blocks.
                results.add(future.join());
              }
              return results;
            });
  }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,115 @@ | ||
| # AWS S3 Driver | ||
|
|
||
| Temporal's S3 Driver for External Storage. Uses the official [AWS S3 Java SDK](https://github.com/aws/aws-sdk-java-v2). | ||
|
|
||
| ## Usage | ||
|
|
||
| Construct the S3 storage driver: | ||
|
|
||
| ```java | ||
| import io.temporal.payload.storage.s3.S3AsyncClientAdapter; | ||
| import io.temporal.payload.storage.s3.S3StorageDriver; | ||
| import software.amazon.awssdk.regions.Region; | ||
| import software.amazon.awssdk.services.s3.S3AsyncClient; | ||
|
|
||
| S3AsyncClient s3Client = | ||
| S3AsyncClient.builder().region(Region.US_EAST_1).build(); | ||
|
|
||
| S3StorageDriver driver = | ||
| S3StorageDriver.newBuilder() | ||
| .setClient(new S3AsyncClientAdapter(s3Client)) | ||
| .setBucket("temporal-payloads") | ||
| .build(); | ||
| ``` | ||
|
|
||
| Register the driver in external storage config: | ||
|
|
||
| ```java | ||
| import io.temporal.payload.storage.ExternalStorage; | ||
|
|
||
| ExternalStorage externalStorage = | ||
| ExternalStorage.newBuilder() | ||
| .setDriver(driver) | ||
| .build(); | ||
| ``` | ||
|
|
||
| Use `setBucketResolver(...)` instead of `setBucket(...)` when bucket selection must vary per | ||
| payload. | ||
|
|
||
| ## S3 Storage Key Specification | ||
|
|
||
| All Temporal S3 drivers, regardless of SDK language, generate S3 keys using the same format and encoding rules described below. | ||
|
|
||
| ### Key format | ||
|
|
||
| Workflow key: | ||
| ```text | ||
| v0/ns/{namespace}/wt/{workflow-type}/wi/{workflow-id}/ri/{run-id}/d/{hash-algorithm}/{hex-digest} | ||
| ``` | ||
|
|
||
| Activity key: | ||
| ```text | ||
| v0/ns/{namespace}/at/{activity-type}/ai/{activity-id}/ri/{run-id}/d/{hash-algorithm}/{hex-digest} | ||
| ``` | ||
|
|
||
| Fallback key (unknown target): | ||
| ```text | ||
| v0/d/{hash-algorithm}/{hex-digest} | ||
| ``` | ||
|
|
||
| - If no namespace, workflow, or activity information is available, the fallback is used. | ||
| - Dynamic path segments are percent-encoded (rules below). | ||
| - Missing or empty values (including a missing `run-id`) are encoded as the literal string `null`. | ||
| - `hex-digest` is lower-case SHA-256 hex (64 characters). | ||
|
|
||
| ### Percent-encoding rules | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These are encoding rules that all temporal s3 drivers should conform to. If we have a better place to put information like this I will move it, otherwise I'll put something similar in the READMEs of the other SDKs. |
||
|
|
||
| 1. Treat each key path component as UTF-8 bytes. | ||
| 2. Leave ASCII letters and digits unescaped. | ||
| 3. Leave the following ASCII characters unescaped: `- _ . ~ $ & + : = @` | ||
| 4. Encode all other bytes as `%` followed by two uppercase hexadecimal digits (for example, a space becomes `%20`). | ||
| 5. Empty or null values are encoded as the literal string `null`. | ||
| 6. This is path-segment escaping, not form encoding (`+` stays `+`). | ||
|
|
||
| ### Examples | ||
|
|
||
| Workflow key example: | ||
|
|
||
| ```text | ||
| input: | ||
| namespace=payments prod | ||
| workflow-type=ChargeWorkflow | ||
| workflow-id=order+123=abc | ||
| run-id=3f1d6c7a-8b2e-4f7a-9d0a-87a6f95e4d31 | ||
| hash-algorithm=sha256 | ||
| hex-digest=9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08 | ||
|
|
||
| output: | ||
| v0/ns/payments%20prod/wt/ChargeWorkflow/wi/order+123=abc/ri/3f1d6c7a-8b2e-4f7a-9d0a-87a6f95e4d31/d/sha256/9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08 | ||
| ``` | ||
|
|
||
| Activity key example: | ||
|
|
||
| ```text | ||
| input: | ||
| namespace=payments prod | ||
| activity-type=Capture/Charge | ||
| activity-id=activity id+42 | ||
| run-id=9e1d1fd9-2f8a-4c40-93e2-731f31b9268b | ||
| hash-algorithm=sha256 | ||
| hex-digest=2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824 | ||
|
|
||
| output: | ||
| v0/ns/payments%20prod/at/Capture%2FCharge/ai/activity%20id+42/ri/9e1d1fd9-2f8a-4c40-93e2-731f31b9268b/d/sha256/2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824 | ||
| ``` | ||
|
|
||
| Fallback key example: | ||
|
|
||
| ```text | ||
| input: | ||
| hash-algorithm=sha256 | ||
| hex-digest=486ea46224d1bb4fb680f34f7c9ad96a8f24ec88be73ea8e5a6c65260e9cb8a7 | ||
|
|
||
| output: | ||
| v0/d/sha256/486ea46224d1bb4fb680f34f7c9ad96a8f24ec88be73ea8e5a6c65260e9cb8a7 | ||
| ``` | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,89 @@ | ||
| package io.temporal.payload.storage.s3; | ||
|
|
||
| import io.temporal.common.Experimental; | ||
| import java.util.Collections; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.CompletionException; | ||
| import javax.annotation.Nonnull; | ||
| import software.amazon.awssdk.core.async.AsyncRequestBody; | ||
| import software.amazon.awssdk.core.async.AsyncResponseTransformer; | ||
| import software.amazon.awssdk.regions.Region; | ||
| import software.amazon.awssdk.services.s3.S3AsyncClient; | ||
| import software.amazon.awssdk.services.s3.model.GetObjectRequest; | ||
| import software.amazon.awssdk.services.s3.model.HeadObjectRequest; | ||
| import software.amazon.awssdk.services.s3.model.NoSuchKeyException; | ||
| import software.amazon.awssdk.services.s3.model.PutObjectRequest; | ||
| import software.amazon.awssdk.services.s3.model.S3Exception; | ||
|
|
||
| /** | ||
| * {@link S3Client} backed by the AWS SDK for Java v2 {@link S3AsyncClient}. The wrapped client must | ||
| * be configured with credentials and a region by the caller. | ||
| */ | ||
| @Experimental | ||
| public final class S3AsyncClientAdapter implements S3Client { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This makes the |
||
| private final S3AsyncClient client; | ||
|
|
||
/**
 * Wraps the given AWS SDK async client.
 *
 * @param client the client to delegate to; must not be {@code null}
 * @throws NullPointerException if {@code client} is {@code null}
 */
public S3AsyncClientAdapter(@Nonnull S3AsyncClient client) {
  Objects.requireNonNull(client, "client");
  this.client = client;
}
|
|
||
| @Nonnull | ||
| @Override | ||
| public CompletableFuture<Void> putObject( | ||
| @Nonnull String bucket, @Nonnull String key, @Nonnull byte[] data) { | ||
| // fromBytesUnsafe avoids a defensive copy of data; the driver never mutates it after this call. | ||
| return client | ||
| .putObject( | ||
| PutObjectRequest.builder().bucket(bucket).key(key).build(), | ||
| AsyncRequestBody.fromBytesUnsafe(data)) | ||
| .thenApply(response -> (Void) null); | ||
| } | ||
|
|
||
| @Nonnull | ||
| @Override | ||
| public CompletableFuture<Boolean> objectExists(@Nonnull String bucket, @Nonnull String key) { | ||
| return client | ||
| .headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build()) | ||
| .handle( | ||
| (response, ex) -> { | ||
| if (ex == null) { | ||
| return true; | ||
| } | ||
| Throwable cause = | ||
| (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex; | ||
| if (cause instanceof NoSuchKeyException) { | ||
| return false; | ||
| } | ||
| if (cause instanceof S3Exception && ((S3Exception) cause).statusCode() == 404) { | ||
| return false; | ||
| } | ||
| if (cause instanceof RuntimeException) { | ||
| throw (RuntimeException) cause; | ||
| } | ||
| throw new RuntimeException(cause); | ||
| }); | ||
| } | ||
|
|
||
| @Nonnull | ||
| @Override | ||
| public CompletableFuture<byte[]> getObject(@Nonnull String bucket, @Nonnull String key) { | ||
| return client | ||
| .getObject( | ||
| GetObjectRequest.builder().bucket(bucket).key(key).build(), | ||
| AsyncResponseTransformer.toBytes()) | ||
| // asByteArrayUnsafe avoids a copy; the driver only reads the bytes (hash + parse). | ||
| .thenApply(response -> response.asByteArrayUnsafe()); | ||
| } | ||
|
|
||
| @Nonnull | ||
| @Override | ||
| public Map<String, String> describe() { | ||
| Region region = client.serviceClientConfiguration().region(); | ||
| if (region == null) { | ||
| return Collections.emptyMap(); | ||
| } | ||
| return Collections.singletonMap("client_region", region.id()); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| package io.temporal.payload.storage.s3; | ||
|
|
||
| import io.temporal.common.Experimental; | ||
| import java.util.Collections; | ||
| import java.util.Map; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import javax.annotation.Nonnull; | ||
|
|
||
| /** Interface for S3 {@link S3StorageDriver} operations: upload, existence check, and download. */ | ||
| @Experimental | ||
| public interface S3Client { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. S3StorageDriverClient |
||
| /** | ||
| * Uploads {@code data} to the given {@code bucket} and {@code key}, overwriting any existing | ||
| * object at that key. Implementations must be safe to call concurrently for different keys. | ||
| * | ||
| * @param bucket bucket to upload into | ||
| * @param key object key to write within {@code bucket} | ||
| * @param data bytes to store as the object body | ||
| * @return a non-null future that carries no value and only signals the outcome of the upload | ||
| */ | ||
| @Nonnull | ||
| CompletableFuture<Void> putObject( | ||
| @Nonnull String bucket, @Nonnull String key, @Nonnull byte[] data); | ||
|
|
||
| /** | ||
| * Reports whether an object exists at the given {@code bucket} and {@code key}. The future | ||
| * completes with {@code false} when the object is absent, and completes exceptionally when | ||
| * existence cannot be determined (e.g. a network or permission failure). | ||
| * | ||
| * @param bucket bucket to look in | ||
| * @param key object key to check within {@code bucket} | ||
| * @return a non-null future yielding {@code true} if the object exists, {@code false} if absent | ||
| */ | ||
| @Nonnull | ||
| CompletableFuture<Boolean> objectExists(@Nonnull String bucket, @Nonnull String key); | ||
|
|
||
| /** | ||
| * Downloads the bytes stored at the given {@code bucket} and {@code key}. The future completes | ||
| * exceptionally if the object does not exist. | ||
| * | ||
| * @param bucket bucket to read from | ||
| * @param key object key to read within {@code bucket} | ||
| * @return a non-null future yielding the object's bytes | ||
| */ | ||
| @Nonnull | ||
| CompletableFuture<byte[]> getObject(@Nonnull String bucket, @Nonnull String key); | ||
|
|
||
| /** | ||
| * Diagnostic metadata about the client configuration, such as {@code {"client_region": | ||
| * "us-west-2"}}, that the driver appends to error messages. Returns an empty map by default. | ||
| * | ||
| * @return a non-null map of diagnostic entries; this default implementation returns an empty, | ||
| *     immutable map | ||
| */ | ||
| @Nonnull | ||
| default Map<String, String> describe() { | ||
| return Collections.emptyMap(); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be `package io.temporal.payload.storage.s3driver` to avoid collisions with the AWS S3 SDK package naming?