08 multisource run #97

Merged: 3 commits, Apr 15, 2024
2 changes: 2 additions & 0 deletions lib/postprocessing.py
@@ -65,6 +65,7 @@ def get_date_from_PSfilename(name):
    return date


# TODO: create empty dataframe if no files found
def get_datasets(path, depth=0, preprocessed=False):
    dirs = listdirs2(path, depth=depth)
    df = pd.DataFrame(data=dirs, columns=['path'])
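# A possible sketch for the TODO above: guard against an empty directory listing and
# return a frame with the columns expected downstream ('name' and 'preprocessed' are
# assumptions based on how the result is used later), e.g.:
#     dirs = listdirs2(path, depth=depth)
#     if len(dirs) == 0:
#         return pd.DataFrame(columns=['path', 'name', 'preprocessed'])
#     df = pd.DataFrame(data=dirs, columns=['path'])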
@@ -141,6 +142,7 @@ def get_processing_status(raw_data_dir, processing_dir, inference_dir, model):
    except:
        df_raw = get_datasets(raw_data_dir, depth=0)
    # get processed
    # TODO: check here for both options - make 2 runs
    df_processed = get_datasets(processing_dir / 'tiles', depth=0, preprocessed=True)
    # calculate properties
    diff = df_raw[~df_raw['name'].isin(df_processed['name'])]
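# A hypothetical sketch for the TODO above ("check here for both options - make 2 runs"):
# run get_datasets once per raw-data source and merge the results; raw_data_dir_2 is
# purely illustrative.
#     raw_data_dirs = [raw_data_dir, raw_data_dir_2]
#     df_raw = pd.concat([get_datasets(d, depth=0) for d in raw_data_dirs],
#                        ignore_index=True).drop_duplicates(subset='name')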
126 changes: 126 additions & 0 deletions process_02_inference.py
@@ -0,0 +1,126 @@

from pathlib import Path
import torch
import pandas as pd
import os
import numpy as np
from joblib import delayed, Parallel
import shutil
from tqdm import tqdm
import swifter
import geopandas as gpd  # used for merging the vector outputs below

from lib.postprocessing import *

# ### Settings

# Local code dir
CODE_DIR = Path('/isipd/projects/p_aicore_pf/initze/code/aicore_inference')
# Location of raw data
RAW_DATA_DIR = Path('/isipd/projects/p_aicore_pf/initze/data/planet/planet_data_inference_grid/tiles')
# Location of data processing
PROCESSING_DIR = Path('/isipd/projects/p_aicore_pf/initze/processing')
# Target directory for inference output
INFERENCE_DIR = Path('/isipd/projects/p_aicore_pf/initze/processed/inference')

# Directory with the trained models - RTS
MODEL_DIR = Path('/isipd/projects/p_aicore_pf/initze/models/thaw_slumps')

MODEL='RTS_v6_notcvis'
models = ['RTS_v6_notcvis', 'RTS_v6_tcvis']

#USE_GPU = [1,2,3,4]
USE_GPU = [1,2]
RUNS_PER_GPU = 5
MAX_IMAGES = None

# ### List all files with properties
df_processing_status = get_processing_status(RAW_DATA_DIR, PROCESSING_DIR, INFERENCE_DIR, MODEL)

df_final = df_processing_status

total_images = len(df_final)
preprocessed_images = df_final.preprocessed.sum()
finished_images = df_final.inference_finished.sum()
print(f'Number of images: {total_images}')
print(f'Number of preprocessed images: {preprocessed_images}')
print(f'Number of finished images: {finished_images}')
print(f'Number of images to process: {preprocessed_images - finished_images}')

# ## Preprocessing

# #### Update Arctic DEM data
print('Updating Elevation VRTs!')
dem_data_dir = Path('/isipd/projects/p_aicore_pf/initze/data/ArcticDEM')
vrt_target_dir = Path('/isipd/projects/p_aicore_pf/initze/processing/auxiliary/ArcticDEM')
#update_DEM(vrt_target_dir)
update_DEM2(dem_data_dir=dem_data_dir, vrt_target_dir=vrt_target_dir)

# #### Copy data for Preprocessing
# make better documentation

df_preprocess = df_final[~df_final.preprocessed]
print(f'Number of images to preprocess: {len(df_preprocess)}')

# Cleanup processing directories to avoid incomplete processing
input_dir_dslist = list((PROCESSING_DIR / 'input').glob('*'))
if len(input_dir_dslist) > 0:
    print(input_dir_dslist)
    for d in input_dir_dslist:
        print('Delete', d)
        shutil.rmtree(d)
else:
    print('Processing directory is ready, nothing to do!')

# Copy Data
_ = df_preprocess.swifter.apply(lambda x: copy_unprocessed_files(x, PROCESSING_DIR), axis=1)
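# swifter only parallelizes the pandas apply; without it, a plain apply performs the
# same copy step, just serially, e.g.:
# _ = df_preprocess.apply(lambda x: copy_unprocessed_files(x, PROCESSING_DIR), axis=1)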

# #### Run Preprocessing
import warnings
warnings.filterwarnings('ignore')

N_JOBS=40
print(f'Preprocessing {len(df_preprocess)} images') #fix this
if len(df_preprocess) > 0:
    pp_string = f'python setup_raw_data.py --data_dir {PROCESSING_DIR} --n_jobs {N_JOBS} --nolabel'
    os.system(pp_string)
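# os.system() silently ignores a non-zero exit code from setup_raw_data.py. A sketch of
# a stricter alternative using subprocess with the same command and arguments:
#     import subprocess
#     subprocess.run(['python', 'setup_raw_data.py', '--data_dir', str(PROCESSING_DIR),
#                     '--n_jobs', str(N_JOBS), '--nolabel'],
#                    check=True)  # raises CalledProcessError if preprocessing fails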

# ## Processing/Inference
# rerun processing status
df_processing_status2 = get_processing_status(RAW_DATA_DIR, PROCESSING_DIR, INFERENCE_DIR, MODEL)

# Filter to images where inference has not finished yet
df_process = df_final[~df_final.inference_finished]
# update overview and filter accordingly - really necessary?
df_process_final = (df_process.set_index('name')
                    .join(df_processing_status2[df_processing_status2['preprocessed']][['name']].set_index('name'), how='inner')
                    .reset_index(drop=False)
                    .iloc[:MAX_IMAGES])
# validate if images are correctly preprocessed
df_process_final['preprocessing_valid'] = (df_process_final.apply(lambda x: len(list(x['path'].glob('*'))), axis=1) >= 5)
# final filtering process to remove incorrectly preprocessed data
df_process_final = df_process_final[df_process_final['preprocessing_valid']]

print(f'Number of images: {len(df_process_final)}')

# #### Parallel runs
# Make splits to distribute the processing
n_splits = len(USE_GPU) * RUNS_PER_GPU
df_split = np.array_split(df_process_final, n_splits)
gpu_split = USE_GPU * RUNS_PER_GPU

#for split in df_split:
# print(f'Number of images: {len(split)}')
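# Quick illustration of the pairing: with USE_GPU = [1, 2] and RUNS_PER_GPU = 5 there
# are 10 workers and gpu_split becomes [1, 2, 1, 2, ...], i.e. worker i processes
# df_split[i] on GPU gpu_split[i].
for split in range(n_splits):
    print(f'Worker {split}: {len(df_split[split])} images on GPU {gpu_split[split]}')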

print('Run inference!')
# ### Parallel Inference execution
_ = Parallel(n_jobs=n_splits)(
    delayed(run_inference)(df_split[split], model=MODEL, processing_dir=PROCESSING_DIR,
                           inference_dir=INFERENCE_DIR, model_dir=MODEL_DIR,
                           gpu=gpu_split[split], run=True)
    for split in range(n_splits))
# #### Merge output files

# read all binarized prediction files of the selected model
flist = list((INFERENCE_DIR / MODEL).glob(f'*/*pred_binarized.shp'))
print(f'Found {len(flist)} binarized prediction files')
if len(df_process_final) > 0:
    # load them in parallel
    out = Parallel(n_jobs=6)(delayed(load_and_parse_vector)(f) for f in tqdm(flist[:]))
    # merge them and save to geopackage file
    merged_gdf = gpd.pd.concat(out)
    print(INFERENCE_DIR / MODEL / f'{MODEL}_merged.gpkg')
    merged_gdf.to_file(INFERENCE_DIR / MODEL / f'{MODEL}_merged.gpkg')
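# Optional sanity check (a sketch): reload the merged file and report the feature count
merged_path = INFERENCE_DIR / MODEL / f'{MODEL}_merged.gpkg'
if merged_path.exists():
    print(len(gpd.read_file(merged_path)), 'features in', merged_path)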
100 changes: 100 additions & 0 deletions process_03_ensemble.py
@@ -0,0 +1,100 @@
# # Create ensemble results from several model outputs
# ### Imports

from pathlib import Path
import pandas as pd
from joblib import delayed, Parallel
#from tqdm.notebook import tqdm
from tqdm import tqdm
from lib.postprocessing import *
import geopandas as gpd

# ### Settings
# Local code dir
CODE_DIR = Path('.')
BASE_DIR = Path('../..')
# Location of raw data
# TODO: make support for multiple sources
RAW_DATA_DIR = BASE_DIR / Path('data/planet/planet_data_inference_grid/tiles')
# Location of data processing
PROCESSING_DIR = BASE_DIR / 'processing'
# Target directory for inference output
INFERENCE_DIR = BASE_DIR / Path('processed/inference')
# Directory with the trained models - RTS
MODEL_DIR = BASE_DIR / Path('models/thaw_slumps')
# Ensemble Target
ENSEMBLE_NAME = 'RTS_v6_ensemble_v2'
MODEL_NAMES = ['RTS_v6_notcvis', 'RTS_v6_tcvis']
N_IMAGES = None # automatically run full set
N_JOBS = 15 # number of cpu jobs for ensembling
N_VECTOR_LOADERS = 6 # number of parallel vector loaders for final merge

# ### Select Data
# * create list of available files
# * filter what's available

# check if cucim is available
try:
    import cucim
    try_gpu = True
    print('Running ensembling with GPU!')
except ImportError:
    try_gpu = False
    print('Cucim import failed')

# setup all params
kwargs_ensemble = {
    'ensemblename': ENSEMBLE_NAME,
    'inference_dir': INFERENCE_DIR,
    'modelnames': MODEL_NAMES,
    'binary_threshold': [0.4, 0.45, 0.5],
    'border_size': 10,
    'minimum_mapping_unit': 32,
    'delete_binary': True,
    'try_gpu': False,  # currently default to CPU only
    'gpu': 0,
}
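# The cucim check above already sets try_gpu, but the dictionary pins it to False.
# A sketch for wiring the detection result in instead (FORCE_CPU is illustrative):
# FORCE_CPU = True
# kwargs_ensemble['try_gpu'] = try_gpu and not FORCE_CPU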


# Check for finalized products
df_processing_status = get_processing_status(RAW_DATA_DIR, PROCESSING_DIR, INFERENCE_DIR, model=kwargs_ensemble['ensemblename'])
df_ensemble_status = get_processing_status_ensemble(INFERENCE_DIR, model_input_names=kwargs_ensemble['modelnames'], model_ensemble_name=kwargs_ensemble['ensemblename'])
# Check which need to be processed - check for already processed and invalid files
process = df_ensemble_status[df_ensemble_status['process']]

# #### Filter by tile_ids

#process = process[process.apply(lambda x: x['name'].split('_')[1].startswith('42'), axis=1)]
print(df_processing_status.groupby('inference_finished').count())

# #### Documentation

print('Number of files to process:', process.groupby('process').count().iloc[0, 0])

# #### Run Ensemble Merging

print(f'Start running ensemble with {N_JOBS} jobs!')
print('Target ensemble name:', kwargs_ensemble['ensemblename'])
print('Source model output:', kwargs_ensemble['modelnames'])
_ = Parallel(n_jobs=N_JOBS)(
    delayed(create_ensemble_v2)(image_id=process.iloc[row]['name'], **kwargs_ensemble)
    for row in tqdm(range(len(process.iloc[:N_IMAGES]))))

# #### run parallelized batch

# ### Merge vectors to complete dataset


ensemblename = ENSEMBLE_NAME
# set probability levels: 'class_05' means 50%, 'class_045' means 45%. These strings are used to glob for the vector file names
proba_strings = ['class_05', 'class_045','class_04']
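# These suffixes mirror kwargs_ensemble['binary_threshold'] (0.5 -> 'class_05',
# 0.45 -> 'class_045', 0.4 -> 'class_04'). A sketch to keep both in sync automatically:
# proba_strings = ['class_' + str(t).replace('.', '') for t in kwargs_ensemble['binary_threshold']]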

for proba_string in proba_strings:
    # read all files which follow the threshold defined above
    flist = list((INFERENCE_DIR / ensemblename).glob(f'*/*_{proba_string}.gpkg'))
    print(f'{proba_string}: found {len(flist)} files')
    # load them in parallel
    out = Parallel(n_jobs=N_VECTOR_LOADERS)(delayed(load_and_parse_vector)(f) for f in tqdm(flist[:N_IMAGES]))
    # merge them and save to geopackage file
    print('Merging results')
    merged_gdf = gpd.pd.concat(out)
    merged_gdf.to_file(INFERENCE_DIR / ensemblename / f'merged_{proba_string}.gpkg')