MapReduce编程学习(240524)
资料:https://www.123pan.com/s/lfKDVv-XoENH.html提取码:axin
所需资料:
- lib依赖库
- hadoop集群已启动
- 下载提供的data.txt文件(也能自己创建)
- *.java文件(也可以自己编写)
data.txt
I love Beijing
I love China
Beijing is the capital of China
一、环境配置
1、配置 Artifact(lib依赖)
- 打开项目:在 IntelliJ IDEA 中打开你的项目。
打开 Project Structure:
- 点击
File
菜单,选择Project Structure
或按Ctrl+Alt+Shift+S
打开项目结构设置。
- 点击
配置Libraries
- 点击
+
按钮,选择Java
,然后选择复制进来的lib
- 点击
open
- 点击
ok
- 点击
应用并保存:
- 点击
Apply
,然后点击OK
保存设置。
- 点击
2、配置打包
- 打开项目:在 IntelliJ IDEA 中打开你的项目。
打开 Project Structure:
- 点击
File
菜单,选择Project Structure
或按Ctrl+Alt+Shift+S
打开项目结构设置。
- 点击
配置 Artifact:
- 在 Project Structure 窗口中,选择左侧的
Artifacts
选项。 - 点击
+
按钮,选择Jar
,然后选择From modules with dependencies...
。 - 在弹出的对话框中,选择包含
main
方法的模块(你的主类是wc.WordCountMain
)。 - 确认
Extract to the target JAR
设置为Into JAR directory
。 - 点击
OK
。
- 在 Project Structure 窗口中,选择左侧的
应用并保存:
- 点击
Apply
,然后点击OK
保存设置。
- 点击
3、hadoop运行MapReduce
前置条件(可不做)
# 访问位置
cd /opt/module/hadoop-3.1.3/share/hadoop/mapreduce
# 创建目录
mkdir myJar
上传Jar包
# 上传文件到该目录
/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/myJar
运行Jar包
cd $HADOOP_HOME/share/hadoop/mapreduce/myJar
hadoop jar <Jar包名字> <HDFS中的输入路径> <HDFS中的输出路径>
查看运行结果:
hadoop fs -cat <HDFS中的输出路径>
# 如
hadoop fs -cat /MR01_output_ahxin/part-r-00000
hadoop fs -cat /MR01_output_ahxin/part-r-00001
hadoop fs -cat /MR01_output_ahxin/*
返回值:
Beijing 2
China 2
I 2
capital 1
is 1
love 2
of 1
the 1
二、任务:MapReduceDemo01
需求:
- WordCountMapper.java :从输入文本中提取单词,并将每个单词映射到一个计数值1。
- WordCountReducer.java :接收每个单词及其所有计数值,计算总次数,并输出最终结果。
- WordCountMain.java :配置和启动 MapReduce 任务,包括设置 Mapper 和 Reducer 类,指定输入输出路径,提交任务并等待
1、WordCountMapper.java
package WordCount01;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* WordCountMapper 类继承了 Hadoop 提供的 Mapper 类。
* 它接受一行文本作为输入,并输出每个单词以及整数 1。
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
* map 方法处理一行输入文本。
*
* @param key1 键是文件中该行的字节偏移量(在这里不使用)。
* @param value1 值是该行的文本。
* @param context context 对象允许 Mapper 与 Hadoop 系统的其余部分交互。
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
// 将文本行转换为字符串。
String data = value1.toString();
// 按空格拆分字符串,得到单词数组。
String[] words = data.split(" ");
// 遍历每个单词
for (String word : words) {
// 将单词和整数 1 写入上下文中。
context.write(new Text(word), new IntWritable(1));
}
}
}
2、WordCountReducer.java
package WordCount01;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* WordCountReducer 类继承了 Hadoop 提供的 Reducer 类。
* 它接收每个单词及其对应的计数,并计算每个单词出现的总次数。
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
* reduce 方法处理每个键(单词)及其对应的值(计数)的集合。
*
* @param key3 单词(作为键)。
* @param values3 单词对应的计数集合。
* @param context context 对象允许 Reducer 与 Hadoop 系统的其余部分交互。
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key3, Iterable<IntWritable> values3, Context context) throws IOException, InterruptedException {
int total = 0;
// 遍历所有的计数,将它们累加起来
for (IntWritable v : values3) {
total += v.get();
}
// 将单词及其总计数写入上下文中
context.write(key3, new IntWritable(total));
}
}
3、WordCountMain.java
package WordCount01;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import WordCount01.WordCountMapper;
import WordCount01.WordCountReducer;
public class WordCountMain {
public static void main(String[] args) throws Exception {
// 1. 创建任务Job实例,并指定任务入口类
Job job = Job.getInstance(new Configuration());
job.setJarByClass(WordCountMain.class); // 设置运行主类
// 2. 指定任务的Mapper类以及Mapper输出的键值类型
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class); // Mapper输出的键类型
job.setMapOutputValueClass(IntWritable.class); // Mapper输出的值类型
// 3. 指定任务的Reducer类以及Reducer输出的键值类型
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class); // Reducer输出的键类型
job.setOutputValueClass(IntWritable.class); // Reducer输出的值类型
// 4. 指定任务的输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0])); // 输入路径
FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出路径
// 5. 提交任务并等待完成
// waitForCompletion(true) 方法会打印进度信息到控制台
job.waitForCompletion(true);
}
}
运行结果:
[root@hadoop21 myJar]# hadoop fs -cat /MR01_output_ahxin/part-r-000002024-04-09 19:51:56,934 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2024-04-09 19:51:57,384 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
Beijing 2
China 2
I 2
capital 1
is 1
love 2
of 1
the 1
三、任务:MapReduceDemo02
需求:
- 大写单词输出到一个文件,小写单词输出到另一个文件(分区)
- key不再是单词,是“单词+姓名首字母”的组合
1、WordCountMapper.java
package WordCount02;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* WordCountMapper 类继承了 Hadoop 提供的 Mapper 类。
* 它接受一行文本作为输入,并输出每个单词以及整数 1。
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
* map 方法处理一行输入文本。
*
* @param key1 键是文件中该行的字节偏移量(在这里不使用)。
* @param value1 值是该行的文本。
* @param context context 对象允许 Mapper 与 Hadoop 系统的其余部分交互。
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
// 将文本行转换为字符串。
String data = value1.toString();
// 按空格拆分字符串,得到单词数组。
String[] words = data.split(" ");
// 遍历每个单词
for (String word : words) {
// 将单词和整数 1 写入上下文中。
context.write(new Text(word), new IntWritable(1));
}
}
}
2、WordCountReducer.java
特别注意这里 35行 需要改成你自己的
package WordCount02;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* WordCountReducer 类继承了 Hadoop 提供的 Reducer 类。
* 它接收每个单词及其对应的计数,并计算每个单词出现的总次数。
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
* reduce 方法处理每个键(单词)及其对应的值(计数)的集合。
*
* @param key3 单词(作为键)。
* @param values3 单词对应的计数集合。
* @param context context 对象允许 Reducer 与 Hadoop 系统的其余部分交互。
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key3, Iterable<IntWritable> values3, Context context) throws IOException, InterruptedException {
int total = 0;
// 遍历所有的计数,将它们累加起来
for (IntWritable v : values3) {
total += v.get();
}
// 将单词及其总计数写入上下文中,格式化输出键
context.write(
new Text(key3.toString() + "_ahxin"), // 自定义格式化的输出键
new IntWritable(total) // 输出值为总计数
);
}
}
3、WordPartitioner.java
package WordCount02;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class WordPartitioner extends Partitioner<Text, IntWritable> {
/**
* getPartition 方法根据单词的首字母决定分区。
*
* @param key 单词(作为键)。
* @param value 单词对应的计数(作为值)。
* @param numPartitioner 分区数(即 Reducer 数量)。
* @return 分区编号(0 或 1)。
*/
@Override
public int getPartition(Text key, IntWritable value, int numPartitioner) {
// 如果单词的首字母是大写字母,返回分区0
if (Character.isUpperCase(key.toString().charAt(0))) {
return 0;
} else {
// 否则(小写),返回分区1
return 1;
}
}
}
4、WordCountMain.java
package WordCount02;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountMain {
public static void main(String[] args) throws Exception {
// 1. 创建任务Job实例,并指定任务入口类
Job job = Job.getInstance(new Configuration());
job.setJarByClass(WordCountMain.class); // 设置运行主类
// 2. 指定任务的Mapper类以及Mapper输出的键值类型
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class); // Mapper输出的键类型
job.setMapOutputValueClass(IntWritable.class); // Mapper输出的值类型
// *设置自定义的Partitioner类
job.setPartitionerClass(WordPartitioner.class);
// 3. 指定任务的Reducer类以及Reducer输出的键值类型
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class); // Reducer输出的键类型
job.setOutputValueClass(IntWritable.class); // Reducer输出的值类型
// *设置Reducer任务的数量
job.setNumReduceTasks(2);
// 4. 指定任务的输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0])); // 输入路径
FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出路径
// 5. 提交任务并等待完成
// waitForCompletion(true) 方法会打印进度信息到控制台
job.waitForCompletion(true);
}
}
计算结果:
[root@hadoop21 myJar]# hadoop fs -cat /RM02_output_ahxin/part-r-00000
2024-04-09 20:15:15,648 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2024-04-09 20:15:16,123 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
Beijing_ahxin 2
China_ahxin 2
I_ahxin 2
[root@hadoop21 myJar]# hadoop fs -cat /RM02_output_ahxin/part-r-00001
2024-04-09 20:15:18,676 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2024-04-09 20:15:19,132 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
capital_ahxin 1
is_ahxin 1
love_ahxin 2
of_ahxin 1
the_ahxin 1
[root@hadoop21 myJar]#
四、任务:MapReduceDemo03
需求:
- 按照单词长度进行排序,长的排前面,如果长度一样,出现次数多的排前面
1、WordCountMapper.java
package WordCount03;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String data = value.toString();
String[] words = data.split(" ");
for (String word : words) {
context.write(new Text(word), new IntWritable(1));
}
}
}
2、WordCountReducer.java
package WordCount03;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* WordCountReducer 类继承了 Hadoop 提供的 Reducer 类。
* 它接收每个单词及其对应的计数,并计算每个单词出现的总次数。
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private List<String> sortedWords = new ArrayList<>();
/**
* reduce 方法处理每个键(单词)及其对应的值(计数)的集合。
*
* @param key 单词(作为键)。
* @param values 单词对应的计数集合。
* @param context context 对象允许 Reducer 与 Hadoop 系统的其余部分交互。
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int total = 0;
// 遍历所有的计数,将它们累加起来
for (IntWritable value : values) {
total += value.get();
}
// 将单词及其计数存储到列表中,格式为 "word-count"
sortedWords.add(key.toString() + "-" + total);
}
/**
* cleanup 方法在所有的 reduce 操作完成后执行,用于进行排序和最终的输出。
*
* @param context context 对象允许 Reducer 与 Hadoop 系统的其余部分交互。
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
// 对单词列表按照长度和出现次数进行排序
Collections.sort(sortedWords, (a, b) -> {
String[] partsA = a.split("-");
String[] partsB = b.split("-");
// 按照单词长度排序
int compareLength = Integer.compare(partsB[0].length(), partsA[0].length());
if (compareLength != 0) {
return compareLength;
}
// 长度相同时,按照出现次数排序
return Integer.compare(Integer.parseInt(partsB[1]), Integer.parseInt(partsA[1]));
});
// 输出排序后的结果
// for (String word : sortedWords) {
// String[] parts = word.split("-");
// context.write(new Text(parts[0]), new IntWritable(Integer.parseInt(parts[1])));
// }
// 输出排序后的结果,格式化输出
for (String word : sortedWords) {
String[] parts = word.split("-");
String formattedOutput = String.format("%s-%d", parts[0], Integer.parseInt(parts[1]));
context.write(new Text(formattedOutput), null);
}
}
}
3、WordPartitioner.java
package WordCount03;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class WordPartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
// 不需要按字母分区,直接返回 0,即单一分区
return 0;
}
}
4、WordCountMain.java
package WordCount03;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountMain {
public static void main(String[] args) throws Exception {
// 1. 创建任务Job实例,并指定任务入口类
Job job = Job.getInstance(new Configuration());
job.setJarByClass(WordCountMain.class); // 设置运行主类
// 2. 指定任务的Mapper类以及Mapper输出的键值类型
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class); // Mapper输出的键类型
job.setMapOutputValueClass(IntWritable.class); // Mapper输出的值类型
job.setPartitionerClass(WordPartitioner.class);
// 3. 指定任务的Reducer类以及Reducer输出的键值类型
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class); // Reducer输出的键类型
job.setOutputValueClass(IntWritable.class); // Reducer输出的值类型
job.setNumReduceTasks(1); // 只需要一个Reduce任务来处理排序
// 4. 指定任务的输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0])); // 输入路径
FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出路径
// 5. 提交任务并等待完成
// waitForCompletion(true) 方法会打印进度信息到控制台
job.waitForCompletion(true);
}
}
计算结果:
[root@hadoop21 myJar]# hadoop fs -cat /RM03_output_ahxin/part-r-00000
2024-04-09 20:52:58,254 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2024-04-09 20:52:58,683 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
Beijing-2
capital-1
China-2
love-2
the-1
is-1
of-1
I-2