Ace-T's Blog 내 검색 [네이버 커넥트 이웃 합니다~^-^/ 요청 大 환영~~]

HDFS부터 DB까지 팁 아닌 팁~

BigDATA/spark 2019.01.15 22:24
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T

애월 - 지금이순간 카페


스칼라 알못 스파크 알못이라..이번에 작업한 내용이 있는데 삽질을 많이 했다...ㅋㅋ 
생각나는 것들을 적어보자.
HDFS를 가져와 RDD에 저장! sc.textFile을 통해서 HDFS를 가져온다.
sc.textFile("hdfs path")
그런데 그냥 가져오면 소용이 없다. 그러므로 map을 통해서 필요한 친구들만 가져온다. 또한 filter를 통해 데이터를 줄여준다.

ex) hdfs의 포맷이 Json이라서 Gson을 사용. filter를 통해 데이터를 줄여준다.

    val rddRaw0 = sc.textFile("hdfs path").map(line => new Gson().fromJson(line, classOf[TestObject])).filter(line=> line.collection.code.contains("ACET") && !line.document.id.isEmpty).cache() 

리고 여러개의(multi) RDD들이 있었고 이 친구들을 한번에 분석을 해야했다. 그래서 union을 사용.
ex) val result  = rddRaw0.union(rddRaw1)

그런뒤 reduceByKey를 하여 word count를 해준다.
rddRaw.map(line => ((line.document.id,device),1)).reduceByKey{ case (x, y) => x + y}

이제 분석되어진 RDD 친구를 List화 하여 JDBC를 통해 DB에 넣는다.
여기에서 중요한 RDD to List는 아래의 내용이다.
val list: List[((String, String), Int)] = result.collect().toList 
그리고 map으로 원하는 형태를 만들어준 뒤,  InsertToDb를 해주면 되는데 여기에서 sirialization 문제가 발생한다.

  def insertDBxx(result: RDD[((String, String), Int)]) {
       val list: List[((String, String), Int)] = result.collect().toList
       val targetList = list.map(line => (line._1._1, line._1._2, line._2))
       new InsertToDb(targetList)
  }  

Serialize 해결은 아래와 같이 하였다.

  @SerialVersionUID(15L)
  class InsertToDb(targetList: List[(String, String, Int)]) extends Serializable{ 

Serializable은 해결방법보다 이녀석이 뭔지 정확히 알필요가 있으니 따로 포스팅을 하겠습니다.

자 이제 InsertToDb에서는 connection을 맺고 db작업을 해주면 된다.
여기에서의 팁은 아래에 보시면 getConnection이 아니라 getPoolConnection()인 것을 알수가 있다.

1) get connection
    val connection = OutputJdbcTask.getPoolConnection()


pool 사용을 위해 build.sbt에 아래를 추가

  "c3p0" "c3p0" % "0.9.0.4" 

소스는 아래와 같다.

val env = MainConfig.env
val driver = ConfigFactory.load().getString(s"$env.driver")
val url = ConfigFactory.load().getString(s"$env.url")
val username = ConfigFactory.load().getString(s"$env.username")
val password = ConfigFactory.load().getString(s"$env.password")

private val cpds: ComboPooledDataSource = new ComboPooledDataSource()
cpds.setDriverClass(driver)
cpds.setJdbcUrl(url)
cpds.setUser(username)
cpds.setPassword(password)
cpds.setMinPoolSize(5)
cpds.setAcquireIncrement(5)
cpds.setMaxPoolSize(35)

def getConnection(){
try {
Class.forName(driver)
DriverManager.getConnection(url, username, password)
} catch {
case e : Exception => e.printStackTrace
}
} def getPoolConnection() = { cpds.getConnection
}


마지막으로 DB Upsert하는 부분 소스는 아래와 같다.

try {
val connection = OutputJdbcTask.getPoolConnection()

val insertSql =
"""
|insert into 테이블명 (dsid, reg_dttm, click_count, device, create_date )
|values (?,?,?,?,?)
|ON DUPLICATE KEY UPDATE dsid = ?, reg_dttm =?, click_count= ?, device =?, create_date =?
""".stripMargin

val insertStmt: PreparedStatement = connection.prepareStatement(insertSql)
val regDttm = MainConfig.currentTimeAsString
val createDate = MainConfig.date
var itemCnt = 1
val chunk = 10000

targetList.foreach {
t =>
val dsid = t._1
val device = t._2
val clickCnt = t._3

insertStmt.setString(1, dsid)
insertStmt.setString(2, regDttm)
insertStmt.setInt(3, clickCnt)
insertStmt.setString(4, device)
insertStmt.setString(5, createDate)
insertStmt.setString(6, dsid)
insertStmt.setString(7, regDttm)
insertStmt.setInt(8, clickCnt)
insertStmt.setString(9, device)
insertStmt.setString(10, createDate)
insertStmt.addBatch()
insertStmt.clearParameters()
if (itemCnt % chunk == 0) {
insertStmt.executeBatch()
}
itemCnt = itemCnt + 1

}
insertStmt.executeBatch()
insertStmt.close()
connection.close()
} catch {
case e: Exception => {
e.printStackTrace()
}
}

참고 사이트)

https://stackoverflow.com/questions/40892800/spark-rdd-to-list

https://alvinalexander.com/scala/how-to-use-serialization-in-scala-serializable-trait

'BigDATA > spark' 카테고리의 다른 글

HDFS부터 DB까지 팁 아닌 팁~  (0) 2019.01.15
sbt lib 연동 안되는 현상  (0) 2019.01.04
spark rdd programining  (0) 2018.12.30
spark-submit deploy-mode option  (0) 2016.11.02
2탄. SPARK를 설치해보자~(클러스터)  (0) 2016.10.19
1탄. SPARK를 설치해보자~  (0) 2016.10.18

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

java.sql.SQLException: No value specified for parameter 3

DataBase/MySql 2019.01.09 14:13
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T


해당 에러는 scala + mysql에서 insert + update를 하고자할 때 났다.

쿼리 스트링은 아래와 같다.

val insertSql =
"""
|insert into 테이블 (name, age )
|values (?,?)
|ON DUPLICATE KEY UPDATE name = ?, age =?
""".stripMargin

java.sql.SQLException: No value specified for parameter 3

에러는 아래에서 코드가 추가되지 않아서였다.

targetList.foreach {
t =>
val name = t._1
val age = t._2

if (age > 20){
insertStmt.setString(1, name)
insertStmt.setInt(2, age)
insertStmt.addBatch()
insertStmt.clearParameters()
}
}
insertStmt.executeBatch()
insertStmt.close()
connection.close()

즉, 2번째 ? 까지는 채워졌는데 3,4 번째 ?는 채워지지 않아서 이다.

그러므로 insertStmt.setString(3, name)와 insertStmt.setInt(4, age)가 필요하다.


 - END

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

[mysql] select 한것 update하기

DataBase/MySql 2018.11.23 13:56
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T

Mysql에서~

SELECT 를 하여 원하는 테이블의 필드에 데이터를 채우고자 했다.

같은 테이블일 경우

UPDATE 테이블명

SET 데이터 들어갈 필드 = 데이터있는필드

WHERE 조건문 블라블라


다른 테이블일 경우

UDATE  테이블1, 테이블2

SET 테이블1.필드 = 테이블2.필드

WHERE 조건문 블라블라 


작업전엔~SELECT로 확인 후 작업이 센스~


acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

2. mysql process 상태 리스트 보기

DataBase/MySql 2015.05.26 10:41
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T


리스트 보기 

SHOW PROCESSLIST;




해당 프로세스 죽이기

KILL 프로세스 번호(ID)



acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

MySQL 기본 문법(데이터베이스 보기, 사용하기)

DataBase 2012.10.28 16:00
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T
1) 데이터베이스 보기
show databases;


2) 데이터베이스 사용
    use database이름;



3) 암호 설정 하기
처음설치 시에는 root의 암호가 설정이 되지 않은 상태이다.
bin으로 이동 : d:\Spring\mysql-5.5.28-win32\mysql-5.5.28-win32\bin 로 이동하여
mysqladmin -u root -p password new-password 명령어를 쳐준다.
처음에는 암호가 없기 때문에 그냥 아무것도 없이 엔터를 쳐준다.


그런 뒤 접속을 해보자!
mysql -uroot
그러나...오류가 났다!!!
ERROR 1045 (28000): Access denied for user 'root'@'localhost' (using password: NO)

해결방법은...Windows방법은..아래와 같이 매개변수를 넣고 시작을 해준다.

 그리고 나서 mysql -uroot로 접속을 하면 위의 에러없이 접속이 가능하다!^-^good~~~

접속 후에 DB 선택, update구문을 사용하여 root에 비밀번호를 설정하여주면 된다.
1) use mysql;
2) update user set password=password('1234') where user='root';
3) flush privileges;

비밀번호를 설정 한 뒤에 매개변수 없이 다시 재시작을 한 뒤!!
1) mysql -uroot -p
2) 비밀번호 입력
접속 되는지 테스트 해보면 된다^-^

참조 사이트 :
1) http://www.viper.pe.kr/docs/mysql_prog/mysql_syntax.html
2) http://www.gpgstudy.com/gpgiki/MySQL%EC%97%90%EC%84%9C%20%EC%82%AC%EC%9A%A9%EC%9E%90%EC%99%80%20%EB%8D%B0%EC%9D%B4%ED%84%B0%EB%B2%A0%EC%9D%B4%EC%8A%A4%20%EB%A7%8C%EB%93%A4%EA%B8%B0

오류가 나서 당황했다..ㅋㅋㅋ 찾아봐도 windows쪽은 없었다..ㅠㅜ..도움이 되었으면 좋겠네요!^-^good~

'DataBase' 카테고리의 다른 글

MySQL 기본 문법(데이터베이스 보기, 사용하기)  (0) 2012.10.28

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

티스토리 툴바