How to use a tumbling window on non-keyed streaming data in Flink?

I want to use a tumbling window in my program, which processes non-keyed streaming data but currently handles only about 300 messages/sec. I want to get it to at least 5K messages/sec. To that end, I want to try a 2-second tumbling window to see whether it speeds things up, but I am not sure how to use it in my case.

Note: I am using the GeoMesa HBase platform to store the messages. I have not pasted the whole application here; the code below should be enough to show where the window function is needed.

Here is my Flink code:

public class Tranport {

    public static void main(String[] args) throws Exception {

        // fetch runtime arguments
        String bootstrapServers = "xx.xx.xxx.xxx:xxxx";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set up the Consumer and create a datastream from this source
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("group.id", "group_id");
        final FlinkKafkaConsumer<String> flinkConsumer = new FlinkKafkaConsumer<>("lc", new SimpleStringSchema(), properties);
        flinkConsumer.setStartFromTimestamp(0L);

        DataStream<String> readingStream = env.addSource(flinkConsumer);
        readingStream.rebalance().map(new RichMapFunction<String, String>() {

            private static final long serialVersionUID = -2547861355L; // random number

            DataStore lc_live = null;
            
            SimpleFeatureType sft_live;
            SimpleFeatureBuilder SFbuilderLive; // feature builder for live

            List<SimpleFeature> lc_live_features; // buffer of features waiting to be written to live

            @Override
            public void open(Configuration parameters) throws Exception {
                System.out.println("In open method.");

                // --- GEOMESA, GEOTOOLS APPROACH ---//
                // define connection parameters to xxx GeoMesa-HBase DataStore
                Map<String, Serializable> params_live = new HashMap<>();
                params_live.put("xxxx", "xxx"); // HBase table name
                params_live.put("xxxx","xxxx");

                try {
                    lc_live = DataStoreFinder.getDataStore(params_live);
                    if (lc_live == null) {
                        System.out.println("Could not connect to live");
                    } else {
                        System.out.println("Successfully connected to live");
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }

                // create simple feature type for x table in HBASE 
                StringBuilder attributes1 = new StringBuilder();
                attributes1.append("xxx:String,");
                attributes1.append("xxx:Long,");
                attributes1.append("source:String,");
                attributes1.append("xxx:String,");
                attributes1.append("xxx:Double,");
                attributes1.append("status:String,");
                attributes1.append("forecast:Double,");
                attributes1.append("carsCount:Integer,");
                attributes1.append("*xxx:Point:srid=4326");
                sft_live = SimpleFeatureTypes.createType("xxxx", attributes1.toString());

                try {
                    lc_live.createSchema(sft_live);

                } catch (IOException e) {
                    e.printStackTrace();
                }

                // Initialize the variables
                numberOfMessagesProcessed = 0;
                numberOfMessagesFailed = 0;
                numberOfMessagesSkipped = 0;

                // for lc_live
                lc_live_features = new ArrayList<>();
                SFbuilderLive = new SimpleFeatureBuilder(sft_live);

Here I want to create a tumbling window (windowAll) that collects all the stream messages arriving within a 2-second window and pushes them into the array list that the code below writes out:

        
                        // live GeoMesa-HBase DataStore
                        // copy the list into a local variable and empty the list for the next iteration
                        List<SimpleFeature> LocalFeatures = lc_live_features;
                        lc_live_features = new ArrayList<>();
                        LocalFeatures = Collections.unmodifiableList(LocalFeatures);
                        try (FeatureWriter<SimpleFeatureType, SimpleFeature> writer = lc_live.getFeatureWriterAppend(sft_live.getTypeName(), Transaction.AUTO_COMMIT)) {
                            System.out.println("Writing " + LocalFeatures.size() + " features to live");
                            for (SimpleFeature feature : LocalFeatures) {
                                SimpleFeature toWrite = writer.next();
                                toWrite.setAttributes(feature.getAttributes());
                                ((FeatureIdImpl) toWrite.getIdentifier()).setID(feature.getID());
                                toWrite.getUserData().put(Hints.USE_PROVIDED_FID, Boolean.TRUE);
                                toWrite.getUserData().putAll(feature.getUserData());
                                writer.write();
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        }

This is late, but it might help someone. In Scala, you can do something like this:

 env.addSource(consumer)
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)))
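
To make the effect of that call concrete, here is a minimal plain-Java sketch (no Flink dependency) of what a 2-second tumbling window does to a non-keyed stream: every message lands in exactly one fixed, non-overlapping window, and each window becomes one batch. The class `TumblingWindowSketch`, the `Message` type and the method names are made up for illustration, and timestamps are assumed to be non-negative milliseconds. In a real job Flink does this grouping for you, and you would normally attach a window function (for example a `ProcessAllWindowFunction`) after `windowAll(...)` to receive each batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of tumbling-window batching; not Flink code.
class TumblingWindowSketch {

    // Hypothetical message: a payload plus the time (in ms) at which it arrived.
    static final class Message {
        final long timestampMillis;
        final String payload;

        Message(long timestampMillis, String payload) {
            this.timestampMillis = timestampMillis;
            this.payload = payload;
        }
    }

    // Start of the window that contains the timestamp (assumes timestamp >= 0).
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    // Groups payloads by window start; each map value is one batch
    // that could be written to the data store in a single pass.
    static Map<Long, List<String>> batchByWindow(List<Message> messages, long windowSizeMillis) {
        Map<Long, List<String>> batches = new TreeMap<>();
        for (Message m : messages) {
            long start = windowStart(m.timestampMillis, windowSizeMillis);
            batches.computeIfAbsent(start, k -> new ArrayList<>()).add(m.payload);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Message> stream = new ArrayList<>();
        stream.add(new Message(100, "a"));
        stream.add(new Message(1_900, "b"));
        stream.add(new Message(2_000, "c"));
        stream.add(new Message(4_500, "d"));

        // Print one line per 2-second window with the messages it collected.
        batchByWindow(stream, 2_000).forEach((start, batch) ->
            System.out.println("window [" + start + ", " + (start + 2_000) + ") -> " + batch));
    }
}
```

In the question's code, the body of such a window function would be the natural place for the block that copies the list and writes the features, since it would then run once per window instead of once per message.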

But remember: if you are not using keyBy(), the window will not be processed in parallel. windowAll() runs as a single task no matter what value you set in env.setParallelism().
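
If the data has no natural key, one possible workaround is to derive a synthetic one, so that each bucket can get its own window. The plain-Java sketch below only illustrates that partitioning idea. `SyntheticKeySketch`, `bucketFor` and `partition` are hypothetical names, and hashing the message text is an assumption, not something from the question. It does not model how Flink actually distributes keys over subtasks, so an even spread is not guaranteed.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of splitting a non-keyed stream into buckets; not Flink code.
class SyntheticKeySketch {

    // Derives a bucket in [0, parallelism) from the message text.
    // In a Flink job, a value like this could serve as the key passed to keyBy().
    static int bucketFor(String message, int parallelism) {
        return Math.floorMod(message.hashCode(), parallelism);
    }

    // Splits the messages into `parallelism` buckets; each bucket could then be
    // windowed and written independently of the others.
    static List<List<String>> partition(List<String> messages, int parallelism) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            buckets.add(new ArrayList<>());
        }
        for (String m : messages) {
            buckets.get(bucketFor(m, parallelism)).add(m);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messages.add("message-" + i);
        }
        List<List<String>> buckets = partition(messages, 4);
        for (int i = 0; i < buckets.size(); i++) {
            System.out.println("bucket " + i + " -> " + buckets.get(i));
        }
    }
}
```

The trade-off is that each bucket produces its own smaller batch per window, so the data store sees several concurrent writers instead of one.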