课程表

Spark 基础

Spark RDDs

Spark Streaming

Spark SQL

GraphX编程指南

工具箱
速查手册

Spark SQL数据源

当前位置:免费教程 » 数据库/运维 » Spark

Spark SQL支持通过SchemaRDD接口操作各种数据源。一个SchemaRDD能够作为一个一般的RDD被操作,也可以被注册为一个临时的表。注册一个SchemaRDD为一个表就可以允许你在其数据上运行SQL查询。这节描述了加载数据为SchemaRDD的多种方法。

RDDs

Spark支持两种方法将存在的RDDs转换为SchemaRDDs。第一种方法使用反射来推断包含特定对象类型的RDD的模式(schema)。在你写spark程序的同时,当你已经知道了模式,这种基于反射的方法可以使代码更简洁并且程序工作得更好。

创建SchemaRDDs的第二种方法是通过一个编程接口来实现,这个接口允许你构造一个模式,然后在存在的RDDs上使用它。虽然这种方法更冗长,但是它允许你在运行期之前不知道列以及列的类型的情况下构造SchemaRDDs。

利用反射推断模式

Spark SQL的Scala接口支持将包含样本类的RDDs自动转换为SchemaRDD。这个样本类定义了表的模式。

给样本类的参数名字通过反射来读取,然后作为列的名字。样本类可以嵌套或者包含复杂的类型如序列或者数组。这个RDD可以隐式转化为一个SchemaRDD,然后注册为一个表。表可以在后续的sql语句中使用。

  1. // sc is an existing SparkContext.
  2. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  3. // createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
  4. import sqlContext.createSchemaRDD
  5. // Define the schema using a case class.
  6. // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
  7. // you can use custom classes that implement the Product interface.
  8. case class Person(name: String, age: Int)
  9. // Create an RDD of Person objects and register it as a table.
  10. val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
  11. people.registerTempTable("people")
  12. // SQL statements can be run by using the sql methods provided by sqlContext.
  13. val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
  14. // The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
  15. // The columns of a row in the result can be accessed by ordinal.
  16. teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

编程指定模式

当样本类不能提前确定(例如,记录的结构是经过编码的字符串,或者一个文本集合将会被解析,不同的字段投影给不同的用户),一个SchemaRDD可以通过三步来创建。

  • 从原来的RDD创建一个行的RDD
  • 创建由一个StructType表示的模式与第一步创建的RDD的行结构相匹配
  • 在行RDD上通过applySchema方法应用模式
  1. // sc is an existing SparkContext.
  2. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  3. // Create an RDD
  4. val people = sc.textFile("examples/src/main/resources/people.txt")
  5. // The schema is encoded in a string
  6. val schemaString = "name age"
  7. // Import Spark SQL data types and Row.
  8. import org.apache.spark.sql._
  9. // Generate the schema based on the string of schema
  10. val schema =
  11. StructType(
  12. schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
  13. // Convert records of the RDD (people) to Rows.
  14. val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
  15. // Apply the schema to the RDD.
  16. val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema)
  17. // Register the SchemaRDD as a table.
  18. peopleSchemaRDD.registerTempTable("people")
  19. // SQL statements can be run by using the sql methods provided by sqlContext.
  20. val results = sqlContext.sql("SELECT name FROM people")
  21. // The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
  22. // The columns of a row in the result can be accessed by ordinal.
  23. results.map(t => "Name: " + t(0)).collect().foreach(println)

Parquet文件

Parquet是一种柱状(columnar)格式,可以被许多其它的数据处理系统支持。Spark SQL提供支持读和写Parquet文件的功能,这些文件可以自动地保留原始数据的模式。

加载数据

  1. // sqlContext from the previous example is used in this example.
  2. // createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
  3. import sqlContext.createSchemaRDD
  4. val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.
  5. // The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
  6. people.saveAsParquetFile("people.parquet")
  7. // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
  8. // The result of loading a Parquet file is also a SchemaRDD.
  9. val parquetFile = sqlContext.parquetFile("people.parquet")
  10. //Parquet files can also be registered as tables and then used in SQL statements.
  11. parquetFile.registerTempTable("parquetFile")
  12. val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
  13. teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

配置

可以在SQLContext上使用setConf方法配置Parquet或者在用SQL时运行SET key=value命令来配置Parquet。

Property NameDefaultMeaning
spark.sql.parquet.binaryAsStringfalse一些其它的Parquet-producing系统,特别是Impala和其它版本的Spark SQL,当写出Parquet模式的时候,二进制数据和字符串之间无法区分。这个标记告诉Spark SQL将二进制数据解释为字符串来提供这些系统的兼容性。
spark.sql.parquet.cacheMetadatatrue打开parquet元数据的缓存,可以提高静态数据的查询速度
spark.sql.parquet.compression.codecgzip设置写parquet文件时的压缩算法,可以接受的值包括:uncompressed, snappy, gzip, lzo
spark.sql.parquet.filterPushdownfalse打开Parquet过滤器的pushdown优化。因为已知的Paruet错误,这个特征默认是关闭的。如果你的表不包含任何空的字符串或者二进制列,打开这个特征仍是安全的
spark.sql.hive.convertMetastoreParquettrue当设置为false时,Spark SQL将使用Hive SerDe代替内置的支持

Spark SQL JSON数据集

Spark SQL能够自动推断JSON数据集的模式,加载它为一个SchemaRDD。这种转换可以通过下面两种方法来实现

  • jsonFile :从一个包含JSON文件的目录中加载。文件中的每一行是一个JSON对象
  • jsonRDD :从存在的RDD加载数据,这些RDD的每个元素是一个包含JSON对象的字符串

注意,作为jsonFile的文件不是一个典型的JSON文件,每行必须是独立的并且包含一个有效的JSON对象。结果是,一个多行的JSON文件经常会失败

  1. // sc is an existing SparkContext.
  2. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  3. // A JSON dataset is pointed to by path.
  4. // The path can be either a single text file or a directory storing text files.
  5. val path = "examples/src/main/resources/people.json"
  6. // Create a SchemaRDD from the file(s) pointed to by path
  7. val people = sqlContext.jsonFile(path)
  8. // The inferred schema can be visualized using the printSchema() method.
  9. people.printSchema()
  10. // root
  11. // |-- age: integer (nullable = true)
  12. // |-- name: string (nullable = true)
  13. // Register this SchemaRDD as a table.
  14. people.registerTempTable("people")
  15. // SQL statements can be run by using the sql methods provided by sqlContext.
  16. val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
  17. // Alternatively, a SchemaRDD can be created for a JSON dataset represented by
  18. // an RDD[String] storing one JSON object per string.
  19. val anotherPeopleRDD = sc.parallelize(
  20. """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
  21. val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)

Hive表

Spark SQL也支持从Apache Hive中读出和写入数据。然而,Hive有大量的依赖,所以它不包含在Spark集合中。可以通过-Phive-Phive-thriftserver参数构建Spark,使其支持Hive。注意这个重新构建的jar包必须存在于所有的worker节点中,因为它们需要通过Hive的序列化和反序列化库访问存储在Hive中的数据。

当和Hive一起工作是,开发者需要提供HiveContext。HiveContext从SQLContext继承而来,它增加了在MetaStore中发现表以及利用HiveSql写查询的功能。没有Hive部署的用户也可以创建HiveContext。当没有通过hive-site.xml配置,上下文将会在当前目录自动地创建metastore_dbwarehouse

  1. // sc is an existing SparkContext.
  2. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
  3. sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
  4. sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
  5. // Queries are expressed in HiveQL
  6. sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
转载本站内容时,请务必注明来自W3xue,违者必究。
 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号