1. SequenceFileInputFormat and SequenceFileRecordReader
2. SequenceFileAsBinaryInputFormat and SequenceFileAsBinaryRecordReader
3. SequenceFileAsTextInputFormat and SequenceFileAsTextRecordReader

1. SequenceFileInputFormat and SequenceFileRecordReader
```java
/** An {@link InputFormat} for {@link SequenceFile}s. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileInputFormat<K, V> extends FileInputFormat<K, V> {

  @Override
  public RecordReader<K, V> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException {
    return new SequenceFileRecordReader<K, V>();
  }

  @Override
  protected long getFormatMinSplitSize() {
    return SequenceFile.SYNC_INTERVAL;
  }

  @Override
  protected List<FileStatus> listStatus(JobContext job) throws IOException {
    List<FileStatus> files = super.listStatus(job);
    int len = files.size();
    for (int i = 0; i < len; ++i) {
      FileStatus file = files.get(i);
      if (file.isDirectory()) {
        // it's a MapFile
        Path p = file.getPath();
        FileSystem fs = p.getFileSystem(job.getConfiguration());
        // use the data file
        files.set(i, fs.getFileStatus(new Path(p, MapFile.DATA_FILE_NAME)));
      }
    }
    return files;
  }
}
```
```java
/** A {@link RecordReader} for {@link SequenceFile}s. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileRecordReader<K, V> extends RecordReader<K, V> {
  private SequenceFile.Reader in;
  private long start;
  private long end;
  private boolean more = true;
  private K key = null;
  private V value = null;
  protected Configuration conf;

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    FileSplit fileSplit = (FileSplit) split;
    conf = context.getConfiguration();
    Path path = fileSplit.getPath();
    FileSystem fs = path.getFileSystem(conf);
    this.in = new SequenceFile.Reader(fs, path, conf);
    this.end = fileSplit.getStart() + fileSplit.getLength();
    if (fileSplit.getStart() > in.getPosition()) {
      in.sync(fileSplit.getStart()); // sync to start
    }
    this.start = in.getPosition();
    more = start < end;
  }

  @Override
  @SuppressWarnings("unchecked")
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!more) {
      return false;
    }
    long pos = in.getPosition();
    key = (K) in.next(key);
    if (key == null || (pos >= end && in.syncSeen())) {
      more = false;
      key = null;
      value = null;
    } else {
      value = (V) in.getCurrentValue(value);
    }
    return more;
  }

  @Override
  public K getCurrentKey() { return key; }

  @Override
  public V getCurrentValue() { return value; }

  /**
   * Return the progress within the input split.
   * @return 0.0 to 1.0 of the input byte range
   */
  public float getProgress() throws IOException {
    if (end == start) {
      return 0.0f;
    } else {
      return Math.min(1.0f, (in.getPosition() - start) / (float) (end - start));
    }
  }

  public synchronized void close() throws IOException {
    in.close();
  }
}
```
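The subtle part of the reader above is the boundary rule: each split first syncs forward to the nearest sync point at or after its start, then keeps reading until it consumes a record that sits at a sync point at or past its end. The sketch below is a plain-Java simulation of that rule (no Hadoop dependency); the record and sync offsets are made-up illustration data, not anything from a real SequenceFile.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Minimal sketch of SequenceFileRecordReader's split-boundary rule: seek to
 * the first sync point at or after the split start, then stop at the first
 * record that begins on a sync point at or past the split end. Offsets are
 * hypothetical illustration data.
 */
public class SplitBoundarySketch {
  // Byte offsets at which records begin; sync markers sit at offsets 0, 30, 60.
  static final long[] RECORD_POS = {0, 10, 20, 30, 40, 50, 60, 70};
  static final long[] SYNC_POS = {0, 30, 60};

  /** First sync position >= start (mirrors SequenceFile.Reader#sync). */
  static long syncForward(long start) {
    for (long s : SYNC_POS) if (s >= start) return s;
    return Long.MAX_VALUE; // no sync after start: this split reads nothing
  }

  /** Indices of the records a split [start, start + length) would process. */
  static List<Integer> recordsForSplit(long start, long length) {
    long pos = syncForward(start); // in.sync(split.getStart())
    long end = start + length;
    List<Integer> out = new ArrayList<>();
    for (int i = 0; i < RECORD_POS.length; i++) {
      if (RECORD_POS[i] < pos) continue; // belongs to an earlier split
      boolean atSync = false;
      for (long s : SYNC_POS) if (s == RECORD_POS[i]) atSync = true;
      // mirrors: pos >= end && in.syncSeen()
      if (RECORD_POS[i] >= end && atSync) break;
      out.add(i);
    }
    return out;
  }

  public static void main(String[] args) {
    // Two adjacent 35-byte splits cover all eight records exactly once.
    System.out.println(recordsForSplit(0, 35));  // [0, 1, 2, 3, 4, 5]
    System.out.println(recordsForSplit(35, 35)); // [6, 7]
  }
}
```

Note how the first split runs past its nominal end (records at offsets 40 and 50) until it reaches the sync point at 60, while the second split skips forward to that same sync point, so no record is read twice or dropped.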
2. SequenceFileAsBinaryInputFormat and SequenceFileAsBinaryRecordReader
```java
/**
 * InputFormat reading keys, values from SequenceFiles in binary (raw) format.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsBinaryInputFormat
    extends SequenceFileInputFormat<BytesWritable, BytesWritable> {

  public SequenceFileAsBinaryInputFormat() {
    super();
  }

  public RecordReader<BytesWritable, BytesWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    return new SequenceFileAsBinaryRecordReader();
  }

  /**
   * Read records from a SequenceFile as binary (raw) bytes.
   */
  public static class SequenceFileAsBinaryRecordReader
      extends RecordReader<BytesWritable, BytesWritable> {
    private SequenceFile.Reader in;
    private long start;
    private long end;
    private boolean done = false;
    private DataOutputBuffer buffer = new DataOutputBuffer();
    private SequenceFile.ValueBytes vbytes;
    private BytesWritable key = null;
    private BytesWritable value = null;

    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
      Path path = ((FileSplit) split).getPath();
      Configuration conf = context.getConfiguration();
      FileSystem fs = path.getFileSystem(conf);
      this.in = new SequenceFile.Reader(fs, path, conf);
      this.end = ((FileSplit) split).getStart() + split.getLength();
      if (((FileSplit) split).getStart() > in.getPosition()) {
        in.sync(((FileSplit) split).getStart()); // sync to start
      }
      this.start = in.getPosition();
      vbytes = in.createValueBytes();
      done = start >= end;
    }

    @Override
    public BytesWritable getCurrentKey() throws IOException, InterruptedException {
      return key;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
      return value;
    }

    /**
     * Retrieve the name of the key class for this SequenceFile.
     * @see org.apache.hadoop.io.SequenceFile.Reader#getKeyClassName
     */
    public String getKeyClassName() {
      return in.getKeyClassName();
    }

    /**
     * Retrieve the name of the value class for this SequenceFile.
     * @see org.apache.hadoop.io.SequenceFile.Reader#getValueClassName
     */
    public String getValueClassName() {
      return in.getValueClassName();
    }

    /**
     * Read raw bytes from a SequenceFile.
     */
    public synchronized boolean nextKeyValue()
        throws IOException, InterruptedException {
      if (done) {
        return false;
      }
      long pos = in.getPosition();
      boolean eof = -1 == in.nextRawKey(buffer);
      if (!eof) {
        if (key == null) {
          key = new BytesWritable();
        }
        if (value == null) {
          value = new BytesWritable();
        }
        key.set(buffer.getData(), 0, buffer.getLength());
        buffer.reset();
        in.nextRawValue(vbytes);
        vbytes.writeUncompressedBytes(buffer);
        value.set(buffer.getData(), 0, buffer.getLength());
        buffer.reset();
      }
      return !(done = (eof || (pos >= end && in.syncSeen())));
    }

    public void close() throws IOException {
      in.close();
    }

    /**
     * Return the progress within the input split.
     * @return 0.0 to 1.0 of the input byte range
     */
    public float getProgress() throws IOException, InterruptedException {
      if (end == start) {
        return 0.0f;
      } else {
        return Math.min(1.0f,
            (float) ((in.getPosition() - start) / (double) (end - start)));
      }
    }
  }
}
```
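Both readers report progress with the same formula: the fraction of the split's byte range consumed so far, clamped to 1.0 because a reader may legitimately run past its nominal end to the next sync point. A minimal self-contained sketch of that arithmetic (the offsets are illustrative, not from a real file):

```java
/**
 * Minimal sketch of the getProgress() formula shared by the SequenceFile
 * record readers: (position - start) / (end - start), clamped to [0, 1],
 * with an empty split reporting 0. Offsets are hypothetical.
 */
public class ProgressSketch {
  static float progress(long start, long end, long pos) {
    if (end == start) {
      return 0.0f; // empty split: avoid division by zero
    }
    return Math.min(1.0f, (float) ((pos - start) / (double) (end - start)));
  }

  public static void main(String[] args) {
    System.out.println(progress(100, 200, 150)); // halfway: 0.5
    System.out.println(progress(100, 200, 260)); // past end (reading to a sync point): clamped to 1.0
    System.out.println(progress(100, 100, 100)); // empty split: 0.0
  }
}
```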
3. SequenceFileAsTextInputFormat and SequenceFileAsTextRecordReader
```java
/**
 * This class is similar to SequenceFileInputFormat, except it generates
 * SequenceFileAsTextRecordReader, which converts the input keys and values
 * to their String forms by calling toString().
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsTextInputFormat
    extends SequenceFileInputFormat<Text, Text> {

  public SequenceFileAsTextInputFormat() {
    super();
  }

  public RecordReader<Text, Text> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException {
    context.setStatus(split.toString());
    return new SequenceFileAsTextRecordReader();
  }
}
```
```java
/**
 * Converts the input keys and values to their String forms by calling
 * toString(). This class is to SequenceFileAsTextInputFormat what
 * LineRecordReader is to TextInputFormat.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsTextRecordReader extends RecordReader<Text, Text> {
  private final SequenceFileRecordReader<WritableComparable<?>, Writable>
      sequenceFileRecordReader;
  private Text key;
  private Text value;

  public SequenceFileAsTextRecordReader() throws IOException {
    sequenceFileRecordReader =
        new SequenceFileRecordReader<WritableComparable<?>, Writable>();
  }

  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    sequenceFileRecordReader.initialize(split, context);
  }

  @Override
  public Text getCurrentKey() throws IOException, InterruptedException {
    return key;
  }

  @Override
  public Text getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  /** Read a key/value pair. */
  public synchronized boolean nextKeyValue()
      throws IOException, InterruptedException {
    if (!sequenceFileRecordReader.nextKeyValue()) {
      return false;
    }
    if (key == null) {
      key = new Text();
    }
    if (value == null) {
      value = new Text();
    }
    key.set(sequenceFileRecordReader.getCurrentKey().toString());
    value.set(sequenceFileRecordReader.getCurrentValue().toString());
    return true;
  }

  public float getProgress() throws IOException, InterruptedException {
    return sequenceFileRecordReader.getProgress();
  }

  public synchronized void close() throws IOException {
    sequenceFileRecordReader.close();
  }
}
```
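The pattern above is pure delegation: a typed reader produces Writable objects, and the wrapper re-emits each one as text via toString(). The sketch below reproduces that pattern without Hadoop; IntBox is a hypothetical stand-in for a Writable such as IntWritable, and the pairs are made-up data.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Minimal sketch of SequenceFileAsTextRecordReader's delegation pattern:
 * wrap a typed reader and convert every key and value to a String with
 * toString(). IntBox is a hypothetical stand-in for a Writable type.
 */
public class ToStringReaderSketch {
  static class IntBox {
    final int v;
    IntBox(int v) { this.v = v; }
    @Override public String toString() { return Integer.toString(v); }
  }

  /** Mirrors nextKeyValue(): delegate, then re-emit both sides as text. */
  static List<String> readAllAsText(Iterator<IntBox[]> typedReader) {
    List<String> lines = new ArrayList<>();
    while (typedReader.hasNext()) {
      IntBox[] pair = typedReader.next(); // {key, value} from the inner reader
      lines.add(pair[0].toString() + "\t" + pair[1].toString());
    }
    return lines;
  }

  public static void main(String[] args) {
    Iterator<IntBox[]> typed = List.of(
        new IntBox[] { new IntBox(1), new IntBox(100) },
        new IntBox[] { new IntBox(2), new IntBox(200) }).iterator();
    for (String line : readAllAsText(typed)) {
      System.out.println(line); // "1\t100" then "2\t200"
    }
  }
}
```

This is why SequenceFileAsTextInputFormat is convenient when downstream code (e.g. streaming jobs) only understands text: any key/value class with a sensible toString() works unchanged.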