Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Adds type annotations #43

Merged
merged 20 commits into from
Oct 30, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[mypy]
show_error_codes = True
1 change: 0 additions & 1 deletion saq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,4 @@
from saq.queue import Queue
from saq.worker import Worker


__version__ = "0.9.0"
45 changes: 25 additions & 20 deletions saq/job.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import dataclasses
import enum
import typing
peterschutt marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -6,8 +8,11 @@

ABORT_ID_PREFIX = "saq:abort:"

if typing.TYPE_CHECKING:
peterschutt marked this conversation as resolved.
Show resolved Hide resolved
from saq.queue import Queue


def get_default_job_key():
def get_default_job_key() -> str:
return uuid1()


Expand Down Expand Up @@ -103,7 +108,7 @@ class Job:
status: Status = Status.NEW
meta: dict = dataclasses.field(default_factory=dict)

def __repr__(self):
def __repr__(self) -> str:
kwargs = ", ".join(
f"{k}={v}"
for k, v in {
Expand All @@ -126,22 +131,22 @@ def __repr__(self):
)
return f"Job<{kwargs}>"

def __hash__(self):
def __hash__(self) -> int:
return hash(self.key)

@property
def id(self):
def id(self) -> str:
return self.queue.job_id(self.key)

@classmethod
def key_from_id(cls, job_id):
def key_from_id(cls, job_id: str) -> str:
return job_id.split(":")[-1]

@property
def abort_id(self):
def abort_id(self) -> str:
return f"{ABORT_ID_PREFIX}{self.key}"

def to_dict(self):
def to_dict(self) -> typing.Dict[str, typing.Any]:
result = {}
for field in dataclasses.fields(self):
key = field.name
Expand All @@ -155,7 +160,7 @@ def to_dict(self):
result[key] = value
return result

def duration(self, kind):
def duration(self, kind: str) -> typing.Optional[int]:
"""
Returns the duration of the job given kind.

Expand All @@ -172,19 +177,19 @@ def duration(self, kind):
return self._duration(now(), self.started)
raise ValueError(f"Unknown duration type: {kind}")

def _duration(self, a, b):
def _duration(self, a: int, b: int) -> typing.Optional[int]:
return a - b if a and b else None

@property
def stuck(self):
"""Checks if an active job is passed it's timeout or heartbeat."""
def stuck(self) -> bool:
"""Checks if an active job is passed its timeout or heartbeat."""
current = now()
return (self.status == Status.ACTIVE) and (
return (self.status == Status.ACTIVE) and bool(
seconds(current - self.started) > self.timeout
or (self.heartbeat and seconds(current - self.touched) > self.heartbeat)
)

def next_retry_delay(self):
def next_retry_delay(self) -> float:
if self.retry_backoff:
max_delay = self.retry_delay
if max_delay is True:
Expand All @@ -197,7 +202,7 @@ def next_retry_delay(self):
)
return self.retry_delay

async def enqueue(self, queue=None):
async def enqueue(self, queue: typing.Optional[Queue] = None) -> None:
"""
Enqueues the job to it's queue or a provided one.

Expand All @@ -209,19 +214,19 @@ async def enqueue(self, queue=None):
if not await queue.enqueue(self):
await self.refresh()

async def abort(self, error, ttl=5):
async def abort(self, error: str, ttl: int = 5) -> None:
"""Tries to abort the job."""
await self.queue.abort(self, error, ttl=ttl)

async def finish(self, status, *, result=None, error=None):
async def finish(self, status: Status, *, result=None, error=None) -> None:
"""Finishes the job with a Job.Status, result, and or error."""
await self.queue.finish(self, status, result=result, error=error)

async def retry(self, error):
async def retry(self, error: typing.Optional[str]) -> None:
"""Retries the job by removing it from active and requeueing it."""
await self.queue.retry(self, error)

async def update(self, **kwargs):
async def update(self, **kwargs) -> None:
"""
Updates the stored job in redis.

Expand All @@ -231,7 +236,7 @@ async def update(self, **kwargs):
setattr(self, k, v)
await self.queue.update(self)

async def refresh(self, until_complete=None):
async def refresh(self, until_complete: typing.Optional[float] = None) -> None:
"""
Refresh the current job with the latest data from the db.

Expand All @@ -254,7 +259,7 @@ async def callback(_id, status):
await self.queue.listen([self.key], callback, until_complete)
await self.refresh()

def replace(self, job):
def replace(self, job: "Job") -> None:
peterschutt marked this conversation as resolved.
Show resolved Hide resolved
"""Replace current attributes with job attributes."""
for field in job.__dataclass_fields__:
setattr(self, field, getattr(job, field))
Loading