-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/mvp queue #6
Conversation
@dhimmel @cgreene @dcgoss @stephenshank Here is our task service MVP. Please take a look, even if it's just to help me catch Python issues. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few questions. Looks good to me once those are cleared up!
@@ -81,6 +81,7 @@ celerybeat-schedule | |||
# virtualenv | |||
venv/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to edit the readme, contributors, or create a pull request template to suggest that if someone is storing their virtualenv in the project, they choose one of these locations? This way someone doesn't accidentally contribute a PR that includes their virtual environment.
self.base = base | ||
|
||
def __call__(self, value): | ||
print('in validator') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't quite get how this would be used yet, but should this be a call to a logger instead of print?
received_at = models.DateTimeField(null=True) | ||
status = models.CharField(choices=STATUS_CHOICES, max_length=17, default='queued') | ||
worker_id = models.CharField(null=True, max_length=255) | ||
locked_at = models.DateTimeField(null=True) | ||
priority = models.CharField(choices=PRIORITY_CHOICES, max_length=8, default="normal") | ||
unique = models.CharField(null=True, max_length=255) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't, at this moment, understand what unique
is for.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It maintains idempotency of scheduling a task in a distributed environment. Say if two things try to schedule something at the same time or if a client believes failure has occurred (502 error for instance) and retries, but the server did successfully schedule it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
|
||
from api.models import TaskDef, Task | ||
|
||
get_task_sql = """ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there something that raw SQL is giving you that isn't available via the django queryset api?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could load some postgres specific django DSL, but I find them harder to read than SQL for a complex query like this. We're using a SQL backend, anyone programming in the backend should understand SQL.
except Task.DoesNotExist: | ||
raise NotFound('Task not found') | ||
|
||
task.status = 'dequeued' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this one of the options for status
? I missed where it came from.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@awm33 cleared this up. Ignore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I commented above
@@ -6,8 +6,6 @@ | |||
from django.contrib.postgres import fields as postgresfields | |||
|
|||
STATUS_CHOICES = ( | |||
("pending_queue", "Pending Queue"), | |||
("scheduled", "Scheduled"), | |||
("queued", "Queued"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Later dequeued
is used. Didn't see it defined here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's in the original code, look beyond the diff
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@awm33 - aha!
image: postgres | ||
task: | ||
build: . | ||
command: bash -c "python manage.py migrate && python manage.py runserver 0.0.0.0:8001" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Ahmed had some thoughts about alternative deployment strategies. Tagging him here so that he's aware. Altering deployment is probably a second PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean for cognoma/core-service#39 ? I think we should use an application server here as well, but docker compose is used for the development environment, not production.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@awm33 : yea - as in that issue. We may want to have both APIs hosted the same way. Particularly if a number of workers are going to access this one.
@cgreene I forget to do a search for |
@awm33 : Sounds good. Busy day so I probably won't have another chance to look at it. Approving based on those incoming changes addressing print & docs. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please take a look, even if it's just to help me catch Python issues.
As you wish. I made several non-functional (extremely trivial) comments. (:
import jwt | ||
|
||
class CognomaAuthentication(authentication.BaseAuthentication): | ||
def authenticate(self, request): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Style: blank line following class definition. PEP8 I think advocates for two blank lines. Personally, I think 1 is sufficient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That convention doesn't seem to be followed by any of the Django Rest Framework docs
""" | ||
|
||
def dictfetchall(cursor): | ||
"Return all rows from a cursor as a dict" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use triple quotes for docstring.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it be: Return all rows from a cursor as a list of dicts
try: | ||
return TaskDef.objects.create(**validated_data) | ||
except IntegrityError: | ||
raise exceptions.ValidationError({'name': '"' + validated_data['name'] + '" already taken.'}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer readability of:
raise exceptions.ValidationError('"{name}" already taken.'.format(**validated_data))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dhimmel ValidationError requires a dict or list of dicts. The idea being that each key is the field with the error, then a pretty formatter could interpret it on the other end.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I meant:
raise exceptions.ValidationError({
'name': '"{name}" already taken.'.format(**validated_data)
})
Just using the formatter to clean up the value creation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't that dump all the keys from validated data? I just want name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh no, it would pass it but the string is only referring to {name}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unless validated_data
has many elements, I don't think this is a concern. But if validated_data
is a large dictionary than you could always go:
'"{}" already taken.'.format(validated_data['name'])
'previous', | ||
'results']) | ||
self.assertEqual(len(list_response.data['results']), 2) | ||
self.assertEqual(list(list_response.data['results'][0].keys()), task_keys) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.keys()
is not essential here, but okay for explicitness. In other words, taken a list of a dict, takes a list of the keys.
|
||
self.task_number = 0 | ||
|
||
for x in range(0,10): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove 0
?
else: | ||
timeout = 600 | ||
|
||
if timeout < 0 or timeout > 86400: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recently learned python can do:
if not 0 < timeout < 86400:
Up to you which you think is clearer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wow. I just learned this too!
I was worried that this was taking advantage of the fact that 0 < timeout
was first evaluating to True which was < 86400
.
I played around with it a bit though, and it seems legit:
>>> if not -10 < -5 < 0:
... print("asdf")
...
>>> if not (-10 < -5) < 0:
... print("asdf")
...
asdf
>>>
Cool!
Just realized I'm not updating the status to "failed_retrying", "failed", or "complete" based on updates from the worker. I need to add that logic and some tests for it. I'm sure once it's being used by ml-workers, there will be more feedback / changes. |
Sounds good. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Commit LGTM 👍
Motivation
Queue implementation for the Cognoma MVP.
API changes
Some fields have been removed for MVP or are not needed for this queue backing type.
Implementation Notes
It uses a postgres SKIP LOCKED based task table and should handle a decent amount of concurrency, even greater than the number of workers we have budgeted for (2 to 3).
Functional Tests
Testing against the API using curl and the beginnings of the ml-workers code.