Apache Beam Streaming Lag Operator

I'm currently thinking of building a pipeline that has a LAG operator like in SQL, but I'm not sure if it's possible.

To make it clearer, let's say I have a stream of data like this:

# sensor_name, temperature
("station 1", 30.0)
("station 1", 31.0)
("station 1", 32.0)
("station 1", 33.0)
("station 2", 30.0)
("station 2", 31.0)
("station 2", 32.0)

and apply a PTransform so that the output becomes:

("station 1", {"now":30.0, "before":None})
("station 1", {"now":31.0, "before":30.0})
("station 1", {"now":32.0, "before":31.0})
("station 1", {"now":33.0, "before":32.0})
("station 2", {"now":30.0, "before":None})
("station 2", {"now":31.0, "before":30.0})
("station 2", {"now":32.0, "before":31.0})

Is it possible to do so? Thanks!


Solution 1:

Here is a working sample using the public Pub/Sub topic for taxi rides.

This is the stateful DoFn:

import logging

import apache_beam as beam
from apache_beam.coders import FloatCoder, TupleCoder
from apache_beam.transforms.userstate import BagStateSpec


class UpdateLast(beam.DoFn):
    # Bag state holding, per key, a single (meter_reading, timestamp) tuple.
    RIDE_TRACK = BagStateSpec('rides', TupleCoder((FloatCoder(), FloatCoder())))

    def process(self,
                element,
                timestamp_param=beam.DoFn.TimestampParam,
                ride_state=beam.DoFn.StateParam(RIDE_TRACK)):

        key = element[0]
        meter_reading = element[1]
        timestamp = float(timestamp_param)

        bag_content = [x for x in ride_state.read()]
        if not bag_content:
            # First element seen for this key, so there is no previous value yet
            logging.info("Generating entry %s for key %s", (meter_reading, timestamp), key)
            ride_state.add((meter_reading, timestamp))
            output = {"now": meter_reading, "before": None}
            yield (key, output)
        else:
            # There should only be one element in the bag
            bag_ride = bag_content[0]

            old_meter = bag_ride[0]
            old_timestamp = bag_ride[1]
            # We only need to check if the element is more recent
            if timestamp > old_timestamp:
                # Update the bag with the new value and emit it with the previous one
                ride_state.clear()
                ride_state.add((meter_reading, timestamp))
                output = {"now": meter_reading, "before": old_meter}
                logging.info("KEY %s: updating from %s to %s", key, old_meter, meter_reading)
                yield (key, output)
            else:
                # Element arrived out of order: keep the state and swap now/before
                output = {"now": old_meter, "before": meter_reading}
                yield (key, output)
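
If you want to sanity-check the DoFn locally before spinning up Dataflow, something along these lines should work on the DirectRunner. This is just a rough sketch: timestamps are attached by hand so the "is this element more recent?" check has something to compare, and since Beam does not guarantee element ordering the exact now/before pairing can differ from the idealized sequence in the question.

from typing import Tuple

from apache_beam.transforms.window import TimestampedValue

readings = [
    ("station 1", 30.0),
    ("station 1", 31.0),
    ("station 1", 32.0),
    ("station 1", 33.0),
    ("station 2", 30.0),
    ("station 2", 31.0),
    ("station 2", 32.0),
]

with beam.Pipeline() as p:  # DirectRunner by default
    (p
     | beam.Create(list(enumerate(readings)))
     # Attach an increasing timestamp to each element so later readings
     # compare as "more recent" inside UpdateLast. The explicit output type
     # keeps the elements as key-value pairs, which stateful DoFns require.
     | beam.Map(lambda idx_kv: TimestampedValue(idx_kv[1], idx_kv[0]))
         .with_output_types(Tuple[str, float])
     | beam.ParDo(UpdateLast())
     | beam.Map(print))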

And here is a pipeline for you to test it:

import json

from apache_beam import Map, ParDo
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners import DataflowRunner

# Fill in bucket, project and region with your own GCP values.
options = PipelineOptions(
    temp_location=f"{bucket}/tmp/",
    project=project,
    region=region,
    streaming=True,
    job_name="statedofn",
    num_workers=4,
    max_num_workers=20,
)

p = beam.Pipeline(DataflowRunner(), options=options)

topic = "projects/pubsub-public-data/topics/taxirides-realtime"

pubsub = (p | "Read Topic" >> ReadFromPubSub(topic=topic)
            | "Json Loads" >> Map(json.loads)
            | beam.Filter(lambda x: x["ride_status"] == "enroute")
            | "KV" >> Map(lambda x: (x["ride_id"], x["meter_reading"]))
         )

state_df = (pubsub | "Stateful Do Fn" >> ParDo(UpdateLast())
                   | Map(logging.info)
           )

p.run()

output:

('052b8a40-1c57-4a3c-a012-73ffeddb1f02', {'now': 9.875244, 'before': 9.857124})
('835a9a99-c2fc-4f3d-9284-59098827fe05', {'now': 26.973698, 'before': 26.940273})
('952c0fa5-2bb8-4c9a-b38c-72d66dedfddc', {'now': 17.828278, 'before': 17.808857})
('952c0fa5-2bb8-4c9a-b38c-72d66dedfddc', {'now': 17.847698, 'before': 17.828278})
('d5641df2-2fd8-4416-bde7-4def6d477a29', {'now': 2.3575556, 'before': 2.3346667})
('d5641df2-2fd8-4416-bde7-4def6d477a29', {'now': 2.3804445, 'before': 2.3575556})
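
Side note: since the bag here only ever keeps one element per key, the same idea can also be expressed with ReadModifyWriteStateSpec, which holds exactly one value per key and replaces the clear/add pair with a single write. Below is a rough, untested sketch of that variant (the UpdateLastValue class name is just for illustration); it reuses the beam and coder imports from above.

from apache_beam.transforms.userstate import ReadModifyWriteStateSpec


class UpdateLastValue(beam.DoFn):
    # Single-value state per key: the latest (meter_reading, timestamp) pair.
    LAST = ReadModifyWriteStateSpec('last', TupleCoder((FloatCoder(), FloatCoder())))

    def process(self,
                element,
                timestamp_param=beam.DoFn.TimestampParam,
                last_state=beam.DoFn.StateParam(LAST)):
        key, meter_reading = element
        timestamp = float(timestamp_param)

        previous = last_state.read()  # None when nothing is stored for the key yet
        if previous is None or timestamp > previous[1]:
            last_state.write((meter_reading, timestamp))
            yield (key, {"now": meter_reading,
                         "before": None if previous is None else previous[0]})
        else:
            # Out-of-order element: keep the stored value and swap now/before
            yield (key, {"now": previous[0], "before": meter_reading})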