Ceph to S3 #2665

Closed
wants to merge 9 commits into from
30 changes: 15 additions & 15 deletions README.rst
@@ -21,7 +21,7 @@ This library provides a library called `thoth-storages
<https://pypi.org/project/thoth-storages>`__ used in project `Thoth
<https://thoth-station.ninja>`__. The library exposes core queries and methods
for `PostgreSQL database <https://www.postgresql.org/>`__ as well as adapters
for manipulating with `Ceph <https://ceph.io/>`__ via its S3 compatible API.
for manipulating the S3 store.

Quick Start
===========
@@ -301,7 +301,7 @@ Automatic backups of Thoth deployment

In each deployment, an automatic knowledge `graph backup cronjob
<https://github.com/thoth-station/graph-backup-job>`__ is run, usually once a
day. Results of automatic backups are stored on Ceph - you can find them in
day. Results of automatic backups are stored in the S3 store - you can find them in
``s3://<bucket-name>/<prefix>/<deployment-name>/graph-backup/pg_dump-<timestamp>.sql``.
Refer to deployment configuration for expansion of parameters in the path.
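If you want to see which backup files are available before restoring one, the same prefix can be listed with the AWS CLI (a minimal sketch using the placeholders above; fill in the values from your deployment configuration):

.. code-block:: console

   $ aws s3 --endpoint <s3-endpoint> ls s3://<bucket-name>/<prefix>/<deployment-name>/graph-backup/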

@@ -311,7 +311,7 @@ PostgreSQL instance and fill it from the backup file:
.. code-block:: console

$ cd thoth-station/storages
$ aws s3 --endpoint <ceph-s3-endpoint> cp s3://<bucket-name>/<prefix>/<deployment-name>/graph-backup/pg_dump-<timestamp> pg_dump-<timestamp>.sql
$ aws s3 --endpoint <s3-endpoint> cp s3://<bucket-name>/<prefix>/<deployment-name>/graph-backup/pg_dump-<timestamp> pg_dump-<timestamp>.sql
$ podman-compose up
$ psql -h localhost -p 5432 --username=postgres < pg_dump-<timestamp>.sql
password: <type password "postgres" here>
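As an optional sanity check, you can verify that the dump was imported by listing the tables in the local instance (a minimal sketch; the password is the same ``postgres`` as above):

.. code-block:: console

   $ psql -h localhost -p 5432 --username=postgres -c "\dt"
   password: <type password "postgres" here>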
@@ -368,9 +368,9 @@ Syncing results of a workflow run in the cluster

Each workflow task in the cluster reports a JSON which states necessary
information about the task run (metadata) and actual results. These results of
workflow tasks are stored on object storage `Ceph <https://ceph.io/>`__ via S3
compatible API and later on synced via graph syncs to the knowledge graph. The
component responsible for graph syncs is `graph-sync-job
workflow tasks are stored in an object storage supporting the S3 API and
later on synced via graph syncs to the knowledge graph. The component
responsible for graph syncs is `graph-sync-job
<https://github.com/thoth-station/graph-sync-job>`__ which is written generic
enough to sync any data and report metrics about synced data so you don't need
to provide such logic on each new workload registered in the system. To sync
@@ -390,10 +390,10 @@ For query naming conventions, please read all the docs in `conventions for
query name
<https://github.com/thoth-station/storages/blob/master/docs/conventions/README.md>`__.

Accessing data on Ceph
======================
To access data on Ceph, you need to know ``aws_access_key_id`` and ``aws_secret_access_key`` credentials
of endpoint you are connecting to.
Accessing data on the S3 store
==============================
To access data on the S3 store, you need to know ``aws_access_key_id`` and
``aws_secret_access_key`` credentials of the endpoint you are connecting to.

The absolute file path of the data you are accessing is constructed as: ``s3://<bucket_name>/<prefix_name>/<file_path>``

@@ -408,15 +408,15 @@ There are two ways to initialize the data handler:
* - Variable name
- Content
* - ``S3_ENDPOINT_URL``
- Ceph Host name
- S3 Host name
* - ``CEPH_BUCKET``
- Ceph Bucket name
- S3 Bucket name
* - ``CEPH_BUCKET_PREFIX``
- Ceph Prefix
- S3 Prefix
* - ``CEPH_KEY_ID``
- Ceph Key ID
- S3 Key ID
* - ``CEPH_SECRET_KEY``
- Ceph Secret Key
- S3 Secret Key
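
The following is a rough, hypothetical sketch (not taken from the thoth-storages API) of how the environment variables from the table above could be used with a plain ``boto3`` client to fetch a document under the path layout described earlier; the ``solver/document.json`` key is purely illustrative:

.. code-block:: python

    import json
    import os

    import boto3

    # Assumes the environment variables from the table above are exported.
    s3 = boto3.client(
        "s3",
        endpoint_url=os.environ["S3_ENDPOINT_URL"],
        aws_access_key_id=os.environ["CEPH_KEY_ID"],
        aws_secret_access_key=os.environ["CEPH_SECRET_KEY"],
    )

    bucket = os.environ["CEPH_BUCKET"]
    prefix = os.environ["CEPH_BUCKET_PREFIX"]

    # s3://<bucket_name>/<prefix_name>/<file_path> maps onto Bucket and Key here.
    response = s3.get_object(Bucket=bucket, Key=f"{prefix}/solver/document.json")
    document = json.loads(response["Body"].read())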

.. code-block:: python

2 changes: 1 addition & 1 deletion docs/syncs/README.rst
@@ -1,4 +1,4 @@
Thoth Storages syncs into Thoth Databse
Thoth Storages syncs into Thoth Database
----------------------------------------

Thoth collects several types of observations and syncs them using `graph-sync-job <https://github.com/thoth-station/graph-sync-job>`_
18 changes: 9 additions & 9 deletions tests/base.py
@@ -54,11 +54,11 @@ def _get_result_type(cls, result_type):

class StorageBaseTest(ThothStoragesTest):
def test_connect(self, adapter):
"""Test lazy connection to Ceph."""
"""Test lazy connection to S3 store."""
assert not adapter.is_connected()

flexmock(adapter.ceph).should_receive("connect").with_args().and_return(None).once()
flexmock(adapter.ceph).should_receive("is_connected").with_args().and_return(True).once()
flexmock(adapter.s3).should_receive("connect").with_args().and_return(None).once()
flexmock(adapter.s3).should_receive("is_connected").with_args().and_return(True).once()
adapter.connect()

assert adapter.is_connected()
@@ -72,17 +72,17 @@ def test_retrieve_document(self, adapter):
"""Test proper document retrieval."""
document = {"foo": "bar"}
document_id = "<document_id>"
flexmock(adapter.ceph).should_receive("retrieve_document").with_args(document_id).and_return(document).once()
assert adapter.ceph.retrieve_document(document_id) == document
flexmock(adapter.s3).should_receive("retrieve_document").with_args(document_id).and_return(document).once()
assert adapter.s3.retrieve_document(document_id) == document

def test_iterate_results(self, adapter):
"""Test iterating over results for build logs stored on Ceph."""
"""Test iterating over results for build logs stored on S3 store."""
# Just check that the request is properly propagated.
flexmock(adapter.ceph).should_receive("iterate_results").with_args().and_yield().once()
flexmock(adapter.s3).should_receive("iterate_results").with_args().and_yield().once()
assert list(adapter.iterate_results()) == []

def test_get_document_listing(self, adapter):
"""Test document listing for build logs stored on Ceph."""
"""Test document listing for build logs stored on S3 store."""
# Just check that the request is properly propagated.
flexmock(adapter.ceph).should_receive("get_document_listing").with_args().and_return([]).once()
flexmock(adapter.s3).should_receive("get_document_listing").with_args().and_return([]).once()
assert list(adapter.get_document_listing()) == []
6 changes: 3 additions & 3 deletions tests/test_analyses.py
@@ -23,7 +23,7 @@
from thoth.storages import AnalysisResultsStore

from .test_result_base import ResultBaseTest
from .test_ceph import CEPH_INIT_KWARGS
from .test_s3 import CEPH_INIT_KWARGS


_DEPLOYMENT_NAME = "thoth-my-deployment"
@@ -40,8 +40,8 @@ class TestAnalysisResultsStore(ResultBaseTest):
"""A Test Class for Analysis Results Store."""

def test_prefix(self, adapter):
"""Test that results stored on Ceph are correctly prefixed."""
assert adapter.ceph.prefix == f"{_BUCKET_PREFIX}/{_DEPLOYMENT_NAME}/{adapter.RESULT_TYPE}/"
"""Test that results stored on S3 store are correctly prefixed."""
assert adapter.s3.prefix == f"{_BUCKET_PREFIX}/{_DEPLOYMENT_NAME}/{adapter.RESULT_TYPE}/"

@pytest.mark.parametrize("document,document_id", ResultBaseTest.get_analyzer_results())
def test_store_document(self, adapter, document, document_id):
36 changes: 18 additions & 18 deletions tests/test_buildlogs.py
@@ -24,9 +24,9 @@
from thoth.storages import BuildLogsStore

from .base import StorageBaseTest
from .test_ceph import CEPH_ENV_MAP
from .test_ceph import CEPH_INIT_ENV
from .test_ceph import CEPH_INIT_KWARGS
from .test_s3 import CEPH_ENV_MAP
from .test_s3 import CEPH_INIT_ENV
from .test_s3 import CEPH_INIT_KWARGS
from .utils import with_adjusted_env

_BUILDLOGS_INIT_KWARGS = {
@@ -67,43 +67,43 @@ def test_init_kwargs(self):
f"got {getattr(adapter, key)!r} instead"
)

assert adapter.ceph is not None
assert not adapter.ceph.is_connected()
assert adapter.s3 is not None
assert not adapter.s3.is_connected()

for key, value in CEPH_INIT_KWARGS.items():
assert getattr(adapter.ceph, key) == value, (
f"Ceph's adapter key {key!r} should have value {value!r} but "
f"got {getattr(adapter.ceph, key)!r} instead"
assert getattr(adapter.s3, key) == value, (
f"S3 store's adapter key {key!r} should have value {value!r} but "
f"got {getattr(adapter.s3, key)!r} instead"
)

bucket_prefix = _BUILDLOGS_INIT_KWARGS_EXP["bucket_prefix"]
assert adapter.prefix == f"{bucket_prefix}/{adapter.deployment_name}/buildlogs/"
assert adapter.ceph.prefix == adapter.prefix
assert adapter.s3.prefix == adapter.prefix

def test_init_env(self, adapter):
"""Test initialization from env variables."""
assert not adapter.is_connected()
assert adapter.ceph is not None
assert not adapter.ceph.is_connected()
assert adapter.s3 is not None
assert not adapter.s3.is_connected()

assert adapter.deployment_name == _BUILDLOGS_INIT_ENV["THOTH_DEPLOYMENT_NAME"]

bucket_prefix = _BUILDLOGS_INIT_ENV_EXP["THOTH_CEPH_BUCKET_PREFIX"]
assert adapter.prefix == f"{bucket_prefix}/{adapter.deployment_name}/buildlogs/"
assert adapter.ceph.prefix == adapter.prefix
assert adapter.s3.prefix == adapter.prefix

for key, value in CEPH_INIT_ENV.items():
attribute = CEPH_ENV_MAP[key]
assert getattr(adapter.ceph, attribute) == value, (
f"Ceph's adapter attribute {attribute!r} should have value {value!r} but "
f"got {getattr(adapter.ceph, key)!r} instead (env: {key})"
assert getattr(adapter.s3, attribute) == value, (
f"S3 store's adapter attribute {attribute!r} should have value {value!r} but "
f"got {getattr(adapter.s3, key)!r} instead (env: {key})"
)

def test_store_document(self, adapter):
"""Test storing results on Ceph."""
"""Test storing results on S3 store."""
# This method handling is different from store_document() of result base as we use hashes as ids.
document = b'{\n "foo": "bar"\n}'
document_id = "buildlog-bbe8e9a86be651f9efc8e8df7fb76999d8e9a4a9674df9be8de24f4fb3d872a2"
adapter.ceph = flexmock(dict2blob=lambda _: document)
adapter.ceph.should_receive("store_blob").with_args(document, document_id).and_return(document_id).once()
adapter.s3 = flexmock(dict2blob=lambda _: document)
adapter.s3.should_receive("store_blob").with_args(document, document_id).and_return(document_id).once()
assert adapter.store_document(document) == document_id
14 changes: 7 additions & 7 deletions tests/test_result_base.py
@@ -24,9 +24,9 @@
from thoth.storages.result_base import ResultStorageBase

from .base import StorageBaseTest
from .test_ceph import CEPH_INIT_ENV
from .test_ceph import CEPH_INIT_KWARGS
from .utils import connected_ceph_adapter
from .test_s3 import CEPH_INIT_ENV
from .test_s3 import CEPH_INIT_KWARGS
from .utils import connected_s3_adapter

_DEPLOYMENT_NAME = "my-deployment"
_RESULT_TYPE = "TEST"
@@ -58,8 +58,8 @@ def test_get_document_id(self):
@pytest.mark.parametrize("document,document_id", StorageBaseTest.get_all_results())
def test_store_document(self, adapter, document, document_id):
# pytest does not support fixtures and parameters at the same time
adapter.ceph = flexmock(get_document_id=ResultStorageBase.get_document_id)
adapter.ceph.should_receive("store_document").with_args(document, document_id).and_return(document_id).once()
adapter.s3 = flexmock(get_document_id=ResultStorageBase.get_document_id)
adapter.s3.should_receive("store_document").with_args(document, document_id).and_return(document_id).once()
assert adapter.store_document(document) == document_id

def test_assertion_error(self):
@@ -69,11 +69,11 @@ def test_assertion_error(self):

@staticmethod
def store_retrieve_document_test(adapter, document, document_id):
"""Some logic common to Ceph storing/retrieval wrappers.
"""Some logic common to S3 store storing/retrieval wrappers.

Call it with appropriate adapter.
"""
with connected_ceph_adapter(adapter) as connected_adapter:
with connected_s3_adapter(adapter) as connected_adapter:
stored_document_id = connected_adapter.store_document(document)
assert stored_document_id == document_id
assert connected_adapter.retrieve_document(stored_document_id) == document
42 changes: 21 additions & 21 deletions tests/test_ceph.py → tests/test_s3.py
@@ -21,12 +21,12 @@
import pytest
from moto import mock_s3

from thoth.storages import CephStore
from thoth.storages import S3store
from thoth.storages.exceptions import NotFoundError

from .base import ThothStoragesTest
from .utils import with_adjusted_env
from .utils import connected_ceph_adapter
from .utils import connected_s3_adapter


CEPH_INIT_ENV = {
@@ -63,46 +63,46 @@

@pytest.fixture(name="adapter")
def _fixture_adapter():
"""Retrieve an adapter to Ceph."""
"""Retrieve an adapter to S3 store."""
mock_s3().start()
try:
yield CephStore(_BUCKET_PREFIX, **CEPH_INIT_KWARGS)
yield S3store(_BUCKET_PREFIX, **CEPH_INIT_KWARGS)
finally:
mock_s3().stop()


@pytest.fixture(name="connected_adapter")
def _fixture_connected_adapter():
"""Retrieve a connected adapter to Ceph."""
adapter = CephStore(_BUCKET_PREFIX, **CEPH_INIT_KWARGS)
with connected_ceph_adapter(adapter, raw_ceph=True) as connected_adapter:
"""Retrieve a connected adapter to S3 store."""
adapter = S3store(_BUCKET_PREFIX, **CEPH_INIT_KWARGS)
with connected_s3_adapter(adapter, raw_s3=True) as connected_adapter:
yield connected_adapter


class TestCephStore(ThothStoragesTest):
class TestS3Store(ThothStoragesTest):
def test_init_kwargs(self):
"""Test initialization of Ceph based on arguments."""
adapter = CephStore(_BUCKET_PREFIX, **CEPH_INIT_KWARGS)
"""Test initialization of S3 store based on arguments."""
adapter = S3store(_BUCKET_PREFIX, **CEPH_INIT_KWARGS)

for key, value in CEPH_INIT_KWARGS.items():
assert (
getattr(adapter, key) == value
), f"Ceph attribute {key!r} has value {getattr(adapter, key)!r} but expected is {value!r}"
), f"S3 store attribute {key!r} has value {getattr(adapter, key)!r} but expected is {value!r}"

assert adapter.prefix == _BUCKET_PREFIX
assert not adapter.is_connected()

@with_adjusted_env(_ENV)
def test_init_env(self):
"""Test initialization of Ceph adapter based on env variables."""
adapter = CephStore(_BUCKET_PREFIX)
"""Test initialization of S3 store adapter based on env variables."""
adapter = S3store(_BUCKET_PREFIX)

assert adapter.prefix == _BUCKET_PREFIX

for key, value in CEPH_INIT_ENV.items():
attribute = CEPH_ENV_MAP[key]
assert getattr(adapter, attribute) == value, (
f"Ceph attribute {attribute!r} has value {getattr(adapter, attribute)!r} but expected is "
f"S3 store attribute {attribute!r} has value {getattr(adapter, attribute)!r} but expected is "
f"{value!r} (env: {key!r})"
)

@@ -113,11 +113,11 @@ def test_is_connected(self, adapter):
assert adapter.is_connected()

def test_get_document_listing_empty(self, connected_adapter):
"""Test listing of documents stored on Ceph."""
"""Test listing of documents stored on S3 store."""
assert list(connected_adapter.get_document_listing()) == []

def test_get_document_listing(self, connected_adapter):
"""Test listing of documents stored on Ceph."""
"""Test listing of documents stored on S3 store."""
assert list(connected_adapter.get_document_listing()) == []

document1, document1_id = {"foo": "bar"}, "666"
Expand All @@ -132,14 +132,14 @@ def test_get_document_listing(self, connected_adapter):
assert document2_id in document_listing

def test_test_store_blob(self, connected_adapter):
"""Test storing binary objects onto Ceph."""
"""Test storing binary objects onto S3 store."""
blob = b"foo"
key = "some-key"
connected_adapter.store_blob(blob, key)
assert connected_adapter.retrieve_blob(key) == blob

def test_store_document(self, connected_adapter):
"""Test storing document on Ceph."""
"""Test storing document on S3 store."""
document, key = {"thoth": "is awesome! ;-)"}, "my-key"
assert not connected_adapter.document_exists(key)

@@ -150,7 +150,7 @@ def test_iterate_results_empty(self, connected_adapter):
assert list(connected_adapter.iterate_results()) == []

def test_iterate_results(self, connected_adapter):
"""Test iterating over stored documents on Ceph."""
"""Test iterating over stored documents on S3 store."""
document1, key1 = {"thoth": "document"}, "key-1"
document2, key2 = {"just": "dict"}, "key-2"

@@ -168,13 +168,13 @@ def test_retrieve_document_not_exist(self, connected_adapter):
connected_adapter.retrieve_document("some-document-that-really-does-not-exist")

def test_document_exists(self, connected_adapter):
"""Test document presents on Ceph."""
"""Test document presents on S3 store."""
assert connected_adapter.document_exists("foo") is False
connected_adapter.store_document({"Hello": "Thoth"}, "foo")
assert connected_adapter.document_exists("foo") is True

def connect(self, adapter):
"""Test connecting to Ceph."""
"""Test connecting to S3 store."""
assert not adapter.is_connected()
adapter.connect()
assert adapter.is_connected()