加入收藏 | 设为首页 | 会员中心 | 我要投稿 青岛站长网 (https://www.0532zz.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 运营中心 > 网站设计 > 教程 > 正文

Apache Flink 漫谈系列(11) - Temporal Table JOIN

发布时间:2018-12-17 01:01:33 所属栏目:教程 来源:孙金城
导读:一、什么是Temporal Table 在《Apache Flink 漫谈系列 - JOIN LATERAL》中提到了Temporal Table JOIN,本篇就向大家详细介绍什么是Temporal Table JOIN。 在ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的数据库厂商也先后实现了这

在运行上面代码之前需要注意上面代码中对EventTime时间提取的过程,也就是说Apache Flink的TimeCharacteristic.EventTime 模式,需要调用assignTimestampsAndWatermarks方法设置EventTime的生成方式,这种方式也非常灵活,用户可以控制业务数据的EventTime的值和WaterMark的产生,WaterMark相关内容可以查阅《Apache Flink 漫谈系列(03) - Watermark》。 在本示例中提取EventTime的完整代码如下:

  1. import java.SQL.Timestamp 
  2.  
  3. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor 
  4. import org.apache.flink.streaming.api.windowing.time.Time 
  5.  
  6. class OrderTimestampExtractor[T1, T2] 
  7. extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) { 
  8. override def extractTimestamp(element: (T1, T2, Timestamp)): Long = { 
  9. element._3.getTime 

查看运行结果:

Apache Flink 漫谈系列(11) - Temporal Table JOIN

5. With CSVConnector 实现代码

在实际的生产开发中,都需要实际的Connector的定义,下面我们以CSV格式的Connector定义来开发Temporal Table JOIN Demo。

(1) genEventRatesHistorySource

  1. def genEventRatesHistorySource: CsvTableSource = { 
  2.  
  3. val csvRecords = Seq( 
  4. "ts#currency#rate", 
  5. "1#US Dollar#102", 
  6. "1#Euro#114", 
  7. "1#Yen#1", 
  8. "3#Euro#116", 
  9. "5#Euro#119", 
  10. "7#Pounds#108" 
  11. // 测试数据写入临时文件 
  12. val tempFilePath = 
  13. FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_rate", "tmp") 
  14.  
  15. // 创建Source connector 
  16. new CsvTableSource( 
  17. tempFilePath, 
  18. Array("ts","currency","rate"), 
  19. Array( 
  20. Types.LONG,Types.STRING,Types.LONG 
  21. ), 
  22. fieldDelim = "#", 
  23. rowDelim = CommonUtils.line, 
  24. ignoreFirstLine = true, 
  25. ignoreComments = "%" 
  26. )} 

(2) genRatesOrderSource

  1. def genRatesOrderSource: CsvTableSource = { 
  2.  
  3. val csvRecords = Seq( 
  4. "ts#currency#amount", 
  5. "2#Euro#10", 
  6. "4#Euro#10" 
  7. // 测试数据写入临时文件 
  8. val tempFilePath = 
  9. FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_order", "tmp") 
  10.  
  11. // 创建Source connector 
  12. new CsvTableSource( 
  13. tempFilePath, 
  14. Array("ts","currency", "amount"), 
  15. Array( 
  16. Types.LONG,Types.STRING,Types.LONG 
  17. ), 
  18. fieldDelim = "#", 
  19. rowDelim = CommonUtils.line, 
  20. ignoreFirstLine = true, 
  21. ignoreComments = "%" 

(编辑:青岛站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读