@EnableBinding @deprecated as of 3.1 in favor of functional programming model

I see that the following annotations are deprecated in Spring Cloud Stream:

@Input @Output @EnableBinding @StreamListener

Please provide examples and links to documentation showing how to do this in the functional way.


Instead of relying on annotation-based configuration, Spring Cloud Stream now detects beans of type Consumer, Function, or Supplier and creates the bindings for you. In older versions, the annotation-based code looks like this:

interface InputChannels {

    @Input("input")
    SubscribableChannel input();
}

@EnableBinding(InputChannels.class)
public class PubSubDemo {

    // Invoked for every message that arrives on the "input" channel
    @StreamListener("input")
    public void listen(String message) {
        if (LOG.isInfoEnabled()) {
            LOG.info(message);
        }
    }
}

With the functional model, the same code becomes:

@Configuration
public class PubSubDemo {

    // The bean name "input" determines the binding name (input-in-0)
    @Bean
    Consumer<String> input() {
        return str -> {
            if (LOG.isInfoEnabled()) {
                LOG.info(str);
            }
        };
    }
}

Note that the Consumer bean replaces both @StreamListener and @Input.
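Because the handler is now a plain java.util.function.Consumer, it can be called directly, for example from a unit test, without a binder or broker. A minimal sketch, assuming a hypothetical class ConsumerDemo and a list standing in for the logger (neither comes from the original code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the PubSubDemo class; no Spring is needed to run it.
class ConsumerDemo {

    // Collects received payloads so the effect of the consumer is observable.
    static final List<String> RECEIVED = new ArrayList<>();

    // Same shape as the @Bean method: a Consumer that handles one payload at a time.
    static Consumer<String> input() {
        return str -> RECEIVED.add(str);
    }

    public static void main(String[] args) {
        input().accept("hello");
        System.out.println(RECEIVED); // prints [hello]
    }
}
```

In a real application the method would stay a @Bean and the framework would call accept for each incoming message.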

As for configuration, suppose your application.yml previously looked like this:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: destination
          group: group
          consumer:
            concurrency: 10
            max-attempts: 3

The new configuration uses the binding name derived from the function name:

spring:
  cloud:
    stream:
      bindings:
        input-in-0:
          destination: destination
          group: group
          consumer:
            concurrency: 10
            max-attempts: 3

Binding names follow the pattern <functionName>-<in|out>-<index>. Here, in and out correspond to the type of binding (input or output), and the index is the position of the input or output binding. It is always 0 for a typical function with a single input and output.
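The naming convention can be sketched in plain Java. The BindingNames class and its bindingName method are hypothetical helpers written for illustration; they are not part of Spring Cloud Stream and only mirror the rule described above:

```java
// Hypothetical helper, not a Spring Cloud Stream API: it only mirrors the naming convention.
class BindingNames {

    // direction is "in" or "out"; index is 0 for a single input or output.
    static String bindingName(String functionName, String direction, int index) {
        if (!direction.equals("in") && !direction.equals("out")) {
            throw new IllegalArgumentException("direction must be \"in\" or \"out\"");
        }
        return functionName + "-" + direction + "-" + index;
    }

    public static void main(String[] args) {
        System.out.println(bindingName("input", "in", 0));   // prints input-in-0
        System.out.println(bindingName("output", "out", 0)); // prints output-out-0
    }
}
```

So a Consumer bean named input is configured under input-in-0, and a Supplier bean named output under output-out-0.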

Now let's consider output channels:

public interface OutputChannels {

    @Output
    MessageChannel output();
}

@Service
@EnableBinding(OutputChannels.class)
class PubSubSendQueue {

    @Autowired
    OutputChannels outputChannel;

    public void publish() {
        // MessageChannel.send expects a Message, not a raw String
        outputChannel.output().send(MessageBuilder.withPayload("Hello").build());
    }
}

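With the functional model, the code becomes the following. Note that a Supplier is polled by the framework rather than called on demand; for sending on demand, see StreamBridge below.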

@Service
class PubSubSendQueue {

    // Polled by the framework (by default once per second); bound as output-out-0
    @Bean
    public Supplier<String> output() {
        return () -> "Adam";
    }
}

This GitHub repository contains many examples:
https://github.com/spring-cloud/stream-applications

The official documentation explains in detail how to move from the imperative to the functional style in Spring Cloud Stream applications with Kafka Streams; the approach is the same without Kafka Streams.

https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#spring_cloud_function

Please also check this post:
https://spring.io/blog/2019/10/14/spring-cloud-stream-demystified-and-simplified

https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_programming_model

There is an example of imperative code (https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_imperative_programming_model) and of how it should be written in the functional style.


Here is some more helpful information:

Sending a message

Use the org.springframework.cloud.stream.function.StreamBridge for sending messages.

Before

myDataSource.output().send(message);

After

streamBridge.send("myData-out-0", message);

Replacing a ServiceActivator

Before

@ServiceActivator(inputChannel = MyProcessor.INPUT, outputChannel = MyProcessor.OUTPUT)
public Message<MySuperOutputMessage> transform(Message<MySuperInputMessage> message) { ... }

After

@Bean
Function<Message<MySuperInputMessage>, Message<MySuperOutputMessage>> myCoolFunction() {
    return message -> {...}; 
}

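Do not forget to register "myCoolFunction" in the spring.cloud.function.definition property. This is needed when the application contains more than one functional bean, because the framework then cannot tell on its own which one to bind.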