Pattern Matching with MATCH_RECOGNIZE

openinx edited this page Nov 21, 2019 · 3 revisions

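This section discusses how to write SQL queries that perform pattern matching on streaming data.

You will learn the following:

  • the MATCH_RECOGNIZE syntax defined in SQL:2016;
  • the semantics that Flink supports, and its limitations;
  • how to implement pattern matching on append-only tables.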

Slides

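Hands-on Exercises

These exercises teach you how to use pattern matching on a data stream and how to run computations over the set of rows in each match.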

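Compute the Duration of a Ride

In this exercise, we want to compute the duration of each ride, i.e., the ride's end time minus its start time, in minutes. This means we need a pattern query that matches a start event and an end event with the same rideId.

Note: This exercise is identical to the join exercise, but here we want to solve it with MATCH_RECOGNIZE.

Click to see hints
  • Look for a fixed pattern for each rideId;
  • The pattern consists of two kinds of events: one start event and one end event;
  • Use the provided timeDiff function to get the interval between two timestamps in milliseconds.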


The output should look like this:
rideId        durationMin
 52693                 13
 46868                 24
 53226                 12
 53629                 11
 55651                  7
 43220                 31
 53082                 12
 54716                  9
 55125                  9
 57211                  4
 44795                 28
 53563                 12
Click to see the solution
SELECT rideId, timeDiff(startT, endT) / 60000 AS durationMin
FROM Rides
MATCH_RECOGNIZE (
  PARTITION BY rideId
  ORDER BY rideTime
  MEASURES 
    S.rideTime AS startT, 
    E.rideTime AS endT
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (S E)
  DEFINE
    S AS S.isStart,
    E AS NOT E.isStart
);

This query matches the start and end events of the same ride, i.e., events that have the same ride id. Because the table is partitioned on rideId, only events with the same ride id are processed together. Within each partition, events are classified as a start event S or an end event E, and the pattern to match is defined as (S E), i.e., exactly one S followed immediately by exactly one E. For each match, we emit the rideId and the timestamps of the start and end events. The SELECT clause then computes the ride duration with the timeDiff function, which returns milliseconds, and divides by 60000 to convert the result to minutes.
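
As a variation on the solution above, the duration can likely be computed without the training-specific timeDiff function. The sketch below assumes the same Rides table and uses the built-in TIMESTAMPDIFF function (the one the second exercise on this page relies on) directly in the MEASURES clause. TIMESTAMPDIFF with MINUTE returns whole minutes, so individual values may differ slightly from the timeDiff-based result depending on how that function rounds.

```sql
-- Sketch: same pattern as the solution above, but the duration is
-- computed inside MEASURES with the built-in TIMESTAMPDIFF function.
-- Assumes the Rides table of this training (rideId, rideTime, isStart).
SELECT rideId, durationMin
FROM Rides
MATCH_RECOGNIZE (
  PARTITION BY rideId
  ORDER BY rideTime
  MEASURES
    -- whole minutes between the start event and the end event
    TIMESTAMPDIFF(MINUTE, S.rideTime, E.rideTime) AS durationMin
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (S E)
  DEFINE
    S AS S.isStart,
    E AS NOT E.isStart
);
```

Computing the value in MEASURES keeps the outer SELECT free of arithmetic; whether that reads better than the timeDiff version is a matter of taste.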


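Idle Time Between Rides

In this exercise, we want to find out how long a taxi sits idle before it starts a new ride.

Use the MATCH_RECOGNIZE syntax to detect the following pattern:

  • one start event;
  • potential intermediate events that belong to other rides of the same taxi (any number of them);
  • one end event;
  • the next start event.

Compute the idle time in minutes.

Click to see hints
  • Use an AFTER MATCH SKIP TO LAST variable strategy so that the most recent start event is included in the next match.
  • Use the built-in TIMESTAMPDIFF function to compute the time difference in minutes.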


For reference, the output should look like this:
                   taxiId                ride_start                  ride_end           next_ride_start           minutes_of_rest
                2013000002     2013-01-01 00:00:00.0     2013-01-01 00:06:00.0     2013-01-01 00:16:00.0                        10
                2013000004     2013-01-01 00:00:00.0     2013-01-01 00:08:00.0     2013-01-01 00:13:00.0                         5
                2013000032     2013-01-01 00:00:00.0     2013-01-01 00:05:00.0     2013-01-01 00:06:00.0                         1
                2013000128     2013-01-01 00:01:00.0     2013-01-01 00:04:00.0     2013-01-01 00:12:00.0                         8
                2013000256     2013-01-01 00:02:00.0     2013-01-01 00:10:00.0     2013-01-01 00:10:00.0                         0
                2013000512     2013-01-01 00:03:25.0     2013-01-01 00:04:51.0     2013-01-01 00:10:00.0                         5
                2013000512     2013-01-01 00:10:00.0     2013-01-01 00:13:31.0     2013-01-01 00:14:19.0                         0
                2013001028     2013-01-01 00:05:52.0     2013-01-01 00:13:20.0     2013-01-01 00:14:12.0                         0
                2013000258     2013-01-01 00:02:00.0     2013-01-01 00:08:00.0     2013-01-01 00:11:00.0                         3
                2013002070     2013-01-01 00:08:57.0     2013-01-01 00:12:26.0     2013-01-01 00:13:46.0                         1
Click to see the solution
SELECT * FROM Rides
MATCH_RECOGNIZE(
  PARTITION BY taxiId
  ORDER BY rideTime
  MEASURES
    START_RIDE.rideTime AS ride_start,
    END_RIDE.rideTime AS ride_end,
    NEXT_RIDE.rideTime AS next_ride_start,
    TIMESTAMPDIFF(MINUTE, END_RIDE.rideTime, NEXT_RIDE.rideTime) AS minutes_of_rest
  AFTER MATCH SKIP TO LAST NEXT_RIDE
  PATTERN (START_RIDE M* END_RIDE NEXT_RIDE)
  DEFINE
    START_RIDE AS START_RIDE.isStart = true,
    M AS M.rideId <> START_RIDE.rideId,
    END_RIDE AS END_RIDE.isStart = false,
    NEXT_RIDE AS NEXT_RIDE.isStart = true
);

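The query matches the pattern described above. The variable START_RIDE detects the start event. M greedily matches any number of ride events that do not belong to the same ride as the start event. END_RIDE detects the end event, and NEXT_RIDE detects the following start event, which is needed to compute the idle time. Once a match has been detected, we measure the difference between the end event and the next start event in minutes and return it. Because of AFTER MATCH SKIP TO LAST NEXT_RIDE, the next match attempt resumes at the row mapped to NEXT_RIDE, so that start event can also serve as the START_RIDE of the following match; the two rows for taxi 2013000512 in the sample output show this.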
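
The result of MATCH_RECOGNIZE is an ordinary table whose columns are the PARTITION BY columns followed by the MEASURES, so it should be possible to filter it further. The sketch below is one way to keep only longer breaks. The threshold of 5 minutes and the alias R are illustrative assumptions, not part of the original exercise.

```sql
-- Sketch: the solution above with explicit output columns and a filter
-- on the computed measure. The 5-minute threshold and the alias R are
-- hypothetical choices made for illustration.
SELECT R.taxiId, R.ride_end, R.next_ride_start, R.minutes_of_rest
FROM Rides
MATCH_RECOGNIZE (
  PARTITION BY taxiId
  ORDER BY rideTime
  MEASURES
    END_RIDE.rideTime AS ride_end,
    NEXT_RIDE.rideTime AS next_ride_start,
    TIMESTAMPDIFF(MINUTE, END_RIDE.rideTime, NEXT_RIDE.rideTime) AS minutes_of_rest
  AFTER MATCH SKIP TO LAST NEXT_RIDE
  PATTERN (START_RIDE M* END_RIDE NEXT_RIDE)
  DEFINE
    START_RIDE AS START_RIDE.isStart = true,
    M AS M.rideId <> START_RIDE.rideId,
    END_RIDE AS END_RIDE.isStart = false,
    NEXT_RIDE AS NEXT_RIDE.isStart = true
) AS R
-- keep only breaks of at least 5 whole minutes
WHERE R.minutes_of_rest >= 5;
```

Applied to the sample output above, such a filter would keep rows like taxi 2013000002 (10 minutes) and drop rows like taxi 2013000032 (1 minute).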