Commit: First implementation
handle exception
use O(N) instead of O(N log N)
here we have request as struct
additional check for emptiness
small performance improvement
do not consume another request
test number of responses
mark requests
back to 3 slots test case
raise exceptions in case of missing meta
add marks to requests and work only with your own requests
only disk queue should obtain signals
separate functions for slot read/write
use signals without variable
stop crawler
get signals in correct place
logic test for download-aware priority queue
update comment for structure
ensure text type
transform slot name to path
use implicit structure
use unicode type implicitly
use real crawler
add signals
more slot accounting
simple implementation of pop
small slot accounting code
no need for custom len function
ability to call super in py27
add slots
generic tests for downloader aware queue
dummy implementation of crawler aware priority queue
move common logic to base class
rename class
pass crawler to pqclass constructor
do not copy quelib.PriorityQueue code
add comment about new class
remove obsolete function
modify behaviour of queuelib.PriorityQueue to avoid very complex priority handling
better way to get name
remove obsolete commentary
check boundaries
function for priority conversion with known limits
correct import path
move file
do not switch on by default as IP concurrency is not supported
set scheduler slot in case of empty slot
use constant
single place for added urls
single place for constants
use as default queue
correct format for error text
test migration from old version with on disk queue
in these tests we have only two inflection points - jobdir and priority_queue_cls
we do not need separate mock spider, use usual one
do not rely on order of dict elements, rely on order of list instead
test round-robin behaviour of priority queue
add comments and requirements for our magic function
remove debug logging
put queues into slot
as we fabricate priorities we do not need special types anymore
fabricate priority for priority queue
more versatile priorities
Scheduler class is not an inflection point
wrap correct types
check for emptiness before initialization
tests for new priority queue
correct default type for startprios
use exact values
put common settings to base class
test priorities for disk scheduler
test dequeue for disk scheduler
test length for disk scheduler
setUp/tearDown methods for on disk schedulers
new methods
remove excessive line
base class to handle scheduler creation
correct method names
test priorities
dequeue test
close scheduler on test end
enqueue some requests
test template for scheduler
use downloader slot
I/O implementation for RoundRobin queue
round-robin implementation without I/O and slot detection
wrappers for every disk queue class
whalebot-helmsman committed Mar 22, 2019
1 parent a25cf5c commit 821f5bb
Showing 5 changed files with 578 additions and 23 deletions.
8 changes: 5 additions & 3 deletions scrapy/core/downloader/__init__.py
@@ -75,6 +75,8 @@ def _get_concurrency_delay(concurrency, spider, settings):
 
 class Downloader(object):
 
+    DOWNLOAD_SLOT = 'download_slot'
+
     def __init__(self, crawler):
         self.settings = crawler.settings
         self.signals = crawler.signals
@@ -111,8 +113,8 @@ def _get_slot(self, request, spider):
         return key, self.slots[key]
 
     def _get_slot_key(self, request, spider):
-        if 'download_slot' in request.meta:
-            return request.meta['download_slot']
+        if self.DOWNLOAD_SLOT in request.meta:
+            return request.meta[self.DOWNLOAD_SLOT]
 
         key = urlparse_cached(request).hostname or ''
         if self.ip_concurrency:
@@ -122,7 +124,7 @@ def _get_slot_key(self, request, spider):
 
     def _enqueue_request(self, request, spider):
         key, slot = self._get_slot(request, spider)
-        request.meta['download_slot'] = key
+        request.meta[self.DOWNLOAD_SLOT] = key
 
         def _deactivate(response):
             slot.active.remove(request)
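The constant only renames the existing 'download_slot' meta key, so spider code that pins requests to a shared downloader slot keeps working. A minimal sketch (hypothetical spider snippet, not part of this commit):

from scrapy import Request

# route this request through a named downloader slot; the Downloader
# reads the same meta key it now exposes as Downloader.DOWNLOAD_SLOT
req = Request('https://example.com/page',
              meta={'download_slot': 'example.com'})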
15 changes: 0 additions & 15 deletions scrapy/core/queues.py

This file was deleted.

17 changes: 12 additions & 5 deletions scrapy/core/scheduler.py
@@ -13,14 +13,15 @@
 class Scheduler(object):
 
     def __init__(self, dupefilter, jobdir=None, dqclass=None, mqclass=None,
-                 logunser=False, stats=None, pqclass=None):
+                 logunser=False, stats=None, pqclass=None, crawler=None):
         self.df = dupefilter
         self.dqdir = self._dqdir(jobdir)
         self.pqclass = pqclass
         self.dqclass = dqclass
         self.mqclass = mqclass
         self.logunser = logunser
         self.stats = stats
+        self.crawler = crawler
 
     @classmethod
     def from_crawler(cls, crawler):
@@ -32,14 +33,15 @@ def from_crawler(cls, crawler):
         mqclass = load_object(settings['SCHEDULER_MEMORY_QUEUE'])
         logunser = settings.getbool('LOG_UNSERIALIZABLE_REQUESTS', settings.getbool('SCHEDULER_DEBUG'))
         return cls(dupefilter, jobdir=job_dir(settings), logunser=logunser,
-                   stats=crawler.stats, pqclass=pqclass, dqclass=dqclass, mqclass=mqclass)
+                   stats=crawler.stats, pqclass=pqclass, dqclass=dqclass,
+                   mqclass=mqclass, crawler=crawler)
 
     def has_pending_requests(self):
         return len(self) > 0
 
     def open(self, spider):
         self.spider = spider
-        self.mqs = self.pqclass(self._newmq)
+        self.mqs = create_instance(self.pqclass, None, self.crawler, self._newmq)
         self.dqs = self._dq() if self.dqdir else None
         return self.df.open()
@@ -111,7 +113,7 @@ def _newmq(self, priority):
         return self.mqclass()
 
     def _newdq(self, priority):
-        return self.dqclass(join(self.dqdir, 'p%s' % priority))
+        return self.dqclass(join(self.dqdir, 'p%s' % (priority, )))
 
     def _dq(self):
         activef = join(self.dqdir, 'active.json')
@@ -120,7 +122,12 @@ def _dq(self):
                 prios = json.load(f)
         else:
             prios = ()
-        q = self.pqclass(self._newdq, startprios=prios)
+
+        q = create_instance(self.pqclass,
+                            None,
+                            self.crawler,
+                            self._newdq,
+                            startprios=prios)
         if q:
             logger.info("Resuming crawl (%(queuesize)d requests scheduled)",
                         {'queuesize': len(q)}, extra={'spider': self.spider})
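The scheduler now builds its priority queues through the create_instance helper from scrapy.utils.misc instead of calling pqclass directly, which is what lets DownloaderAwarePriorityQueue receive the crawler. A rough sketch of the dispatch logic (paraphrased, not the verbatim implementation):

def create_instance(objcls, settings, crawler, *args, **kwargs):
    # prefer the crawler-aware constructor when the class provides one
    if crawler is not None and hasattr(objcls, 'from_crawler'):
        return objcls.from_crawler(crawler, *args, **kwargs)
    # otherwise fall back to settings-aware construction, then a plain call
    if hasattr(objcls, 'from_settings'):
        return objcls.from_settings(settings, *args, **kwargs)
    return objcls(*args, **kwargs)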
246 changes: 246 additions & 0 deletions scrapy/pqueues.py
@@ -0,0 +1,246 @@
from collections import deque
import hashlib
import logging
from six import text_type
from six.moves.urllib.parse import urlparse

from queuelib import PriorityQueue

from scrapy.core.downloader import Downloader
from scrapy.http import Request
from scrapy.signals import request_reached_downloader, response_downloaded


logger = logging.getLogger(__name__)


SCHEDULER_SLOT_META_KEY = Downloader.DOWNLOAD_SLOT


def _get_from_request(request, key, default=None):
    if isinstance(request, dict):
        return request.get(key, default)

    if isinstance(request, Request):
        return getattr(request, key, default)

    raise ValueError('Bad type of request "%s"' % (request.__class__, ))


def _scheduler_slot_read(request, default=None):
    meta = _get_from_request(request, 'meta', dict())
    slot = meta.get(SCHEDULER_SLOT_META_KEY, default)
    return slot


def _scheduler_slot_write(request, slot):
    meta = _get_from_request(request, 'meta', None)
    if not isinstance(meta, dict):
        raise ValueError('No meta attribute in %s' % (request, ))
    meta[SCHEDULER_SLOT_META_KEY] = slot


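# Resolve the scheduler slot for a request: reuse an explicit
# 'download_slot' from meta if present, otherwise fall back to the URL's
# hostname and cache it back into meta.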
def _scheduler_slot(request):
    slot = _scheduler_slot_read(request, None)
    if slot is None:
        url = _get_from_request(request, 'url')
        slot = urlparse(url).hostname or ''
        _scheduler_slot_write(request, slot)

    return slot


def _pathable(x):
    pathable_slot = "".join([c if c.isalnum() or c in '-._' else '_' for c in x])

    # As we replace some characters we can get a collision for different
    # slots, so we add a unique part.
    unique_slot = hashlib.md5(x.encode('utf8')).hexdigest()

    return '-'.join([pathable_slot, unique_slot])
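
# Illustrative example (not in the commit): a slot like u'example.com:8080'
# becomes 'example.com_8080-' followed by the md5 hex digest of the
# original slot string.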


class PrioritySlot(object):
    __slots__ = ('priority', 'slot')

    def __init__(self, priority=0, slot=None):
        self.priority = priority
        self.slot = slot

    def __hash__(self):
        return hash((self.priority, self.slot))

    def __eq__(self, other):
        return (self.priority, self.slot) == (other.priority, other.slot)

    def __lt__(self, other):
        return (self.priority, self.slot) < (other.priority, other.slot)

    def __str__(self):
        return '_'.join([text_type(self.priority),
                         _pathable(text_type(self.slot))])


class PriorityAsTupleQueue(PriorityQueue):
    """
    Python structures are not directly (de)serializable to/from JSON.
    We need this modified queue to transform our custom priority structure
    to/from JSON-serializable tuples.
    """
    def __init__(self, qfactory, startprios=()):
        super(PriorityAsTupleQueue, self).__init__(
            qfactory,
            [PrioritySlot(priority=p[0], slot=p[1]) for p in startprios]
        )

    def close(self):
        startprios = super(PriorityAsTupleQueue, self).close()
        return [(s.priority, s.slot) for s in startprios]

    def is_empty(self):
        return not self.queues or len(self) == 0


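# Keeps one PriorityAsTupleQueue per scheduler slot (by default the
# request's hostname) and provides the shared push/pop/close plumbing
# for the two scheduling policies below.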
class SlotBasedPriorityQueue(object):

    def __init__(self, qfactory, startprios={}):
        self.pqueues = dict()     # slot -> priority queue
        self.qfactory = qfactory  # factory for creating new internal queues

        if not startprios:
            return

        if not isinstance(startprios, dict):
            raise ValueError("Looks like your priorities file is malformed. "
                             "Possible reason: you ran scrapy with a "
                             "previous version, interrupted it, updated "
                             "scrapy and ran it again.")

        for slot, prios in startprios.items():
            self.pqueues[slot] = PriorityAsTupleQueue(self.qfactory, prios)

    def pop_slot(self, slot):
        queue = self.pqueues[slot]
        request = queue.pop()
        is_empty = queue.is_empty()
        if is_empty:
            del self.pqueues[slot]

        return request, is_empty

    def push_slot(self, request, priority):
        slot = _scheduler_slot(request)
        is_new = False
        if slot not in self.pqueues:
            is_new = True
            self.pqueues[slot] = PriorityAsTupleQueue(self.qfactory)
        self.pqueues[slot].push(request,
                                PrioritySlot(priority=priority, slot=slot))
        return slot, is_new

    def close(self):
        startprios = dict()
        for slot, queue in self.pqueues.items():
            prios = queue.close()
            startprios[slot] = prios
        self.pqueues.clear()
        return startprios

    def __len__(self):
        return sum(len(x) for x in self.pqueues.values()) if self.pqueues else 0


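# Rotates over active slots with a deque: pop() serves the next slot's
# best request, then re-appends the slot if it still has queued requests,
# giving each slot a fair turn regardless of queue depth.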
class RoundRobinPriorityQueue(SlotBasedPriorityQueue):

    def __init__(self, qfactory, startprios={}):
        super(RoundRobinPriorityQueue, self).__init__(qfactory, startprios)
        self._slots = deque()
        for slot in self.pqueues:
            self._slots.append(slot)

    def push(self, request, priority):
        slot, is_new = self.push_slot(request, priority)
        if is_new:
            self._slots.append(slot)

    def pop(self):
        if not self._slots:
            return

        slot = self._slots.popleft()
        request, is_empty = self.pop_slot(slot)

        if not is_empty:
            self._slots.append(slot)

        return request

    def close(self):
        self._slots.clear()
        return super(RoundRobinPriorityQueue, self).close()


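# Tracks in-flight downloads per slot through downloader signals and
# always pops from the least busy slot, so scheduling follows actual
# downloader load rather than a fixed rotation.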
class DownloaderAwarePriorityQueue(SlotBasedPriorityQueue):

    _DOWNLOADER_AWARE_PQ_ID = 'DOWNLOADER_AWARE_PQ_ID'

    @classmethod
    def from_crawler(cls, crawler, qfactory, startprios={}):
        return cls(crawler, qfactory, startprios)

    def __init__(self, crawler, qfactory, startprios={}):
        super(DownloaderAwarePriorityQueue, self).__init__(qfactory,
                                                           startprios)
        self._slots = {slot: 0 for slot in self.pqueues}
        crawler.signals.connect(self.on_response_download,
                                signal=response_downloaded)
        crawler.signals.connect(self.on_request_reached_downloader,
                                signal=request_reached_downloader)

    def mark(self, request):
        meta = _get_from_request(request, 'meta', None)
        if not isinstance(meta, dict):
            raise ValueError('No meta attribute in %s' % (request, ))
        meta[self._DOWNLOADER_AWARE_PQ_ID] = id(self)

    def check_mark(self, request):
        return request.meta.get(self._DOWNLOADER_AWARE_PQ_ID, None) == id(self)

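    # Pick the slot with the fewest requests currently in the downloader,
    # considering only slots that still have queued requests; ties break
    # on slot name.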
    def pop(self):
        slots = [(d, s) for s, d in self._slots.items() if s in self.pqueues]

        if not slots:
            return

        slot = min(slots)[1]
        request, _ = self.pop_slot(slot)
        self.mark(request)
        return request

    def push(self, request, priority):
        slot, _ = self.push_slot(request, priority)
        if slot not in self._slots:
            self._slots[slot] = 0

    def on_response_download(self, response, request, spider):
        if not self.check_mark(request):
            return

        slot = _scheduler_slot_read(request)
        if slot not in self._slots or self._slots[slot] <= 0:
            raise ValueError('Got response for wrong slot "%s"' % (slot, ))
        self._slots[slot] = self._slots[slot] - 1
        if self._slots[slot] == 0 and slot not in self.pqueues:
            del self._slots[slot]

    def on_request_reached_downloader(self, request, spider):
        if not self.check_mark(request):
            return

        slot = _scheduler_slot_read(request)
        self._slots[slot] = self._slots.get(slot, 0) + 1

    def close(self):
        self._slots.clear()
        return super(DownloaderAwarePriorityQueue, self).close()
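The new queues become active only when the scheduler is pointed at them. A hypothetical settings.py sketch (SCHEDULER_PRIORITY_QUEUE is the existing setting the scheduler loads its pqclass from; per the commit notes the downloader-aware queue is not switched on by default because IP concurrency is not supported):

# opt in to load-aware scheduling across downloader slots
SCHEDULER_PRIORITY_QUEUE = 'scrapy.pqueues.DownloaderAwarePriorityQueue'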
