Skip to content

Commit

Permalink
Honor slave attributes as constraints, fixes #47
Browse files Browse the repository at this point in the history
  • Loading branch information
kszucs committed Sep 16, 2016
1 parent a2eb166 commit e338787
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 66 deletions.
17 changes: 17 additions & 0 deletions satyr/constraint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from __future__ import absolute_import, division, print_function

from .utils import partition


def pour(offers):
    """Default constraint: accept every offer.

    Returns an ``(accepted, declined)`` pair where all incoming offers are
    accepted and nothing is declined.
    """
    declined = []
    return offers, declined


def has(offers, attribute):
    """Constraint splitting offers by the presence of a slave attribute.

    Offers whose slave advertises an attribute named ``attribute`` are
    accepted; all others are declined.  Returns ``(accepted, declined)``.
    """
    def pred(offer):
        return any(attrib.name == attribute
                   for attrib in offer.attributes)

    return partition(pred, offers)
113 changes: 57 additions & 56 deletions satyr/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,62 +40,6 @@ def exception(self):
return None


class PythonExecutor(ExecutorInfo):
    """ExecutorInfo configured to run satyr's python executor.

    Sets up a Mesos container backed by a docker image and the command
    launching ``python -m satyr.executor``; the image name, pull policy,
    environment variables and fetchable URIs are exposed as properties
    that proxy into the underlying protobuf message.
    """

    # Label marking this executor as python-capable.
    proto = mesos_pb2.ExecutorInfo(
        labels=mesos_pb2.Labels(
            labels=[mesos_pb2.Label(key='python')]))

    # NOTE(review): envs={} and uris=[] are mutable default arguments; they
    # are only read here (never mutated), but None defaults would be safer.
    def __init__(self, docker='satyr', force_pull=False, envs={}, uris=[], **kwds):
        super(PythonExecutor, self).__init__(**kwds)
        self.container = ContainerInfo(
            type='MESOS',
            mesos=ContainerInfo.MesosInfo(
                image=Image(type='DOCKER',
                            docker=Image.Docker())))
        self.command = CommandInfo(value='python -m satyr.executor',
                                   shell=True)
        self.force_pull = force_pull
        self.docker = docker
        self.envs = envs
        self.uris = uris

    @property
    def docker(self):
        # Name of the docker image backing the container.
        return self.container.mesos.image.docker.name

    @docker.setter
    def docker(self, value):
        self.container.mesos.image.docker.name = value

    @property
    def force_pull(self):
        # cached is the opposite of force pull image
        return not self.container.mesos.image.cached

    @force_pull.setter
    def force_pull(self, value):
        self.container.mesos.image.cached = not value

    @property
    def uris(self):
        # URIs fetched into the sandbox, as plain strings.
        return [uri.value for uri in self.command.uris]

    @uris.setter
    def uris(self, value):
        self.command.uris = [{'value': v} for v in value]

    @property
    def envs(self):
        # Command environment variables as a plain name => value dict.
        envs = self.command.environment.variables
        return {env.name: env.value for env in envs}

    @envs.setter
    def envs(self, value):
        envs = [{'name': k, 'value': v} for k, v in value.items()]
        self.command.environment = Environment(variables=envs)


# TODO create custom messages per executor
class PythonTask(PickleMixin, TaskInfo):

Expand Down Expand Up @@ -160,3 +104,60 @@ def on_fail(self, status):
else:
logging.error('Aborting due to task {} failed with state {} and message '
'{}'.format(self.id, status.state, status.message))


class PythonExecutor(ExecutorInfo):
    """ExecutorInfo configured to run satyr's python executor.

    Sets up a Mesos container backed by a docker image and the command
    launching ``python -m satyr.executor``; the image name, pull policy,
    environment variables and fetchable URIs are exposed as properties
    that proxy into the underlying protobuf message.
    """

    # Label marking this executor as python-capable.
    proto = mesos_pb2.ExecutorInfo(
        labels=mesos_pb2.Labels(
            labels=[mesos_pb2.Label(key='python')]))

    def __init__(self, docker='satyr', force_pull=False,
                 envs=None, uris=None, **kwds):
        """Create the executor.

        Parameters
        ----------
        docker : str
            Name of the docker image to run the executor in.
        force_pull : bool
            Re-pull the image on every launch instead of using the cache.
        envs : dict, optional
            Environment variables (name => value) for the command.
        uris : list, optional
            URIs fetched into the sandbox before the command starts.
        **kwds
            Forwarded to ExecutorInfo.
        """
        super(PythonExecutor, self).__init__(**kwds)
        self.container = ContainerInfo(
            type='MESOS',
            mesos=ContainerInfo.MesosInfo(
                image=Image(type='DOCKER',
                            docker=Image.Docker())))
        self.command = CommandInfo(value='python -m satyr.executor',
                                   shell=True)
        self.force_pull = force_pull
        self.docker = docker
        # None sentinels avoid the shared mutable default argument pitfall;
        # fall back to fresh, per-instance empty containers.
        self.envs = envs if envs is not None else {}
        self.uris = uris if uris is not None else []

    @property
    def docker(self):
        # Name of the docker image backing the container.
        return self.container.mesos.image.docker.name

    @docker.setter
    def docker(self, value):
        self.container.mesos.image.docker.name = value

    @property
    def force_pull(self):
        # cached is the opposite of force pull image
        return not self.container.mesos.image.cached

    @force_pull.setter
    def force_pull(self, value):
        self.container.mesos.image.cached = not value

    @property
    def uris(self):
        # URIs fetched into the sandbox, as plain strings.
        return [uri.value for uri in self.command.uris]

    @uris.setter
    def uris(self, value):
        self.command.uris = [{'value': v} for v in value]

    @property
    def envs(self):
        # Command environment variables as a plain name => value dict.
        envs = self.command.environment.variables
        return {env.name: env.value for env in envs}

    @envs.setter
    def envs(self, value):
        envs = [{'name': k, 'value': v} for k, v in value.items()]
        self.command.environment = Environment(variables=envs)
File renamed without changes.
41 changes: 32 additions & 9 deletions satyr/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import os
import time
from collections import Counter
from functools import partial

from mesos.native import MesosSchedulerDriver

from .binpack import bfd
from .constraint import pour
from .interface import Scheduler
from .placement import bfd
from .proxies import SchedulerDriverProxy, SchedulerProxy
from .proxies.messages import FrameworkInfo, TaskInfo, encode
from .utils import Interruptable, timeout
Expand All @@ -25,18 +27,26 @@ def __init__(self, scheduler, name, user='', master=os.getenv('MESOS_MASTER'),
super(SchedulerDriver, self).__init__(driver)


# TODO create a scheduler which is reusing the same type of executors
# todo configurable to reuse executors
class QueueScheduler(Scheduler):
# TODO reuse the same type of executors
class Framework(Scheduler):

def __init__(self, *args, **kwargs):
self.tasks = {} # holding task_id => task pairs
    def __init__(self, constraint=pour, placement=partial(bfd, cpus=1, mem=1),
                 *args, **kwargs):
        """Initialize the framework.

        constraint: callable partitioning offers into (accepts, declines);
            defaults to ``pour``, which accepts every offer.
        placement: callable binpacking staged tasks onto accepted offers;
            defaults to best-fit-decreasing with unit cpu/mem weights.
        """
        # NOTE(review): *args/**kwargs are accepted but not forwarded to a
        # super().__init__ call here — confirm the Scheduler base needs none.
        self.healthy = True
        self.tasks = {}  # holds task_id => task pairs
        self.placement = placement
        self.constraint = constraint

    @property
    def statuses(self):
        # Snapshot mapping task_id => current status of every known task.
        return {task_id: task.status for task_id, task in self.tasks.items()}

# @property
# def executors(self):
# tpls = (((task.slave_id, task.executor.id), task)
# for task_id, task in self.tasks.items())
# return {k: list(v) for k, v in groupby(tpls)}

    def is_idle(self):
        # True when no tasks are registered with the framework.
        return not len(self.tasks)

Expand All @@ -63,19 +73,28 @@ def on_offers(self, driver, offers):
logging.info('Received offers: {}'.format(sum(offers)))
self.report()

# maybe limit to the first n tasks
# query tasks ready for scheduling
staging = [self.tasks[status.task_id]
for status in self.statuses.values() if status.is_staging()]

# filter acceptable offers
accepts, declines = self.constraint(offers)

# best-fit-decreasing binpacking
bins, skip = bfd(staging, offers, cpus=1, mem=1)
bins, skip = self.placement(staging, accepts)

# reject offers not met constraints
for offer in declines:
driver.decline(offer)

# launch tasks
for offer, tasks in bins:
try:
for task in tasks:
task.slave_id = offer.slave_id
task.status.state = 'TASK_STARTING'
# running with empty task list will decline the offer
logging.info('lanunches {}'.format(tasks))
logging.info('launches {}'.format(tasks))
driver.launch(offer.id, tasks)
except Exception:
logging.exception('Exception occured during task launch!')
Expand All @@ -97,6 +116,10 @@ def on_update(self, driver, status):
self.report()


# Backward-compatible alias: QueueScheduler was renamed to Framework;
# keep the old name importable for existing callers.
QueueScheduler = Framework


if __name__ == '__main__':
scheduler = QueueScheduler()
with SchedulerDriver(scheduler, name='test') as fw:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import absolute_import, division, print_function

import pytest
from satyr.binpack import bf, bfd, ff, ffd, mr, weight
from satyr.placement import bf, bfd, ff, ffd, mr, weight
from satyr.proxies.messages import Cpus, Mem, Offer, TaskInfo


Expand Down
10 changes: 10 additions & 0 deletions satyr/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@
from contextlib import contextmanager


def partition(pred, iterable):
    """Split ``iterable`` into two lists by ``pred``.

    Returns ``(matching, rest)`` where ``matching`` holds the items for
    which ``pred`` is truthy and ``rest`` the remaining items, both in
    their original order.
    """
    matching, rest = [], []
    for element in iterable:
        bucket = matching if pred(element) else rest
        bucket.append(element)
    return matching, rest


class TimeoutError(Exception):
pass

Expand Down

0 comments on commit e338787

Please sign in to comment.