加入收藏 | 设为首页 | 会员中心 | 我要投稿 青岛站长网 (https://www.0532zz.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 运营中心 > 网站设计 > 教程 > 正文

Apache Flink 漫谈系列(11) - Temporal Table JOIN

发布时间:2018-12-17 01:01:33 所属栏目:教程 来源:孙金城
导读:一、什么是Temporal Table 在《Apache Flink 漫谈系列 - JOIN LATERAL》中提到了Temporal Table JOIN,本篇就向大家详细介绍什么是Temporal Table JOIN。 在ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的数据库厂商也先后实现了这

(3) 主程序代码

  1. /* 
  2.  * Licensed to the Apache Software Foundation (ASF) under one 
  3.  * or more contributor license agreements.  See the NOTICE file 
  4.  * distributed with this work for additional information 
  5.  * regarding copyright ownership.  The ASF licenses this file 
  6.  * to you under the Apache License, Version 2.0 (the 
  7.  * "License"); you may not use this file except in compliance 
  8.  * with the License.  You may obtain a copy of the License at 
  9.  * 
  10.  *     http://www.apache.org/licenses/LICENSE-2.0 
  11.  * 
  12.  * Unless required by applicable law or agreed to in writing, software 
  13.  * distributed under the License is distributed on an "AS IS" BASIS, 
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
  15.  * See the License for the specific language governing permissions and 
  16.  * limitations under the License. 
  17.  */ 
  18.  
  19. package org.apache.flink.book.connectors 
  20.  
  21. import java.io.File 
  22.  
  23. import org.apache.flink.api.common.typeinfo.{TypeInformation, Types} 
  24. import org.apache.flink.book.utils.{CommonUtils, FileUtils} 
  25. import org.apache.flink.table.sinks.{CsvTableSink, TableSink} 
  26. import org.apache.flink.table.sources.CsvTableSource 
  27. import org.apache.flink.types.Row 
  28.  
  29. object CsvTableSourceUtils { 
  30.  
  31.   def genWordCountSource: CsvTableSource = { 
  32.     val csvRecords = Seq( 
  33.       "words", 
  34.       "Hello Flink", 
  35.       "Hi, Apache Flink", 
  36.       "Apache FlinkBook" 
  37.     ) 
  38.     // 测试数据写入临时文件 
  39.     val tempFilePath = 
  40.       FileUtils.writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp") 
  41.  
  42.     // 创建Source connector 
  43.     new CsvTableSource( 
  44.       tempFilePath, 
  45.       Array("words"), 
  46.       Array( 
  47.         Types.STRING 
  48.       ), 
  49.       fieldDelim = "#", 
  50.       rowDelim = "$", 
  51.       ignoreFirstLine = true, 
  52.       ignoreComments = "%" 
  53.     ) 
  54.   } 
  55.  
  56.  
  57.   def genRatesHistorySource: CsvTableSource = { 
  58.  
  59.     val csvRecords = Seq( 
  60.       "rowtime ,currency   ,rate", 
  61.     "09:00:00   ,US Dollar  , 102", 
  62.     "09:00:00   ,Euro       , 114", 
  63.     "09:00:00  ,Yen        ,   1", 
  64.     "10:45:00   ,Euro       , 116", 
  65.     "11:15:00   ,Euro       , 119", 
  66.     "11:49:00   ,Pounds     , 108" 
  67.     ) 
  68.     // 测试数据写入临时文件 
  69.     val tempFilePath = 
  70.       FileUtils.writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp") 
  71.  
  72.     // 创建Source connector 
  73.     new CsvTableSource( 
  74.       tempFilePath, 
  75.       Array("rowtime","currency","rate"), 
  76.       Array( 
  77.         Types.STRING,Types.STRING,Types.STRING 
  78.       ), 
  79.       fieldDelim = ",", 
  80.       rowDelim = "$", 
  81.       ignoreFirstLine = true, 
  82.       ignoreComments = "%" 
  83.     ) 
  84.   } 
  85.  
  86.   def genEventRatesHistorySource: CsvTableSource = { 
  87.  
  88.     val csvRecords = Seq( 
  89.       "ts#currency#rate", 
  90.       "1#US Dollar#102", 
  91.       "1#Euro#114", 
  92.       "1#Yen#1", 
  93.       "3#Euro#116", 
  94.       "5#Euro#119", 
  95.       "7#Pounds#108" 
  96.     ) 
  97.     // 测试数据写入临时文件 
  98.     val tempFilePath = 
  99.       FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_rate", "tmp") 
  100.  
  101.     // 创建Source connector 
  102.     new CsvTableSource( 
  103.       tempFilePath, 
  104.       Array("ts","currency","rate"), 
  105.       Array( 
  106.         Types.LONG,Types.STRING,Types.LONG 
  107.       ), 
  108.       fieldDelim = "#", 
  109.       rowDelim = CommonUtils.line, 
  110.       ignoreFirstLine = true, 
  111.       ignoreComments = "%" 
  112.     ) 
  113.   } 
  114.  
  115.   def genRatesOrderSource: CsvTableSource = { 
  116.  
  117.     val csvRecords = Seq( 
  118.       "ts#currency#amount", 
  119.       "2#Euro#10", 
  120.       "4#Euro#10" 
  121.     ) 
  122.     // 测试数据写入临时文件 
  123.     val tempFilePath = 
  124.       FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_order", "tmp") 
  125.  
  126.     // 创建Source connector 
  127.     new CsvTableSource( 
  128.       tempFilePath, 
  129.       Array("ts","currency", "amount"), 
  130.       Array( 
  131.         Types.LONG,Types.STRING,Types.LONG 
  132.       ), 
  133.       fieldDelim = "#", 
  134.       rowDelim = CommonUtils.line, 
  135.       ignoreFirstLine = true, 
  136.       ignoreComments = "%" 
  137.     ) 
  138.   } 
  139.  
  140.  
  141.   /** 
  142.     * Example: 
  143.     * genCsvSink( 
  144.     *   Array[String]("word", "count"), 
  145.     *   Array[TypeInformation[_] ](Types.STRING, Types.LONG)) 
  146.     */ 
  147.   def genCsvSink(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = { 
  148.     val tempFile = File.createTempFile("csv_sink_", "tem") 
  149.     if (tempFile.exists()) { 
  150.       tempFile.delete() 
  151.     } 
  152.     new CsvTableSink(tempFile.getAbsolutePath).configure(fieldNames, fieldTypes) 
  153.   } 
  154.  

(编辑:青岛站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读