Skip to content

Commit

Permalink
calib: parallelize patch library generation (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
petrasovaa committed Mar 15, 2022
1 parent 92db14d commit baae2d2
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 27 deletions.
129 changes: 103 additions & 26 deletions r.futures/r.futures.calib/r.futures.calib.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,52 @@
import atexit
import numpy as np
from io import StringIO
from multiprocessing import Process, Queue
from multiprocessing import Process, Queue, Pool

import grass.script.core as gcore
import grass.script.raster as grast
import grass.script.utils as gutils
from grass.exceptions import CalledModuleError


try:
from grass.script.utils import append_random
except ImportError:
import random
import string

def append_random(name, suffix_length=None, total_length=None):
"""Add a random part to of a specified length to a name (string)
>>> append_random("tmp", 8)
>>> append_random("tmp", total_length=16)
..note::
This function is copied from grass79.
"""
if suffix_length and total_length:
raise ValueError(
"Either suffix_length or total_length can be provided, not both"
)
if not suffix_length and not total_length:
raise ValueError("suffix_length or total_length has to be provided")
if total_length:
# remove len of name and one underscore
name_length = len(name)
suffix_length = total_length - name_length - 1
if suffix_length <= 0:
raise ValueError(
"No characters left for the suffix:"
" total_length <{total_length}> is too small"
" or name <{name}> ({name_length}) is too long".format(**locals())
)
# We don't do lower and upper case because that could cause conflicts in
# contexts which are case-insensitive.
# We use lowercase because that's what is in UUID4 hex string.
allowed_chars = string.ascii_lowercase + string.digits
# The following can be shorter with random.choices from Python 3.6.
suffix = "".join(random.choice(allowed_chars) for _ in range(suffix_length))
return "{name}_{suffix}".format(**locals())


TMP = []


Expand Down Expand Up @@ -391,26 +429,59 @@ def new_development(development_end, development_diff):
dev_end=development_end), overwrite=True, quiet=True)


def patch_analysis_per_subregion(development_diff, subregions, threshold, tmp_clump, tmp_clump_cat):
gcore.run_command('r.clump', input=development_diff, output=tmp_clump, overwrite=True, quiet=True)
cats = gcore.read_command("r.describe", flags="1n", map=subregions, quiet=True).strip().splitlines()
subregions_data = {}
def analyse_subregion(params):
tmp_clump_cat, subregions, cat, clump, threshold = params
grast.mapcalc(
"{new} = if ({reg} == {cat}, {clump}, null())".format(
new=tmp_clump_cat, reg=subregions, cat=cat, clump=clump
),
overwrite=True,
)
env = os.environ.copy()
env["GRASS_REGION"] = gcore.region_env(zoom=tmp_clump_cat)
try:
data = gcore.read_command(
"r.object.geometry",
input=tmp_clump_cat,
flags="m",
separator="comma",
env=env,
quiet=True,
).strip()
data = np.loadtxt(StringIO(data), delimiter=",", usecols=(1, 2), skiprows=1)
# in case there is just one record
data = data.reshape((-1, 2))
return data[data[:, 0] > threshold]
except CalledModuleError:
gcore.warning(
"Subregion {cat} has no changes in development, no patches found.".format(
cat=cat
)
)
return np.empty([0, 2])


def patch_analysis_per_subregion_parallel(
development_diff, subregions, threshold, tmp_clump, tmp_name, nprocs
):
gcore.run_command(
"r.clump", input=development_diff, output=tmp_clump, overwrite=True, quiet=True
)
cats = (
gcore.read_command("r.describe", flags="1n", map=subregions, quiet=True)
.strip()
.splitlines()
)
params = []
toremove = []
for cat in cats:
grast.mapcalc('{new} = if ({reg} == {cat}, {clump}, null())'.format(new=tmp_clump_cat, reg=subregions,
cat=cat, clump=tmp_clump),
overwrite=True)
env['GRASS_REGION'] = gcore.region_env(zoom=tmp_clump_cat)
try:
data = gcore.read_command('r.object.geometry', input=tmp_clump_cat,
flags='m', separator='comma', env=env, quiet=True).strip()
data = np.loadtxt(StringIO(data), delimiter=',', usecols=(1, 2), skiprows=1)
# in case there is just one record
data = data.reshape((-1, 2))
subregions_data[cat] = data[data[:, 0] > threshold]
except CalledModuleError:
gcore.warning("Subregion {cat} has no changes in development, no patches found.".format(cat=cat))
subregions_data[cat] = np.empty([0, 2])
tmp_clump_cat = append_random(tmp_name, suffix_length=8)
toremove.append(tmp_clump_cat)
params.append((tmp_clump_cat, subregions, cat, tmp_clump, threshold))
with Pool(processes=nprocs) as pool:
results = pool.map_async(analyse_subregion, params).get()
subregions_data = dict(zip(cats, results))
gcore.run_command("g.remove", type="raster", flags="f", name=toremove, quiet=True)
return subregions_data


Expand Down Expand Up @@ -508,10 +579,11 @@ def process_calibration(calib_file):
def main():
check_addon_installed('r.object.geometry', fatal=True)

dev_start = options['development_start']
dev_end = options['development_end']
only_file = flags['l']
patches_per_subregion = flags['s']
dev_start = options["development_start"]
dev_end = options["development_end"]
only_file = flags["l"]
nprocs = int(options["nprocs"])
patches_per_subregion = flags["s"]
if not only_file:
repeat = int(options['repeat'])
compactness_means = [float(each) for each in options['compactness_mean'].split(',')]
Expand Down Expand Up @@ -544,8 +616,14 @@ def main():
diff_development(dev_start, dev_end, options['subregions'], orig_patch_diff)
data = write_data = patch_analysis(orig_patch_diff, threshold, tmp_clump)
if patches_per_subregion:
subregions_data = patch_analysis_per_subregion(orig_patch_diff, options['subregions'],
threshold, tmp_clump, tmp_cat_clump)
subregions_data = patch_analysis_per_subregion_parallel(
orig_patch_diff,
options["subregions"],
threshold,
tmp_clump,
tmp_name,
nprocs,
)
# if there is just one column, write the previous analysis result
if len(subregions_data.keys()) > 1:
write_data = subregions_data
Expand Down Expand Up @@ -575,7 +653,6 @@ def main():
histogram_compactness_orig = histogram_compactness_orig * 100 # to get percentage for readability

seed = int(options['random_seed'])
nprocs = int(options['nprocs'])
count = 0
proc_count = 0
queue_list = []
Expand Down
3 changes: 2 additions & 1 deletion r.futures/r.futures.calib/testsuite/test_r_futures_calib.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ def test_pga_calib_library_subregions(self):
development_end='urban_2002',
patch_threshold=0,
subregions='zipcodes',
patch_sizes='data/out_library_subregion.csv')
patch_sizes='data/out_library_subregion.csv',
nprocs=8)
self.assertTrue(filecmp.cmp('data/out_library_subregion.csv', 'data/ref_library_subregion.csv', shallow=False),
"Patch libraries differ")

Expand Down

0 comments on commit baae2d2

Please sign in to comment.