'2016/07/02'에 해당되는 글 1건

  1. 2016.07.02 [MapReduce] 미항공기지연 분석
반응형

 미국 규격 협회에서 2009년에 미국 항공편 운항 통계 데이터를 공개하였다. 다음의 사이트에서 관련 데이터를 볼 수 있다. http://stat-computing.org/dataexpo/2009/ 여기에는 1987년도부터 2008년도 까지 미국의 모든 상업 항공편에 대한 도착과 출발에 대한 정보를 다운로드 할 수 있다. 다음의 스크립트로 다운로드->압축해제->hdfs 저장하도록 한다. (hdfs 상에서는 /air_data 폴더가 생성되어 있어야 한다.)

#!/bin/bash

for i in `seq 1987 2008`;

do

        wget 'http://stat-computing.org/dataexpo/2009/'$i'.csv.bz2'

        bunzip2 $i'.csv.bz2'

        hdfs dfs -copyFromLocal $i'.csv' /air_data

done 


 이 csv 의 속성은 다음과 같다.

 항공기의 출발 지연을 분석하기 위해서는 15번 항목에 ArrDelay를 이용하면 된다. Mapper에서는 csv를 라인 하나하나를 읽어서 1번의 운항연도와 2번의 운항일의 합을 Key로, 출발지연 건수를 Value로 출력한다. (즉 ArrDelay가 0보다 클 경우 1 출력) Reducer에서는 운항연도+운항일을 Key, 출발지연 건수를 Value로 하는 값을 받아서 운항연도+운항일에 대한 출발지연 건수를 SUM 한다. 그리고 운항연도+운항일을 Key로 하고 지연 건수를 value로 출력한다.

 Mapper 입력(라인오프셋, 운항데이터) -> 출력(운항연도+운항일, 출발지연건수)

 Reducer 입력(운항연도+운항일,출발지연건수) -> 출력(운항연도+운항일,출발지연건수 합) 이 되는 것이다.


 다음은 이걸 구현한 Mapper이다.

public static class DepartureDelayCntMap extends Mapper<LongWritable,Text,Text,IntWritable>{

private IntWritable outValue=new IntWritable(1);

private Text outKey=new Text();

public void map(LongWritable key,Text value,Context Cxt) throws IOException, InterruptedException{

if(key.get()>0){

String[] col=value.toString().split(",");

if(col!=null && col.length>0){

try{

outKey.set(col[0]+","+col[1]);

if(!col[15].equals("NA")){

int delayTime=Integer.parseInt(col[15]);

if(delayTime>0){

Cxt.write(outKey,outValue);

}

}

} catch (Exception e){

e.printStackTrace();

}

}

}

}

}

 다음은 Reducer이다.

public static class DepartureDelayCntReduce extends Reducer<Text,IntWritable,Text,IntWritable>{

private IntWritable result=new IntWritable();

public void reduce(Text key,Iterable<IntWritable> values,Context Cxt) throws IOException, InterruptedException{

int sum=0;

for(IntWritable value : values) sum+=value.get();

result.set(sum);;

Cxt.write(key,result);

}

}

 다음은 실행하기 위한 드라이버이다.

public static void main(String[] args) {

Configuration conf=new Configuration();

Job job;

try {

job = Job.getInstance();

job.setJarByClass(AirArrDelay.class);

job.setMapperClass(DepartureDelayCntMap.class);

job.setCombinerClass(DepartureDelayCntReduce.class);

job.setReducerClass(DepartureDelayCntReduce.class);

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.waitForCompletion(true);

} catch (IOException e) {

e.printStackTrace();

} catch (ClassNotFoundException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

}

} 

 실행은 다음과 같은 명령을 내리면 된다.

hadoop jar airdelay-0.0.1-SNAPSHOT.jar AirArrDelay /air_data /air_out2


 16번 항목인 출발 지연도 동일하게 구현할 수 있다. 하지만 매퍼에서 취해야할 컬럼 index만 다르기 때문에 옵션을 줘서 선택하도록 해보자. 이를 위해서는 하둡 콘솔 명령에서 입력한 옵션을 분석하는 GenericOptionsParser 를 이용한다. 다음은 GenericOptionsParser에 대한 설명이다.


 GenericOptionsParser 를 지원하기 위한 인터페이스로 Tool 인터페이스가 있다. 그리고 Tool 인터페이스의 실행을 도와주는 헬퍼 클래스로 ToolRunner가 존재한다. ToolRunner는 GenericOptionsParser를 사용해 사용자가 콘솔 명령에서 설정한 옵션을 분석하고, Configuration 객체에 설정한다. Configuration 객체를 Tool 인터페이스에 전달한 후, Tool 인터페이스의 run 메서드를 실행한다.

 다음과 같이 Tool을 implements 하고 Configured 를 extend 하는 클래스를 작성한다.

public class AirArrDelay extends Configured implements Tool {

public static class DepartureDelayCntMap extends Mapper<LongWritable,Text,Text,IntWritable>{

.....

}

public static class DepartureDelayCntReduce extends Reducer<Text,IntWritable,Text,IntWritable>{

.....

}

public int run(String[] args) throws Exception {

String[] otherArgs=new GenericOptionsParser(getConf(),args).getRemainingArgs();

if(otherArgs.length!=2){

System.err.println("Usage: AirArrDelay <in> <out>");

System.exit(2);

}

Job job=new Job(getConf(),"AirDelayCount");

FileInputFormat.addInputPath(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setJarByClass(AirArrDelay.class);

job.setMapperClass(DepartureDelayCntMap.class);

job.setCombinerClass(DepartureDelayCntReduce.class);

job.setReducerClass(DepartureDelayCntReduce.class);

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

job.waitForCompletion(true);

return 0;

}

} 


 메인 함수는 간단하다.

public static void main(String[] args) {

try {

int res=ToolRunner.run(new Configuration(), new AirArrDelay(),args);

System.out.println("##Result:"+res);

} catch (Exception e) {

// TODO Auto-generated catch block

e.printStackTrace();

}


매퍼는 다음과 같이 변경한다.

public static class DepartureDelayCntMap extends Mapper<LongWritable,Text,Text,IntWritable>{

private String workType;

private IntWritable outValue=new IntWritable(1);

private Text outKey=new Text();

@Override

public void setup(Context context) throws IOException, InterruptedException{

workType=context.getConfiguration().get("workType");

}

public void map(LongWritable key,Text value,Context Cxt) throws IOException, InterruptedException{

if(key.get()>0){

String[] col=value.toString().split(",");

if(col!=null && col.length>0){

try{

String parseData="NA";

if(workType.equals("departure")){

parseData=col[15];

} else if(workType.equals("arrival")){

parseData=col[14];

}

outKey.set(col[0]+","+col[1]);

if(!parseData.equals("NA")){

int delayTime=Integer.parseInt(parseData);

if(delayTime>0){

Cxt.write(outKey,outValue);

}

}

} catch (Exception e){

e.printStackTrace();

}

}

}

} 


 다음의 명령으로 실행한다.

hadoop jar airdelay-0.0.1-SNAPSHOT.jar AirArrDelay -D workType=arrival /air_data /air_out_ar



 하둡에서 카운터는 사용자가 정의할 수 있고 MR상에서 증감시킬수 있다. 다음은 카운터를 이용하여 다음의 값들의 통계를 추출하도록 한다.

카운터 이름 

 설명

NA

scheduled

early

데이터가 없는 경우

예정된 도착

일찍 도착

 카운터는 enum 클래스를 이용해서 구현한다. 상기는 다음과 같이 구현한다.

public static enum delayCnts{

NA,

scheduled,

early,

}

 매퍼의 맵메소드를 다음과 같이 수정한다.

public void map(LongWritable key,Text value,Context Cxt) throws IOException, InterruptedException{

if(key.get()>0){

String[] col=value.toString().split(",");

if(col!=null && col.length>0){

try{

String parseData="NA";

if(workType.equals("departure")){

parseData=col[15];

} else if(workType.equals("arrival")){

parseData=col[14];

} else {

parseData=col[15];

}

outKey.set(col[0]+","+col[1]);

if(!parseData.equals("NA")){

int delayTime=Integer.parseInt(parseData);

if(delayTime>0){

Cxt.write(outKey,outValue);

} else if(delayTime==0){

Cxt.getCounter(delayCnts.scheduled).increment(1);

} else if(delayTime<0){

Cxt.getCounter(delayCnts.early).increment(1);

}

} else {

Cxt.getCounter(delayCnts.NA).increment(1);

}

} catch (Exception e){

e.printStackTrace();

}

}

}


 다음은 MultipleOutputs을 이용해서 한번에 Departure와 Arriaval을 계산하는 코드이다.

 매퍼의 맵 함수는  다음과 같이 수정한다.

public void map(LongWritable key,Text value,Context Cxt) throws IOException, InterruptedException{

if(key.get()>0){

String[] col=value.toString().split(",");

if(col!=null && col.length>0){

try{

if(!col[15].equals("NA")){

outKey.set("D,"+col[0]+","+col[1]);

int delayTime=Integer.parseInt(col[15]);

if(delayTime>0){

Cxt.write(outKey,outValue);

} else if(delayTime==0){

Cxt.getCounter("AirDelay","D_Schleduled").increment(1);

} else if(delayTime<0){

Cxt.getCounter("AirDelay","D_Early").increment(1);

}

} else {

Cxt.getCounter("AirDelay","D_NA").increment(1);

}

if(!col[14].equals("NA")){

outKey.set("A,"+col[0]+","+col[1]);

int delayTime=Integer.parseInt(col[14]);

if(delayTime>0){

Cxt.write(outKey,outValue);

} else if(delayTime==0){

Cxt.getCounter("AirDelay","A_Schleduled").increment(1);

} else if(delayTime<0){

Cxt.getCounter("AirDelay","A_Early").increment(1);

}

} else {

Cxt.getCounter("AirDelay","A_NA").increment(1);

}

} catch (Exception e){

e.printStackTrace();

}

}

}


 리듀서는 다음과 같이 수정한다.

public static class DepartureDelayCntReduce extends Reducer<Text,IntWritable,Text,IntWritable>{

private IntWritable result=new IntWritable();

private MultipleOutputs<Text,IntWritable> mos;

private Text outputKey=new Text();

@Override

public void setup(Context context) throws IOException, InterruptedException{

mos=new MultipleOutputs<Text,IntWritable>(context);

}

@Override

public void cleanup(Context context) throws IOException, InterruptedException{

mos.close();

}

public void reduce(Text key,Iterable<IntWritable> values,Context Cxt) throws IOException, InterruptedException{

String[] columns=key.toString().split(",");

outputKey.set(columns[1]+","+columns[1]);

if(columns[0].equals("D")){

int sum=0;

for(IntWritable value : values) sum+=value.get();

result.set(sum);;

mos.write("departure",outputKey,result);

} else {

int sum=0;

for(IntWritable value : values) sum+=value.get();

result.set(sum);;

mos.write("arrival",outputKey,result);

}

}

} 


 드라이버에 다음과 같이 multipleout을 설정해 준다.

public int run(String[] args) throws Exception {

// TODO Auto-generated method stub

String[] otherArgs=new GenericOptionsParser(getConf(),args).getRemainingArgs();

if(otherArgs.length!=2){

System.err.println("Usage: AirArrDelay <in> <out>");

System.exit(2);

}

Job job=new Job(getConf(),"AirDelayCount");

FileInputFormat.addInputPath(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setJarByClass(AirArrDelay.class);

job.setMapperClass(DepartureDelayCntMap.class);

job.setCombinerClass(DepartureDelayCntReduce.class);

job.setReducerClass(DepartureDelayCntReduce.class);

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

MultipleOutputs.addNamedOutput(job, "departure", TextOutputFormat.class, Text.class, IntWritable.class);

MultipleOutputs.addNamedOutput(job, "arrival", TextOutputFormat.class, Text.class, IntWritable.class);

job.waitForCompletion(true);

return 0;

} 


반응형
Posted by alias
,