HDFS부터 DB까지 팁 아닌 팁~

2019. 1. 15. 22:24BigDATA/spark

반응형

애월 - 지금이순간 카페


스칼라 알못 스파크 알못이라..이번에 작업한 내용이 있는데 삽질을 많이 했다...ㅋㅋ 
생각나는 것들을 적어보자.
HDFS를 가져와 RDD에 저장! sc.textFile을 통해서 HDFS를 가져온다.
sc.textFile("hdfs path")
그런데 그냥 가져오면 소용이 없다. 그러므로 map을 통해서 필요한 친구들만 가져온다. 또한 filter를 통해 데이터를 줄여준다.

ex) hdfs의 포맷이 Json이라서 Gson을 사용. filter를 통해 데이터를 줄여준다.

    val rddRaw0 = sc.textFile("hdfs path").map(line => new Gson().fromJson(line, classOf[TestObject])).filter(line=> line.collection.code.contains("ACET") && !line.document.id.isEmpty).cache() 

리고 여러개의(multi) RDD들이 있었고 이 친구들을 한번에 분석을 해야했다. 그래서 union을 사용.
ex) val result  = rddRaw0.union(rddRaw1)

그런뒤 reduceByKey를 하여 word count를 해준다.
rddRaw.map(line => ((line.document.id,device),1)).reduceByKey{ case (x, y) => x + y}

이제 분석되어진 RDD 친구를 List화 하여 JDBC를 통해 DB에 넣는다.
여기에서 중요한 RDD to List는 아래의 내용이다.
val list: List[((String, String), Int)] = result.collect().toList 
그리고 map으로 원하는 형태를 만들어준 뒤,  InsertToDb를 해주면 되는데 여기에서 sirialization 문제가 발생한다.

  def insertDBxx(result: RDD[((String, String), Int)]) {
       val list: List[((String, String), Int)] = result.collect().toList
       val targetList = list.map(line => (line._1._1, line._1._2, line._2))
       new InsertToDb(targetList)
  }  

Serialize 해결은 아래와 같이 하였다.

  @SerialVersionUID(15L)
  class InsertToDb(targetList: List[(String, String, Int)]) extends Serializable{ 

Serializable은 해결방법보다 이녀석이 뭔지 정확히 알필요가 있으니 따로 포스팅을 하겠습니다.

자 이제 InsertToDb에서는 connection을 맺고 db작업을 해주면 된다.
여기에서의 팁은 아래에 보시면 getConnection이 아니라 getPoolConnection()인 것을 알수가 있다.

1) get connection
    val connection = OutputJdbcTask.getPoolConnection()


pool 사용을 위해 build.sbt에 아래를 추가

  "c3p0" "c3p0" % "0.9.0.4" 

소스는 아래와 같다.

val env = MainConfig.env
val driver = ConfigFactory.load().getString(s"$env.driver")
val url = ConfigFactory.load().getString(s"$env.url")
val username = ConfigFactory.load().getString(s"$env.username")
val password = ConfigFactory.load().getString(s"$env.password")

private val cpds: ComboPooledDataSource = new ComboPooledDataSource()
cpds.setDriverClass(driver)
cpds.setJdbcUrl(url)
cpds.setUser(username)
cpds.setPassword(password)
cpds.setMinPoolSize(5)
cpds.setAcquireIncrement(5)
cpds.setMaxPoolSize(35)

def getConnection(){
try {
Class.forName(driver)
DriverManager.getConnection(url, username, password)
} catch {
case e : Exception => e.printStackTrace
}
} def getPoolConnection() = { cpds.getConnection
}


마지막으로 DB Upsert하는 부분 소스는 아래와 같다.

try {
val connection = OutputJdbcTask.getPoolConnection()

val insertSql =
"""
|insert into 테이블명 (dsid, reg_dttm, click_count, device, create_date )
|values (?,?,?,?,?)
|ON DUPLICATE KEY UPDATE dsid = ?, reg_dttm =?, click_count= ?, device =?, create_date =?
""".stripMargin

val insertStmt: PreparedStatement = connection.prepareStatement(insertSql)
val regDttm = MainConfig.currentTimeAsString
val createDate = MainConfig.date
var itemCnt = 1
val chunk = 10000

targetList.foreach {
t =>
val dsid = t._1
val device = t._2
val clickCnt = t._3

insertStmt.setString(1, dsid)
insertStmt.setString(2, regDttm)
insertStmt.setInt(3, clickCnt)
insertStmt.setString(4, device)
insertStmt.setString(5, createDate)
insertStmt.setString(6, dsid)
insertStmt.setString(7, regDttm)
insertStmt.setInt(8, clickCnt)
insertStmt.setString(9, device)
insertStmt.setString(10, createDate)
insertStmt.addBatch()
insertStmt.clearParameters()
if (itemCnt % chunk == 0) {
insertStmt.executeBatch()
}
itemCnt = itemCnt + 1

}
insertStmt.executeBatch()
insertStmt.close()
connection.close()
} catch {
case e: Exception => {
e.printStackTrace()
}
}

참고 사이트)

https://stackoverflow.com/questions/40892800/spark-rdd-to-list

https://alvinalexander.com/scala/how-to-use-serialization-in-scala-serializable-trait

반응형