Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changing job scheduling #388

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion acto/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ def run(
self.input_model.set_worker_id(self.worker_id)
apiclient = None

self.input_model.set_mode(mode)
# self.input_model.set_mode(mode)
if mode != InputModel.NORMAL:
self.workdir = os.path.join(self.workdir, mode)
os.makedirs(self.base_workdir, exist_ok=True)
Expand Down Expand Up @@ -1088,6 +1088,7 @@ def run(

if "normal" in modes:
threads = []
self.input_model.set_mode(InputModel.NORMAL)
for runner in runners:
t = threading.Thread(
target=runner.run, args=[errors, InputModel.NORMAL]
Expand All @@ -1102,6 +1103,7 @@ def run(

if "overspecified" in modes:
threads = []
self.input_model.set_mode(InputModel.OVERSPECIFIED)
for runner in runners:
t = threading.Thread(
target=runner.run, args=([errors, InputModel.OVERSPECIFIED])
Expand All @@ -1116,6 +1118,7 @@ def run(

if "copiedover" in modes:
threads = []
self.input_model.set_mode(InputModel.COPIED_OVER)
for runner in runners:
t = threading.Thread(
target=runner.run, args=([errors, InputModel.COPIED_OVER])
Expand All @@ -1130,6 +1133,7 @@ def run(

if InputModel.ADDITIONAL_SEMANTIC in modes:
threads = []
self.input_model.set_mode(InputModel.ADDITIONAL_SEMANTIC)
for runner in runners:
t = threading.Thread(
target=runner.run,
Expand Down
82 changes: 45 additions & 37 deletions acto/input/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from acto.utils import get_thread_logger

from .testcase import TestCase
from .testplan import DeterministicTestPlan, TestGroup, TestPlan
from .testplan import DeterministicTestPlan, SharedTestPlan, TestGroup, TestPlan
from .value_with_schema import attach_schema_to_value


Expand All @@ -49,6 +49,8 @@ class InputMetadata(pydantic.BaseModel):
# The number of test cases to form a group
CHUNK_SIZE = 10

# Number of test groups a worker fetches from the global queue when its local queue is empty
FETCH_SIZE = 1

class InputModel(abc.ABC):
"""An abstract class for input model"""
Expand Down Expand Up @@ -269,15 +271,14 @@ def set_worker_id(self, worker_id: int):
# Thread local variables
self.thread_vars.id = worker_id
# so that we can run the test case itself right after the setup
self.thread_vars.normal_test_plan = DeterministicTestPlan()

# thread_vars.test_plan is the local queue, fetch from global queue when empty
self.thread_vars.test_plan = DeterministicTestPlan()
self.thread_vars.semantic_test_plan = TestPlan(
self.root_schema.to_tree()
)

for group in self.normal_test_plan_partitioned[worker_id]:
self.thread_vars.normal_test_plan.add_testcase_group(
TestGroup(group)
)


def generate_test_plan(
self,
Expand All @@ -293,9 +294,7 @@ def generate_test_plan(

normal_testcases = {}

test_cases = get_testcases(
self.get_schema_by_path(self.mount), self.full_matched_schemas
)
test_cases = get_testcases(self.root_schema, self.full_matched_schemas)

num_test_cases = 0
num_run_test_cases = 0
Expand Down Expand Up @@ -374,25 +373,21 @@ def split_into_subgroups(
return subgroups

normal_subgroups = split_into_subgroups(normal_test_plan_items)

# Initialize the three test plans, and assign test cases to them
# according to the number of workers
for i in range(self.num_workers):
self.normal_test_plan_partitioned.append([])

for i in range(0, len(normal_subgroups)):
self.normal_test_plan_partitioned[i % self.num_workers].append(
normal_subgroups[i]
# global job queue
self.normal_test_plan = SharedTestPlan()
self.semantic_test_plan = TestPlan(
self.root_schema.to_tree()
)
for group in normal_subgroups:
self.normal_test_plan.add_testcase_group(
TestGroup(group)
)

# appending empty lists to avoid no test cases distributed to certain
# work nodes
assert self.num_workers == len(self.normal_test_plan_partitioned)

return {
"normal_testcases": normal_testcases,
}

def next_test(
self,
) -> Optional[List[Tuple[TestGroup, tuple[str, TestCase]]]]:
Expand All @@ -406,39 +401,51 @@ def next_test(
"""
logger = get_thread_logger(with_prefix=True)

logger.info("Progress [%d] cases left", len(self.thread_vars.test_plan))
logger.info("Global queue [%d] cases left", len(self.test_plan))
logger.info("Local queue [%d] cases left", len(self.thread_vars.test_plan))

selected_group: TestGroup = self.thread_vars.test_plan.next_group()

if selected_group is None:
return None
elif len(selected_group) == 0:
if selected_group is None or len(selected_group) == 0:

for i in range(FETCH_SIZE):
new_group = self.test_plan.next_group()
if new_group is None:
break
self.thread_vars.test_plan.add_testcase_group(new_group)

return None
else:

else:
testcase = selected_group.get_next_testcase()
return [(selected_group, testcase)]



def set_mode(self, mode: str):
if mode == InputModel.NORMAL:
self.thread_vars.test_plan = self.thread_vars.normal_test_plan
self.test_plan = self.normal_test_plan
elif mode == "OVERSPECIFIED":
self.thread_vars.test_plan = (
self.thread_vars.overspecified_test_plan
self.test_plan = (
self.overspecified_test_plan.next_group()
)
elif mode == "COPIED_OVER":
self.thread_vars.test_plan = self.thread_vars.copiedover_test_plan
self.test_plan = self.copiedover_test_plan
elif mode == InputModel.SEMANTIC:
self.thread_vars.test_plan = self.thread_vars.semantic_test_plan
self.test_plan = self.semantic_test_plan
elif mode == InputModel.ADDITIONAL_SEMANTIC:
self.thread_vars.test_plan = (
self.thread_vars.additional_semantic_test_plan
self.test_plan = (
self.additional_semantic_test_plan
)
else:
raise ValueError(mode)



def is_empty(self):
"""if test plan is empty"""
return len(self.thread_vars.test_plan) == 0
"""if test plan is empty, both global queue and local queue"""
return len(self.test_plan) == 0 and len(self.thread_vars.test_plan) == 0


def get_seed_input(self) -> dict:
"""Get the raw value of the seed input"""
Expand Down Expand Up @@ -528,3 +535,4 @@ def apply_default_value(self, default_value_result: dict):
"Setting default value for %s to %s", path, decoded_value
)
self.get_schema_by_path(path).set_default(value)

42 changes: 39 additions & 3 deletions acto/input/testplan.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import json
import queue
import random
import threading
from typing import List, Tuple

from acto.schema.base import TreeNode
Expand Down Expand Up @@ -251,8 +253,8 @@ def finish_testcase(self):

def __len__(self):
return len(self.tests)


class DeterministicTestPlan(TestPlan):

def __init__(self):
Expand All @@ -275,4 +277,38 @@ def add_testcase_group(self, groups: TestGroup):
self.groups.append(groups)

def __len__(self):
return sum([len(i) for i in self.groups])
return sum([len(i) for i in self.groups])


class SharedTestPlan(TestPlan):
    """Test plan backed by a single FIFO queue of test groups.

    Groups are appended with ``add_testcase_group`` / ``add_testcase_groups``
    and handed out one at a time by ``next_group``. ``len(plan)`` is the sum
    of ``len(group)`` over the groups that are still queued.

    Every operation that touches both the queue and the ``length`` counter
    runs under one lock, so the counter stays consistent with the queue
    contents when several threads use the same plan.
    """

    def __init__(self):
        # NOTE(review): TestPlan.__init__ is not invoked here (unchanged from
        # the previous version); confirm no inherited method relies on state
        # it would have set up.
        self.groups = queue.Queue()
        self.length = 0
        # Guards ``length`` together with the matching queue operation.
        self._lock = threading.Lock()

    def next_group(self):
        """Remove and return the next non-empty group.

        Returns:
            The oldest queued group whose length is non-zero, or ``None``
            when no such group is left. Empty groups met on the way are
            discarded.
        """
        with self._lock:
            while True:
                try:
                    # Non-blocking on purpose: an ``empty()`` check followed
                    # by a blocking ``get()`` can hang forever if another
                    # thread takes the last group in between.
                    head = self.groups.get_nowait()
                except queue.Empty:
                    return None
                # Kept from the previous version so that ``groups.join()``
                # still sees every fetched item as finished.
                self.groups.task_done()

                size = len(head)
                if size:
                    self.length -= size
                    return head
                # An empty group carries no test cases; keep looking instead
                # of reporting exhaustion while later groups are queued.

    def add_testcase_groups(self, groups: List[TestGroup]):
        """Append every group in ``groups`` to the queue, in order."""
        for group in groups:
            self.add_testcase_group(group)

    def add_testcase_group(self, groups: TestGroup):
        """Append one group to the queue and add its length to the total."""
        with self._lock:
            self.groups.put(groups)
            self.length += len(groups)

    def __len__(self):
        """Return the summed length of all groups that are still queued."""
        with self._lock:
            return self.length