Hadoop 1-4

4.1 맵리듀스의 개념

하둡은 HDFS와 맵리듀스로 구성됩니다. 맵리듀스는 HDFS에 저장된 파일을 분산 배치 분석을 할 수 있게 도와주는 프레임워크입니다. 맵리듀스 프로그래밍 모델은 맵과 리듀스라는 두 가지 단계로 데이터를 처리합니다. 맵은 입력파일을 한 줄씩 읽어서 데이터를 변형하며, 리듀스는 맵의 결과 데이터를 집계합니다.

이러한 맵리듀스 프로그래밍 모델은 일밙거으로 다음과 같은 함수로 표현됩니다.
맵 : (K1,V1) -> LIST(K2,V2)
리듀스 : (K2,LIST(V2)) -> (K3,LIST(V3))

맵은 키와 값으로 구성된 데이터를 입력받아 이를 가공하고 분류한 후, 새로운 키와 값으로 구성된 목록을 출력합니다.
리듀스는 새로운 키로 그룹핑된 값의 목록을 입력 데이터로 전달받습니다. 그리고 값의 목록에 대한 집계 연산을 실행해 새로운 키로 그룹핑된 새로운 값의 목록을 생성합니다.

하둡의 맵리듀스 프레임워크는 이러한 프로그래밍 모델을 구현할 수 있게 맵과 리듀스 인터페이스를 제공합니다. 맵리듀스 애플리케이션이 동작할 때 맵과 리듀스 사이에는 셔플이라는 작업이 발생합니다. 셔플은 나중에 자세히 설명하겠습니다.

4.2 맵리듀스 아키텍쳐

맵리듀스 프레임워크는 개발자가 분석 로직을 구현하는 데 집중하게 해주고, 데이터에 대한 분산과 병렬처리를 프레임워크가 전담합니다. 하지만 아키텍쳐를 모른다면 성능을 고려하지 않은 채로 개발하게 되고, 서비스를 사용하지 못할 수도 있습니다.

1. 시스템 구성



클라이언트 : 사용자가 진행한 맵리듀스 프로그램과 하둡에서 제공하는 맵리듀스 API를 의미합니다. 

잡트래커 : 클라이언트가 하둡으로 실행을 요청하는 맵리듀스 프로그램은 잡이라는 하나의 작업 단위로 관리됩니다. 잡트래커는 하둡 클러스터에 등록된 전테 잡의 스케줄링을 관리하고 모니터링합니다.

태스크트래커 : 사용자가 설정한 맵리듀스 프로그램을 실행하며, 하둡의 데이터노드에서 실행되는 데몬입니다.

2. 데이터 플로우

맵 단계 : 첫 번째 단계는 입력 파일을 읽어 맵의 출력 데이터를 생성하는 맵 처리 단계입니다. 

셔플 단계 : 셔플은 맵 태스크의 출력 데이터가 리듀스 태스크에게 전달되는 일련의 과정을  의미하며, 아래와 같은 그림과 같은 방식으로 진행됩니다.


리듀스 단계 : 마지막으로 리듀스 단계이며, 사용자에게 전달할 출력 파일을 생성합니다. 

3. 맵리듀스 프로그래밍 요소 

1. 매퍼
데이터 타입과 input format과 output format을 알고 있다는 가정하에 매퍼부터 공부하겠습니다. 매퍼는 맵리듀스 프로그래밍 모델에서 맵 메셔드의 기능을 수행합니다. 매퍼는 키와 값으로 구성된 입력 데이터를 전달받아 이 데이터를 가공하고 분류해 새로운 데이터 목록을 생성합니다. 매퍼 클래스를 그대로 사용할 수도 있지만 대부분 매퍼 클래스를 상속받아 매퍼 클래스를 새롭게 구현하게 됩니다. 

2. 파티셔너 
파티셔너는 맵 태스크의 출력 데이커가 어떤 리듀스 탸스크로 전달될지 결정합니다. 맵리듀스는 기본 파티셔너로 HashPartioner를 제공합니다.

3. 리듀서 
리듀서 클래스에서는 맵 태스크의 출력 데이터를 입력 데이터로 전달받아 집계 연산을 수행합니다.

4. 콤바이너 클래스
맵 태스크의 출력 데이터는 네트워크를 통해 리듀스 태스크로 전달되며, 이러한 맵 태스크와 리듀스 태스크 사이의 데이터 전달 과정을 셔플이라고 표현합니다. 데이터의 크기를 줄일수록 전제 잡의 성능이 좋아질 것입니다.

콤바이너 클래스는 셔플할 데이터의 크기를 줄이는 데 도움을 줍니다. 콤바이너 클래스를 이용한 코드 작성 방법과 성능 개선 사레는 나중에 설명하겠습니다.

3. WordCount 만들기

앞서 배운 맵리듀스 프로그래밍을 토대로 wordcount 예제를 작성해보도록 하겠습니다. 모든 코드는 하둡 1.2.1 버젼을 기준으로 합니다.

1. 매퍼 구현 

'이 클래스는 키가 라인 번호이고, 값이 문장인 입력 파라미터를 받아 키가 글자이고 값이 글자 수인 목록을 출력합니다.
   1. 입력 파라미터의 값인 문장을 공백 단위로 구분해 글자 수를 계산합니다.
   2. 구분된 글자는 즉시 출력 파라미터에 추가합니다. 이때 글자는 하나이므로 글자 수를 1       로 설정합니다.
매퍼 클래스를 구현한 코드는 아래와 같습니다.

'''java
package wikibooks.hadoop.chapter04;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends
  Mapper {

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);
    }
  }
}
'''

매퍼 클래스를 상속받아 구현합니다. StringTokenizer로 공백 단위로 구분된 string 값을 순회합니다. string 값을 순회하면서 출력 파라미터에 키가 글자이고, 값이 글자 수인 레코드를 추가합니다. 

2. 리듀서 구현

이제 글자와 글자 수로 구성된 입력 파라미터를 받아 글자 수를 합산해 출력하는 리듀서 클래스를 구현하겠습니다. 
   1. 입력 파라미터의 값에 담겨 잇는 글자수를 합산합니다.
   2. 합산이 종료되면 출력 파라미터에 레코드를 추가합니다.
아래는 리듀서 클래스를 구현한 코드입니다.

'''java
package wikibooks.hadoop.chapter04;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends
  Reducer {
  private IntWritable result = new IntWritable();

  public void reduce(Text key, Iterable values, Context context)
    throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}
'''

Reduce 클래스 또한 리듀서 클래스를 상속받아 구현합니다.

3. 드라이버 클래스 구현

이번에는 매퍼와 리듀서 클래스를 실행하는 드라이버 클래스를 구현하겠습니다. 드라이버 크래스는 맵리듀스 잡에 대한 실행 정보를 설정하고, 맵리듀스 잡을 실행합니다. 
   1. 잡 객체를 생성합니다.
   2. 잡 객체에 맵리듀스 잡의 실행 정보를 설정합니다.
   3. 맵리듀스 잡을 실행합니다.

'''java
package wikibooks.hadoop.chapter04;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    if (args.length != 2) {
      System.err.println("Usage: WordCount ");
      System.exit(2);
    }
    Job job = new Job(conf, "WordCount");

    job.setJarByClass(WordCount.class);
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
  }
}
'''

지금까지 배운 맵리듀스 개발 과정을 정리하면 다음과 같습니다.
1. 맵리듀스 단계별로 사용할 파라미터를 키와 값의 형태로 설계된다.
2. 매퍼 클래스를 상속받아 매퍼를 구현한다.
3. 리듀서 클래스를 상속받아 리듀서를 구현한다.
4. 맵리듀스 잡을 실행할 잡 객체를 생성하고 실행한다.
5. 완성된 맵리듀스 클래스를 하둡에서 실행한다.
6. 맵리듀스 출력 결과물이 원하는 대로 나왔는지 확인한다.

복잡한 데이터가 나타났을 때는 무작정 코드를 작성하기보단 데이터 흐름을 그려보면서 어떻게 데이터를 주고받을지 설계하기 바랍니다.

Reference : 시작하세요! 하둡 프로그래밍 -위키북스-

댓글

이 블로그의 인기 게시물

윈도우 설치에서 파티션 설정 오류(NTFS)

[exploit writing] 1_스택 기반 오버플로우 (1) First

하둡 설치 오류 정리