Skip to content

Join 动态表

Jark Wu edited this page Nov 30, 2019 · 9 revisions

Flink SQL 支持三种常用的方法来 JOIN 流数据:

  • 普通Join;
  • 时间窗口的JOIN;
  • Temporal 表JOIN;

本课程将向您介绍如何使用这三种不同类型的Join。

Slides

上机练习

时间窗口JOIN

时间窗口JOIN是一个带临时窗口谓词的JOIN操作,这个窗口谓词要求两个输入表的两个记录的时间戳小于给定时间间隔。

因为多个输入表之间的临时关联属性,这个JOIN操作不需要完整的维护任何一个表的状态信息,实际上,只需要维护数据流尾部的一段状态。

计算一天中平均每小时收到的小费金额

计算一天内平均每小时收到的小费金额

点击查看提示
  • 一次行程收到的小费金额存放在 Fares 表中。
  • 支付行为期望在行程结束前5每分钟内完成。因此,Fares 表中一条记录的支付时间戳 payTime 肯定不小于行程结束时间戳 rideTime,同时最多比行程结束时间戳 rideTime 早5分钟。
  • 请使用内置的 HOUR 函数来提取时间戳的整点数值。


输出数据类似如下:
hourOfDay                    avgTip
01:00:00.0                0.93595284
02:00:00.0                 1.1023914
03:00:00.0                 1.1757631
04:00:00.0                 1.2181113
05:00:00.0                 1.1652884
06:00:00.0                 1.1640768
点击查看答案
SELECT
  HOUR(r.rideTime) AS hourOfDay,
  AVG(f.tip) AS avgTip
FROM
  Rides r,
  Fares f
WHERE 
  r.rideId = f.rideId AND
  NOT r.isStart AND
  f.payTime BETWEEN r.rideTime - INTERVAL '5' MINUTE AND r.rideTime
GROUP BY
  HOUR(r.rideTime);

这个查询通过rideId字段和时间戳两个字段连接了 Rides表的行程结束事件集和 Fares表的支付事件。窗口连接条件JOIN一次支付操作和一次行程事件,前提是 payTime 属性最多比 rideTime 属性早5分钟。在两个表JOIN之后,这个查询按照一天中的整点做分组(通过 HOUR 函数来计算),并计算每个组内的平均小费金额。


计算行程时长

在这个练习中,我们想计算每一次出租车行程的耗时,既行程开始事件和结束事件的时间差,单位为分钟。这意味着我们需要基于行程的 rideId 来连接开始事件和结束事件。

这里,我们只须关注发生在纽约市,且行程开始和结束事件发生在最近2小时纽约市的行程。

点击查看提示
  • 过滤 Rides 表来分开行程开始事件和行程结束事件。
  • 使用时间窗口连接来关联行程的开始和结束时间。在JOIN时,只关联那些结束事件(当然它的开始事件已经到达)在2小时内到达的事件。
  • 请使用提供的 timeDiff 函数来返回两个时间戳的时间间隔(单位为毫秒)。


输出数据类似如下:
rideId        durationMin
 52693                 13
 46868                 24
 53226                 12
 53629                 11
 55651                  7
 43220                 31
 53082                 12
 54716                  9
 55125                  9
 57211                  4
 44795                 28
 53563                 12
点击查看答案
SELECT
  s.rideId,
  timeDiff(s.rideTime, e.rideTime) / 60000 AS durationMin
FROM
  (SELECT * FROM Rides WHERE isStart AND isInNYC(lon, lat)) s,
  (SELECT * FROM Rides WHERE NOT isStart AND isInNYC(lon, lat)) e
WHERE
  s.rideId = e.rideId AND
  e.rideTime BETWEEN s.rideTime AND s.rideTime + INTERVAL '2' HOUR;

这个查询在两个子查询之间做了一个时间窗口JOIN。第一个子查询 s 返回纽约市已经开始的行程记录,第二个子查询 e 返回纽约市的行程结束事件。

这两个表通过 rideId 字段来做JOIN操作。另外,WHERE 条件指定了开始事件和结束事件的时间限制,这确保了只会JOIN那些结束事件发生在开始事件2小时内的两次出租车行程记录。

结果中的 durationMin 列指的是行程时长,单位为分钟。


Temporal JOIN

一个 Temporal Table JOIN 会连接一个流表和Temporal表. 一个Temporal表维护了表的历史记录,并且能返回指定时间戳的单行数据。对流表中的每一行记录,这个JOIN操作会在满足时间戳要求和JOIN条件的情况下,查询Temporal表的行数据。

因为条件的Temporal属性,这个JOIN操作只需要保存Temporal表的相关历史状态,同时不会存流表的任何状态。

找出载客较多的司机

找出在15分钟内载客数超过10人的所有司机列表。

点击查看提示
  • Temporal表的函数 Drivers 会列出那些在指定时间点载过客的司机。


输出数据类似如下:
  driverId                srvdPsgCnt                         t
2013000155                        12     2013-01-01 00:00:00.0
2013000233                        12     2013-01-01 00:00:00.0
2013000230                        31     2013-01-01 00:00:00.0
2013001174                        12     2013-01-01 00:00:00.0
2013000014                        12     2013-01-01 00:00:00.0
2013000595                        12     2013-01-01 00:00:00.0
2013002453                        12     2013-01-01 00:00:00.0
2013000124                        12     2013-01-01 00:00:00.0
2013000117                        18     2013-01-01 00:00:00.0
点击查看答案
SELECT 
  d.driverId, 
  SUM(r.psgCnt) AS srvdPsgCnt,
  TUMBLE_START(r.rideTime, INTERVAL '15' MINUTE) AS t
FROM
  Rides r, 
  LATERAL TABLE(Drivers(r.rideTime)) d
WHERE
  r.taxiId = d.taxiId AND
  r.isStart
GROUP BY
  d.driverId,
  TUMBLE(r.rideTime, INTERVAL '15' MINUTE)
HAVING SUM(r.psgCnt) >= 10;

这个查询把 Rides表的开始事件和Temporal 表 Drivers 做关联,找出那些行程开始后载过客的司机。然后,这个查询会通过 driverId 和15分钟窗口来分组数据,计算出每组的载客数,最后 HAVING 语句会过滤掉所有载客数小于10的司机。


维表JOIN

查找负责每个行程的具体出租车司机。这里提供了一张叫做LatestDrivers的维表,具体的字段如下,主要存放了当前时间下的每一个出租车的司机ID。

Flink SQL> describe LatestDrivers;
root
 |-- taxiId: BIGINT
 |-- driverId: BIGINT
点击查看答案
-- user blink planner.
set execution.planner=blink;

SELECT 
  r.rideId,
  r.taxiId,
  CAST(r.rideTime AS VARCHAR), 
  d.driverId
FROM
  (SELECT *, PROCTIME() as proc FROM Rides) r JOIN LatestDrivers FOR SYSTEM_TIME AS OF r.proc d
ON r.taxiId = d.taxiId; 

普通JOIN

普通JOIN 就是那些不带时间属性表的JOIN操作。

由于不带时间属性条件了,JOIN操作的多个输入表都需要提前把数据组织好,以备查询。如果其中某个表的数据无限增长的话,这可能会是个很大问题。当然,你也可以将一个查询设置为自动清理过期行数据,其实就是说过了某些事件点之前的数据不会被JOIN或者更新了。

在多个输入表数据都是连续更新,且限制行数的情况下,普通JOIN是可以正常工作的。

计算每个出租车的统计信息

计算每一辆出租车的司机替换数和服务乘客数。

点击查看提示
  • 用两个子查询分别来司机的替换数和服务乘客数,最红通过 taxId 属性来关联获得结果。


输出数据类似如下信息:
   taxiId                  shiftCnt                    psgCnt
2013009497                         1                        26
2013009261                         1                         8
2013003937                         2                        27
2013008238                         1                        30
2013006197                         1                        12
2013001680                         1                        29
2013005414                         2                        10
2013004937                         1                        16
点击查看答案
SELECT s.taxiId, s.shiftCnt, p.psgCnt
FROM 
    (SELECT taxiId, COUNT(*) AS shiftCnt
     FROM DriverChanges
     GROUP BY taxiId) s 
  JOIN
    (SELECT taxiId, SUM(psgCnt) AS psgCnt
     FROM Rides
     WHERE isStart
     GROUP BY taxiId) p
  ON s.taxiId = p.taxiId;

第一个子查询计算每个出租车的司机替换数;第二个子查询统计每一辆出租车的乘客数。最后,外部的关联查询连接两个子查询的结果。

由于两个子查询都不断产生结果,但是会受限于 taxId 值的总数。因此,我们可以JOIN两个子查询的结果,而不会导致state占用太大空间的问题。