Summary of 20 operators commonly used in Spark
Spark operators can be roughly divided into the following three categories:

1. Transformation operators on Value-type data. These transformations do not trigger job submission; the data items they process are plain values.
2. Transformation operators on Key-Value data. These transformations likewise do not trigger job submission; the data items they process are key-value pairs.
3. Action operators. These trigger the SparkContext to submit a job (a short sketch illustrating the difference follows below).
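For instance, a transformation such as map only records the lineage; nothing is computed until an action such as collect forces a job to run. A minimal sketch, assuming sc is an active SparkContext (as in the spark-shell):
val nums = sc.parallelize(1 to 5)   // create an RDD
val doubled = nums.map(_ * 2)       // transformation: recorded, but no job is submitted yet
doubled.collect()                   // action: triggers SparkContext to submit a job
# Array[Int] = Array(2, 4, 6, 8, 10)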
Examples:
1. map(function): applies the given function to every element of the RDD to produce a new RDD. Each element of the original RDD corresponds to exactly one element in the new RDD.
val a = sc.parallelize(1 to 9, 3)
// x => x*2 is a function: x is the incoming parameter, i.e. each element of the RDD, and x*2 is the return value
val b = a.map(x => x*2)
a.collect
# Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
b.collect
# Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

// list element ---> key-value pair
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x, 1))
b.collect.foreach(println(_))
# (dog,1)
# (tiger,1)
# (lion,1)
# (cat,1)
# (panther,1)
# (eagle,1)

val l = sc.parallelize(List((1,'a'),(2,'b')))
var ll = l.map(x => (x._1, "PV:" + x._2)).collect()
ll.foreach(println)
# (1,PV:a)
# (2,PV:b)
2. mapPartitions(function): whereas map() applies the input function to each element of the RDD, mapPartitions() applies the input function once to each partition; the function receives an iterator over the partition's elements and must return an iterator.
package test

import scala.Iterator

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object TestRdd {
  def sumOfEveryPartition(input: Iterator[Int]): Int = {
    var total = 0
    input.foreach { elem =>
      total += elem
    }
    total
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Rdd Test")
    val spark = new SparkContext(conf)
    val input = spark.parallelize(List(1, 2, 3, 4, 5, 6), 2) // the RDD has 6 elements, divided into 2 partitions
    val result = input.mapPartitions(
      // partition is the argument passed in: an Iterator over one partition's elements;
      // the function must also return an Iterator, hence Iterator(sumOfEveryPartition(partition))
      partition => Iterator(sumOfEveryPartition(partition)))
    result.collect().foreach {
      println(_) // prints 6 and 15, the sum of each partition
    }
    spark.stop()
  }
}
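mapPartitions does not have to reduce a partition to a single value; the function only has to return an Iterator. A minimal spark-shell sketch, assuming an active SparkContext sc, that doubles every element while the function is invoked once per partition:
val nums = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
val doubled = nums.mapPartitions(iter => iter.map(_ * 2)) // called once per partition, returns a transformed iterator
doubled.collect()
# Array[Int] = Array(2, 4, 6, 8, 10, 12)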
3. mapValues(function): the Key of each element in the original RDD is kept unchanged and is combined with the new Value to form an element of the new RDD. This operator therefore only applies to RDDs whose elements are key-value pairs.
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
val b = a.map(x => (x.length, x))
b.mapValues("x" + _ + "x").collect# //Results
# Array(
# (3,xdogx),
# (5,xtigerx),
# (4,xlionx),
# (3,xcatx),
# (7,xpantherx),
# (5,xeaglex)
# )# val grouped = mds.groupBy(md => md.matched)
# grouped.mapValues(x => x.size).foreach(println)
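The grouped fragment above assumes an RDD mds defined elsewhere; a self-contained sketch of the same counting pattern, with made-up data:
val words = sc.parallelize(List("dog", "cat", "dog", "lion", "cat", "dog"))
val grouped = words.groupBy(w => w)          // RDD[(String, Iterable[String])]
grouped.mapValues(g => g.size).foreach(println)
# e.g. (dog,3), (cat,2), (lion,1)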
4. flatMap(function): similar to map. The difference is that map produces exactly one output element per input element, while flatMap can produce zero or more output elements per input element.
val a = sc.parallelize(1 to 4, 2)
val b = a.flatMap(x => 1 to x) // each element x expands into the sequence 1 to x
b.collect
/* Result:
Array[Int] = Array(1,
                   1, 2,
                   1, 2, 3,
                   1, 2, 3, 4)
*/
5. flatMapValues(function): applies the function to the Value of each key-value pair; each value it produces is paired with the original Key.
val a = sc.parallelize(List((1,2),(3,4),(5,6)))
val b = a.flatMapValues(x=>1 to x)
b.collect.foreach(println(_))
/* result
(1,1)
(1,2)
(3,1)
(3,2)
(3,3)
(3,4)
(5,1)
(5,2)
(5,3)
(5,4)
(5,5)
(5,6)
*/

val list = List(("mobin",22),("kpop",20),("lufei",23))
val rdd = sc.parallelize(list)
val mapValuesRDD = rdd.flatMapValues(x => Seq(x,"male"))
mapValuesRDD.foreach(println)
/* Output:
(mobin,22)
(mobin,male)
(kpop,20)
(kpop,male)
(lufei,23)
(lufei,male)
*/
For comparison, mapValues with the same function would instead output:
(mobin,List(22, male))
(kpop,List(20, male))
(lufei,List(23, male))
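A minimal sketch of that comparison, reusing the rdd defined above, so the difference can be reproduced directly:
rdd.mapValues(x => Seq(x, "male")).foreach(println)
# (mobin,List(22, male))
# (kpop,List(20, male))
# (lufei,List(23, male))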
6. reduceByKey(func, numPartitions): groups by Key and aggregates each key's values with the given func; numPartitions sets the number of partitions to raise the degree of parallelism.
val arr = List(("A",3),("A",2),("B",1),("B",3))
val rdd = sc.parallelize(arr)
val reduceByKeyRDD = rdd.reduceByKey(_ + _)
reduceByKeyRDD.foreach(println)
sc.stop
# (A,5)
# (B,4)
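A minimal sketch of the numPartitions variant, reusing rdd from above; the second argument only controls how many partitions the result has:
val reducedIn2 = rdd.reduceByKey(_ + _, 2)
reducedIn2.partitions.length // 2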
7. groupByKey(numPartitions): groups by Key and returns (K, Iterable[V]); numPartitions sets the number of partitions to raise parallelism. The values are not aggregated; they are collected into a sequence.
// omit the SparkContext setup
val arr = List(("A",1),("B",2),("A",2),("B",3))
val rdd = sc.parallelize(arr)
val groupByKeyRDD = rdd.groupByKey()
groupByKeyRDD.foreach(println)
sc.stop
# (B,CompactBuffer(2, 3))
# (A,CompactBuffer(1, 2))

// Count how many elements each key's collection holds
scala> groupByKeyRDD.mapValues(x => x.size).foreach(println)
# (A,2)
# (B,2)
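For simple aggregations such as sums, reduceByKey is usually preferred over groupByKey because it combines values on the map side before the shuffle. A minimal sketch showing that both give the same totals for the arr above:
groupByKeyRDD.mapValues(vs => vs.sum).foreach(println)
rdd.reduceByKey(_ + _).foreach(println)
# both print (A,3) and (B,5)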
8. sortByKey(ascending, numPartitions): returns an RDD of (K, V) pairs sorted by Key. When ascending is true the sort is ascending; when false, descending. numPartitions sets the number of partitions to raise parallelism.
// omit the SparkContext setup
val arr = List(("A",1),("B",2),("A",2),("B",3))
val rdd = sc.parallelize(arr)
val sortByKeyRDD = rdd.sortByKey()
sortByKeyRDD.foreach(println)
sc.stop
# (A,1)
# (A,2)
# (B,2)
# (B,3)

// Word frequency, sorted by count in descending order
val rdd = sc.textFile("/home/scipio/README.md")
val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_)
val wcsort = wordcount.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1))
wcsort.saveAsTextFile("/home/scipio/sort.txt")
// For ascending order use sortByKey(true)
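The swap-sort-swap pattern above can also be written with sortBy, which sorts by an arbitrary key function; a minimal sketch reusing the wordcount RDD:
val wcsort2 = wordcount.sortBy(x => x._2, ascending = false) // keeps the (word, count) shape, no double swap needed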
9. cogroup(otherDataSet, numPartitions): for two RDDs of (K, V) and (K, W) pairs, gathers the elements sharing the same Key from both RDDs and returns an RDD of (K, (Iterable[V], Iterable[W])); numPartitions sets the number of partitions to raise parallelism.
val arr = List(("A", 1), ("B", 2), ("A", 2), ("B", 3))
val arr1 = List(("A", "A1"), ("B", "B1"), ("A", "A2"), ("B", "B2"))
val rdd1 = sc.parallelize(arr, 3)
val rdd2 = sc.parallelize(arr1, 3)
val groupByKeyRDD = rdd1.cogroup(rdd2)
groupByKeyRDD.foreach(println)
sc.stop
# (B,(CompactBuffer(2, 3),CompactBuffer(B1, B2)))
# (A,(CompactBuffer(1, 2),CompactBuffer(A1, A2)))
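The two Iterables can then be processed per key; a minimal sketch that turns the cogroup result above into per-key counts:
groupByKeyRDD.mapValues { case (vs, ws) => (vs.size, ws.size) }.foreach(println)
# e.g. (B,(2,2)) and (A,(2,2))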
10. join(otherDataSet, numPartitions): first cogroups the two RDDs, then, for each Key, takes the Cartesian product of the two value collections; numPartitions sets the number of partitions to raise parallelism.
// omit the SparkContext setup
val arr = List(("A", 1), ("B", 2), ("A", 2), ("B", 3))
val arr1 = List(("A", "A1"), ("B", "B1"), ("A", "A2"), ("B", "B2"))
val rdd = sc.parallelize(arr, 3)
val rdd1 = sc.parallelize(arr1, 3)
val joinRDD = rdd.join(rdd1)
joinRDD.foreach(println)
# (B,(2,B1))
# (B,(2,B2))
# (B,(3,B1))
# (B,(3,B2))
# (A,(1,A1))
# (A,(1,A2))
# (A,(2,A1))
# (A,(2,A2))
11. leftOuterJoin(otherDataSet, numPartitions): left outer join. All data of the left RDD is kept; keys with no match on the right get None; numPartitions sets the number of partitions to raise parallelism.
// omit the SparkContext setup
val arr = List(("A", 1), ("B", 2), ("A", 2), ("B", 3),("C",1))
val arr1 = List(("A", "A1"), ("B", "B1"), ("A", "A2"), ("B", "B2"))
val rdd = sc.parallelize(arr, 3)
val rdd1 = sc.parallelize(arr1, 3)
val leftOutJoinRDD = rdd.leftOuterJoin(rdd1)
leftOutJoinRDD.foreach(println)
sc.stop
# (B,(2,Some(B1)))
# (B,(2,Some(B2)))
# (B,(3,Some(B1)))
# (B,(3,Some(B2)))
# (C,(1,None))
# (A,(1,Some(A1)))
# (A,(1,Some(A2)))
# (A,(2,Some(A1)))
# (A,(2,Some(A2)))
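The right-hand side comes back as an Option, so downstream code typically unwraps it; a minimal sketch using getOrElse (the "NA" default is only for illustration):
leftOutJoinRDD.mapValues { case (v, wOpt) => (v, wOpt.getOrElse("NA")) }.foreach(println)
# e.g. (C,(1,NA)) for the unmatched key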
12. rightOuterJoin(otherDataSet, numPartitions): right outer join. All data of the right RDD is kept; keys with no match on the left get None; numPartitions sets the number of partitions to raise parallelism.
// omit the SparkContext setup
val arr = List(("A", 1), ("B", 2), ("A", 2), ("B", 3))
val arr1 = List(("A", "A1"), ("B", "B1"), ("A", "A2"), ("B", "B2"),("C","C1"))
val rdd = sc.parallelize(arr, 3)
val rdd1 = sc.parallelize(arr1, 3)
val rightOutJoinRDD = rdd.rightOuterJoin(rdd1)
rightOutJoinRDD.foreach(println)
sc.stop
# (B,(Some(2),B1))
# (B,(Some(2),B2))
# (B,(Some(3),B1))
# (B,(Some(3),B2))
# (C,(None,C1))
# (A,(Some(1),A1))
# (A,(Some(1),A2))
# (A,(Some(2),A1))
# (A,(Some(2),A2))
13. lookup(key): returns, as a Seq, all values in a (K, V) RDD associated with the given key.
var rdd1=sc.parallelize(List((1,"a"),(2,"b"),(3,"c")))
# rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[81] at parallelize at
rdd1.lookup(1)
# res34: Seq[String] = WrappedArray(a)
14. filter(function): returns a new RDD containing only the elements for which the function returns true.
val filterRdd = sc.parallelize(List(1,2,3,4,5)).map(_*2).filter(_>5)
filterRdd.collect
# res5: Array[Int] = Array(6, 8, 10)
15. collect(): an action that returns all elements of the RDD to the driver as an array.
scala> var rdd1 = sc.makeRDD(1 to 10,2)
# rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21
scala> rdd1.collect
# res23: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
16. reduce(function): an action that aggregates all elements of the RDD with the given binary function.
scala> var rdd1 = sc.makeRDD(1 to 10,2)
# rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21
#
scala> rdd1.reduce(_ + _)
# res18: Int = 55
scala> var rdd2 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
# rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at makeRDD at :21
#
scala> rdd2.reduce((x,y) => {
| (x._1 + y._1,x._2 + y._2)
| })
# res21: (String, Int) = (CBBAA,6)
# (the key-concatenation order may vary, since reduce combines partial results in no fixed order)
17. count(): an action that returns the number of elements in the RDD.
scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
# rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at makeRDD at :21
scala> rdd1.count
# res15: Long = 3
18. first(): an action that returns the first element of the RDD.
scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
# rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at makeRDD at :21
scala> rdd1.first
# res14: (String, String) = (A,1)
Note: map also accepts a pattern-matching function literal (shown here on a plain Scala List rather than an RDD):
scala> val aa=List(1,2,3,"asa")
# aa: List[Any] = List(1, 2, 3, asa)
scala> aa.map {
| case i: Int => i + 1
| case s: String => s.length
| }
# res16: List[Int] = List(2, 3, 4, 3)
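If an element matches none of the cases, the map throws a scala.MatchError at runtime; a catch-all case avoids that. A minimal sketch (the 0 default is only for illustration):
aa.map {
  case i: Int    => i + 1
  case s: String => s.length
  case _         => 0 // catch-all so an unexpected element type does not throw a MatchError
}
# List[Int] = List(2, 3, 4, 3)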
19. mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]: the function f takes two parameters, the partition index and an iterator over that partition's data, so the partition index can be attached to the data during the transformation; preservesPartitioning indicates whether the partitioner information of the parent RDD is preserved.
val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 3)
// mapPartitions operator: a per-partition resource (e.g. an external connection) can be set up once per partition
rdd.mapPartitions { iterator =>
  val value = 10 // stands in for a resource or lookup value initialized once per partition
  iterator.map(_ * value)
}
// mapPartitionsWithIndex operator
val partitionIndex = (index: Int, iter: Iterator[Int]) => {
iter.toList.map(item => "index:" + index + ": value: " + item).iterator
}
rdd.mapPartitionsWithIndex(partitionIndex, true).foreach(println(_))
/**
index:0: value: 1
index:0: value: 2
index:1: value: 3
index:1: value: 4
index:2: value: 5
index:2: value: 6
*/
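mapPartitionsWithIndex is also a convenient way to inspect how data is spread across partitions; a minimal sketch, reusing the rdd above, that emits one (index, count) pair per partition:
rdd.mapPartitionsWithIndex((index, iter) => Iterator((index, iter.size))).collect()
# Array((0,2), (1,2), (2,2))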
20. flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]: applies further processing logic to the Value of each pair, producing zero or more values, and pairs each result with the original Key (see also operator 5).
val rdd = sc.parallelize(List((2, "a b c"), (5, "q w e"), (2, "x y z"), (6, "t y")), 2)
rdd.flatMapValues(_.split(" ")).collect()
/**
Array((2,a), (2,b), (2,c), (5,q), (5,w), (5,e), (2,x), (2,y), (2,z), (6,t), (6,y))
*/