MapReduce编程学习(240524)

资料:https://www.123pan.com/s/lfKDVv-XoENH.html提取码:axin

所需资料:

  • lib依赖库
  • hadoop集群已启动
  • 下载提供的data.txt文件(也能自己创建)
  • *.java文件(也可以自己编写)

data.txt

I love Beijing
I love China
Beijing is the capital of China

一、环境配置

1、配置 Artifact(lib依赖)

  1. 打开项目:在 IntelliJ IDEA 中打开你的项目。
  2. 打开 Project Structure

    • 点击 File 菜单,选择 Project Structure 或按 Ctrl+Alt+Shift+S 打开项目结构设置。
  3. 配置Libraries

    • 点击 + 按钮,选择 Java,然后选择复制进来的lib
    • 点击open
    • 点击ok
  4. 应用并保存

    • 点击 Apply,然后点击 OK 保存设置。

2、配置打包

  1. 打开项目:在 IntelliJ IDEA 中打开你的项目。
  2. 打开 Project Structure

    • 点击 File 菜单,选择 Project Structure 或按 Ctrl+Alt+Shift+S 打开项目结构设置。
  3. 配置 Artifact

    • 在 Project Structure 窗口中,选择左侧的 Artifacts 选项。
    • 点击 + 按钮,选择 Jar,然后选择 From modules with dependencies...
    • 在弹出的对话框中,选择包含 main 方法的模块(你的主类是 wc.WordCountMain)。
    • 确认 Extract to the target JAR 设置为 Into JAR directory
    • 点击 OK
  4. 应用并保存

    • 点击 Apply,然后点击 OK 保存设置。

3、hadoop运行MapReduce

前置条件(可不做)

# 访问位置
cd /opt/module/hadoop-3.1.3/share/hadoop/mapreduce

# 创建目录
mkdir myJar

上传Jar包

# 上传文件到该目录
/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/myJar

运行Jar包

cd $HADOOP_HOME/share/hadoop/mapreduce/myJar
hadoop jar <Jar包名字> <HDFS中的输入路径> <HDFS中的输出路径>

查看运行结果:

hadoop fs -cat <HDFS中的输出路径>

# 如
hadoop fs -cat /MR01_output_ahxin/part-r-00000
hadoop fs -cat /MR01_output_ahxin/part-r-00001
hadoop fs -cat /MR01_output_ahxin/*

返回值:

Beijing 2
China   2
I       2
capital 1
is      1
love    2
of      1
the     1

二、任务:MapReduceDemo01

需求:

  • WordCountMapper.java :从输入文本中提取单词,并将每个单词映射到一个计数值1。
  • WordCountReducer.java :接收每个单词及其所有计数值,计算总次数,并输出最终结果。
  • WordCountMain.java :配置和启动 MapReduce 任务,包括设置 Mapper 和 Reducer 类,指定输入输出路径,提交任务并等待

1、WordCountMapper.java

package WordCount01;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * WordCountMapper 类继承了 Hadoop 提供的 Mapper 类。
 * 它接受一行文本作为输入,并输出每个单词以及整数 1。
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * map 方法处理一行输入文本。
     *
     * @param key1     键是文件中该行的字节偏移量(在这里不使用)。
     * @param value1   值是该行的文本。
     * @param context  context 对象允许 Mapper 与 Hadoop 系统的其余部分交互。
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
        // 将文本行转换为字符串。
        String data = value1.toString();

        // 按空格拆分字符串,得到单词数组。
        String[] words = data.split(" ");

        // 遍历每个单词
        for (String word : words) {
            // 将单词和整数 1 写入上下文中。
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

2、WordCountReducer.java

package WordCount01;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * WordCountReducer 类继承了 Hadoop 提供的 Reducer 类。
 * 它接收每个单词及其对应的计数,并计算每个单词出现的总次数。
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * reduce 方法处理每个键(单词)及其对应的值(计数)的集合。
     *
     * @param key3    单词(作为键)。
     * @param values3 单词对应的计数集合。
     * @param context context 对象允许 Reducer 与 Hadoop 系统的其余部分交互。
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key3, Iterable<IntWritable> values3, Context context) throws IOException, InterruptedException {
        int total = 0;

        // 遍历所有的计数,将它们累加起来
        for (IntWritable v : values3) {
            total += v.get();
        }

        // 将单词及其总计数写入上下文中
        context.write(key3, new IntWritable(total));
    }
}

3、WordCountMain.java

package WordCount01;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import WordCount01.WordCountMapper;
import WordCount01.WordCountReducer;

public class WordCountMain {
    public static void main(String[] args) throws Exception {
        // 1. 创建任务Job实例,并指定任务入口类
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(WordCountMain.class); // 设置运行主类

        // 2. 指定任务的Mapper类以及Mapper输出的键值类型
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class); // Mapper输出的键类型
        job.setMapOutputValueClass(IntWritable.class); // Mapper输出的值类型

        // 3. 指定任务的Reducer类以及Reducer输出的键值类型
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class); // Reducer输出的键类型
        job.setOutputValueClass(IntWritable.class); // Reducer输出的值类型

        // 4. 指定任务的输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0])); // 输入路径
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出路径

        // 5. 提交任务并等待完成
        // waitForCompletion(true) 方法会打印进度信息到控制台
        job.waitForCompletion(true);
    }
}

运行结果:

[root@hadoop21 myJar]# hadoop fs -cat /MR01_output_ahxin/part-r-000002024-04-09 19:51:56,934 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2024-04-09 19:51:57,384 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
Beijing 2
China   2
I       2
capital 1
is      1
love    2
of      1
the     1

三、任务:MapReduceDemo02

需求:

  1. 大写单词输出到一个文件,小写单词输出到另一个文件(分区)
  2. key不再是单词,是“单词+姓名首字母”的组合

image-20240524020459664

1、WordCountMapper.java

package WordCount02;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * WordCountMapper 类继承了 Hadoop 提供的 Mapper 类。
 * 它接受一行文本作为输入,并输出每个单词以及整数 1。
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * map 方法处理一行输入文本。
     *
     * @param key1     键是文件中该行的字节偏移量(在这里不使用)。
     * @param value1   值是该行的文本。
     * @param context  context 对象允许 Mapper 与 Hadoop 系统的其余部分交互。
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
        // 将文本行转换为字符串。
        String data = value1.toString();

        // 按空格拆分字符串,得到单词数组。
        String[] words = data.split(" ");

        // 遍历每个单词
        for (String word : words) {
            // 将单词和整数 1 写入上下文中。
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

2、WordCountReducer.java

特别注意这里 35行 需要改成你自己的

package WordCount02;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * WordCountReducer 类继承了 Hadoop 提供的 Reducer 类。
 * 它接收每个单词及其对应的计数,并计算每个单词出现的总次数。
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * reduce 方法处理每个键(单词)及其对应的值(计数)的集合。
     *
     * @param key3    单词(作为键)。
     * @param values3 单词对应的计数集合。
     * @param context context 对象允许 Reducer 与 Hadoop 系统的其余部分交互。
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key3, Iterable<IntWritable> values3, Context context) throws IOException, InterruptedException {
        int total = 0;

        // 遍历所有的计数,将它们累加起来
        for (IntWritable v : values3) {
            total += v.get();
        }

        // 将单词及其总计数写入上下文中,格式化输出键
        context.write(
                new Text(key3.toString() + "_ahxin"), // 自定义格式化的输出键
                new IntWritable(total) // 输出值为总计数
        );
    }
}

3、WordPartitioner.java

package WordCount02;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class WordPartitioner extends Partitioner<Text, IntWritable> {

    /**
     * getPartition 方法根据单词的首字母决定分区。
     *
     * @param key            单词(作为键)。
     * @param value          单词对应的计数(作为值)。
     * @param numPartitioner 分区数(即 Reducer 数量)。
     * @return 分区编号(0 或 1)。
     */
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitioner) {
        // 如果单词的首字母是大写字母,返回分区0
        if (Character.isUpperCase(key.toString().charAt(0))) {
            return 0;
        } else {
            // 否则(小写),返回分区1
            return 1;
        }
    }
}

4、WordCountMain.java

package WordCount02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountMain {
    public static void main(String[] args) throws Exception {
        // 1. 创建任务Job实例,并指定任务入口类
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(WordCountMain.class); // 设置运行主类

        // 2. 指定任务的Mapper类以及Mapper输出的键值类型
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class); // Mapper输出的键类型
        job.setMapOutputValueClass(IntWritable.class); // Mapper输出的值类型

        // *设置自定义的Partitioner类
        job.setPartitionerClass(WordPartitioner.class);

        // 3. 指定任务的Reducer类以及Reducer输出的键值类型
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class); // Reducer输出的键类型
        job.setOutputValueClass(IntWritable.class); // Reducer输出的值类型

        // *设置Reducer任务的数量
        job.setNumReduceTasks(2);

        // 4. 指定任务的输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0])); // 输入路径
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出路径

        // 5. 提交任务并等待完成
        // waitForCompletion(true) 方法会打印进度信息到控制台
        job.waitForCompletion(true);
    }
}

计算结果:

[root@hadoop21 myJar]# hadoop fs -cat /RM02_output_ahxin/part-r-00000
2024-04-09 20:15:15,648 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2024-04-09 20:15:16,123 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
Beijing_ahxin   2
China_ahxin     2
I_ahxin 2
[root@hadoop21 myJar]# hadoop fs -cat /RM02_output_ahxin/part-r-00001
2024-04-09 20:15:18,676 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2024-04-09 20:15:19,132 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
capital_ahxin   1
is_ahxin        1
love_ahxin      2
of_ahxin        1
the_ahxin       1
[root@hadoop21 myJar]# 

四、任务:MapReduceDemo03

需求:

  1. 按照单词长度进行排序,长的排前面,如果长度一样,出现次数多的排前面

image-20240524022714185

1、WordCountMapper.java

package WordCount03;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String data = value.toString();
        String[] words = data.split(" ");
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

2、WordCountReducer.java

package WordCount03;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * WordCountReducer 类继承了 Hadoop 提供的 Reducer 类。
 * 它接收每个单词及其对应的计数,并计算每个单词出现的总次数。
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private List<String> sortedWords = new ArrayList<>();

    /**
     * reduce 方法处理每个键(单词)及其对应的值(计数)的集合。
     *
     * @param key    单词(作为键)。
     * @param values 单词对应的计数集合。
     * @param context context 对象允许 Reducer 与 Hadoop 系统的其余部分交互。
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int total = 0;

        // 遍历所有的计数,将它们累加起来
        for (IntWritable value : values) {
            total += value.get();
        }

        // 将单词及其计数存储到列表中,格式为 "word-count"
        sortedWords.add(key.toString() + "-" + total);
    }

    /**
     * cleanup 方法在所有的 reduce 操作完成后执行,用于进行排序和最终的输出。
     *
     * @param context context 对象允许 Reducer 与 Hadoop 系统的其余部分交互。
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // 对单词列表按照长度和出现次数进行排序
        Collections.sort(sortedWords, (a, b) -> {
            String[] partsA = a.split("-");
            String[] partsB = b.split("-");

            // 按照单词长度排序
            int compareLength = Integer.compare(partsB[0].length(), partsA[0].length());
            if (compareLength != 0) {
                return compareLength;
            }
            // 长度相同时,按照出现次数排序
            return Integer.compare(Integer.parseInt(partsB[1]), Integer.parseInt(partsA[1]));
        });

        // 输出排序后的结果
//        for (String word : sortedWords) {
//            String[] parts = word.split("-");
//            context.write(new Text(parts[0]), new IntWritable(Integer.parseInt(parts[1])));
//        }
        // 输出排序后的结果,格式化输出
        for (String word : sortedWords) {
            String[] parts = word.split("-");
            String formattedOutput = String.format("%s-%d", parts[0], Integer.parseInt(parts[1]));
            context.write(new Text(formattedOutput), null);
        }
    }
}

3、WordPartitioner.java

package WordCount03;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class WordPartitioner extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // 不需要按字母分区,直接返回 0,即单一分区
        return 0;
    }
}

4、WordCountMain.java

package WordCount03;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountMain {
    public static void main(String[] args) throws Exception {
        // 1. 创建任务Job实例,并指定任务入口类
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(WordCountMain.class); // 设置运行主类

        // 2. 指定任务的Mapper类以及Mapper输出的键值类型
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class); // Mapper输出的键类型
        job.setMapOutputValueClass(IntWritable.class); // Mapper输出的值类型

        job.setPartitionerClass(WordPartitioner.class);

        // 3. 指定任务的Reducer类以及Reducer输出的键值类型
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class); // Reducer输出的键类型
        job.setOutputValueClass(IntWritable.class); // Reducer输出的值类型

        job.setNumReduceTasks(1); // 只需要一个Reduce任务来处理排序

        // 4. 指定任务的输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0])); // 输入路径
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出路径

        // 5. 提交任务并等待完成
        // waitForCompletion(true) 方法会打印进度信息到控制台
        job.waitForCompletion(true);
    }
}

计算结果:

[root@hadoop21 myJar]# hadoop fs -cat /RM03_output_ahxin/part-r-00000
2024-04-09 20:52:58,254 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2024-04-09 20:52:58,683 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
Beijing-2
capital-1
China-2
love-2
the-1
is-1
of-1
I-2
最后修改:2024 年 05 月 24 日
咱们谁跟谁,用不着~