composite conditions in apache flink DataStream.coGroup.where.equalTo clauses

I'm trying to coGroup 2 datastreams using flink's datastream API.

stream1.coGroup(stream2)
    .where(stream1Item -> streamItem.field1)
    .equalTo(stream2Item -> stream2Item.field1)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
    .apply(someCoGroupFunction)
    .sinkTo(someSink)

The above code successfully works while coGrouping on stream1.field1 = stream2.field1. But how the following kind of composite field equality be built for coGoruping?

stream1.field1 = stream2.field1 AND stream1.field2 = stream2.field2 AND (stream1.field3 = stream2.fieldN OR stream1.field4 = stream2.fieldN)

With Flink's SQL API such composite join conditions can be written flawlessly like

LEFT JOIN ON
Table1.field1 = Table2.field1 AND
Table1.field2 = Table2.field2 AND
(Table1.field3 = Table2.fieldN OR Table1.field4 = Table2.fieldN)

But how the same can be implemented in Datastream API with composite complex conditions for join/cogroup?


DataStream joins only support equality predicates, but I think you could implement the additional constraint inside the coGroupFunction. Or perhaps you could take the union of two coGroupings, one for each side of the OR.

But given that the DataStream and Table APIs are interoperable, why not just use the Table/SQL API for that part of the pipeline?