2008-05-20

Nutch Source Code Analysis: Generator

Keywords: generator, nutch, mapreduce, hadoop
MapReduce 1: select the URLs to fetch
  • Input: the crawl database (CrawlDb) files
      public Path generate(...) {
        ...
        job.setInputPath(new Path(dbDir, CrawlDb.CURRENT_NAME));
        job.setInputFormat(SequenceFileInputFormat.class);
      }
    

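  • Map() -> if the fetch time <= now, invert to <CrawlDatum, url> (in the code below, the emitted pair is <sortValue, SelectorEntry>, where the entry holds the url and its CrawlDatum)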

      /** Selects entries due for fetch. */
      public static class Selector implements Mapper ...{
    
        private SelectorEntry entry = new SelectorEntry();
       
        /** Select & invert subset due for fetch. */
        public void map(WritableComparable key, Writable value,
                        OutputCollector output, Reporter reporter)
          throws IOException {
          Text url = (Text)key;
          ...
          CrawlDatum crawlDatum = (CrawlDatum)value;
    
          if (crawlDatum.getStatus() == CrawlDatum.STATUS_DB_GONE ||
              crawlDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_PERM)
            return;                                   // don't retry
    
          if (crawlDatum.getFetchTime() > curTime)
            return;                                   // not time yet
    
          LongWritable oldGenTime = (LongWritable)crawlDatum.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY);
          if (oldGenTime != null) { // awaiting fetch & update
            if (oldGenTime.get() + genDelay > curTime) // still wait for update
              return;
          }
          ...
          // record generation time
          crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
          entry.datum = crawlDatum;
          entry.url = (Text)key;
          output.collect(sortValue, entry);          // invert for sort by score
        }
      }
    
  • Partition the data with a hash function seeded by a random integer
    
      /**
       * Generate fetchlists in a segment.
       * @return Path to generated segment or null if no entries were selected.
       * */
      public Path generate(...) {
        ...
        job.setInt("partition.url.by.host.seed", new Random().nextInt());
      }
    
      public static class Selector implements Mapper, Partitioner, Reducer {
    
        private Partitioner hostPartitioner = new PartitionUrlByHost();
        ...
        /** Partition by host. */
        public int getPartition(WritableComparable key, Writable value,
                                int numReduceTasks) {
          return hostPartitioner.getPartition(((SelectorEntry)value).url, key,
                                              numReduceTasks);
        }
        ...
      }
    
    
    
    /** Partition urls by hostname. */
    public class PartitionUrlByHost implements Partitioner {
    
      private int seed;
      ...
    
      public void configure(JobConf job) {
        seed = job.getInt("partition.url.by.host.seed", 0);
        ...
      }
    
      /** Hash by hostname. */
      public int getPartition(WritableComparable key, Writable value,
                              int numReduceTasks) {
      ...
        int hashCode = (url==null ? urlString : url.getHost()).hashCode();
    
        // make hosts wind up in different partitions on different runs
        hashCode ^= seed;
    
        return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
      }
    }
    
    
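  • Reduce() is identity
  • Sort by CrawlDatum.linkCount, descending (the Selector excerpt above sorts on sortValue, which its comment describes as the score)
  • Output the top N CrawlDatum entries with the most links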
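
The selection steps of MapReduce 1 can be sketched without Hadoop as follows. This is only an illustration under stated assumptions, not Nutch code: the Entry class and the isDue and select methods are hypothetical stand-ins for CrawlDatum, Selector.map() and the MapReduce sort, and the sketch orders by score (as the comment in the Selector excerpt says) rather than by linkCount.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class GeneratorSketch {

  /** Hypothetical stand-in for a CrawlDb record; not Nutch's CrawlDatum. */
  static final class Entry {
    final String url;
    final boolean gone;       // stands in for STATUS_DB_GONE / STATUS_DB_REDIR_PERM
    final long fetchTime;     // time at which the URL is next due
    final Long lastGenTime;   // null if the URL was never put on a fetchlist
    final float score;        // sort key

    Entry(String url, boolean gone, long fetchTime, Long lastGenTime, float score) {
      this.url = url;
      this.gone = gone;
      this.fetchTime = fetchTime;
      this.lastGenTime = lastGenTime;
      this.score = score;
    }
  }

  /** The same three tests as in the map() excerpt: dead, not yet due, recently generated. */
  static boolean isDue(Entry e, long curTime, long genDelay) {
    if (e.gone) return false;                       // don't retry
    if (e.fetchTime > curTime) return false;        // not time yet
    if (e.lastGenTime != null && e.lastGenTime + genDelay > curTime) {
      return false;                                 // still waiting for fetch & update
    }
    return true;
  }

  /** Keeps the due entries, sorts them by score descending, and returns the top N urls. */
  static List<String> select(List<Entry> db, long curTime, long genDelay, int topN) {
    List<Entry> due = new ArrayList<>();
    for (Entry e : db) {
      if (isDue(e, curTime, genDelay)) due.add(e);
    }
    due.sort(Comparator.comparingDouble((Entry e) -> e.score).reversed());
    List<String> urls = new ArrayList<>();
    for (Entry e : due.subList(0, Math.min(topN, due.size()))) {
      urls.add(e.url);
    }
    return urls;
  }

  public static void main(String[] args) {
    List<Entry> db = Arrays.asList(
        new Entry("a", false, 500L, null, 1.0f),
        new Entry("b", false, 900L, 800L, 2.0f),
        new Entry("c", true, 100L, null, 5.0f),     // gone: skipped
        new Entry("d", false, 2000L, null, 7.0f));  // not due yet: skipped
    System.out.println(select(db, 1000L, 100L, 2)); // prints [b, a]
  }
}
```

In the real job the sort and the top-N cut are done by the MapReduce framework and the reducer rather than by an in-memory list; the sketch only shows the order of the decisions.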


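MapReduce 2: prepare for fetch
  • Map() inverts; Partition() is by host; Reduce() is identity
  • Reduce: merge the CrawlDatum records into a single entry
  • Output: a set of <url, CrawlDatum> files to be fetched in parallel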
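
The seeded, host-based partitioning can be tried in isolation with the sketch below. It reuses the arithmetic from the PartitionUrlByHost excerpt (hash the host, XOR with the seed, mask the sign bit, take the remainder). The class name, the partitionFor method and the use of java.net.URI for parsing are assumptions made for this example, not Nutch's API.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Random;

final class SeededHostPartitionSketch {

  /** Returns a partition in [0, numReduceTasks) that depends only on the host and the seed. */
  static int partitionFor(String urlString, int seed, int numReduceTasks) {
    String host = null;
    try {
      host = new URI(urlString).getHost();
    } catch (URISyntaxException e) {
      // fall through: the raw string is hashed instead
    }
    // Like the excerpt, hash the raw string when no host can be extracted.
    int hashCode = (host == null ? urlString : host).hashCode();

    // make hosts wind up in different partitions on different runs
    hashCode ^= seed;

    // clear the sign bit so the remainder is never negative
    return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
  }

  public static void main(String[] args) {
    int seed = new Random().nextInt();   // a fresh seed per run, as in generate(...)
    int reducers = 4;
    String[] urls = {
        "http://example.com/a.html",
        "http://example.com/b.html",
        "http://example.org/index.html"
    };
    for (String u : urls) {
      System.out.println(u + " -> partition " + partitionFor(u, seed, reducers));
    }
  }
}
```

Within one run, every URL of a host hashes to the same value, so the whole host lands in one partition. Changing the seed changes the hash value, so a host is not tied to the same partition on every run.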