Skip to content

Commit

Permalink
Attempt to no longer use the docker data image and pull everything fr…
Browse files Browse the repository at this point in the history
…om the archive.
  • Loading branch information
cmccully committed Dec 5, 2023
1 parent d064f8d commit 21550b7
Show file tree
Hide file tree
Showing 7 changed files with 416 additions and 85 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ jobs:
- name: Test Master Bias Creation
run: |
kubectl exec banzai-e2e-test -c banzai-listener -- pytest -s --pyargs banzai.tests --durations=0 --junitxml=/archive/engineering/pytest-master-bias.xml -m master_bias
- name: Cleanup
run: |
kubectl delete pod banzai-e2e-test
7 changes: 0 additions & 7 deletions banzai/dark.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
from banzai.calibrations import CalibrationStacker, CalibrationUser, CalibrationComparer
from banzai.utils import qc
from banzai.logs import get_logger
import numpy as np


logger = get_logger()

Expand Down Expand Up @@ -42,14 +40,9 @@ def calibration_type(self):

def apply_master_calibration(self, image, master_calibration_image):
    """Subtract the exposure-time-scaled master dark from `image`.

    The master dark is presumably stored per unit exposure time (counts/s) —
    it is multiplied by the science frame's exposure time before subtraction.
    Header keywords are added recording which dark was used and that the
    correction succeeded.

    Note: `master_calibration_image` is scaled in place as a side effect.
    Removed-diff residue (the np-based temperature scaling dropped by this
    commit) must not remain here: `np` is no longer imported in this module.
    """
    # Scale the master dark up to this frame's exposure time.
    master_calibration_image *= image.exptime
    image -= master_calibration_image
    image.meta['L1IDDARK'] = master_calibration_image.filename, 'ID of dark frame'
    image.meta['L1STATDA'] = 1, 'Status flag for dark frame correction'
    return image


Expand Down
332 changes: 332 additions & 0 deletions banzai/tests/data/test_data.dat

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions banzai/tests/data/test_precals.dat
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
frameid filename binning mode obstype dateobs site instrument
9741441 bpm_lsc_fa03_20181001_bin1x1.fits.fz '[1, 1]' default BPM 2018-10-01 lsc fa03
3519536 bpm_ogg_fs02_20140325_bin2x2.fits.fz '[2, 2]' default BPM 2014-03-25 ogg fs02
8831324 bpm_ogg_kb27_20180605_bin1x1.fits.fz '[1, 1]' default BPM 2018-06-05 ogg kb27
13261910 bpm-cpt-fa06-central2k-20190617.fits.fz '[2, 2]' central2k BPM 2019-06-17 cpt fa06
33347346 tst1m0XX-ep01-20200710-bpm-1.fits.fz '[1, 1]' default BPM 2020-07-10 tst ep01
33347347 tst1m0XX-ep02-20200710-bpm-1.fits.fz '[1, 1]' default BPM 2020-07-10 tst ep02
33347348 tst1m0XX-ep03-20200710-bpm-1.fits.fz '[1, 1]' default BPM 2020-07-10 tst ep03
33347349 tst1m0XX-ep04-20200710-bpm-1.fits.fz '[1, 1]' default BPM 2020-07-10 tst ep04
58186456 ogg0m404-sq30-20221123-readnoise-default-e2e.fits.fz '[1, 1]' default READNOISE 2022-11-23 ogg sq30
56389948 ogg0m404-sq30-20221206-bpm-default.fits.fz '[1, 1]' default BPM 2022-12-06 ogg sq30
30 changes: 2 additions & 28 deletions banzai/tests/e2e-k8s.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,7 @@ spec:
securityContext:
fsGroup: 10000

initContainers:
# When the Pod is initialized, copy all files within the container at path
# /archive/engineering into the empty data volume mounted at /data
- name: banzai-data
image: docker.lco.global/banzai-e2e-data:1.4.2
imagePullPolicy: IfNotPresent
securityContext:
runAsUser: 10087
runAsGroup: 10000
volumeMounts:
- name: banzai-data
mountPath: /data
readOnly: false
command:
- /bin/cp
- -a
- /archive/engineering
- /data/
resources:
requests:
cpu: 0.1
memory: 256M
limits:
cpu: 1
memory: 1Gi

containers:
containers:
- name: banzai-redis
image: redis:5.0.3
imagePullPolicy: IfNotPresent
Expand Down Expand Up @@ -141,7 +115,7 @@ spec:
memory: 6Gi
limits:
cpu: 8
memory: 8Gi
memory: 10Gi
- name: banzai-celery-beat
image: banzai:test-latest
imagePullPolicy: IfNotPresent
Expand Down
113 changes: 65 additions & 48 deletions banzai/tests/test_end_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
from types import ModuleType
from banzai.celery import app, schedule_calibration_stacking
from banzai.dbs import get_session, CalibrationImage, get_timezone, populate_instrument_tables
from banzai.dbs import mark_frame
from banzai.utils import fits_utils, file_utils
from banzai.dbs import mark_frame, query_for_instrument
from banzai.utils import file_utils
from banzai.main import add_super_calibration
from banzai.tests.utils import FakeResponse, get_min_and_max_dates, FakeContext
from astropy.io import fits
from astropy.io import fits, ascii
import pkg_resources
from banzai.logs import get_logger

Expand All @@ -28,15 +28,18 @@

app.conf.update(CELERY_TASK_ALWAYS_EAGER=True)

# Manifests of the e2e test data shipped with the package: every raw frame
# the run reduces, and the pre-made calibrations (BPMs / readnoise maps).
TEST_PACKAGE = 'banzai.tests'
TEST_FRAMES = ascii.read(pkg_resources.resource_filename(TEST_PACKAGE, 'data/test_data.dat'))
# Fix: the manifest added in this commit is named test_precals.dat (plural);
# reading 'data/test_precal.dat' fails at import time with a missing file.
PRECAL_FRAMES = ascii.read(pkg_resources.resource_filename(TEST_PACKAGE, 'data/test_precals.dat'))

DATA_ROOT = os.path.join(os.sep, 'archive', 'engineering')

# Use the LCO filenaming convention (site+telescope-instrument-dayobs-...)
# to infer the sites, instruments, and observing days under test, instead of
# globbing a pre-populated data volume (the docker data image is gone).
SITES = {frame[:3] for frame in TEST_FRAMES['filename']}
INSTRUMENTS = {os.path.join(frame[:3], frame.split('-')[1]) for frame in TEST_FRAMES['filename']}
DAYS_OBS = {os.path.join(frame[:3], frame.split('-')[1], frame.split('-')[2])
            for frame in TEST_FRAMES['filename']}

CONFIGDB_FILENAME = pkg_resources.resource_filename(TEST_PACKAGE, 'data/configdb_example.json')


Expand All @@ -62,14 +65,14 @@ def celery_join():
break


def run_reduce_individual_frames(raw_filenames):
logger.info('Reducing individual frames for filenames: {filenames}'.format(filenames=raw_filenames))
for day_obs in DAYS_OBS:
raw_path = os.path.join(DATA_ROOT, day_obs, 'raw')
for filename in glob(os.path.join(raw_path, raw_filenames)):
file_utils.post_to_archive_queue(filename, os.getenv('FITS_BROKER'), exchange_name=os.getenv('FITS_EXCHANGE'))
def run_reduce_individual_frames(filename_pattern):
    """Queue every test frame whose filename contains `filename_pattern` on
    the archive exchange, then block until the celery workers drain."""
    logger.info('Reducing individual frames for filenames: {filenames}'.format(filenames=filename_pattern))
    broker_url = os.getenv('FITS_BROKER')
    fits_exchange = os.getenv('FITS_EXCHANGE')
    matching = (frame for frame in TEST_FRAMES if filename_pattern in frame['filename'])
    for frame in matching:
        file_utils.post_to_archive_queue(frame['filename'], frame['frameid'],
                                         broker_url, exchange_name=fits_exchange)
    celery_join()
    logger.info('Finished reducing individual frames for filenames: {filenames}'.format(filenames=filename_pattern))
logger.info('Finished reducing individual frames for filenames: {filenames}'.format(filenames=filename_pattern))


def stack_calibrations(frame_type):
Expand Down Expand Up @@ -102,39 +105,44 @@ def mark_frames_as_good(raw_filenames):
logger.info('Finished marking frames as good for filenames: {filenames}'.format(filenames=raw_filenames))


def get_expected_number_of_calibrations(raw_filename_pattern, calibration_type):
    """Count how many stacked calibrations the pipeline should have produced.

    For skyflats we expect one stack per observed filter per night; for every
    other calibration type, one stack per night that has any matching raw
    frames.
    """
    context = FakeContext()
    context.db_address = os.environ['DB_ADDRESS']
    number_of_stacks_that_should_have_been_created = 0
    for day_obs in DAYS_OBS:
        site, instrument, dayobs = day_obs.split('/')
        raw_frames_for_this_dayobs = [
            frame for frame in TEST_FRAMES
            if site in frame['filename'] and instrument in frame['filename']
            and dayobs in frame['filename'] and raw_filename_pattern in frame['filename']
        ]
        # BUG FIX: this previously compared the *string literal*
        # 'calibration_type.lower()' against 'skyflat', which is always False,
        # so skyflats were counted as one stack per night instead of one per
        # filter per night.
        if calibration_type.lower() == 'skyflat':
            # Group by filter: one stacked skyflat per filter per night.
            observed_filters = {frame['filter'] for frame in raw_frames_for_this_dayobs}
            number_of_stacks_that_should_have_been_created += len(observed_filters)
        elif raw_frames_for_this_dayobs:
            # Just one calibration per night.
            number_of_stacks_that_should_have_been_created += 1
    return number_of_stacks_that_should_have_been_created


def run_check_if_stacked_calibrations_were_created(raw_file_pattern, calibration_type):
    """Assert one stacked calibration file exists on disk per expected stack.

    The diff residue here duplicated the old (raw_filenames) signature and the
    old call site; only the new-side implementation is kept.
    """
    number_of_stacks_that_should_have_been_created = get_expected_number_of_calibrations(
        raw_file_pattern, calibration_type)
    created_stacked_calibrations = []
    for day_obs in DAYS_OBS:
        created_stacked_calibrations += glob(os.path.join(DATA_ROOT, day_obs, 'processed',
                                                          '*' + calibration_type.lower() + '*.fits*'))
    assert number_of_stacks_that_should_have_been_created > 0
    assert len(created_stacked_calibrations) == number_of_stacks_that_should_have_been_created


def run_check_if_stacked_calibrations_are_in_db(raw_filenames, calibration_type):
number_of_stacks_that_should_have_been_created = get_expected_number_of_calibrations(raw_filenames, calibration_type)
def run_check_if_stacked_calibrations_are_in_db(raw_file_pattern, calibration_type):
number_of_stacks_that_should_have_been_created = get_expected_number_of_calibrations(raw_file_pattern,
calibration_type)
with get_session(os.environ['DB_ADDRESS']) as db_session:
calibrations_in_db = db_session.query(CalibrationImage).filter(CalibrationImage.type == calibration_type)
calibrations_in_db = calibrations_in_db.filter(CalibrationImage.is_master).all()
Expand All @@ -156,19 +164,28 @@ def observation_portal_side_effect(*args, **kwargs):
# Note this is complicated by the fact that things are running as celery tasks.
@pytest.mark.e2e
@pytest.fixture(scope='module')
@mock.patch('banzai.main.file_utils.post_to_ingester', return_value={'frameid': None})
@mock.patch('banzai.dbs.requests.get', return_value=FakeResponse(CONFIGDB_FILENAME))
def init(configdb, mock_ingester):
    """Create the test DB, populate instruments, and register precal frames.

    NOTE(review): reconstructed from the new side of the diff — the
    parse_args mock and the add_super_calibration glob loops were removed by
    this commit; precals are now inserted directly as CalibrationImage rows.
    Confirm the fixture signature no longer takes mock_args.
    """
    os.system(f'banzai_create_db --db-address={os.environ["DB_ADDRESS"]}')
    populate_instrument_tables(db_address=os.environ["DB_ADDRESS"], configdb_address='http://fakeconfigdb')
    # Register each pre-made calibration (BPM / readnoise) as a master frame
    # so the pipeline can pull it from the archive by frameid.
    for frame in PRECAL_FRAMES:
        instrument = query_for_instrument(camera=frame['instrument'],
                                          site=frame['site'],
                                          db_address=os.environ['DB_ADDRESS'])
        calimage = CalibrationImage(
            type=frame['obstype'],
            filename=frame['filename'],
            frameid=frame['frameid'],
            dateobs=frame['dateobs'],
            datecreated='2023-11-19',
            instrument_id=instrument.id,
            is_master=True, is_bad=False,
            attributes={'binning': frame['binning'], 'configuration_mode': frame['mode']}
        )
        with get_session(os.environ['DB_ADDRESS']) as db_session:
            db_session.add(calimage)
            db_session.commit()


@pytest.mark.e2e
Expand All @@ -177,13 +194,13 @@ class TestMasterBiasCreation:
@pytest.fixture(autouse=True)
@mock.patch('banzai.utils.observation_utils.requests.get', side_effect=observation_portal_side_effect)
def stack_bias_frames(self, mock_observation_portal, init):
    # Reduce the raw biases, mark the products good, then stack a master bias.
    # (The old-side duplicate call with the '*b00.fits*' glob is diff residue.)
    run_reduce_individual_frames('b00.fits')
    mark_frames_as_good('*b91.fits*')
    stack_calibrations('bias')

def test_if_stacked_bias_frame_was_created(self):
    # The master bias must exist on disk and be registered in the database.
    run_check_if_stacked_calibrations_were_created('b00.fits', 'bias')
    run_check_if_stacked_calibrations_are_in_db('b00.fits', 'BIAS')


@pytest.mark.e2e
Expand All @@ -192,13 +209,13 @@ class TestMasterDarkCreation:
@pytest.fixture(autouse=True)
@mock.patch('banzai.utils.observation_utils.requests.get', side_effect=observation_portal_side_effect)
def stack_dark_frames(self, mock_observation_portal):
    # Reduce the raw darks, mark the products good, then stack a master dark.
    run_reduce_individual_frames('d00.fits')
    mark_frames_as_good('*d91.fits*')
    stack_calibrations('dark')

def test_if_stacked_dark_frame_was_created(self):
    # The master dark must exist on disk and be registered in the database.
    run_check_if_stacked_calibrations_were_created('d00.fits', 'dark')
    run_check_if_stacked_calibrations_are_in_db('d00.fits', 'DARK')


@pytest.mark.e2e
Expand All @@ -207,13 +224,13 @@ class TestMasterFlatCreation:
@pytest.fixture(autouse=True)
@mock.patch('banzai.utils.observation_utils.requests.get', side_effect=observation_portal_side_effect)
def stack_flat_frames(self, mock_observation_portal):
    # Reduce the raw flats, mark the products good, then stack master skyflats.
    run_reduce_individual_frames('f00.fits')
    mark_frames_as_good('*f91.fits*')
    stack_calibrations('skyflat')

def test_if_stacked_flat_frame_was_created(self):
    # Stacked skyflats must exist on disk and be registered in the database.
    run_check_if_stacked_calibrations_were_created('f00.fits', 'skyflat')
    run_check_if_stacked_calibrations_are_in_db('f00.fits', 'SKYFLAT')


@pytest.mark.e2e
Expand All @@ -222,14 +239,14 @@ class TestScienceFileCreation:
@pytest.fixture(autouse=True)
@mock.patch('banzai.utils.observation_utils.requests.get', side_effect=observation_portal_side_effect)
def reduce_science_frames(self, mock_observation_portal):
    # Reduce every raw science (e00) frame listed in the test manifest.
    run_reduce_individual_frames('e00.fits')

def test_if_science_frames_were_created(self):
expected_files = []
created_files = []
for day_obs in DAYS_OBS:
expected_files += [os.path.basename(filename).replace('e00', 'e91')
for filename in glob(os.path.join(DATA_ROOT, day_obs, 'raw', '*e00*'))]
expected_files += [filename.replace('e00', 'e91')
for filename in TEST_FRAMES['filename']]
created_files += [os.path.basename(filename) for filename in glob(os.path.join(DATA_ROOT, day_obs,
'processed', '*e91*'))]
assert len(expected_files) > 0
Expand Down
4 changes: 2 additions & 2 deletions banzai/utils/file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
logger = get_logger()


def post_to_archive_queue(filename, frameid, broker_url, exchange_name='fits_files'):
    """Publish a frame notification on the archive fanout exchange.

    :param filename: basename of the frame to process
    :param frameid: archive ID of the frame, so consumers can fetch it from
        the archive rather than from a local data volume
    :param broker_url: AMQP broker URL
    :param exchange_name: name of the fanout exchange to publish on

    The diff residue here kept the old ``{'path': image_path}`` publish line,
    which references a name the new signature no longer defines (NameError);
    only the new-side message body is kept.
    """
    exchange = Exchange(exchange_name, type='fanout')
    with Connection(broker_url) as conn:
        producer = conn.Producer(exchange=exchange)
        producer.publish({'filename': filename, 'frameid': frameid})
        producer.release()


Expand Down

0 comments on commit 21550b7

Please sign in to comment.