-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
9415553
commit f18f963
Showing
3 changed files
with
1,074 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,355 @@ | ||
import itertools | ||
import logging | ||
import math | ||
import re | ||
from dataclasses import dataclass | ||
from enum import Enum | ||
from queue import Queue | ||
from typing import Iterable, Mapping, cast | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
_Part = dict[str, int] | ||
|
||
_categories = frozenset("xmas") | ||
|
||
|
||
class _Comparison(Enum): | ||
LT = "<" | ||
GT = ">" | ||
|
||
def __str__(self) -> str: | ||
return self.value | ||
|
||
def __repr__(self) -> str: | ||
return f"{self.name}" | ||
|
||
|
||
@dataclass(slots=True, frozen=True) | ||
class _Rule: | ||
category: str | ||
comparison: _Comparison | ||
value: int | ||
action: str | ||
|
||
|
||
@dataclass(slots=True, frozen=True) | ||
class _Workflow: | ||
name: str | ||
rules: list[_Rule] | ||
default: str | ||
|
||
|
||
_rule_regex = re.compile( | ||
r"(?P<category>[xmas])(?P<comparison>[<>])(?P<value>\d+):(?P<action>\w+)" | ||
) | ||
|
||
|
||
def _parse_rule(statement: str) -> _Rule: | ||
match = _rule_regex.match(statement) | ||
assert match is not None | ||
category = match.group("category") | ||
assert category in _categories | ||
return _Rule( | ||
category, | ||
_Comparison(match.group("comparison")), | ||
int(match.group("value")), | ||
match.group("action"), | ||
) | ||
|
||
|
||
_workflow_regex = re.compile(r"(?P<name>\w+)\{(?P<rules>.*?)\}") | ||
|
||
|
||
def _parse_workflow(line: str) -> _Workflow: | ||
match = _workflow_regex.match(line) | ||
assert match is not None | ||
name = match.group("name") | ||
assert isinstance(name, str) | ||
rules_str = match.group("rules") | ||
assert isinstance(rules_str, str) | ||
*rule_statements, default = rules_str.split(",") | ||
rules = list(map(_parse_rule, rule_statements)) | ||
return _Workflow(name, rules, default) | ||
|
||
|
||
part_regex = re.compile(r"\{x=(?P<x>\d+),m=(?P<m>\d+),a=(?P<a>\d+),s=(?P<s>\d+)\}") | ||
|
||
|
||
def _parse_part(line: str) -> _Part: | ||
match = part_regex.match(line) | ||
assert match is not None | ||
return {k: int(v) for k, v in match.groupdict().items()} | ||
|
||
|
||
def _parse_input(input_str: str) -> tuple[dict[str, _Workflow], Iterable[_Part]]: | ||
input_line_iter = iter(input_str.splitlines()) | ||
workflows = { | ||
workflow.name: workflow | ||
for workflow in map( | ||
_parse_workflow, itertools.takewhile(lambda line: line, input_line_iter) | ||
) | ||
} | ||
allowed_action_names = frozenset(itertools.chain(workflows.keys(), "AR")) | ||
assert all( | ||
workflow.default in allowed_action_names | ||
and all(rule.action in allowed_action_names for rule in workflow.rules) | ||
for workflow in workflows.values() | ||
) | ||
|
||
return workflows, map(_parse_part, input_line_iter) | ||
|
||
|
||
def p1(input_str: str) -> int: | ||
def _lt(left: int, right: int) -> bool: | ||
return left < right | ||
|
||
def _gt(left: int, right: int) -> bool: | ||
return left > right | ||
|
||
def _process_part( | ||
in_workflow: _Workflow, workflows: dict[str, _Workflow], part: _Part | ||
) -> bool: | ||
workflow = in_workflow | ||
while True: | ||
action = None | ||
for rule in workflow.rules: | ||
if rule.comparison == _Comparison.LT: | ||
compare = _lt | ||
elif rule.comparison == _Comparison.GT: | ||
compare = _gt | ||
else: | ||
raise AssertionError() | ||
|
||
if compare(part[rule.category], rule.value): | ||
action = rule.action | ||
break | ||
else: | ||
action = workflow.default | ||
|
||
assert action is not None | ||
|
||
if action == "A": | ||
return True | ||
if action == "R": | ||
return False | ||
workflow = workflows[action] | ||
|
||
workflows, parts_iter = _parse_input(input_str) | ||
in_workflow = workflows["in"] | ||
result = 0 | ||
for part in parts_iter: | ||
if not _process_part(in_workflow, workflows, part): | ||
logger.debug("Part %s rejected", part) | ||
continue | ||
|
||
total = sum(part.values()) | ||
result += total | ||
logger.debug( | ||
"Part %s accepted: total=%d, result so far=%d", part, total, result | ||
) | ||
|
||
return result | ||
|
||
|
||
@dataclass | ||
class _WorkflowStep: | ||
category_value_ranges: dict[str, list[range] | None] | ||
next_workflow: str | ||
|
||
|
||
def _merge_value_range( | ||
left: list[range] | None, right: list[range] | None | ||
) -> list[range] | None: | ||
if left is None: | ||
return None if right is None else [r for r in right] | ||
if right is None: | ||
return [r for r in left] | ||
|
||
right_iter = iter(right) | ||
try: | ||
r2: range | None = next(right_iter) | ||
except StopIteration: | ||
return [] | ||
|
||
result = list[range]() | ||
for r in left: | ||
assert r2 is not None | ||
while r2.stop < r.start: | ||
r2 = next(right_iter, None) | ||
if r2 is None: | ||
break | ||
|
||
if r2 is None: | ||
break | ||
|
||
assert r2.stop >= r.start | ||
if r2.start > r.stop: | ||
continue | ||
|
||
start = max(r.start, r2.start) | ||
stop = min(r.stop, r2.stop) | ||
assert start < stop | ||
result.append(range(start, stop)) | ||
|
||
while r2.start < r.stop: | ||
r2 = next(right_iter, None) | ||
if r2 is None: | ||
break | ||
|
||
if r2.start > r.stop: | ||
break | ||
|
||
start = max(r.start, r2.start) | ||
assert start == r2.start | ||
stop = min(r.stop, r2.stop) | ||
result.append(range(start, stop)) | ||
|
||
return result | ||
|
||
|
||
def _merge_category_value_ranges( | ||
left: Mapping[str, list[range] | None], right: Mapping[str, list[range] | None] | ||
) -> dict[str, list[range] | None]: | ||
return { | ||
category: _merge_value_range(left[category], right[category]) | ||
for category in left | ||
} | ||
|
||
|
||
def _negated_value_range(value_range: list[range] | None) -> list[range] | None: | ||
if value_range is None: | ||
return None | ||
|
||
if not value_range: | ||
return [range(1, 4_000 + 1)] | ||
|
||
result = list[range]() | ||
prev_stop = 1 | ||
for r in value_range: | ||
if r.start > prev_stop: | ||
result.append(range(prev_stop, r.start)) | ||
prev_stop = r.stop | ||
if prev_stop < 4_000 + 1: | ||
result.append(range(prev_stop, 4_000 + 1)) | ||
return result | ||
|
||
|
||
def _negated_category_value_ranges( | ||
category_value_ranges: Mapping[str, list[range] | None] | ||
) -> dict[str, list[range] | None]: | ||
return { | ||
category: _negated_value_range(category_value_ranges[category]) | ||
for category in category_value_ranges | ||
} | ||
|
||
|
||
def _category_value_ranges_from_rule(rule: _Rule) -> dict[str, list[range] | None]: | ||
return { | ||
category: ( | ||
None | ||
if category != rule.category | ||
else [range(1, rule.value)] | ||
if rule.comparison == _Comparison.LT | ||
else [range(rule.value + 1, 4_000 + 1)] | ||
) | ||
for category in "xmas" | ||
} | ||
|
||
|
||
def _possible_category_value_ranges( | ||
category_value_ranges: dict[str, list[range] | None] | ||
) -> bool: | ||
return all( | ||
value_ranges is None or value_ranges | ||
for value_ranges in category_value_ranges.values() | ||
) | ||
|
||
|
||
def _construct_workflow_steps(workflow: _Workflow) -> list[_WorkflowStep]: | ||
result = list[_WorkflowStep]() | ||
failure_limits: dict[str, list[range] | None] = {cat: None for cat in "xmas"} | ||
for rule in workflow.rules: | ||
category_value_ranges = _category_value_ranges_from_rule(rule) | ||
applicable_category_value_ranges = _merge_category_value_ranges( | ||
failure_limits, category_value_ranges | ||
) | ||
assert _possible_category_value_ranges(applicable_category_value_ranges) | ||
result.append(_WorkflowStep(applicable_category_value_ranges, rule.action)) | ||
negated_category_value_ranges = _negated_category_value_ranges( | ||
category_value_ranges | ||
) | ||
failure_limits = _merge_category_value_ranges( | ||
failure_limits, negated_category_value_ranges | ||
) | ||
|
||
result.append(_WorkflowStep(failure_limits, workflow.default)) | ||
return result | ||
|
||
|
||
def p2(input_str: str) -> int: | ||
workflows, _ = _parse_input(input_str) | ||
|
||
queue = Queue[_WorkflowStep]() | ||
queue.put_nowait( | ||
_WorkflowStep({cat: [range(1, 4_000 + 1)] for cat in _categories}, "in") | ||
) | ||
|
||
workflow_step_cache = dict[str, list[_WorkflowStep]]() | ||
|
||
accepted_category_value_ranges = list[dict[str, list[range]]]() | ||
while not queue.empty(): | ||
step = queue.get_nowait() | ||
logger.debug("Processing step: %s", step) | ||
if step.next_workflow == "R": | ||
logger.debug("Rejected step") | ||
continue | ||
if step.next_workflow == "A": | ||
logger.debug("Accepted step") | ||
assert all( | ||
ranges is not None for ranges in step.category_value_ranges.values() | ||
) | ||
ranges = cast(dict[str, list[range]], step.category_value_ranges) | ||
accepted_category_value_ranges.append(ranges) | ||
continue | ||
|
||
workflow = workflows[step.next_workflow] | ||
workflow_steps = workflow_step_cache.get(workflow.name) | ||
if workflow_steps is None: | ||
logger.debug("Constructing workflow steps for %s", workflow) | ||
workflow_steps = _construct_workflow_steps(workflow) | ||
logger.debug("Constructed workflow steps: %s", workflow_steps) | ||
workflow_step_cache[workflow.name] = workflow_steps | ||
|
||
for workflow_step in workflow_steps: | ||
new_category_value_ranges = _merge_category_value_ranges( | ||
step.category_value_ranges, workflow_step.category_value_ranges | ||
) | ||
if not _possible_category_value_ranges(new_category_value_ranges): | ||
logger.debug( | ||
"Skipping step -> workflow as ranges are impossible: %s -> %s", | ||
step, | ||
workflow_step, | ||
) | ||
continue | ||
|
||
queue.put_nowait( | ||
_WorkflowStep(new_category_value_ranges, workflow_step.next_workflow) | ||
) | ||
|
||
assert all( | ||
ranges is not None and len(ranges) == 1 | ||
for cat_value_ranges in accepted_category_value_ranges | ||
for ranges in cat_value_ranges.values() | ||
) | ||
|
||
if logger.isEnabledFor(logging.DEBUG): | ||
for i, cat_value_ranges in enumerate(accepted_category_value_ranges): | ||
logger.debug("i=%d, cat_value_ranges=%s", i, cat_value_ranges) | ||
lenghts = {cat: len(ranges[0]) for cat, ranges in cat_value_ranges.items()} | ||
logger.debug("i=%d, range_lengths=%s", i, lenghts) | ||
|
||
return sum( | ||
math.prod(len(ranges[0]) for ranges in cat_value_ranges.values()) | ||
for cat_value_ranges in accepted_category_value_ranges | ||
) |
Oops, something went wrong.