
6 scheduling #36

Open · wants to merge 8 commits into main

Conversation

@vlerkin (Collaborator) commented Nov 7, 2024

What happens in this PR:

  1. The event-watcher logic was extracted into an observer class; the log-watching logic stayed in the log handler class, but its initialization was changed to subscribe to the event watcher when the joblogs feature is configured;
  2. A new class, KubernetesScheduler, was created to decide when and in which order suspended jobs must be unsuspended;
  3. The schedule endpoint was modified; logic to set a value for the start_suspended parameter was added;
  4. The schedule method of the k8s launcher has a new start_suspended parameter whose value is passed in when it is called from the api; new methods were also added: unsuspend_job patches an existing suspended job with suspend=False, get_running_jobs_count returns the number of currently running jobs, list_suspended_jobs returns the jobs where spec.suspend is true, and _get_job_name extracts the job name from the metadata for use by the unsuspend function (see the sketch below).
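For illustration, here is a minimal sketch of what the unsuspend patch could look like with the official Kubernetes Python client (the function name and arguments are simplified placeholders, not the exact code in this PR):

```python
from kubernetes import client

def unsuspend_job(batch: client.BatchV1Api, namespace: str, job_name: str) -> None:
    # A partial (strategic merge) patch that only flips spec.suspend;
    # the control plane then creates the pods for the previously suspended Job.
    batch.patch_namespaced_job(
        name=job_name,
        namespace=namespace,
        body={"spec": {"suspend": False}},
    )
```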

The big picture:
The event watcher connects to the Kubernetes API and receives a stream of events; when a new event arrives, it notifies the subscribers by passing the event to the callbacks they registered. The subscriber, KubernetesScheduler, receives the event in its handle_pod_event method, which reacts to changes in job status: when a job has completed or failed, it calls check_and_unsuspend_jobs, which checks capacity and unsuspends jobs until the allowed number of parallel jobs is reached. To do so it relies on get_next_suspended_job_id to pick the next suspended job by creation timestamp, preserving the order in which jobs were initially scheduled.
When a job is scheduled, it either runs immediately or goes into the queue of suspended jobs (a native Kubernetes mechanism), depending on the number of currently active jobs and the max_proc value in the config (default 4). Events that change the number of active jobs then trigger the KubernetesScheduler logic, which unsuspends jobs until the desired number of parallel jobs is reached.
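As a rough illustration of that flow (method names follow the description above; the bodies and the event shape are simplified assumptions, not the actual diff):

```python
class KubernetesScheduler:
    """Sketch of the subscriber that keeps at most max_proc jobs running."""

    def __init__(self, launcher, max_proc: int = 4):
        self.launcher = launcher
        self.max_proc = max_proc

    def handle_pod_event(self, event):
        # React when a job's pod leaves the running set, then try to fill capacity.
        pod = event["object"]
        if pod.status.phase in ("Succeeded", "Failed"):
            self.check_and_unsuspend_jobs()

    def check_and_unsuspend_jobs(self):
        # Resume suspended jobs until max_proc jobs are running.
        while self.launcher.get_running_jobs_count() < self.max_proc:
            job_id = self.get_next_suspended_job_id()
            if job_id is None:
                break  # nothing left in the suspended queue
            self.launcher.unsuspend_job(job_id)

    def get_next_suspended_job_id(self):
        # Pick the suspended job with the earliest creation timestamp so the
        # original scheduling order is preserved; the real code maps the job
        # metadata back to a scrapyd job_id.
        suspended = self.launcher.list_suspended_jobs()
        if not suspended:
            return None
        oldest = min(suspended, key=lambda j: j.metadata.creation_timestamp)
        return oldest.metadata.name
```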

…logic for observer in a ResourceWatcher class; added method to stop a thread gracefully
…that handles the logic to unsuspend jobs and get the next one in order according to the creation timestamp; modify the schedule endpoint to start jobs suspended if there are already enough jobs running; modify the corresponding function in the k8s launcher; add to the k8s launcher methods to unsuspend a job, get the current number of running jobs, and list suspended jobs, plus a private method to get the job name used by the unsuspend function
…source watcher instance to enable_joblogs to subscribe to the event watcher if the log feature is configured; delete the event-watcher logic from main; pass the container to the list-objects function instead of the container name; remove the start method from the log handler class; modify the joblogs init to subscribe to the event watcher
@vlerkin requested a review from wvengen on November 7, 2024 17:25
@wvengen (Member) left a comment

Ah, nice you were able to come up with something so quickly already!
I looked at it from a high level and noticed that this is currently implemented for Kubernetes only (that makes sense), but also set up in such a way that it needs refactoring for Docker. I would think of the scheduler as something that could work for both Docker and Kubernetes, especially the scheduling decisions. There is now also k8s-specific code in the main file (e.g. the import) and in the Kubernetes scheduler, which makes the code somewhat spaghetti: there are implementation-specific classes to which responsibility is meant to be delegated. If you need to access the scheduler in the main file, use a generic scheduler and leave the Docker-based parts not implemented. I think that would give a much cleaner design.

Also, I would consider making the launcher responsible for scheduling. And then have the scheduler talk to the launcher to actually start jobs.

I'm not yet sure if we should allow running without the scheduler, or if it would always be active.
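A hypothetical sketch of that kind of split (class and method names are illustrative, not taken from the repository):

```python
class Launcher:
    """Backend-agnostic interface that api.py and the scheduler talk to."""

    def get_running_jobs_count(self) -> int:
        raise NotImplementedError

    def unsuspend_job(self, job_id: str) -> None:
        raise NotImplementedError


class K8s(Launcher):
    def unsuspend_job(self, job_id: str) -> None:
        # Kubernetes-specific: patch spec.suspend=False on the Job.
        ...


class Docker(Launcher):
    def unsuspend_job(self, job_id: str) -> None:
        # Docker has no native suspend queue; left unimplemented for now.
        raise NotImplementedError("scheduling is not supported by the Docker launcher yet")
```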

kubernetes.yaml (Outdated)
@@ -181,7 +183,7 @@ rules:
     verbs: ["get"]
   - apiGroups: ["batch"]
     resources: ["jobs"]
-    verbs: ["get", "list", "create", "delete"]
+    verbs: ["get", "list", "create", "patch", "update", "delete"]
Member

do you need patch and update? what is the difference?

Collaborator Author

Yes, good point. Since we do not update the entire resource, patch is enough for this feature; I deleted it.

@wvengen (Member) commented Nov 8, 2024

Hope my feedback was at an angle that helps you at this stage. In any case, well done, keep it going!

p.s. the CI error looks like it could be caused by Kubernetes-specific things having entered the main api code, which wouldn't work when running with Docker.

@vlerkin (Collaborator, Author) commented Nov 11, 2024

Working on Docker implementation to be added to this PR

…rs and run more from the queue of created jobs when capacity is available; add a background thread that sleeps for 5 sec and triggers the function that starts additional containers up to capacity; add a method to gracefully stop the background thread, which might be used in the future to stop the thread when the app stops; encapsulate k8s- and docker-related schedule functionality in the corresponding launchers and keep api.py launcher-agnostic; add max_proc to the config for docker
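A rough sketch of that polling loop (start_pending_containers_up_to_capacity is a placeholder for whatever the Docker launcher actually exposes):

```python
import threading

class BackgroundCapacityPoller:
    """Wakes up every few seconds and starts queued containers up to capacity."""

    def __init__(self, launcher, interval: float = 5.0):
        self.launcher = launcher
        self.interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def _run(self):
        # Event.wait returns False on timeout and True once stop() is called,
        # so the loop exits promptly on shutdown instead of sleeping blindly.
        while not self._stop.wait(self.interval):
            self.launcher.start_pending_containers_up_to_capacity()

    def stop(self):
        # Graceful shutdown: signal the loop, then wait for the thread to finish.
        self._stop.set()
        self._thread.join()
```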
@wvengen (Member) left a comment

Great to see a working version! Quite readable :)
I think it needs a little cleanup, but you're getting there, I think.

@@ -16,6 +16,9 @@ repository = scrapyd_k8s.repository.Local
# Since this is the Docker example, we choose Docker here.
launcher = scrapyd_k8s.launcher.Docker

# Maximum number of jobs running in parallel
max_proc = 1

Member

not sure yet if we want to enable this by default
do you know what scrapyd has in its default configuration?

@vlerkin (Collaborator, Author) Nov 12, 2024

In scrapyd the max_proc parameter is not set by default. I think there are reasons to consider making it default, because it enhances cluster stability; but on the other hand, if a user did not think about optimal resource usage, this batch limiting can lead to smaller output at a given time. So it is beneficial if we need to be conscious about the resources of a production setting, but not very handy if we want to go all in and extract as much data as possible, as fast as possible.

@@ -7,6 +7,7 @@
 from natsort import natsort_keygen, ns
 
 from .config import Config
+from .k8s_resource_watcher import ResourceWatcher
Member

reference to the k8s-specific term k8s_resource_watcher; this file should use the configured backend and launchers instead

Collaborator Author

It is required for the conditional start of the joblogs feature in the run method, and that method does not rely on a specific launcher; that is why I can't think of a clean way to remove it from api.py. But I am open to suggestions if you see a better way.

Member

If the ResourceWatcher is not Kubernetes-specific, perhaps the k8s_ could be dropped from the filename?

Member

Looking at the code, I do see k8s-specific things in there.
I think the launcher needs to be responsible for this somehow. Then the launcher can decide what the k8s and Docker specific parts are.

(sorry, I'm not diving fully now into the whole code, otherwise I could have given a more direct answer making more sense perhaps - can you think of a way to separate the k8s part here?)

Collaborator Author

I do agree that it shouldn't be here and we don't want to violate SOLID; I will think about how to refactor this.

scrapyd_k8s/k8s_resource_watcher.py (resolved)
try:
    subscriber(event)
except Exception as e:
    logger.exception(f"Error notifying subscriber {subscriber.__name__}: {e}")
Member

why catch this? when do you expect this?

@vlerkin (Collaborator, Author) Nov 13, 2024

This exception is added so we can separate an exception originating in a particular subscriber from the watcher and from other subscribers. We also protect the watcher from crashing due to problems in subscribers, which are external in terms of the design.
If there is an unexpected edge case in a subscriber, it is nice to catch it and understand where it comes from; it also makes debugging easier.

@@ -0,0 +1,96 @@
import logging
Member

Can we fit this in the directory structure? I wouldn't expect this in the src root.

Member

For me, this functionality seems related to the kubernetes launcher.

def enable_joblogs(self, config, resource_watcher):
    joblogs_init(config, resource_watcher)

def unsuspend_job(self, job_id: str):
Member

is "unsuspend" the k8s terminology? I would also imagine resume, or start or so.

    return suspended_jobs
except Exception as e:
    logger.exception(f"Error listing suspended jobs: {e}")
    return []
Member

Is there a situation where you would want to get both the running and suspended job count? Then it could be nice to do one call to list_namespaced_job to obtain both.
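For instance, one list_namespaced_job call could serve both needs; a minimal sketch with the official client (the label_selector and the counting rules are assumptions, not the PR's code):

```python
from kubernetes import client

def partition_jobs(batch: client.BatchV1Api, namespace: str, label_selector: str = ""):
    # One API round-trip, then split locally into running and suspended jobs.
    jobs = batch.list_namespaced_job(namespace=namespace, label_selector=label_selector)
    running = [j for j in jobs.items if not j.spec.suspend and (j.status.active or 0) > 0]
    suspended = [j for j in jobs.items if j.spec.suspend]
    return running, suspended
```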

if not jobs.items:
    logger.error(f"No job found with job_id={job_id}")
    return None
return jobs.items[0].metadata.name
Member

when you're listing jobs, would you also get the name already?
