Upgrade the project to Python 3.10 #267

Open

wants to merge 11 commits into base: dev
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -196,7 +196,7 @@ jobs:

# is a tagged release | is a pull request | is on the develop branch
if [[ ! -z $CIRCLE_TAG ]] || [[ ! -z $CIRCLE_PULL_REQUEST ]] || [[ $BRANCH == $DEVELOP_BRANCH ]]; then
docker login -u "${DOCKER_USER}" -p "${DOCKER_PASS}"
docker login -u "${DOCKER_USER}" -p "${DOCKER_PASS}" docker.io

docker build --build-arg BUILD_NUMBER=$CIRCLE_BUILD_NUM -t $KAFKA_MONITOR_IMAGE-$VERSION_TAG -f docker/kafka-monitor/Dockerfile.py3 .
docker build --build-arg BUILD_NUMBER=$CIRCLE_BUILD_NUM -t $REDIS_MONITOR_IMAGE-$VERSION_TAG -f docker/redis-monitor/Dockerfile.py3 .
2 changes: 1 addition & 1 deletion README.md
@@ -14,7 +14,7 @@ Please see the ``requirements.txt`` within each sub project for Pip package depe

Other important components required to run the cluster

- Python 2.7 or 3.6: https://www.python.org/downloads/
- Python 3.10: https://www.python.org/downloads/
- Redis: http://redis.io
- Zookeeper: https://zookeeper.apache.org
- Kafka: http://kafka.apache.org
112 changes: 70 additions & 42 deletions crawler/crawling/distributed_scheduler.py
@@ -6,9 +6,15 @@
from six import string_types
from builtins import object
from scrapy.http import Request
from scrapy.conf import settings

# from scrapy.conf import settings
from scrapy.settings import Settings
from . import settings
new_settings = Settings()
new_settings.setmodule(settings)

from scrapy.utils.python import to_unicode
from scrapy.utils.reqser import request_to_dict, request_from_dict
from scrapy.utils.request import request_from_dict

import redis
import random
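
A note on the settings change above: `scrapy.conf` no longer exists in current Scrapy, so the module now builds a `Settings` object and loads the package's own `settings` module into it. A minimal standalone sketch of that pattern (the printed key and default are illustrative):

# Minimal sketch of the scrapy.conf replacement used above: build a
# Settings object and load a plain settings module into it.
from scrapy.settings import Settings

from crawling import settings as settings_module  # the crawler's settings.py

new_settings = Settings()
new_settings.setmodule(settings_module, priority="project")

# Settings.get works as before; the default is returned when the key is unset.
print(new_settings.get("REDIS_HOST", "localhost"))

When the crawler is launched through the Scrapy CLI, `scrapy.utils.project.get_project_settings()` would arguably be the more conventional route; the module-level object used here keeps the scheduler importable outside a crawl context, at the cost of `from_settings()` no longer honouring per-crawler overrides.
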
@@ -106,12 +112,12 @@ def __init__(self, server, persist, update_int, timeout, retries, logger,
self.my_uuid = str(uuid.uuid4()).split('-')[4]

def setup_zookeeper(self):
self.assign_path = settings.get('ZOOKEEPER_ASSIGN_PATH', "")
self.my_id = settings.get('ZOOKEEPER_ID', 'all')
self.assign_path = new_settings.get('ZOOKEEPER_ASSIGN_PATH', "")
self.my_id = new_settings.get('ZOOKEEPER_ID', 'all')
self.logger.debug("Trying to establish Zookeeper connection")
try:
self.zoo_watcher = ZookeeperWatcher(
hosts=settings.get('ZOOKEEPER_HOSTS'),
hosts=new_settings.get('ZOOKEEPER_HOSTS'),
filepath=self.assign_path + self.my_id,
config_handler=self.change_config,
error_handler=self.error_config,
@@ -120,7 +126,7 @@ def setup_zookeeper(self):
self.logger.error("Could not connect to Zookeeper")
sys.exit(1)

if self.zoo_watcher.ping():
if self.zoo_watcher.is_valid():
self.logger.debug("Successfully set up Zookeeper connection")
else:
self.logger.error("Could not ping Zookeeper")
@@ -290,7 +296,7 @@ def update_ipaddress(self):
self.old_ip = self.my_ip
self.my_ip = '127.0.0.1'
try:
obj = urllib.request.urlopen(settings.get('PUBLIC_IP_URL',
obj = urllib.request.urlopen(new_settings.get('PUBLIC_IP_URL',
'http://ip.42.pl/raw'))
results = self.ip_regex.findall(obj.read().decode('utf-8'))
if len(results) > 0:
@@ -311,7 +317,7 @@ def report_self(self):
'''
Reports the crawler uuid to redis
'''
self.logger.debug("Reporting self id", extra={'uuid':self.my_uuid})
self.logger.debug("Reporting self id: {extra}".format(extra={'uuid':self.my_uuid}))
key = "stats:crawler:{m}:{s}:{u}".format(
m=socket.gethostname(),
s=self.spider.name,
@@ -321,36 +327,36 @@

@classmethod
def from_settings(cls, settings):
server = redis.Redis(host=settings.get('REDIS_HOST'),
port=settings.get('REDIS_PORT'),
db=settings.get('REDIS_DB'),
password=settings.get('REDIS_PASSWORD'),
server = redis.Redis(host=new_settings.get('REDIS_HOST'),
port=new_settings.get('REDIS_PORT'),
db=new_settings.get('REDIS_DB'),
password=new_settings.get('REDIS_PASSWORD'),
decode_responses=True,
socket_timeout=settings.get('REDIS_SOCKET_TIMEOUT'),
socket_connect_timeout=settings.get('REDIS_SOCKET_TIMEOUT'))
persist = settings.get('SCHEDULER_PERSIST', True)
up_int = settings.get('SCHEDULER_QUEUE_REFRESH', 10)
hits = settings.get('QUEUE_HITS', 10)
window = settings.get('QUEUE_WINDOW', 60)
mod = settings.get('QUEUE_MODERATED', False)
timeout = settings.get('DUPEFILTER_TIMEOUT', 600)
ip_refresh = settings.get('SCHEDULER_IP_REFRESH', 60)
add_type = settings.get('SCHEDULER_TYPE_ENABLED', False)
add_ip = settings.get('SCHEDULER_IP_ENABLED', False)
retries = settings.get('SCHEUDLER_ITEM_RETRIES', 3)
ip_regex = settings.get('IP_ADDR_REGEX', '.*')
backlog_blacklist = settings.get('SCHEDULER_BACKLOG_BLACKLIST', True)
queue_timeout = settings.get('SCHEDULER_QUEUE_TIMEOUT', 3600)


my_level = settings.get('SC_LOG_LEVEL', 'INFO')
my_name = settings.get('SC_LOGGER_NAME', 'sc-logger')
my_output = settings.get('SC_LOG_STDOUT', True)
my_json = settings.get('SC_LOG_JSON', False)
my_dir = settings.get('SC_LOG_DIR', 'logs')
my_bytes = settings.get('SC_LOG_MAX_BYTES', '10MB')
my_file = settings.get('SC_LOG_FILE', 'main.log')
my_backups = settings.get('SC_LOG_BACKUPS', 5)
socket_timeout=new_settings.get('REDIS_SOCKET_TIMEOUT'),
socket_connect_timeout=new_settings.get('REDIS_SOCKET_TIMEOUT'))
persist = new_settings.get('SCHEDULER_PERSIST', True)
up_int = new_settings.get('SCHEDULER_QUEUE_REFRESH', 10)
hits = new_settings.get('QUEUE_HITS', 10)
window = new_settings.get('QUEUE_WINDOW', 60)
mod = new_settings.get('QUEUE_MODERATED', False)
timeout = new_settings.get('DUPEFILTER_TIMEOUT', 600)
ip_refresh = new_settings.get('SCHEDULER_IP_REFRESH', 60)
add_type = new_settings.get('SCHEDULER_TYPE_ENABLED', False)
add_ip = new_settings.get('SCHEDULER_IP_ENABLED', False)
retries = new_settings.get('SCHEUDLER_ITEM_RETRIES', 3)
ip_regex = new_settings.get('IP_ADDR_REGEX', '.*')
backlog_blacklist = new_settings.get('SCHEDULER_BACKLOG_BLACKLIST', True)
queue_timeout = new_settings.get('SCHEDULER_QUEUE_TIMEOUT', 3600)


my_level = new_settings.get('SC_LOG_LEVEL', 'INFO')
my_name = new_settings.get('SC_LOGGER_NAME', 'sc-logger')
my_output = new_settings.get('SC_LOG_STDOUT', True)
my_json = new_settings.get('SC_LOG_JSON', False)
my_dir = new_settings.get('SC_LOG_DIR', 'logs')
my_bytes = new_settings.get('SC_LOG_MAX_BYTES', '10MB')
my_file = new_settings.get('SC_LOG_FILE', 'main.log')
my_backups = new_settings.get('SC_LOG_BACKUPS', 5)

logger = LogFactory.get_instance(json=my_json,
name=my_name,
@@ -361,9 +367,9 @@ def from_settings(cls, settings):
bytes=my_bytes,
backups=my_backups)

global_page_per_domain_limit = settings.get('GLOBAL_PAGE_PER_DOMAIN_LIMIT', None)
global_page_per_domain_limit_timeout = settings.get('GLOBAL_PAGE_PER_DOMAIN_LIMIT_TIMEOUT', 600)
domain_max_page_timeout = settings.get('DOMAIN_MAX_PAGE_TIMEOUT', 600)
global_page_per_domain_limit = new_settings.get('GLOBAL_PAGE_PER_DOMAIN_LIMIT', None)
global_page_per_domain_limit_timeout = new_settings.get('GLOBAL_PAGE_PER_DOMAIN_LIMIT_TIMEOUT', 600)
domain_max_page_timeout = new_settings.get('DOMAIN_MAX_PAGE_TIMEOUT', 600)

return cls(server, persist, up_int, timeout, retries, logger, hits,
window, mod, ip_refresh, add_type, add_ip, ip_regex,
@@ -411,6 +417,27 @@ def is_blacklisted(self, appid, crawlid):
redis_key = self.spider.name + ":blacklist"
return self.redis_conn.sismember(redis_key, key_check)

def decode_dict(self, data):
decoded_dict = {}
for key, value in data.items():
if isinstance(value, bytes):
decoded_dict[key] = value.decode()
elif isinstance(value, dict):
decoded_dict[key] = self.decode_dict(value)
elif isinstance(value, list):
decoded_list = []
for item in value:
if isinstance(item, bytes):
decoded_list.append(item.decode())
elif isinstance(item, dict):
decoded_list.append(self.decode_dict(item))
else:
decoded_list.append(item)
decoded_dict[key] = decoded_list
else:
decoded_dict[key] = value
return decoded_dict

def enqueue_request(self, request):
'''
Pushes a request from the spider into the proper throttled queue
@@ -422,7 +449,7 @@ def enqueue_request(self, request):
return

# An individual crawling request of a domain's page
req_dict = request_to_dict(request, self.spider)
req_dict = Request.to_dict(request, spider=self.spider)

# # # # # # # # # # # # # # # # # # Page Limit Filters # # # # # # # # # # # # # # #
# Max page filter per individual domain
@@ -468,6 +495,7 @@ def enqueue_request(self, request):
curr_time < req_dict['meta']['expires']):
# we may already have the queue in memory
if key in self.queue_keys:
req_dict = self.decode_dict(req_dict)
self.queue_dict[key][0].push(req_dict,
req_dict['meta']['priority'])
else:
@@ -535,7 +563,7 @@ def next_request(self):
.format(url=item['url']))
if 'meta' in item:
# item is a serialized request
req = request_from_dict(item, self.spider)
req = request_from_dict(item, spider=self.spider)
else:
# item is a feed from outside, parse it manually
req = self.request_from_feed(item)
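
For context on the `reqser` changes in this file, a minimal round-trip sketch, assuming Scrapy >= 2.6 where `scrapy.utils.reqser` was removed (the URL and meta values are illustrative):

# Request.to_dict replaces scrapy.utils.reqser.request_to_dict, and
# request_from_dict now lives in scrapy.utils.request.
from scrapy.http import Request
from scrapy.utils.request import request_from_dict

req = Request("http://example.com", meta={"priority": 70, "crawlid": "abc123"})

# Equivalent to Request.to_dict(req, spider=...); pass spider= when the
# callback is a spider method so it can be resolved on deserialization.
req_dict = req.to_dict()

restored = request_from_dict(req_dict)
assert restored.url == req.url and restored.meta["priority"] == 70
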
32 changes: 16 additions & 16 deletions crawler/crawling/items.py
@@ -2,21 +2,21 @@

# Define here the models for your scraped items

from scrapy import Item, Field
import scrapy


class RawResponseItem(Item):
appid = Field()
crawlid = Field()
url = Field()
response_url = Field()
status_code = Field()
status_msg = Field()
response_headers = Field()
request_headers = Field()
body = Field()
links = Field()
attrs = Field()
success = Field()
exception = Field()
encoding = Field()
class RawResponseItem(scrapy.Item):
appid = scrapy.Field()
crawlid = scrapy.Field()
url = scrapy.Field()
response_url = scrapy.Field()
status_code = scrapy.Field()
status_msg = scrapy.Field()
response_headers = scrapy.Field()
request_headers = scrapy.Field()
body = scrapy.Field()
links = scrapy.Field()
attrs = scrapy.Field()
success = scrapy.Field()
exception = scrapy.Field()
encoding = scrapy.Field()
4 changes: 2 additions & 2 deletions crawler/crawling/log_retry_middleware.py
@@ -7,7 +7,7 @@
import sys
from scrapy.utils.response import response_status_message

from scrapy.xlib.tx import ResponseFailed
from scrapy.exceptions import IgnoreRequest
from twisted.internet import defer
from twisted.internet.error import TimeoutError, DNSLookupError, \
ConnectionRefusedError, ConnectionDone, ConnectError, \
@@ -22,7 +22,7 @@ class LogRetryMiddleware(object):

EXCEPTIONS_TO_RETRY = (defer.TimeoutError, TimeoutError, DNSLookupError,
ConnectionRefusedError, ConnectionDone, ConnectError,
ConnectionLost, TCPTimedOutError, ResponseFailed,
ConnectionLost, TCPTimedOutError, IgnoreRequest,
IOError)

def __init__(self, settings):
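
One hedged alternative to the swap above: `ResponseFailed` (previously re-exported through the removed `scrapy.xlib.tx`) is still importable from Twisted itself, which would keep the original transport-failure retry semantics; `IgnoreRequest` is a Scrapy control-flow exception rather than a network error. A sketch, assuming a current Twisted where `twisted.web.client` exports it:

# Same exception tuple as the middleware, but keeping the transport-level
# ResponseFailed instead of substituting scrapy.exceptions.IgnoreRequest.
from twisted.internet import defer
from twisted.internet.error import (
    ConnectError, ConnectionDone, ConnectionLost, ConnectionRefusedError,
    DNSLookupError, TCPTimedOutError, TimeoutError,
)
from twisted.web.client import ResponseFailed  # formerly scrapy.xlib.tx.ResponseFailed

EXCEPTIONS_TO_RETRY = (
    defer.TimeoutError, TimeoutError, DNSLookupError,
    ConnectionRefusedError, ConnectionDone, ConnectError,
    ConnectionLost, TCPTimedOutError, ResponseFailed,
    IOError,
)
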
22 changes: 22 additions & 0 deletions crawler/crawling/pipelines.py
@@ -173,6 +173,27 @@ def _kafka_failure(self, item, spider, exception):
item = self._clean_item(item)
self.logger.error("Failed to send page to Kafka", item)

def decode_dict(self, data):
decoded_dict = {}
for key, value in data.items():
if isinstance(value, bytes):
decoded_dict[key] = value.decode()
elif isinstance(value, dict):
decoded_dict[key] = self.decode_dict(value)
elif isinstance(value, list):
decoded_list = []
for item in value:
if isinstance(item, bytes):
decoded_list.append(item.decode())
elif isinstance(item, dict):
decoded_list.append(self.decode_dict(item))
else:
decoded_list.append(item)
decoded_dict[key] = decoded_list
else:
decoded_dict[key] = value
return decoded_dict

def process_item(self, item, spider):
try:
self.logger.debug("Processing item in KafkaPipeline")
@@ -194,6 +215,7 @@ def process_item(self, item, spider):
elif 'utf-8' != encoding:
datum['body'] = datum['body'].decode(datum['encoding'])

datum = self.decode_dict(datum)
message = ujson.dumps(datum, sort_keys=True)
except:
message = 'json failed to parse'
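
The `decode_dict` helper added here (and duplicated in `distributed_scheduler.py`; it could be factored into a shared utility) recursively converts bytes values to `str` so the item can be dumped to JSON. A standalone sketch with illustrative data:

# Standalone version of the helper: bytes values anywhere in the nested
# structure are decoded to str; everything else passes through unchanged.
def decode_dict(data):
    decoded = {}
    for key, value in data.items():
        if isinstance(value, bytes):
            decoded[key] = value.decode()
        elif isinstance(value, dict):
            decoded[key] = decode_dict(value)
        elif isinstance(value, list):
            decoded[key] = [
                item.decode() if isinstance(item, bytes)
                else decode_dict(item) if isinstance(item, dict)
                else item
                for item in value
            ]
        else:
            decoded[key] = value
    return decoded

datum = {"url": b"http://example.com", "headers": {"Content-Type": [b"text/html"]}}
print(decode_dict(datum))
# {'url': 'http://example.com', 'headers': {'Content-Type': ['text/html']}}

Note that only values are decoded; a bytes dictionary key would pass through unchanged.
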
4 changes: 2 additions & 2 deletions crawler/crawling/redis_domain_max_page_filter.py
@@ -1,7 +1,7 @@
# coding=utf-8
import tldextract
from scrapy.dupefilters import BaseDupeFilter
from scrapy.utils.reqser import request_to_dict
from scrapy.http import Request


class RFDomainMaxPageFilter(BaseDupeFilter):
@@ -31,7 +31,7 @@ def __init__(self, server, key, timeout):
def request_page_limit_reached(self, request, spider):
# Collect items composing the redis key
# grab the tld of the request
req_dict = request_to_dict(request, spider)
req_dict = Request.to_dict(request, spider=spider)
ex_res = self.extract(req_dict['url'])
domain = "{d}.{s}".format(d=ex_res.domain, s=ex_res.suffix)

4 changes: 2 additions & 2 deletions crawler/crawling/redis_global_page_per_domain_filter.py
@@ -1,7 +1,7 @@
# coding=utf-8
import tldextract
from scrapy.dupefilters import BaseDupeFilter
from scrapy.utils.reqser import request_to_dict
from scrapy.http import Request


class RFGlobalPagePerDomainFilter(BaseDupeFilter):
@@ -41,7 +41,7 @@ def __init__(self, server, key, page_limit, timeout):
def request_page_limit_reached(self, request, spider):
# Collect items composing the redis key
# grab the tld of the request
req_dict = request_to_dict(request, spider)
req_dict = Request.to_dict(request, spider=spider)
ex_res = self.extract(req_dict['url'])
domain = "{d}.{s}".format(d=ex_res.domain, s=ex_res.suffix)

9 changes: 4 additions & 5 deletions crawler/crawling/spiders/link_spider.py
@@ -1,12 +1,11 @@
from __future__ import absolute_import
import scrapy

from scrapy.http import Request
from crawling.spiders.lxmlhtml import CustomLxmlLinkExtractor as LinkExtractor
from scrapy.conf import settings
from .lxmlhtml import CustomLxmlLinkExtractor as LinkExtractor

from crawling.items import RawResponseItem
from crawling.spiders.redis_spider import RedisSpider

from ..items import RawResponseItem
from .redis_spider import RedisSpider


class LinkSpider(RedisSpider):
5 changes: 2 additions & 3 deletions crawler/crawling/spiders/lxmlhtml.py
@@ -6,7 +6,6 @@
from scrapy.linkextractors.lxmlhtml import LxmlLinkExtractor
from scrapy.link import Link
from six.moves.urllib.parse import urljoin
from scrapy.utils.python import unique as unique_list, to_native_str
import lxml.etree as etree
from scrapy.utils.misc import rel_has_nofollow

@@ -36,8 +35,8 @@ def _extract_links(self, selector, response_url, response_encoding, base_url):
if url is None:
continue
# added 'ignore' to encoding errors
url = to_native_str(url, encoding=response_encoding,
errors='ignore')
if isinstance(url, bytes):
url = url.decode('utf-8')
# to fix relative links after process_value
url = urljoin(response_url, url)
link = Link(url, _collect_string_content(el) or u'',
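
A hedged note on the `to_native_str` removal above: `scrapy.utils.python.to_unicode` (already imported elsewhere in this PR) preserves the original behaviour of decoding with the response encoding and ignoring undecodable bytes, which a bare `url.decode('utf-8')` does not. A minimal sketch with illustrative bytes:

# to_unicode keeps the encoding= and errors= handling that to_native_str had.
from scrapy.utils.python import to_unicode

raw_url = b"/caf\xe9-menu"  # latin-1 bytes, not valid UTF-8
url = to_unicode(raw_url, encoding="latin-1", errors="ignore")
print(url)  # /café-menu
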
4 changes: 1 addition & 3 deletions crawler/crawling/spiders/wandering_spider.py
@@ -1,12 +1,10 @@
from __future__ import absolute_import
# Example Wandering Spider
import scrapy

from scrapy.http import Request
from .lxmlhtml import CustomLxmlLinkExtractor as LinkExtractor
from scrapy.conf import settings

from crawling.items import RawResponseItem
from ..items import RawResponseItem
from .redis_spider import RedisSpider

import random