I want to define a flow that writes to MongoDB and, only on success, writes the IDs to Kafka. I'm using the Java DSL, and I'd like to have a FlowBuilder class that defines my pipeline at a high level. I'm searching for the features that would let me write a flow such as:

public IntegrationFlow buildFlow() {
   return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
      .process(writeToMongo) // <-- Searching for this kind of function
      .handle(writeToKafka)
      .get();
}

I've seen that Apache Camel works exactly like this, and I wonder whether Spring Integration has a similarly simple solution to this basic problem as well.


What you are looking for is a publishSubscribeChannel(), which can have several subscribers. By default, when no executor is configured on the channel, the next subscriber is called only after the previous one has finished, and only if it succeeded.

It may look similar to what you express with that process():

public IntegrationFlow buildFlow() {
   return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
      .publishSubscribeChannel(c -> c
              .subscribe(sf -> sf
                      .handle(MongoDb.reactiveOutboundChannelAdapter(mongoOperations))))
      .handle(writeToKafka)
      .get();
}

(Here mongoOperations is your ReactiveMongoOperations bean, which the reactiveOutboundChannelAdapter() factory requires.)

Another option would be a gateway(), but then you need to return something from the subflow for the main flow to continue. In Spring Integration, if there is no reply, the flow simply stops: there is no concept of reusing the input as the output when no output is produced.
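To illustrate the gateway() option, here is a minimal sketch. It assumes hypothetical mongoTemplate (a ReactiveMongoTemplate) and writeToKafka handler beans, and it uses a generic handler inside the subflow so that the saved entity's ID is returned as the reply, which lets the main flow continue to the Kafka step:

```java
// Sketch only: the subflow must produce a reply for the flow to proceed.
// mongoTemplate, reactiveKafkaConsumerTemplate and writeToKafka are assumed beans.
public IntegrationFlow buildFlowWithGateway() {
    return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
            .gateway(sub -> sub
                    // Persist the payload; the returned value becomes the
                    // gateway's reply, so the outer flow only continues on success.
                    .handle((payload, headers) ->
                            mongoTemplate.save(payload)
                                    .map(MyDocument::getId)
                                    .block()))
            .handle(writeToKafka)
            .get();
}
```

If the save fails, the exception propagates and the Kafka handler is never called, which matches the "only on success" requirement; but note the blocking call, which gives up the reactive nature of the flow.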