MapReduce案例详解

MapReduce案例实操

1、MapReduce核心编程思想

  1)分布式的运算程序往往需要分成至少2个阶段
  2)第一个阶段的maptask并发实例,完全并行运行,互不相干
  3)第二个阶段的reduce task并发实例互不相干,但是他们的数据依赖于上一个阶段的所有maptask并发实例的输出
  4)MapReduce编程模型只能包含一个map阶段和一个reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个mapreduce程序,串行运行

2、MapReduce程序运行流程详解

MR程序具体运行步骤如下:
  1)在MapReduce程序读取文件的输入目录上存放相应的文件。
  2)客户端程序在submit()方法执行前,获取待处理的数据信息,然后根据集群中参数的配置形成一个任务分配规划。
  3)客户端提交job.split、jar包、job.xml等文件给yarn,yarn中的resourcemanager启动MRAppMaster。
  4)MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程。
  5)maptask利用客户指定的inputformat来读取数据,形成输入KV对。
  6)maptask将输入KV对传递给客户定义的map()方法,做逻辑运算。
  7)map()运算完毕后将KV对收集到maptask缓存。
  8)maptask缓存中的KV对按照K分区排序后不断写到磁盘文件。
  9)MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据分区。
  10)Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算。
  11)Reducetask运算完毕后,调用客户指定的outputformat将结果数据输出到外部存储。

3、案例实践

新建Maven工程,在pom.xml文件中添加依赖

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
    </dependencies>

在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。

    log4j.rootLogger=INFO, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
    log4j.appender.logfile=org.apache.log4j.FileAppender 
    log4j.appender.logfile.File=target/spring.log 
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout 
    log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

3.1、WordCount案例

  需求:在给定的文本文件中统计输出每一个单词出现的总次数
输入数据:

hello world
hadoop
spark
hello world
hadoop
spark
hello world
hadoop
spark

代码
1)定义一个mapper类

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * (1)用户自定义的Mapper要继承自己的父类
 * (2)Mapper的输入数据是K-V对的形式(K-V的类型可自定义)
 * (3)Mapper中的业务逻辑写在map()方法中
 * (4)Mapper的输出数据是K-V对的形式(K-V的类型可自定义)
 * (5)map()方法(maptask进程)对每一个<K,V>调用一次
 */
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    /**
     * map()方法(maptask进程)对每一个<K,V>调用一次
     *
     * @param key     : 数据的offset
     * @param value   : 要处理的一行数据
     * @param context : 上下文
     * @throws IOException
     * @throws InterruptedException
     */
    IntWritable v = new IntWritable(1);
    Text k = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1、获取一行数据,将一行数据转化为String类型
        String line = value.toString();
        //2、切割
        String[] words = line.split(" ");
        //3、循环写出
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}

2)定义一个reducer类

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 *  (1)用户自定义的Reducer要继承自己的父类
 *  (2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
 *  (3)Reducer的业务逻辑写在reduce()方法中
 *  (4)Reducetask进程对每一组相同k的<k,v>组调用一次reduce()方法
 */
public class WordcountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * Reducetask进程对每一组相同k的<k,v>组调用一次reduce()方法
     *
     * @param key : 单词
     * @param values : 单词个数(1)的集合
     * @param context : 上下文
     * @throws IOException
     * @throws InterruptedException
     */
    IntWritable v = new IntWritable();
    int sum;
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //1、累加
        sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        //2、写出
        v.set(sum);
        context.write(key, v);
    }
}

3)定义一个主类,用来描述job并提交job

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * 相当于yarn客户端,负责提交MapReduce程序
 */
public class WordcountDriver {
    public static void main(String[] args) throws Exception {
        // 1 获取配置信息以及封装任务
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 2 设置jar加载路径
        job.setJarByClass(WordcountDriver.class);
        // 3 设置map和reduce类
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReduce.class);
        // 4 设置map输出
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5 设置Reduce输出
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6 设置job数据输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7 提交
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

第一种情况:
  设置好job数据输入和输出路径,直接在IDEA上运行代码,得到输出结果即可。

第二种情况:
  在IDEA上打包成jar,上传到集群上运行
  运行命令:

hadoop jar mapreduce-1.0-SNAPSHOT.jar com.atguigu.wordcount.WordcountDriver /user/atguigu/wc/input/hello.txt /user/atguigu/wc/output
hadoop jar jar包名称 Driver类 数据输入路径 数据输出路径

  得到输出结果即可。

3.2、数据清洗案例

  需求:去除日志中字段长度小于等于11的日志。
输入数据:
数据源
期望输出数据:
  每行字段长度都大于11。(实际通过计数器计数:符合要求的为true,不符合的为false)
需求分析:
  需要在Map阶段对输入的数据根据规则进行过滤清洗。

代码
1)编写LogParseMapper类

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class LogParseMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    Text k = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        boolean result = LogParse(line, context);
        if (!result) {
            return;
        }
        k.set(line);
        context.write(k, NullWritable.get());
    }
    private boolean LogParse(String line, Context context) {
        String[] fields = line.split(" ");
        if (fields.length > 11) {
            context.getCounter("map", "true").increment(1);
            return true;
        }
        context.getCounter("map", "false").increment(1);
        return false;
    }
}

2)编写LogParseDriver类


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LogParseDriver {
    public static void main(String[] args) throws Exception {
        //输入输出路径需要根据自己

教程来源于Github,感谢旧时光大数据大佬的无私奉献,致敬!

技术教程推荐

AI技术内参 -〔洪亮劼〕

朱赟的技术管理课 -〔朱赟〕

零基础学Python -〔尹会生〕

Go语言核心36讲 -〔郝林〕

SQL必知必会 -〔陈旸〕

Spring Cloud 微服务项目实战 -〔姚秋辰(姚半仙)〕

深入浅出分布式技术原理 -〔陈现麟〕

人人都用得上的数字化思维课 -〔付晓岩〕

结构沟通力 -〔李忠秋〕