indexwarcsjob.py (forked from ikreymer/webarchive-indexing)
import logging

from gzip import GzipFile
from tempfile import TemporaryFile

import boto3
import botocore

from mrjob.job import MRJob
from mrjob.protocol import RawValueProtocol
from mrjob.util import log_to_stream

from pywb.indexer.cdxindexer import write_cdx_index

LOG = logging.getLogger('IndexWARCJob')
log_to_stream(format="%(asctime)s %(levelname)s %(name)s: %(message)s",
              name='IndexWARCJob')
#=============================================================================
class IndexWARCJob(MRJob):
    """This job receives as input a manifest of WARC/ARC files and produces
    a CDX index per file.

    pywb.indexer.cdxindexer is used to create the index, with a fixed set
    of options.

    TODO: add a way to customize indexing options.
    """
    INPUT_PROTOCOL = RawValueProtocol
    OUTPUT_PROTOCOL = RawValueProtocol

    HADOOP_INPUT_FORMAT = 'org.apache.hadoop.mapred.lib.NLineInputFormat'

    JOBCONF = {'mapreduce.task.timeout': '9600000',
               'mapreduce.input.fileinputformat.split.maxsize': '50000000',
               'mapreduce.map.speculative': 'false',
               'mapreduce.reduce.speculative': 'false',
               'mapreduce.job.jvm.numtasks': '-1',
               'mapreduce.input.lineinputformat.linespermap': '2',
               }
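    # NLineInputFormat assigns a fixed number of manifest lines to each map
    # task (mapreduce.input.lineinputformat.linespermap, here 2 WARC paths
    # per mapper), which keeps individual map tasks short.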
    def configure_args(self):
        """Custom command line options for indexing"""
        super(IndexWARCJob, self).configure_args()

        self.add_passthru_arg('--warc_bucket', dest='warc_bucket',
                              default='commoncrawl',
                              help='source bucket for warc paths, '
                                   'if input is a relative path (S3 only)')

        self.add_passthru_arg('--cdx_bucket', dest='cdx_bucket',
                              default='my_cdx_bucket',
                              help='destination bucket for cdx (S3 only)')

        self.add_passthru_arg('--skip-existing', dest='skip_existing',
                              action='store_true', default=True,
                              help='skip processing files that already have '
                                   'a CDX (default)')

        self.add_passthru_arg('--no-skip-existing', dest='skip_existing',
                              action='store_false',
                              help='reindex files even if a CDX already exists')

        self.add_passthru_arg('--s3_local_temp_dir', dest='s3_local_temp_dir',
                              default=None,
                              help='local temporary directory to buffer '
                                   'content from S3')
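    # Example (hypothetical bucket name): reindex everything into your own
    # bucket with:  --cdx_bucket my-cdx-output --no-skip-existing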
    def mapper_init(self):
        # Note: this assumes that credentials are properly configured, see
        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html#configuring-credentials
        # best via IAM roles:
        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html#best-practices-for-configuring-credentials

        self.boto_config = botocore.client.Config(
            read_timeout=180,
            retries={'max_attempts': 20})
        s3client = boto3.client('s3', config=self.boto_config)

        # fail fast if either bucket is inaccessible, instead of leaving the
        # mapper half-initialized
        try:
            s3client.head_bucket(Bucket=self.options.warc_bucket)
        except botocore.exceptions.ClientError as e:
            LOG.error('Failed to access bucket %s: %s',
                      self.options.warc_bucket, e)
            raise
        try:
            s3client.head_bucket(Bucket=self.options.cdx_bucket)
        except botocore.exceptions.ClientError as e:
            LOG.error('Failed to access bucket %s: %s',
                      self.options.cdx_bucket, e)
            raise

        self.index_options = {
            'surt_ordered': True,
            'sort': True,
            'cdxj': True,
            #'minimal': True
        }
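    # With NLineInputFormat, Hadoop streaming prepends the record key (a byte
    # offset) as a tab-separated field in front of the manifest line, so the
    # WARC path is taken from the last tab-separated field.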
    def mapper(self, _, line):
        warc_path = line.split('\t')[-1]
        try:
            self._load_and_index(warc_path)
        except Exception as exc:
            LOG.error('Failed to index %s: %s', warc_path, exc)
            raise
    def _conv_warc_to_cdx_path(self, warc_path):
        # derive the cdx path from the warc path
        cdx_path = warc_path.replace('crawl-data', 'cc-index/cdx')
        cdx_path = cdx_path.replace('.warc.gz', '.cdx.gz')
        cdx_path = cdx_path.replace('.warc.wet.gz', '.wet.cdx.gz')
        cdx_path = cdx_path.replace('.warc.wat.gz', '.wat.cdx.gz')
        return cdx_path
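    # Example mapping (hypothetical Common Crawl style path):
    #   crawl-data/CC-MAIN-2020-05/.../foo.warc.gz
    #   -> cc-index/cdx/CC-MAIN-2020-05/.../foo.cdx.gz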
    def _load_and_index(self, warc_path):
        cdx_path = self._conv_warc_to_cdx_path(warc_path)
        LOG.info('Indexing WARC: %s', warc_path)

        s3client = boto3.client('s3', config=self.boto_config)

        if self.options.skip_existing:
            try:
                s3client.head_object(Bucket=self.options.cdx_bucket,
                                     Key=cdx_path)
                LOG.info('Already Exists: %s', cdx_path)
                return
            except botocore.exceptions.ClientError as exception:
                pass  # ok, not found

        try:
            s3client.head_object(Bucket=self.options.warc_bucket,
                                 Key=warc_path)
        except botocore.exceptions.ClientError as exception:
            LOG.error('WARC not found: %s', warc_path)
            return

        with TemporaryFile(mode='w+b',
                           dir=self.options.s3_local_temp_dir) as warctemp:
            LOG.info('Fetching WARC: %s', warc_path)
            try:
                s3client.download_fileobj(self.options.warc_bucket, warc_path,
                                          warctemp)
            except botocore.exceptions.ClientError as exception:
                LOG.error('Failed to download %s: %s', warc_path, exception)
                return

            warctemp.seek(0)
            LOG.info('Successfully fetched WARC: %s', warc_path)

            with TemporaryFile(mode='w+b',
                               dir=self.options.s3_local_temp_dir) as cdxtemp:
                with GzipFile(fileobj=cdxtemp, mode='wb') as cdxfile:
                    # Index to temp
                    write_cdx_index(cdxfile, warctemp, warc_path,
                                    **self.index_options)

                # Upload temp
                cdxtemp.flush()
                cdxtemp.seek(0)

                LOG.info('Uploading CDX: %s', cdx_path)
                try:
                    s3client.upload_fileobj(cdxtemp,
                                            self.options.cdx_bucket,
                                            cdx_path)
                except botocore.exceptions.ClientError as exception:
                    LOG.error('Failed to upload %s: %s', cdx_path, exception)
                    return

                LOG.info('Successfully uploaded CDX: %s', cdx_path)
if __name__ == "__main__":
    IndexWARCJob.run()
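# Example invocation (illustrative; the manifest name and runner are
# assumptions, not part of this repo's documented interface):
#   python indexwarcsjob.py -r hadoop --cdx_bucket my_cdx_bucket \
#       warc_manifest.txt
# where warc_manifest.txt lists one WARC path per line, relative to
# --warc_bucket.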