AggregateByKey
https://www.projectpro.io/recipes/explain-aggregatebykey-spark-scala
I went over the concept through the link above and then wrote the code below.
Written using spark-shell:
/usr/local/Cellar/apache-spark/3.1.2/bin/spark-shell
// Basic aggregateByKey example in Scala
// Create the pair RDD studentRDD with key-value pairs; the number of partitions (3) is set in the parallelize method.
val studentRDD = sc.parallelize(Array(
("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91), ("Joseph", "Biology", 82),
("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62), ("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80),
("Tina", "Maths", 78), ("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87),
("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91), ("Thomas", "Biology", 74),
("Cory", "Maths", 56), ("Cory", "Physics", 65), ("Cory", "Chemistry", 71), ("Cory", "Biology", 68),
("Jackeline", "Maths", 86), ("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83),
("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64), ("Juan", "Biology", 60)), 3)
studentRDD.collect()
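As a quick sanity check in the shell (a small sketch using the studentRDD above): 7 students × 4 subjects gives 28 records, and the second argument to parallelize fixes the partition count at 3.
studentRDD.count()            // res: Long = 28
studentRDD.getNumPartitions   // res: Int = 3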
I was going to keep going in the shell like the above, but figured an IDE would be more convenient, so I coded it in the IDE!
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object AggregateByKey {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("AggregateByKey")
    conf.setIfMissing("spark.master", "local[*]")
    val spark = SparkSession
      .builder
      .config(conf)
      .getOrCreate()

    // Basic aggregateByKey example in Scala.
    // Create the pair RDD studentRDD with key-value pairs; the number of partitions (3) is set in the parallelize method.
    val studentRDD = spark.sparkContext.parallelize(Array(
      ("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91), ("Joseph", "Biology", 82),
      ("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62), ("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80),
      ("Tina", "Maths", 78), ("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87),
      ("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91), ("Thomas", "Biology", 74),
      ("Cory", "Maths", 56), ("Cory", "Physics", 65), ("Cory", "Chemistry", 71), ("Cory", "Biology", 68),
      ("Jackeline", "Maths", 86), ("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83),
      ("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64), ("Juan", "Biology", 60)), 3)

    println(studentRDD.count())

    // seqOp: within a partition, fold each (subject, score) value into the
    // per-key accumulator, keeping the larger score.
    val seqOp = (accumulator: Int, element: (String, Int)) =>
      if (accumulator > element._2) accumulator else element._2

    // combOp: merge the per-partition accumulators, again keeping the larger one.
    val combOp = (accumulator1: Int, accumulator2: Int) =>
      if (accumulator1 > accumulator2) accumulator1 else accumulator2

    // Initial accumulator value for each key in each partition.
    val zeroVal = 0

    // Re-key by student name, then aggregate to the max score per student.
    val aggrRDD = studentRDD.map(t => (t._1, (t._2, t._3))).aggregateByKey(zeroVal)(seqOp, combOp)
    aggrRDD.collect foreach println

    spark.stop()
  }
}
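To unpack the three pieces: zeroVal is the initial accumulator for each key in each partition, seqOp folds one value into that accumulator within a partition, and combOp merges the accumulators coming from different partitions. Since zeroVal is 0 and both ops just keep the larger number, the job prints each student's maximum score: (Joseph,91), (Jimmy,97), (Tina,87), (Thomas,93), (Cory,71), (Jackeline,86), (Juan,69), though the order from collect may vary across runs.

Where aggregateByKey really pays off is when the accumulator type differs from the value type, which plain reduceByKey cannot express. Here's a minimal sketch of that (my own variant, not from the recipe above), computing each student's average score with a (sum, count) accumulator:

val avgRDD = studentRDD
  .map(t => (t._1, t._3))                              // (name, score)
  .aggregateByKey((0, 0))(
    (acc, score) => (acc._1 + score, acc._2 + 1),      // seqOp: fold one score into (sum, count)
    (a, b) => (a._1 + b._1, a._2 + b._2))              // combOp: merge partial (sum, count) pairs
  .mapValues { case (sum, count) => sum.toDouble / count }
avgRDD.collect foreach println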
It's been so long since I last studied Spark that my memory has been completely reset, haha.
I'll have to take another look at what this thing actually does.