Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use cache backend to store tqdm process information #58

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion huey_monitor/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def duration(self, obj):
ordering = ('-update_dt',)
list_display_links = None
date_hierarchy = 'create_dt'
list_filter = ('state__hostname', 'name', 'state__signal_name')
list_filter = ('finished', 'state__signal_name', 'name', 'state__hostname')
search_fields = ('name', 'state__exception_line', 'state__exception')
fieldsets = (
(_('Meta'), {
Expand Down
16 changes: 16 additions & 0 deletions huey_monitor/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from huey import signals as _huey_signals


# We need the information at which point a task is "finished"
# and no longer waits or runs, etc.
# It does not mean that execution was successfully completed!
#
# Collect these Huey signals here:
ENDED_HUEY_SIGNALS = (
_huey_signals.SIGNAL_CANCELED,
_huey_signals.SIGNAL_COMPLETE,
_huey_signals.SIGNAL_ERROR,
_huey_signals.SIGNAL_EXPIRED,
_huey_signals.SIGNAL_REVOKED,
_huey_signals.SIGNAL_INTERRUPTED,
)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if this information is completely correct.

48 changes: 48 additions & 0 deletions huey_monitor/migrations/0006_remove_task_progress_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Generated by Django 3.2.10 on 2022-01-03 17:39

from django.db import migrations, models


def forward_code(apps, schema_editor):
# Assume that all old, existing task are finished ;)
TaskModel = apps.get_model('huey_monitor', 'taskmodel')
TaskModel.objects.update(finished=True)


class Migration(migrations.Migration):

dependencies = [
('huey_monitor', '0005_progress_info'),
]

operations = [
migrations.AlterField(
model_name='taskmodel',
name='desc',
field=models.CharField(blank=True, default='', help_text='Prefix for progress information', max_length=64, verbose_name='Description'),
),
migrations.AlterField(
model_name='signalinfomodel',
name='exception_line',
field=models.TextField(blank=True, max_length=128, verbose_name='Exception Line'),
),
migrations.AddField(
model_name='signalinfomodel',
name='progress_count',
field=models.PositiveIntegerField(blank=True, help_text='Number of units processed (At the time of this signal creation)', null=True, verbose_name='Progress Count'),
),
migrations.AddField(
model_name='taskmodel',
name='finished',
field=models.BooleanField(default=False, help_text='Indicates that this Task no longer waits or run. (It does not mean that execution was successfully completed.)', verbose_name='Finished'),
),
migrations.AddField(
model_name='taskmodel',
name='progress_count',
field=models.PositiveIntegerField(blank=True, help_text='Number of units processed (Up-to-date only if task finished)', null=True, verbose_name='Progress Count'),
),
migrations.DeleteModel(
name='TaskProgressModel',
),
migrations.RunPython(forward_code, reverse_code=migrations.RunPython.noop),
]
176 changes: 97 additions & 79 deletions huey_monitor/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@

from bx_django_utils.models.timetracking import TimetrackingBaseModel
from django.db import models
from django.db.models import Sum
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
from huey.signals import SIGNAL_EXECUTING

from huey_monitor.humanize import format_sizeof, percentage, throughput
from huey_monitor.progress_cache import get_last_update_dt, get_total_progress_count


try:
Expand Down Expand Up @@ -59,16 +59,29 @@ class TaskModel(TimetrackingBaseModel):
verbose_name=_('State'),
help_text=_('Last Signal information'),
)
finished = models.BooleanField(
default=False,
verbose_name=_('Finished'),
help_text=_(
'Indicates that this Task no longer waits or run.'
' (It does not mean that execution was successfully completed.)'
)
)

desc = models.CharField(
max_length=64, default='',
max_length=64, default='', blank=True,
verbose_name=_('Description'),
help_text=_('Prefix for progress information'),
)
total = models.PositiveIntegerField(
null=True, blank=True,
help_text=_('The number of expected iterations')
)
progress_count = models.PositiveIntegerField(
null=True, blank=True,
verbose_name=_('Progress Count'),
help_text=_('Number of units processed (Up-to-date only if task finished)'),
)
unit = models.CharField(
max_length=64,
default='it',
Expand All @@ -79,6 +92,24 @@ class TaskModel(TimetrackingBaseModel):
help_text=_('Used to convert the units.'),
)

def get_task_ids(self):
"""
Returns a list of all task UUID (Usefull if main/sub tasks are used.
"""
task_ids = [self.task_id]

if self.parent_task_id is not None:
# This is a sub task -> Progress information only about this sub task
return task_ids
else:
# Progress information for main and all sub tasks
task_ids += list(
TaskModel.objects.filter(
parent_task_id=self.task_id
).values_list('task_id', flat=True)
)
return task_ids

@cached_property
def executing_dt(self):
executing_signal = SignalInfoModel.objects.filter(
Expand All @@ -89,24 +120,56 @@ def executing_dt(self):
return executing_signal.create_dt

@cached_property
def progress_info(self):
progress_count = TaskProgressModel.objects.get_progress_info(task_id=self.task_id)
if progress_count and self.executing_dt:
dt_diff = self.update_dt - self.executing_dt
elapsed_sec = dt_diff.total_seconds()
def total_progress_count(self):
if self.executing_dt is None:
# Not started -> No progress
return None

if self.progress_count is not None:
count = self.progress_count
else:
# Get count from process cache:
task_ids = self.get_task_ids()
count = get_total_progress_count(task_ids=task_ids)

return count

@cached_property
def last_update_dt(self):
if self.finished:
# Use datetime from last signal:
state = self.state
last_update_dt = state.create_dt
else:
elapsed_sec = None
return progress_count, elapsed_sec
# Get datetime from process cache:
task_ids = self.get_task_ids()
last_update_dt = get_last_update_dt(task_ids=task_ids)
return last_update_dt

@cached_property
def elapsed_sec(self):
last_update_dt = self.last_update_dt
if last_update_dt is None:
return None

dt_diff = last_update_dt - self.executing_dt
elapsed_sec = dt_diff.total_seconds()
logger.debug(
'Last dt: %s - Executing dt: %s - elapsed: %s sec.',
last_update_dt, self.executing_dt, elapsed_sec
)
return elapsed_sec

def human_percentage(self):
progress_count, elapsed_sec = self.progress_info
if progress_count and self.total is not None:
return percentage(num=progress_count, total=self.total)
if self.total is not None:
progress_count = self.total_progress_count
if progress_count is not None:
return percentage(num=progress_count, total=self.total)
human_percentage.short_description = _('percentage')

def human_progress(self):
progress_count, elapsed_sec = self.progress_info
if progress_count:
progress_count = self.total_progress_count
if progress_count is not None:
return format_sizeof(
num=progress_count,
suffix=self.unit,
Expand All @@ -115,18 +178,25 @@ def human_progress(self):
human_progress.short_description = _('progress')

def human_throughput(self):
progress_count, elapsed_sec = self.progress_info
if progress_count and elapsed_sec:
return throughput(
progress_count = self.total_progress_count
if not progress_count:
return

elapsed_sec = self.elapsed_sec
if elapsed_sec:
throughput_str = throughput(
num=progress_count,
elapsed_sec=elapsed_sec,
suffix=self.unit,
divisor=self.unit_divisor
)
logger.debug('%s items in %s sec == %s', progress_count, elapsed_sec, throughput_str)
return throughput_str
human_throughput.short_description = _('throughput')

def human_progress_string(self):
progress_count, elapsed_sec = self.progress_info
progress_count = self.total_progress_count
elapsed_sec = self.elapsed_sec

parts = []
if progress_count is None or elapsed_sec is None:
Expand All @@ -140,14 +210,15 @@ def human_progress_string(self):
parts.append(f'{progress_count}{self.unit}')
parts.append(self.human_throughput())

return ' '.join(parts)
return ' '.join(part for part in parts if part)
human_progress_string.short_description = _('Progress')

def human_unit(self):
"""
Used in admin: Display the unit only if process info used.
"""
progress_count, elapsed_sec = self.progress_info
progress_count = self.total_progress_count
elapsed_sec = self.elapsed_sec
if progress_count or elapsed_sec:
return self.unit
human_unit.short_description = _('Unit')
Expand Down Expand Up @@ -217,8 +288,13 @@ class SignalInfoModel(models.Model):
verbose_name=_('Signal Name'),
help_text=_('Name of the signal'),
)
progress_count = models.PositiveIntegerField(
null=True, blank=True,
verbose_name=_('Progress Count'),
help_text=_('Number of units processed (At the time of this signal creation)'),
)
exception_line = models.TextField(
max_length=128,
blank=True, max_length=128,
verbose_name=_('Exception Line'),
)
exception = models.TextField(
Expand All @@ -244,61 +320,3 @@ def __str__(self):
class Meta:
verbose_name = _('Task Signal')
verbose_name_plural = _('Task Signals')


class TaskProgressManager(models.Manager):
def get_progress_info(self, task_id):
qs = self.all().filter(
task_id=task_id
).aggregate(
Sum('progress_count'),
)
progress_count = qs['progress_count__sum'] or 0
return progress_count


class TaskProgressModel(models.Model):
objects = TaskProgressManager()

id = models.UUIDField(
primary_key=True,
default=uuid.uuid4,
editable=False,
)

task = models.ForeignKey(
'huey_monitor.TaskModel',
on_delete=models.CASCADE,
related_name='progress',
verbose_name=_('Task'),
help_text=_('The Task instance for this processed info entry.'),
)
progress_count = models.PositiveIntegerField(
null=True, blank=True,
verbose_name=_('Progress Count'),
help_text=_('Number of units processed in current update.'),
)
create_dt = models.DateTimeField(
auto_now_add=True,
verbose_name=_('Create date'),
help_text=_('(will be set automatically)')
)

def human_progress_count(self):
if self.progress_count:
return format_sizeof(
num=self.progress_count,
suffix=self.task.unit,
divisor=self.task.unit_divisor
)

def admin_link(self):
url = reverse('admin:huey_monitor_taskprogressmodel_change', args=[self.pk])
return url

def __str__(self):
return f'{self.task.name} {self.human_progress_count()}'

class Meta:
verbose_name = _('Task Progress')
verbose_name_plural = _('Task Progress')
Loading