diff --git a/.doc_gen/metadata/sqs_metadata.yaml b/.doc_gen/metadata/sqs_metadata.yaml index b52a35a63ff..60d2b83ee2a 100644 --- a/.doc_gen/metadata/sqs_metadata.yaml +++ b/.doc_gen/metadata/sqs_metadata.yaml @@ -1211,3 +1211,21 @@ sqs_Scenario_UseJMS: - javav2/example_code/sqs-jms/src/main/java/com/example/sqs/jms/SqsJmsExampleUtils.java services: sqs: {CreateQueue, DeleteQueue} +sqs_Scenario_SqsExtendedClient: + title: Manage large &SQS; messages using &S3; with an &AWS; SDK + title_abbrev: Manage large messages using S3 + synopsis: use the Amazon SQS Extended Client Library to work with large &SQS; messages. + category: Scenarios + languages: + Java: + versions: + - sdk_version: 2 + github: javav2/example_code/sqs + sdkguide: AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-s3-messages.html + excerpts: + - description: + snippet_tags: + - sqs.java2.sqs-extended-client.main + services: + sqs: {SendMessage, ReceiveMessage} + s3: {CreateBucket, PutBucketLifecycleConfiguration} diff --git a/javav2/example_code/sqs/README.md b/javav2/example_code/sqs/README.md index 194236086be..02e10ae1769 100644 --- a/javav2/example_code/sqs/README.md +++ b/javav2/example_code/sqs/README.md @@ -54,6 +54,7 @@ Code examples that show you how to accomplish a specific task by calling multipl functions within the same service. - [Create and publish to a FIFO topic](../sns/src/main/java/com/example/sns/PriceUpdateExample.java) +- [Manage large messages using S3](src/main/java/com/example/sqs/SqsExtendedClientExample.java) - [Process S3 event notifications](../s3/src/main/java/com/example/s3/ProcessS3EventNotification.java) - [Publish messages to queues](../../usecases/topics_and_queues/src/main/java/com/example/sns/SNSWorkflow.java) - [Use the Amazon SQS Java Messaging Library to work with the JMS interface](../sqs-jms/src/main/java/com/example/sqs/jms/stdqueue/TextMessageSender.java) @@ -89,6 +90,18 @@ This example shows you how to create and publish to a FIFO Amazon SNS topic. +#### Manage large messages using S3 + +This example shows you how to use the Amazon SQS Extended Client Library to work with large Amazon SQS messages. + + + + + + + + + #### Process S3 event notifications This example shows you how to work with S3 event notifications in an object-oriented way. diff --git a/javav2/example_code/sqs/pom.xml b/javav2/example_code/sqs/pom.xml index 4f4738153e1..ead18d93914 100644 --- a/javav2/example_code/sqs/pom.xml +++ b/javav2/example_code/sqs/pom.xml @@ -59,6 +59,11 @@ software.amazon.awssdk secretsmanager + + com.amazonaws + amazon-sqs-java-extended-client-lib + 2.1.1 + com.google.code.gson gson @@ -109,6 +114,12 @@ org.apache.logging.log4j log4j-1.2-api + + + joda-time + joda-time + 2.12.6 + org.mockito mockito-core diff --git a/javav2/example_code/sqs/src/main/java/com/example/sqs/SqsExtendedClientExample.java b/javav2/example_code/sqs/src/main/java/com/example/sqs/SqsExtendedClientExample.java new file mode 100644 index 00000000000..228def67606 --- /dev/null +++ b/javav2/example_code/sqs/src/main/java/com/example/sqs/SqsExtendedClientExample.java @@ -0,0 +1,280 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.example.sqs; +// snippet-start:[sqs.java2.sqs-extended-client.main] +import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient; +import com.amazon.sqs.javamessaging.ExtendedClientConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.BucketLifecycleConfiguration; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.ExpirationStatus; +import software.amazon.awssdk.services.s3.model.LifecycleExpiration; +import software.amazon.awssdk.services.s3.model.LifecycleRule; +import software.amazon.awssdk.services.s3.model.LifecycleRuleFilter; +import software.amazon.awssdk.services.s3.model.ListObjectVersionsRequest; +import software.amazon.awssdk.services.s3.model.ListObjectVersionsResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.PutBucketLifecycleConfigurationRequest; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; +import software.amazon.awssdk.services.sqs.model.CreateQueueResponse; +import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; +import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; + +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +/** + * Example of using Amazon SQS Extended Client Library for Java 2.x. + */ +public class SqsExtendedClientExample { + private static final Logger logger = LoggerFactory.getLogger(SqsExtendedClientExample.class); + + private String s3BucketName; + private String queueUrl; + private final String queueName; + private final S3Client s3Client; + private final SqsClient sqsExtendedClient; + private final int messageSize; + + /** + * Constructor with default clients and message size. + */ + public SqsExtendedClientExample() { + this(S3Client.create(), 300000); + } + + /** + * Constructor with custom S3 client and message size. + * + * @param s3Client The S3 client to use + * @param messageSize The size of the test message to create + */ + public SqsExtendedClientExample(S3Client s3Client, int messageSize) { + this.s3Client = s3Client; + this.messageSize = messageSize; + + // Generate a unique bucket name. + this.s3BucketName = UUID.randomUUID() + "-" + + DateTimeFormat.forPattern("yyMMdd-hhmmss").print(new DateTime()); + + // Generate a unique queue name. + this.queueName = "MyQueue-" + UUID.randomUUID(); + + // Configure the SQS extended client. + final ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration() + .withPayloadSupportEnabled(s3Client, s3BucketName); + + this.sqsExtendedClient = new AmazonSQSExtendedClient(SqsClient.builder().build(), extendedClientConfig); + } + + public static void main(String[] args) { + SqsExtendedClientExample example = new SqsExtendedClientExample(); + try { + example.setup(); + example.sendAndReceiveMessage(); + } finally { + example.cleanup(); + } + } + + /** + * Send a large message and receive it back. + * + * @return The received message + */ + public Message sendAndReceiveMessage() { + try { + // Create a large message. + char[] chars = new char[messageSize]; + Arrays.fill(chars, 'x'); + String largeMessage = new String(chars); + + // Send the message. + final SendMessageRequest sendMessageRequest = SendMessageRequest.builder() + .queueUrl(queueUrl) + .messageBody(largeMessage) + .build(); + + sqsExtendedClient.sendMessage(sendMessageRequest); + logger.info("Sent message of size: {}", largeMessage.length()); + + // Receive and return the message. + final ReceiveMessageResponse receiveMessageResponse = sqsExtendedClient.receiveMessage( + ReceiveMessageRequest.builder().queueUrl(queueUrl).build()); + + List messages = receiveMessageResponse.messages(); + if (messages.isEmpty()) { + throw new RuntimeException("No messages received"); + } + + Message message = messages.getFirst(); + logger.info("\nMessage received."); + logger.info(" ID: {}", message.messageId()); + logger.info(" Receipt handle: {}", message.receiptHandle()); + logger.info(" Message body size: {}", message.body().length()); + logger.info(" Message body (first 5 characters): {}", message.body().substring(0, 5)); + + return message; + } catch (RuntimeException e) { + logger.error("Error during message processing: {}", e.getMessage(), e); + throw e; + } + } +// snippet-end:[sqs.java2.sqs-extended-client.main] + /** + * Set up the S3 bucket and SQS queue. + */ + public void setup() { + try { + // Create and configure the S3 bucket. + createAndConfigureS3Bucket(); + + // Create the SQS queue. + createSqsQueue(); + } catch (RuntimeException e) { + logger.error("Error during setup: {}", e.getMessage(), e); + cleanup(); // Clean up any resources that were created before the error + throw e; + } + } + + /** + * Clean up all AWS resources + */ + public void cleanup() { + try { + // Delete the queue if it was created + if (queueUrl != null) { + sqsExtendedClient.deleteQueue(DeleteQueueRequest.builder().queueUrl(queueUrl).build()); + logger.info("Deleted the queue: {}", queueUrl); + queueUrl = null; + } + + // Delete the S3 bucket and its contents if it was created + if (s3BucketName != null) { + deleteBucketAndAllContents(); + logger.info("Deleted the bucket: {}", s3BucketName); + s3BucketName = null; + } + } catch (RuntimeException e) { + logger.error("Error during cleanup: {}", e.getMessage(), e); + } + } + + /** + * Create and configure the S3 bucket with lifecycle rules + */ + private void createAndConfigureS3Bucket() { + final LifecycleRule lifeCycleRule = LifecycleRule.builder() + .expiration(LifecycleExpiration.builder().days(14).build()) + .filter(LifecycleRuleFilter.builder().prefix("").build()) + .status(ExpirationStatus.ENABLED) + .build(); + + final BucketLifecycleConfiguration lifecycleConfig = BucketLifecycleConfiguration.builder() + .rules(lifeCycleRule) + .build(); + + s3Client.createBucket(CreateBucketRequest.builder().bucket(s3BucketName).build()); + s3Client.putBucketLifecycleConfiguration(PutBucketLifecycleConfigurationRequest.builder() + .bucket(s3BucketName) + .lifecycleConfiguration(lifecycleConfig) + .build()); + + logger.info("Bucket created and configured: {}", s3BucketName); + } + + /** + * Create the SQS queue + */ + private void createSqsQueue() { + final CreateQueueResponse createQueueResponse = sqsExtendedClient.createQueue( + CreateQueueRequest.builder().queueName(queueName).build()); + queueUrl = createQueueResponse.queueUrl(); + logger.info("Queue created: {}", queueUrl); + } + + /** + * Delete the message from the SQS queue + * + * @param message The message to delete + */ + public void deleteMessage(Message message) { + sqsExtendedClient.deleteMessage( + DeleteMessageRequest.builder() + .queueUrl(queueUrl) + .receiptHandle(message.receiptHandle()) + .build()); + + logger.info("Deleted the message: {}", message.messageId()); + } + + /** + * Delete the S3 bucket and all its contents + */ + private void deleteBucketAndAllContents() { + ListObjectsV2Response listObjectsResponse = s3Client.listObjectsV2( + ListObjectsV2Request.builder().bucket(s3BucketName).build()); + + listObjectsResponse.contents().forEach(object -> { + s3Client.deleteObject(DeleteObjectRequest.builder() + .bucket(s3BucketName) + .key(object.key()) + .build()); + logger.info("Deleted S3 object: {}", object.key()); + }); + + ListObjectVersionsResponse listVersionsResponse = s3Client.listObjectVersions( + ListObjectVersionsRequest.builder().bucket(s3BucketName).build()); + + listVersionsResponse.versions().forEach(version -> s3Client.deleteObject(DeleteObjectRequest.builder() + .bucket(s3BucketName) + .key(version.key()) + .versionId(version.versionId()) + .build())); + + s3Client.deleteBucket(DeleteBucketRequest.builder().bucket(s3BucketName).build()); + } + + /** + * Get the S3 bucket name + * + * @return The S3 bucket name + */ + public String getS3BucketName() { + return s3BucketName; + } + + /** + * Get the SQS queue URL + * + * @return The SQS queue URL + */ + public String getQueueUrl() { + return queueUrl; + } + + /** + * Get the SQS queue name + * + * @return The SQS queue name + */ + public String getQueueName() { + return queueName; + } + +} diff --git a/javav2/example_code/sqs/src/test/java/com/example/sqs/SqsExtendedClientExampleTest.java b/javav2/example_code/sqs/src/test/java/com/example/sqs/SqsExtendedClientExampleTest.java new file mode 100644 index 00000000000..b3caaa3d56c --- /dev/null +++ b/javav2/example_code/sqs/src/test/java/com/example/sqs/SqsExtendedClientExampleTest.java @@ -0,0 +1,110 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.example.sqs; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.S3Object; +import software.amazon.awssdk.services.sqs.model.Message; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Integration test for SqsExtendedClientExample + *

+ * This test verifies that: + * 1. The S3 bucket is created and configured correctly + * 2. The SQS queue is created correctly + * 3. A large message is sent to the queue and stored in S3 + * 4. The message can be received from the queue + * 5. All resources are cleaned up properly + *

+ * Note: This test requires valid AWS credentials to be configured + */ +public class SqsExtendedClientExampleTest { + + private SqsExtendedClientExample example; + private S3Client s3Client; + private final int TEST_MESSAGE_SIZE = 300000; // 300KB, exceeds the SQS limit of 256KB + + @BeforeEach + public void setUp() { + s3Client = S3Client.create(); + example = new SqsExtendedClientExample(s3Client, TEST_MESSAGE_SIZE); + } + + @AfterEach + public void tearDown() { + // Ensure all resources are cleaned up even if tests fail + if (example != null) { + example.cleanup(); + } + } + + @Test + @Tag("IntegrationTest") + public void testFullWorkflow() { + // Set up the resources + example.setup(); + + // Verify the S3 bucket was created + String bucketName = example.getS3BucketName(); + assertNotNull(bucketName, "S3 bucket name should not be null"); + + // Verify the SQS queue was created + String queueUrl = example.getQueueUrl(); + assertNotNull(queueUrl, "Queue URL should not be null"); + assertTrue(queueUrl.contains(example.getQueueName()), "Queue URL should contain the queue name"); + + // Send and receive a message + Message receivedMessage = example.sendAndReceiveMessage(); + + // Verify the message was received correctly + assertNotNull(receivedMessage, "Received message should not be null"); + assertNotNull(receivedMessage.messageId(), "Message ID should not be null"); + assertNotNull(receivedMessage.receiptHandle(), "Receipt handle should not be null"); + assertEquals(TEST_MESSAGE_SIZE, receivedMessage.body().length(), "Message body length should match"); + assertEquals("xxxxx", receivedMessage.body().substring(0, 5), "Message content should match"); + + // Verify the message was stored in S3 + ListObjectsV2Response listObjectsResponse = s3Client.listObjectsV2( + ListObjectsV2Request.builder().bucket(bucketName).build()); + + List s3Objects = listObjectsResponse.contents(); + assertFalse(s3Objects.isEmpty(), "S3 bucket should contain objects"); + + // Verify at least one object in the bucket + S3Object s3Object = s3Objects.getFirst(); + assertNotNull(s3Object.key(), "S3 object key should not be null"); + + // Verify the object size is approximately the same as our message + HeadObjectResponse headObjectResponse = s3Client.headObject( + HeadObjectRequest.builder() + .bucket(bucketName) + .key(s3Object.key()) + .build()); + + // The S3 object size might be slightly larger due to metadata + assertTrue(headObjectResponse.contentLength() >= TEST_MESSAGE_SIZE, + "S3 object size should be at least the message size"); + + // Delete the message + example.deleteMessage(receivedMessage); + + // Clean up resources (this is also done in tearDown as a safety measure) + example.cleanup(); + } +}