Two ways to write WordCount in Spark.
Language: Java
IDE: IntelliJ IDEA
Project type: Java Maven
The pom.xml is as follows:
```xml
<properties>
    <spark.version>1.2.0</spark.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
```
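Nothing more than this single spark-core dependency is needed for a local run (the `_2.10` suffix is the Scala binary version the 1.2.0 artifacts were built against). As a quick smoke test (not part of the original post; the class name is made up), the following prints the Spark version that Maven resolved:

```java
package pairs;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical helper class, just to confirm the dependency resolves.
public class VersionCheck {
    public static void main(String[] args) {
        // "local" runs Spark inside this JVM; no cluster required
        SparkConf conf = new SparkConf().setMaster("local").setAppName("versionCheck");
        JavaSparkContext sc = new JavaSparkContext(conf);
        System.out.println(sc.version()); // expected to print 1.2.0
        sc.stop();
    }
}
```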
The first way is the conventional, step-by-step one:
```java
package pairs;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;

public class WordCount1 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("wordcount1");
        JavaSparkContext sc = new JavaSparkContext(conf);

        String filename = "D:\\tmp\\words.txt";
        JavaRDD<String> input = sc.textFile(filename);

        // split each line on spaces into words
        JavaRDD<String> lines = input.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });

        // pairs: map each word to (word, 1)
        JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // reduce: sum the 1s for each word
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer x, Integer y) throws Exception {
                return x + y;
            }
        });

        // output: foreach is the action that finally triggers the job
        counts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> tuple2) throws Exception {
                System.out.println(tuple2);
            }
        });

        sc.stop();
    }
}
```
Code output: one Tuple2 per word, printed as (word, count); for a hypothetical input line "hello world hello" that would be (hello,2) and (world,1), in no guaranteed order.
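Not in the original post, but worth noting: with Java 8, the anonymous classes above collapse into lambdas. This sketch assumes the same Spark 1.x API, where FlatMapFunction.call returns an Iterable (the class name WordCount1Lambda is made up):

```java
package pairs;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class WordCount1Lambda {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("wordcount1-lambda");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // same pipeline as WordCount1, one lambda per step
        JavaRDD<String> words = sc.textFile("D:\\tmp\\words.txt")
                .flatMap(s -> Arrays.asList(s.split(" ")));

        JavaPairRDD<String, Integer> counts = words
                .mapToPair(w -> new Tuple2<>(w, 1))
                .reduceByKey((x, y) -> x + y);

        counts.foreach(t -> System.out.println(t));
        sc.stop();
    }
}
```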
The second way is more concise:
```java
package pairs;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import java.util.Arrays;
import java.util.Map;

public class WordCount2 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("wordcount2");
        JavaSparkContext sc = new JavaSparkContext(conf);

        String filename = "D:\\tmp\\words.txt";
        JavaRDD<String> input = sc.textFile(filename);

        // split each line on spaces into words
        JavaRDD<String> lines = input.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });

        // countByValue is an action: it counts each distinct word
        // and returns the result to the driver as a Map
        Map<String, Long> result = lines.countByValue();
        System.out.println(result);

        sc.stop();
    }
}
```
Code output: the whole Map<String,Long> printed at once; for the same hypothetical input it would look like {hello=2, world=1}.
Comparing the two: the first version chains transformations (flatMap, mapToPair, reduceByKey), all of which are lazy, and only the final foreach action triggers the job and prints each Tuple2; the second version replaces the pair-and-reduce steps with the countByValue() action, which returns a Map<String,Long> directly to the driver. What further differences there are, and how the two compare in efficiency, is left for later, deeper study.
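One relationship can already be sketched here (my own note, not from the book): countByValue() behaves roughly like the first pipeline followed by collectAsMap(), meaning all the counts end up in driver memory, so it only suits result sets small enough to fit there. A minimal sketch of that equivalence, with a made-up class name:

```java
package pairs;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Map;

public class CountByValueSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("countByValueSketch");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> words = sc.textFile("D:\\tmp\\words.txt")
                .flatMap(s -> Arrays.asList(s.split(" ")));

        // the transformations stay distributed...
        Map<String, Long> result = words
                .mapToPair(w -> new Tuple2<>(w, 1L))
                .reduceByKey((x, y) -> x + y)
                // ...until this action copies every (word, count) into driver memory
                .collectAsMap();

        System.out.println(result); // same shape as countByValue()'s result
        sc.stop();
    }
}
```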
References:
Learning Spark (《Spark快速大数据分析》)