Ace-T's Blog 내 검색 [네이버 커넥트 이웃 합니다~^-^/ 요청 大 환영~~]

HDFS부터 DB까지 팁 아닌 팁~

BigDATA/spark 2019.01.15 22:24
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T

애월 - 지금이순간 카페


스칼라 알못 스파크 알못이라..이번에 작업한 내용이 있는데 삽질을 많이 했다...ㅋㅋ 
생각나는 것들을 적어보자.
HDFS를 가져와 RDD에 저장! sc.textFile을 통해서 HDFS를 가져온다.
sc.textFile("hdfs path")
그런데 그냥 가져오면 소용이 없다. 그러므로 map을 통해서 필요한 친구들만 가져온다. 또한 filter를 통해 데이터를 줄여준다.

ex) hdfs의 포맷이 Json이라서 Gson을 사용. filter를 통해 데이터를 줄여준다.

    val rddRaw0 = sc.textFile("hdfs path").map(line => new Gson().fromJson(line, classOf[TestObject])).filter(line=> line.collection.code.contains("ACET") && !line.document.id.isEmpty).cache() 

리고 여러개의(multi) RDD들이 있었고 이 친구들을 한번에 분석을 해야했다. 그래서 union을 사용.
ex) val result  = rddRaw0.union(rddRaw1)

그런뒤 reduceByKey를 하여 word count를 해준다.
rddRaw.map(line => ((line.document.id,device),1)).reduceByKey{ case (x, y) => x + y}

이제 분석되어진 RDD 친구를 List화 하여 JDBC를 통해 DB에 넣는다.
여기에서 중요한 RDD to List는 아래의 내용이다.
val list: List[((String, String), Int)] = result.collect().toList 
그리고 map으로 원하는 형태를 만들어준 뒤,  InsertToDb를 해주면 되는데 여기에서 sirialization 문제가 발생한다.

  def insertDBxx(result: RDD[((String, String), Int)]) {
       val list: List[((String, String), Int)] = result.collect().toList
       val targetList = list.map(line => (line._1._1, line._1._2, line._2))
       new InsertToDb(targetList)
  }  

Serialize 해결은 아래와 같이 하였다.

  @SerialVersionUID(15L)
  class InsertToDb(targetList: List[(String, String, Int)]) extends Serializable{ 

Serializable은 해결방법보다 이녀석이 뭔지 정확히 알필요가 있으니 따로 포스팅을 하겠습니다.

자 이제 InsertToDb에서는 connection을 맺고 db작업을 해주면 된다.
여기에서의 팁은 아래에 보시면 getConnection이 아니라 getPoolConnection()인 것을 알수가 있다.

1) get connection
    val connection = OutputJdbcTask.getPoolConnection()


pool 사용을 위해 build.sbt에 아래를 추가

  "c3p0" "c3p0" % "0.9.0.4" 

소스는 아래와 같다.

val env = MainConfig.env
val driver = ConfigFactory.load().getString(s"$env.driver")
val url = ConfigFactory.load().getString(s"$env.url")
val username = ConfigFactory.load().getString(s"$env.username")
val password = ConfigFactory.load().getString(s"$env.password")

private val cpds: ComboPooledDataSource = new ComboPooledDataSource()
cpds.setDriverClass(driver)
cpds.setJdbcUrl(url)
cpds.setUser(username)
cpds.setPassword(password)
cpds.setMinPoolSize(5)
cpds.setAcquireIncrement(5)
cpds.setMaxPoolSize(35)

def getConnection(){
try {
Class.forName(driver)
DriverManager.getConnection(url, username, password)
} catch {
case e : Exception => e.printStackTrace
}
} def getPoolConnection() = { cpds.getConnection
}


마지막으로 DB Upsert하는 부분 소스는 아래와 같다.

try {
val connection = OutputJdbcTask.getPoolConnection()

val insertSql =
"""
|insert into 테이블명 (dsid, reg_dttm, click_count, device, create_date )
|values (?,?,?,?,?)
|ON DUPLICATE KEY UPDATE dsid = ?, reg_dttm =?, click_count= ?, device =?, create_date =?
""".stripMargin

val insertStmt: PreparedStatement = connection.prepareStatement(insertSql)
val regDttm = MainConfig.currentTimeAsString
val createDate = MainConfig.date
var itemCnt = 1
val chunk = 10000

targetList.foreach {
t =>
val dsid = t._1
val device = t._2
val clickCnt = t._3

insertStmt.setString(1, dsid)
insertStmt.setString(2, regDttm)
insertStmt.setInt(3, clickCnt)
insertStmt.setString(4, device)
insertStmt.setString(5, createDate)
insertStmt.setString(6, dsid)
insertStmt.setString(7, regDttm)
insertStmt.setInt(8, clickCnt)
insertStmt.setString(9, device)
insertStmt.setString(10, createDate)
insertStmt.addBatch()
insertStmt.clearParameters()
if (itemCnt % chunk == 0) {
insertStmt.executeBatch()
}
itemCnt = itemCnt + 1

}
insertStmt.executeBatch()
insertStmt.close()
connection.close()
} catch {
case e: Exception => {
e.printStackTrace()
}
}

참고 사이트)

https://stackoverflow.com/questions/40892800/spark-rdd-to-list

https://alvinalexander.com/scala/how-to-use-serialization-in-scala-serializable-trait

'BigDATA > spark' 카테고리의 다른 글

HDFS부터 DB까지 팁 아닌 팁~  (0) 2019.01.15
sbt lib 연동 안되는 현상  (0) 2019.01.04
spark rdd programining  (0) 2018.12.30
spark-submit deploy-mode option  (0) 2016.11.02
2탄. SPARK를 설치해보자~(클러스터)  (0) 2016.10.19
1탄. SPARK를 설치해보자~  (0) 2016.10.18

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

java.sql.SQLException: No value specified for parameter 3

DataBase/MySql 2019.01.09 14:13
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T


해당 에러는 scala + mysql에서 insert + update를 하고자할 때 났다.

쿼리 스트링은 아래와 같다.

val insertSql =
"""
|insert into 테이블 (name, age )
|values (?,?)
|ON DUPLICATE KEY UPDATE name = ?, age =?
""".stripMargin

java.sql.SQLException: No value specified for parameter 3

에러는 아래에서 코드가 추가되지 않아서였다.

targetList.foreach {
t =>
val name = t._1
val age = t._2

if (age > 20){
insertStmt.setString(1, name)
insertStmt.setInt(2, age)
insertStmt.addBatch()
insertStmt.clearParameters()
}
}
insertStmt.executeBatch()
insertStmt.close()
connection.close()

즉, 2번째 ? 까지는 채워졌는데 3,4 번째 ?는 채워지지 않아서 이다.

그러므로 insertStmt.setString(3, name)와 insertStmt.setInt(4, age)가 필요하다.


 - END

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

sbt lib 연동 안되는 현상

BigDATA/spark 2019.01.04 14:47
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T
not found!! dependency에는 있는데 not found....sbt가 꼬인듯 하다..ㅠㅠ 이럴땐? 다시 셋팅 하자..



SBT버전이 중요! 너무 낮거나 높으면 인텔리J에서 제대로 못가져옴.-_-;;
Scala/Spark 버전은 사내 분산 클러스터에 맞게 적용.



build.sbt 내용
name := "neosite-data"

version := "0.1"

scalaVersion := "2.11.11"

val sparkVersion = "1.6.2"

libraryDependencies ++= Seq("org.scala-lang.modules" %% "scala-parser-combinators" % "1.1.0",
  "org.scalatest" %% "scalatest" % "3.0.5" % "test",
  "org.apache.spark"      %%  "spark-core"        % sparkVersion % "provided",
  //   "org.apache.spark" %% "spark-sql" % sparkVersion,
  "mysql"           "mysql-connector-java" % "5.1.+",
  "com.typesafe.play" %% "play-json" % "2.6.10",
  "com.google.code.gson" % "gson" % "1.7.1",
  "com.typesafe"    "config"               % "1.3.0",
  "org.scalikejdbc" %% "scalikejdbc"          % "2.3.5",
  "org.scalikejdbc" %% "scalikejdbc-config"   % "2.3.5",
  "org.scalikejdbc" %% "scalikejdbc-test"     % "2.3.5"     % "test",
  "ch.qos.logback"  "logback-classic"      % "1.1.6"
)

결과


테스트

warn등을 발견할 수 있다.
하라는데로 1.0.4로 수정 후 다시 sbt를 돌려보자.(scala-parser-combinators 1.0.4로 수정.)

수정 후 결과

이제 환경이 구축 되었다.
자기 입맛에 맞게 개발하면 된다.

내가 해야 할 것
  1. hdfs 연동. 즉, 다른쪽에 저장되어있는 친구를 불러온다(raw data)
  2. Spark 분석(map-reduce)
  3. Mysql에 저장.
    1. scala application.conf의 정보 읽어와서 JDBC 접속 후 INSERT 등 수행.



OH MY GOD~~~

sbt lib 연동이 안되어지는 현상.. 원인은 바로...
jar 파일들이 포함이 되이 않아서이다..

그러므로 plugins.sbt 에 아래와 같이 addSbtPlugin을 추가 후 sbt clean compile assembly 를 해주어야 한다.
버전은 spark에 맞게 해줘야 합니다.


  logLevel := Level.Warn
  addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5") 


 








'BigDATA > spark' 카테고리의 다른 글

HDFS부터 DB까지 팁 아닌 팁~  (0) 2019.01.15
sbt lib 연동 안되는 현상  (0) 2019.01.04
spark rdd programining  (0) 2018.12.30
spark-submit deploy-mode option  (0) 2016.11.02
2탄. SPARK를 설치해보자~(클러스터)  (0) 2016.10.19
1탄. SPARK를 설치해보자~  (0) 2016.10.18

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

couchbase와 같은 키/벨류의 key design??

Language/Scala 2016.04.15 10:25
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T




오픈소스로 키/벨류의 형태의 스토리지들이 많이 있다. Redis나 couchbase등이 그 예이다. 

이러한 스토리지들에 저장을할 때 고려사항으로 key를 어떤식으로 만들어야할 지 고민이 되어진다.

왜냐하면 이러한 key들은 한정되어진 범위를 가지고 있다. couchbase의 경우는 250byte로 제한을 둔다. 

또한 이러한 key들의 무결성을 보장되어야 한다. 그렇지 않으면 데이터의 유실이 발생할 수가 있다.

그래서 생각했던 것이 Hash였다. 그러나 자바진형의 String에서 제공되어지는 Hash는 중복의 염려가 있다고 한다.

그래서 SHA-1 + Base64 Encoding(urlSafe)를 선택 하였다.

아래는 스칼라 코드이며 해당 키는 아래의 소스를 통해 "IbDNShu-PGxJtnsUVuDJLv-aJoU=" 이러한 형태로 키가 만들어진다. goood~


build.sbt에는 아래와 같이 dependencies를 추가!

libraryDependencies ++= Seq(

   "me.lessis" %% "base64" % "0.2.0"

) 


import~~

import base64.Encode 


SHA-1 + Base64 Encoding(urlSafe)

val md = java.security.MessageDigest.getInstance("SHA-1")

val keyword = {

  new String(Encode.urlSafe(md.digest("스트링~~".getBytes)))

} 


- END -


'Language > Scala' 카테고리의 다른 글

couchbase와 같은 키/벨류의 key design??  (0) 2016.04.15
(기초) 스칼라 데이터 구조(컬렉션)  (0) 2016.03.25
(기초) 스칼라 문법  (0) 2016.03.25

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

(기초) 스칼라 데이터 구조(컬렉션)

Language/Scala 2016.03.25 16:19
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T




2016/03/25 - [Language/Scala] - (기초) 스칼라 문법

참고 사이트 : https://twitter.github.io/scala_school/ko/collections.html


scala> val numbers = List(1,2,3,4,5)

numbers: List[Int] = List(1, 2, 3, 4, 5)


scala> Set(1,2,2)

res20: scala.collection.immutable.Set[Int] = Set(1, 2)


scala> val hostPort = ("localhsot", 8080)

hostPort: (String, Int) = (localhsot,8080)


scala> hostPort._1

res21: String = localhsot


scala> hostPort._2

res22: Int = 8080


scala> 1->2

res23: (Int, Int) = (1,2)


scala> ("D", (1,2,3))

res25: (String, (Int, Int, Int)) = (D,(1,2,3))


scala> ("H", (4,5,6))

res26: (String, (Int, Int, Int)) = (H,(4,5,6))


scala> ("D", (1,2,3)) -> ("H", (4,5,6))

res27: ((String, (Int, Int, Int)), (String, (Int, Int, Int))) = ((D,(1,2,3)),(H,(4,5,6)))


scala> Map(1 -> 2)

res28: scala.collection.immutable.Map[Int,Int] = Map(1 -> 2)


scala> Map("foo" -> "bar")

res29: scala.collection.immutable.Map[String,String] = Map(foo -> bar)


scala> Map(1-> Map("foo" -> "bar"))

res31: scala.collection.immutable.Map[Int,scala.collection.immutable.Map[String,String]] = Map(1 -> Map(foo -> bar))


scala> numbers.map((i:Int) => i *2)

res33: List[Int] = List(2, 4, 6, 8, 10)


scala> numbers.foreach((i: Int) => i * 3)


scala> val value = numbers.foreach((i:Int) => i * 7)

value: Unit = ()


scala> def isEven(i:Int): Boolean = i % 2 == 0

isEven: (i: Int)Boolean


scala> numbers.filter(isEven _)

res37: List[Int] = List(2, 4)


scala> val numbers = List(1,2,3,4,5,6,7,8)

numbers: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8)


scala> numbers.partition(_ % 2 == 0)

res39: (List[Int], List[Int]) = (List(2, 4, 6, 8),List(1, 3, 5, 7))


scala> numbers.find((i: Int) => i > 5)

res40: Option[Int] = Some(6)


scala> numbers.drop(5)

res41: List[Int] = List(6, 7, 8)


scala> numbers.dropWhile(_ %2 != 0)

res42: List[Int] = List(2, 3, 4, 5, 6, 7, 8)






'Language > Scala' 카테고리의 다른 글

couchbase와 같은 키/벨류의 key design??  (0) 2016.04.15
(기초) 스칼라 데이터 구조(컬렉션)  (0) 2016.03.25
(기초) 스칼라 문법  (0) 2016.03.25

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

(기초) 스칼라 문법

Language/Scala 2016.03.25 13:45
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T



참고 사이트 : https://twitter.github.io/scala_school/ko/basics.html


terrypark@localhost:~/program/sbt/bin$ sbt console

[info] Set current project to bin (in build file:/Users/terrypark/program/sbt/bin/)

[info] Starting scala interpreter...

[info]

Welcome to Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_31).

Type in expressions to have them evaluated.

Type :help for more information.


scala>



예제 따라잡기!

scala> 1+1

res0: Int = 2


scala> val result = 1+1

result: Int = 2


scala> var name = "terry.park"

name: String = terry.park


scala> def addOne(m: Int): Int = m +1

addOne: (m: Int)Int


scala> val test = addOne(2)

test: Int = 3


scala> def threeReturn() = 1 + 2

threeReturn: ()Int


scala> threeReturn()

res1: Int = 3


scala> threeReturn

res2: Int = 3


이름없는 함수?!

scala> (x: Int) => x + 1

res3: Int => Int = <function1>


scala> (2)        <- 이렇게 하면 호출되어지나? 했지만 역시나 안됌!ㅋ 

res4: Int = 2


scala> res3(2)

res5: Int = 3


// 이름없는 함수를 다른 함수나 식에 넘기거나 val에 지정할 수 있음.

scala> val addOne = (x: Int) => x+4

addOne: Int => Int = <function1>


scala> addOne(1)

res8: Int = 5


// function timesTow(int i){ .... }  이런 내용과 같음!

scala> def timesTow(i: Int): Int = {

     | println("hello world")

     | i*2

     | }

timesTow: (i: Int)Int


scala> timesTow(2)

hello world

res9: Int = 4


// 이름없는 함수도 동일!

scala> { i:Int => println("hello~")

     | i * 2

     | }

res10: Int => Int = <function1>


scala> res10(3)

hello~

res11: Int = 6


scala> def addr(m: Int, n: Int)= m + n

addr: (m: Int, n: Int)Int


scala> val add = addr(_:Int, 4)

add: Int => Int = <function1>


scala> add(2)

res1: Int = 6


_..밑줄?!! 인자의 일부만을 사용하는 것? 

add(2)라고 한다면 인자의 값으로 2가 들어가고..add(2)는 addr(2, 4)가 되어지는것!?

그래서 addr이 m+n이니..6이라는 결과가 똭!


커리 함수(Curried functions)

scala> def multiply(m: Int)(n:Int): Int = m * n

multiply: (m: Int)(n: Int)Int


scala> multiply(2)(4)

res3: Int = 8


scala> val timesTwo = multiply(2) _

timesTwo: Int => Int = <function1>


scala> timesTwo(3)

res4: Int = 6


scala> (addr _).curried

res8: Int => (Int => Int) = <function1>


scala> def capitalizeAll(args: String*) = {

     | args.map { arg => arg.capitalize

     | }

     | }

capitalizeAll: (args: String*)Seq[String]


scala> capitalizeAll("rarity", "applejack")

res12: Seq[String] = ArrayBuffer(Rarity, Applejack)


클래스

scala> class Calculator {

     |    val brand : String = "HP"

     |    def add(m: Int, n: Int): Int = m + n

     | }

defined class Calculator


scala> val calc = new Calculator

calc: Calculator = Calculator@e7a7ed9


scala> calc.add(1,2)

res13: Int = 3


scala> calc.brand

res14: String = HP


scala> class Calculaotr(brand: String) {

     |    /**

     |     * 생성자

     |    */

     |    val color: String = if (brand == "TI") {

     |       "blue"

     |    } else if (brand == "HP") {

     |       "black"

     |    } else {

     |       "white"

     |    }

     |

     |    // 인스턴스 메소드

     |    def add(m: Int, n: Int) : Int = m + n

     | }

defined class Calculaotr


scala> val calc = new Calculaotr("HP")

calc: Calculaotr = Calculaotr@6b448973


scala> calc.color

res15: String = black


scala> class Test {

     |    val finc = { () => acc += 1}

     |    def mic = { acc += 1 }

     |    var acc = 0

     | }

defined class Test


scala> val c = new Test

c: Test = Test@6c10fc3


scala> c.mic                                // c.mic()를 호출 함.


scala> c.finc

res18: () => Unit = <function0>  // 함수 자체를 반환


메소드와 함수의 차이가 뭐지? 

val과 var의 차이는 var은 변수, val은 함수나 객체를 받아주는 것인가?

=>, _ 의 용도 정확히 모르겠다!?

aList.map(a => a.{연산}) == aList.map(_.(연산))

람다로 들어오는 파라미터 자체를 함축 사용!


일단은 그냥 GO!  문법은 여기까쥐~


  - 끝 -

'Language > Scala' 카테고리의 다른 글

couchbase와 같은 키/벨류의 key design??  (0) 2016.04.15
(기초) 스칼라 데이터 구조(컬렉션)  (0) 2016.03.25
(기초) 스칼라 문법  (0) 2016.03.25

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

spark + scala + sbt 프로젝트!!

BigDATA/spark 2016.03.22 16:58
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T

환경 : 

sbt : 0.13.11    - 참고 :  https://twitter.github.io/scala_school/ko/sbt.html

scala : 2.10.6

spark : 1.5.2




음..환경설정이 조금 짜증이 났지만..아래와 같은 프로젝트 구조가 생겼다.



이제 한번 scala의 문법을 공부해보자. 그런 뒤 spark를 사용하여 지지고 볶고 해보자!

일단 여기까쥐~

  - 끝 -



'BigDATA > spark' 카테고리의 다른 글

spark log4j 사용해보기!  (0) 2016.07.04
spark logback 설정?  (0) 2016.06.29
spark-submit 옵션 관련  (0) 2016.05.16
ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread  (0) 2016.05.11
spark와 친해지기!  (0) 2016.03.22
spark + scala + sbt 프로젝트!!  (0) 2016.03.22

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

티스토리 툴바