Skip to content

Commit

Permalink
Merge pull request #366 from LCOGT/fix/stage-grouping
Browse files Browse the repository at this point in the history
Fix a bug that wouldn't allow only grouping by instrument
  • Loading branch information
cmccully authored Aug 11, 2023
2 parents caf7345 + c90e399 commit 5ccd3e9
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
1.11.0 (2023-08-10)
-------------------
- Added the process_by_group keyword to stages to fix a bug that wouldn't allow grouping only by instrument
- Updated the logging scheme

1.10.1 (2023-05-31)
Expand Down
2 changes: 1 addition & 1 deletion Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pipeline {
script {
withKubeConfig([credentialsId: 'build-kube-config']) {
// delete previous run if the previous failed somehow
sh('kubectl -n dev delete pod banzai-e2e-test || true')
sh('kubectl -n build delete pod banzai-e2e-test || true')
// we will be testing the image that we just built
sh('sed -i -e "s^@BANZAI_IMAGE@^${DOCKER_IMG}^g" banzai/tests/e2e-k8s.yaml')
// deploy the test pod to the cluster
Expand Down
4 changes: 4 additions & 0 deletions banzai/calibrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ class CalibrationStacker(CalibrationMaker):
def __init__(self, runtime_context):
super(CalibrationStacker, self).__init__(runtime_context)

@property
def process_by_group(self):
return True

def make_master_calibration_frame(self, images):
make_calibration_name = file_utils.make_calibration_filename_function(self.calibration_type,
self.runtime_context)
Expand Down
2 changes: 1 addition & 1 deletion banzai/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def run_realtime_pipeline():
def start_listener(runtime_context):
# Need to keep the amqp logger level at least as high as INFO,
# or else it send heartbeat check messages every second
logging.getLogger('amqp').setLevel(max(logging.getLogger().level, getattr(logging, 'INFO')))
logging.getLogger('amqp').setLevel(logging.WARNING)
logger.info('Starting pipeline listener')

fits_exchange = Exchange(runtime_context.FITS_EXCHANGE, type='fanout')
Expand Down
12 changes: 9 additions & 3 deletions banzai/stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ def stage_name(self):
def group_by_attributes(self):
return []

@property
def process_by_group(self):
return False

def get_grouping(self, image):
grouping_criteria = [image.instrument.site, image.instrument.id]
if self.group_by_attributes:
Expand All @@ -30,11 +34,13 @@ def get_grouping(self, image):
def run(self, images):
if not images:
return images
if not self.group_by_attributes:
image_sets = images
else:
if self.group_by_attributes or self.process_by_group:
images.sort(key=self.get_grouping)
image_sets = [list(image_set) for _, image_set in itertools.groupby(images, self.get_grouping)]
else:
# Treat each image individually
image_sets = images

processed_images = []
for image_set in image_sets:
try:
Expand Down
13 changes: 6 additions & 7 deletions banzai/tests/e2e-k8s.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ spec:
imagePullPolicy: IfNotPresent
resources:
requests:
cpu: 0.1
cpu: 1
memory: 512M
limits:
cpu: 1
cpu: 2
memory: 1Gi
readinessProbe:
exec:
Expand Down Expand Up @@ -110,7 +110,6 @@ spec:
value: "e2e_task_queue"
- name: REFERENCE_CATALOG_URL
value: "http://phot-catalog.lco.gtn/"

command:
- celery
- -A
Expand All @@ -119,7 +118,7 @@ spec:
- --hostname
- "banzai-celery-worker"
- --concurrency
- "2"
- "4"
- -l
- "debug"
- "-Q"
Expand All @@ -139,10 +138,10 @@ spec:
timeoutSeconds: 10
resources:
requests:
cpu: 1
memory: 8Gi
cpu: 6
memory: 6Gi
limits:
cpu: 2
cpu: 8
memory: 8Gi
- name: banzai-celery-beat
image: @BANZAI_IMAGE@
Expand Down
4 changes: 2 additions & 2 deletions helm-chart/banzai/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ horizontalPodAutoscaler:

image:
repository: docker.lco.global/banzai
tag: "1.10.0"
tag: "1.11.0"
pullPolicy: IfNotPresent

# Values for the OCS Ingester library, used by BANZAI.
Expand All @@ -35,7 +35,7 @@ banzai:
calibrateProposalId: calibrate
banzaiWorkerLogLevel: info
rawDataApiRoot: http://archiveapi-internal.prod/
fitsBroker: rabbitmq.lco.gtn
fitsBroker: rabbitmq-ha.prod.svc.cluster.local.
fitsExchange: archived_fits
queueName: banzai_pipeline
celeryTaskQueueName: banzai_imaging
Expand Down

0 comments on commit 5ccd3e9

Please sign in to comment.