扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
这篇文章主要讲解了“Hadoop序列化怎么实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Hadoop序列化怎么实现”吧!
创新互联建站坚持“要么做到,要么别承诺”的工作理念,服务领域包括:网站建设、网站制作、企业官网、英文网站、手机端网站、网站推广等服务,满足客户于互联网时代的永春网站设计、移动媒体设计的需求,帮助企业找到有效的互联网解决方案。努力成为您成熟可靠的网络建设合作伙伴!
Hdfs: % hadoop fs -cat hdfs://namenode/data/a.txt
LocalFS: % hadoop fs -cat file:///tmp/a.txt
generate crc check sum file
%hadoop fs -copyToLocal -crc /data/a.txt file:///data/a.txt
check sum file: .a.txt.crc is a hidden file.
Ref: CRC-32,循环冗余校验算法,error-detecting.
io.bytes.per.checksum is deprecated, it's dfs.bytes-per-checksum, default is 512, Must not be larger than dfs.stream-buffer-size,which is the size of buffer to stream files. The size of this buffer should probably be a multiple of hardware page size (4096 on Intel x86), and it determines how much data is buffered during read and write operations.
常用算法
读书时,hadoop支持四种压缩算法,如果调解空间和效率的话,-1 ~ -9,代表从最优速度到最优空间. 压缩算法支持在org.apache.hadoop.io.compress.*.
deflate(.deflate), 就是常用的gzip, package ..DefaultCodec
Gzip(.gz),在deflate格式加了文件头和尾. 压缩速度(适中),解压速度(适中),压缩效率(适中),package ..GzipCodec, both of java and native
bzip2(.bz2), 压缩速度(最差),< 解压速度(最差),压缩效率 (最好),特点是支持可切分(splitable),对map-red非常友好。,package ..BZip2Codec,java only
LZO(.lzo), 压缩速度(最快),解压速度(最快),压缩效率(最差),,package com.hadoop.compressiojn.lzo.lzopCodec, native only
如果禁用原生库,使用hadoop.native.lib.
如果使用原生库,可能对象创建的成本较高,所以可以使用CodecPool,重复使用这些对象。
对于一个非常大的数据文件,存储如下方案:
使用支持切分的bzip2
手动切分,并使压缩后的part接近于block size.
使用Sequence File, 它支持压缩和切分
使用Avro数据文件,它也支持压缩和切分,而且增加了很多编程语言的可读写性。
如果Map-Red的output自动压缩:
conf.setBoolean ("mared.output.compress",true); conf.setClass("mapred.output.compression.codec",GzipCodec.class,CompressionCodec.class);
如果Map-Red的中间结果的自动压缩:
//or conf.setCompressMapOutput(true); conf.setBoolean ("mared.compress.map.output",true); //or conf.setMapOutputComressorClass(GzipCodec.class) conf.setClass("mapred.map.output.compression.codec",GzipCodec.class,CompressionCodec.class);
// core class for hadoop public interface Writable{ void write(DataOutput out) throw IOException; void readFields(DataInput in) throw IOException; } public interface Comparable{ int compareTo(T o); } //core class for map-reduce shuffle public interface WritableComparable extends Writable, Comparable { } // Sample public class MyWritableComparable implements WritableComparable { // Some data private int counter; private long timestamp; public void write(DataOutput out) throws IOException { out.writeInt(counter); out.writeLong(timestamp); } public void readFields(DataInput in) throws IOException { counter = in.readInt(); timestamp = in.readLong(); } public int compareTo(MyWritableComparable o) { int thisValue = this.value; int thatValue = o.value; return (thisValue < thatValue ? -1 : (thisValue==thatValue ? 0 : 1)); } public int hashCode() { final int prime = 31; int result = 1; result = prime * result + counter; result = prime * result + (int) (timestamp ^ (timestamp >>> 32)); return result } } //optimize for stream comparasion public interface RawComparator extends Comparator { // s1 start position, l1, length of bytes public int compare(byte[] b1, int s1,int l1,byte[] b2,int s2,int l2); } public class WritableComparator implements RawComparator{ }
WritableComparator 提供了原始compator的compare反序列化对象的实现,性能较差。不过它作为RawComparator实例的工厂:
RawComparator
// 注册一个经过优化的比较算子。Register an optimized comparator for a WritableComparable implementation.
static void define(Class c, WritableComparator comparator);
// 获得一个WritableComparable的比较算子. Get a comparator for a WritableComparable implementation.
static WritableComparator get(Class extends WritableComparable> c);
public MyWritableComparator extends WritableComparator{ static{ define(MyWritableComparable.class, new MyWritableComparator()); } public MyWritableComparator { super(MyWritableComparable.class); } @Override public int compare(byte[] b1, int s1,int l1,byte[] b2,int s2,int l2){ } }
注: 要使static initializer被调用,除非有该类的实例被创建,或某静态方法或成员被访问。或者直接强制,代码如:
Class.forName("package.yourclass"); 它会强制初始化静态initializer.
BooleanWritable, 1
ByteWritable, 1,
BytesWritable,
IntWritable,4
VIntWritable,1~5
FloatWritable,4,
LongWritable,8,
VLongWritable,1~9
DoubleWritable,8
NullWritable,Immutable singletone.
Text,4~
MD5Hash,
ObjectWritable,
GenericWritable
ArrayWritable
TwoDArrayWritable
AbstractMapWritable
MapWritable
SortedMapWritable
值得一提的是Text的序列化方式是Zero-compressed encoding,这个看过一些资料,其实是一种编码方式,意图是省略掉高位0所占用的空间,对于小数,它能节省空间,对于大数会额外占用空间。相比压缩,它能比较快速。其实类似于VIntWritable, VLongWritable的编码方式。
- 如何选择变长和定长数值呢?
1. 定长适合分布非常均匀的数值(如hash),变长适合分布非常不均匀的数值。
2. 变长可以节省空间,而且可以在VIntWritable 和VLongWritable之间转换。
- Text和String的区别
1。String是char序列,Text是UTF-8的byte序列.
UTF-8类不能对字符串大于32767的进行utf-8编码。
(Indexing)索引:对于ASCII来说, Text和String是一样的, 对于Unicode就不同了。String类的长度是其所含char编码单元的长度,然而Text是UTF-8的字节码的长度。CodePointAt表示一个真正的Unicode字符,它可以是2char,4bytes的unicode。
Iteration(迭代): 将Text转换ByteBuffer,然后反复调用bytesToCodePoint()静态方法,可以取到整型的Unicode.
Mutable(易变性): 可以set,类似writable 和StringBuffer,getLength()返回有效字串长度,getbytes().length,返回空间大小。
这是二进制数组的封装,类似于windows下的BSTR,都是前面一个整型表示字节长度,后面是字节的二进制流。
它也是mutable,getLength() != getBytes().length
NullWritable是Writable的一个特殊类型。它的序列化长度为0,其实只是一个占位符,既不读入,也不写出。只是存在于程序体中。
Immutable,是一个singleton。
ObjectWritable是Java的Array, String, 以及Primitive类型的通用封装 (注:不包含Integer)。它的序列化则使用java的类型序列化,写入类型信息等,比较占用空间。
通过两个特殊的构造:
public ObjectWritable(Object instance);
public ObjectWritable(Class declaredClass,Object instance);
举例子:
ObjectWritable objectw = new ObjectWritable(int.class,5);
首先这是一个抽象类,需要被具象化才能使用。
观察下面这个实列,它以一种Union方式,显示的代理一个Writable实例,解决了Reduce函数的参数声明问题。
public class MyGenericWritable extends GenericWritable { private static Class extends Writable>[] CLASSES = null; static { CLASSES = (Class extends Writable>[]) new Class[] { IntWritable.class, Text.class //add as many different Writable class as you want }; } @Override protected Class extends Writable>[] getTypes() { return CLASSES; } @Override public String toString() { return "MyGenericWritable [getTypes()=" + Arrays.toString(getTypes()) + "]"; } // override hashcode(); } public class Reduce extends Reducer{ public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { }
ArrayWritable aw = new ArrayWriable(Text.class);
实现了java.util.Map
它的serialize, 使用先写map
集合小结:
1. 如果是单类型的列表,使用ArrayWritable就足够了
2。如果是把不同类型的Writable存储在一个列表中:
-- 可以使用GenerickWritable,把元素封装在一个ArrayWritable,这个貌似只能同一类型。
public class MyGenericWritable extends GenericWritable { private static Class extends Writable>[] CLASSES = null; static { CLASSES = (Class extends Writable>[]) new Class[] { ArrayWritable.class, //add as many different Writable class as you want }; } @Override protected Class extends Writable>[] getTypes() { return CLASSES; }
-- 可以使用写一个仿照MapWritable的ListWritable
//注意实现hashcode,equals,toString, comparTo (if possible)
//hashcode尤其重要,HashPartitioner通常用hashcode来选择reduce分区,所以为你的类写一个比较好的hashcode非常必要。
public class ListWritable extends ArrayList
}
/** * @author cloudera * */ public class ListWritable extends ArrayListimplements Writable { private List list = new ArrayList (); public void set(Writable writable){ list.add(writable); } @Override public void readFields(DataInput in) throws IOException { int nsize = in.readInt(); Configuration conf = new Configuration(); Text className = new Text(); while(nsize-->0){ Class theClass = null; try { className.readFields(in); theClass = Class.forName(className.toString()); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } Writable w = (Writable)ReflectionUtils.newInstance(theClass,conf); w.readFields(in); add(w); } } @Override public void write(DataOutput out) throws IOException { Writable w = null; out.writeInt(size()); for(int i = 0;i 感谢各位的阅读,以上就是“Hadoop序列化怎么实现”的内容了,经过本文的学习后,相信大家对Hadoop序列化怎么实现这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!
标题名称:Hadoop序列化怎么实现
网站URL:http://kswjz.com/article/pphhgi.html
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流