Spring Cloud GCP provides Spring Integration adapters that allow your applications to use Enterprise Integration Patterns backed up by Google Cloud Platform services.
The channel adapters for Google Cloud Pub/Sub connect your Spring
MessageChannels
to Google Cloud Pub/Sub topics and subscriptions.
This enables messaging between different processes, applications or micro-services backed up by
Google Cloud Pub/Sub.
The Spring Integration Channel Adapters for Google Cloud Pub/Sub are included in the
spring-cloud-gcp-pubsub
module.
Maven coordinates, using Spring Cloud GCP BOM:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-gcp-pubsub</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-core</artifactId> </dependency>
Gradle coordinates:
dependencies { compile group: 'org.springframework.cloud', name: 'spring-cloud-gcp-pubsub' compile group: 'org.springframework.integration', name: 'spring-integration-core' }
PubSubInboundChannelAdapter
is the inbound channel adapter for GCP Pub/Sub that listens to a GCP
Pub/Sub subscription for new messages.
It converts new messages to an internal Spring
Message
and then sends it to the bound output channel.
To use the inbound channel adapter, a PubSubInboundChannelAdapter
must be provided and configured
on the user application side.
@Bean public MessageChannel pubsubInputChannel() { return new PublishSubscribeChannel(); } @Bean public PubSubInboundChannelAdapter messageChannelAdapter( @Qualifier("pubsubInputChannel") MessageChannel inputChannel, SubscriberFactory subscriberFactory) { PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(subscriberFactory, "subscriptionName"); adapter.setOutputChannel(inputChannel); adapter.setAckMode(AckMode.MANUAL); return adapter; }
In the example, we first specify the MessageChannel
where the adapter is going to write incoming
messages to.
The MessageChannel
implementation isn’t important here.
Depending on your use case, you might want to use a MessageChannel
other than
PublishSubscribeChannel
.
Then, we declare a PubSubInboundChannelAdapter
bean.
It requires the channel we just created and a SubscriberFactory
, which creates Subscriber
objects from the Google Cloud Java Client for Pub/Sub.
The Spring Boot starter for GCP Pub/Sub provides a configured SubscriberFactory
.
It is also possible to set the message acknowledgement mode on the adapter, which is automatic by
default.
On automatic acking, a message is acked with GCP Pub/Sub if the adapter sent it to the channel and
no exceptions were thrown.
If a RuntimeException
is thrown while the message is processed, then the message is nacked.
On manual acking, the adapter attaches an AckReplyConsumer
object to the Message
headers, which
users can extract using the GcpPubSubHeaders.ACKNOWLEDGEMENT
key and use to (n)ack a message.
@Bean @ServiceActivator(inputChannel = "pubsubInputChannel") public MessageHandler messageReceiver() { return message -> { LOGGER.info("Message arrived! Payload: " + message.getPayload()); AckReplyConsumer consumer = message.getHeaders().get(GcpPubSubHeaders.ACKNOWLEDGEMENT, AckReplyConsumer.class); consumer.ack(); }; }
PubSubMessageHandler
is the outbound channel adapter for GCP Pub/Sub that listens for new messages
on a Spring MessageChannel
.
It uses PubSubTemplate
to convert new Message
instances to the GCP Pub/Sub format and post them
to a GCP Pub/Sub topic.
To use the outbound channel adapter, a PubSubMessageHandler
bean must be provided and configured
on the user application side.
@Bean @ServiceActivator(inputChannel = "pubsubOutputChannel") public MessageHandler messageSender(PubSubTemplate pubsubTemplate) { return new PubSubMessageHandler(pubsubTemplate, "topicName"); }
The provided PubSubTemplate
contains all the necessary configuration to publish messages to a
GCP Pub/Sub topic.
PubSubMessageHandler
publishes messages asynchronously by default.
A publish timeout can be configured for synchronous publishing. If none is provided, the adapter
waits indefinitely for a response.
It is possible to set user-defined callbacks for the publish()
call in PubSubMessageHandler
through the setPublishFutureCallback()
method.
These are useful to process the message ID, in case of success, or the error if any was thrown.
To override the default destination you can use the GcpPubSubHeaders.DESTINATION
header.
@Autowired private MessageChannel pubsubOutputChannel; public void handleMessage(Message<?> msg) throws MessagingException { final Message<?> message = MessageBuilder .withPayload(msg.getPayload()) .setHeader(GcpPubSubHeaders.TOPIC, "customTopic").build(); pubsubOutputChannel.send(message); }
It is also possible to set an SpEL expression for the topic with the setTopicExpression()
or setTopicExpressionString()
methods.
The channel adapters for Google Cloud Storage allow you to read and write files to Google Cloud
Storage through MessageChannels
.
Spring Cloud GCP provides two inbound adapters, GcsInboundFileSynchronizingMessageSource
and
GcsStreamingMessageSource
, and one outbound adapter, GcsMessageHandler
.
The Spring Integration Channel Adapters for Google Cloud Storage are included in the
spring-cloud-gcp-storage
module.
To use the Storage portion of Spring Integration for Spring Cloud GCP, you must also provide the
spring-integration-file
dependency, since they aren’t pulled transitively.
Maven coordinates, using Spring Cloud GCP BOM:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-gcp-pubsub</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-file</artifactId> </dependency>
Gradle coordinates:
dependencies { compile group: 'org.springframework.cloud', name: 'spring-cloud-gcp-pubsub' compile group: 'org.springframework.integration', name: 'spring-integration-file' }
The Google Cloud Storage inbound channel adapter polls a Google Cloud Storage bucket for new files
and sends each of them in a Message
payload to the MessageChannel
specified in the
@InboundChannelAdapter
annotation.
The files are temporarily stored in a folder in the local file system.
Here is an example of how to configure a Google Cloud Storage inbound channel adapter.
@Bean @InboundChannelAdapter(channel = "new-file-channel", poller = @Poller(fixedDelay = "5000")) public MessageSource<File> synchronizerAdapter(Storage gcs) { GcsInboundFileSynchronizer synchronizer = new GcsInboundFileSynchronizer(gcs); synchronizer.setRemoteDirectory("your-gcs-bucket"); GcsInboundFileSynchronizingMessageSource synchAdapter = new GcsInboundFileSynchronizingMessageSource(synchronizer); synchAdapter.setLocalDirectory(new File("local-directory")); return synchAdapter; }
The inbound streaming channel adapter is similar to the normal inbound channel adapter, except it does not require files to be stored in the file system.
Here is an example of how to configure a Google Cloud Storage inbound streaming channel adapter.
@Bean @InboundChannelAdapter(channel = "streaming-channel", poller = @Poller(fixedDelay = "5000")) public MessageSource<InputStream> streamingAdapter(Storage gcs) { GcsStreamingMessageSource adapter = new GcsStreamingMessageSource(new GcsRemoteFileTemplate(new GcsSessionFactory(gcs))); adapter.setRemoteDirectory("your-gcs-bucket"); return adapter; }
The outbound channel adapter allows files to be written to Google Cloud Storage.
When it receives a Message
containing a payload of type File
, it writes that file to the Google
Cloud Storage bucket specified in the adapter.
Here is an example of how to configure a Google Cloud Storage outbound channel adapter.
@Bean @ServiceActivator(inputChannel = "writeFiles") public MessageHandler outboundChannelAdapter(Storage gcs) { GcsMessageHandler outboundChannelAdapter = new GcsMessageHandler(new GcsSessionFactory(gcs)); outboundChannelAdapter.setRemoteDirectoryExpression(new ValueExpression<>("your-gcs-bucket")); return outboundChannelAdapter; }