page_type: sample
languages: java
products: azure-service-bus
description: Azure Spring Cloud Stream Binder Sample project for Service Bus Topic client library
urlFragment: azure-spring-cloud-sample-service-bus-topic-binder

Spring Cloud Azure Stream Binder for Service Bus topic Sample shared library for Java

Key concepts

This code sample demonstrates how to use the Spring Cloud Stream binder for Azure Service Bus topics. The sample app has two operating modes: it either exposes a RESTful API that receives string messages, or it supplies string messages automatically. In both modes the messages are published to a Service Bus topic, and the sample also consumes messages from the same topic.

Getting started

Running this sample incurs Azure charges. You can check the usage and bill at this link.

Create Azure resources

There are several ways to configure the Spring Cloud Stream binder for Azure Service Bus topics. You can choose any one of them.

Important

When using the RESTful API to send messages, the active profiles must include manual.

Method 1: Connection string based usage

  1. Create an Azure Service Bus namespace and topic. Please see how to create.

  2. Update application.yaml.

    spring:
      cloud:
        azure:
          servicebus:
            connection-string: [servicebus-namespace-connection-string] 
        stream:
          function:
            definition: consume;supply
          bindings:
            consume-in-0:
              destination: [servicebus-topic-name]
            supply-out-0:
              destination: [servicebus-topic-name-same-as-above]
          poller:
            fixed-delay: 1000
            initial-delay: 0
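
The binding names consume-in-0 and supply-out-0 in this configuration follow the Spring Cloud Stream functional naming convention: `<functionName>-in-<index>` for inputs and `<functionName>-out-<index>` for outputs, where the function names come from `spring.cloud.stream.function.definition`. The sketch below only illustrates that convention in plain Java; the class `BindingNames` and its methods are hypothetical helpers and are not part of Spring or of this sample.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that mirrors the binding naming convention; not a Spring API. */
final class BindingNames {

    /** Splits a definition such as "consume;supply" into its function names. */
    static List<String> functionNames(String definition) {
        List<String> names = new ArrayList<>();
        for (String part : definition.split(";")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    /** Input binding name: <functionName>-in-<index>. */
    static String inputBinding(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    /** Output binding name: <functionName>-out-<index>. */
    static String outputBinding(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        List<String> names = functionNames("consume;supply");
        System.out.println(inputBinding(names.get(0), 0));   // binding used by the consumer
        System.out.println(outputBinding(names.get(1), 0));  // binding used by the supplier
    }
}
```

If a function is renamed in the definition, the keys under `bindings` have to be renamed to match.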

Method 2: Service principal based usage

  1. Create a service principal for use by your app. Please follow create service principal from Azure CLI.

  2. Create an Azure Service Bus namespace and topic. Please see how to create.

  3. Add a role assignment for Service Bus. See Service principal for Azure resources with Service Bus to add the role assignment, and assign the Contributor role to the service principal.

  4. Update application-sp.yaml.

    spring:
      cloud:
        azure:
          client-id: [service-principal-id]
          client-secret: [service-principal-secret]
          tenant-id: [tenant-id]
          resource-group: [resource-group]
          servicebus:
            namespace: [servicebus-namespace]
        stream:
          function:
            definition: consume;supply
          bindings:
            consume-in-0:
              destination: [servicebus-topic-name]
            supply-out-0:
              destination: [servicebus-topic-name-same-as-above]
          poller:
            fixed-delay: 1000
            initial-delay: 0

    Set spring.profiles.active=sp when running the Spring Boot application.

Method 3: MSI credential based usage

Set up managed identity

Please follow create managed identity to set up a managed identity.

Create other Azure resources
  1. Create an Azure Service Bus namespace and topic. Please see how to create.

  2. Add a role assignment for Service Bus. See Managed identities for Azure resources with Service Bus to add the role assignment, and assign the Contributor role to the managed identity.

Update MSI related properties
  1. Update application-mi.yaml
    spring:
      cloud:
        azure:
          msi-enabled: true
          client-id: [the-id-of-managed-identity]
          resource-group: [resource-group]
          subscription-id: [subscription-id]
          servicebus:
            namespace: [servicebus-namespace]
        stream:
          function:
            definition: consume;supply
          bindings:
            consume-in-0:
              destination: [servicebus-topic-name]
            supply-out-0:
              destination: [servicebus-topic-name-same-as-above]
          poller:
            fixed-delay: 1000
            initial-delay: 0

    Set spring.profiles.active=mi when running the Spring Boot application. For App Service, add a configuration entry for this setting.

Redeploy Application

If you update the spring.cloud.azure.managed-identity.client-id property after deploying the app, or update the role assignment for services, redeploy the app.

You can follow Deploy a Spring Boot JAR file to Azure App Service to deploy this application to App Service.

Enable auto create

To create the Azure Service Bus instances automatically, add the following properties (supported only with service principal and managed identity authentication):

spring:
  cloud:
    azure:
      subscription-id: [subscription-id]
      auto-create-resources: true
      environment: Azure
      region: [region]

Examples

  1. Run mvn spring-boot:run in the root of the code sample to start the app.

  2. Send a POST request

    $ curl -X POST http://localhost:8080/messages?message=hello
    

    or, when the app runs on App Service or a VM:

    $ curl -X POST https://[your-app-URL]/messages?message=hello
    
  3. Verify in your app’s logs that messages similar to the following were logged:

    New message received: 'hello'
    Message 'hello' successfully checkpointed
    
  4. Delete the resources on Azure Portal to avoid unexpected charges.
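
The POST request from step 2 can also be issued from Java. The following is a minimal sketch using the JDK's `java.net.http.HttpClient` (Java 11 or later); the class name `SendMessageClient` and its `buildRequest` helper are illustrative and not part of the sample, and the base URL defaults to the local address used above.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

/** Illustrative client for the sample's /messages endpoint; not part of the sample project. */
final class SendMessageClient {

    /** Builds the equivalent of: curl -X POST <baseUrl>/messages?message=<message> */
    static HttpRequest buildRequest(String baseUrl, String message) {
        // URL-encode the payload so spaces and special characters survive the query string.
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/messages?message=" + encoded))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Pass the App Service or VM URL as the first argument; defaults to the local app.
        String baseUrl = args.length > 0 ? args[0] : "http://localhost:8080";
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(
                buildRequest(baseUrl, "hello"), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```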

Enhancement

Configuration Options

The binder provides the following configuration options:

Spring Cloud Azure Properties

| Name | Description | Required | Default |
| --- | --- | --- | --- |
| spring.cloud.azure.credential-file-path | Location of azure credential file | Yes | |
| spring.cloud.azure.resource-group | Name of Azure resource group | Yes | |
| spring.cloud.azure.region | Region name of the Azure resource group, e.g. westus | Yes | |
| spring.cloud.azure.servicebus.namespace | Service Bus Namespace. Auto creating if missing | Yes | |
| spring.cloud.azure.servicebus.transportType | Service Bus transportType, supported value of AMQP and AMQP_WEB_SOCKETS | No | AMQP |
| spring.cloud.azure.servicebus.retry-Options | Service Bus retry options | No | Default value of AmqpRetryOptions |

Service Bus Topic Producer Properties

The following options are configured under the prefix spring.cloud.stream.servicebus.topic.bindings.<channelName>.producer.

sync

Whether the producer should act in a synchronous manner with respect to writing messages into a stream. If true, the producer will wait for a response after a send operation.

Default: false

send-timeout

Effective only if sync is set to true. The amount of time to wait for a response after a send operation, in milliseconds.

Default: 10000
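
The interaction between sync and send-timeout can be pictured with a plain `CompletableFuture`. This is only a conceptual sketch of the semantics described above, not the binder's implementation: `SyncSendSketch` and its `send` method are hypothetical, and the pending send is modeled as a future that completes when the broker responds.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Conceptual model of sync / send-timeout; not the binder's actual code. */
final class SyncSendSketch {

    /**
     * Returns true if the caller may proceed: immediately when sync is false,
     * or once the send completes within sendTimeoutMillis when sync is true.
     * Returns false if a synchronous send does not complete in time.
     */
    static boolean send(CompletableFuture<Void> pendingSend, boolean sync, long sendTimeoutMillis) {
        if (!sync) {
            return true; // asynchronous mode: do not wait for the response
        }
        try {
            pendingSend.get(sendTimeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the send", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Send failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> neverCompletes = new CompletableFuture<>();
        System.out.println(send(CompletableFuture.completedFuture(null), true, 10000));
        System.out.println(send(neverCompletes, true, 50));
        System.out.println(send(neverCompletes, false, 50));
    }
}
```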

Service Bus Topic Consumer Properties

The following options are configured under the prefix spring.cloud.stream.servicebus.topic.bindings.<channelName>.consumer.

checkpoint-mode

The mode in which checkpoints are updated.

RECORD: a checkpoint occurs after each record that the user-defined message handler processes without throwing an exception.

MANUAL: checkpoints occur on demand, when the user calls the Checkpointer. You can obtain the Checkpointer with message.getHeaders().get(AzureHeaders.CHECKPOINTER).

Default: RECORD
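
The difference between the two modes can be sketched in plain Java. Everything in this block is a hypothetical stand-in: the nested `Checkpointer` interface only imitates the role of the binder's Checkpointer (whose real methods are not shown in this document), and `dispatch` is an illustrative model of the behavior described above rather than the binder's code.

```java
import java.util.function.BiConsumer;

/** Conceptual model of RECORD vs MANUAL checkpointing; not the binder's implementation. */
final class CheckpointModeSketch {

    enum CheckpointMode { RECORD, MANUAL }

    /** Hypothetical stand-in for the binder's Checkpointer; not the real interface. */
    interface Checkpointer {
        void success();
    }

    private static final Checkpointer NO_OP = () -> { };

    static void dispatch(CheckpointMode mode, String payload, Checkpointer checkpointer,
                         BiConsumer<String, Checkpointer> handler) {
        if (mode == CheckpointMode.MANUAL) {
            // MANUAL: the handler receives the checkpointer and decides when to call it.
            handler.accept(payload, checkpointer);
            return;
        }
        // RECORD: checkpoint automatically, but only if the handler returns normally.
        handler.accept(payload, NO_OP);
        checkpointer.success();
    }

    public static void main(String[] args) {
        Checkpointer logging = () -> System.out.println("checkpointed");
        dispatch(CheckpointMode.RECORD, "hello", logging,
                (payload, cp) -> System.out.println("received " + payload));
        dispatch(CheckpointMode.MANUAL, "hello", logging,
                (payload, cp) -> cp.success());
    }
}
```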

prefetch-count

Prefetch count of underlying service bus client.

Default: 1

maxConcurrentCalls

Controls the maximum number of concurrent calls to the Service Bus message handler and the size of the fixed thread pool that runs the user's business logic.

Default: 1

maxConcurrentSessions

Controls the maximum number of concurrent sessions to process at any given time.

Default: 1

concurrency

When sessionsEnabled is true, controls the maximum number of concurrent sessions to process at any given time. When sessionsEnabled is false, controls the maximum number of concurrent calls to the Service Bus message handler and the size of the fixed thread pool that runs the user's business logic.

Deprecated. Use maxConcurrentSessions when sessionsEnabled is true and maxConcurrentCalls when sessionsEnabled is false.

Default: 1
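
When migrating away from the deprecated concurrency option, the replacement key depends on sessionsEnabled. The sketch below rewrites a map of consumer properties accordingly. `ConcurrencyMigration` is a hypothetical helper, and keeping an explicitly set replacement value over the legacy one is an assumption of this sketch, not documented binder behavior.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical migration helper for the deprecated "concurrency" consumer option. */
final class ConcurrencyMigration {

    static Map<String, String> migrate(Map<String, String> consumerProperties) {
        Map<String, String> result = new LinkedHashMap<>(consumerProperties);
        String legacy = result.remove("concurrency");
        if (legacy != null) {
            // sessionsEnabled defaults to false, as listed in this document.
            boolean sessionsEnabled =
                    Boolean.parseBoolean(result.getOrDefault("sessionsEnabled", "false"));
            String replacement = sessionsEnabled ? "maxConcurrentSessions" : "maxConcurrentCalls";
            // Assumption: an explicitly configured replacement wins over the legacy value.
            result.putIfAbsent(replacement, legacy);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(migrate(Map.of("concurrency", "4")));
        System.out.println(migrate(Map.of("concurrency", "4", "sessionsEnabled", "true")));
    }
}
```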

sessionsEnabled

Controls whether the consumer is session aware. Set it to true if sessions are enabled on the Service Bus entity being consumed.

Default: false

requeueRejected

Controls what happens to a message that triggers an exception in the consumer. Set it to true to force such messages to the dead-letter queue (DLQ). Set it to false to re-queue them.

Default: false
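
The decision described for requeueRejected can be modeled as follows. This is a conceptual sketch that follows the description above literally; `RequeueRejectedSketch`, its `Outcome` enum, and the `handle` method are hypothetical names and do not correspond to binder classes.

```java
import java.util.function.Consumer;

/** Conceptual model of the requeueRejected option; not the binder's implementation. */
final class RequeueRejectedSketch {

    enum Outcome { PROCESSED, DEAD_LETTERED, REQUEUED }

    static Outcome handle(String payload, Consumer<String> handler, boolean requeueRejected) {
        try {
            handler.accept(payload);
            return Outcome.PROCESSED;
        } catch (RuntimeException e) {
            // Per the description above: true forces the message to the DLQ, false re-queues it.
            return requeueRejected ? Outcome.DEAD_LETTERED : Outcome.REQUEUED;
        }
    }

    public static void main(String[] args) {
        Consumer<String> failing = payload -> {
            throw new IllegalStateException("cannot process " + payload);
        };
        System.out.println(handle("hello", failing, true));
        System.out.println(handle("hello", failing, false));
        System.out.println(handle("hello", payload -> { }, false));
    }
}
```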

receiveMode

The modes for receiving messages.

PEEK_LOCK: the received message is not deleted from the queue or subscription. Instead, it is temporarily locked to the receiver, which makes it invisible to other receivers.

RECEIVE_AND_DELETE: the received message is removed from the queue or subscription and immediately deleted.

Default: PEEK_LOCK

enableAutoComplete

Enables auto-complete and auto-abandon of received messages. enableAutoComplete is not needed in RECEIVE_AND_DELETE mode.

Default: false

Set Service Bus message headers

The table below shows how Spring message headers are mapped to Service Bus message headers and properties. When creating a message, developers can set a header or property of the Service Bus message by using the constants listed there.

    @Autowired
    private Sinks.Many<Message<String>> many;

    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        // SESSION_ID refers to ServiceBusMessageHeaders.SESSION_ID (see the table below).
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(SESSION_ID, "group1")
                                    .build(),
                      Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }

For some Service Bus headers that can be mapped to multiple Spring header constants, the priority of different Spring headers is listed.

| Service Bus Message Headers and Properties | Spring Message Header Constants | Type | Priority Number (Descending priority) |
| --- | --- | --- | --- |
| ContentType | org.springframework.messaging.MessageHeaders.CONTENT_TYPE | String | N/A |
| CorrelationId | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.CORRELATION_ID | String | N/A |
| MessageId | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.MESSAGE_ID | String | 1 |
| MessageId | com.azure.spring.integration.core.AzureHeaders.RAW_ID | String | 2 |
| MessageId | org.springframework.messaging.MessageHeaders.ID | UUID | 3 |
| PartitionKey | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.PARTITION_KEY | String | N/A |
| ReplyTo | org.springframework.messaging.MessageHeaders.REPLY_CHANNEL | String | N/A |
| ReplyToSessionId | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.REPLY_TO_SESSION_ID | String | N/A |
| ScheduledEnqueueTimeUtc | com.azure.spring.integration.core.AzureHeaders.SCHEDULED_ENQUEUE_MESSAGE | Integer | 1 |
| ScheduledEnqueueTimeUtc | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.SCHEDULED_ENQUEUE_TIME | Instant | 2 |
| SessionID | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.SESSION_ID | String | N/A |
| TimeToLive | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.TIME_TO_LIVE | Duration | N/A |
| To | com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders.TO | String | N/A |
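
The priority column can be read as a lookup order. The sketch below resolves MessageId from a header map by trying the three candidate headers in priority order 1, 2, 3. It is a plain-Java illustration only: `MessageIdResolver` is hypothetical, and the string keys are placeholders named after the constants in the table, not the actual values of those constants.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Illustrative priority lookup for MessageId; not the binder's converter. */
final class MessageIdResolver {

    // Placeholder keys named after the constants in the table; the real constant values may differ.
    static final String SERVICE_BUS_MESSAGE_ID = "ServiceBusMessageHeaders.MESSAGE_ID";
    static final String AZURE_RAW_ID = "AzureHeaders.RAW_ID";
    static final String SPRING_ID = "MessageHeaders.ID";

    // Priority 1 first, priority 3 last.
    private static final List<String> PRIORITY =
            List.of(SERVICE_BUS_MESSAGE_ID, AZURE_RAW_ID, SPRING_ID);

    /** Returns the value of the highest-priority header that is present, as a string. */
    static Optional<String> resolve(Map<String, Object> headers) {
        for (String key : PRIORITY) {
            Object value = headers.get(key);
            if (value != null) {
                return Optional.of(value.toString());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, Object> headers = Map.of(
                AZURE_RAW_ID, "raw-id",
                SPRING_ID, java.util.UUID.randomUUID());
        // AZURE_RAW_ID outranks SPRING_ID, so its value is chosen.
        System.out.println(resolve(headers).orElse("<none>"));
    }
}
```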

Troubleshooting

Next steps

Contributing