After the statement runs, the row in the current table is updated and one history row is written to the history table, as follows:

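Note that the SysStartTime of the row in the current table has changed, and the history table now contains one record. That record's SysStartTime is the original SysStartTime of the current-table row, and its SysEndTime is the new SysStartTime of the current-table row. Let's update once more:

```sql
UPDATE [dbo].[Department] SET [ManagerID] = 201 WHERE [DeptID] = 10
```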

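At this point we can see how SQL Server implements Temporal Tables: the data is stored in a current table and a history table, and the database internally manages row versions through start and end timestamps (here SysStartTime and SysEndTime).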
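To make the current-table/history-table mechanics concrete, here is a minimal in-memory sketch in Scala. It is not how SQL Server implements the feature: it assumes timestamps are plain Long ticks and only models an UPDATE of ManagerID. The department name and the manager values other than 201 are hypothetical.

```scala
// Minimal in-memory model of a system-versioned (temporal) table.
// Assumptions: timestamps are plain Long ticks, one row per deptId,
// and managerId is the only column ever updated.
object TemporalModel {
  // "End of time" used as SysEndTime for rows in the current table
  val MaxTime: Long = Long.MaxValue

  // One version of a row, valid in the half-open period [sysStart, sysEnd)
  case class DeptRow(deptId: Int, deptName: String, managerId: Int,
                     sysStart: Long, sysEnd: Long)

  case class Tables(current: Map[Int, DeptRow], history: Vector[DeptRow])

  val empty: Tables = Tables(Map.empty, Vector.empty)

  // INSERT: the new row goes only into the current table
  def insert(t: Tables, deptId: Int, deptName: String, managerId: Int, now: Long): Tables =
    t.copy(current = t.current.updated(deptId, DeptRow(deptId, deptName, managerId, now, MaxTime)))

  // UPDATE: the old version is closed (sysEnd = now) and appended to history;
  // the new current version starts at that same instant.
  def updateManager(t: Tables, deptId: Int, managerId: Int, now: Long): Tables = {
    val old = t.current(deptId)
    val closed = old.copy(sysEnd = now)
    val fresh = old.copy(managerId = managerId, sysStart = now, sysEnd = MaxTime)
    Tables(t.current.updated(deptId, fresh), t.history :+ closed)
  }
}
```

Inserting DeptID 10 at tick 1 and then updating it at ticks 5 and 9 leaves two history rows with periods [1, 5) and [5, 9), plus a current row whose SysStartTime is 9. This matches the rule that each history row's SysEndTime equals the SysStartTime of the version that replaced it.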
(3) SELECT
```sql
SELECT [DeptID], [DeptName], [SysStartTime], [SysEndTime]
FROM [dbo].[Department]
FOR SYSTEM_TIME AS OF '2018-06-06 05:50:21.0000000';
```

The SELECT statement queries the Department table, but the rows actually returned come from the history table. Under the hood the query filters on SysStartTime <= '2018-06-06 05:50:21.0000000' AND SysEndTime > '2018-06-06 05:50:21.0000000'.
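SQL Server's documentation describes AS OF as a union of the current and history tables filtered with exactly this predicate; in the example above the only matching row happens to live in the history table. Below is a small Scala sketch of that filter, assuming timestamps are simplified to Long ticks (the Version type is illustrative, not a SQL Server structure).

```scala
// Sketch of FOR SYSTEM_TIME AS OF filtering.
// Assumption: timestamps are simplified to Long ticks instead of datetime2 values.
object AsOfQuery {
  val MaxTime: Long = Long.MaxValue

  // One row version with its validity period [sysStart, sysEnd)
  case class Version(deptId: Int, managerId: Int, sysStart: Long, sysEnd: Long)

  // A version is visible at time t when sysStart <= t AND sysEnd > t,
  // evaluated over the union of the current and history tables.
  def asOf(current: Seq[Version], history: Seq[Version], t: Long): Seq[Version] =
    (current ++ history).filter(v => v.sysStart <= t && v.sysEnd > t)
}
```

Because the period is half-open, a query at exactly tick 5 returns the version that started at 5, not the one that ended at 5.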
IV. Apache Flink Temporal Table
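As we have mentioned more than once, Apache Flink follows the ANSI SQL standard, and its notion of a Temporal Table also comes from the semantics of the ANSI SQL:2011 standard. The current implementation, however, differs slightly from ANSI SQL at the syntax level: as seen above, SQL:2011 uses the FOR SYSTEM_TIME AS OF syntax, whereas Apache Flink currently uses LATERAL TABLE(TemporalTableFunction). This is something we will need to push the community to improve.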
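1. Why Do We Need Temporal Tables
Let's use a concrete query to explain why Temporal Tables are needed. Suppose we have an exchange-rate table (RatesHistory) that changes in real time, as follows: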

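RatesHistory records exchange rates against the Yen (whose own rate is 1) and is a continuously changing, append-only table. For example, the Euro-to-Yen rate is 114 from 09:00 to 10:45 and 116 from 10:45 to 11:15.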
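An append-only history implicitly defines a validity period for every row: a rate holds from its own rowtime until the next row for the same currency arrives. The following Scala sketch derives those periods from the article's test data (taken from the CSV records used in the Flink code later on); it assumes rowtime is a zero-padded HH:mm:ss string, so string order matches time order.

```scala
// Sketch: turning an append-only rate history into validity periods.
// Assumption: rowtime is a zero-padded "HH:mm:ss" string, so string
// order equals time order.
object RateValidity {
  case class Rate(rowtime: String, currency: String, rate: Int)

  // Validity period [from, to); to = None means "still valid"
  case class Period(currency: String, rate: Int, from: String, to: Option[String])

  // Rows from the article's RatesHistory test data
  val ratesHistory: Seq[Rate] = Seq(
    Rate("09:00:00", "US Dollar", 102),
    Rate("09:00:00", "Euro", 114),
    Rate("09:00:00", "Yen", 1),
    Rate("10:45:00", "Euro", 116),
    Rate("11:15:00", "Euro", 119),
    Rate("11:49:00", "Pounds", 108)
  )

  // Each row stays valid until the next row for the same currency arrives
  def periods(history: Seq[Rate]): Seq[Period] =
    history.groupBy(_.currency).toSeq.flatMap { case (currency, rows) =>
      val sorted = rows.sortBy(_.rowtime)
      val ends: Seq[Option[String]] = sorted.drop(1).map(r => Some(r.rowtime)) :+ None
      sorted.zip(ends).map { case (r, end) => Period(currency, r.rate, r.rowtime, end) }
    }.sortBy(p => (p.currency, p.from))
}
```

For Euro this yields 114 for [09:00:00, 10:45:00), 116 for [10:45:00, 11:15:00) and 119 from 11:15:00 onward, which is exactly the reading of the table given above.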
Suppose we want to output all the rates that are current at 10:58. We need the following SQL query to compute the result table:
```sql
SELECT *
FROM RatesHistory AS r
WHERE r.rowtime = (
  SELECT MAX(rowtime)
  FROM RatesHistory AS r2
  WHERE r2.currency = r.currency
  AND r2.rowtime <= '10:58');
```
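On a finite snapshot of the data, this query boils down to "for each currency, take the row with the latest rowtime that is not after 10:58". Here is a Scala sketch of that logic over the same test data, assuming zero-padded HH:mm:ss strings. Note that SQL would return every row tied at MAX(rowtime), while maxBy keeps only one (the test data has no such ties).

```scala
// Sketch of what the correlated subquery computes on a finite snapshot:
// for every currency, the row whose rowtime is MAX(rowtime) among rows
// with rowtime <= asOf. Assumption: "HH:mm:ss" strings compare like times.
object LatestRates {
  case class Rate(rowtime: String, currency: String, rate: Int)

  // Rows from the article's RatesHistory test data
  val ratesHistory: Seq[Rate] = Seq(
    Rate("09:00:00", "US Dollar", 102),
    Rate("09:00:00", "Euro", 114),
    Rate("09:00:00", "Yen", 1),
    Rate("10:45:00", "Euro", 116),
    Rate("11:15:00", "Euro", 119),
    Rate("11:49:00", "Pounds", 108)
  )

  def ratesAsOf(history: Seq[Rate], asOf: String): Map[String, Rate] =
    history
      .filter(_.rowtime.compareTo(asOf) <= 0) // r2.rowtime <= asOf
      .groupBy(_.currency)                    // correlate on currency
      .map { case (currency, rows) => currency -> rows.maxBy(_.rowtime) } // MAX(rowtime)
}
```

Evaluated at 10:58:00 this returns US Dollar 102, Euro 116 and Yen 1; Pounds is absent because its only rate arrives at 11:49:00.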
The Flink code for this query is as follows:
Define the data source (genRatesHistorySource):

```scala
import java.io.{File, FileOutputStream, OutputStreamWriter}

import org.apache.flink.table.api.Types
import org.apache.flink.table.sources.CsvTableSource

object CsvTableSourceUtils {

  def genRatesHistorySource: CsvTableSource = {

    val csvRecords = Seq(
      "rowtime ,currency ,rate",
      "09:00:00 ,US Dollar , 102",
      "09:00:00 ,Euro , 114",
      "09:00:00 ,Yen , 1",
      "10:45:00 ,Euro , 116",
      "11:15:00 ,Euro , 119",
      "11:49:00 ,Pounds , 108"
    )
    // Write the test data to a temporary file, using "$" as the row delimiter
    val tempFilePath =
      writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp")

    // Create the source connector; all three fields are read as STRING
    new CsvTableSource(
      tempFilePath,
      Array("rowtime", "currency", "rate"),
      Array(
        Types.STRING, Types.STRING, Types.STRING
      ),
      fieldDelim = ",",
      rowDelim = "$",
      ignoreFirstLine = true, // skip the header line
      ignoreComments = "%"
    )
  }

  def writeToTempFile(
      contents: String,
      filePrefix: String,
      fileSuffix: String,
      charset: String = "UTF-8"): String = {
    val tempFile = File.createTempFile(filePrefix, fileSuffix)
    val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), charset)
    // Close the writer even if the write fails
    try tmpWriter.write(contents)
    finally tmpWriter.close()
    tempFile.getAbsolutePath
  }
}
```
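To see how the CsvTableSource settings interpret the generated file, here is a plain-Scala split of the same content. It only mimics the row and field delimiters and ignoreFirstLine (no row in the data starts with the % comment prefix, so ignoreComments has no effect here). It is not Flink's CSV parser, and it leaves the whitespace around each field untouched.

```scala
// Plain-Scala illustration (not Flink's CsvInputFormat) of how the settings
// passed to CsvTableSource carve up the generated file content.
object CsvSplitSketch {
  // Same records as in genRatesHistorySource
  val csvRecords: Seq[String] = Seq(
    "rowtime ,currency ,rate",
    "09:00:00 ,US Dollar , 102",
    "09:00:00 ,Euro , 114",
    "09:00:00 ,Yen , 1",
    "10:45:00 ,Euro , 116",
    "11:15:00 ,Euro , 119",
    "11:49:00 ,Pounds , 108"
  )

  // rowDelim = "$": String.split takes a regex, so "$" must be escaped here
  // (Flink's rowDelim itself is a literal delimiter, not a regex).
  // ignoreFirstLine = true: drop the header row.
  // fieldDelim = ",": split each row into fields; whitespace is kept as-is.
  def parse(contents: String): Seq[Seq[String]] =
    contents.split("\\$").toSeq.drop(1).map(row => row.split(",", -1).toSeq)
}
```

Applied to csvRecords.mkString("$"), this yields six data rows of three fields each, the first being "09:00:00 ", "US Dollar " and " 102".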
Main program:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableEnvironment, Types}

// The object name is illustrative. MemoryRetractSink is not defined in the
// article; it is assumed to be a user-defined in-memory retract table sink.
object RatesHistoryQuery {

  def main(args: Array[String]): Unit = {
    // Streaming environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Parallelism 1 makes the output easier to inspect
    env.setParallelism(1)

    val sourceTableName = "RatesHistory"
    // Build the CSV source
    val tableSource = CsvTableSourceUtils.genRatesHistorySource
    // Register the source
    tEnv.registerTableSource(sourceTableName, tableSource)

    // Register a retract sink
    val sinkTableName = "retractSink"
    val fieldNames = Array("rowtime", "currency", "rate")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.STRING, Types.STRING)

    tEnv.registerTableSink(
      sinkTableName,
      fieldNames,
      fieldTypes,
      new MemoryRetractSink)

    val sql =
      """
        |SELECT *
        |FROM RatesHistory AS r
        |WHERE r.rowtime = (
        |  SELECT MAX(rowtime)
        |  FROM RatesHistory AS r2
        |  WHERE r2.currency = r.currency
        |  AND r2.rowtime <= '10:58:00' )
      """.stripMargin

    // Run the query
    val result = tEnv.sqlQuery(sql)

    // Write the result into the sink
    result.insertInto(sinkTableName)
    env.execute()
  }
}
```
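The program registers a retract sink because the result of this query is not append-only: when a newer rate for a currency arrives before 10:58, MAX(rowtime) for that currency changes and the previously emitted row has to be withdrawn. The Scala sketch below is a conceptual model of that changelog, not Flink's operators; the exact messages Flink emits depend on the query plan and may include additional intermediate retractions.

```scala
// Conceptual model (not Flink's actual operators) of the change messages a
// retract sink could receive for the query, processing rows in arrival order.
object RetractModel {
  case class Rate(rowtime: String, currency: String, rate: Int)

  // kind '+' adds a result row, '-' retracts a previously emitted one
  case class Msg(kind: Char, row: Rate)

  // Rows from the article's RatesHistory test data, in file order
  val ratesHistory: Seq[Rate] = Seq(
    Rate("09:00:00", "US Dollar", 102),
    Rate("09:00:00", "Euro", 114),
    Rate("09:00:00", "Yen", 1),
    Rate("10:45:00", "Euro", 116),
    Rate("11:15:00", "Euro", 119),
    Rate("11:49:00", "Pounds", 108)
  )

  def changelog(rows: Seq[Rate], asOf: String): Vector[Msg] = {
    var latest = Map.empty[String, Rate] // current result row per currency
    val out = Vector.newBuilder[Msg]
    for (r <- rows if r.rowtime.compareTo(asOf) <= 0) {
      latest.get(r.currency) match {
        // Not newer than the current result: nothing changes (ties ignored here)
        case Some(old) if r.rowtime.compareTo(old.rowtime) <= 0 => ()
        // Newer row: retract the old result row, then add the new one
        case Some(old) =>
          out += Msg('-', old)
          out += Msg('+', r)
          latest = latest.updated(r.currency, r)
        // First row seen for this currency
        case None =>
          out += Msg('+', r)
          latest = latest.updated(r.currency, r)
      }
    }
    out.result()
  }
}
```

On the test data with asOf = "10:58:00" this model yields +US Dollar, +Euro 114, +Yen, then -Euro 114 and +Euro 116 when the 10:45:00 row arrives; the 11:15:00 and 11:49:00 rows are filtered out.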
The result of running the program is shown in the figure below:

(Editor: 青岛站长网)