TextFile DataFrame
- import.org.apache.spark.sql._
- //定义数据的列名称和类型
- valdt=StructType(List(id:String,name:String,gender:String,age:Int))
-
- //导入user_info.csv文件并指定分隔符
- vallines = sc.textFile("/path/user_info.csv").map(_.split(","))
-
- //将表结构和数据关联起来,把读入的数据user.csv映射成行,构成数据集
- valrowRDD = lines.map(x=>Row(x(0),x(1),x(2),x(3).toInt))
-
- //通过SparkSession.createDataFrame()创建表,并且数据表表头
- val df= spark.createDataFrame(rowRDD, dt)
读取规则数据文件作为DataFrame
- SparkSession.Builder builder = SparkSession.builder()
- Builder.setMaster("local").setAppName("TestSparkSQLApp")
- SparkSession spark = builder.getOrCreate();
- SQLContext sqlContext = spark.sqlContext();
-
- # 读取 JSON 数据,path 可为文件或者目录
- valdf=sqlContext.read().json(path);
-
- # 读取 HadoopParquet 文件
- vardf=sqlContext.read().parquet(path);
-
- # 读取 HadoopORC 文件
- vardf=sqlContext.read().orc(path);
JSON 文件为每行一个 JSON 对象的文件类型,行尾无须逗号。文件头也无须[]指定为数组;SparkSQL 读取是只是按照每行一条 JSON Record序列化;
Parquet文件
- Configurationconfig = new Configuration();
- ParquetFileReaderreader = ParquetFileReader.open(
- HadoopInputFile.fromPath(new Path("hdfs:///path/file.parquet"),conf));
- Map<String, String>schema = reader.getFileMetaData().getKeyValueMetaData();
- String allFields= schema.get("org.apache.spark.sql.parquet.row.metadata");
allFiedls 的值就是各字段的名称和具体的类型,整体是一个json格式进行展示。
读取 Hive 表作为 DataFrame
Spark2 API 推荐通过 SparkSession.Builder 的 Builder 模式创建 SparkContext。 Builder.getOrCreate() 用于创建 SparkSession,SparkSession 是 SparkContext 的封装。
在Spark1.6中有两个核心组件SQLcontext和HiveContext。SQLContext 用于处理在 SparkSQL 中动态注册的表,HiveContext 用于处理 Hive 中的表。
从Spark2.0以上的版本开始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext。SQLContext.sql 即可执行 Hive 中的表,也可执行内部注册的表;
在需要执行 Hive 表时,只需要在 SparkSession.Builder 中开启 Hive 支持即可(enableHiveSupport())。
- SparkSession.Builder builder = SparkSession.builder().enableHiveSupport();
- SparkSession spark = builder.getOrCreate();
- SQLContext sqlContext = spark.sqlContext();
// db 指 Hive 库中的数据库名,如果不写默认为 default
// tableName 指 hive 库的数据表名
- sqlContext.sql(“select * from db.tableName”)
SparkSQL ThriftServer
//首先打开 Hive 的 Metastore服务
- hive$bin/hive –-service metastore –p 8093
//把 Spark 的相关 jar 上传到hadoophdfs指定目录,用于指定sparkonyarn的依赖 jar
- spark$hadoop fs –put jars/*.jar /lib/spark2
// 启动 spark thriftserver 服务
- spark$ sbin/start-thriftserver.sh --master yarn-client --driver-memory 1G --conf
- spark.yarn.jars=hdfs:///lib/spark2/*.jar
当hdfs 上传了spark 依赖 jar 时,通过spark.yarn.jars 可看到日志 spark 无须每个job 都上传jar,可节省启动时间
- 19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.0.5.jar
- 19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.1.7.3.jar
(编辑:青岛站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|