Data Aggregator Rewrite (#753)

* First steps in the rewrite

* Fixed import paths

* One giant refactor

* Fixing tests

* Adding mypy

* Removed mypy from pre-commit workflow

* First draft on DataAggregator

* Wrote a DataAggregator that starts and shuts down

* Created tests and added more empty types

* Got demo.py working

* Created sql_provider

* Cleaned up imports in TaskManager

* Added async

* Fixed minor bugs

* First steps at porting arrow

* Introduced TableName and different Task handling

* Added more failing tests

* First first completes others don't

* It works

* Started working on arrow_provider

* Implemented ArrowProvider

* Added logger fixture

* Fixed test_storage_controller

* Fixing OpenWPMTest.visit()

* Moved test/storage_providers to test/storage

* Fixing up tests

* Moved automation to openwpm

* Readded datadir to .gitignore

* Ran repin.sh

* Fixed formatting

* Let's see if this works

* Fixed imports

* Got arrow_memory_provider working

* Starting to rewrite tests

* Setting up fixtures

* Attempting to fix all the tests

* Still fixing tests

* Broken content saving

* Added node

* Fixed screenshot tests

* Fixing more tests

* Fixed tests

* Implemented local_storage.py

* Cleaned up flush_cache

* Fixing more tests

* Wrote test for LocalArrowProvider

* Introduced tests for local_storage_provider.py

* Asserting test dir is empty

* Creating subfolder for different aggregators

* New dependencies and init()

* Everything is terribly broken

* Figured out finalize_visit_id

* Running two event loops kinda works???

* Rearming the event

* Introduced mypy

* Downgraded black in pre-commit

* Modifying the database directly

* Fixed formatting

* Made mypy a lil stricter

* Fixing docs and config printing

* Realising I've been using the wrong `with`

* Trying to figure arrow_storage

* Moving lock initialization in in_memory_storage

* Fixing tests

* Fixing up tests and adding more typechecking

* Fixed num_browsers in test_cache_hits_recorded

* Parametrized unstructured

* String fix

* Added failing test

* New test

* Review changes with Steven

* Fixed repin.sh and test_arrow_cache

* Minor change

* Fixed prune-environment.py

* Removing references to DataAggregator

* Fixed test_seed_persistance

* More paths

* Fixed test display shutdown

* Made cache test more robust

* Update crawler.py

Co-authored-by: Steven Englehardt <[email protected]>

* Slimming down ManagerParams

* Fixing more tests

* Update test/storage/test_storage_controller.py

Co-authored-by: Steven Englehardt <[email protected]>

* Purging references to DataAggregator

* Reverted changes to .travis.yml

* Demo.py saves locally again

* Readjusting test paths

* Expanded comment on initialize to reference #846

* Made token optional in finalize_visit_id

* Simplified test parametrization

* Fixed callback semantics change

* Removed test_parse_http_stack_trace_str

* Added DataSocket

* WIP need to fix path encoding

* Fixed path encoding

* Added task and crawl to schema

* Fixed paths in GitHub actions

* Refactored completion handling

* Fix tests

* Trying to fix tests on CI

* Removed redundant setting of tag

* Removing references to S3

* Purging more DataAggregator references

* Cranking up logging to figure out test failure

* Moved test_values into a fixture

* Fixing GcpUnstructuredProvider

* Fixed paths for future crawls

* Renamed sqllite to official sqlite

* Restored demo.py

* Update openwpm/commands/profile_commands.py

Co-authored-by: Georgia Kokkinou <[email protected]>

* Restored previous behaviour of DumpProfileCommand

Co-authored-by: Georgia Kokkinou <[email protected]>

* Removed leftovers

* Cleaned up comments

* Expanded lock check

* Fixed more stuff

* More comment updates

* Update openwpm/socket_interface.py

Co-authored-by: Georgia Kokkinou <[email protected]>

* Removed outdated comment

* Using config_encoder

* Renamed tar_location to tar_path

* Removed references to database_name in docs

* Cleanup

* Moved screenshot_path and source_dump_path to ManagerParamsInternal

* Fixed imports

* Fixing up comments

* Fixing up comments

* More docs

* updated dependencies

* Fixed test_task_manager

* Reupgraded to python 3.9.1

* Restoring crawl_reference in mp_logger

* Removed unused imports

* Apply suggestions from code review

Co-authored-by: Steven Englehardt <[email protected]>

* Cleaned up socket handling

* Fixed TaskManager.__exit__

* Moved validation code into config.py

* Removed comment

* Removed comment

* Removed comment

Co-authored-by: Steven Englehardt <[email protected]>
Co-authored-by: Georgia Kokkinou <[email protected]>
3 people committed Feb 22, 2021
1 parent a81d80a commit b29c3f4
Showing 96 changed files with 4,422 additions and 3,095 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/run-tests.yaml
@@ -48,7 +48,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
test-groups: ["test_[a-e]*", "test_[f-h]*", "test_[i-r,t-z]*", "test_[s]*"]
test-groups: ["test/test_[a-e]*", "test/test_[f-h]*", "test/test_[i-r,t-z]*", "test/test_[s]*", "test/storage/*"]
fail-fast: false
steps:
# All of these steps are just setup, maybe we should wrap them in an action
2 changes: 2 additions & 0 deletions .gitignore
@@ -94,3 +94,5 @@ openwpm/Extension/firefox/dist
openwpm/Extension/firefox/openwpm.xpi
openwpm/Extension/firefox/src/content.js
openwpm/Extension/firefox/src/feature.js

datadir
13 changes: 11 additions & 2 deletions .pre-commit-config.yaml
@@ -4,7 +4,16 @@ repos:
hooks:
- id: isort
- repo: https://github.com/psf/black
rev: 20.8b1 # Replace by any tag/version: https://github.com/psf/black/tags
rev: 20.8b1
hooks:
- id: black
language_version: python3 # Should be a command that runs python3.6+
language_version: python3
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.790
hooks:
- id: mypy
additional_dependencies: [pytest]
# We may need to add more and more dependencies here, as pre-commit
# runs in an environment without our dependencies


1 change: 0 additions & 1 deletion CONTRIBUTING.md
@@ -85,7 +85,6 @@ OpenWPM's tests are built on [pytest](https://docs.pytest.org/en/latest/). Execute
in the test directory to run all tests:

$ conda activate openwpm
$ cd test
$ py.test -vv

See the [pytest docs](https://docs.pytest.org/en/latest/) for more information on selecting
108 changes: 60 additions & 48 deletions README.md
@@ -12,25 +12,25 @@ the instrumentation section below for more details.

Table of Contents <!-- omit in toc -->
------------------
* [Installation](#installation)
* [Pre-requisites](#pre-requisites)
* [Install](#install)
* [Mac OSX](#mac-osx)
* [Quick Start](#quick-start)
* [Troubleshooting](#troubleshooting)
* [Advice for Measurement Researchers](#advice-for-measurement-researchers)
* [Developer instructions](#developer-instructions)
* [Instrumentation and Configuration](#instrumentation-and-configuration)
* [Persistence Types](#persistence-types)
* [Local Databases](#local-databases)
* [Parquet on Amazon S3](#parquet-on-amazon-s3)
* [Docker Deployment for OpenWPM](#docker-deployment-for-openwpm)
* [Building the Docker Container](#building-the-docker-container)
* [Running Measurements from inside the Container](#running-measurements-from-inside-the-container)
* [MacOS GUI applications in Docker](#macos-gui-applications-in-docker)
* [Citation](#citation)
* [License](#license)

- [Installation](#installation)
- [Pre-requisites](#pre-requisites)
- [Install](#install)
- [Mac OSX](#mac-osx)
- [Quick Start](#quick-start)
- [Troubleshooting](#troubleshooting)
- [Advice for Measurement Researchers](#advice-for-measurement-researchers)
- [Developer instructions](#developer-instructions)
- [Instrumentation and Configuration](#instrumentation-and-configuration)
- [Storage](#storage)
- [Local Storage](#local-storage)
- [Remote storage](#remote-storage)
- [Docker Deployment for OpenWPM](#docker-deployment-for-openwpm)
- [Building the Docker Container](#building-the-docker-container)
- [Running Measurements from inside the Container](#running-measurements-from-inside-the-container)
- [MacOS GUI applications in Docker](#macos-gui-applications-in-docker)
- [Citation](#citation)
- [License](#license)

Installation
------------
@@ -178,40 +178,52 @@ If you want to contribute to OpenWPM have a look at our [CONTRIBUTING.md](./CONT

Instrumentation and Configuration
-------------------------------

OpenWPM provides a breadth of configuration options which can be found
in [Configuration.md](docs/Configuration.md).
More detail on the output is available [below](#persistence-types).

Persistence Types
Storage
------------

#### Local Databases
By default OpenWPM saves all data locally on disk in a variety of formats.
Most of the instrumentation saves to a SQLite database specified
by `manager_params.database_name` in the main output directory. Response
bodies are saved in a LevelDB database named `content.ldb`, and are keyed by
the hash of the content. In addition, the browser commands that dump page
source and save screenshots save them in the `sources` and `screenshots`
subdirectories of the main output directory. The SQLite schema
specified by: `openwpm/DataAggregator/schema.sql`. You can specify additional tables
inline by sending a `create_table` message to the data aggregator.

#### Parquet on Amazon S3
As an option, OpenWPM can save data directly to an Amazon S3 bucket as a
Parquet Dataset. This is currently experimental and hasn't been thoroughly
tested. Screenshots, and page source saving is not currently supported and
will still be stored in local databases and directories. To enable S3
saving specify the following configuration parameters in `manager_params`:
* Persistence Type: `manager_params.output_format = 's3'`
* S3 bucket name: `manager_params.s3_bucket = 'openwpm-test-crawl'`
* Directory within S3 bucket: `manager_params.s3_directory = '2018-09-09_test-crawl-new'`
In order to save to S3 you must have valid access credentials stored in
`~/.aws`. We do not currently allow you to specify an alternate storage
location.
**NOTE:** The schemas should be kept in sync with the exception of
output-specific columns (e.g., `instance_id` in the S3 output). You can compare
OpenWPM distinguishes between two types of data: structured and unstructured.
Structured data is everything captured by the instrumentation or emitted by the platform.
Generally speaking, all data you download is unstructured.

For each of these data classes we offer a variety of storage providers, and you are encouraged
to implement your own should the provided backends not meet your needs.
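
To give a feel for what implementing your own provider involves, here is a hypothetical
sketch of a custom structured provider. The base class, module path, and method signatures
are assumptions pieced together from this commit's history (which mentions `init()`,
`flush_cache()`, `finalize_visit_id()`, and `TableName`); consult the merged code for the
authoritative interface.

```python
from typing import Any, Dict

# NOTE: module path and signatures below are assumptions, not the merged API
from openwpm.storage.storage_providers import (
    StructuredStorageProvider,
    TableName,
    VisitId,
)


class PrintingProvider(StructuredStorageProvider):
    """Toy provider that prints every structured record to stdout."""

    async def init(self) -> None:
        pass  # open connections / create schemas here

    async def store_record(
        self, table: TableName, visit_id: VisitId, record: Dict[str, Any]
    ) -> None:
        print(table, visit_id, record)

    async def finalize_visit_id(self, visit_id: VisitId, interrupted: bool = False) -> None:
        pass  # called once all records for a given visit have arrived

    async def flush_cache(self) -> None:
        pass  # persist any buffered records

    async def shutdown(self) -> None:
        pass  # close connections / release resources
```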

We have an outstanding issue to enable saving content generated by commands, such as
screenshots and page dumps, to unstructured storage (see [#232](https://github.com/mozilla/OpenWPM/issues/232)).
For now, it is saved to `manager_params.data_directory`.

### Local Storage

For storing structured data locally we offer two StorageProviders (see the usage sketch below):

- The SQLiteStorageProvider, which writes all data into a SQLite database
  - This is the recommended approach for getting started, as the data is easy to explore
- The LocalArrowProvider, which stores the data in Parquet files
  - This method integrates well with NumPy/Pandas
  - It can be harder to process ad hoc

For storing unstructured data locally we also offer two solutions:

- The LevelDBProvider, which stores all data in a LevelDB database
  - This is the recommended approach
- The LocalGzipProvider, which gzips and stores the files individually on disk
  - Note that file systems usually don't cope well with thousands of files in one folder
  - Use with care, or only for single-site visits
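
As a usage sketch, here is how the recommended local providers could be wired into a
`TaskManager`. The four-argument `TaskManager(...)` call mirrors this commit's `crawler.py`;
the provider module paths and constructor arguments are assumptions, not a verbatim API
reference:

```python
from pathlib import Path

from openwpm.config import BrowserParams, ManagerParams
from openwpm.storage.leveldb import LevelDbProvider  # assumed module path
from openwpm.storage.sql_provider import SQLiteStorageProvider  # assumed module path
from openwpm.task_manager import TaskManager

manager_params = ManagerParams(num_browsers=1)
browser_params = [BrowserParams(display_mode="headless")]
manager_params.data_directory = Path("./datadir")
manager_params.log_directory = Path("./datadir")

# Structured records go to SQLite, unstructured blobs to LevelDB
structured = SQLiteStorageProvider(Path("./datadir/crawl-data.sqlite"))
unstructured = LevelDbProvider(Path("./datadir/content.ldb"))

manager = TaskManager(manager_params, browser_params, structured, unstructured)
```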

### Remote storage

When running in the cloud, saving records to local disk is not reasonable, so we offer
remote StorageProviders for S3 (see [#823](https://github.com/mozilla/OpenWPM/issues/823)) and GCP.
Currently, all remote StorageProviders write to the respective object storage service (S3/GCS).
The structured providers use the Parquet format.

**NOTE:** The Parquet and SQL schemas should be kept in sync, with the exception of
output-specific columns (e.g., `instance_id` in the Parquet output). You can compare
the two schemas by running
`diff -y openwpm/DataAggregator/schema.sql openwpm/DataAggregator/parquet_schema.py`.
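
For concreteness, this commit's `crawler.py` (diff below) constructs the GCS providers along
these lines. The project id here is a hypothetical placeholder; the other arguments mirror
the diff:

```python
from openwpm.storage.cloud_storage.gcp_storage import (
    GcsStructuredProvider,
    GcsUnstructuredProvider,
)

CRAWL_DIRECTORY = "crawl-data"  # default crawl directory, as in crawler.py

structured = GcsStructuredProvider(
    project="my-gcp-project",  # hypothetical project id
    bucket_name="openwpm-crawls",
    base_path=CRAWL_DIRECTORY,
    token="cloud",
)
unstructured = GcsUnstructuredProvider(
    project="my-gcp-project",  # hypothetical project id
    bucket_name="openwpm-crawls",
    base_path=CRAWL_DIRECTORY + "/data",
    token="cloud",
)
```
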
@@ -238,7 +250,7 @@ Docker service.
__Step 2:__ to build the image, run the following command from a terminal
within the root OpenWPM directory:
```
```bash
docker build -f Dockerfile -t openwpm .
```
@@ -253,7 +265,7 @@ X-server. You can do this by running: `xhost +local:docker`
Then you can run the demo script using:
```
```bash
mkdir -p docker-volume && docker run -v $PWD/docker-volume:/opt/Desktop \
-e DISPLAY=$DISPLAY -v /tmp/.X11-unix:/tmp/.X11-unix --shm-size=2g \
-it openwpm python3 /opt/OpenWPM/demo.py
81 changes: 51 additions & 30 deletions crawler.py
@@ -4,42 +4,53 @@
import signal
import sys
import time
from pathlib import Path
from threading import Lock
from typing import Any, Callable, List

import boto3
import sentry_sdk

from openwpm import mp_logger
from openwpm.command_sequence import CommandSequence
from openwpm.config import BrowserParams, ManagerParams
from openwpm.mp_logger import parse_config_from_env
from openwpm.storage.cloud_storage.gcp_storage import (
GcsStructuredProvider,
GcsUnstructuredProvider,
)
from openwpm.task_manager import TaskManager
from openwpm.utilities import rediswq
from test.utilities import LocalS3Session, local_s3_bucket

# Configuration via environment variables
# Crawler specific config
REDIS_HOST = os.getenv("REDIS_HOST", "redis-box")
REDIS_QUEUE_NAME = os.getenv("REDIS_QUEUE_NAME", "crawl-queue")
MAX_JOB_RETRIES = int(os.getenv("MAX_JOB_RETRIES", "2"))
DWELL_TIME = int(os.getenv("DWELL_TIME", "10"))
TIMEOUT = int(os.getenv("TIMEOUT", "60"))

# Storage Provider Params
CRAWL_DIRECTORY = os.getenv("CRAWL_DIRECTORY", "crawl-data")
S3_BUCKET = os.getenv("S3_BUCKET", "openwpm-crawls")
GCS_BUCKET = os.getenv("GCS_BUCKET", "openwpm-crawls")
GCP_PROJECT = os.getenv("GCP_PROJECT", "")
AUTH_TOKEN = os.getenv("GCP_AUTH_TOKEN", "cloud")

# Browser Params
DISPLAY_MODE = os.getenv("DISPLAY_MODE", "headless")
HTTP_INSTRUMENT = os.getenv("HTTP_INSTRUMENT", "1") == "1"
COOKIE_INSTRUMENT = os.getenv("COOKIE_INSTRUMENT", "1") == "1"
NAVIGATION_INSTRUMENT = os.getenv("NAVIGATION_INSTRUMENT", "1") == "1"
JS_INSTRUMENT = os.getenv("JS_INSTRUMENT", "1") == "1"
CALLSTACK_INSTRUMENT = os.getenv("CALLSTACK_INSTRUMENT", "1") == "1"
JS_INSTRUMENT_SETTINGS = os.getenv(
"JS_INSTRUMENT_SETTINGS", '["collection_fingerprinting"]'
JS_INSTRUMENT_SETTINGS = json.loads(
os.getenv("JS_INSTRUMENT_SETTINGS", '["collection_fingerprinting"]')
)

SAVE_CONTENT = os.getenv("SAVE_CONTENT", "")
PREFS = os.getenv("PREFS", None)
DWELL_TIME = int(os.getenv("DWELL_TIME", "10"))
TIMEOUT = int(os.getenv("TIMEOUT", "60"))
SENTRY_DSN = os.getenv("SENTRY_DSN", None)
LOGGER_SETTINGS = parse_config_from_env()
MAX_JOB_RETRIES = int(os.getenv("MAX_JOB_RETRIES", "2"))

JS_INSTRUMENT_SETTINGS = json.loads(JS_INSTRUMENT_SETTINGS)

SENTRY_DSN = os.getenv("SENTRY_DSN", None)
LOGGER_SETTINGS = mp_logger.parse_config_from_env()

if CALLSTACK_INSTRUMENT is True:
# Must have JS_INSTRUMENT True for CALLSTACK_INSTRUMENT to work
@@ -74,29 +85,38 @@
browser_params[i].prefs = json.loads(PREFS)

# Manager configuration
manager_params.data_directory = "~/Desktop/%s/" % CRAWL_DIRECTORY
manager_params.log_directory = "~/Desktop/%s/" % CRAWL_DIRECTORY
manager_params.output_format = "s3"
manager_params.s3_bucket = S3_BUCKET
manager_params.s3_directory = CRAWL_DIRECTORY

# Allow the use of localstack's mock s3 service
S3_ENDPOINT = os.getenv("S3_ENDPOINT")
if S3_ENDPOINT:
boto3.DEFAULT_SESSION = LocalS3Session(endpoint_url=S3_ENDPOINT)
manager_params.s3_bucket = local_s3_bucket(boto3.resource("s3"), name=S3_BUCKET)

manager_params.data_directory = Path("~/Desktop/") / CRAWL_DIRECTORY
manager_params.log_directory = Path("~/Desktop/") / CRAWL_DIRECTORY

structured = GcsStructuredProvider(
project=GCP_PROJECT,
bucket_name=GCS_BUCKET,
base_path=CRAWL_DIRECTORY,
token=AUTH_TOKEN,
)
unstructured = GcsUnstructuredProvider(
project=GCP_PROJECT,
bucket_name=GCS_BUCKET,
base_path=CRAWL_DIRECTORY + "/data",
token=AUTH_TOKEN,
)
# Instantiates the measurement platform
# Commands time out by default after 60 seconds
manager = TaskManager(manager_params, browser_params, logger_kwargs=LOGGER_SETTINGS)
manager = TaskManager(
manager_params,
browser_params,
structured,
unstructured,
logger_kwargs=LOGGER_SETTINGS,
)

# At this point, Sentry should be initiated
if SENTRY_DSN:
# Add crawler.py-specific context
with sentry_sdk.configure_scope() as scope:
# tags generate breakdown charts and search filters
scope.set_tag("CRAWL_DIRECTORY", CRAWL_DIRECTORY)
scope.set_tag("S3_BUCKET", S3_BUCKET)
scope.set_tag("GCS_BUCKET", GCS_BUCKET)
scope.set_tag("DISPLAY_MODE", DISPLAY_MODE)
scope.set_tag("HTTP_INSTRUMENT", HTTP_INSTRUMENT)
scope.set_tag("COOKIE_INSTRUMENT", COOKIE_INSTRUMENT)
@@ -108,9 +128,10 @@
scope.set_tag("DWELL_TIME", DWELL_TIME)
scope.set_tag("TIMEOUT", TIMEOUT)
scope.set_tag("MAX_JOB_RETRIES", MAX_JOB_RETRIES)
scope.set_tag("CRAWL_REFERENCE", "%s/%s" % (S3_BUCKET, CRAWL_DIRECTORY))
scope.set_tag("CRAWL_REFERENCE", "%s/%s" % (GCS_BUCKET, CRAWL_DIRECTORY))
# context adds addition information that may be of interest
scope.set_context("PREFS", PREFS)
if PREFS:
scope.set_context("PREFS", json.loads(PREFS))
scope.set_context(
"crawl_config",
{
@@ -159,9 +180,9 @@ def get_job_completion_callback(
job_queue: rediswq.RedisWQ,
job: bytes,
) -> Callable[[bool], None]:
def callback(sucess: bool) -> None:
def callback(success: bool) -> None:
with unsaved_jobs_lock:
if sucess:
if success:
logger.info("Job %r is done", job)
job_queue.complete(job)
else: