Spark SQL,DataFrames and Datasets Guide

2023-05-16

概览
Spark SQL是Spark的一个结构化数据处理模块。不像基本的Spark RDD API,Spark SQL提供的接口提供更多关于数据和执行的操作的结构信息。从内部看,Spark SQL使用额外的信息来执行额外的优化。有很多种方法来和Spark SQL交互,包括SQL和数据集API。当计算结果时使用了相同的执行引擎,独立于你使用的用来表达计算的API或语言。一种统一意味着开发者可以轻易地在不同的API(基于此提供的最自然的方式来表达一个给定的转换)间切换。
本页面所有的示例使用相同的数据,包含在Spark分发包中,可以在spark-shell,pyspark shell或sparkR shell中运行。
SQL
Spark SQL 的一种用途是执行SQL查询。Spark SQL也能用来读取来自现成的Hive安装的数据。更多关于配置此特色的信息,请参见Hive Table章节。当在其他编程语言程序中运行SQL时,结果会以数据集或数据框架的方式返回。你可以使用命令行或者JDBC/ODBC的SQL接口进行交互。
数据集和数据框架
数据集时分布式的数据集合。数据集时Spark从1.6开始增加的新接口,提供RDDs(强类型,使用强大的lambda函数的能力)和Spark SQL优化执行引擎的益处。数据集能够通过JVM对象构造,然后使用函数转化(map,flatMap,filter等等)来修改。数据集API在Scala和Java中是可用的。Python不支持数据集API。但是由于Python的动态特性,数据集API的很多益处是具备的(比如,你可以很自然地通过名字即row.columnName来访问一组数据的字段)。对于R,这种例子是相似的。
数据框架是组织成命名列的数据集。它从概念上等同于在关系型数据库中的一张表,或R/Python中的数据框架,但是有在hood下的大量优化。数据框架可以通过大量的源来构造,比如结构化数据文件、Hive中的表、额外的数据库、现存的RDDs。DataFrame API在Scala、Java、Python和R中是可用的。在Scala和Java中,数据框架由Rows数据集代表。在Scala API中,DataFrame简单来说就是DataSet[Row]的别称。然而,在Java API中,用户需要使用DataSet来代表DataFrame。
本文中,我们会经常将Scala/Java DataSets of Rows作为DataFrames来提及。
开始
起点:SparkSession –Java
Spark中所有功能的起点是SparkSession类。要创建基本的SparkSession,使用SparkSession.builder():

import org.apache.spark.sql.SparkSession;
SparkSession spark=SparkSession.builder().appName("Java Spark SQL basic example").config("spark.some.config.option","some-value").getOrCreate();    
全部的样例代码见“examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java”。
Spark 2.0中SparkSession对Hive特色提供内置支持,包括使用HiveQL写查询、访问Hive UDFs以及从Hive tables中读取数据的能力。为了使用这些特色,你不需要设置Hive。
**创建DataFrame**
使用SparkSession,应用可以从现有的RDD、从Hive表或Spark数据源创建一个DataFrames。
作为一个例子,以下在一个JSON文件内容基础上创建一个DataFrames。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> df=spark.read().json("examples/src/main/resources/people.json");
//展示DataFrame的内容到标准输出
df.show();
// +------+------+
// |  age | name |
// +------+------+
// | null | Jack |
// |  30  | Andy |
// +------+------+

全部样例代码见“examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java”。
无类型的数据集操作(aka数据框架操作)
数据框架提供了在Scala/Java/Python/R中用于结构化数据更改的一个指定领域的语言。
如上述,在Spark2.0中,数据框架在Scala和Java API中是Rows的数据集。这些操作被称作“无类型转化”,相较于强类型的Scala/Java数据集的“类型转换”。
这里有一些基本的使用数据集的结构化数据处理的例子。

//col("...")is preferable to df.col("...")
import static org.apache.spark.sql.function.col;
//print the schema in a tree format
df.printSchema();
// root
// |--age:long(nullable=true)
// |--name:string(nullable=true)

//select Only the "name" column
df.select("name").show();
// +----------+
// |  name    |
// +----------+
// | Michael  |
// |   Andy   |
// +----------+

//select everybody ,but increment the age by 1
df.select(col("name"),col("age").plus(1)).show();
// +--------+-------+
// |   age  |(age+1)|
// +--------+-------+
// |Micheal |  null |
// |  Andy  |   31  |
// +--------+-------+

//select people older than 21
df.filter(col("age").gt(21)).show();
// +-----+-----+
// | age |name |
// +-----+-----+
// | 30  |Andy |
// +-----+-----+

//count people by age
df.groupBy("age").count().show();
// +-----+-----+
// |age  |count|
// +-----+-----+
// |  19 |  1  |
// |null |  1  |
// |  30 |  1  |
// +-----+-----+

全部代码见“examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java”。
Dataset能够执行的操作类型的全部清单参见API文档。
除了简单的列引用和表达式,Datasets还有丰富的函数库,包括字符串修改、日期算术、常用数学操作等等。完整的清单见DataFrame函数引用。
程序运行SQL查询
SparkSession的sql功能使得应用能够运行SQL查询,并把结果作为DataFrame返回。

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
//register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");
Dataset<Row> sqlDF=spark.sql("SELECT * FROM people");
sqlDF.show();
// +-----+-------+
// | age |  name |
// +-----+-------+
// | null|Michael|
// |  30 |  Andy |
// +-----+-------+

全部代码见“examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java”。
全局临时视图
在Spark SQL中临时视图是session范围的,如果创建临时视图的session终止了,临时视图会消失。如果你想有一个临时视图可以在所有session中共享,并保持活着直到Spark应用终止,你可以创建一个全局临时视图。全局临时视图绑定在一个系统保存的数据库global_temp,我们必须使用合格的名称来引用它,比如 SELECT * FROM global_temp.view1。

//register the DataFrame as a global temporary view
df.createGlobalTempView("people");
//global temporary view is tied to a system preserved database 'global_temp'
spark.sql("SELECT * FROM global_temp.people").show();
// +-----+-------+
// | age | name  |
// +-----+-------+
// |null |Michael|
// | 30  | Andy  |
// +-----+-------+
//global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +-----+-------+
// | age | name  |
// +-----+-------+
// |null |Michael|
// | 30  | Andy  |
// +-----+-------+

全部代码见“examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java”。
创建DataSets
Datasets类似于RDDs,然而,Datasets使用特殊的编码器来实现对象处理和在网络上传输,而不是使用Java序列化或Kryo。当编码器和标准序列化把对象转换成字节时,编码器是自动生成的代码,使用一种格式允许Spark在不将字节反序列化成对象的情况下执行很多操作比如filter、sort和hash。

import java.util.Arrays;
import java.util.Collections;
import java.io.serializable;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
public static class Person implements Serializable{
    private String name;
    private int age;
    public String getName(){
    return name;
    }
    public void setName(String name){
    this.name=name;
    }
    public int getAge(){
    return age;
    }
    public void setAge(int age){
    this.age=age;
    }
}
//create an instance of a Bean class
Person person =new Person();
person.setName("Andy");
person.steAge(32);

//encoders are created for java beans
Encoder<Persoin> personEncoder=Encoder.bean(Person.class);
Dataset<Person> javaBeanDS=spark.createDataset(Collections.singletonList(person),personEncoder);
javaBeanDS.show();
// +-----+-------+
// | age | name  |
// +-----+-------+
// | 32  | Andy  |
// +-----+-------+

//encoders for most common types are provided in class encoders
Encodeer<Integer> integerEncoder =Encoders.INT();
Dataset<Integer> primitivesDS=spark.createDataset(Arrays.asList(1,2,3),integerEncoder);
Dataset<Integer> transformedDS=primitivesDS.map((MapFunction<Integer,Integer>)value -> value+1,integerEncoder);
transformedDS.collect();//returns [2,3,4]
//dataframes can be converted to a dataset by providing a class.mapping based on name
String path="examples/src/main/resources/people.json";
Dataset<Person> peopleDS=spark.read().json(path).as(personEncoder);
peopleDS.show();
// +-----+-------+
// | age |  name |
// +-----+-------+
// | null|Miahael|
// | 30  | Andy  |
// +-----+-------+

全部代码见“examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java”。
与RDDs互操作
Spark SQL支持两种不同的方法来将已有RDDs转换成Datasets。第一个方法使用反射来推断包含特定类型对象的RDD的schema。这种基于反射的方式导致更多简洁的代码,当你写Spark程序已经知道schema时非常高效。
创建Datasets的第二中方法是通过一个程序接口,允许你构造一个schema,然后把它应用到已有的RDD。因为这个方法非常详细,它允许你在runtime之前不知道列和列的类型时构造Datasets。
使用反射推断schema
Spark SQL支持自动将JavaBean RDD转换成DataFrame。使用反射得到BeanInfo,定义了表的schema。当前Spark SQL不支持包含Map字段的JavaBeans,支持嵌套的JavaBeans和List、Array字段。你可以通过创建一个实现serializable并包含所有字段getter和setter方法的类来创建一个JavaBean。

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

//create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD=spark.read().textFile("examples/src/main/resources/people.txt").javaRDD().map(line ->{
    String[] parts=line.split(",");
    Person person=new Person();
    person.setName(parts[0]);
    person.setAge(Integer.parseInt(parts[1].trim());
    return person;
});
//apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF=spark.createDataFrame(peopleRDD,Person.class);
//register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");
//SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF=spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
//the column of a row in the result can be acccessed by field index
Encoder<String> stringEncoder=Encoders.STRING();
Dataset<String> teenageNameByIndexDF=teenagersDF.map((MapFunction<Row,String>) row -> "name:" + rown.getString(0),stringEncoder);
teenagerNamesByIndexDF.show();
// +--------------+
// |     value    |
// +--------------+
// | Nanme:Justin |
// +--------------+

//or by field name
Dataset<String> teenagerNamesByFieldDF=teenagersDF.map((MapFunction<Row,String>)row -> "Name:" +row.<String>getAs("name"),stringEncoder);
teenagerNameByFieldDF.show();
// +--------------+
// |     value    |
// +--------------+
// | Nanme:Justin |
// +--------------+

全部代码见”examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java”
程序化指定schema
当JavaBean类不能提前定义时(比如,记录的结构用字符串的方式编码,或者文档类dataset将会被解析,字段会被投射到不同的用户),Dataset可以通过三个步骤程序化地建立。

  1. 从原始RDD创建一个Rows RDD。
  2. 创建与步骤1创建的RDD中Row结构向匹配的StructType代表的schema。
  3. 通过SparkSession提供的createDataFrame方法将schema应用到RDD Row。
    比如:
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.javaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.structType;
//create an RDD
JavaRDD<String> peopleRDD=spark.sparkContext().textFile("examples/src/main/resources/people.txt",1).toJavaRDD();
//the schema is encoded in a string
String schemaString="name age";
//generate the schema based on the string schema
List<StructFiled> fields =new ArrayList<>();
for (String fieldName : schemaString.split(" ")){
    StructField field =DataTypes.createStructField(fieldName,DataTypes.StringTypes,true);
    fields.add(field);
}
StructType schema=DataTypes.createStrucType(field);
//convert records of the RDD(people) to Rows
JavaRDD<Row> rowRDD=peopleRDD.map((Function<String,Row>) record -> {
    String[] attributes=records.split(",");
    return RowFactory.create(attribute[0],attributes[1].trim());
});
//apply the schema to the RDD
Dataset<Row> peopleDataFrame=spark.createDataFrame(rowRDD,schema);
//create a temporary view using the dataFrame
peopleDataFrame.createOrReplaceTempView("people");
//SQL can be run over a temporary view created using dataFrames
Dataset<Row> results=spark.sql("SELECT name from people");
// the results of SQL queries are DataFrames and support all the normal RDD operation
//the column of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS=results.map((MapFunction<Row,String>) row -> "Name:" + row.getString(0),Encoders.String());
namesDS.show();
// +-------------+
// |     value   |
// +-------------+
// |   Name:Andy |
// | Name:Justin |
// +-------------+

聚集aggregation
内置的DataFrame函数提供常见的聚集函数比如count()、countDistinct()、avg()、max()、min()、etc()等等。尽管那些函数为DataFrames涉及,Spark SQL在Scala和Java中也有一些他们中的类型安全的版本来支持强类型的Datasets。另外,用户也不限于预先定义好的聚集函数,可以建立自己的。
非类型化的用户定义的聚集函数
用户必须继承UserDefinedAggregateFunction抽象类来实现一个自定义非类型化的聚集函数。比如:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public static class MyAverage extends UserDefinedAggregateFunction {

  private StructType inputSchema;
  private StructType bufferSchema;

  public MyAverage() {
    List<StructField> inputFields = new ArrayList<>();
    inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
    inputSchema = DataTypes.createStructType(inputFields);

    List<StructField> bufferFields = new ArrayList<>();
    bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
    bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
    bufferSchema = DataTypes.createStructType(bufferFields);
  }
  // Data types of input arguments of this aggregate function
  public StructType inputSchema() {
    return inputSchema;
  }
  // Data types of values in the aggregation buffer
  public StructType bufferSchema() {
    return bufferSchema;
  }
  // The data type of the returned value
  public DataType dataType() {
    return DataTypes.DoubleType;
  }
  // Whether this function always returns the same output on the identical input
  public boolean deterministic() {
    return true;
  }
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  public void initialize(MutableAggregationBuffer buffer) {
    buffer.update(0, 0L);
    buffer.update(1, 0L);
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  public void update(MutableAggregationBuffer buffer, Row input) {
    if (!input.isNullAt(0)) {
      long updatedSum = buffer.getLong(0) + input.getLong(0);
      long updatedCount = buffer.getLong(1) + 1;
      buffer.update(0, updatedSum);
      buffer.update(1, updatedCount);
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
    long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
    long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
    buffer1.update(0, mergedSum);
    buffer1.update(1, mergedCount);
  }
  // Calculates the final result
  public Double evaluate(Row buffer) {
    return ((double) buffer.getLong(0)) / buffer.getLong(1);
  }
}

// Register the function to access it
spark.udf().register("myAverage", new MyAverage());

Dataset<Row> df = spark.read().json("examples/src/main/resources/employees.json");
df.createOrReplaceTempView("employees");
df.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees");
result.show();
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

类型安全的用户自定义的聚集函数
强类型Datasets的用户自定义聚集围绕Aggregator抽象类。比如:

import java.io.Serializable;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;

public static class Employee implements Serializable {
  private String name;
  private long salary;

  // Constructors, getters, setters...

}

public static class Average implements Serializable  {
  private long sum;
  private long count;

  // Constructors, getters, setters...

}

public static class MyAverage extends Aggregator<Employee, Average, Double> {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  public Average zero() {
    return new Average(0L, 0L);
  }
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  public Average reduce(Average buffer, Employee employee) {
    long newSum = buffer.getSum() + employee.getSalary();
    long newCount = buffer.getCount() + 1;
    buffer.setSum(newSum);
    buffer.setCount(newCount);
    return buffer;
  }
  // Merge two intermediate values
  public Average merge(Average b1, Average b2) {
    long mergedSum = b1.getSum() + b2.getSum();
    long mergedCount = b1.getCount() + b2.getCount();
    b1.setSum(mergedSum);
    b1.setCount(mergedCount);
    return b1;
  }
  // Transform the output of the reduction
  public Double finish(Average reduction) {
    return ((double) reduction.getSum()) / reduction.getCount();
  }
  // Specifies the Encoder for the intermediate value type
  public Encoder<Average> bufferEncoder() {
    return Encoders.bean(Average.class);
  }
  // Specifies the Encoder for the final output value type
  public Encoder<Double> outputEncoder() {
    return Encoders.DOUBLE();
  }
}

Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
String path = "examples/src/main/resources/employees.json";
Dataset<Employee> ds = spark.read().json(path).as(employeeEncoder);
ds.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

MyAverage myAverage = new MyAverage();
// Convert the function to a `TypedColumn` and give it a name
TypedColumn<Employee, Double> averageSalary = myAverage.toColumn().name("average_salary");
Dataset<Double> result = ds.select(averageSalary);
result.show();
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

全部代码见”examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala” 。
数据源
Spark SQL通过DataFrame接口支持操作很多数据来源。使用关系转换可以操作DataFrame,DataFrame也可以用来创建临时视图。注册DataFrame为临时视图可以允许对其数据运行SQL查询。这部分描述使用Spark数据源来加载和保存数据的一般方法,然后讨论内置数据源可用的特定的选项。
一般的加载、保存函数
用最简单的方式,默认的数据源(parquet除非由spark.sql.sources.default配置)会被用于所有的操作。

Dataset<Row> usersDf=spark.read().load("examples/src/main/resouces/users.parquet");
usersDF.select("name","favorite_color").write().save("namesAndColors.parquet");

全部代码见”examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java”。
手工指定选项
你也可以手工指定数据源和额外选项。数据源通过合格的全名称(比如org.apache.spark.sql.parquet)来指定,但是对于内置的数据源你也可以使用它们的短名称(json,parquet,jdbc,orc,libsvm,csv,text)。从任何数据源类型加载的DataFrame都可以使用这种语法被转换成其他类型。

Dataset<Row> peopleDF=spark.read().format("json").load("examples/src/main/resouces/people.json");
peopleDF.select("name","age").write().format("parquet").save("namesAndAges.parquet");

全部代码见”examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java” 。
直接在文件上运行SQL
除了使用读API来加载文件到DataFrame然后查询,你也可以直接使用SQL查询文件。

Dataset<Row> sqlDF=spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");

全部代码见”examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java”
保存模式
保存操作可选择保存模式,指定如何处理已存在的数据如果存在的话。意识到这些保存模式不使用任何锁和不是原子性的是非常重要的。另外,当执行Overwrite模式时,在写入新数据前会删除之前的数据。

Scala/Java任何语言意义
SaveMode.ErrorIfExists(default)“error”(default)当保存一个DataFrame到数据源时,如果数据已经存在,会抛出一个错误。
SaveMode.Append“append”如果数据、表已经存在,DataFrame的内容会追加到已存在的数据
SaveMode.Overwrite“overwrite”overwrite模式意味着,当数据/表已经存在时,存在的数据会被DataFrame的内容覆盖。
SaveMode.Ignore“ignore”当数据已经存在,不会保存DataFrame的内容,不会改变已经存在的数据。这类似于SQL中的“CREATE TABLE IF NOT EXISTS”

保存到持久化表
也可以使用saveAsTable命令将DataFrame作为持久化表保存到Hive metastore。注意已存在的Hive 部署不需要使用这个特色。Spark会(使用Derby)创建一个默认的本地Hive metastore。不像createOrReplaceTempView命令,saveAsTable会materialize DataFrame的内容,并在Hive metastore中建立指向数据的指针。持久化表甚至在你的Spark程序重启后仍然存在,只要你维护连接相同的metastore。持久化表的DataFrame通过对有表名的SparkSession调用table方法建立。
对于以文件为基础的数据源,比如text、parquet、json等等,你可以通过path选项指定一个自定义表路径,比如df.write.option(“path”,”/some/path”).saveAsTable(“t”)。当表被丢弃时,自定义表路径会被移除,表数据依然在那儿。如果没有自定义表路径被指定,Spark会把数据写到仓库目录下的默认表路径。当表被丢弃后,默认表路径也会被移除。
从Spark 2.1开始,持久化数据源表有每个分区的metadata存储在Hive metastore中。这带来几个好处:

  • 由于metastore为只返回查询必要的分区,在第一次查询时就发现所有的分区就不再必要。
  • Hive DDLs比如ALTER TABLE PARTITION 、SET LOCATION,现在对于数据源API创建的表是可用的。
    注意当创建外部数据源表(那些有path选项)时,分区信息不是默认收集的。为了同步metastore中的分区信息,你可以触发MSCK REPAIR TABLE。
    Bucketing,排序和分区
    对于以文件为基础的数据源,可能对输出进行bucket,排序和分区。bucket和排序只对持续化表有效:
peopleDF.write().bucketBy(42,"name").sortBy("age").saveAsTable("people_bucketed");

使用Dataset APIs时,partitioning可以与save、saveAsTABLE一起使用。

usersDF.write().partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet");

对于一个单独的表使用partition和bucket是可能的。

peopleDF.write().partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("people_partitioned_bucketed");

partitionBy创建了一个目录结构,正如在Partition Discovery章节中描述的。这样,它限制了具有高基数列的适用性。相反,bucketBy跨大量的bucket分发数据,当大量独特的值无限时可以使用。
Parquet文件
parquet是一个柱状格式,得到了很多其他数据处理系统的支持。Spark SQL提供读取和写入Parquet文件的支持,自动保护原来数据的schema。当写入Parquet文件时,所有列基于兼容原因自动转换成可为空。
以编程方式加载数据
使用上面例子中的数据:

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> peopleDF=spark.read().json("examples/src/main/resources/people.json");
//DataFrame can be saved as Parquet files,maintaining the schema information
peopleDF.write().parquet(people.parquet");
//read in the parquet file created above
//parquet files are self-describing so the schema is preserved
//the result of loading a parquet file is also a DataFrame
Dataset<Row> parquetFileDF=spark.read().parquet("people.parquet");
//parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF=spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS=namesDF.map((MapFunction<Row,String> row -> "Name:" + row.getString(0), Encoders.STRING());
namesDS.show();
// +-------------+
// |     value   |
// +-------------+
// |Name:Justin  |
// +-------------+

Partion Discovery
表分区是一种常见的优化方法,在像Hive这样的系统中使用。在一个分区的表中,数据经常存储在不同的目录中,分区的列值被编码在每个分区目录路径中。所有内置的文件源(包括Text/CSV/JSON/ORC/Parquet)能够自动发现和推测分区信息。比如,我们可以使用下面的目录结构将我们之前使用过的人口数据存储进一个分区的表,有两个额外的列gender和country作为分区列。

path
|---to
    |--table
         |----gender=male
         |      |--....
         |      |--country=US
         |      |     |----data.parquet
         |      |--country=CN
         |      |     |----data.parquet
         |      |--....
         |----gender=female
                |--...
                |--country=US
                |     |----data.parquet
                |--country=CN
                |     |----data.parquet
                |--...

通过将path/to/table传递给SparkSession.read.parquet或SparkSession.read.load,Spark SQL会自动从路径中提取出分区信息。现在返回的DataFrame的schema变成了:

root
|--- name:string(nullable=true)
|--- age:long(nullable=true)
|--- gender:string(nullable=true)
|---country:string(nullable=true)

注意分区列的数据类型是自动推断的。现在支持数字数据类型、日期、时间戳和字符串类型。有时用户不想自动推断出分区列的数据类型。对于这些用例,自动类型推断可以通过spark.sql.sources.partitionColumnTypeInference.enabled来配置,默认是true。当类型推断失效后,字符串类型会被用于分区列。
从Spark1.6开始,分区发现默认只会找到给定路径下的分区。拿上面的例子来说,如果用户传递path/to/table/gender=male传递给SparkSession.read.parquet或SparkSession.read.load,gender不会给被认为是一个分区列。如果用户需要指定分区发现应该开始的基础路径,他们应该在数据源选项中设置bashPath。比如,当path/to/table/gender=male时数据的路径,用户设置basePath给path/to/table,gender会是一个分区列。
schema merge
像ProtocolBuffer、Avro和Thrift,Parquet也支持schema演化。用户从一个简单的schema开始,然后根据需要逐渐增加更多的列。这样,用户最后形成多个Parquet文件,这些文件有不同但是相互兼容的schema。Parquet数据源现在可以自动检测到这种情况,然后把schema融合进这些文件中。
由于schema融合是相对高成本的操作,在大多数情况下不是必要的。我们从1.5.0开始默认关闭了。你这样可以启用:

  • 当读取Parquet文件(如下面的例子)时,设置数据源选项mergeSchema为true
  • 设置全局SQL选项spark.sql.parquet.mergeSchema为true。
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public static class Square implements Serializable{
    private int value;
    private int square;
    //Getter and setter...
}
public static class Cube implements Serializable{
    private int value;
    private int cube;
}
List<Square> squares=new ArrayList<>();
for(int value=1;value<=5;value++){
    Square square=new Square();
    square.setValue(value);
    square.setSquare(value*value);
    square.add(square);
}
//create a simple DataFrame, store into a partition directory
Dataset<Row> squareDF=spark.createDataFrame(square,Square.class);
squareDF.write().parquet("data/test_table/key=1");
List<Cube> cubes=new ArrayList<>();
for(int value=6;value<=10;value++){
    Cube cube=new Cube();
    cube.setValue(value);
    cube.setCube(value*value*value);
    cubes.add(cube);
}
//create another DataFrame in a new partition directory
//adding a new column and dropping an existing column
Dataset<Row> cubeDF=spark.createDataFrame(cubes,Cube.class);
cubesDF.write().parquet("data/test_table/key=2");
//read the partitioned table
Dataset<Row> mergedDF=spark.read().option("mergeSchema",true).parquet("data/test_table");
mergedDF.printSchema();
//the final schema consists of all 3 columns in the parquet files together
//with the partitioning column appeared in the partition directory paths
//root
//  |--value:int(nullable=true)
//  |--squre:int(nullable=true)
//  |--cube:int(nullable=true)
//  |--key:int(nullable=true)

Hive metastore Parquet table conversion
当读取或写入Hive metastore Parquet tables时,Spark SQL会尝试使用自己的Parquet支持,而不是Hive SerDe,以获取较好的性能。这个行为由spark.sql.hive.convertMetastoreParquet配置来控制,默认是打开的。
Hive/Parquet schema Reconciliation
从表schema处理的角度看,Hive和Parquet有两个关键的不同:
1.Hive是大小写不敏感的,而Parquet是敏感的。
2.Hive考虑所有列是可为空的,而为空性在Parquet中是非常重要的。
由于这个原因,当把Hive metastore Parquet表转换成Spark SQL Parquet表时,我们必须调节Hive meta schema和Parquet schema。调节规则时:
1.两个schema中有相同名称的字段不考虑为空性必须有相同的数据类型。调节字段应该有Parquet side的数据类型,这样就考虑到了为空性。
2.reconciled schema包含那些在Hive metastore schema中定义的字段。
- 任何只在Parquet schema中出现的字段在reconciled schema中丢弃
- 任何只在Hive metastore中出现的字段作为可为空字段增加到reconciled schema中。
metastore refreshing
Spark SQL缓存Parquet metadata以获得更好性能。当Hive metastore Parquet table转换生效后,那些转换后的表的metadata也被缓存了。如果这些表被Hive或其他外部工具更新了,你需要手动更新它们已保证一致的metadata。

//spark is an existing SparkSession
spark.catalog().refreshTable("my_table");

配置
Parquet的配置可以通过SparkSession的setConf方法完成,或者使用SQL的set key=value 命令来设置。

属性名称默认值意义
spark.sql.parquet.binaryAsStringfalse生成Parquet的一些其他系统,在Impala、Hive、旧版本的Spark SQL中,在输出Parquet schema时不区分二进制数据和字符串。这个标志告诉Spark SQL把二进制数据作为字符串来解释,以兼容这些系统。
spark.sql.parquet.int96AsTimestamptrue
spark.sql.parquet.compression.codecsnappy
spark.sql.parquet.filterPushdowntrue
spark.sql.hive.convertMetastoreParquettrue
spark.sql.parquet.mergeSchemafalse
spark.sql.optimizer.metadataOnlytrue

ORC文件
从Spark2.3开始,Spark支持矢量化的ORC阅读器和新的ORC文件格式。为了这样,下面的配置是新增加的。矢量化的阅读器用于本地ORC表(比如)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark SQL,DataFrames and Datasets Guide 的相关文章

  • 如何根据条件删除结果以计算平均值

    我有下面的架构 对其的快速解释是 鲍勃评分为 5 5 詹姆斯评分 1 5 梅西百货评分高达 5 5 逻辑 如果我是 A 请查找我屏蔽的所有人 查阅所有电影评论 任何留下电影评论且 personA 已屏蔽的人 请将其从计算中删除 计算电影的平
  • 如何在 SQL Server 中保持数据行内

    我正在尝试找出如何检测数据是否在VARCHAR n SQL Server 2008 中的列存储在行内或行外 有谁知道如何做到这一点 另外 如果我们需要数据 有没有办法将数据保持在行中 要查看某个值是行内还是行外 您可以使用DBCC PAGE
  • SQL Server中的列级与表级约束?

    A 列级 GO CREATE TABLE Products ProductID INT CONSTRAINT pk products pid PRIMARY KEY ProductName VARCHAR 25 GO b 表层 CREATE
  • 验证 sql/oracle 中的电子邮件/邮政编码字段

    对于以下方面的一些建议将不胜感激 是否可以通过 oracle 中的 sql 中的某种检查约束来验证电子邮件和邮政编码字段 或者我怀疑 pl sql 带有正则表达式的这种事情 Thanks 这是电子邮件地址的正则表达式语法 包括引号 a zA
  • 如何从 MySQL 中的布尔类型返回不同的字符串?

    如果我在 MySql 中将一列设置为布尔值 则查询将返回以下值 0 or 1 是否可以做这样的事情 SELECT bool value AS yes OR no 我的意思是 根据真假返回两个不同的字符串 SELECT CASE WHEN b
  • 手动更改postgresql中查询的执行计划?

    是否可以在postgresql中手动更改执行计划的操作顺序 例如 如果我总是想在过滤之前进行排序操作 尽管这在 postgresql 的正常使用中没有意义 是否可以通过例如手动强制执行该操作改变运营的内部成本 如果我实现自己的功能呢 是否可
  • 如何搜索表中的所有列?

    如何在 SQL Server 中搜索表的所有列 SELECT FROM yourtable WHERE val IN field1 field2 field3 field4 如果您正在寻找精确的全场比赛 如果你正在寻找子字符串匹配 你将不得
  • sql join 告诉我 ID 是否存在于其他表中

    我有 2 张桌子 A B ID FKID 1 3 2 3 3 4 4 4 我需要一个 select 语句 它显示 A 的所有内容 其中一个字段告诉我表 B 是否有任何与该 ID 匹配的 id Desired Result ID hasB 1
  • 与 SQL 中的 IN 运算符相反

    我怎么能做相反的事情 换句话说 选择所有姓氏不是 Hansen 或 Pettersen 的人 WHERE lastname NOT IN Hansen Pettersen 请参阅 IN 和 NOT IN 运算符 部分SQLite 所理解的
  • 在 SQL 中按键组对行进行顺序编号?

    SQL中有没有办法按顺序添加行号按关键组 假设一个表包含任意 CODE NAME 元组 示例表 CODE NAME A Apple A Angel A Arizona B Bravo C Charlie C Cat D Dog D Dopp
  • ORA-00933 与内部联接和“as”混淆

    我有一个使用以下命令从两个表中获取数据的查询inner join 但我收到错误SQL command not properly ended as 下面有一个星号 select P carrier id O order id O aircra
  • 通过 SQLAlchemy 获取随机行

    如何使用 SQLAlchemy 从表中选择一个或多个随机行 这在很大程度上是一个特定于数据库的问题 我知道 PostgreSQL SQLite MySQL 和 Oracle 具有通过随机函数排序的能力 因此您可以在 SQLAlchemy 中
  • 如何计算 Postgres 上图表中所有连接的节点(行)?

    我的桌子有account id and device id One account id可以有多个device ids 反之亦然 我正在尝试计算每个连接的多对多关系的深度 Ex account id device id 1 10 1 11
  • 删除表的重复项

    In my activity logs 它包含列 material name user id mod result 这标志着测试是否通过 失败 cert links 不知何故 用户生成了两倍的条目material name与cert lin
  • SQLite (Android):使用 ORDER BY 更新查询

    Android SQLite 我想要在 myTable 中的其他行之间插入行在android中使用SQLite 为此 我尝试增加从第 3 行开始的所有行的 id 这样 我就可以在位置 3 处插入新行 myTable 的主键是列 id 表中没
  • TSQL - 生成文字浮点值

    我理解比较浮点数时遇到的许多问题 并对它们在这种情况下的使用感到遗憾 但我不是表格作者 只有一个小障碍需要克服 有人决定使用浮点数 就像您期望使用 GUID 一样 我需要检索具有特定浮点值的所有记录 sp help MyTable Colu
  • 在 PostgreSql 中计算百分比

    例如我有一个这样的表 string adm A 2 A 1 B 2 A 1 C 1 A 2 通过 SQL 查询 我想要这样的结果 string perc adm A 50 B 100 C 0 我想要每个字符串中数字 2 出现的百分比 我可以
  • 如何连续添加起始行和下一行的值

    我只想创建一个 sql 查询 结果就像图片上的那样 类似于 SQL 中的斐波那契数列 Ex Column 1 10 则 Result 列的值为 Result 10 因为这是第一行 然后假设column1第二行的值为50 那么Result第二
  • 在 MS Access SQL 查询中从正常日期转换为 unix 纪元日期

    我正在尝试编写一个通过 ODBC 连接到 MySQL 数据库的 MS Access 2007 连接的查询 一切工作正常 查询执行我想要的操作 我挂断的部分是我一直在询问用户 unix 纪元时间 而不是常规日期 我查找了 MS Access
  • 没有为 1 个或多个必需参数给出值。更新SQL

    我正在编写一个程序 当用户在列表视图上选择记录时 该程序会更新密码或积分 我收到错误 没有为 1 个或多个必需参数给出值 我不知道如何纠正 我是否遗漏了一些明显的东西 Dim sql As String UPDATE Users SET P

随机推荐

  • Lamp环境搭建和ucenter/ucenterhome

    环境 xff1a Centos 7 3 1611 步骤 xff1a 安装apache php软件 xff1a yum install httpd php php mysql安装mysql mariadb xff0c 以Centos系统为例
  • win10安装系统自带应用

    以管理员身份启动系统自带的Windows Powershell组件 xff0c 接着输入Get AppxPackage allusers Select Name PackageFullName xff0c 通过该命令获取当前系统安装的所有应
  • SQL DDL从MySQL到Oracle

    最新一个项目的sql ddl为MySQL准备的 xff0c 我想在Oracle中使用 之前不太了解两者的区别 xff0c 结果报错一坨 于是顶着头皮开始看什么问题 xff0c 以下是我陷过的坑 xff0c 让大家看看 废话少说 xff0c
  • 7 MySQL安全概述

    1 常见因素 密码 常见的密码要求 xff1a 包含大小写 数字 特殊字符限制 长度 不要保存密码明文 为防止彩虹表 xff0c 也不要简单的使用hash方法 xff0c 可以采用hash hash password 43 salt 的方式
  • 关于SIFT和SURF介绍

    SIFT xff08 尺度不变特征变换 xff09 关于一些角点检测技术 xff0c 比如 Harris 等 它们具有旋转不变特性 xff0c 即使图片发生了旋转 xff0c 我们也能找到同样的角点 xff0c 但如果进行图像缩放 xff0
  • 7.2 MySQL权限系统原理

    MySQL权限系统的用户接口由SQL语句组成 xff0c 比如create user xff0c grant xff0c revoke 在数据库内部 xff0c MySQL把权限信息保存在MySQL database的赋权表中 MySQL服
  • 7.2.1 MySQL提供的权限

    MySQL提供的权限应用于不同的上下文和不同的操作级别 xff1a 管理权限使用户可以管理MySQL服务器的操作 这些权限是全局性的 xff0c 因为它们不是局限于某个特定的数据库 数据库权限应用于数据库和数据库的组成对象 这些权限可以被赋
  • 7.3 MySQL用户账号管理

    7 3 1用户名称和密码 MySQL把账号存储在mysql系统数据库的user表中 一个账号被定义成一个用户名称和能够连接到服务器的客户端主机 xff08 群 xff09 账号都有一个密码 MySQL支持授权插件 xff0c 也就是说一个账
  • 7 Oracle 管理用户和安全

    用户和安全概览 用户账号由一个用户名确认 xff0c 定义了用户的属性包括 xff1a 鉴权方式 数据库鉴权密码 永久存储和临时存储的默认表空间 表空间配额 账号状态 xff08 是否锁定 xff09 密码状态 xff08 是否过期 xff
  • linux-bash-find

    FIND 1 General Commands Manual FIND 1 1 NAME find search for files in a directory hierarchy 2 SYNOPSIS find H L P D debu
  • awk、任务管理

    awk awk F 39 39 39 span class hljs operator span class hljs keyword BEGIN span l 61 span class hljs number 0 span span c
  • java数据结构和算法

    常见的数据结构 数组 gt 方便通过下标随机访问数据 有序数组无序数组数组大小一旦确定无法变更栈 先进后出只能压入 xff08 push xff09 查看 xff08 peek xff09 删除 xff08 pop xff09 栈顶无法查找
  • spring概述

    spring框架主要包括以上几个方面
  • 查看进程_端口的命令

    1 Windows平台 在windows控制台窗口下执行 xff1a netstat nao findstr 9010 TCP 127 0 0 1 9010 0 0 0 0 0 LISTENING 3017 你看到是PID为3017的进程占
  • hadoop学习记录—2.8.2documentation—mapreduce Tutorial

    1 概况 hadoop MapReduce是一个软件框架 xff0c 在这个框架上可以很容易编写以可靠 容错地运行在大量廉价硬件组成的集群 xff08 上千节点 xff09 上 并行地处理大量数据 xff08 数TB数据集 xff09 的程
  • 服务器使用windows server 2008修改密码步骤教程

    1 进入服务器后右击计算机 xff0c 点击管理 xff0c 进入服务器管理器 2 在服务器管理器里面双击配置 xff08 打开下一列 xff09 双击本地用户和组 xff08 打开下一列 xff09 点击用户 3 右击Administra
  • yarn结构-2.9.0

    YARN最基本的想法就是把资源管理和任务调度 监听功能分成独立的守护进程 这个想法就是有一个全局的ResourceManager xff08 RM xff09 和每个应用独自的ApplicationMaster xff08 AM xff09
  • spark集群模式概览

    本文简短概述下spark如何在集群上运行 xff0c 使得更简单地理解涉及到的组件 可以通过读 应用提交指南 来学习在一个集群上加载应用 组件 spark应用作为独立的进程集运行在集群上 xff0c 在主应用 xff08 称为驱动程序 xf
  • SPARK RDD编程指南

    在高层次面上 xff0c 每个spark应用有一个驱动程序组成 xff0c 驱动程序运行用户的主函数 xff0c 在集群上执行很多并行操作 Spark提供的主要抽象是RDD xff0c 可以进行并行操作的跨节点分散的元素集 RDDs可以由H
  • Spark SQL,DataFrames and Datasets Guide

    概览 Spark SQL是Spark的一个结构化数据处理模块 不像基本的Spark RDD API xff0c Spark SQL提供的接口提供更多关于数据和执行的操作的结构信息 从内部看 xff0c Spark SQL使用额外的信息来执行