-
Notifications
You must be signed in to change notification settings - Fork 0
/
WarcTestProcessor.java
257 lines (231 loc) · 9.02 KB
/
WarcTestProcessor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
package crawlercommons.sitemaps;
import java.io.IOException;
import java.net.MalformedURLException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import org.netpreserve.jwarc.MessageBody;
import org.netpreserve.jwarc.MessageHeaders;
import org.netpreserve.jwarc.WarcPayload;
import org.netpreserve.jwarc.WarcReader;
import org.netpreserve.jwarc.WarcRecord;
import org.netpreserve.jwarc.WarcResponse;
import org.netpreserve.jwarc.WarcTargetRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class WarcTestProcessor {
/** Default upper bound, in bytes, for a payload read by {@code getContent(WarcResponse)}: 128 MiB. */
protected static final long MAX_PAYLOAD_SIZE = 128 * 1048576;

/** Capacity, in bytes, of each buffer allocated while reading a payload whose size is not known up front. */
protected static final int BUFFER_SIZE = 8192;

/** Class logger; declared final because it is never reassigned. */
private static final Logger LOG = LoggerFactory.getLogger(WarcTestProcessor.class);

/** Indexed response records keyed by target URL, kept in insertion order. A value may be null (see ArchiveRecordIndexer). */
protected Map<String, Record> records = new LinkedHashMap<>();

/** Paths of the WARC files passed to {@code readWarcFile}, in call order. */
protected List<String> warcFiles = new ArrayList<>();

/** Channels opened by {@code readWarcFile}, in call order; {@code Record.warcFileId} is used as index into this list. */
protected List<FileChannel> warcChannels = new ArrayList<>();

/** HTTP Content-Encoding of a captured response as classified by {@code getContentEncoding}. */
protected enum ContentEncoding {
    NOT_SUPPORTED, IDENTITY, GZIP, DEFLATE
}
/**
 * Index entry for a single WARC response record: remembers where the record
 * is located (WARC file id and offset) together with the HTTP status,
 * Content-Type and the WARC and HTTP headers captured at construction time.
 */
protected class Record {
    /** Offset of the record as passed to the constructor; the channel is positioned here before re-reading. */
    long offset;
    /** HTTP status code of the captured response. */
    int status;
    /** Index into {@code warcChannels} of the channel this record is read from. */
    int warcFileId;
    /** Processing flag; never modified inside this class. */
    boolean isProcessed = false;
    /** First Content-Type header of the HTTP response, or null if there is none. */
    String contentType;
    /** WARC record headers. */
    MessageHeaders header;
    /** HTTP response headers. */
    MessageHeaders httpHeaders;

    /**
     * Stores the HTTP headers and the Content-Type of the response.
     *
     * @param record the WARC response record to inspect
     * @return the HTTP status code of the response
     * @throws IOException if the HTTP message of the record cannot be accessed
     */
    private int parseHttpHeader(WarcResponse record) throws IOException {
        httpHeaders = record.http().headers();
        int httpStatus = record.http().status();
        contentType = httpHeaders.first("Content-Type").orElse(null);
        return httpStatus;
    }

    /**
     * @param record the WARC response record to index
     * @param offset offset of the record in its WARC file
     * @throws IOException if the HTTP message of the record cannot be accessed
     */
    public Record(WarcResponse record, long offset) throws IOException {
        header = record.headers();
        this.offset = offset;
        status = parseHttpHeader(record);
    }

    /**
     * Re-reads the record at {@link #offset} from the channel registered
     * under {@link #warcFileId} and returns its payload.
     *
     * @return the payload bytes as returned by
     *         {@code WarcTestProcessor.getContent(WarcResponse)}
     * @throws IOException if the channel cannot be positioned or read, or if
     *             there is no WARC response record at the offset
     */
    public byte[] getContent() throws IOException {
        FileChannel channel = warcChannels.get(warcFileId);
        channel.position(offset);
        // NOTE(review): try-with-resources closes the reader after a single
        // record. If closing a WarcReader also closes the channel it wraps,
        // the shared channel in warcChannels is unusable afterwards --
        // confirm against the jwarc documentation.
        try (WarcReader warcReader = new WarcReader(channel)) {
            Optional<WarcRecord> record = warcReader.next();
            // check the record type instead of casting blindly: a record of
            // another type is reported as IOException, not ClassCastException
            if (record.isPresent() && record.get() instanceof WarcResponse) {
                return WarcTestProcessor.getContent((WarcResponse) record.get());
            }
        }
        throw new IOException("No Warc response record at offset " + offset);
    }

    /** @return WARC file id, offset and HTTP status in a single line */
    @Override
    public String toString() {
        return "warc-file-id=" + warcFileId + ", offset=" + offset + ", status=" + status;
    }
}
/**
 * Classifies the Content-Encoding of a captured HTTP response.
 *
 * @param record the WARC response record
 * @return IDENTITY if there is no Content-Encoding header or its value is
 *         empty, "identity" or "none"; GZIP for "gzip" and "x-gzip";
 *         DEFLATE for "deflate"; NOT_SUPPORTED (after logging a warning)
 *         for several Content-Encoding headers or any other value. Values
 *         are compared case-insensitively.
 * @throws IOException if the HTTP message of the record cannot be accessed
 */
private static ContentEncoding getContentEncoding(WarcResponse record) throws IOException {
    List<String> encodings = record.http().headers().all("Content-Encoding");
    if (encodings.isEmpty()) {
        return ContentEncoding.IDENTITY;
    }
    if (encodings.size() > 1) {
        LOG.warn("Multiple Content-Encodings not supported: {}", encodings);
        return ContentEncoding.NOT_SUPPORTED;
    }

    String declared = encodings.get(0);
    String normalized = declared.toLowerCase(Locale.ROOT);
    if (normalized.isEmpty() || "identity".equals(normalized) || "none".equals(normalized)) {
        return ContentEncoding.IDENTITY;
    }
    if ("gzip".equals(normalized) || "x-gzip".equals(normalized)) {
        return ContentEncoding.GZIP;
    }
    if ("deflate".equals(normalized)) {
        return ContentEncoding.DEFLATE;
    }

    LOG.warn("Unknown/unsupported Content-Encoding: {}", declared);
    return ContentEncoding.NOT_SUPPORTED;
}
/**
 * Reads the payload of a WARC response record, limited to
 * {@link #MAX_PAYLOAD_SIZE} bytes.
 *
 * @param record the WARC response record
 * @return the payload bytes as returned by
 *         {@code getContent(record, MAX_PAYLOAD_SIZE)}
 * @throws IOException if the two-argument variant throws
 */
public static byte[] getContent(WarcResponse record) throws IOException {
    final long limit = MAX_PAYLOAD_SIZE;
    return getContent(record, limit);
}
/**
 * Reads the payload of a WARC response record into a byte array.
 *
 * <p>
 * Decoding of the HTTP Content-Encoding is not implemented yet: for
 * <code>gzip</code> and <code>deflate</code> an error is logged and the
 * body is returned as stored. If reading fails, the failure is logged and
 * the bytes read so far are returned.
 * </p>
 *
 * @param record the WARC response record
 * @param maxSize maximum number of payload bytes accepted
 * @return the payload bytes, or an empty array if the record has no payload
 * @throws IOException if the payload exceeds <code>maxSize</code> or does
 *             not fit into a byte array, or if the payload or the HTTP
 *             headers cannot be accessed
 */
public static byte[] getContent(WarcResponse record, long maxSize) throws IOException {
    Optional<WarcPayload> payload = record.payload();
    if (!payload.isPresent()) {
        return new byte[0];
    }
    MessageBody body = payload.get().body();
    long size = body.size();
    if (size > maxSize) {
        throw new IOException("WARC payload too large");
    }
    if (size > Integer.MAX_VALUE) {
        // the narrowing cast below would overflow; can only happen if the
        // caller passes a limit beyond the maximum array length
        throw new IOException("WARC payload too large to fit into a byte array: " + size);
    }
    // Wrap channel to decode/uncompress Content-Encoding
    ReadableByteChannel bodyChan = body;
    ContentEncoding contentEncoding = getContentEncoding(record);
    switch (contentEncoding) {
    case IDENTITY:
        break;
    case GZIP:
        // read into growing buffers instead of a single pre-sized one
        size = -1;
        // TODO: jwarc 0.12.1 or upwards:
        // bodyChan = org.netpreserve.jwarc.IOUtils.gunzipChannel(body);
        LOG.error("Content-Encoding `gzip` not yet supported for {}", record.target());
        break;
    case DEFLATE:
        // TODO: jwarc 0.12.1 or upwards:
        // bodyChan = org.netpreserve.jwarc.IOUtils.inflateChannel(body);
        LOG.error("Content-Encoding `deflate` not yet supported for {}", record.target());
        break;
    case NOT_SUPPORTED:
        // even if unsupported: try to parse the sitemap
        break;
    }

    // a negative size selects chunk-wise reading into BUFFER_SIZE buffers
    ByteBuffer buf = ByteBuffer.allocate(size >= 0 ? (int) size : BUFFER_SIZE);
    // filled buffers, in reading order, for content exceeding the first buffer
    List<ByteBuffer> bufs = new ArrayList<>();
    // the result is a byte array, so never accept more than an array can hold
    long limit = Math.min(maxSize, Integer.MAX_VALUE);
    long total = 0;
    while (true) {
        int r;
        try {
            r = bodyChan.read(buf);
        } catch (Exception e) {
            // best effort: keep what has been read so far
            LOG.error("Failed to read content of {}: {}", record.target(), e);
            break;
        }
        if (r < 0) {
            break;
        }
        if (r == 0 && !buf.hasRemaining()) {
            // buffer exhausted before the end of the body: continue with a new one
            buf.flip();
            bufs.add(buf);
            buf = ByteBuffer.allocate(BUFFER_SIZE);
        }
        total += r;
        if (total > limit) {
            // enforce the limit also for bodies whose size is not known up front
            throw new IOException("WARC payload too large");
        }
    }
    buf.flip();
    if (bufs.isEmpty() && total == buf.capacity()) {
        // everything is in the one, completely filled buffer: no copy needed
        return buf.array();
    }
    byte[] arr = new byte[(int) total];
    int pos = 0;
    for (ByteBuffer b : bufs) {
        int len = b.remaining();
        b.get(arr, pos, len);
        pos += len;
    }
    buf.get(arr, pos, buf.remaining());
    return arr;
}
/**
 * Opens a WARC file, registers its path and channel in {@code warcFiles}
 * and {@code warcChannels}, and passes every WARC response record together
 * with the reader position to the given processor. Records of other types
 * are skipped. Progress is logged every 1000 response records.
 *
 * @param warcPath path of the WARC file
 * @param proc processor called for every response record
 * @throws MalformedURLException declared for callers; not thrown explicitly here
 * @throws IOException if the file cannot be opened or read
 */
public void readWarcFile(String warcPath, ArchiveRecordProcessor proc) throws MalformedURLException, IOException {
    FileChannel channel = FileChannel.open(Paths.get(warcPath));
    warcFiles.add(warcPath);
    warcChannels.add(channel);
    // NOTE(review): the reader is closed at the end of this method while the
    // channel stays registered in warcChannels -- confirm whether closing the
    // reader also closes the channel.
    try (WarcReader reader = new WarcReader(channel)) {
        int responseCount = 0;
        for (WarcRecord record : reader) {
            if (record instanceof WarcResponse) {
                responseCount++;
                proc.process(record, reader.position());
                if (responseCount % 1000 == 0) {
                    LOG.info("Read {} WARC response records", responseCount);
                }
            }
        }
        LOG.info("Read {} WARC response records from file {}", responseCount, warcPath);
    }
}
/**
 * Looks up the indexed record of a URL.
 *
 * @param url the URL used as key in {@code records}
 * @return the value stored for the URL, or null if there is none
 */
public Record getRecord(String url) {
    final Record entry = records.get(url);
    return entry;
}
/** Callback invoked for WARC records while a WARC file is read. */
protected interface ArchiveRecordProcessor {

    /**
     * Processes a single record.
     *
     * @param record the WARC record
     * @param offset position of the record as reported by the caller
     */
    void process(WarcRecord record, long offset);

    /**
     * Gets the target URL of a record. The value is returned as provided by
     * {@code WarcTargetRecord.target()}; no trimming of enclosing
     * <code>&lt;...&gt;</code> is done here.
     *
     * @param record the WARC record
     * @return the target URL, or null if the record is not a
     *         {@code WarcTargetRecord}
     */
    default String getUrl(WarcRecord record) {
        return (record instanceof WarcTargetRecord) ? ((WarcTargetRecord) record).target() : null;
    }
}
protected class ArchiveRecordIndexer implements ArchiveRecordProcessor {
private int warcId;
public ArchiveRecordIndexer(int warcId) {
this.warcId = warcId;
}
public void setWarcId(int warcId) {
this.warcId = warcId;
}
public void process(WarcRecord record, long offset) {
if (!(record instanceof WarcResponse)) {
return;
}
String url = ((WarcTargetRecord) record).target();
try {
Record warcRecord = new Record((WarcResponse) record, offset);
warcRecord.warcFileId = this.warcId;
records.put(url, warcRecord);
} catch(IOException e) {
LOG.error("Failed to process WARC record " + url, e);
records.put(url, null);
}
}
}
/** Simple counters of processed documents with a method to log them. */
protected class Counter {

    /** Documents processed in total. */
    int processed = 0;
    /** Documents which could not be fetched. */
    int failedFetch = 0;
    /** Documents processed successfully. */
    int success = 0;

    /**
     * Formats a count for logging.
     *
     * @param n the count
     * @return the count right-aligned in a field of at least 8 characters
     */
    protected String f(int n) {
        final String pattern = "%8d";
        return String.format(Locale.ROOT, pattern, n);
    }

    /**
     * Writes the counters to the given logger, one per line; the number of
     * failures is the difference between processed and successful documents.
     *
     * @param log the logger to write to
     */
    public void log(Logger log) {
        int failed = processed - success;
        log.info("{}\tdocuments processed total", f(processed));
        log.info("{}\tsuccessfully processed", f(success));
        log.info("{}\tfailed to process", f(failed));
        log.info("{}\tfailed to fetch document", f(failedFetch));
    }
}
}