Spring batch Job read from multiple sources
There isn't a ready-to-use component that perform what you ask; the only solution is to write a custom ItemReader<>
that delegates to JdbcCursorItemReader
(or to HibernateCursorItemReader
or to any generic ItemReader
implementation).
You need to prepare all necessary stuff (datasource, session, real database readers) and bind all delegated readers to your custom reader.
EDIT:
You need to simulate a loop using recusion of ItemReader.read()
and mantain reader and delegates state across job restarts.
class MyItemReader<T> implements ItemReader<T>, ItemStream {
private ItemReader[] delegates;
private int delegateIndex;
private ItemReader<T> currentDelegate;
private ExecutionContext stepExecutionContext;
public void setDelegates(ItemReader[] delegates) {
this.delegates = delegates;
}
@BeforeStep
private void beforeStep(StepExecution stepExecution) {
this.stepExecutionContext = stepExecution.getExecutionContext();
}
public T read() {
T item = null;
if(null != currentDelegate) {
item = currentDelegate.read();
if(null == item) {
((ItemStream)this.currentDelegate).close();
this.currentDelegate = null;
}
}
// Move to next delegate if previous was exhausted!
if(null == item && this.delegateIndex< this.delegates.length) {
this.currentDelegate = this.delegates[this.currentIndex++];
((ItemStream)this.currentDelegate).open(this.stepExecutionContext);
update(this.stepExecutionContext);
// Recurse to read() to simulate loop through delegates
item = read();
}
return item;
}
public void open(ExecutionContext ctx) {
// During open restore last active reader and restore its state
if(ctx.containsKey("index")) {
this.delegateIndex = ctx.getInt("index");
this.currentDelegate = this.delegates[this.delegateIndex];
((ItemStream)this.currentDelegate ).open(ctx);
}
}
public void update(ExecutionContext ctx) {
// Update current delegate index and state
ctx.putInt("index", this.delegateIndex);
if(null != this.currentDelegate) {
((ItemStream)this.currentDelegate).update(ctx);
}
}
public void close(ExecutionContext ctx) {
if(null != this.currentDelegate) {
((ItemStream)this.currentDelegate).close();
}
}
<bean id="myItemReader" class=path.to.MyItemReader>
<property name="delegates">
<array>
<ref bean="itemReader1"/>
<ref bean="itemReader2"/>
<ref bean="itemReader3"/>
</array>
</property>
</bean>
EDIT2: Remember to set property name; this is NECESSARY to let MyItemReader.read() works correctly
<bean id="itemReader1" class="JdbcCursorItemReader">
<property name="name" value="itemReader1" />
<!-- Set other properties -->
</bean>
I suggest a simple workaround that may not be suitable to all cases, but will be useful in many:
Simply define:
- 2 readers, one for each database
- 2 steps
- one job that contains both 2 steps
The 2 steps are nearly identical, they reference the same processor and writer, but they have different readers. They will be called consecutively.
Whether this setup works will depend on the processor and writer (whether they still work correctly when called in different steps). In my case, it was sufficient to set appendAllowed=true
to the writer, such that both steps can write to the same file.