Azure Stream Analytics - Find Most Recent `n` Events Within Time Interval
I am working with Azure Stream Analytics. To illustrate my situation: I have streaming events corresponding to buy (+) / sell (-) orders of a certain amount from users. The key fields in an individual event look like: `{UserId: 'u12345', Type: 'Buy', Amt: 14.0}`.
I want to write a query which outputs `UserId`s and the sum of `Amt` for the most recent (up to) 5 events within a sliding 24 hr period partitioned by `UserId`.
To clarify:
- If there are more than 5 events for a given `UserId` in the last 24 hours, I only want the sum of `Amt` for the most recent 5.
- If there are fewer than 5 events, I either want the `UserId` to be omitted or the sum of the `Amt` of the events that do exist.
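To pin down the semantics I am after, here is a rough reference model in plain Python (not Stream Analytics code). The function name `sum_recent`, the `ts` field and the `require_full` flag are made up for illustration, and it assumes `Amt` is already signed (negative for sells) and that the window covers `(now - 24h, now]`.

```python
from datetime import datetime, timedelta


def sum_recent(events, now, n=5, window=timedelta(hours=24), require_full=True):
    """Per user, sum Amt over the n most recent events in (now - window, now].

    events: iterable of dicts with keys "UserId", "ts" (datetime) and "Amt".
    require_full=True omits users with fewer than n events in the window;
    require_full=False sums whatever events exist.
    """
    by_user = {}
    for e in events:
        # Keep only events inside the 24 h window ending at `now`.
        if now - window < e["ts"] <= now:
            by_user.setdefault(e["UserId"], []).append(e)

    result = {}
    for user, evs in by_user.items():
        if require_full and len(evs) < n:
            continue  # fewer than n events: omit this user
        # Most recent first, then keep at most n.
        evs.sort(key=lambda e: e["ts"], reverse=True)
        result[user] = sum(e["Amt"] for e in evs[:n])
    return result
```

Either behaviour for the "fewer than 5" case would be acceptable; the flag only exists to show both.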
I've tried looking at `LIMIT DURATION` predicates, but there doesn't seem to be a way to limit the number of events as well as filter on time while `PARTITION`'ing by `UserId`. Has anyone done something like this?
Considering the comments, I think this should work:
    WITH Last5 AS (
        SELECT
            UserId,
            System.Timestamp() AS windowEnd,
            COLLECTTOP(5) OVER (ORDER BY CAST(EventEnqueuedUtcTime AS DATETIME) DESC) AS Top5
        FROM input1
        TIMESTAMP BY EventEnqueuedUtcTime
        GROUP BY
            SlidingWindow(hour, 24),
            UserId
        HAVING COUNT(*) >= 5 -- We want at least 5
    )
    SELECT
        L.UserId,
        System.Timestamp() AS ts,
        SUM(C.ArrayValue.value.Amt) AS sumAmt
    INTO myOutput
    FROM Last5 AS L
    CROSS APPLY GetArrayElements(L.Top5) AS C
    GROUP BY
        System.Timestamp(), -- Snapshot window
        L.UserId
We use a CTE to first get the sliding window of 24h. In there we both filter to only retain windows of at least 5 records (`HAVING COUNT(*) >= 5`) and collect only the last 5 of them (`COLLECTTOP(5) OVER ...`). If you would rather get the sum of whatever events exist for users with fewer than 5, dropping the `HAVING` clause should do that. Note that I had to `TIMESTAMP BY` and `CAST` on my own timestamp when testing the query; you may not need that in your case.
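Next we need to unpack the collected records, which is done via `CROSS APPLY GetArrayElements`, and sum them. Each element that `COLLECTTOP` returns is a record holding a `rank` and the original event under `value`, which is why the sum reads `C.ArrayValue.value.Amt`. I use a snapshot window for that, as I don't need time grouping on that one.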
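To make the nesting in `C.ArrayValue.value.Amt` easier to see, here is a small plain-Python model of the unpack-and-sum step. It is only a sketch: the sample row and the helper name `unpack_and_sum` are made up, and it assumes `Amt` is signed. It mirrors the `rank`/`value` records produced by `COLLECTTOP` and the `ArrayIndex`/`ArrayValue` pairs produced by `GetArrayElements`.

```python
# Made-up example of one row coming out of the Last5 step:
# Top5 is an array of {rank, value} records, most recent event first.
last5_row = {
    "UserId": "u12345",
    "Top5": [
        {"rank": 1, "value": {"UserId": "u12345", "Type": "Buy", "Amt": 14.0}},
        {"rank": 2, "value": {"UserId": "u12345", "Type": "Sell", "Amt": -3.5}},
        {"rank": 3, "value": {"UserId": "u12345", "Type": "Buy", "Amt": 2.0}},
        {"rank": 4, "value": {"UserId": "u12345", "Type": "Buy", "Amt": 1.0}},
        {"rank": 5, "value": {"UserId": "u12345", "Type": "Sell", "Amt": -0.5}},
    ],
}


def unpack_and_sum(row):
    """Model of CROSS APPLY GetArrayElements(...) followed by SUM(...)."""
    # One output element per array entry, exposing ArrayIndex and ArrayValue.
    elements = [
        {"ArrayIndex": i, "ArrayValue": item}
        for i, item in enumerate(row["Top5"])
    ]
    # Equivalent of SUM(C.ArrayValue.value.Amt) for this user.
    return sum(el["ArrayValue"]["value"]["Amt"] for el in elements)
```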
Please let me know if you need more details.