
Data Aggregator Rewrite #753

Merged · 152 commits · Feb 22, 2021

Commits
f252410
First steps in the rewrite
Sep 29, 2020
0ba7de7
Fixed import paths
Sep 29, 2020
d65af1e
One giant refactor
Sep 29, 2020
0a18a19
Merge branch 'master' into DataAggregatorRefactor
Sep 30, 2020
3fb70b8
Fixing tests
Sep 30, 2020
53b55cd
Adding mypy
Sep 30, 2020
ec4c7df
Removed mypy from pre-commit workflow
Sep 30, 2020
7288783
First draft on DataAggregator
Oct 1, 2020
f694254
Wrote a DataAggregator that starts and shuts down
Oct 1, 2020
b7e9b1d
Created tests and added more empty types
Oct 2, 2020
8eaf7ce
Got demo.py working
Oct 2, 2020
d633f95
Created sql_provider
Oct 2, 2020
71ae8b0
Cleaned up imports in TaskManager
Oct 9, 2020
6e11e1b
Added async
Oct 9, 2020
07db4ab
Fixed minor bugs
Oct 9, 2020
f3857ac
First steps at porting arrow
Oct 9, 2020
ee5d5b6
Introduced TableName and different Task handling
Oct 9, 2020
9e06a5e
Added more failing tests
Oct 9, 2020
2737ec0
First first completes others don't
Oct 13, 2020
c63ae82
It works
Oct 13, 2020
8177ed4
Started working on arrow_provider
Oct 14, 2020
50e1539
Implemented ArrowProvider
Oct 16, 2020
9e1d67e
Added logger fixture
Oct 16, 2020
fe2e977
Fixed test_storage_controller
Oct 16, 2020
157ee24
Fixing OpenWPMTest.visit()
Oct 16, 2020
52c4ff6
Moved test/storage_providers to test/storage
Oct 16, 2020
1098727
Fixing up tests
Oct 19, 2020
d7e7268
Moved automation to openwpm
Nov 16, 2020
86fa88c
Merge branch 'master' into DataAggregatorRefactor
Nov 16, 2020
ce5e901
Readded datadir to .gitignore
Nov 16, 2020
0631848
Ran repin.sh
Nov 16, 2020
3d2d720
Fixed formatting
Nov 16, 2020
d167846
Let's see if this works
Nov 16, 2020
7f1597f
Fixed imports
Nov 16, 2020
5de0822
Got arrow_memory_provider working
Nov 16, 2020
ae718dc
Merge branch 'master' into DataAggregatorRefactor
Nov 25, 2020
12a60a0
Starting to rewrite tests
Nov 25, 2020
84bff66
Setting up fixtures
Nov 25, 2020
4eb5c23
Attempting to fix all the tests
Nov 25, 2020
9b03e30
Still fixing tests
Nov 25, 2020
95bfcd5
Broken content saving
Nov 25, 2020
1b2f162
Added node
Nov 25, 2020
f01756b
Fixed screenshot tests
Nov 26, 2020
c5dfcd6
Fixing more tests
Nov 27, 2020
9d635d3
Fixed tests
Nov 27, 2020
ceb1d98
Implemented local_storage.py
Nov 27, 2020
11fb99f
Cleaned up flush_cache
Nov 27, 2020
17835b4
Fixing more tests
Nov 27, 2020
cc9ed52
Wrote test for LocalArrowProvider
Nov 27, 2020
0098181
Introduced tests for local_storage_provider.py
Nov 27, 2020
bf4f92c
Asserting test dir is empty
Nov 30, 2020
5c0a1e1
Creating subfolder for different aggregators
Dec 4, 2020
5981463
New dependencies and init()
Dec 4, 2020
ba56b34
Everything is terribly broken
Dec 4, 2020
74ae07c
Figured out finalize_visit_id
Dec 7, 2020
6068c69
Running two event loops kinda works???
Dec 7, 2020
17a22d3
Rearming the event
Dec 8, 2020
3389d00
Introduced mypy
Dec 8, 2020
7343c88
Downgraded black in pre-commit
Dec 8, 2020
babd962
Modifying the database directly
Dec 8, 2020
6f9a06d
Merge branch 'master' into DataAggregatorRefactor
Dec 8, 2020
b3d28a0
Fixed formatting
Dec 11, 2020
791d865
Made mypy a lil stricter
Dec 11, 2020
66e8caa
Fixing docs and config printing
Dec 11, 2020
70963bd
Realising I've been using the wrong with
Dec 11, 2020
9862bf7
Trying to figure arrow_storage
Dec 11, 2020
4a036fa
Moving lock initialization in in_memory_storage
Dec 11, 2020
57d8ba9
Fixing tests
Dec 11, 2020
67d3070
Fixing up tests and adding more typechecking
Dec 11, 2020
de00f94
Fixed num_browsers in test_cache_hits_recorded
Dec 11, 2020
4291ddb
Parametrized unstructured
Dec 11, 2020
fa1c52f
String fix
Dec 11, 2020
9aed882
Added failing test
Dec 15, 2020
ef0ba1e
New test
Dec 23, 2020
1b14cbd
Review changes with Steven
Dec 23, 2020
8eb6ef0
Fixed repin.sh and test_arrow_cache
Jan 8, 2021
51d510f
Merge branch 'master' into DataAggregatorRefactor
Jan 15, 2021
24fc5d2
Minor change
Jan 15, 2021
0096007
Fixed prune-environment.py
Jan 15, 2021
962af53
Removing references to DataAggregator
Jan 15, 2021
902e4ed
Fixed test_seed_persistance
Jan 15, 2021
25cd9cf
More paths
Jan 15, 2021
dcb9a6a
Fixed test display shutdown
Jan 18, 2021
e91aba7
Made cache test more robust
Jan 18, 2021
e4c9bb8
Update crawler.py
Jan 18, 2021
247a69c
Slimming down ManagerParams
Jan 18, 2021
41e59ad
Fixing more tests
Jan 18, 2021
c9e52ee
Merge remote-tracking branch 'origin/DataAggregatorRefactor' into Dat…
Jan 18, 2021
7acb624
Update test/storage/test_storage_controller.py
Jan 19, 2021
db0d27f
Purging references to DataAggregator
Jan 22, 2021
abe4a01
Reverted changes to .travis.yml
Jan 22, 2021
2223d0c
Merge remote-tracking branch 'origin/DataAggregatorRefactor' into Dat…
Jan 22, 2021
d7400d2
Demo.py saves locally again
Jan 22, 2021
645240b
Readjusting test paths
Jan 22, 2021
ecb87f0
Expanded comment on initialize to reference #846
Jan 22, 2021
8629538
Made token optional in finalize_visit_id
Jan 22, 2021
9983362
Simplified test parametrization
Jan 22, 2021
105c73b
Fixed callback semantics change
Jan 22, 2021
f5a0abd
Removed test_parse_http_stack_trace_str
Jan 22, 2021
e6175db
Added DataSocket
Jan 22, 2021
173de3a
WIP need to fix path encoding
Jan 22, 2021
501dc5c
Fixed path encoding
Jan 25, 2021
6bd5575
Added task and crawl to schema
Jan 25, 2021
e5395d4
Merge branch 'master' into DataAggregatorRefactor
Jan 29, 2021
eeceaa3
Fixed paths in GitHub actions
Jan 29, 2021
22a822b
Merge branch 'master' into DataAggregatorRefactor
Jan 29, 2021
d7db8ca
Refactored completion handling
Jan 29, 2021
d5733db
Fix tests
Jan 29, 2021
6ee9972
Trying to fix tests on CI
Feb 1, 2021
89635c2
Removed redundant setting of tag
Feb 1, 2021
d4a391d
Removing references to S3
Feb 1, 2021
ffbb346
Purging more DataAggregator references
Feb 1, 2021
379af2d
Cranking up logging to figure out test failure
Feb 1, 2021
e5c897b
Moved test_values into a fixture
Feb 1, 2021
8520be6
Fixing GcpUnstructuredProvider
Feb 1, 2021
6527179
Fixed paths for future crawls
Feb 1, 2021
1ca2739
Renamed sqllite to official sqlite
Feb 3, 2021
7cf48a1
Restored demo.py
Feb 3, 2021
29d3e27
Update openwpm/commands/profile_commands.py
Feb 3, 2021
5b5f229
Restored previous behaviour of DumpProfileCommand
Feb 3, 2021
73f0850
Removed leftovers
Feb 3, 2021
a4a75ff
Cleaned up comments
Feb 3, 2021
41f6656
Expanded lock check
Feb 3, 2021
9046d0d
Fixed more stuff
Feb 3, 2021
0ec3353
More comment updates
Feb 3, 2021
c1a6038
Update openwpm/socket_interface.py
Feb 3, 2021
ae25bfa
Removed outdated comment
Feb 4, 2021
4e89806
Using config_encoder
Feb 4, 2021
669a40f
Merge remote-tracking branch 'origin/DataAggregatorRefactor' into Dat…
Feb 4, 2021
85a4c2d
Renamed tar_location to tar_path
Feb 4, 2021
4c03174
Removed references to database_name in docs
Feb 5, 2021
f565507
Cleanup
Feb 5, 2021
2553a09
Moved screenshot_path and source_dump_path to ManagerParamsInternal
Feb 5, 2021
fceaee0
Fixed imports
Feb 12, 2021
1846d25
Fixing up comments
Feb 12, 2021
dfdc34d
Fixing up comments
Feb 12, 2021
b922774
More docs
Feb 15, 2021
a7bcbb8
Merge branch 'master' into DataAggregatorRefactor
Feb 15, 2021
55f6cdb
updated dependencies
Feb 16, 2021
65c6eda
Fixed test_task_manager
Feb 17, 2021
9dbeb7e
Merge branch 'master' into DataAggregatorRefactor
Feb 17, 2021
048546d
Reupgraded to python 3.9.1
Feb 17, 2021
59484de
Restoring crawl_reference in mp_logger
Feb 22, 2021
937c8fe
Removed unused imports
Feb 22, 2021
1c819f7
Apply suggestions from code review
Feb 22, 2021
2cbb801
Cleaned up socket handling
Feb 22, 2021
2dd339c
Fixed TaskManager.__exit__
Feb 22, 2021
74762cb
Merge remote-tracking branch 'origin/DataAggregatorRefactor' into Dat…
Feb 22, 2021
a555c14
Moved validation code into config.py
Feb 22, 2021
4f6aed1
Removed comment
Feb 22, 2021
eb08c13
Removed comment
Feb 22, 2021
4820b2c
Removed comment
Feb 22, 2021
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -9,3 +9,4 @@ repos:
- id: black
language_version: python3 # Should be a command that runs python3.6+


14 changes: 5 additions & 9 deletions .travis.yml
@@ -6,11 +6,11 @@ env:
# Once we add and remove tests, this distribution may become unbalanced.
# Feel free to move tests around to make the running time of the jobs
# as close as possible.
- TESTS=test_[a-e]*
- TESTS=test_[f-h]*
- TESTS=test_[i-r,t-z]*
- TESTS=test/test_[a-e]*
- TESTS=test/test_[f-h]*
- TESTS=test/test_[i-r,t-z]*
# test_simple_commands.py is slow due to parametrization.
- TESTS=test_[s]*
- TESTS=test/test_[s]*
- TESTS=webextension
git:
depth: 3
@@ -36,15 +36,11 @@ after_success:

jobs:
include:
- language:
python:
- language: "python"
env:
- TESTS="Docker"
services:
- docker
before_install:
before_script:
install:
script:
- docker build -f Dockerfile -t openwpm .
- ./scripts/deploy-to-dockerhub.sh
4 changes: 2 additions & 2 deletions automation/BrowserManager.py
@@ -20,7 +20,7 @@
from .Commands.Types import ShutdownCommand
from .DeployBrowsers import deploy_browser
from .Errors import BrowserConfigError, BrowserCrashError, ProfileLoadError
from .SocketInterface import clientsocket
from .SocketInterface import ClientSocket
from .utilities.multiprocess_utils import (
Process,
kill_process_and_children,
@@ -478,7 +478,7 @@ def BrowserManager(
"BROWSER %i: Connecting to extension on port %i"
% (browser_params["browser_id"], port)
)
extension_socket = clientsocket(serialization="json")
extension_socket = ClientSocket(serialization="json")
extension_socket.connect("127.0.0.1", int(port))
else:
extension_socket = None
2 changes: 1 addition & 1 deletion automation/CommandSequence.py
@@ -190,7 +190,7 @@ def run_custom_function(self, function_handle, func_args=(), timeout=30):
command = RunCustomFunctionCommand(function_handle, func_args)
self._commands_with_timeout.append((command, timeout))

def mark_done(self, success: bool):
def mark_done(self, success: bool) -> None:
if self.callback is not None:
self.callback(success)

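The `-> None` annotation added to `mark_done` above documents that the method only fires the sequence's optional completion callback. A minimal sketch of that optional-callback pattern (the class below is a stripped-down illustration, not the full OpenWPM `CommandSequence`):

```python
from typing import Callable, List, Optional

class CommandSequence:
    """Stripped-down stand-in showing only the callback plumbing."""

    def __init__(self, callback: Optional[Callable[[bool], None]] = None) -> None:
        self.callback = callback

    def mark_done(self, success: bool) -> None:
        # The guard keeps mark_done safe when no callback was registered.
        if self.callback is not None:
            self.callback(success)

results: List[bool] = []
CommandSequence(results.append).mark_done(True)
CommandSequence().mark_done(False)  # no callback registered, no error
print(results)  # [True]
```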
8 changes: 4 additions & 4 deletions automation/Commands/browser_commands.py
@@ -20,7 +20,7 @@
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

from ..SocketInterface import clientsocket
from ..SocketInterface import ClientSocket
from .utils.webdriver_utils import (
execute_in_all_frames,
execute_script_with_retry,
@@ -112,7 +112,7 @@ def tab_restart_browser(webdriver):


def get_website(
url, sleep, visit_id, webdriver, browser_params, extension_socket: clientsocket
url, sleep, visit_id, webdriver, browser_params, extension_socket: ClientSocket
):
"""
goes to <url> using the given <webdriver> instance
@@ -372,7 +372,7 @@ def collect_source(driver, frame_stack, rv={}):


def finalize(
visit_id: int, webdriver: WebDriver, extension_socket: clientsocket, sleep: int
visit_id: int, webdriver: WebDriver, extension_socket: ClientSocket, sleep: int
) -> None:
""" Informs the extension that a visit is done """
tab_restart_browser(webdriver)
@@ -383,6 +383,6 @@ def finalize(
extension_socket.send(msg)


def initialize(visit_id: int, extension_socket: clientsocket) -> None:
def initialize(visit_id: int, extension_socket: ClientSocket) -> None:
msg = {"action": "Initialize", "visit_id": visit_id}
extension_socket.send(msg)
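The `initialize` helper above hands a small JSON action message to `extension_socket.send()`. A sketch of that message shape in isolation (the helper function below is illustrative, not part of OpenWPM):

```python
import json

def build_initialize_msg(visit_id: int) -> str:
    # Mirrors the {"action": "Initialize", "visit_id": ...} payload that
    # initialize() sends over the extension socket in the diff above.
    return json.dumps({"action": "Initialize", "visit_id": visit_id})

msg = build_initialize_msg(7)
print(msg)  # {"action": "Initialize", "visit_id": 7}
```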
20 changes: 13 additions & 7 deletions automation/DataAggregator/BaseAggregator.py
@@ -7,7 +7,7 @@

from multiprocess import Queue

from ..SocketInterface import serversocket
from ..SocketInterface import ServerSocket
from ..utilities.multiprocess_utils import Process

RECORD_TYPE_CONTENT = "page_content"
@@ -61,7 +61,7 @@ def __init__(
self.record_queue: Queue = None # Initialized on `startup`
self.logger = logging.getLogger("openwpm")
self.curent_visit_ids: List[int] = list() # All visit_ids in flight
self.sock: Optional[serversocket] = None
self.sock: Optional[ServerSocket] = None

@abc.abstractmethod
def process_record(self, record):
@@ -84,7 +84,9 @@ def process_content(self, record):
for (content, content_hash)"""

@abc.abstractmethod
def run_visit_completion_tasks(self, visit_id: int, interrupted: bool = False):
def run_visit_completion_tasks(
self, visit_id: int, interrupted: bool = False
) -> None:
"""Will be called once a visit_id will receive no new records

Parameters
@@ -98,7 +100,7 @@ def startup(self):
"""Run listener startup tasks

Note: Child classes should call this method"""
self.sock = serversocket(name=type(self).__name__)
self.sock = ServerSocket(name=type(self).__name__)
self.status_queue.put(self.sock.sock.getsockname())
self.sock.start_accepting()
self.record_queue = self.sock.queue
@@ -162,7 +164,7 @@ def mark_visit_complete(self, visit_id: int) -> None:
relating to a certain visit_id have been saved"""
self.completion_queue.put((visit_id, False))

def mark_visit_incomplete(self, visit_id: int):
def mark_visit_incomplete(self, visit_id: int) -> None:
"""This function should be called to indicate that a certain visit
has been interrupted and will forever be incomplete
"""
@@ -224,6 +226,10 @@ def get_next_visit_id(self):
def get_next_browser_id(self):
"""Return a unique crawl ID used as a key for a browser instance"""

@abc.abstractmethod
def launch(self):
"""Launch the aggregator listener process"""

def get_most_recent_status(self):
"""Return the most recent queue size sent from the listener process"""

@@ -273,15 +279,15 @@ def get_new_completed_visits(self) -> List[Tuple[int, bool]]:
finished_visit_ids.append(self.completion_queue.get())
return finished_visit_ids

def launch(self, listener_process_runner, *args):
def _launch(self, listener_process_runner, *args):
"""Launch the aggregator listener process"""
args = ((self.status_queue, self.completion_queue, self.shutdown_queue),) + args
self.listener_process = Process(target=listener_process_runner, args=args)
self.listener_process.daemon = True
self.listener_process.start()
self.listener_address = self.status_queue.get()

def shutdown(self, relaxed: bool = True):
def shutdown(self, relaxed: bool = True) -> None:
""" Terminate the aggregator listener process"""
self.logger.debug(
"Sending the shutdown signal to the %s listener process..."
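The new abstract `launch` plus the `launch` → `_launch` rename form a small template-method split: the base class keeps the process-spawning helper, and each concrete aggregator implements `launch` and forwards its own runner and arguments. A condensed sketch of that split (the real `_launch` also prepends the status/completion/shutdown queues and starts a daemon `Process`; here the runner is called inline):

```python
import abc

started = []

class BaseAggregator(abc.ABC):
    @abc.abstractmethod
    def launch(self):
        """Launch the aggregator listener process"""

    def _launch(self, listener_process_runner, *args):
        # Sketch only: the real method spawns a daemon Process and reads
        # the listener address back off the status queue.
        listener_process_runner(*args)

class LocalAggregator(BaseAggregator):
    def launch(self):
        super()._launch(started.append, "manager_params")

LocalAggregator().launch()
print(started)  # ['manager_params']
```

Making `launch` abstract means a new aggregator that forgets to implement it now fails at instantiation time rather than when the listener is first needed.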
12 changes: 8 additions & 4 deletions automation/DataAggregator/LocalAggregator.py
@@ -19,7 +19,9 @@
SQL_BATCH_SIZE = 1000
LDB_BATCH_SIZE = 100
MIN_TIME = 5 # seconds
SCHEMA_FILE = os.path.join(os.path.dirname(__file__), "schema.sql")
SCHEMA_FILE = os.path.join(
os.path.dirname(__file__), "..", "data_aggregator", "schema.sql"
)
LDB_NAME = "content.ldb"


@@ -87,7 +89,7 @@ def _generate_insert(self, table, data):
statement = statement + ") " + value_str + ")"
return statement, values

def process_record(self, record: Tuple[str, Union[str, Dict[str, Any]]]):
def process_record(self, record: Tuple[str, Union[str, Dict[str, Any]]]) -> None:
"""Add `record` to database"""

if len(record) != 2:
@@ -181,7 +183,9 @@ def maybe_commit_records(self):
self._ldb_counter = 0
self._ldb_commit_time = time.time()

def run_visit_completion_tasks(self, visit_id: int, interrupted: bool = False):
def run_visit_completion_tasks(
self, visit_id: int, interrupted: bool = False
) -> None:
if interrupted:
self.logger.warning("Visit with visit_id %d got interrupted", visit_id)
self.cur.execute("INSERT INTO incomplete_visits VALUES (?)", (visit_id,))
@@ -283,7 +287,7 @@ def get_next_browser_id(self):

def launch(self):
"""Launch the aggregator listener process"""
super(LocalAggregator, self).launch(
super(LocalAggregator, self)._launch(
listener_process_runner, self.manager_params, self.ldb_enabled
)

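The `SQL_BATCH_SIZE` / `MIN_TIME` constants and `maybe_commit_records` above suggest a commit-when-large-or-old batching policy. A self-contained sketch of that policy (the class, counter, and field names are illustrative, not the listener's actual attributes):

```python
import time

SQL_BATCH_SIZE = 1000
MIN_TIME = 5  # seconds

class BatchCommitter:
    """Commits when a batch is full or has been pending too long."""

    def __init__(self):
        self._counter = 0
        self._last_commit = time.time()
        self.commits = 0

    def record(self) -> None:
        self._counter += 1
        self.maybe_commit()

    def maybe_commit(self) -> None:
        # Flush when enough records accumulated or enough time has passed.
        if self._counter >= SQL_BATCH_SIZE or (
            self._counter > 0 and time.time() - self._last_commit > MIN_TIME
        ):
            self.commits += 1
            self._counter = 0
            self._last_commit = time.time()

c = BatchCommitter()
for _ in range(2500):
    c.record()
print(c.commits)  # 2 size-triggered commits; 500 records still pending
```

Batching this way amortizes transaction overhead while `MIN_TIME` bounds how long a small trailing batch can sit unflushed.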
10 changes: 6 additions & 4 deletions automation/DataAggregator/S3Aggregator.py
@@ -18,6 +18,7 @@
from botocore.exceptions import ClientError, EndpointConnectionError
from pyarrow.filesystem import S3FSWrapper # noqa

from ..data_aggregator.parquet_schema import PQ_SCHEMAS
from .BaseAggregator import (
RECORD_TYPE_CONTENT,
RECORD_TYPE_CREATE,
@@ -26,7 +27,6 @@
BaseListener,
BaseParams,
)
from .parquet_schema import PQ_SCHEMAS

CACHE_SIZE = 500
SITE_VISITS_INDEX = "_site_visits_index"
@@ -76,7 +76,7 @@ def __init__(
def factory_function():
return defaultdict(list)

self._records: Dict[int, DefaultDict[str, List[Any]]] = defaultdict(
self._records: DefaultDict[int, DefaultDict[str, List[Any]]] = defaultdict(
factory_function
) # maps visit_id and table to records
self._batches: DefaultDict[str, List[pa.RecordBatch]] = defaultdict(
@@ -302,7 +302,9 @@ def drain_queue(self):
super(S3Listener, self).drain_queue()
self._send_to_s3(force=True)

def run_visit_completion_tasks(self, visit_id: int, interrupted: bool = False):
def run_visit_completion_tasks(
self, visit_id: int, interrupted: bool = False
) -> None:
if interrupted:
self.logger.error("Visit with visit_id %d got interrupted", visit_id)
self._write_record("incomplete_visits", {"visit_id": visit_id}, visit_id)
@@ -406,6 +408,6 @@ def get_next_browser_id(self):

def launch(self):
"""Launch the aggregator listener process"""
super(S3Aggregator, self).launch(
super(S3Aggregator, self)._launch(
listener_process_runner, self.manager_params, self._instance_id
)
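The corrected `DefaultDict[int, DefaultDict[str, List[Any]]]` annotation above pairs with a named module-level `factory_function` rather than a lambda. One likely motivation for that choice (an assumption, not stated in the diff) is that a named factory keeps the nested `defaultdict` picklable, which a lambda would not:

```python
import pickle
from collections import defaultdict
from typing import Any, DefaultDict, List

def factory_function() -> DefaultDict[str, List[Any]]:
    # Module-level functions pickle by reference; lambdas do not.
    return defaultdict(list)

records: DefaultDict[int, DefaultDict[str, List[Any]]] = defaultdict(factory_function)
records[42]["site_visits"].append({"visit_id": 42})

# A lambda-based default_factory would make this round trip fail.
restored = pickle.loads(pickle.dumps(records))
print(restored[42]["site_visits"])  # [{'visit_id': 42}]
```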
4 changes: 2 additions & 2 deletions automation/MPLogger.py
@@ -17,7 +17,7 @@
from tblib import pickling_support

from .Commands.utils.webdriver_utils import parse_neterror
from .SocketInterface import serversocket
from .SocketInterface import ServerSocket

pickling_support.install()

@@ -218,7 +218,7 @@ def _initialize_sentry(self):

def _start_listener(self):
"""Start listening socket for remote logs from extension"""
socket = serversocket(name="loggingserver")
socket = ServerSocket(name="loggingserver")
self._status_queue.put(socket.sock.getsockname())
socket.start_accepting()
self._status_queue.join() # block to allow parent to retrieve address
12 changes: 6 additions & 6 deletions automation/SocketInterface.py
@@ -11,7 +11,7 @@
# see: https://stackoverflow.com/a/1148237


class serversocket:
class ServerSocket:
"""
A server socket to receive and process string messages
from client sockets to a central queue
@@ -111,7 +111,7 @@ def close(self):
self.sock.close()


class clientsocket:
class ClientSocket:
"""A client socket for sending messages"""

def __init__(self, serialization="json", verbose=False):
@@ -171,17 +171,17 @@ def main():

# Just for testing
if sys.argv[1] == "s":
sock = serversocket(verbose=True)
sock.start_accepting()
ssock: ServerSocket = ServerSocket(verbose=True)
ssock.start_accepting()
input("Press enter to exit...")
sock.close()
ssock.close()
elif sys.argv[1] == "c":
host = input("Enter the host name:\n")
port = input("Enter the port:\n")
serialization = input("Enter the serialization type (default: 'json'):\n")
if serialization == "":
serialization = "json"
sock = clientsocket(serialization=serialization)
sock: ClientSocket = ClientSocket(serialization=serialization)
sock.connect(host, int(port))
msg = None

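The renamed `ServerSocket`/`ClientSocket` pair implements a send-JSON-over-TCP-into-a-queue pattern. A minimal loopback sketch of that pattern using only the standard library (a simplified stand-in, not the OpenWPM classes, which also negotiate serialization and message framing; this sketch instead relies on the client closing the connection to delimit the message):

```python
import json
import socket
import threading
from queue import Queue

def serve_once(server: socket.socket, messages: Queue) -> None:
    # Accept one client, read until it closes, parse the JSON payload.
    conn, _ = server.accept()
    with conn:
        data = b""
        while chunk := conn.recv(4096):
            data += chunk
    messages.put(json.loads(data.decode()))

server = socket.socket()
server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
server.listen(1)
host, port = server.getsockname()

messages: Queue = Queue()
threading.Thread(target=serve_once, args=(server, messages), daemon=True).start()

client = socket.socket()
client.connect((host, port))
client.sendall(json.dumps({"action": "Initialize", "visit_id": 1}).encode())
client.close()

received = messages.get(timeout=5)
print(received)  # {'action': 'Initialize', 'visit_id': 1}
server.close()
```

Publishing the bound address (`getsockname()`) before accepting mirrors how the listener hands its address back to the parent via the status queue.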