Source code for crawler.tasks

"""
crawler.tasks
=============

This module contains integration tasks for synchronising this DB with the metadata used in the rest of the discovery layer.

.. autofunction:: sync_from_crawler

.. autofunction:: sync_updates_from_crawler

"""
from celery import shared_task
from django.db import connection
from .models import WebDocument
from metadata.tasks import insert_resource_from_row
from metadata.tasks import update_resource_from_row
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)

@shared_task
[docs]def sync_from_crawler(limit=None): """dispatch metadata.Resource inserts for **new** crawler.WebDocuments""" DEFAULT_LIMIT = 12000 # 2/3 of throughput given 3 minute beat if limit is None: limit = DEFAULT_LIMIT if type(limit) != type(9): try: limit = int(limit) except: limit = DEFAULT_LIMIT logger.debug('sync_from_crawler: limit={0}'.format(limit,)) raw_sql = ''' select url, hash, protocol, "contentType", host, port, path, "lastFetchDateTime" from "webDocuments" where "fetchStatus" = 'downloaded' and url not in ( select url from metadata_resource ) LIMIT %d''' % limit logger.debug(raw_sql) cursor = connection.cursor() cursor.execute(raw_sql) for row in cursor: row = list(row) row[7] = row[7].isoformat() logger.debug('sync_from_crawler: dispatching {0}'.format(row,)) insert_resource_from_row.delay(row)
@shared_task def delete_metadata_when_missing(limit=None): """delete from metadata.Resource when the crawler finds it missing per #76... delete if if metadata.url == webDocument.url AND: (metadata.hash is not null AND webDocument.hash is null) OR (webDocument.httpCode != (2xx or 3xx)) OR (fetchStatus == ) "failed", "notfound", "redirected" sh """ DEFAULT_LIMIT = 1000 if limit is None: limit = DEFAULT_LIMIT if type(limit) != type(9): try: limit = int(limit) except: limit = DEFAULT_LIMIT raw_sql = ''' select wd.url from "webDocuments" as wd where wd."fetchStatus" in ("failed", "notfound", "redirected") and wd.url in (select url from metadata_resource) LIMIT %d''' % limit cursor = connection.cursor() cursor.execute(raw_sql) for row in cursor: delete_resource_with_url.delay(row[0]) @shared_task
[docs]def sync_updates_from_crawler(limit=None): """dispatch metadata.Resource updates for **changed** crawler.WebDocuments""" DEFAULT_LIMIT = 1000 if limit is None: limit = DEFAULT_LIMIT if type(limit) != type(9): try: limit = int(limit) except: limit = DEFAULT_LIMIT raw_sql = ''' select wd.url, wd.hash, wd.protocol, wd."contentType", wd.host, wd.port, wd.path, wd."lastFetchDateTime" from "webDocuments" as wd, metadata_resource as mr where wd."fetchStatus" = 'downloaded' and wd.url = mr.url and wd.hash != mr.hash LIMIT %d''' % limit cursor = connection.cursor() cursor.execute(raw_sql) for row in cursor: row=list(row) row[7] = row[7].isoformat() update_resource_from_row.delay(row)