在运行上面代码之前需要注意上面代码中对EventTime时间提取的过程,也就是说Apache Flink的TimeCharacteristic.EventTime 模式,需要调用assignTimestampsAndWatermarks方法设置EventTime的生成方式,这种方式也非常灵活,用户可以控制业务数据的EventTime的值和WaterMark的产生,WaterMark相关内容可以查阅《Apache Flink 漫谈系列(03) - Watermark》。 在本示例中提取EventTime的完整代码如下:
- import java.SQL.Timestamp
-
- import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
- import org.apache.flink.streaming.api.windowing.time.Time
-
- class OrderTimestampExtractor[T1, T2]
- extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) {
- override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
- element._3.getTime
- }
- }
查看运行结果:

5. With CSVConnector 实现代码
在实际的生产开发中,都需要实际的Connector的定义,下面我们以CSV格式的Connector定义来开发Temporal Table JOIN Demo。
(1) genEventRatesHistorySource
- def genEventRatesHistorySource: CsvTableSource = {
-
- val csvRecords = Seq(
- "ts#currency#rate",
- "1#US Dollar#102",
- "1#Euro#114",
- "1#Yen#1",
- "3#Euro#116",
- "5#Euro#119",
- "7#Pounds#108"
- )
- // 测试数据写入临时文件
- val tempFilePath =
- FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_rate", "tmp")
-
- // 创建Source connector
- new CsvTableSource(
- tempFilePath,
- Array("ts","currency","rate"),
- Array(
- Types.LONG,Types.STRING,Types.LONG
- ),
- fieldDelim = "#",
- rowDelim = CommonUtils.line,
- ignoreFirstLine = true,
- ignoreComments = "%"
- )}
(2) genRatesOrderSource
- def genRatesOrderSource: CsvTableSource = {
-
- val csvRecords = Seq(
- "ts#currency#amount",
- "2#Euro#10",
- "4#Euro#10"
- )
- // 测试数据写入临时文件
- val tempFilePath =
- FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_order", "tmp")
-
- // 创建Source connector
- new CsvTableSource(
- tempFilePath,
- Array("ts","currency", "amount"),
- Array(
- Types.LONG,Types.STRING,Types.LONG
- ),
- fieldDelim = "#",
- rowDelim = CommonUtils.line,
- ignoreFirstLine = true,
- ignoreComments = "%"
- )
- }
(编辑:青岛站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|