Spark Study Notes
Source: cnblogs  Author: killianxu  Date: 2019/6/17 8:43:51

Spark Core

1.1 RDD

Concept: The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.

Creating RDDs:

  1. parallelizing an existing collection in your driver program
  2. referencing a dataset in an external storage system, such as a shared filesystem

  

  // Method 1: parallelize an existing collection in the driver program
  List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
  JavaRDD<Integer> distData = sc.parallelize(data);

  // Method 2: reference a dataset in an external storage system
  JavaRDD<String> distFile = sc.textFile("data.txt");

RDD operations:

  1. transformations, which create a new dataset from an existing one
  2. actions, which return a value to the driver program after running a computation on the dataset

  JavaRDD<String> lines = sc.textFile("data.txt");
  JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
  int totalLength = lineLengths.reduce((a, b) -> a + b);

 map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program.

RDD operation performance:

  All transformations in Spark are lazy.

  By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it.
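  As a rough sketch of persisting, building on the lineLengths example above (the extra filter/count step is illustrative only, not part of the original example):

  import org.apache.spark.storage.StorageLevel;

  JavaRDD<String> lines = sc.textFile("data.txt");
  JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
  // Keep the intermediate RDD in memory; lineLengths.cache() is shorthand for MEMORY_ONLY
  lineLengths.persist(StorageLevel.MEMORY_ONLY());
  int totalLength = lineLengths.reduce((a, b) -> a + b);      // first action computes and caches
  long nonEmpty = lineLengths.filter(len -> len > 0).count(); // later action reuses the cached data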

  Shuffle operations. The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions (for example, reduceByKey triggers a shuffle).
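  To make the shuffle concrete, a minimal word-count-style sketch (again assuming a JavaSparkContext sc and an input file data.txt):

  import scala.Tuple2;
  import org.apache.spark.api.java.JavaPairRDD;

  JavaRDD<String> lines = sc.textFile("data.txt");
  JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2<>(s, 1));
  // reduceByKey must bring all values for a key together, so it re-partitions (shuffles) the data
  JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
  counts.collect(); // action that returns the results to the driver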

1.2 Shared Variables

Broadcast Variables: Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.

  Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

  broadcastVar.value();
  // returns [1, 2, 3]

Accumulators: Accumulators are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. A numeric accumulator can be created by calling SparkContext.longAccumulator() or SparkContext.doubleAccumulator() to accumulate values of type Long or Double, respectively. Tasks running on a cluster can then add to it using the add method. However, they cannot read its value. Only the driver program can read the accumulator’s value, using its value method.

  LongAccumulator accum = jsc.sc().longAccumulator();

  jsc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));

  accum.value();
  // returns 10

Spark SQL

  Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations. There are several ways to interact with Spark SQL including SQL and the Dataset API.

  A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine.

  A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. In the Java API, users need to use Dataset<Row> to represent a DataFrame.

  // DataFrames example
  // people.json:
  // {"name":"Michael"}
  // {"name":"Andy", "age":30}
  // {"name":"Justin", "age":19}
  import org.apache.spark.sql.SparkSession;
  import org.apache.spark.sql.Dataset;
  import org.apache.spark.sql.Row;
  import static org.apache.spark.sql.functions.col;

  SparkSession spark = SparkSession
      .builder()
      .appName("Java Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate();

  Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

  // Displays the content of the DataFrame to stdout
  df.show();
  // +----+-------+
  // | age|   name|
  // +----+-------+
  // |null|Michael|
  // |  30|   Andy|
  // |  19| Justin|
  // +----+-------+

  // Select everybody, but increment the age by 1
  df.select(col("name"), col("age").plus(1)).show();
  // +-------+---------+
  // |   name|(age + 1)|
  // +-------+---------+
  // |Michael|     null|
  // |   Andy|       31|
  // | Justin|       20|
  // +-------+---------+

  // Select people older than 21
  df.filter(col("age").gt(21)).show();
  // +---+----+
  // |age|name|
  // +---+----+
  // | 30|Andy|
  // +---+----+

  // Count people by age
  df.groupBy("age").count().show();
  // +----+-----+
  // | age|count|
  // +----+-----+
  // |  19|    1|
  // |null|    1|
  // |  30|    1|
  // +----+-----+

  

 

  // SQL example
  import org.apache.spark.sql.Dataset;
  import org.apache.spark.sql.Row;

  // Register the DataFrame as a SQL temporary view
  df.createOrReplaceTempView("people");

  Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
  sqlDF.show();
  // +----+-------+
  // | age|   name|
  // +----+-------+
  // |null|Michael|
  // |  30|   Andy|
  // |  19| Justin|
  // +----+-------+
  // Dataset example
  import java.util.Arrays;
  import java.util.Collections;
  import java.io.Serializable;

  import org.apache.spark.api.java.function.MapFunction;
  import org.apache.spark.sql.Dataset;
  import org.apache.spark.sql.Row;
  import org.apache.spark.sql.Encoder;
  import org.apache.spark.sql.Encoders;

  public static class Person implements Serializable {
    private String name;
    private int age;

    public String getName() {
      return name;
    }

    public void setName(String name) {
      this.name = name;
    }

    public int getAge() {
      return age;
    }

    public void setAge(int age) {
      this.age = age;
    }
  }

  // Create an instance of a Bean class
  Person person = new Person();
  person.setName("Andy");
  person.setAge(32);

  // Encoders are created for Java beans
  Encoder<Person> personEncoder = Encoders.bean(Person.class);
  Dataset<Person> javaBeanDS = spark.createDataset(
      Collections.singletonList(person),
      personEncoder
  );
  javaBeanDS.show();
  // +---+----+
  // |age|name|
  // +---+----+
  // | 32|Andy|
  // +---+----+

  // DataFrames can be converted to a Dataset by providing a class. Mapping based on name
  String path = "examples/src/main/resources/people.json";
  Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
  peopleDS.show();
  // +----+-------+
  // | age|   name|
  // +----+-------+
  // |null|Michael|
  // |  30|   Andy|
  // |  19| Justin|
  // +----+-------+

  Datasets are similar to RDDs, however, instead of using Java serialization or Kryo they use a specialized Encoder to serialize the objects for processing or transmitting over the network. While both encoders and standard serialization are responsible for turning an object into bytes, encoders are code generated dynamically and use a format that allows Spark to perform many operations like filtering, sorting and hashing without deserializing the bytes back into an object.
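  As a short sketch of the built-in encoders (reusing the spark session and imports from the Dataset example above), common types have ready-made encoders in the Encoders class, and typed transformations take an encoder for the result type:

  // Encoders for most common types are provided in the Encoders class
  Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), Encoders.INT());
  Dataset<Integer> transformedDS = primitiveDS.map(
      (MapFunction<Integer, Integer>) value -> value + 1,
      Encoders.INT());
  transformedDS.collect(); // returns [2, 3, 4]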

 

Original article: http://www.cnblogs.com/killianxu/p/11026447.html
