`

InputFormat牛逼(6)org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>

 
阅读更多
@Public
@Evolving

A RecordReader that reads records from a SQL table. Emits LongWritables containing the record number as key and DBWritables as value.


@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DBRecordReader<T extends DBWritable> extends
    RecordReader<LongWritable, T> {

  private static final Log LOG = LogFactory.getLog(DBRecordReader.class);

  private ResultSet results = null;

  private Class<T> inputClass;

  private Configuration conf;

  private DBInputFormat.DBInputSplit split;

  private long pos = 0;
  
  private LongWritable key = null;
  
  private T value = null;

  private Connection connection;

  protected PreparedStatement statement;

  private DBConfiguration dbConf;

  private String conditions;

  private String [] fieldNames;

  private String tableName;

  /**
   * @param split The InputSplit to read data for
   * @throws SQLException 
   */
  public DBRecordReader(DBInputFormat.DBInputSplit split, 
      Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
      String cond, String [] fields, String table)
      throws SQLException {
    this.inputClass = inputClass;
    this.split = split;
    this.conf = conf;
    this.connection = conn;
    this.dbConf = dbConfig;
    this.conditions = cond;
    this.fieldNames = fields;
    this.tableName = table;
  }

  protected ResultSet executeQuery(String query) throws SQLException {
    this.statement = connection.prepareStatement(query,
        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
    return statement.executeQuery();
  }

  /** Returns the query for selecting the records, 
   * subclasses can override this for custom behaviour.*/
  protected String getSelectQuery() {
    StringBuilder query = new StringBuilder();

    // Default codepath for MySQL, HSQLDB, etc. Relies on LIMIT/OFFSET for splits.
    if(dbConf.getInputQuery() == null) {
      query.append("SELECT ");
  
      for (int i = 0; i < fieldNames.length; i++) {
        query.append(fieldNames[i]);
        if (i != fieldNames.length -1) {
          query.append(", ");
        }
      }

      query.append(" FROM ").append(tableName);
      query.append(" AS ").append(tableName); //in hsqldb this is necessary
      if (conditions != null && conditions.length() > 0) {
        query.append(" WHERE (").append(conditions).append(")");
      }

      String orderBy = dbConf.getInputOrderBy();
      if (orderBy != null && orderBy.length() > 0) {
        query.append(" ORDER BY ").append(orderBy);
      }
    } else {
      //PREBUILT QUERY
      query.append(dbConf.getInputQuery());
    }
        
    try {
      query.append(" LIMIT ").append(split.getLength());
      query.append(" OFFSET ").append(split.getStart());
    } catch (IOException ex) {
      // Ignore, will not throw.
    }		

    return query.toString();
  }

  /** {@inheritDoc} */
  public void close() throws IOException {
    try {
      if (null != results) {
        results.close();
      }
      if (null != statement) {
        statement.close();
      }
      if (null != connection) {
        connection.commit();
        connection.close();
      }
    } catch (SQLException e) {
      throw new IOException(e.getMessage());
    }
  }

  public void initialize(InputSplit split, TaskAttemptContext context) 
      throws IOException, InterruptedException {
    //do nothing
  }

  /** {@inheritDoc} */
  public LongWritable getCurrentKey() {
    return key;  
  }

  /** {@inheritDoc} */
  public T getCurrentValue() {
    return value;
  }

  /**
   * @deprecated 
   */
  @Deprecated
  public long getPos() throws IOException {
    return pos;
  }

 
  /** {@inheritDoc} */
  public float getProgress() throws IOException {
    return pos / (float)split.getLength();
  }

  /** {@inheritDoc} */
  public boolean nextKeyValue() throws IOException {
    try {
      if (key == null) {
        key = new LongWritable();
      }
      if (value == null) {
        value = createValue();
      }
      if (null == this.results) {
        // First time into this method, run the query.
        this.results = executeQuery(getSelectQuery());
      }
      if (!results.next())
        return false;

      // Set the key field value as the output key value
      key.set(pos + split.getStart());

      value.readFields(results);

      pos ++;
    } catch (SQLException e) {
      throw new IOException("SQLException in nextKeyValue", e);
    }
    return true;
  }

  //... ...

 }

分享到:
评论

相关推荐

    CustomInputFormatCollection:Hadoop Mapreduce InputFormat 集合

    Hadoop 代码使用方式 job.setInputFormatClass(SmallFileCombineTextInputFormat.class);...org.apache.hadoop.mapreduce.sample.SmallFileWordCount -Dmapreduce.input.fileinputformat.split.maxsize=10

    wonderdog:批量加载以进行弹性搜索

    您可以在自己的Hadoop MapReduce作业中使用的 ,可从轻松使用这些InputFormat和OutputFormat类 从 LOAD和STORE到ElasticSearch的 一些用于与ElasticSearch进行交互的 &lt; project&gt; ... &lt; dependencies&gt; &lt; ...

    hadoop-lzo-lib

    编译环境:centos 6.4 64bit、maven 3.3.9、jdk1.7.0_79、lzo-2.09;...解决:hive报错:Cannot create an instance of InputFormat class org.apache.hadoop ....... as specified in mapredwork!

    Hadoop技术内幕:深入解析MapReduce架构设计与实现原理

    MapReduce设计理念与基本架构2.1 Hadoop发展史2.1.1 Hadoop产生背景2.1.2 Apache Hadoop新版本的特性2.1.3 Hadoop版本变迁2.2 Hadoop MapReduce设计目标2.3 MapReduce编程模型概述2.3.1 MapReduce编程模型...

    Hadoop实战中文版.PDF

    413.2.5 Combiner:本地reduce 433.2.6 预定义mapper和Reducer类的单词计数 433.3 读和写 433.3.1 InputFormat 443.3.2 OutputFormat 493.4 小结 50第二部分 实战第4章 编写MapReduce基础程序 524.1...

    自定义MapReduce的InputFormat

    自定义MapReduce的InputFormat,实现提取指定开始与结束限定符的内容。

    Hadoop源码解析---MapReduce之InputFormat

    结合Hadoop源码,详细讲解了MapReduce开发中的InputFormat,很详细。

    Hadoop实战

    1699.3.2 配置集群类型 1699.4 在EC2上运行MapReduce程序 1719.4.1 将代码转移到Hadoop集群上 1719.4.2 访问Hadoop集群上的数据 1729.5 清空和关闭EC2实例 1759.6 Amazon Elastic MapReduce和其他AWS服务 1769.6.1 ...

    Hadoop实战中文版

    书籍目录: 第一部分 Hadoop——一种分布式编程框架 第1章 Hadoop简介 1.1 为什么写《Hadoop 实战》 1.2 什么是Hadoop 1.3 了解分布式系统和Hadoop 1.4 比较SQL 数据库和Hadoop 1.5 理解MapReduce 1.5.1 动手...

    jodconverter-2.2.1.rar

    解决openOffice jodconverter-2.2.1包不能将2007及以上office转为PDF和解决txt乱码问题

    ExcelRecordReaderMapReduce:可以读取Excel文件的MapReduce InputFormat

    ExcelRecordReaderMapReducehadoop mapreduce的MapReduce输入格式以读取Microsoft Excel电子表格执照Apache许可。用法1.下载并运行ant。 2.在您的环境中包括ExcelRecordReaderMapReduce-0.0.1-SNAPSHOT.jar 3.使用...

    Hadoop实战(陆嘉恒)译

    map侧过滤后在reduce侧联结5.3 创建一个Bloom filter5.3.1 Bloom filter做了什么5.3.2 实现一个Bloom filter5.3.3 Hadoop 0.20 以上版本的Bloom filter5.4 温故知新5.5 小结5.6 更多资源第6 章 编程实践6.1 开发...

    mapreduce_training:用于教学目的的MapReduce应用程序集

    MapReduce自定义InputFormat和RecordReader实现 MapReduce自定义OutputFormat和RecordWriter实现 Pig自定义LoadFunc加载和解析Apache HTTP日志事件 Pig的自定义EvalFunc使用MaxMind GEO API将IP地址转换为位置 另外...

    【MapReduce篇03】MapReduce之InputFormat数据输入1

    问题背景:框架默认的TextInputformat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量

    json-mapreduce:可以分割多行JSON的InputFormat

    在您的环境中包含dist/lib/json-mapreduce-1.0.jar 使用MultiLineJsonInputFormat类作为您的 Mapper InputFormat 假设您有一些类似于这样的 JSON: {"menu": { "header": "SVG Viewer", "items": [ {"id": ...

    大数据学习(九):mapreduce编程模型及具体框架实现

    map reduce编程模型把数据运算流程分成2个阶段  阶段1:读取原始数据,形成key-value数据(map方法)  阶段2:将阶段1的key-value数据按照相同key分组聚合(reduce方法) ... 读数据:InputFormat–&gt;TextInputFormat

    wikipedia-hadoop:维基百科 Inputformat 和其他有用的 Hadoop 相关的东西

    维基百科-Hadoop 维基百科输入格式和一些有用的维基百科 Hadoop 工具。用法首先,您必须将WikiInputFormat设置为您的作业 InputFormat: job . setInputFormatClass( WikiInputFormat . class); 您的 Mappers 传入 ...

    Hadoop-CombineFileInputFormat:hadoop CombineFileInputFormat的示例实现

    合并文件InputFormat演示我的...用法hadoop jar CombineFileDemo-0.0.1-SNAPSHOT.jar TestMain &lt;src&gt; &lt;dst&gt;执照版权所有:copyright:2014 Felix Chern 根据Eclipse Public License 1.0版或(可选)任何更高版本分发。

    SequenceFileKeyValueInputFormat:自定义 Hadoop InputFormat

    Apache Hive 的 InputFormat,在查询 SequenceFiles 时将返回 (Text) 键和 (Text) 值。 我需要在不拆分内容的情况下完整解析大量文本文件。 HDFS 在处理大型连续文件时提供最佳吞吐量,因此我使用 Apache Mahout 将...

Global site tag (gtag.js) - Google Analytics