Get a feel for: mapToPair(), countByKey(), reduceByKey().
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class ReduceByKeyDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local");
        // This line is expensive: creating the context takes roughly 10 seconds
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> data = Arrays.asList(1, 2, 2, 3, 3, 3, 4, 5, 6, 7);
        JavaRDD<Integer> javaRDD = sc.parallelize(data);
        System.out.println("### List<Integer> to JavaRDD=" + javaRDD.collect());
        // Convert to (K, V) pairs: each element becomes (element, 1)
        JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(Integer integer) {
                return new Tuple2<Integer, Integer>(integer, 1);
            }
        });
        System.out.println("### javaRDD.mapToPair(...)=" + javaPairRDD.collect());
        Map<Integer, Long> countByKeyResultMap = javaPairRDD.countByKey();
        System.out.println("### javaRDD.countByKey()=" + countByKeyResultMap);
        JavaPairRDD<Integer, Integer> reduceByKeyRDD1 = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) {
                return v1 + v2;
            }
        });
        System.out.println("### javaPairRDD.reduceByKey, approach 1=" + reduceByKeyRDD1.collect());
        // Specify numPartitions explicitly
        JavaPairRDD<Integer, Integer> reduceByKeyRDD2 = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) {
                return v1 + v2;
            }
        }, 2);
        System.out.println("### javaPairRDD.reduceByKey, approach 2=" + reduceByKeyRDD2.collect());
        // Use a custom Partitioner
        JavaPairRDD<Integer, Integer> reduceByKeyRDD3 = javaPairRDD.reduceByKey(new Partitioner() {
            @Override
            public int numPartitions() {
                return 2;
            }

            @Override
            public int getPartition(Object o) {
                // Fine for these small positive keys; a general-purpose partitioner
                // should also guard against negative hash codes
                return (o.toString()).hashCode() % numPartitions();
            }
        }, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) {
                return v1 + v2;
            }
        });
        System.out.println("### javaPairRDD.reduceByKey, approach 3=" + reduceByKeyRDD3.collect());
    }
}
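The same map-then-reduce pattern is much terser in spark-shell. A minimal Scala sketch (assuming a running shell whose SparkContext is bound to sc; the variable names are only illustrative):
// Build (value, 1) pairs and aggregate, mirroring mapToPair + countByKey + reduceByKey above
val nums = sc.parallelize(Seq(1, 2, 2, 3, 3, 3, 4, 5, 6, 7))
val pairs = nums.map(n => (n, 1))
pairs.countByKey()                 // counts per key: 1 -> 1, 2 -> 2, 3 -> 3, 4..7 -> 1 each
pairs.reduceByKey(_ + _).collect() // the same counts, as an Array of (key, count) tuples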
Get a feel for: groupByKey.
Contents of test.txt:
2010-05-04 11:50,10,10,10
2010-05-04 12:50,10,10,10
2010-05-04 12:50,10,10,10
2010-05-05 13:50,20,20,20
2010-05-05 13:50,20,20,20
2010-05-06 14:50,30,30,30
2010-05-06 14:50,30,30,30
Run the following in spark-shell:
import scala.io.Source
val source = Source.fromFile("D:/test.txt").getLines.toArray
val sourceRDD = sc.parallelize(source)
sourceRDD.map { line =>
  val lines = line.split(",")
  (s"${lines(0)}", s"${lines(1)},${lines(2)},${lines(3)}")
}.groupByKey.map { case (k, v) =>
  var a, b, c = 0
  v.foreach { x =>
    val r = x.split(",")
    a += r(0).toInt
    b += r(1).toInt
    c += r(2).toInt
  }
  s"$k,$a,$b,$c"
}.foreach(println)
Output:
2010-05-04 11:50,10,10,10
2010-05-04 12:50,20,20,20
2010-05-05 13:50,40,40,40
2010-05-06 14:50,60,60,60
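groupByKey ships every raw value across the network before the sums are computed. A hedged alternative sketch for the same per-timestamp totals (same spark-shell session and the same sourceRDD assumed) uses reduceByKey, which merges partial sums on each partition before the shuffle:
// Parse each row into (timestamp, (col1, col2, col3)) and sum the tuples per key
sourceRDD.map { line =>
  val f = line.split(",")
  (f(0), (f(1).toInt, f(2).toInt, f(3).toInt))
}.reduceByKey { (x, y) =>
  (x._1 + y._1, x._2 + y._2, x._3 + y._3)
}.map { case (k, (a, b, c)) => s"$k,$a,$b,$c" }.foreach(println)
This should print the same four summed lines, though the order in which foreach emits them is not guaranteed.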