Add direct Atlos integration (#137)
* Add Atlos feeder

* Add Atlos db

* Add Atlos storage

* Fix Atlos storages

* Fix Atlos feeder

* Only include URLs in Atlos feeder once they're processed

* Remove print

* Add Atlos documentation to README

* Formatting fixes

* Don't archive existing material

* avoid KeyError in atlos_db

* version bump

---------

Co-authored-by: msramalho <[email protected]>
milesmcc and msramalho authored Apr 15, 2024
1 parent eb37f0b commit f603400
Showing 16 changed files with 278 additions and 18 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -27,4 +27,5 @@ instaloader.session
 orchestration.yaml
 auto_archiver.egg-info*
 logs*
-*.csv
+*.csv
+archived/
32 changes: 32 additions & 0 deletions README.md
@@ -177,6 +177,38 @@ To use Google Drive storage you need the id of the shared folder in the `config.
#### Telethon + Instagram with telegram bot
The first time you run, you will be prompted to authenticate with the associated phone number; alternatively, you can put your `anon.session` in the root.

#### Atlos
When integrating with [Atlos](https://atlos.org), you will need to provide an API token in your configuration. You can learn more about Atlos and how to get an API token [here](https://docs.atlos.org/technical/api). You will have to provide this token to the `atlos_feeder`, `atlos_storage`, and `atlos_db` steps in your orchestration file. If you use a custom or self-hosted Atlos instance, you can also specify the `atlos_url` option to point to your custom instance's URL. For example:

```yaml
# orchestration.yaml content
steps:
  feeder: atlos_feeder
  archivers: # order matters
    - youtubedl_archiver
  enrichers:
    - thumbnail_enricher
    - hash_enricher
  formatter: html_formatter
  storages:
    - atlos_storage
  databases:
    - console_db
    - atlos_db

configurations:
  atlos_feeder:
    atlos_url: "https://platform.atlos.org" # optional
    api_token: "...your API token..."
  atlos_db:
    atlos_url: "https://platform.atlos.org" # optional
    api_token: "...your API token..."
  atlos_storage:
    atlos_url: "https://platform.atlos.org" # optional
    api_token: "...your API token..."
  hash_enricher:
    algorithm: "SHA-256"
```
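
Before a full run, it can be worth checking that your token works. The sketch below queries the same endpoint the `atlos_feeder` polls; it assumes the `requests` package and the default `https://platform.atlos.org` instance:

```python
import requests

ATLOS_URL = "https://platform.atlos.org"  # or your self-hosted instance
API_TOKEN = "...your API token..."

# The feeder pages through /api/v2/source_material with a Bearer token;
# a single GET is enough to confirm the token is valid.
resp = requests.get(
    f"{ATLOS_URL}/api/v2/source_material",
    headers={"Authorization": f"Bearer {API_TOKEN}"},
)
resp.raise_for_status()
print(f"OK - {len(resp.json()['results'])} source material record(s) on first page")
```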
## Running on Google Sheets Feeder (gsheet_feeder)
The `--gsheet_feeder.sheet` property is the name of the Google Sheet to check for URLs.
11 changes: 6 additions & 5 deletions src/auto_archiver/core/media.py
@@ -25,18 +25,19 @@ class Media:
     _mimetype: str = None  # eg: image/jpeg
     _stored: bool = field(default=False, repr=False, metadata=config(exclude=lambda _: True))  # always exclude

-    def store(self: Media, override_storages: List = None, url: str = "url-not-available"):
-        # stores the media into the provided/available storages [Storage]
-        # repeats the process for its properties, in case they have inner media themselves
-        # for now it only goes down 1 level but it's easy to make it recursive if needed
+    def store(self: Media, override_storages: List = None, url: str = "url-not-available", metadata: Any = None):
+        # 'Any' typing for metadata to avoid circular imports. Stores the media
+        # into the provided/available storages [Storage]; repeats the process for
+        # its properties, in case they have inner media themselves; for now it
+        # only goes down 1 level but it's easy to make it recursive if needed.
         storages = override_storages or ArchivingContext.get("storages")
         if not len(storages):
             logger.warning(f"No storages found in local context or provided directly for {self.filename}.")
             return

         for s in storages:
             for any_media in self.all_inner_media(include_self=True):
-                s.store(any_media, url)
+                s.store(any_media, url, metadata=metadata)

     def all_inner_media(self, include_self=False):
         """ Media can be inside media properties, examples include transformations on original media.
2 changes: 1 addition & 1 deletion src/auto_archiver/core/metadata.py
@@ -48,7 +48,7 @@ def store(self: Metadata, override_storages: List = None):
         self.remove_duplicate_media_by_hash()
         storages = override_storages or ArchivingContext.get("storages")
         for media in self.media:
-            media.store(override_storages=storages, url=self.get_url())
+            media.store(override_storages=storages, url=self.get_url(), metadata=self)

     def set(self, key: str, val: Any) -> Metadata:
         self.metadata[key] = val
2 changes: 1 addition & 1 deletion src/auto_archiver/core/orchestrator.py
@@ -120,7 +120,7 @@ def archive(self, result: Metadata) -> Union[Metadata, None]:

         # 6 - format and store formatted if needed
         if (final_media := self.formatter.format(result)):
-            final_media.store(url=url)
+            final_media.store(url=url, metadata=result)
             result.set_final_media(final_media)

         if result.is_empty():
3 changes: 2 additions & 1 deletion src/auto_archiver/databases/__init__.py
@@ -2,4 +2,5 @@
 from .gsheet_db import GsheetsDb
 from .console_db import ConsoleDb
 from .csv_db import CSVDb
-from .api_db import AAApiDb
+from .api_db import AAApiDb
+from .atlos_db import AtlosDb
79 changes: 79 additions & 0 deletions src/auto_archiver/databases/atlos_db.py
@@ -0,0 +1,79 @@
import os
from typing import Union
from loguru import logger
from csv import DictWriter
from dataclasses import asdict
import requests

from . import Database
from ..core import Metadata
from ..utils import get_atlos_config_options


class AtlosDb(Database):
    """
    Outputs results to Atlos
    """

    name = "atlos_db"

    def __init__(self, config: dict) -> None:
        # without this STEP.__init__ is not called
        super().__init__(config)

    @staticmethod
    def configs() -> dict:
        return get_atlos_config_options()

    def failed(self, item: Metadata, reason: str) -> None:
        """Update DB accordingly for failure"""
        # If the item has no Atlos ID, there's nothing for us to do
        if not item.metadata.get("atlos_id"):
            logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
            return

        requests.post(
            f"{self.atlos_url}/api/v2/source_material/metadata/{item.metadata['atlos_id']}/auto_archiver",
            headers={"Authorization": f"Bearer {self.api_token}"},
            json={"metadata": {"processed": True, "status": "error", "error": reason}},
        ).raise_for_status()
        logger.info(
            f"Stored failure for {item.get_url()} (ID {item.metadata['atlos_id']}) on Atlos: {reason}"
        )

    def fetch(self, item: Metadata) -> Union[Metadata, bool]:
        """check and fetch if the given item has been archived already, each
        database should handle its own caching, and configuration mechanisms"""
        return False

    def _process_metadata(self, item: Metadata) -> dict:
        """Process metadata for storage on Atlos. Will convert any datetime
        objects to ISO format."""

        return {
            k: v.isoformat() if hasattr(v, "isoformat") else v
            for k, v in item.metadata.items()
        }

    def done(self, item: Metadata, cached: bool = False) -> None:
        """archival result ready - should be saved to DB"""

        if not item.metadata.get("atlos_id"):
            logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
            return

        requests.post(
            f"{self.atlos_url}/api/v2/source_material/metadata/{item.metadata['atlos_id']}/auto_archiver",
            headers={"Authorization": f"Bearer {self.api_token}"},
            json={
                "metadata": dict(
                    processed=True,
                    status="success",
                    results=self._process_metadata(item),
                )
            },
        ).raise_for_status()

        logger.info(
            f"Stored success for {item.get_url()} (ID {item.metadata['atlos_id']}) on Atlos"
        )
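
As an aside, the dictionary comprehension in `_process_metadata` relies on duck typing: any value exposing `isoformat()` (such as `datetime`) is serialized to an ISO 8601 string so the JSON payload stays serializable. A standalone, runnable illustration:

```python
from datetime import datetime, timezone

# hypothetical metadata as the archiver might collect it
metadata = {
    "url": "https://example.com/post/1",
    "timestamp": datetime(2024, 4, 15, 12, 30, tzinfo=timezone.utc),
    "hash": "abc123",
}

processed = {k: v.isoformat() if hasattr(v, "isoformat") else v for k, v in metadata.items()}
print(processed["timestamp"])  # 2024-04-15T12:30:00+00:00
```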
2 changes: 1 addition & 1 deletion src/auto_archiver/enrichers/whisper_enricher.py
@@ -44,7 +44,7 @@ def enrich(self, to_enrich: Metadata) -> None:
         job_results = {}
         for i, m in enumerate(to_enrich.media):
             if m.is_video() or m.is_audio():
-                m.store(url=url)
+                m.store(url=url, metadata=to_enrich)
                 try:
                     job_id = self.submit_job(m)
                     job_results[job_id] = False
3 changes: 2 additions & 1 deletion src/auto_archiver/feeders/__init__.py
@@ -1,3 +1,4 @@
 from .feeder import Feeder
 from .gsheet_feeder import GsheetsFeeder
-from .cli_feeder import CLIFeeder
+from .cli_feeder import CLIFeeder
+from .atlos_feeder import AtlosFeeder
56 changes: 56 additions & 0 deletions src/auto_archiver/feeders/atlos_feeder.py
@@ -0,0 +1,56 @@
from loguru import logger
import requests

from . import Feeder
from ..core import Metadata, ArchivingContext
from ..utils import get_atlos_config_options


class AtlosFeeder(Feeder):
    name = "atlos_feeder"

    def __init__(self, config: dict) -> None:
        # without this STEP.__init__ is not called
        super().__init__(config)
        if type(self.api_token) != str:
            raise Exception("Atlos Feeder did not receive an Atlos API token")

    @staticmethod
    def configs() -> dict:
        return get_atlos_config_options()

    def __iter__(self) -> Metadata:
        # Get all the urls from the Atlos API
        count = 0
        cursor = None
        while True:
            response = requests.get(
                f"{self.atlos_url}/api/v2/source_material",
                headers={"Authorization": f"Bearer {self.api_token}"},
                params={"cursor": cursor},
            )
            data = response.json()
            response.raise_for_status()
            cursor = data["next"]

            for item in data["results"]:
                if (
                    item["source_url"] not in [None, ""]
                    and (
                        item["metadata"]
                        .get("auto_archiver", {})
                        .get("processed", False)
                        != True
                    )
                    and item["visibility"] == "visible"
                    and item["status"] not in ["processing", "pending"]
                ):
                    yield Metadata().set_url(item["source_url"]).set(
                        "atlos_id", item["id"]
                    )
                    count += 1

            if len(data["results"]) == 0 or cursor is None:
                break

        logger.success(f"Processed {count} URL(s)")
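
The feeder's `__iter__` implements cursor pagination: it follows `data["next"]` until the API returns an empty page or a null cursor, yielding only visible, not-yet-processed items along the way. The pagination pattern in isolation (a sketch, assuming the same `results`/`next` response shape; the filtering is omitted):

```python
import requests

def iter_source_material(atlos_url: str, api_token: str):
    """Yield every source material record, following the API's cursor."""
    cursor = None
    while True:
        response = requests.get(
            f"{atlos_url}/api/v2/source_material",
            headers={"Authorization": f"Bearer {api_token}"},
            params={"cursor": cursor},
        )
        response.raise_for_status()
        data = response.json()
        yield from data["results"]
        cursor = data["next"]
        if not data["results"] or cursor is None:
            break
```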
3 changes: 2 additions & 1 deletion src/auto_archiver/storages/__init__.py
@@ -1,4 +1,5 @@
 from .storage import Storage
 from .s3 import S3Storage
 from .local import LocalStorage
-from .gd import GDriveStorage
+from .gd import GDriveStorage
+from .atlos import AtlosStorage
74 changes: 74 additions & 0 deletions src/auto_archiver/storages/atlos.py
@@ -0,0 +1,74 @@
import os
from typing import IO, List, Optional
from loguru import logger
import requests
import hashlib

from ..core import Media, Metadata
from ..storages import Storage
from ..utils import get_atlos_config_options


class AtlosStorage(Storage):
    name = "atlos_storage"

    def __init__(self, config: dict) -> None:
        super().__init__(config)

    @staticmethod
    def configs() -> dict:
        return dict(Storage.configs(), **get_atlos_config_options())

    def get_cdn_url(self, _media: Media) -> str:
        # It's not always possible to provide an exact URL, because it's
        # possible that the media once uploaded could have been copied to
        # another project.
        return self.atlos_url

    def _hash(self, media: Media) -> str:
        # Hash the media file using sha-256. We don't use the existing auto archiver
        # hash because there's no guarantee that the configurer is using sha-256, which
        # is how Atlos hashes files.

        sha256 = hashlib.sha256()
        with open(media.filename, "rb") as f:
            while True:
                buf = f.read(4096)
                if not buf: break
                sha256.update(buf)
        return sha256.hexdigest()

    def upload(self, media: Media, metadata: Optional[Metadata] = None, **_kwargs) -> bool:
        atlos_id = metadata.get("atlos_id")
        if atlos_id is None:
            logger.error(f"No Atlos ID found in metadata; can't store {media.filename} on Atlos")
            return False

        media_hash = self._hash(media)

        # Check whether the media has already been uploaded
        source_material = requests.get(
            f"{self.atlos_url}/api/v2/source_material/{atlos_id}",
            headers={"Authorization": f"Bearer {self.api_token}"},
        ).json()["result"]
        existing_media = [x["file_hash_sha256"] for x in source_material.get("artifacts", [])]
        if media_hash in existing_media:
            logger.info(f"{media.filename} with SHA256 {media_hash} already uploaded to Atlos")
            return True

        # Upload the media to the Atlos API
        requests.post(
            f"{self.atlos_url}/api/v2/source_material/upload/{atlos_id}",
            headers={"Authorization": f"Bearer {self.api_token}"},
            params={"title": media.properties},
            files={"file": (os.path.basename(media.filename), open(media.filename, "rb"))},
        ).raise_for_status()

        logger.info(f"Uploaded {media.filename} to Atlos with ID {atlos_id} and title {media.key}")

        return True

    # must be implemented even if unused
    def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool: pass
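
The manual `while True` read loop in `_hash` can also be written with `iter()`'s two-argument sentinel form; a behaviorally equivalent standalone sketch:

```python
import hashlib

def sha256_file(path: str, chunk_size: int = 4096) -> str:
    """Hash a file in chunks so large media never has to fit in memory."""
    sha256 = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            sha256.update(chunk)
    return sha256.hexdigest()
```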
8 changes: 4 additions & 4 deletions src/auto_archiver/storages/storage.py
@@ -1,12 +1,12 @@
 from __future__ import annotations
 from abc import abstractmethod
 from dataclasses import dataclass
-from typing import IO
+from typing import IO, Optional
 import os

 from ..utils.misc import random_str

-from ..core import Media, Step, ArchivingContext
+from ..core import Media, Step, ArchivingContext, Metadata
 from ..enrichers import HashEnricher
 from loguru import logger
 from slugify import slugify
@@ -43,12 +43,12 @@ def init(name: str, config: dict) -> Storage:
         # only for typing...
         return Step.init(name, config, Storage)

-    def store(self, media: Media, url: str) -> None:
+    def store(self, media: Media, url: str, metadata: Optional[Metadata] = None) -> None:
         if media.is_stored():
             logger.debug(f"{media.key} already stored, skipping")
             return
         self.set_key(media, url)
-        self.upload(media)
+        self.upload(media, metadata=metadata)
         media.add_url(self.get_cdn_url(media))

     @abstractmethod
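
Taken together with the `Media` and `Metadata` changes above, the new `metadata` parameter flows from `Metadata.store(...)` through `Media.store(...)` into each storage's `upload(...)`. A hypothetical storage subclass showing how a backend can consume it (names here are illustrative, not part of the codebase; `Storage`, `Media`, and `Metadata` are the classes from the modules shown above):

```python
from typing import Optional

class ExampleStorage(Storage):  # hypothetical subclass, for illustration only
    name = "example_storage"

    def get_cdn_url(self, media: Media) -> str:
        return f"https://cdn.example.com/{media.key}"  # hypothetical URL scheme

    def upload(self, media: Media, metadata: Optional[Metadata] = None, **kwargs) -> bool:
        # the full Metadata object is now available at upload time, so a
        # backend can read values set earlier in the pipeline, e.g. atlos_id
        atlos_id = metadata.get("atlos_id") if metadata else None
        return atlos_id is not None

    # must be implemented even if unused
    def uploadf(self, file, key, **kwargs) -> bool: pass
```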
3 changes: 2 additions & 1 deletion src/auto_archiver/utils/__init__.py
@@ -3,4 +3,5 @@
 from .misc import *
 from .webdriver import Webdriver
 from .gsheet import Gsheets
-from .url import UrlUtil
+from .url import UrlUtil
+from .atlos import get_atlos_config_options
13 changes: 13 additions & 0 deletions src/auto_archiver/utils/atlos.py
@@ -0,0 +1,13 @@
def get_atlos_config_options():
    return {
        "api_token": {
            "default": None,
            "help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
            "cli_set": lambda cli_val, _: cli_val
        },
        "atlos_url": {
            "default": "https://platform.atlos.org",
            "help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
            "cli_set": lambda cli_val, _: cli_val
        },
    }
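
Each Atlos step returns these shared options from its `configs()`, and a step with its own options merges the two dictionaries, as `AtlosStorage.configs()` does with `dict(Storage.configs(), **get_atlos_config_options())`. A minimal illustration (the base option name is hypothetical):

```python
# merging shared Atlos options into a step's own options (cf. AtlosStorage.configs())
base_options = {"some_existing_option": {"default": None, "help": "..."}}  # hypothetical
merged = dict(base_options, **get_atlos_config_options())
assert set(merged) == {"some_existing_option", "api_token", "atlos_url"}
```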
2 changes: 1 addition & 1 deletion src/auto_archiver/version.py
@@ -3,7 +3,7 @@
 _MINOR = "11"
 # On main and in a nightly release the patch should be one ahead of the last
 # released build.
-_PATCH = "0"
+_PATCH = "1"
 # This is mainly for nightly builds which have the suffix ".dev$DATE". See
 # https://semver.org/#is-v123-a-semantic-version for the semantics.
 _SUFFIX = ""
