Ace-T's Blog 내 검색 [네이버 커넥트 이웃 합니다~^-^/ 요청 大 환영~~]

HDFS부터 DB까지 팁 아닌 팁~

BigDATA/spark 2019.01.15 22:24
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T

애월 - 지금이순간 카페


스칼라 알못 스파크 알못이라..이번에 작업한 내용이 있는데 삽질을 많이 했다...ㅋㅋ 
생각나는 것들을 적어보자.
HDFS를 가져와 RDD에 저장! sc.textFile을 통해서 HDFS를 가져온다.
sc.textFile("hdfs path")
그런데 그냥 가져오면 소용이 없다. 그러므로 map을 통해서 필요한 친구들만 가져온다. 또한 filter를 통해 데이터를 줄여준다.

ex) hdfs의 포맷이 Json이라서 Gson을 사용. filter를 통해 데이터를 줄여준다.

    val rddRaw0 = sc.textFile("hdfs path").map(line => new Gson().fromJson(line, classOf[TestObject])).filter(line=> line.collection.code.contains("ACET") && !line.document.id.isEmpty).cache() 

리고 여러개의(multi) RDD들이 있었고 이 친구들을 한번에 분석을 해야했다. 그래서 union을 사용.
ex) val result  = rddRaw0.union(rddRaw1)

그런뒤 reduceByKey를 하여 word count를 해준다.
rddRaw.map(line => ((line.document.id,device),1)).reduceByKey{ case (x, y) => x + y}

이제 분석되어진 RDD 친구를 List화 하여 JDBC를 통해 DB에 넣는다.
여기에서 중요한 RDD to List는 아래의 내용이다.
val list: List[((String, String), Int)] = result.collect().toList 
그리고 map으로 원하는 형태를 만들어준 뒤,  InsertToDb를 해주면 되는데 여기에서 sirialization 문제가 발생한다.

  def insertDBxx(result: RDD[((String, String), Int)]) {
       val list: List[((String, String), Int)] = result.collect().toList
       val targetList = list.map(line => (line._1._1, line._1._2, line._2))
       new InsertToDb(targetList)
  }  

Serialize 해결은 아래와 같이 하였다.

  @SerialVersionUID(15L)
  class InsertToDb(targetList: List[(String, String, Int)]) extends Serializable{ 

Serializable은 해결방법보다 이녀석이 뭔지 정확히 알필요가 있으니 따로 포스팅을 하겠습니다.

자 이제 InsertToDb에서는 connection을 맺고 db작업을 해주면 된다.
여기에서의 팁은 아래에 보시면 getConnection이 아니라 getPoolConnection()인 것을 알수가 있다.

1) get connection
    val connection = OutputJdbcTask.getPoolConnection()


pool 사용을 위해 build.sbt에 아래를 추가

  "c3p0" "c3p0" % "0.9.0.4" 

소스는 아래와 같다.

val env = MainConfig.env
val driver = ConfigFactory.load().getString(s"$env.driver")
val url = ConfigFactory.load().getString(s"$env.url")
val username = ConfigFactory.load().getString(s"$env.username")
val password = ConfigFactory.load().getString(s"$env.password")

private val cpds: ComboPooledDataSource = new ComboPooledDataSource()
cpds.setDriverClass(driver)
cpds.setJdbcUrl(url)
cpds.setUser(username)
cpds.setPassword(password)
cpds.setMinPoolSize(5)
cpds.setAcquireIncrement(5)
cpds.setMaxPoolSize(35)

def getConnection(){
try {
Class.forName(driver)
DriverManager.getConnection(url, username, password)
} catch {
case e : Exception => e.printStackTrace
}
} def getPoolConnection() = { cpds.getConnection
}


마지막으로 DB Upsert하는 부분 소스는 아래와 같다.

try {
val connection = OutputJdbcTask.getPoolConnection()

val insertSql =
"""
|insert into 테이블명 (dsid, reg_dttm, click_count, device, create_date )
|values (?,?,?,?,?)
|ON DUPLICATE KEY UPDATE dsid = ?, reg_dttm =?, click_count= ?, device =?, create_date =?
""".stripMargin

val insertStmt: PreparedStatement = connection.prepareStatement(insertSql)
val regDttm = MainConfig.currentTimeAsString
val createDate = MainConfig.date
var itemCnt = 1
val chunk = 10000

targetList.foreach {
t =>
val dsid = t._1
val device = t._2
val clickCnt = t._3

insertStmt.setString(1, dsid)
insertStmt.setString(2, regDttm)
insertStmt.setInt(3, clickCnt)
insertStmt.setString(4, device)
insertStmt.setString(5, createDate)
insertStmt.setString(6, dsid)
insertStmt.setString(7, regDttm)
insertStmt.setInt(8, clickCnt)
insertStmt.setString(9, device)
insertStmt.setString(10, createDate)
insertStmt.addBatch()
insertStmt.clearParameters()
if (itemCnt % chunk == 0) {
insertStmt.executeBatch()
}
itemCnt = itemCnt + 1

}
insertStmt.executeBatch()
insertStmt.close()
connection.close()
} catch {
case e: Exception => {
e.printStackTrace()
}
}

참고 사이트)

https://stackoverflow.com/questions/40892800/spark-rdd-to-list

https://alvinalexander.com/scala/how-to-use-serialization-in-scala-serializable-trait

'BigDATA > spark' 카테고리의 다른 글

HDFS부터 DB까지 팁 아닌 팁~  (0) 2019.01.15
sbt lib 연동 안되는 현상  (0) 2019.01.04
spark rdd programining  (0) 2018.12.30
spark-submit deploy-mode option  (0) 2016.11.02
2탄. SPARK를 설치해보자~(클러스터)  (0) 2016.10.19
1탄. SPARK를 설치해보자~  (0) 2016.10.18

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

sbt lib 연동 안되는 현상

BigDATA/spark 2019.01.04 14:47
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T
not found!! dependency에는 있는데 not found....sbt가 꼬인듯 하다..ㅠㅠ 이럴땐? 다시 셋팅 하자..



SBT버전이 중요! 너무 낮거나 높으면 인텔리J에서 제대로 못가져옴.-_-;;
Scala/Spark 버전은 사내 분산 클러스터에 맞게 적용.



build.sbt 내용
name := "neosite-data"

version := "0.1"

scalaVersion := "2.11.11"

val sparkVersion = "1.6.2"

libraryDependencies ++= Seq("org.scala-lang.modules" %% "scala-parser-combinators" % "1.1.0",
  "org.scalatest" %% "scalatest" % "3.0.5" % "test",
  "org.apache.spark"      %%  "spark-core"        % sparkVersion % "provided",
  //   "org.apache.spark" %% "spark-sql" % sparkVersion,
  "mysql"           "mysql-connector-java" % "5.1.+",
  "com.typesafe.play" %% "play-json" % "2.6.10",
  "com.google.code.gson" % "gson" % "1.7.1",
  "com.typesafe"    "config"               % "1.3.0",
  "org.scalikejdbc" %% "scalikejdbc"          % "2.3.5",
  "org.scalikejdbc" %% "scalikejdbc-config"   % "2.3.5",
  "org.scalikejdbc" %% "scalikejdbc-test"     % "2.3.5"     % "test",
  "ch.qos.logback"  "logback-classic"      % "1.1.6"
)

결과


테스트

warn등을 발견할 수 있다.
하라는데로 1.0.4로 수정 후 다시 sbt를 돌려보자.(scala-parser-combinators 1.0.4로 수정.)

수정 후 결과

이제 환경이 구축 되었다.
자기 입맛에 맞게 개발하면 된다.

내가 해야 할 것
  1. hdfs 연동. 즉, 다른쪽에 저장되어있는 친구를 불러온다(raw data)
  2. Spark 분석(map-reduce)
  3. Mysql에 저장.
    1. scala application.conf의 정보 읽어와서 JDBC 접속 후 INSERT 등 수행.



OH MY GOD~~~

sbt lib 연동이 안되어지는 현상.. 원인은 바로...
jar 파일들이 포함이 되이 않아서이다..

그러므로 plugins.sbt 에 아래와 같이 addSbtPlugin을 추가 후 sbt clean compile assembly 를 해주어야 한다.
버전은 spark에 맞게 해줘야 합니다.


  logLevel := Level.Warn
  addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5") 


 








'BigDATA > spark' 카테고리의 다른 글

HDFS부터 DB까지 팁 아닌 팁~  (0) 2019.01.15
sbt lib 연동 안되는 현상  (0) 2019.01.04
spark rdd programining  (0) 2018.12.30
spark-submit deploy-mode option  (0) 2016.11.02
2탄. SPARK를 설치해보자~(클러스터)  (0) 2016.10.19
1탄. SPARK를 설치해보자~  (0) 2016.10.18

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

spark rdd programining

BigDATA/spark 2018.12.30 20:08
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T


https://spark.apache.org/docs/latest/rdd-programming-guide.html

spark rdd

Overview

At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. 

The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.

A second abstraction in Spark is shared variables that can be used in parallel operations. 
By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.

This guide shows each of these features in each of Spark’s supported languages. It is easiest to follow along with if you launch Spark’s interactive shell – either bin/spark-shell for the Scala shell or bin/pyspark for the Python one.


  1. flatMap[U](f: (T) ⇒ TraversableOnce[U])(implicit arg0: ClassTag[U])RDD[U]

    Permalink

    Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.

map[U](f: (T) ⇒ U)(implicit arg0: ClassTag[U])RDD[U]

Permalink

Return a new RDD by applying a function to all elements of this RDD.







'BigDATA > spark' 카테고리의 다른 글

HDFS부터 DB까지 팁 아닌 팁~  (0) 2019.01.15
sbt lib 연동 안되는 현상  (0) 2019.01.04
spark rdd programining  (0) 2018.12.30
spark-submit deploy-mode option  (0) 2016.11.02
2탄. SPARK를 설치해보자~(클러스터)  (0) 2016.10.19
1탄. SPARK를 설치해보자~  (0) 2016.10.18

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

하둡명령어로 삭제하기(fs -rm)

BigDATA/Hadoop 2016.12.06 17:12
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T



하둡 hdfs 삭제 하기! 
#!/bin/bash 
# 주기적으로 hdfs 데이터를 삭제한다. 
# crontab 에 등록하여 실행시킨다. 
# 2일 전 데이터 삭제 
date=`date -d "2 days ago" "+%Y%m%d"` 
/home/acet/program/hadoop/bin/hadoop fs -rm -r -skipTrash "/HADOOP경로/하둡path/*.txt.$date*" 

# 하둡 temp 데이터 삭제 
find /home/acet/data/ -ctime +2 -type f -exec rm -f {} \;



acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

spark-submit deploy-mode option

BigDATA/spark 2016.11.02 17:45
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T


  • Client mode
    • Want to get a job result (dynamic analysis)
    • Easier for developping/debugging
    • Control where your Driver Program is running
    • Always up application: expose your Spark job launcher as REST service or a Web UI
  • Cluster mode
    • Easier for resource allocation (let the master decide): Fire and forget
    • Monitor your Driver Program from Master Web UI like other workers
    • Stop at the end: one job is finished, allocated resources a freed


spark-submit 관련 


'BigDATA > spark' 카테고리의 다른 글

sbt lib 연동 안되는 현상  (0) 2019.01.04
spark rdd programining  (0) 2018.12.30
spark-submit deploy-mode option  (0) 2016.11.02
2탄. SPARK를 설치해보자~(클러스터)  (0) 2016.10.19
1탄. SPARK를 설치해보자~  (0) 2016.10.18
spark log4j 사용해보기!  (0) 2016.07.04

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

hadoop distcp

BigDATA/Hadoop 2016.10.28 11:17
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T


$ ./hadoop distcp

usage: distcp OPTIONS [source_path...] <target_path>

              OPTIONS

 -append                       Reuse existing data in target files and

                               append new data to them if possible

 -async                        Should distcp execution be blocking

 -atomic                       Commit all changes or none

 -bandwidth <arg>              Specify bandwidth per map in MB

 -delete                       Delete from target, files missing in source

 -diff <arg>                   Use snapshot diff report to identify the

                               difference between source and target

 -f <arg>                      List of files that need to be copied

 -filelimit <arg>              (Deprecated!) Limit number of files copied

                               to <= n

 -filters <arg>                The path to a file containing a list of

                               strings for paths to be excluded from the

                               copy.

 -i                            Ignore failures during copy

 -log <arg>                    Folder on DFS where distcp execution logs

                               are saved

 -m <arg>                      Max number of concurrent maps to use for

                               copy

 -mapredSslConf <arg>          Configuration for ssl config file, to use

                               with hftps://

 -numListstatusThreads <arg>   Number of threads to use for building file

                               listing (max 40).

 -overwrite                    Choose to overwrite target files

                               unconditionally, even if they exist.

 -p <arg>                      preserve status (rbugpcaxt)(replication,

                               block-size, user, group, permission,

                               checksum-type, ACL, XATTR, timestamps). If

                               -p is specified with no <arg>, then

                               preserves replication, block size, user,

                               group, permission, checksum type and

                               timestamps. raw.* xattrs are preserved when

                               both the source and destination paths are

                               in the /.reserved/raw hierarchy (HDFS

                               only). raw.* xattrpreservation is

                               independent of the -p flag. Refer to the

                               DistCp documentation for more details.

 -sizelimit <arg>              (Deprecated!) Limit number of files copied

                               to <= n bytes

 -skipcrccheck                 Whether to skip CRC checks between source

                               and target paths.

 -strategy <arg>               Copy strategy to use. Default is dividing

                               work based on file sizes

 -tmp <arg>                    Intermediate work path to be used for

                               atomic commit

 -update                       Update target, copying only missingfiles or

                               directories

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

2탄. SPARK를 설치해보자~(클러스터)

BigDATA/spark 2016.10.19 19:24
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T

2016/10/18 - [BigDATA/spark] - 1탄. SPARK를 설치해보자~


1탄. 단일모드 

2탄. 클러스터 모드 

3탄. 기타 유용 셋팅(스파크 관련)


우선 분산 클러스터로 셋팅하기전에! 요것만큼은 알고가자는 의미에서 살짝 정리해본다.

1. Spark 구조

(펌: https://www.google.co.kr/search?q=spark+%EA%B5%AC%EC%A1%B0&newwindow=1&biw=1598&bih=976&source=lnms&tbm=isch&sa=X&sqi=2&ved=0ahUKEwi1y4ut0ObPAhWqiVQKHWFaDgcQ_AUIBigB&dpr=1#imgrc=EdvQ87Vu0XWkMM%3A)


여기에서 Stand alone Scheduler , YARN, Apache Mesos는 클러스터 매니저의 종류이다. 총 3가지~여기에서 스파크가 돌아간다! 
밑에서 더 이야기 하겠음!


2. Spark RDD..꼭 알아야하나?

알아야한다~코딩하려면 뭔지는 알고 코딩하자!

또한 스파크의 핵심기능으로써 분산 되어있는 변경불가능한 객체의 모음이라고 생각하자.

2가지 타입으로 구분

   1) transformation 2) action 으로 구분되어진다.

간단히 말하면 1)은 기존 RDD를 new RDD로 리턴 하는 것(ex. filter(), map() 등)

2)는 기존 RDD를 계산하여 저장하거나 다른 타입으로 리턴하는 것.

Spark는 RDD의 내용을 메모리에 클러스터의 머신들에 나뉘어서 저장 -> action에서 재사용!


또한 실행구조는 분산모드일 경우!

마스터 / 슬레이브의 구조!

여기에서 마스터는 드라이버(driver)라고 불림. 슬레이브는 익스큐터(executor) => 이것을 Spark Application이라고 함!

이녀석들은 서로 독립된 java process로 돌아감.

용어중에 Task는 스파크 작업 계층에서 최소 개체라고 보면 된다.


3. 실행 흐름

  1.  사용자 프로그램을 스파크에 제출! -> spark-submit.sh 
  2.  드라이버 실행 : 익스큐터 실행을 위한 리소스를 클러스터 매니저에 요청.
  3. 클러스터 매니저는 익스큐터를 실행.
  4. 드라이버는 main() 메소드를 호출! 이때 작업 내역을 단위 작업 형태(task)로 나눠 익스큐터에게 보낸다.
  5. 익스큐터는 task를 실행.
  6. 드라이버 : main()이 끝이거나 SparkContext.stop() 호출 시 익스큐터들을 중지! 
    클러스터 매니저에 사용했던 자원을 반환!

작성 중..~





'BigDATA > spark' 카테고리의 다른 글

spark rdd programining  (0) 2018.12.30
spark-submit deploy-mode option  (0) 2016.11.02
2탄. SPARK를 설치해보자~(클러스터)  (0) 2016.10.19
1탄. SPARK를 설치해보자~  (0) 2016.10.18
spark log4j 사용해보기!  (0) 2016.07.04
spark logback 설정?  (0) 2016.06.29

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

1탄. SPARK를 설치해보자~

BigDATA/spark 2016.10.18 17:48
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T

해당 포스팅은 총 3부로 구성될 예정 입니다.

1탄. 단일모드 

2탄. 클러스터 모드

3탄. 기타 유용 셋팅(스파크 관련)

 

1탄. SPARK를 설치해보자~


Apache Spark 설치!

버전은 1.6.1 / hadoop 2.6으로 해서 다운받아보겠습니다. (현업에서 사용중인게 요거라서 요걸로!)

Step 1. 아래의 링크를 통해 스파크를 다운 받아보자!

http://spark.apache.org/downloads.html



Step 2. down을 받아서 원하는 서버에서 압축을 풀어줍니다.

압축을 해제하고 내용을 보면 아래와 같습니다.


Step 3. Spark는 대화형 쉘들을 제공 합니다.

파이썬과 스칼라가 있는데요 

즉석 데이터를 분석하기에 좋습니다.

실행은?

1) 파이썬 쉘  

ㄴ bin directory에 가서 ./pyspark 를 실행시키면 아래와 같이 수행되어집니다.

2) 스칼라 쉘

ㄴ bin/spark-shell 를 수행!


간단히 테스트를 해보자.

내용은 스파크 퀵 스타트를 통해 해보자!

http://spark.apache.org/docs/latest/quick-start.html


따라해보기!


Step 4. Log 설정을 해보자.

Spark에서의 로그는 아래와 같이 설정할 수가 있다.

아래의 템플릿 중에 log4j.properties.template를 복사해서 log4j.properties를 만들면 된다.

많이 사용되어지는 친구이기 때문에 잘 아실거라 믿는다.

내용은!? 아래처럼 INFO로 설정이 되어있어서 spark-shell를 수행시키면 많은 정보들이 나오게 된다. 
INFO->WARN으로 변경을 한다면 적은 내용의 정보가 보인다.


- 1탄 끝~ -




'BigDATA > spark' 카테고리의 다른 글

spark-submit deploy-mode option  (0) 2016.11.02
2탄. SPARK를 설치해보자~(클러스터)  (0) 2016.10.19
1탄. SPARK를 설치해보자~  (0) 2016.10.18
spark log4j 사용해보기!  (0) 2016.07.04
spark logback 설정?  (0) 2016.06.29
spark-submit 옵션 관련  (0) 2016.05.16

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

spark log4j 사용해보기!

BigDATA/spark 2016.07.04 15:35
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T


구조는 아래와 같습니다.

  • assembly.jar              // assembly한 소스! jar
  • spark-submit.sh         // spark-submit의 내용이 있는 shellscript
  • log4j-acet.properties  // 우리가 사용하는 log4j.properties


아래의 spark-submit 에서 아래의 옵션을 2가지 추가 해줍니다.(즉, spark-submit.sh안에서!)

예시)

LOG_PATH=file:/full path를 적어줍니다./log4j-acet.properties

$SPARK_HOME/bin/spark-submit --class "Main" \

    --conf spark.executor.extraJavaOptions="-Dlog4j.configuration=$LOG_PATH" \

    --driver-java-options "-Dlog4j.configuration=$LOG_PATH" \



필요없는 설정들

1) main> resources 에 설정 필요 X

2) sbt에 아래의 설정 필요 X

//  "org.slf4j" % "slf4j-api" % "1.7.21",
// "org.slf4j" % "log4j-over-slf4j" % "1.7.12",


 - 끝 -

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

spark logback 설정?

BigDATA/spark 2016.06.29 11:45
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T


spark에서 돌아가는 app에서 logback.xml을 설정하여 사용하고 싶었습니다.


환경은!


spark 1.5.2

scala 2.10.6


그리고 아래와 같이 build.sbt에 설정! 

"ch.qos.logback"  %  "logback-classic"      % "1.1.6" 


그러나 spark conf쪽의 log4j.properties를 조정하면 영향을 받음. 배제 시켜야할 듯 어디에서? 스파크에서!

그러므로 그냥 logback 말고 log4j를 사용하는게 좋을듯!


더보기



결론 : spark 할때는 logback 바인딩이 어려우니 그냥 속편하게 log4j 사용하자! 끝~



참고 : http://stackoverflow.com/questions/31790944/best-way-to-send-apache-spark-loggin-to-redis-logstash-on-an-amazon-emr-cluster


spark + log4j 설정 방법

2016/07/04 - [BigDATA/spark] - spark log4j 사용해보기!


'BigDATA > spark' 카테고리의 다른 글

1탄. SPARK를 설치해보자~  (0) 2016.10.18
spark log4j 사용해보기!  (0) 2016.07.04
spark logback 설정?  (0) 2016.06.29
spark-submit 옵션 관련  (0) 2016.05.16
ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread  (0) 2016.05.11
spark와 친해지기!  (0) 2016.03.22

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

spark-submit 옵션 관련

BigDATA/spark 2016.05.16 17:31
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T


Spark Submit의 옵션들에 대해서 알아보자.

$SPARK_HOME/bin/spark-submit --class "Main" \

    --master spark://acet.pe.kr:7077 \

    --executor-memory 4G \

    --total-executor-cores 25 \

    --conf spark.driver.memory=2G \

    --properties-file $CONF_NAME \

    --conf spark.driver.extraJavaOptions='-Xms1024m -Xmx2048m' \

    /home/acet/service/hahaha/good-dev-assembly-1.0.jar




--master  // 스파크 마스터의 URL을 적어준다.

The cluster manager to connect to. See the list of allowed master URL's.


--executor-memory  // executor process 마다 사용되어지는 메모리의 양이다.

Amount of memory to use per executor process (e.g. 2g, 8g).


--total-executor-cores // 전체 executor process 코어 수.


--conf spark.driver.memory // driver process가 사용되어지는 메모리 양.

Amount of memory to use for the driver process, i.e. where SparkContext is initialized. (e.g. 1g, 2g). 
Note: In client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, please set this through the --driver-memory command line option or in your default properties file.


--properties-file  내가 원하는 파일의 properties를 사용할 수 있음.


--conf spark.driver.extraJavaOptions // 

A string of extra JVM options to pass to the driver. For instance, GC settings or other logging. 
Note: In client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, please set this through the --driver-java-options command line option or in your default properties file.



참고 URL

http://spark.apache.org/docs/latest/configuration.html#application-properties


http://spark.apache.org/docs/latest/submitting-applications.html

'BigDATA > spark' 카테고리의 다른 글

spark log4j 사용해보기!  (0) 2016.07.04
spark logback 설정?  (0) 2016.06.29
spark-submit 옵션 관련  (0) 2016.05.16
ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread  (0) 2016.05.11
spark와 친해지기!  (0) 2016.03.22
spark + scala + sbt 프로젝트!!  (0) 2016.03.22

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread

BigDATA/spark 2016.05.11 12:55
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T



아래와 같은 오류가 발생하였다.

원인은 스파크 버전이 달라서였다!


ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[appclient-registration-retry-thread,5,main]

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@4ce379d4 rejected from java.util.concurrent.ThreadPoolExecutor@60e14377[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]

at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)

at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)

at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)

at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)

at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1.apply(AppClient.scala:96)

at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1.apply(AppClient.scala:95)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)

at org.apache.spark.deploy.client.AppClient$ClientEndpoint.tryRegisterAllMasters(AppClient.scala:95)

at org.apache.spark.deploy.client.AppClient$ClientEndpoint.org$apache$spark$deploy$client$AppClient$ClientEndpoint$$registerWithMaster(AppClient.scala:121)

at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon$2$$anonfun$run$1.apply$mcV$sp(AppClient.scala:132)

at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1119)

at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon$2.run(AppClient.scala:124)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)

at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)

at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)


   - 끝 -

'BigDATA > spark' 카테고리의 다른 글

spark log4j 사용해보기!  (0) 2016.07.04
spark logback 설정?  (0) 2016.06.29
spark-submit 옵션 관련  (0) 2016.05.16
ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread  (0) 2016.05.11
spark와 친해지기!  (0) 2016.03.22
spark + scala + sbt 프로젝트!!  (0) 2016.03.22

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

문제해결) 로딩만 하다가 타임아웃 나는 현상..

BigDATA/couchbase 2016.04.19 16:41
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T

couchbase + spring boot api 를 사용하였다.

그런데..200만건 정도가 들어있는 상태에서 api로 조회를 하니..loading..만 하다가 timeout이 되면서 죽는 현상이 발생!!


1. spark(scala)로 데이터를 적재하는데 있어서 warning이 발생. 혹..이것 때문에? 라는 생각이 들었다.

    왜냐하면 spark로 대량의 데이터를 적재한 뒤에 발생을 하였고 Flush를 한 뒤 1건을 넣고 조회를 하려고 해도 

    같은 현상이 발생 하였기 때문이다. 하지만 warning 때문은 아니고 추측하기에 Flush를 하여도 완전한 delete를 

    하는 것은 아니고 marking했다가 이녀석은 del 상태군..pass~! 하면서 전체를 풀 스캔을 하는것 같다. 

    couchbase를 재부팅하고 다시 1건을 조회를 하면 잘 조회가 되었다.


2. 튜닝을 살펴보기로 했다.


튜닝이 필요! 아래를 참조!

http://www.couchbase.com/nosql-resources/presentations/tuning-query-performance-with-n1ql-in-couchbase-server-4.0.html



                                                                                                    [ 그림 01]


위의 그림01 처럼 primary index는 bucket에서 Full-scan을 타게 된다.

그래서 Index를 추가해보기로 하였다.

cd /opt/couchbase/bin/cbq

cbq> CREATE INDEX i_query on `lineup-test` (query) USING GSI;

{

    "requestID": "f604af9f-f4d5-4920-bbff-3bd4b2104a54",

    "signature": null,

    "results": [

    ],

    "status": "success",

    "metrics": {

        "elapsedTime": "2.779732692s",

        "executionTime": "2.77969173s",

        "resultCount": 0,

        "resultSize": 0

    }

}


결과

INDEX..가 걸려있지 않아서 풀스캔을 탔기 때문에 로딩만하다가 타임아웃이 난 것이였다.


다시 데이터를 넣고 테스트를 해보자! ㄱㄱ~

   - 끝 -


acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

spark와 친해지기!

BigDATA/spark 2016.03.22 18:35
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T


아마 아래와 같은 형태가 될것 같다.

  • sparkContext 클래스는 스파크클러스터의 연결과 스파크와 연동할 수 있는 엔트리 포인트를 제공.
    인스턴스를 생성하여 다양한 일을할 수 있다.

  • spark RDD : RDD(resilient distributed dataset)를 활용하면 데이터의 병렬처리를 쉽게할 수 있다.

spark 참고 사이트!!







'BigDATA > spark' 카테고리의 다른 글

spark log4j 사용해보기!  (0) 2016.07.04
spark logback 설정?  (0) 2016.06.29
spark-submit 옵션 관련  (0) 2016.05.16
ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread  (0) 2016.05.11
spark와 친해지기!  (0) 2016.03.22
spark + scala + sbt 프로젝트!!  (0) 2016.03.22

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

spark + scala + sbt 프로젝트!!

BigDATA/spark 2016.03.22 16:58
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T

환경 : 

sbt : 0.13.11    - 참고 :  https://twitter.github.io/scala_school/ko/sbt.html

scala : 2.10.6

spark : 1.5.2




음..환경설정이 조금 짜증이 났지만..아래와 같은 프로젝트 구조가 생겼다.



이제 한번 scala의 문법을 공부해보자. 그런 뒤 spark를 사용하여 지지고 볶고 해보자!

일단 여기까쥐~

  - 끝 -



'BigDATA > spark' 카테고리의 다른 글

spark log4j 사용해보기!  (0) 2016.07.04
spark logback 설정?  (0) 2016.06.29
spark-submit 옵션 관련  (0) 2016.05.16
ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread  (0) 2016.05.11
spark와 친해지기!  (0) 2016.03.22
spark + scala + sbt 프로젝트!!  (0) 2016.03.22

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

About MongoDB

BigDATA/mongoDB 2016.03.15 18:45
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T

이런..회사 github에서 적은 내용이 블로그로 포스팅하려는데..힘들군요 ㅠㅠ

마크다운도 안먹히는군요 ㄲㄲ


## Mongo DB site 

https://www.mongodb.com/


## Mongo DB란?

MongoDB stores data는 JSON과 같은 동적스키마형태의 문서들인데 다양한 구조를 가질수 있다.

MongoDB에서는 이러한 구조를 BSON이라고 부른다.

이해를 더 돕기 위해 아래의 MySql과 Mongo DB의 비교한 그림을 보자.





또한 각각의 특징을 살펴보자. 몽고DB site에 있는 내용이라서 그런지..어마무시하다 ㅋㅋ




음..MongoDB를 살짝 맛보자!




위의 내용으로 보면 api는 select를 logstash에서는 insert와 update가 일어나야하는데 가능여부는 찾아보아야 한다.

(현재 logstash에서 insert 즉, append되어지는 것을 확인, 또한 file change 이벤트를 받은 후에 동작 여부는 체크 해봐야한다.)

이러한 것을 해결하기 위해서 BSON형태의 구조와 이중화 정책등을 고려해야한다. #19 


## Mongo DB 설치(local)

더욱 더 이해도를 높이기 위해서 mongo db를 로컬에 설치를 해보겠습니다.

우선 아래의 그림을 통해 플랫폼과의 호환을 체킹한 뒤에 다운로드 받습니다.




``version : mongodb-osx-x86_64-enterprise-3.2.3.tgz``

``os : mac os 64bit``





`1) 원하는 곳에 압축을 해제 합니다.`

terrypark@localhost:~/kakao/program$ tar -xvf mongodb-osx-x86_64-enterprise-3.2.3.tgz


`2) 편의를 위해 심볼릭링크를 걸어줍니다.`

terrypark@localhost:~/kakao/program$ ln -s /Users/terrypark/kakao/program/mongodb-osx-x86_64-enterprise-3.2.3 /Users/terrypark/kakao/program/mongodb-3.2.3


`3) mongoDB에 필요한 디렉토리를 생성하여 줍니다.`

/Users/terrypark/kakao/program/mongodb-3.2.3

drwxr-xr-x   2 terrypark  staff     68  2 21 16:15 conf

drwxr-xr-x   2 terrypark  staff     68  2 21 16:15 data

drwxr-xr-x   2 terrypark  staff     68  2 21 16:15 log


`4) /conf 디렉토리 밑에 mongodb.conf 파일을 만들고 아래와 같이 설정 합니다.`

dbpath=/Users/terrypark/kakao/program/mongodb-3.2.3/data

logpath=/Users/terrypark/kakao/program/mongodb-3.2.3/logs/mongodb.log

logappend=true

verbose=true


#bind_ip=127.0.0.1

port=27017

fork=true


rest=true

#auth=true

#noauth=true


`5) mongoDB 실행`

terrypark@localhost:~/kakao/program/mongodb-3.2.3/bin$ ./mongod --config ../conf/mongodb.conf

2016-02-21T16:18:26.501+0900 I CONTROL  [main] ** WARNING: --rest is specified without --httpinterface,

2016-02-21T16:18:26.502+0900 I CONTROL  [main] **          enabling http interface

about to fork child process, waiting until server is ready for connections.

forked process: 83090

child process started successfully, parent exiting


`6) mongoDB Client 접속`

terrypark@localhost:~/kakao/program/mongodb-3.2.3/bin$ ./mongo localhost:27017

MongoDB shell version: 3.2.3

connecting to: localhost:27017/test

Welcome to the MongoDB shell.

For interactive help, type "help".

For more comprehensive documentation, see

http://docs.mongodb.org/

Questions? Try the support group

http://groups.google.com/group/mongodb-user

Server has startup warnings:

2016-02-21T16:18:26.501+0900 I CONTROL  [main] ** WARNING: --rest is specified without --httpinterface,

2016-02-21T16:18:26.502+0900 I CONTROL  [main] **          enabling http interface

MongoDB Enterprise >


`MongDB Shutdown 시키기.`

MongoDB Enterprise > use admin

switched to db admin


MongoDB Enterprise > db.shutdownServer();

server should be down...

2016-02-21T16:22:26.487+0900 I NETWORK  [thread1] trying reconnect to localhost:27017 (127.0.0.1) failed

2016-02-21T16:22:26.487+0900 W NETWORK  [thread1] Failed to connect to 127.0.0.1:27017, reason: errno:61 Connection refused

2016-02-21T16:22:26.488+0900 I NETWORK  [thread1] reconnect localhost:27017 (127.0.0.1) failed failed


MongoDB Enterprise > exit

bye


`7) 브라우저로 콘솔 접속`

실행 옵션에 rest=true 로 준 경우에는 브라우저로 접속 가능.

http://localhost:27017/ 로 접속하게 되면 아래와 같은 메시지가 나옵니다.

It looks like you are trying to access MongoDB over HTTP on the native driver port.

1000을 더한 http://localhost:28017/로 접속을 하게 도면 아래와 같이 브라우저에 잘 접속하게 됩니다.





`8) MongDB Client Shell로 간단한 테스트를 해보자.`

MongDB의 느낌은 아래의 그림을 참고하도록 하자.





MongoDB Enterprise > use terry

switched to db terry


`insert'

MongoDB Enterprise > db.users.insert({name:"김세정", age:"21", company:"젤리피쉬", groups: ["프로듀스101", "가수"]})

WriteResult({ "nInserted" : 1 })


`select'

MongoDB Enterprise > db.users.find({name:"김세정"})

{ "_id" : ObjectId("56c96d9cfa1322ff062eaadb"), "name" : "김세정", "age" : "21", "company" : "젤리피쉬", "groups" : [ "프로듀스101", "가수" ] }


`update'

김세정이면

MongoDB Enterprise > db.users.update({name:"김세정"},{$set:{age:22}})

WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })


김세정이면서 젤리피쉬 소속사인 경우

db.users.update({name:"김세정",company:"젤리피쉬"},{$set:{age:21}})

WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })


`delete'

MongoDB Enterprise > db.users.remove({name:"김세정"})

WriteResult({ "nRemoved" : 1 })


## 참고 사이트

https://namu.wiki/w/MongoDB

https://www.mongodb.com/compare/mongodb-mysql

https://docs.mongodb.org/manual/reference/configuration-options/

https://docs.mongodb.org/manual/core/crud-introduction/

'BigDATA > mongoDB' 카테고리의 다른 글

About MongoDB  (0) 2016.03.15

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

org.springframework.dao.DataRetrievalFailureException: Query error: [{"msg":"No primary index on keyspace lineup-test. Use CREATE PRIMARY INDEX to create one.","code":4000}]

BigDATA/couchbase 2016.03.15 16:15
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T

Error Message

org.springframework.dao.DataRetrievalFailureException: Query error: [{"msg":"No primary index on keyspace lineup-test. Use CREATE PRIMARY INDEX to create one.","code":4000}]


Query

Executing Query: SELECT v1 FROM `lineup-test` WHERE v2 = "12"


Solution

/opt/couchbase/bin/cbq

cbq>  CREATE PRIMARY INDEX ON `lineup-test` USING GSI;

{

    "requestID": "f75f9210-cf8d-4312-b469-d7dd28e866a1",

    "signature": null,

    "results": [

    ],

    "status": "success",

    "metrics": {

        "elapsedTime": "2.387898008s",

        "executionTime": "2.387748206s",

        "resultCount": 0,

        "resultSize": 0

    }

}


Data

{

  "airportname": "Seattle Tacoma Intl",

  "city": "Seattle",

  "country": "United States",

  "faa": "SEA",

  "geo": {

    "alt": 433,

    "lat": 47.449,

    "lon": -122.309306

  },

  "icao": "KSEA",

  "id": 3577,

  "type": "airport",

  "tz": "America/Los_Angeles"

}



Result

http://localhost:8080/lineup/v1/findAll?query=SEA

query : SELECT airportname FROM `lineup-test` WHERE faa = "SEA"



 


acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

(info) vagrant commands

BigDATA/Hadoop 2015.02.04 10:58
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T



내역

1. 로컬에서 하둡을 간단히 돌려서 프로그래밍을할 수 있는 환경을 만들고 싶음.

2. vagrant로 단일 하둡 셋팅 함.

3. 우분투 관련 오류 발생 함.


오류

recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'. 

http://www.michael-noll.com/tutorials/running-hadoop-on-ubuntu-linux-single-node-cluster/



vagrant 명령어

Usage: vagrant [options] <command> [<args>]


    -v, --version                    Print the version and exit.

    -h, --help                       Print this help.


Common commands:

     box             manages boxes: installation, removal, etc.

     connect         connect to a remotely shared Vagrant environment

     destroy         stops and deletes all traces of the vagrant machine

     global-status   outputs status Vagrant environments for this user

     halt            stops the vagrant machine

     help            shows the help for a subcommand

     init            initializes a new Vagrant environment by creating a Vagrantfile

     login           log in to HashiCorp's Atlas

     package         packages a running vagrant environment into a box

     plugin          manages plugins: install, uninstall, update, etc.

     provision       provisions the vagrant machine

     push            deploys code in this environment to a configured destination

     rdp             connects to machine via RDP

     reload          restarts vagrant machine, loads new Vagrantfile configuration

     resume          resume a suspended vagrant machine

     share           share your Vagrant environment with anyone in the world

     ssh             connects to machine via SSH

     ssh-config      outputs OpenSSH valid configuration to connect to the machine

     status          outputs status of the vagrant machine

     suspend         suspends the machine

     up              starts and provisions the vagrant environment

     version         prints current and latest Vagrant version

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

[꿀팁] 하둡 inputPath로 다중 File 작업하기

BigDATA/Hadoop 2015.02.04 10:50
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T


[그림 - 1 : 안구정화용~~]


// 참고 소스~~


// 아래처럼 path 들을 list에 넣어준다.

List<String> inputPath = new ArrayList<String>();
inputPath.add(otherArgs[0]+"/01_acet.clicklog_mo");
inputPath.add(otherArgs[0]+"/02_acet.clicklog_mo");


// 입출력 데이터 경로 설정
//FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

for(String input : inputPath){ // list에 담겨져있는 path들을 하나씩 넣어준다.
FileInputFormat.addInputPath(job, new Path(input));
}


나의 의문은 현재 하나의 파일을 input하여 output을 하는 단순한 맵-리듀스 프로그래밍을 했다.

그런데 input되는 파일이 하나가 아니라면? 경로를 어떻게 해줘야 많은 파일들을 처리할 수 있을까?

답은 위처럼 for을 돌려서 Path에 경로들을 넣어주면 돌아간다~


     - END -

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

[Hadoop] 하둡 맵-리듀스 따라잡기

BigDATA/Hadoop 2015.02.02 13:32
[Good Comment!!, Good Discussion!!, Good Contens!!]
[ If you think that is useful, please click the finger on the bottom~^-^good~ ]
by ace-T



맵-리듀스! 이녀석을 알아가기 위해 정리를 하나씩 해보려고 한다.

가장 쉬우면서도 어려운 맵-리듀스 소스 짜보기!


1. Maven 설정

 

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>


2. runner : runner는 우선 map, reduce, data format등 그리고 run을 통해 시작 시킬수 있는 친구라고 생각하고 넘어가자.
아래의 소스는 가장 단순한 형태 이다. 보통 책에 나오거나 튜토리얼에 나오는 소스 형태! 중요한 것은 runner보다 map과reduce!

public class MoClickDistributionChartRunner extends Configured implements Tool {

public MoClickDistributionChartRunner() {
}

@Override
public int run(String[] args) throws Exception {

String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();

// 입출력 데이터 경로 확인
if (otherArgs.length != 2) {
System.err.println("Usage: MoClickDistributionChartRunner <in> <out>");
System.exit(2);
}

// Job 이름 설정
Job job = new Job(getConf(), "MoClickDistributionChartJob");

// 입출력 데이터 경로 설정
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

// Job 클래스 설정
job.setJarByClass(MoClickDistributionChartJob.class);

// Mapper 클래스 설정
job.setMapperClass(MoClickDistributionChartMapper.class);
// Reducer 클래스 설정
job.setReducerClass(MoClickDistributionChartReducer.class);


// 입출력 데이터 포맷 설정
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

// 출력키 및 출력값 유형 설정
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.waitForCompletion(true);

return 0;
}

public static void main(String[] args) throws Exception {
long startTime = System.currentTimeMillis();
// Tool 인터페이스 실행
int res = ToolRunner.run(new Configuration(), new MoClickDistributionChartRunner(), args);
System.out.println("## RESULT:" + res);
}
}

3. Mapper

public class MoClickDistributionChartMapper extends
Mapper<LongWritable, Text, Text, Text> {
// 입출력의 데이터포맷을 알수가 있다.

// map 메소드를 통해 수행 되어지며 key, value의 입력값들이 들어오게 된다.
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException { ..블라블라~.. }}

- key는 라인을 나타내는 숫자형태이며, value는 Text형태의 값들이 들어온다.

- value값을 가지고 파싱하여 vo에 넣을수도 있으며 원하는 값을 가지고 오기 위해 parsing작업을 한다.

// 원하는 값들을 구하여 아래에 context.write를 해주면 output value로 들어가게 된다. Text, Text형태가 될 것이다. context.write(new Text(moSize), new Text(xyNum + "\t1"));


즉, 코딩에 들어가기전에 map과 reduce의 입력과 출력형태의 key, value를 고려하여야 한다.

4. Reducer : 

public class MoClickDistributionChartReducer extends
Reducer<Text, Text, Text, Text> {

protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {


작성 중~~

acet 박태하가 추천하는 readtrend 추천글!

설정

트랙백

댓글

:::: facebook을 이용하시는 분들은 로그인 후 아래에 코멘트를 남겨주세요 ::::

티스토리 툴바