@EnableBinding @deprecated as of 3.1 in favor of functional programming model
I see that the following annotations are deprecated in Spring Cloud Stream:
@Input
@Output
@EnableBinding
@StreamListener
Please provide examples and links to documentation showing how to do this in the functional way.
Instead of annotation-based configuration, Spring Cloud Stream now detects beans of type Consumer, Function, or Supplier and creates the bindings for you. With the older annotation-based model, the code looks like this:
interface InputChannels {
    @Input("input")
    SubscribableChannel input();
}

@EnableBinding(InputChannels.class)
public class PubSubDemo {
    // Receives each message payload from the "input" channel
    @StreamListener("input")
    public void listen(String message) {
        if (LOG.isInfoEnabled()) {
            LOG.info(message);
        }
    }
}
The new version looks like this:
@Configuration
public class PubSubDemo {
    // The bean name "input" determines the binding name "input-in-0"
    @Bean
    Consumer<String> input() {
        return message -> {
            if (LOG.isInfoEnabled()) {
                LOG.info(message);
            }
        };
    }
}
Note that the Consumer bean replaces both @StreamListener and @Input.
As for configuration, suppose your application.yml previously looked like this:
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: destination
          group: group
          consumer:
            concurrency: 10
            max-attempts: 3
With the functional model, the configuration looks like this:
spring:
  cloud:
    stream:
      bindings:
        input-in-0:
          destination: destination
          group: group
          consumer:
            concurrency: 10
            max-attempts: 3
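The in or out segment corresponds to the type of binding (input or output), and the number is the index of that input or output binding. The index is always 0 for a typical function with a single input and a single output. The binding name therefore follows the pattern <functionName>-in-<index> or <functionName>-out-<index>, where functionName is the name of the bean (here, input).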
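To make the naming rule concrete, here is a minimal sketch in plain Java. BindingNames is a hypothetical helper written only for illustration. It is not part of Spring Cloud Stream, which derives these names internally.

```java
// Hypothetical helper (not a Spring Cloud Stream API): it only spells out
// the naming rule, i.e. function name, then "in" or "out", then the index.
final class BindingNames {
    private BindingNames() {}

    // Name of the index-th input binding of the given functional bean.
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Name of the index-th output binding of the given functional bean.
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }
}
```

BindingNames.input("input", 0) gives "input-in-0", the key used under spring.cloud.stream.bindings above. By the same rule, the output of a bean named output would be bound as "output-out-0".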
Now let's consider output channels:
public interface OutputChannels {
    @Output
    MessageChannel output();
}

@Service
@EnableBinding(OutputChannels.class)
class PubSubSendQueue {
    @Autowired
    OutputChannels outputChannel;

    public void publish() {
        // MessageChannel.send() expects a Message, not a raw String
        outputChannel.output().send(MessageBuilder.withPayload("Hello").build());
    }
}
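With the functional model, a Supplier bean takes the place of the output channel. The framework polls the supplier (every second by default) and sends each returned value to the output-out-0 binding. For on-demand sending, use StreamBridge as described under "Sending a message" below. The code becomes: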
@Service
class PubSubSendQueue {
    @Bean
    public Supplier<String> output() {
        // Plain Java lambda; each invocation yields one message payload
        return () -> "Adam";
    }
}
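Each invocation of the supplier produces one outgoing message. The sketch below shows that using only java.util.function. The loop stands in for the framework's poller and is an assumption made for illustration, not how Spring Cloud Stream is implemented. GreetingSource and poll are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class GreetingSource {
    private GreetingSource() {}

    // In the application this would be the body of the @Bean method.
    static Supplier<String> output() {
        AtomicInteger counter = new AtomicInteger();
        return () -> "Hello #" + counter.incrementAndGet();
    }

    // Stand-in for the poller (illustration only): calls the supplier a fixed
    // number of times and collects what would go to the output binding.
    static List<String> poll(Supplier<String> supplier, int times) {
        List<String> sent = new ArrayList<>();
        for (int i = 0; i < times; i++) {
            sent.add(supplier.get());
        }
        return sent;
    }
}
```

GreetingSource.poll(GreetingSource.output(), 3) returns the list [Hello #1, Hello #2, Hello #3]. In a real application you never call get() yourself, which is why a Supplier suits periodic sources better than one-off sends.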
This GitHub repository contains many examples:
https://github.com/spring-cloud/stream-applications
The official documentation explains in detail how to move from the imperative to the functional style in Spring Cloud Stream applications. It does so in the context of Kafka Streams, but the approach is the same without it.
https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#spring_cloud_function
Please also check this post:
https://spring.io/blog/2019/10/14/spring-cloud-stream-demystified-and-simplified
https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_programming_model
There is an example of imperative code (https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_imperative_programming_model) and of how it should be written in the functional style.
Here is some more helpful information:
Sending a message
Use the org.springframework.cloud.stream.function.StreamBridge for sending messages.
Before
myDataSource.output().send(message);
After
streamBridge.send("myData-out-0", message);
Replacing a ServiceActivator
Before
@ServiceActivator(inputChannel = MyProcessor.INPUT, outputChannel = MyProcessor.OUTPUT)
public Message<MySuperOutputMessage> transform(Message<MySuperInputMessage> message) { ... }
After
@Bean
Function<Message<MySuperInputMessage>, Message<MySuperOutputMessage>> myCoolFunction() {
    return message -> {...};
}
Do not forget to register myCoolFunction in the spring.cloud.function.definition property. When the application defines more than one functional bean, list them there separated by semicolons.
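One practical consequence of the functional model is that the transformation is an ordinary java.util.function.Function, so it can be exercised without a binder or a running broker. The sketch below uses plain String payloads instead of Message<MySuperInputMessage>. The class name Transforms and the upper-casing logic are placeholders invented for illustration.

```java
import java.util.Locale;
import java.util.function.Function;

final class Transforms {
    private Transforms() {}

    // In the application this would be exposed as a @Bean named myCoolFunction.
    // The trimming and upper-casing are placeholder logic for this sketch.
    static Function<String, String> myCoolFunction() {
        return payload -> payload.trim().toUpperCase(Locale.ROOT);
    }
}
```

Calling Transforms.myCoolFunction().apply(" hello ") returns "HELLO". The wiring to destinations stays entirely in configuration, through the function definition property and the myCoolFunction-in-0 and myCoolFunction-out-0 bindings.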