2008-05-20
Nutch Source Code Analysis: Generator
Keywords: generator, nutch, mapreduce, hadoop
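MapReduce1: select the URLs to fetch
- Map(): if the fetch date <= now, invert to <CrawlDatum, url>
- Partition by hash, seeded with a random integer
- Reduce() is the identity
- Sort by CrawlDatum.linkCount in descending order
- Output the N CrawlDatum entries with the most links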
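The "sort in descending order, keep the top N" step in the outline can be sketched in plain Java. This is only an illustration under assumptions: `TopNSketch`, `Entry`, and `topN` are hypothetical names, not Nutch classes, and a single float sort value stands in for whatever key the real job sorts on.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Illustration only: hypothetical names, not Nutch classes. */
final class TopNSketch {

    /** A selected URL together with the value it is sorted on. */
    static final class Entry {
        final String url;
        final float sortValue;

        Entry(String url, float sortValue) {
            this.url = url;
            this.sortValue = sortValue;
        }
    }

    /** Returns at most n entries, highest sortValue first. */
    static List<Entry> topN(List<Entry> entries, int n) {
        List<Entry> sorted = new ArrayList<>(entries);
        // Descending order, so the best candidates come first.
        sorted.sort(Comparator.comparingDouble((Entry e) -> e.sortValue).reversed());
        int keep = Math.max(0, Math.min(n, sorted.size()));
        return new ArrayList<>(sorted.subList(0, keep));
    }

    public static void main(String[] args) {
        List<Entry> entries = new ArrayList<>();
        entries.add(new Entry("http://a.example/", 0.5f));
        entries.add(new Entry("http://b.example/", 2.0f));
        entries.add(new Entry("http://c.example/", 1.0f));
        for (Entry e : topN(entries, 2)) {
            System.out.println(e.url + " " + e.sortValue);
        }
    }
}
```

In the MapReduce job the framework does the sorting between map and reduce; the sketch sorts an in-memory list only to show the effect.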
MapReduce1, as implemented in Generator:
- Input: the crawl database (CrawlDb) files
public Path generate(...) {
  ...
  job.setInputPath(new Path(dbDir, CrawlDb.CURRENT_NAME));
  job.setInputFormat(SequenceFileInputFormat.class);
}
/** Selects entries due for fetch. */
public static class Selector implements Mapper ...{
  private SelectorEntry entry = new SelectorEntry();
  /** Select & invert subset due for fetch. */
  public void map(WritableComparable key, Writable value,
                  OutputCollector output, Reporter reporter)
      throws IOException {
    Text url = (Text)key;
    ...
    CrawlDatum crawlDatum = (CrawlDatum)value;
    if (crawlDatum.getStatus() == CrawlDatum.STATUS_DB_GONE ||
        crawlDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_PERM)
      return; // don't retry
    if (crawlDatum.getFetchTime() > curTime)
      return; // not time yet
    LongWritable oldGenTime = (LongWritable)crawlDatum.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY);
    if (oldGenTime != null) { // awaiting fetch & update
      if (oldGenTime.get() + genDelay > curTime) // still wait for update
        return;
    }
    ...
    // record generation time
    crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
    entry.datum = crawlDatum;
    entry.url = (Text)key;
    output.collect(sortValue, entry); // invert for sort by score
  }
}
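The three checks in Selector.map() can be tried out without Hadoop. The following is a minimal sketch, assuming a simplified `Datum` record: `SelectorSketch`, `Datum`, `isDue`, and the numeric status values are hypothetical stand-ins for illustration, not the real CrawlDatum API.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: a simplified stand-in for the checks in Selector.map(). */
final class SelectorSketch {
    // Placeholder status codes for this sketch; the real constants live in CrawlDatum.
    static final int STATUS_DB_UNFETCHED = 1;
    static final int STATUS_DB_GONE = 3;
    static final int STATUS_DB_REDIR_PERM = 5;

    /** Holds only the fields that the checks read. */
    static final class Datum {
        final String url;
        final int status;
        final long fetchTime;   // time at which the URL becomes due, in ms
        final Long lastGenTime; // null if the URL was never put in a fetchlist

        Datum(String url, int status, long fetchTime, Long lastGenTime) {
            this.url = url;
            this.status = status;
            this.fetchTime = fetchTime;
            this.lastGenTime = lastGenTime;
        }
    }

    /** Mirrors the early returns in the original map() method. */
    static boolean isDue(Datum d, long curTime, long genDelay) {
        if (d.status == STATUS_DB_GONE || d.status == STATUS_DB_REDIR_PERM) {
            return false; // don't retry
        }
        if (d.fetchTime > curTime) {
            return false; // not time yet
        }
        if (d.lastGenTime != null && d.lastGenTime + genDelay > curTime) {
            return false; // already generated, still waiting for fetch & update
        }
        return true;
    }

    /** Returns the URLs that pass all three checks. */
    static List<String> select(List<Datum> db, long curTime, long genDelay) {
        List<String> due = new ArrayList<>();
        for (Datum d : db) {
            if (isDue(d, curTime, genDelay)) {
                due.add(d.url);
            }
        }
        return due;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        List<Datum> db = new ArrayList<>();
        db.add(new Datum("http://a.example/", STATUS_DB_UNFETCHED, now - 1, null));
        db.add(new Datum("http://b.example/", STATUS_DB_GONE, now - 1, null));
        db.add(new Datum("http://c.example/", STATUS_DB_UNFETCHED, now + 60_000, null));
        System.out.println(select(db, now, 10_000));
    }
}
```

The generate-time check is what keeps a URL from landing in two fetchlists when generate is run again before the crawl database has been updated.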
/**
 * Generate fetchlists in a segment.
 * @return Path to generated segment or null if no entries were selected.
 */
public Path generate(...) {
  ...
  job.setInt("partition.url.by.host.seed", new Random().nextInt());
}
public static class Selector implements Mapper, Partitioner, Reducer {
  private Partitioner hostPartitioner = new PartitionUrlByHost();
  ...
  /** Partition by host. */
  public int getPartition(WritableComparable key, Writable value,
                          int numReduceTasks) {
    return hostPartitioner.getPartition(((SelectorEntry)value).url, key,
                                        numReduceTasks);
  }
  ...
}
/** Partition urls by hostname. */
public class PartitionUrlByHost implements Partitioner {
  private int seed;
  ...
  public void configure(JobConf job) {
    seed = job.getInt("partition.url.by.host.seed", 0);
    ...
  }
  /** Hash by hostname. */
  public int getPartition(WritableComparable key, Writable value,
                          int numReduceTasks) {
    ...
    int hashCode = (url==null ? urlString : url.getHost()).hashCode();
    // make hosts wind up in different partitions on different runs
    hashCode ^= seed;
    return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
  }
}
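The seeded host hash is easy to reproduce outside Hadoop. Below is a minimal sketch, assuming `java.net.URI` for host extraction; `HostPartitionSketch`, `hostOf`, and `partition` are hypothetical names, and the fallback to the raw string only imitates the `url==null` branch above.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Illustration only: the same arithmetic as getPartition(), without Hadoop. */
final class HostPartitionSketch {

    /** Returns the host part of the URL, or the whole string if it cannot be parsed. */
    static String hostOf(String urlString) {
        try {
            String host = new URI(urlString).getHost();
            return host != null ? host : urlString;
        } catch (URISyntaxException e) {
            return urlString;
        }
    }

    /** Maps a URL to a partition in [0, numReduceTasks) based on its host and the seed. */
    static int partition(String urlString, int seed, int numReduceTasks) {
        int hashCode = hostOf(urlString).hashCode();
        hashCode ^= seed; // a different seed shuffles hosts to different partitions
        // Clearing the sign bit keeps the remainder non-negative.
        return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int seed = new java.util.Random().nextInt();
        System.out.println(partition("http://a.example/page1", seed, 8));
        System.out.println(partition("http://a.example/page2", seed, 8)); // same host, same partition
        System.out.println(partition("http://b.example/", seed, 8));
    }
}
```

Within one run the seed is fixed, so every URL of a host goes to the same partition; only across runs does the assignment change.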
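MapReduce2: prepare for fetch
- Map() inverts; Partition() partitions by host; Reduce() is the identity
- Reduce: merges the CrawlDatum records into a single entry
- Output: a set of <url, CrawlDatum> files, to be fetched in parallel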
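The effect of the second job (invert back to <url, CrawlDatum> and split by host) can be sketched with in-memory collections. This is a rough illustration under assumptions: `FetchListSketch`, `Selected`, and `buildFetchLists` are hypothetical names, a plain string stands in for the CrawlDatum, and each map plays the role of one output file.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustration only: hypothetical names, not Nutch classes. */
final class FetchListSketch {

    /** Output of the first job: a sort value paired with (url, datum). */
    static final class Selected {
        final float sortValue;
        final String url;
        final String datum; // placeholder for the CrawlDatum payload

        Selected(float sortValue, String url, String datum) {
            this.sortValue = sortValue;
            this.url = url;
            this.datum = datum;
        }
    }

    /** Returns the host part of the URL, or the whole string if it cannot be parsed. */
    static String hostOf(String urlString) {
        try {
            String host = new URI(urlString).getHost();
            return host != null ? host : urlString;
        } catch (URISyntaxException e) {
            return urlString;
        }
    }

    /** Builds numLists fetchlists (numLists must be positive), one map per list. */
    static List<Map<String, String>> buildFetchLists(List<Selected> selected,
                                                     int seed, int numLists) {
        List<Map<String, String>> lists = new ArrayList<>();
        for (int i = 0; i < numLists; i++) {
            lists.add(new LinkedHashMap<>());
        }
        for (Selected s : selected) {
            // Partition by host, using the same seeded hash as PartitionUrlByHost.
            int p = ((hostOf(s.url).hashCode() ^ seed) & Integer.MAX_VALUE) % numLists;
            // Invert: the URL becomes the key again and the sort value is dropped.
            lists.get(p).put(s.url, s.datum);
        }
        return lists;
    }

    public static void main(String[] args) {
        List<Selected> selected = new ArrayList<>();
        selected.add(new Selected(2.0f, "http://a.example/x", "datum-1"));
        selected.add(new Selected(1.0f, "http://b.example/", "datum-2"));
        selected.add(new Selected(0.5f, "http://a.example/y", "datum-3"));
        System.out.println(buildFetchLists(selected, 42, 4));
    }
}
```

Because all URLs of one host end up in the same list, a single fetcher task handles each host.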