
WARC spout to emit captures into topology (implements #755) #799

Merged: 6 commits merged into apache:master on Jun 25, 2020

Conversation

sebastian-nagel (Contributor)

A simple WARC spout:

  • inherits from FileSpout to read paths/URLs from local files
  • based on jwarc
  • mimics the behavior of FetcherBolt, i.e.
    • all captures are sent to the status stream
    • only successful fetches (HTTP status 200) emit content to the default stream for parsing, indexing, etc. (see the sketch below)
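
For orientation, a minimal sketch of what these two output streams look like in Storm terms. This is an assumption based on the usual StormCrawler conventions (the field names and Constants.StatusStreamName), not code copied from the PR:

import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import com.digitalpebble.stormcrawler.Constants;

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // default stream: content of successful fetches (HTTP 200),
    // consumed by parsing/indexing bolts
    declarer.declare(new Fields("url", "content", "metadata"));
    // status stream: every capture, consumed by a status updater
    declarer.declareStream(Constants.StatusStreamName,
            new Fields("url", "metadata", "status"));
}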

Tested so far only with a few WARC files, both local files and Common Crawl files referenced by URL (http://commoncrawl.s3.amazonaws.com/crawl-data/...).

jnioche added this to the 1.17 milestone on May 27, 2020
jnioche (Contributor) commented May 27, 2020

Nice one @sebastian-nagel!

Would some of the existing components of the WARC module benefit from jwarc?

Our code to generate WARCs leverages storm-hdfs (and can of course deal with local files as well). Why not have a spout based on HdfsSpout instead? This way we could deal with remote and/or larger input data.

sebastian-nagel (Contributor, Author)

> Would some of the existing components of the WARC module benefit from jwarc?

Maybe, it's on my list to try to rewrite the WARCHdfsBolt using jwarc.

> why not have a spout based on HdfsSpout instead?

After a look at the HDFS spout: the way files are marked before and moved after processing does not seem applicable to web archives - those usually stay in place. We would need the same mechanism as in the first version of WarcSpout: write paths/URLs pointing to WARCs into files and put these in the input folder of HdfsSpout. I'll think about it; maybe it's the next step.

jnioche (Contributor) commented May 29, 2020

makes sense

Could you please update the README for the WARC module as part of the PR? Good to go otherwise

thanks

sebastian-nagel (Contributor, Author)

Yes, I'll update the README. Give me a few days to do some more testing - I've already found one open point: WARC records with the payload still compressed using Content-Encoding.

- add WARCSpout to warc module README
- refactor WARCSpout
- add WARC record location to metadata:
  warc.file.name, warc.record.offset and warc.record.length
- upgrade jwarc dependency to 0.12.0
- after emitting a fetched tuple: sleep to avoid overflowing the topology
  queues (configurable via `warc.spout.emit.fetched.sleep.ms`)
- sleep 1 microsec. after "failed" fetches (HTTP status != 200)
sebastian-nagel (Contributor, Author)

Rebased to current master and addressed the following points:

  • added info about warc spout to README
  • store WARC record location in metadata (warc.file.name, warc.record.offset and warc.record.length), useful for debugging if the content caused a parser failure; related to the wish in WARCHdfsBolt with CDX index #567 (see the sketch after this list)
  • upgrade jwarc dependency to 0.12.0
  • add configurable sleep after fetches to work around OOMs due to large overflow queues
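
A sketch of how the record location could end up in the metadata; the variable names are assumptions, while Metadata.setValue is the regular StormCrawler API:

import com.digitalpebble.stormcrawler.Metadata;

Metadata metadata = new Metadata();
// location of the record within the WARC file (assumed variable names)
metadata.setValue("warc.file.name", warcFileInProgress.toString());
metadata.setValue("warc.record.offset", String.valueOf(recordOffset));
metadata.setValue("warc.record.length", String.valueOf(recordLength));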

Open points:

  • HTTP Content-Encoding is not decoded. Waiting to see whether a method for this can be made available in jwarc, see Utility methods to read payload body iipc/jwarc#48
  • a single WARC spout is able to emit 2,000-3,000 "fetches" per sec., which makes any topology consuming the content reliably fail with an OOM caused by >20k tuples in the overflow queue. I've tried, without success: shrinking topology.max.spout.pending, enabling/disabling backpressure, and various values for topology.message.timeout.secs (30-300 sec.)

Testing:

  • "dummy" topology in local mode: WARCSpout ( -> DummyIndexer ) -> StdOutStatusUpdater
    • processed 70 WARC files
    • average: 2,400 WARC records per sec.
    • no sleep
  • cluster mode, with 10 ms. sleep after "fetches":
    • rewrite WARC files: WARCSpout -> WARCHdfsBolt
    • parse and index content

if (warcReader == null && buffer.isEmpty()) {
    // input exhausted
    try {
        Thread.sleep(10);
jnioche (Contributor):

don't need it - Storm will handle sleeping between calls to nextTuple if you don't emit anything during a call

sebastian-nagel (Contributor, Author):

Ok, will remove it - I was following the API doc of ISpout::nextTuple, which is obviously outdated.

jnioche (Contributor):

Let me double-check in Storm's code

jnioche (Contributor):

see http://storm.apache.org/releases/current/Performance.html -> wait strategy

default values

topology.sleep.spout.wait.strategy.time.ms | 1
topology.spout.wait.strategy | "org.apache.storm.spout.SleepSpoutWaitStrategy"

sebastian-nagel (Contributor, Author):

Ok: sleep removed.
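
For illustration, a minimal sketch of the agreed pattern, assuming the field names from the snippet above (not the actual WARCSpout code): nextTuple() simply returns when there is nothing to emit, and Storm's configured wait strategy (SleepSpoutWaitStrategy with a 1 ms sleep by default, see above) backs off between calls.

@Override
public void nextTuple() {
    if (warcReader == null && buffer.isEmpty()) {
        // input exhausted: return without emitting; Storm notices that
        // no tuple was emitted during this call and applies the
        // configured spout wait strategy before calling nextTuple() again
        return;
    }
    // ... read the next WARC record and emit it ...
}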

return;

LOG.info("Reading WARC file {}", warcFileInProgress);
ReadableByteChannel warcChannel = null;
jnioche (Contributor):

try-with-resources to simplify the code and remove the close section below?

sebastian-nagel (Contributor, Author):

Good point. Thanks!

sebastian-nagel (Contributor, Author):

Actually, it does not work: the channel needs to stay open; it will be closed later, together with the reader, by warcReader.close().
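
A minimal sketch of the lifetime issue, with an assumed, hypothetical helper openChannel(...): the channel is opened once, read across many nextTuple() calls, and only closed when the reader is closed, so a try-with-resources block scoped to the opening method would close it too early.

import java.nio.channels.ReadableByteChannel;
import org.netpreserve.jwarc.WarcReader;

// the channel must outlive the method that opens it, so
// try-with-resources does not fit here
ReadableByteChannel warcChannel = openChannel(warcFileInProgress); // hypothetical helper
warcReader = new WarcReader(warcChannel);

// ... many nextTuple() calls later, once the WARC file is exhausted:
warcReader.close(); // also closes the underlying channel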

jnioche (Contributor):

ok!

LOG.debug("Skipped WARC record of type {}",
record.get().type());
}
if (storeHTTPHeaders && "request".equals(warcType)) {
jnioche (Contributor):

that string "request" is used in other places in the module - have it as a constant or enum somewhere?

sebastian-nagel (Contributor, Author):

Ok. But maybe it's better to use instanceof WarcRequest, as the literal "request" is an internal detail of jwarc.
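
A sketch of the alternative check via jwarc's class hierarchy, with the surrounding variable names assumed from the snippet above:

import org.netpreserve.jwarc.WarcRecord;
import org.netpreserve.jwarc.WarcRequest;

// type check via jwarc's record classes instead of the raw
// WARC-Type string, which is an internal detail of jwarc
WarcRecord rec = record.get();
if (storeHTTPHeaders && rec instanceof WarcRequest) {
    WarcRequest request = (WarcRequest) rec;
    // ... store the request's HTTP headers in the metadata ...
}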

jnioche (Contributor) commented Jun 22, 2020

Implementing our own timeout mechanism is not right; max spout pending should be the way to go.

Is there more than one emit per call to nextTuple()? If so, this would explain why topology.max.spout.pending doesn't have any effect.

Looking at the code, this does not seem to be the case though. Any idea?

sebastian-nagel (Contributor, Author) commented Jun 23, 2020

Hi @jnioche, I've addressed all your comments. Some example topologies which push WARC content into a topology are in sebastian-nagel/warc-crawler. The "devnull" topology reads 1.2 million WARC records in 9 min. on my laptop. However, the workers of the "rewarc" topology repeatedly crash with an OOM exception. An analysis of the heap dump shows that almost the entire memory is occupied by the overflow queue (cf. here). Some 20k tuples are emitted but not yet acked:

[Screenshot: Storm UI of the "rewarc" topology]

The WARCHdfsBolt is considerably slower (checksums and gzip are computationally expensive), but it looks like max. spout pending is not applied (maybe due to a misconfiguration). Maybe you have an idea?

jnioche (Contributor) commented Jun 24, 2020

Thanks @sebastian-nagel I can reproduce the issue and will look into it.

jnioche (Contributor) commented Jun 24, 2020

You need to emit with a tuple ID, otherwise Storm won't track the acks and fails, nor the number of tuples being processed:

_collector.emit(new Values(url, content, metadata), url);

You might also need to set topology.backpressure.enable: false for max.spout.pending to be effective.

The change above seems to have fixed the issue in my case.
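
For context, a minimal sketch of the difference, assuming the spout's collector and field values as in the PR: only an emit that carries a message ID is tracked by Storm's acker, and only tracked tuples count against topology.max.spout.pending.

import org.apache.storm.tuple.Values;

// unanchored: Storm does not track the tuple, ack()/fail() are never
// called on the spout and max.spout.pending has no effect
_collector.emit(new Values(url, content, metadata));

// anchored: the URL doubles as the message ID; Storm tracks the tuple,
// calls ack(url)/fail(url) and enforces topology.max.spout.pending
_collector.emit(new Values(url, content, metadata), url);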


private int maxContentSize = -1;
private int contentBufferSize = 8192;
private long sleepEmitFetched = 0;
jnioche (Contributor):

not used - remove?

sebastian-nagel (Contributor, Author):

Done.

sebastian-nagel (Contributor, Author)

Thanks, @jnioche! I've added the missing ID to the emit call. I've only run a short test so far: looks good, memory consumption is low.

jnioche merged commit 9b5c090 into apache:master on Jun 25, 2020
jnioche (Contributor) commented Jun 25, 2020

Thanks @sebastian-nagel, this is a great addition to our WARC module.

jnioche linked issue Implement WARC spout #755 on Jun 25, 2020 (may be closed by this pull request)
sebastian-nagel deleted the sc-755-warc-spout branch on October 11, 2020