Loading... # MapReduce编程学习(240524) 资料:https://www.123pan.com/s/lfKDVv-XoENH.html提取码:axin 所需资料: - lib依赖库 - hadoop集群已启动 - 下载提供的data.txt文件(也能自己创建) - *.java文件(也可以自己编写) data.txt ```css I love Beijing I love China Beijing is the capital of China ``` ## 一、环境配置 ### 1、配置 Artifact(lib依赖) 1. **打开项目**:在 IntelliJ IDEA 中打开你的项目。 2. **打开 Project Structure**: - 点击 `File` 菜单,选择 `Project Structure` 或按 `Ctrl+Alt+Shift+S` 打开项目结构设置。 3. **配置Libraries** - 点击 `+` 按钮,选择 `Java`,然后选择复制进来的`lib` - 点击`open` - 点击`ok` 4. **应用并保存**: - 点击 `Apply`,然后点击 `OK` 保存设置。 ### 2、配置打包 1. **打开项目**:在 IntelliJ IDEA 中打开你的项目。 2. **打开 Project Structure**: - 点击 `File` 菜单,选择 `Project Structure` 或按 `Ctrl+Alt+Shift+S` 打开项目结构设置。 3. **配置 Artifact**: - 在 Project Structure 窗口中,选择左侧的 `Artifacts` 选项。 - 点击 `+` 按钮,选择 `Jar`,然后选择 `From modules with dependencies...`。 - 在弹出的对话框中,选择包含 `main` 方法的模块(你的主类是 `wc.WordCountMain`)。 - 确认 `Extract to the target JAR` 设置为 `Into JAR directory`。 - 点击 `OK`。 4. **应用并保存**: - 点击 `Apply`,然后点击 `OK` 保存设置。 ### 3、hadoop运行MapReduce 前置条件(可不做) ```css # 访问位置 cd /opt/module/hadoop-3.1.3/share/hadoop/mapreduce # 创建目录 mkdir myJar ``` 上传Jar包 ```css # 上传文件到该目录 /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/myJar ``` 运行Jar包 ```css cd $HADOOP_HOME/share/hadoop/mapreduce/myJar hadoop jar <Jar包名字> <HDFS中的输入路径> <HDFS中的输出路径> ``` 查看运行结果: ```css hadoop fs -cat <HDFS中的输出路径> # 如 hadoop fs -cat /MR01_output_ahxin/part-r-00000 hadoop fs -cat /MR01_output_ahxin/part-r-00001 hadoop fs -cat /MR01_output_ahxin/* ``` 返回值: ```css Beijing 2 China 2 I 2 capital 1 is 1 love 2 of 1 the 1 ``` ## 二、任务:MapReduceDemo01 需求: - **WordCountMapper.java** :从输入文本中提取单词,并将每个单词映射到一个计数值1。 - **WordCountReducer.java** :接收每个单词及其所有计数值,计算总次数,并输出最终结果。 - **WordCountMain.java** :配置和启动 MapReduce 任务,包括设置 Mapper 和 Reducer 类,指定输入输出路径,提交任务并等待 --- ### 1、WordCountMapper.java ```css package WordCount01; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * WordCountMapper 类继承了 Hadoop 提供的 Mapper 类。 * 它接受一行文本作为输入,并输出每个单词以及整数 1。 */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { /** * map 方法处理一行输入文本。 * * @param key1 键是文件中该行的字节偏移量(在这里不使用)。 * @param value1 值是该行的文本。 * @param context context 对象允许 Mapper 与 Hadoop 系统的其余部分交互。 * @throws IOException * @throws InterruptedException */ @Override protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException { // 将文本行转换为字符串。 String data = value1.toString(); // 按空格拆分字符串,得到单词数组。 String[] words = data.split(" "); // 遍历每个单词 for (String word : words) { // 将单词和整数 1 写入上下文中。 context.write(new Text(word), new IntWritable(1)); } } } ``` ### 2、WordCountReducer.java ```css package WordCount01; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * WordCountReducer 类继承了 Hadoop 提供的 Reducer 类。 * 它接收每个单词及其对应的计数,并计算每个单词出现的总次数。 */ public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { /** * reduce 方法处理每个键(单词)及其对应的值(计数)的集合。 * * @param key3 单词(作为键)。 * @param values3 单词对应的计数集合。 * @param context context 对象允许 Reducer 与 Hadoop 系统的其余部分交互。 * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text key3, Iterable<IntWritable> values3, Context context) throws IOException, InterruptedException { int total = 0; // 遍历所有的计数,将它们累加起来 for (IntWritable v : values3) { total += v.get(); } // 将单词及其总计数写入上下文中 context.write(key3, new IntWritable(total)); } } ``` ### 3、WordCountMain.java ```css package WordCount01; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import WordCount01.WordCountMapper; import WordCount01.WordCountReducer; public class WordCountMain { public static void main(String[] args) throws Exception { // 1. 创建任务Job实例,并指定任务入口类 Job job = Job.getInstance(new Configuration()); job.setJarByClass(WordCountMain.class); // 设置运行主类 // 2. 指定任务的Mapper类以及Mapper输出的键值类型 job.setMapperClass(WordCountMapper.class); job.setMapOutputKeyClass(Text.class); // Mapper输出的键类型 job.setMapOutputValueClass(IntWritable.class); // Mapper输出的值类型 // 3. 指定任务的Reducer类以及Reducer输出的键值类型 job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); // Reducer输出的键类型 job.setOutputValueClass(IntWritable.class); // Reducer输出的值类型 // 4. 指定任务的输入路径和输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); // 输入路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出路径 // 5. 提交任务并等待完成 // waitForCompletion(true) 方法会打印进度信息到控制台 job.waitForCompletion(true); } } ``` ### 运行结果: ```css [root@hadoop21 myJar]# hadoop fs -cat /MR01_output_ahxin/part-r-000002024-04-09 19:51:56,934 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 2024-04-09 19:51:57,384 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false Beijing 2 China 2 I 2 capital 1 is 1 love 2 of 1 the 1 ``` ## 三、任务:MapReduceDemo02 需求: 1. 大写单词输出到一个文件,小写单词输出到另一个文件(分区) 2. key不再是单词,是“单词+姓名首字母”的组合  ### 1、WordCountMapper.java ```css package WordCount02; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * WordCountMapper 类继承了 Hadoop 提供的 Mapper 类。 * 它接受一行文本作为输入,并输出每个单词以及整数 1。 */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { /** * map 方法处理一行输入文本。 * * @param key1 键是文件中该行的字节偏移量(在这里不使用)。 * @param value1 值是该行的文本。 * @param context context 对象允许 Mapper 与 Hadoop 系统的其余部分交互。 * @throws IOException * @throws InterruptedException */ @Override protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException { // 将文本行转换为字符串。 String data = value1.toString(); // 按空格拆分字符串,得到单词数组。 String[] words = data.split(" "); // 遍历每个单词 for (String word : words) { // 将单词和整数 1 写入上下文中。 context.write(new Text(word), new IntWritable(1)); } } } ``` ### 2、WordCountReducer.java `特别注意这里 35行 需要改成你自己的` ```css package WordCount02; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * WordCountReducer 类继承了 Hadoop 提供的 Reducer 类。 * 它接收每个单词及其对应的计数,并计算每个单词出现的总次数。 */ public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { /** * reduce 方法处理每个键(单词)及其对应的值(计数)的集合。 * * @param key3 单词(作为键)。 * @param values3 单词对应的计数集合。 * @param context context 对象允许 Reducer 与 Hadoop 系统的其余部分交互。 * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text key3, Iterable<IntWritable> values3, Context context) throws IOException, InterruptedException { int total = 0; // 遍历所有的计数,将它们累加起来 for (IntWritable v : values3) { total += v.get(); } // 将单词及其总计数写入上下文中,格式化输出键 context.write( new Text(key3.toString() + "_ahxin"), // 自定义格式化的输出键 new IntWritable(total) // 输出值为总计数 ); } } ``` ### 3、WordPartitioner.java ```css package WordCount02; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class WordPartitioner extends Partitioner<Text, IntWritable> { /** * getPartition 方法根据单词的首字母决定分区。 * * @param key 单词(作为键)。 * @param value 单词对应的计数(作为值)。 * @param numPartitioner 分区数(即 Reducer 数量)。 * @return 分区编号(0 或 1)。 */ @Override public int getPartition(Text key, IntWritable value, int numPartitioner) { // 如果单词的首字母是大写字母,返回分区0 if (Character.isUpperCase(key.toString().charAt(0))) { return 0; } else { // 否则(小写),返回分区1 return 1; } } } ``` ### 4、WordCountMain.java ```css package WordCount02; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCountMain { public static void main(String[] args) throws Exception { // 1. 创建任务Job实例,并指定任务入口类 Job job = Job.getInstance(new Configuration()); job.setJarByClass(WordCountMain.class); // 设置运行主类 // 2. 指定任务的Mapper类以及Mapper输出的键值类型 job.setMapperClass(WordCountMapper.class); job.setMapOutputKeyClass(Text.class); // Mapper输出的键类型 job.setMapOutputValueClass(IntWritable.class); // Mapper输出的值类型 // *设置自定义的Partitioner类 job.setPartitionerClass(WordPartitioner.class); // 3. 指定任务的Reducer类以及Reducer输出的键值类型 job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); // Reducer输出的键类型 job.setOutputValueClass(IntWritable.class); // Reducer输出的值类型 // *设置Reducer任务的数量 job.setNumReduceTasks(2); // 4. 指定任务的输入路径和输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); // 输入路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出路径 // 5. 提交任务并等待完成 // waitForCompletion(true) 方法会打印进度信息到控制台 job.waitForCompletion(true); } } ``` ### 计算结果: ```css [root@hadoop21 myJar]# hadoop fs -cat /RM02_output_ahxin/part-r-00000 2024-04-09 20:15:15,648 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 2024-04-09 20:15:16,123 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false Beijing_ahxin 2 China_ahxin 2 I_ahxin 2 [root@hadoop21 myJar]# hadoop fs -cat /RM02_output_ahxin/part-r-00001 2024-04-09 20:15:18,676 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 2024-04-09 20:15:19,132 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false capital_ahxin 1 is_ahxin 1 love_ahxin 2 of_ahxin 1 the_ahxin 1 [root@hadoop21 myJar]# ``` ## 四、任务:MapReduceDemo03 需求: 1. 按照单词长度进行排序,长的排前面,如果长度一样,出现次数多的排前面 ---  ### 1、WordCountMapper.java ```css package WordCount03; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String data = value.toString(); String[] words = data.split(" "); for (String word : words) { context.write(new Text(word), new IntWritable(1)); } } } ``` ### 2、WordCountReducer.java ```css package WordCount03; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; /** * WordCountReducer 类继承了 Hadoop 提供的 Reducer 类。 * 它接收每个单词及其对应的计数,并计算每个单词出现的总次数。 */ public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private List<String> sortedWords = new ArrayList<>(); /** * reduce 方法处理每个键(单词)及其对应的值(计数)的集合。 * * @param key 单词(作为键)。 * @param values 单词对应的计数集合。 * @param context context 对象允许 Reducer 与 Hadoop 系统的其余部分交互。 * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int total = 0; // 遍历所有的计数,将它们累加起来 for (IntWritable value : values) { total += value.get(); } // 将单词及其计数存储到列表中,格式为 "word-count" sortedWords.add(key.toString() + "-" + total); } /** * cleanup 方法在所有的 reduce 操作完成后执行,用于进行排序和最终的输出。 * * @param context context 对象允许 Reducer 与 Hadoop 系统的其余部分交互。 * @throws IOException * @throws InterruptedException */ @Override protected void cleanup(Context context) throws IOException, InterruptedException { // 对单词列表按照长度和出现次数进行排序 Collections.sort(sortedWords, (a, b) -> { String[] partsA = a.split("-"); String[] partsB = b.split("-"); // 按照单词长度排序 int compareLength = Integer.compare(partsB[0].length(), partsA[0].length()); if (compareLength != 0) { return compareLength; } // 长度相同时,按照出现次数排序 return Integer.compare(Integer.parseInt(partsB[1]), Integer.parseInt(partsA[1])); }); // 输出排序后的结果 // for (String word : sortedWords) { // String[] parts = word.split("-"); // context.write(new Text(parts[0]), new IntWritable(Integer.parseInt(parts[1]))); // } // 输出排序后的结果,格式化输出 for (String word : sortedWords) { String[] parts = word.split("-"); String formattedOutput = String.format("%s-%d", parts[0], Integer.parseInt(parts[1])); context.write(new Text(formattedOutput), null); } } } ``` ### 3、WordPartitioner.java ```css package WordCount03; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class WordPartitioner extends Partitioner<Text, IntWritable> { @Override public int getPartition(Text key, IntWritable value, int numPartitions) { // 不需要按字母分区,直接返回 0,即单一分区 return 0; } } ``` ### 4、WordCountMain.java ```css package WordCount03; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCountMain { public static void main(String[] args) throws Exception { // 1. 创建任务Job实例,并指定任务入口类 Job job = Job.getInstance(new Configuration()); job.setJarByClass(WordCountMain.class); // 设置运行主类 // 2. 指定任务的Mapper类以及Mapper输出的键值类型 job.setMapperClass(WordCountMapper.class); job.setMapOutputKeyClass(Text.class); // Mapper输出的键类型 job.setMapOutputValueClass(IntWritable.class); // Mapper输出的值类型 job.setPartitionerClass(WordPartitioner.class); // 3. 指定任务的Reducer类以及Reducer输出的键值类型 job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); // Reducer输出的键类型 job.setOutputValueClass(IntWritable.class); // Reducer输出的值类型 job.setNumReduceTasks(1); // 只需要一个Reduce任务来处理排序 // 4. 指定任务的输入路径和输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); // 输入路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出路径 // 5. 提交任务并等待完成 // waitForCompletion(true) 方法会打印进度信息到控制台 job.waitForCompletion(true); } } ``` ### 计算结果: ```css [root@hadoop21 myJar]# hadoop fs -cat /RM03_output_ahxin/part-r-00000 2024-04-09 20:52:58,254 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 2024-04-09 20:52:58,683 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false Beijing-2 capital-1 China-2 love-2 the-1 is-1 of-1 I-2 ``` 最后修改:2024 年 05 月 24 日 © 允许规范转载 打赏 赞赏作者 赞 3 咱们谁跟谁,用不着~