Ace-T's Blog 내 검색 [네이버 커넥트 이웃 합니다~^-^/ 요청 大 환영~~]

SBT + ANSIBLE

CM/ansible 2019.01.23 09:56
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T


빌드서버에서 빌드 후 배포(배치 서버로)

build는 shell을 통해 처리 하였다. (음..뭔가 ansible style은 아니다..ㅠㅠ 책을 사서 좀 더 파봐야겠다.)

- name: Execute sbt build
shell: |
cd {{ build_home }}/{{ build_id }}
pwd     ./sbt.sh   

sbt.sh에서는 sbt를 통해 명령어로 처리! sbt가 export가 잘안되어서 그냥 절대경로로 처리!

/daum/program/sbt/bin/sbt clean assembly 

deploy는 간단히 copy해주는 형식으로 처리 하였다. 즉, local jar를 deploy할 서버로 카피!

---
- hosts : spark
serial : 1
tasks :
- name : Make data directory
file : dest="{{ base_data }}" owner={{ user_name }} group={{ group_name }} state=directory

- name : Remove old file
file : path="{{ base_service }}/{{ service_name }}/bin/{{ file_name }}.{{ file_type }}" state=absent

- name : Deploy file
copy : src="{{ data_build_home }}/{{ build_id }}/target/scala-2.10/{{ file_name }}.{{ file_type }}"            dest="{{ base_service }}/{{ service_name }}/bin/{{ file_name }}.{{ file_type }}" 



배치 서버에서 스파크 수행

Spark Submit

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

HDFS부터 DB까지 팁 아닌 팁~

BigDATA/spark 2019.01.15 22:24
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T

애월 - 지금이순간 카페


스칼라 알못 스파크 알못이라..이번에 작업한 내용이 있는데 삽질을 많이 했다...ㅋㅋ 
생각나는 것들을 적어보자.
HDFS를 가져와 RDD에 저장! sc.textFile을 통해서 HDFS를 가져온다.
sc.textFile("hdfs path")
그런데 그냥 가져오면 소용이 없다. 그러므로 map을 통해서 필요한 친구들만 가져온다. 또한 filter를 통해 데이터를 줄여준다.

ex) hdfs의 포맷이 Json이라서 Gson을 사용. filter를 통해 데이터를 줄여준다.

    val rddRaw0 = sc.textFile("hdfs path").map(line => new Gson().fromJson(line, classOf[TestObject])).filter(line=> line.collection.code.contains("ACET") && !line.document.id.isEmpty).cache() 

리고 여러개의(multi) RDD들이 있었고 이 친구들을 한번에 분석을 해야했다. 그래서 union을 사용.
ex) val result  = rddRaw0.union(rddRaw1)

그런뒤 reduceByKey를 하여 word count를 해준다.
rddRaw.map(line => ((line.document.id,device),1)).reduceByKey{ case (x, y) => x + y}

이제 분석되어진 RDD 친구를 List화 하여 JDBC를 통해 DB에 넣는다.
여기에서 중요한 RDD to List는 아래의 내용이다.
val list: List[((String, String), Int)] = result.collect().toList 
그리고 map으로 원하는 형태를 만들어준 뒤,  InsertToDb를 해주면 되는데 여기에서 sirialization 문제가 발생한다.

  def insertDBxx(result: RDD[((String, String), Int)]) {
       val list: List[((String, String), Int)] = result.collect().toList
       val targetList = list.map(line => (line._1._1, line._1._2, line._2))
       new InsertToDb(targetList)
  }  

Serialize 해결은 아래와 같이 하였다.

  @SerialVersionUID(15L)
  class InsertToDb(targetList: List[(String, String, Int)]) extends Serializable{ 

Serializable은 해결방법보다 이녀석이 뭔지 정확히 알필요가 있으니 따로 포스팅을 하겠습니다.

자 이제 InsertToDb에서는 connection을 맺고 db작업을 해주면 된다.
여기에서의 팁은 아래에 보시면 getConnection이 아니라 getPoolConnection()인 것을 알수가 있다.

1) get connection
    val connection = OutputJdbcTask.getPoolConnection()


pool 사용을 위해 build.sbt에 아래를 추가

  "c3p0" "c3p0" % "0.9.0.4" 

소스는 아래와 같다.

val env = MainConfig.env
val driver = ConfigFactory.load().getString(s"$env.driver")
val url = ConfigFactory.load().getString(s"$env.url")
val username = ConfigFactory.load().getString(s"$env.username")
val password = ConfigFactory.load().getString(s"$env.password")

private val cpds: ComboPooledDataSource = new ComboPooledDataSource()
cpds.setDriverClass(driver)
cpds.setJdbcUrl(url)
cpds.setUser(username)
cpds.setPassword(password)
cpds.setMinPoolSize(5)
cpds.setAcquireIncrement(5)
cpds.setMaxPoolSize(35)

def getConnection(){
try {
Class.forName(driver)
DriverManager.getConnection(url, username, password)
} catch {
case e : Exception => e.printStackTrace
}
} def getPoolConnection() = { cpds.getConnection
}


마지막으로 DB Upsert하는 부분 소스는 아래와 같다.

try {
val connection = OutputJdbcTask.getPoolConnection()

val insertSql =
"""
|insert into 테이블명 (dsid, reg_dttm, click_count, device, create_date )
|values (?,?,?,?,?)
|ON DUPLICATE KEY UPDATE dsid = ?, reg_dttm =?, click_count= ?, device =?, create_date =?
""".stripMargin

val insertStmt: PreparedStatement = connection.prepareStatement(insertSql)
val regDttm = MainConfig.currentTimeAsString
val createDate = MainConfig.date
var itemCnt = 1
val chunk = 10000

targetList.foreach {
t =>
val dsid = t._1
val device = t._2
val clickCnt = t._3

insertStmt.setString(1, dsid)
insertStmt.setString(2, regDttm)
insertStmt.setInt(3, clickCnt)
insertStmt.setString(4, device)
insertStmt.setString(5, createDate)
insertStmt.setString(6, dsid)
insertStmt.setString(7, regDttm)
insertStmt.setInt(8, clickCnt)
insertStmt.setString(9, device)
insertStmt.setString(10, createDate)
insertStmt.addBatch()
insertStmt.clearParameters()
if (itemCnt % chunk == 0) {
insertStmt.executeBatch()
}
itemCnt = itemCnt + 1

}
insertStmt.executeBatch()
insertStmt.close()
connection.close()
} catch {
case e: Exception => {
e.printStackTrace()
}
}

참고 사이트)

https://stackoverflow.com/questions/40892800/spark-rdd-to-list

https://alvinalexander.com/scala/how-to-use-serialization-in-scala-serializable-trait

'BigDATA > spark' 카테고리의 다른 글

HDFS부터 DB까지 팁 아닌 팁~  (0) 2019.01.15
sbt lib 연동 안되는 현상  (0) 2019.01.04
spark rdd programining  (0) 2018.12.30
spark-submit deploy-mode option  (0) 2016.11.02
2탄. SPARK를 설치해보자~(클러스터)  (0) 2016.10.19
1탄. SPARK를 설치해보자~  (0) 2016.10.18

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

sbt lib 연동 안되는 현상

BigDATA/spark 2019.01.04 14:47
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T
not found!! dependency에는 있는데 not found....sbt가 꼬인듯 하다..ㅠㅠ 이럴땐? 다시 셋팅 하자..



SBT버전이 중요! 너무 낮거나 높으면 인텔리J에서 제대로 못가져옴.-_-;;
Scala/Spark 버전은 사내 분산 클러스터에 맞게 적용.



build.sbt 내용
name := "neosite-data"

version := "0.1"

scalaVersion := "2.11.11"

val sparkVersion = "1.6.2"

libraryDependencies ++= Seq("org.scala-lang.modules" %% "scala-parser-combinators" % "1.1.0",
  "org.scalatest" %% "scalatest" % "3.0.5" % "test",
  "org.apache.spark"      %%  "spark-core"        % sparkVersion % "provided",
  //   "org.apache.spark" %% "spark-sql" % sparkVersion,
  "mysql"           "mysql-connector-java" % "5.1.+",
  "com.typesafe.play" %% "play-json" % "2.6.10",
  "com.google.code.gson" % "gson" % "1.7.1",
  "com.typesafe"    "config"               % "1.3.0",
  "org.scalikejdbc" %% "scalikejdbc"          % "2.3.5",
  "org.scalikejdbc" %% "scalikejdbc-config"   % "2.3.5",
  "org.scalikejdbc" %% "scalikejdbc-test"     % "2.3.5"     % "test",
  "ch.qos.logback"  "logback-classic"      % "1.1.6"
)

결과


테스트

warn등을 발견할 수 있다.
하라는데로 1.0.4로 수정 후 다시 sbt를 돌려보자.(scala-parser-combinators 1.0.4로 수정.)

수정 후 결과

이제 환경이 구축 되었다.
자기 입맛에 맞게 개발하면 된다.

내가 해야 할 것
  1. hdfs 연동. 즉, 다른쪽에 저장되어있는 친구를 불러온다(raw data)
  2. Spark 분석(map-reduce)
  3. Mysql에 저장.
    1. scala application.conf의 정보 읽어와서 JDBC 접속 후 INSERT 등 수행.



OH MY GOD~~~

sbt lib 연동이 안되어지는 현상.. 원인은 바로...
jar 파일들이 포함이 되이 않아서이다..

그러므로 plugins.sbt 에 아래와 같이 addSbtPlugin을 추가 후 sbt clean compile assembly 를 해주어야 한다.
버전은 spark에 맞게 해줘야 합니다.


  logLevel := Level.Warn
  addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5") 


 








'BigDATA > spark' 카테고리의 다른 글

HDFS부터 DB까지 팁 아닌 팁~  (0) 2019.01.15
sbt lib 연동 안되는 현상  (0) 2019.01.04
spark rdd programining  (0) 2018.12.30
spark-submit deploy-mode option  (0) 2016.11.02
2탄. SPARK를 설치해보자~(클러스터)  (0) 2016.10.19
1탄. SPARK를 설치해보자~  (0) 2016.10.18

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

spark rdd programining

BigDATA/spark 2018.12.30 20:08
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T


https://spark.apache.org/docs/latest/rdd-programming-guide.html

spark rdd

Overview

At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. 

The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.

A second abstraction in Spark is shared variables that can be used in parallel operations. 
By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.

This guide shows each of these features in each of Spark’s supported languages. It is easiest to follow along with if you launch Spark’s interactive shell – either bin/spark-shell for the Scala shell or bin/pyspark for the Python one.


  1. flatMap[U](f: (T) ⇒ TraversableOnce[U])(implicit arg0: ClassTag[U])RDD[U]

    Permalink

    Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.

map[U](f: (T) ⇒ U)(implicit arg0: ClassTag[U])RDD[U]

Permalink

Return a new RDD by applying a function to all elements of this RDD.







'BigDATA > spark' 카테고리의 다른 글

HDFS부터 DB까지 팁 아닌 팁~  (0) 2019.01.15
sbt lib 연동 안되는 현상  (0) 2019.01.04
spark rdd programining  (0) 2018.12.30
spark-submit deploy-mode option  (0) 2016.11.02
2탄. SPARK를 설치해보자~(클러스터)  (0) 2016.10.19
1탄. SPARK를 설치해보자~  (0) 2016.10.18

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

2탄. SPARK를 설치해보자~(클러스터)

BigDATA/spark 2016.10.19 19:24
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T

2016/10/18 - [BigDATA/spark] - 1탄. SPARK를 설치해보자~


1탄. 단일모드 

2탄. 클러스터 모드 

3탄. 기타 유용 셋팅(스파크 관련)


우선 분산 클러스터로 셋팅하기전에! 요것만큼은 알고가자는 의미에서 살짝 정리해본다.

1. Spark 구조

(펌: https://www.google.co.kr/search?q=spark+%EA%B5%AC%EC%A1%B0&newwindow=1&biw=1598&bih=976&source=lnms&tbm=isch&sa=X&sqi=2&ved=0ahUKEwi1y4ut0ObPAhWqiVQKHWFaDgcQ_AUIBigB&dpr=1#imgrc=EdvQ87Vu0XWkMM%3A)


여기에서 Stand alone Scheduler , YARN, Apache Mesos는 클러스터 매니저의 종류이다. 총 3가지~여기에서 스파크가 돌아간다! 
밑에서 더 이야기 하겠음!


2. Spark RDD..꼭 알아야하나?

알아야한다~코딩하려면 뭔지는 알고 코딩하자!

또한 스파크의 핵심기능으로써 분산 되어있는 변경불가능한 객체의 모음이라고 생각하자.

2가지 타입으로 구분

   1) transformation 2) action 으로 구분되어진다.

간단히 말하면 1)은 기존 RDD를 new RDD로 리턴 하는 것(ex. filter(), map() 등)

2)는 기존 RDD를 계산하여 저장하거나 다른 타입으로 리턴하는 것.

Spark는 RDD의 내용을 메모리에 클러스터의 머신들에 나뉘어서 저장 -> action에서 재사용!


또한 실행구조는 분산모드일 경우!

마스터 / 슬레이브의 구조!

여기에서 마스터는 드라이버(driver)라고 불림. 슬레이브는 익스큐터(executor) => 이것을 Spark Application이라고 함!

이녀석들은 서로 독립된 java process로 돌아감.

용어중에 Task는 스파크 작업 계층에서 최소 개체라고 보면 된다.


3. 실행 흐름

  1.  사용자 프로그램을 스파크에 제출! -> spark-submit.sh 
  2.  드라이버 실행 : 익스큐터 실행을 위한 리소스를 클러스터 매니저에 요청.
  3. 클러스터 매니저는 익스큐터를 실행.
  4. 드라이버는 main() 메소드를 호출! 이때 작업 내역을 단위 작업 형태(task)로 나눠 익스큐터에게 보낸다.
  5. 익스큐터는 task를 실행.
  6. 드라이버 : main()이 끝이거나 SparkContext.stop() 호출 시 익스큐터들을 중지! 
    클러스터 매니저에 사용했던 자원을 반환!

작성 중..~





'BigDATA > spark' 카테고리의 다른 글

spark rdd programining  (0) 2018.12.30
spark-submit deploy-mode option  (0) 2016.11.02
2탄. SPARK를 설치해보자~(클러스터)  (0) 2016.10.19
1탄. SPARK를 설치해보자~  (0) 2016.10.18
spark log4j 사용해보기!  (0) 2016.07.04
spark logback 설정?  (0) 2016.06.29

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

1탄. SPARK를 설치해보자~

BigDATA/spark 2016.10.18 17:48
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T

해당 포스팅은 총 3부로 구성될 예정 입니다.

1탄. 단일모드 

2탄. 클러스터 모드

3탄. 기타 유용 셋팅(스파크 관련)

 

1탄. SPARK를 설치해보자~


Apache Spark 설치!

버전은 1.6.1 / hadoop 2.6으로 해서 다운받아보겠습니다. (현업에서 사용중인게 요거라서 요걸로!)

Step 1. 아래의 링크를 통해 스파크를 다운 받아보자!

http://spark.apache.org/downloads.html



Step 2. down을 받아서 원하는 서버에서 압축을 풀어줍니다.

압축을 해제하고 내용을 보면 아래와 같습니다.


Step 3. Spark는 대화형 쉘들을 제공 합니다.

파이썬과 스칼라가 있는데요 

즉석 데이터를 분석하기에 좋습니다.

실행은?

1) 파이썬 쉘  

ㄴ bin directory에 가서 ./pyspark 를 실행시키면 아래와 같이 수행되어집니다.

2) 스칼라 쉘

ㄴ bin/spark-shell 를 수행!


간단히 테스트를 해보자.

내용은 스파크 퀵 스타트를 통해 해보자!

http://spark.apache.org/docs/latest/quick-start.html



Step 4. Log 설정을 해보자.

Spark에서의 로그는 아래와 같이 설정할 수가 있다.

아래의 템플릿 중에 log4j.properties.template를 복사해서 log4j.properties를 만들면 된다.

많이 사용되어지는 친구이기 때문에 잘 아실거라 믿는다.

내용은!? 아래처럼 INFO로 설정이 되어있어서 spark-shell를 수행시키면 많은 정보들이 나오게 된다. 
INFO->WARN으로 변경을 한다면 적은 내용의 정보가 보인다.


- 1탄 끝~ -




'BigDATA > spark' 카테고리의 다른 글

spark-submit deploy-mode option  (0) 2016.11.02
2탄. SPARK를 설치해보자~(클러스터)  (0) 2016.10.19
1탄. SPARK를 설치해보자~  (0) 2016.10.18
spark log4j 사용해보기!  (0) 2016.07.04
spark logback 설정?  (0) 2016.06.29
spark-submit 옵션 관련  (0) 2016.05.16

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

spark logback 설정?

BigDATA/spark 2016.06.29 11:45
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T


spark에서 돌아가는 app에서 logback.xml을 설정하여 사용하고 싶었습니다.


환경은!


spark 1.5.2

scala 2.10.6


그리고 아래와 같이 build.sbt에 설정! 

"ch.qos.logback"  %  "logback-classic"      % "1.1.6" 


그러나 spark conf쪽의 log4j.properties를 조정하면 영향을 받음. 배제 시켜야할 듯 어디에서? 스파크에서!

그러므로 그냥 logback 말고 log4j를 사용하는게 좋을듯!




결론 : spark 할때는 logback 바인딩이 어려우니 그냥 속편하게 log4j 사용하자! 끝~



참고 : http://stackoverflow.com/questions/31790944/best-way-to-send-apache-spark-loggin-to-redis-logstash-on-an-amazon-emr-cluster


spark + log4j 설정 방법

2016/07/04 - [BigDATA/spark] - spark log4j 사용해보기!


'BigDATA > spark' 카테고리의 다른 글

1탄. SPARK를 설치해보자~  (0) 2016.10.18
spark log4j 사용해보기!  (0) 2016.07.04
spark logback 설정?  (0) 2016.06.29
spark-submit 옵션 관련  (0) 2016.05.16
ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread  (0) 2016.05.11
spark와 친해지기!  (0) 2016.03.22

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

spark와 친해지기!

BigDATA/spark 2016.03.22 18:35
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T


아마 아래와 같은 형태가 될것 같다.

  • sparkContext 클래스는 스파크클러스터의 연결과 스파크와 연동할 수 있는 엔트리 포인트를 제공.
    인스턴스를 생성하여 다양한 일을할 수 있다.

  • spark RDD : RDD(resilient distributed dataset)를 활용하면 데이터의 병렬처리를 쉽게할 수 있다.

spark 참고 사이트!!







'BigDATA > spark' 카테고리의 다른 글

spark log4j 사용해보기!  (0) 2016.07.04
spark logback 설정?  (0) 2016.06.29
spark-submit 옵션 관련  (0) 2016.05.16
ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread  (0) 2016.05.11
spark와 친해지기!  (0) 2016.03.22
spark + scala + sbt 프로젝트!!  (0) 2016.03.22

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

spark + scala + sbt 프로젝트!!

BigDATA/spark 2016.03.22 16:58
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T

환경 : 

sbt : 0.13.11    - 참고 :  https://twitter.github.io/scala_school/ko/sbt.html

scala : 2.10.6

spark : 1.5.2




음..환경설정이 조금 짜증이 났지만..아래와 같은 프로젝트 구조가 생겼다.



이제 한번 scala의 문법을 공부해보자. 그런 뒤 spark를 사용하여 지지고 볶고 해보자!

일단 여기까쥐~

  - 끝 -



'BigDATA > spark' 카테고리의 다른 글

spark log4j 사용해보기!  (0) 2016.07.04
spark logback 설정?  (0) 2016.06.29
spark-submit 옵션 관련  (0) 2016.05.16
ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread  (0) 2016.05.11
spark와 친해지기!  (0) 2016.03.22
spark + scala + sbt 프로젝트!!  (0) 2016.03.22

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::