加入收藏 | 设为首页 | 会员中心 | 我要投稿 青岛站长网 (https://www.0532zz.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 运营中心 > 网站设计 > 教程 > 正文

Apache Flink 漫谈系列(11) - Temporal Table JOIN

发布时间:2018-12-17 01:01:33 所属栏目:教程 来源:孙金城
导读:一、什么是Temporal Table 在《Apache Flink 漫谈系列 - JOIN LATERAL》中提到了Temporal Table JOIN,本篇就向大家详细介绍什么是Temporal Table JOIN。 在ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的数据库厂商也先后实现了这

(4) 统计需求对应的SQL

  1. SELECT o.currency, o.amount, r.rate 
  2. o.amount * r.rate AS yen_amount 
  3. FROM 
  4. Orders AS o, 
  5. LATERAL TABLE (Rates(o.rowtime)) AS r 
  6. WHERE r.currency = o.currency 

(5) 预期结果

Apache Flink 漫谈系列(11) - Temporal Table JOIN

4. Without connnector 实现代码

  1. object TemporalTableJoinTest { 
  2. def main(args: Array[String]): Unit = { 
  3. val env = StreamExecutionEnvironment.getExecutionEnvironment 
  4. val tEnv = TableEnvironment.getTableEnvironment(env) 
  5. env.setParallelism(1) 
  6. // 设置时间类型是 event-time env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
  7. // 构造订单数据 
  8. val ordersData = new mutable.MutableList[(Long, String, Timestamp)] 
  9. ordersData.+=((2L, "Euro", new Timestamp(2L))) 
  10. ordersData.+=((1L, "US Dollar", new Timestamp(3L))) 
  11. ordersData.+=((50L, "Yen", new Timestamp(4L))) 
  12. ordersData.+=((3L, "Euro", new Timestamp(5L))) 
  13.  
  14. //构造汇率数据 
  15. val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)] 
  16. ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L))) 
  17. ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L))) 
  18. ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L))) 
  19. ratesHistoryData.+=(("Euro", 116L, new Timestamp(5L))) 
  20. ratesHistoryData.+=(("Euro", 119L, new Timestamp(7L))) 
  21.  
  22. // 进行订单表 event-time 的提取 
  23. val orders = env 
  24. .fromCollection(ordersData) 
  25. .assignTimestampsAndWatermarks(new OrderTimestampExtractor[Long, String]()) 
  26. .toTable(tEnv, 'amount, 'currency, 'rowtime.rowtime) 
  27.  
  28. // 进行汇率表 event-time 的提取 
  29. val ratesHistory = env 
  30. .fromCollection(ratesHistoryData) 
  31. .assignTimestampsAndWatermarks(new OrderTimestampExtractor[String, Long]()) 
  32. .toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime) 
  33.  
  34. // 注册订单表和汇率表 
  35. tEnv.registerTable("Orders", orders) 
  36. tEnv.registerTable("RatesHistory", ratesHistory) 
  37. val tab = tEnv.scan("RatesHistory"); 
  38. // 创建TemporalTableFunction 
  39. val temporalTableFunction = tab.createTemporalTableFunction('rowtime, 'currency) 
  40. //注册TemporalTableFunction 
  41. tEnv.registerFunction("Rates",temporalTableFunction) 
  42.  
  43. val SQLQuery = 
  44. """ 
  45. |SELECT o.currency, o.amount, r.rate, 
  46. | o.amount * r.rate AS yen_amount 
  47. |FROM 
  48. | Orders AS o, 
  49. | LATERAL TABLE (Rates(o.rowtime)) AS r 
  50. |WHERE r.currency = o.currency 
  51. |""".stripMargin 
  52.  
  53. tEnv.registerTable("TemporalJoinResult", tEnv.SQLQuery(SQLQuery)) 
  54.  
  55. val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row] 
  56. // 打印查询结果 
  57. result.print() 
  58. env.execute() 

(编辑:青岛站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读