`
zcwfeng
  • 浏览: 98170 次
  • 性别: Icon_minigender_1
  • 来自: 吉林
社区版块
存档分类
最新评论

Hadoop的Partitioner

阅读更多

MapReduce的使用者通常会指定Reduce任务和Reduce任务输出文件的数量(R)。我们在中间key上使用分区函数来对数据进行分区,之后再输入到后续任务执行进程。一个缺省的分区函数是使用hash方法(比如,hash(key) mod R)进行分区。hash方法能产生非常平衡的分区。然而,有的时候,其它的一些分区函数对key值进行的分区将非常有用。比如,输出的key值是URLs,我们希望每个主机的所有条目保持在同一个输出文件中。为了支持类似的情况,MapReduce库的用户需要提供专门的分区函数。例如,使用“hash(Hostname(urlkey)) mod R”作为分区函数就可以把所有来自同一个主机的URLs保存在同一个输出文件中。

/**
* to keep sort as global
*
* @author hadoop
*
*/
public static class Partition extends Partitioner<IntWritable, IntWritable> {

@Override
public int getPartition(IntWritable key, IntWritable value,
int numPartitions) {
int MaxNumber = 65223;
int bound = MaxNumber / numPartitions + 1;
int keynumber = key.get();
for (int i = 0; i < numPartitions; i++) {
if (keynumber < bound * i && keynumber >= bound * (i - 1))
return i - 1;
}
return 0;//return -1; (error)
}

}

上面的这段代码是自己实现的简单分区,最大值未输入的一个最大数字。我用1.2.1版本,无论如何,分区返回不能为-1,所以改成0

hadoop的map/reduce中支持对key进行分区,从而让map出来的数据均匀分布在reduce上,当然,有时候由于机器间配置问题,可能不需要数据均匀,这时候也能派上用场。
框架自带了一个默认的分区类,HashPartitioner,先看看这个类,就知道怎么自定义key分区了。
public class HashPartitioner<K, V> extends Partitioner<K, V> {

/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}

}
很简单,继承Partitioner即可。
先解释一下这个HashPartitioner做的事情
(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
将key均匀分布在ReduceTasks上,举例如果Key为Text的话,Text的hashcode方法跟String的基本一致,都是采用的Horner公式计算,得到一个int,string太大的话这个int值可能会溢出变成负数,所以与上Integer.MAX_VALUE(即0111111111111111),然后再对reduce个数取余,这样就可以让key均匀分布在reduce上。
这个简单算法得到的结果可能不均匀,因为key毕竟不会那么线性连续,这时候可以自己写个测试类,计算出最优的hash算法。
PS:hadoop框架本身包含了一些跟hash算法相关的数学之美,比如布隆过滤器(BloomFilter),写好hash函数是关键。


Partitioner

首先需要继承自Partitioner类(在0.19中为Partitioner接口),并重载它的getPartition方法:

[java]
  1. publicstaticclassCatPartitionerextendsPartitioner<Text,Text>{
  2. @Override
  3. publicintgetPartition(Textkey,Textvalue,intnumPartitions){
  4. String[]parts=key.toString().split("-");
  5. if(parts.length==2){
  6. returnMath.abs(parts[0].hashCode())%numPartitions;
  7. }
  8. returnMath.abs(key.toString().hashCode())%numPartitions;
  9. }
  10. }

然后在job配置中设置Partitioner Class:


分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics