【Hadoop—14】MapReduce:OutputFormat『 TextOutputFormat | 自定义OutputFormat』

导读:本篇文章讲解 【Hadoop—14】MapReduce:OutputFormat『 TextOutputFormat | 自定义OutputFormat』,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

1. OutputFormat阶段流程

OutputFormat阶段是MapReduce的一个阶段。

其详细流程见:MapReduce详细流程

2. OutputFormat与其子类关系图

在这里插入图片描述
接下来主要介绍TextOutputFormat类的读取机制 和 如何自定义类去自定义读取机制。

3. TextOutputFormat(默认)

一个ReduceTask配置一个OutputFormat。而默认OutputFormat使用TextOutputFormat的写入机制。

TextOutputFormat的写入机制:将从ReduceTask收到的所有键值对追加的写入到一个文件中。

故使用TextOutputFormat,一个ReduceTask最终只会产生一个文件。

4. 自定义OutputFormat类

4.1 什么时候需要自定义OutputFormat类

如果需要自定义写入机制,就必须自定义类去继承OutputFormat / FileOutputFormat。例如:

  1. 输出数据到Mysql、HBase、Elasticsearch等存储框架中。
  2. 将一个ReduceTask的输出数据分成多个文件存储
    等等

4.2 自定义OutputFormat步骤

  1. 自定义OutputFormat 类,该类需要继承 OutputFormat / FileOutputFormat 抽象类,并重写getRecordWriter方法
  2. 自定义RecordWriter类,需要继承RecordWriter抽象类,并重写write()方法和close()方法。
  3. 在Driver类中将自定义OutputFormat类与job绑定起来。

4.3 自定义OutputFormat示例

package com.study.mapreduce.output;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.UUID;

/**
 * 自定义输出:
 * 在job中设置输出文件夹路径,如果结果的key带有spring就输出到指定文件夹下的test01.txt,否则输出到指定文件夹下的test02.txt
 */
public class MyOutputFormat extends FileOutputFormat<Text, IntWritable> {
    @Override
    public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        // 返回一个我们自定义的输出Writer
        MyRecordWriter writer = new MyRecordWriter(job);
        return writer;
    }
}

4.4 自定义RecordWriter示例

/**
 * 自定义RecordWriter类,需要继承RecordWriter
 */
class MyRecordWriter extends RecordWriter<Text, IntWritable> {

    private FSDataOutputStream fsOut1;
    private FSDataOutputStream fsOut2;

    public MyRecordWriter(TaskAttemptContext job) {

        try {
            // 创建输出流
            FileSystem fileSystem = FileSystem.get(job.getConfiguration());
            String defaultPath = "/app/WordCount/myoutput/" + UUID.randomUUID().toString();
            String pathParent = job.getConfiguration().get(FileOutputFormat.OUTDIR, defaultPath);  // 读取job设置的输出路径

            // 创建两个输出的文件
            // 获取job中配置的SpringFileName、OtherFileName
            String subPath1 = pathParent + "/" + job.getConfiguration().get("SpringFileName"); 
            String subPath2 = pathParent + "/" + job.getConfiguration().get("OtherFileName");

            fsOut1 = fileSystem.create(new Path(subPath1));
            fsOut2 = fileSystem.create(new Path(subPath2));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 写操作
     */
    @Override
    public void write(Text key, IntWritable value) throws IOException, InterruptedException {
        // 根据key是否为spring,输出到指定文件
        if("spring".equalsIgnoreCase(key.toString())) {
            fsOut1.writeBytes(key + "@" + value + "\n");
        } else {
            fsOut2.writeBytes(key + "@" + value + "\n");
        }
    }

    /**
     * 关闭IO流
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(fsOut1);
        IOUtils.closeStream(fsOut2);
    }
}

4.5 自定义OutputFormat类与job绑定示例

// 设置自定义OutputFormat
job.setOutputFormatClass(MyOutputFormat.class);

// 给job添加一个属性,用作配置输出文件路径。属性名为FileOutputFormat.OUTDIR常量
// 程序中可以通过 job.getConfiguration().get(FileOutputFormat.OUTDIR) 获取
// FileOutputFormat的 _SUCCESS 文件默认也会生成到该路径下
FileOutputFormat.setOutputPath(job, new Path("/app/WordCount/myoutput/output1"));  

// 给job再添加两个属性SpringFileName、OtherFileName
job.getConfiguration().set("SpringFileName", "test01.txt");
job.getConfiguration().set("OtherFileName", "test02.txt");

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/84541.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!