How to use a tumbling window function on non-keyed streaming data in Flink?
I want to use a tumbling window function in my program, which processes non-keyed streaming data but currently handles only about 300 messages/sec. I want to get it to at least 5K messages/sec. For this purpose, I want to apply a 2-second tumbling window just to see whether it speeds things up, but I am not sure how to use it in my case.
Note: I am using the GeoMesa HBase platform to save the messages. Also, I did not paste my whole application code here; since I only need the window function, the code below should be sufficient for understanding.
Here is my Flink code:
public class Tranport {
public static void main(String[] args) throws Exception {
// fetch runtime arguments
String bootstrapServers = "xx.xx.xxx.xxx:xxxx";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set up the Consumer and create a datastream from this source
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", bootstrapServers);
properties.setProperty("group.id", "group_id");
final FlinkKafkaConsumer<String> flinkConsumer = new FlinkKafkaConsumer<>("lc", new SimpleStringSchema(), properties);
flinkConsumer.setStartFromTimestamp(Long.parseLong("0"));
DataStream<String> readingStream = env.addSource(flinkConsumer);
readingStream.rebalance().map(new RichMapFunction<String, String>() {
private static final long serialVersionUID = -2547861355L; // random number
DataStore lc_live = null;
SimpleFeatureType sft_live;
SimpleFeatureBuilder SFbuilderLive; // feature builder for live
List<SimpleFeature> lc_live_features; // buffer of features waiting to be written
long numberOfMessagesProcessed, numberOfMessagesFailed, numberOfMessagesSkipped; // counters initialized in open()
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("In open method.");
// --- GEOMESA, GEOTOOLS APPROACH ---//
// define connection parameters to xxx GeoMesa-HBase DataStore
Map<String, Serializable> params_live = new HashMap<>();
params_live.put("xxxx", "xxx"); // HBase table name
params_live.put("xxxx","xxxx");
try {
lc_live = DataStoreFinder.getDataStore(params_live);
if (lc_live == null) {
System.out.println("Could not connect to live");
} else {
System.out.println("Successfully connected to live");
}
} catch (IOException e) {
e.printStackTrace();
}
// create simple feature type for x table in HBASE
StringBuilder attributes1 = new StringBuilder();
attributes1.append("xxx:String,");
attributes1.append("xxx:Long,");
attributes1.append("source:String,");
attributes1.append("xxx:String,");
attributes1.append("xxx:Double,");
attributes1.append("status:String,");
attributes1.append("forecast:Double,");
attributes1.append("carsCount:Integer,");
attributes1.append("*xxx:Point:srid=4326");
sft_live = SimpleFeatureTypes.createType("xxxx", attributes1.toString());
try {
lc_live.createSchema(sft_live);
} catch (IOException e) {
e.printStackTrace();
}
// Initialize the variables
numberOfMessagesProcessed = 0;
numberOfMessagesFailed = 0;
numberOfMessagesSkipped = 0;
// for lc_Live
lc_live_features = new ArrayList<>();
SFbuilderLive = new SimpleFeatureBuilder(sft_live);
Here I want to create a tumbling window (windowAll) that collects all the stream messages arriving within a 2-second window and pushes them into the array list created above (lc_live_features), so that the code below can write them to GeoMesa in one batch:
// live GeoMesa-HBase DataStore
// copy the list into a local variable and empty the list for the next iteration
List<SimpleFeature> LocalFeatures = lc_live_features;
lc_live_features = new ArrayList<>();
LocalFeatures = Collections.unmodifiableList(LocalFeatures);
try (FeatureWriter<SimpleFeatureType, SimpleFeature> writer = lc_live.getFeatureWriterAppend(sft_live.getTypeName(), Transaction.AUTO_COMMIT)) {
System.out.println("Writing " + LocalFeatures.size() + " features to live");
for (SimpleFeature feature : LocalFeatures) {
SimpleFeature toWrite = writer.next();
toWrite.setAttributes(feature.getAttributes());
((FeatureIdImpl) toWrite.getIdentifier()).setID(feature.getID());
toWrite.getUserData().put(Hints.USE_PROVIDED_FID, Boolean.TRUE);
toWrite.getUserData().putAll(feature.getUserData());
writer.write();
}
} catch (IOException e) {
e.printStackTrace();
}
It's late, but it might help someone. In Scala, you can do something like:
env.addSource(consumer)
  .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)))
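Since the question's pipeline is Java, here is a rough Java equivalent: a minimal sketch that collects everything arriving in a 2-second processing-time window and hands it on as one batch. The Kafka consumer setup is copied from the question; the comment marking where the GeoMesa batch write would go is only a placeholder, not working GeoMesa code.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class TumblingWindowAllSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer as in the question
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "xx.xx.xxx.xxx:xxxx");
        properties.setProperty("group.id", "group_id");
        FlinkKafkaConsumer<String> flinkConsumer =
                new FlinkKafkaConsumer<>("lc", new SimpleStringSchema(), properties);

        DataStream<String> readingStream = env.addSource(flinkConsumer);

        readingStream
                // group everything that arrives within 2 seconds of processing time
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)))
                // receive the whole window content at once and handle it as one batch
                .process(new ProcessAllWindowFunction<String, Integer, TimeWindow>() {
                    @Override
                    public void process(Context ctx, Iterable<String> messages, Collector<Integer> out) {
                        List<String> batch = new ArrayList<>();
                        for (String msg : messages) {
                            batch.add(msg);
                        }
                        // placeholder: build SimpleFeatures from `batch` and append them with a
                        // single FeatureWriter here, as in the open()/map() code of the question
                        out.collect(batch.size()); // e.g. emit the batch size for monitoring
                    }
                });

        env.execute("windowAll sketch");
    }
}

The idea is the same as in the question: buffer the window's messages and issue a single FeatureWriter append pass per 2-second window instead of one write per message.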
But remember: if you are not using keyBy(), your data won't be processed in parallel, no matter what value you set with env.setParallelism().
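If the messages carry a field you could key on, a keyed tumbling window lets Flink spread the window work across parallel subtasks. This sketch builds on the readingStream from the example above and additionally needs org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; the key extractor (first comma-separated field of the message) is purely an assumption for illustration.

// Hypothetical: key by the first comma-separated field of the message so that
// the 2-second windows are evaluated in parallel, one window instance per key.
readingStream
        .keyBy(msg -> msg.split(",")[0])
        .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
        .process(new ProcessWindowFunction<String, Integer, String, TimeWindow>() {
            @Override
            public void process(String key, Context ctx, Iterable<String> messages, Collector<Integer> out) {
                int count = 0;
                for (String ignored : messages) {
                    count++;
                }
                out.collect(count); // one count per key per 2-second window
            }
        });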