Apache Beam Streaming Lag Operator
I'm thinking of building a pipeline that has a LAG
operator like the one in SQL, but I'm not sure if it's possible.
To be clearer, let's say I have a stream of data like this:
# sensor_name, temperature
("station 1", 30.0)
("station 1", 31.0)
("station 1", 32.0)
("station 1", 33.0)
("station 2", 30.0)
("station 2", 31.0)
("station 2", 32.0)
and I apply a PTransform so that the output becomes:
("station 1", {"now":30.0, "before":None})
("station 1", {"now":31.0, "before":30.0})
("station 1", {"now":32.0, "before":31.0})
("station 1", {"now":33.0, "before":32.0})
("station 2", {"now":30.0, "before":None})
("station 2", {"now":31.0, "before":30.0})
("station 2", {"now":32.0, "before":31.0})
Is it possible to do so? Thanks!
Solution 1:
Here is a working sample that uses the public Pub/Sub topic for taxi rides.
This is the stateful DoFn:
class UpdateLast(beam.DoFn):
    """Pair each keyed reading with the reading stored before it (LAG-like).

    The state declared by ``RIDE_TRACK`` holds ``(reading, timestamp)``
    tuples. ``process`` clears it before every update, so it is expected to
    hold at most one entry at a time.
    """

    RIDE_TRACK = BagStateSpec('rides', TupleCoder((FloatCoder(), FloatCoder())))

    def process(self,
                element,
                timestamp_param=beam.DoFn.TimestampParam,
                ride_state=beam.DoFn.StateParam(RIDE_TRACK)):
        """Yield ``(key, {"now": ..., "before": ...})`` for one element.

        Args:
            element: Indexable pair; ``element[0]`` is the key and
                ``element[1]`` is the reading.
            timestamp_param: Element timestamp; converted with ``float()``.
            ride_state: Handle on the ``RIDE_TRACK`` state.

        Yields:
            A ``(key, dict)`` tuple. ``"before"`` is ``None`` when nothing
            has been stored yet.
        """
        key, meter_reading = element[0], element[1]
        timestamp = float(timestamp_param)
        stored = list(ride_state.read())

        # Nothing stored yet: remember this reading and emit it with no
        # predecessor.
        if not stored:
            logging.info("Generating entry %s for key %s", (meter_reading, timestamp), key)
            ride_state.add((meter_reading, timestamp))
            yield (key, {"now": meter_reading, "before": None})
            return

        # There should only be one element in the bag.
        previous = stored[0]
        old_meter, old_timestamp = previous[0], previous[1]

        # Strictly newer element: it replaces the stored reading and the
        # stored reading becomes "before".
        if timestamp > old_timestamp:
            ride_state.clear()
            ride_state.add((meter_reading, timestamp))
            logging.info("KEY %s: updating from %s to %s", key, old_meter, meter_reading)
            yield (key, {"now": meter_reading, "before": old_meter})
            return

        # Element is not newer than the stored one: leave the state alone
        # and emit the pair with the roles inverted.
        yield (key, {"now": old_meter, "before": meter_reading})
And here is a pipeline you can use to test it:
# Streaming job settings. `bucket`, `project` and `region` must already be
# defined before this point.
options = PipelineOptions(
    temp_location=f"{bucket}/tmp/",
    project=project,
    region=region,
    streaming=True,
    job_name="statedofn",
    num_workers=4,
    max_num_workers=20,
)
p = beam.Pipeline(DataflowRunner(), options=options)
topic = "projects/pubsub-public-data/topics/taxirides-realtime"

# Read the topic, parse each message as JSON, keep only rides whose status
# is "enroute", and key every meter reading by its ride id.
raw_messages = p | "Read Topic" >> ReadFromPubSub(topic=topic)
parsed_rides = raw_messages | "Json Loads" >> Map(json.loads)
enroute_rides = parsed_rides | beam.Filter(lambda x: x["ride_status"] == "enroute")
pubsub = enroute_rides | "KV" >> Map(lambda x: (x["ride_id"], x["meter_reading"]))

# Run the stateful DoFn over the keyed readings and log every result.
lagged = pubsub | "Stateful Do Fn" >> ParDo(UpdateLast())
state_df = lagged | Map(logging.info)

p.run()
Output:
('052b8a40-1c57-4a3c-a012-73ffeddb1f02', {'now': 9.875244, 'before': 9.857124})
('835a9a99-c2fc-4f3d-9284-59098827fe05', {'now': 26.973698, 'before': 26.940273})
('952c0fa5-2bb8-4c9a-b38c-72d66dedfddc', {'now': 17.828278, 'before': 17.808857})
('952c0fa5-2bb8-4c9a-b38c-72d66dedfddc', {'now': 17.847698, 'before': 17.828278})
('d5641df2-2fd8-4416-bde7-4def6d477a29', {'now': 2.3575556, 'before': 2.3346667})
('d5641df2-2fd8-4416-bde7-4def6d477a29', {'now': 2.3804445, 'before': 2.3575556})