Hdxdsys 843 Add DTM data #168

Merged: 5 commits, Sep 19, 2024
Changes from all commits
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [0.10.0] - 2024-09-19

### Added

- IDP scraper

## [0.9.58] - 2024-09-18

### Fixed
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -34,7 +34,7 @@ classifiers = [
requires-python = ">=3.8"

dependencies = [
"hapi-schema>=0.8.15",
"hapi-schema>=0.8.17",
"hdx-python-api>= 6.3.4",
"hdx-python-country>= 3.7.8",
"hdx-python-database[postgresql]>= 1.3.1",
2 changes: 1 addition & 1 deletion requirements.txt
@@ -54,7 +54,7 @@ greenlet==3.1.0
# via sqlalchemy
gspread==6.1.2
# via hdx-python-scraper
- hapi-schema==0.8.16
+ hapi-schema==0.8.17
# via hapi-pipelines (pyproject.toml)
hdx-python-api==6.3.4
# via
1 change: 1 addition & 0 deletions src/hapi/pipelines/app/__main__.py
@@ -221,6 +221,7 @@ def main(
"core.yaml",
"food_security.yaml",
"funding.yaml",
"idps.yaml",
"national_risk.yaml",
"operational_presence.yaml",
"population.yaml",
22 changes: 22 additions & 0 deletions src/hapi/pipelines/app/pipelines.py
@@ -17,6 +17,7 @@
from hapi.pipelines.database.food_security import FoodSecurity
from hapi.pipelines.database.funding import Funding
from hapi.pipelines.database.humanitarian_needs import HumanitarianNeeds
from hapi.pipelines.database.idps import IDPs
from hapi.pipelines.database.locations import Locations
from hapi.pipelines.database.metadata import Metadata
from hapi.pipelines.database.national_risk import NationalRisk
@@ -174,6 +175,13 @@ def _create_configurable_scrapers(
_create_configurable_scrapers("national_risk", "national")
_create_configurable_scrapers("funding", "national")
_create_configurable_scrapers("refugees", "national")
_create_configurable_scrapers("idps", "national")
_create_configurable_scrapers(
"idps", "adminone", adminlevel=self.adminone
)
_create_configurable_scrapers(
"idps", "admintwo", adminlevel=self.admintwo
)
_create_configurable_scrapers("poverty_rate", "national")
_create_configurable_scrapers("conflict_event", "national")
_create_configurable_scrapers(
@@ -270,6 +278,19 @@ def output_refugees(self):
)
refugees.populate()

def output_idps(self):
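# As with the other output_* methods, runs only when the idps theme is
# selected (or when no theme filter is given)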
if not self.themes_to_run or "idps" in self.themes_to_run:
results = self.runner.get_hapi_results(
self.configurable_scrapers["idps"]
)
idps = IDPs(
session=self.session,
metadata=self.metadata,
admins=self.admins,
results=results,
)
idps.populate()

def output_funding(self):
if not self.themes_to_run or "funding" in self.themes_to_run:
results = self.runner.get_hapi_results(
@@ -352,6 +373,7 @@ def output(self):
self.output_humanitarian_needs()
self.output_national_risk()
self.output_refugees()
self.output_idps()
self.output_funding()
self.output_poverty_rate()
self.output_conflict_event()
66 changes: 66 additions & 0 deletions src/hapi/pipelines/configs/idps.yaml
@@ -0,0 +1,66 @@
# IDPs config file
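# The same global DTM resource is used at national, admin 1, and admin 2
# levels; the prefilter in each section below keeps only the rows that
# belong to that level.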

idps_default:
scrapers_with_defaults:
- "dtm"
format: "csv"
use_hxl: True
admin_exact: True
input:
- "#affected+idps"
- "#date+reported"
- "#round+code"
- "#assessment+type"
- "#operation+name"
list:
- "#affected+idps"
- "#date+reported"
- "#round+code"
- "#assessment+type"
- "#operation+name"
output:
- "number_idps"
- "reporting_date"
- "round_number"
- "asessment_type"
- "operation"
output_hxl:
- "#affected+idps"
- "#date+reported"
- "#round+code"
- "#assessment+type"
- "#operation+name"

idps_national:
dtm:
dataset: "global-iom-dtm-from-api"
resource: "Global IOM DTM data for admin levels 0-2"
filter_cols:
- "#adm1+code"
prefilter: "#adm1+code is None"
admin:
- "#country+code"

idps_adminone:
dtm:
dataset: "global-iom-dtm-from-api"
resource: "Global IOM DTM data for admin levels 0-2"
filter_cols:
- "#adm1+code"
- "#adm2+code"
prefilter: "#adm1+code is not None and #adm2+code is None"
admin:
- "#country+code"
- "#adm1+code"

idps_admintwo:
dtm:
dataset: "global-iom-dtm-from-api"
resource: "Global IOM DTM data for admin levels 0-2"
filter_cols:
- "#adm1+code"
- "#adm2+code"
prefilter: "#adm1+code is not None and #adm2+code is not None"
admin:
- "#country+code"
- "#adm2+code"
99 changes: 99 additions & 0 deletions src/hapi/pipelines/database/idps.py
@@ -0,0 +1,99 @@
"""Functions specific to the refugees theme."""

from logging import getLogger
from typing import Dict

from hapi_schema.db_idps import DBIDPs
from sqlalchemy.orm import Session

from ..utilities.logging_helpers import add_message
from . import admins
from .base_uploader import BaseUploader
from .metadata import Metadata

logger = getLogger(__name__)


class IDPs(BaseUploader):
def __init__(
self,
session: Session,
metadata: Metadata,
admins: admins.Admins,
results: Dict,
):
super().__init__(session)
self._metadata = metadata
self._admins = admins
self._results = results

def populate(self) -> None:
# TODO: This might be better suited to just work with the DTM resource
# directly as done with HNO, rather than using a configurable scraper
logger.info("Populating IDPs table")
errors = set()
# self._results is a dictionary where the keys are the HDX dataset ID and the
# values are a dictionary with keys containing HDX metadata plus a "results" key
# containing the results, stored in a dictionary with admin levels as keys.
# There is only one dataset in the results dictionary for now, so take the
# first entry (popitem returns a (key, value) tuple; take the value)
dataset = self._results.popitem()[1]
dataset_name = dataset["hdx_stub"]
for admin_level, admin_results in dataset["results"].items():
# admin_results contains the keys "headers", "values", and "hapi_resource_metadata".
# admin_results["values"] is a list of dictionaries of the format:
# [{AFG: [1, 2], BFA: [3, 4]}, {AFG: [A, B], BFA: [C, D]}, ...]
# so a value is looked up as values[i_hdx_key][pcode][i], where i indexes
# the rows for that particular p-code
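# e.g. values[hxl_tags.index("#affected+idps")]["AFG"][0] would be the
# first reported IDP count for AFG at this admin level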
resource_id = admin_results["hapi_resource_metadata"]["hdx_id"]
hxl_tags = admin_results["headers"][1]
values = admin_results["values"]
admin_codes = values[0].keys()
for admin_code in admin_codes:
mcarans (Contributor) commented on Sep 18, 2024:

Given we're probably going to pause HAPI development this quarter, this may be a moot point, but for data that has already been set up in the right form by pipeline, I wonder if the simplicity of the YAML configurable scraper for reading the data is cancelled out by the complexity of the database upload code with the nested for loops.

That was my thinking for humanitarian needs where it looked much simpler (and more efficient) to read the file and upload to db in one step and not use a configurable scraper at all. This probably indicates we're missing the right kind of configurable reader as there would probably be commonality between IDPs and humanitarian needs reading. Just something to think about.

Member Author replied:

I was wondering why you didn't use the configurable scraper for HNO, and now that makes so much sense. Indeed there are a lot of inefficiencies in my DTM implementation, but let's just close our eyes and get it out the door. If we end up moving forward with HAPI / standardization then it will get refactored to the DTM scraper I'd imagine.
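
For illustration, a minimal sketch of the direct read-and-upload approach described above. The DBIDPs columns match those used in this PR, but fetch_dtm_rows (yielding the DTM CSV rows as dicts keyed by HXL tag) and resolve_admin2_ref (standing in for the Admins lookup) are hypothetical helpers, not the actual HNO implementation:

```python
# Sketch only: read the DTM rows and upload them to the db in one step,
# without a configurable scraper. fetch_dtm_rows and resolve_admin2_ref
# are hypothetical helpers.
from hapi_schema.db_idps import DBIDPs

def populate_idps_direct(session, resource_id, fetch_dtm_rows, resolve_admin2_ref):
    seen = set()
    for row in fetch_dtm_rows():
        admin2_ref = resolve_admin2_ref(row)
        key = (
            admin2_ref,
            row["#assessment+type"],
            row["#date+reported"],
            row["#round+code"],
            row["#operation+name"],
        )
        if key in seen:
            # Skip duplicate rows, mirroring the check in this PR
            continue
        seen.add(key)
        session.add(
            DBIDPs(
                resource_hdx_id=resource_id,
                admin2_ref=admin2_ref,
                assessment_type=row["#assessment+type"],
                reporting_round=row["#round+code"],
                operation=row["#operation+name"],
                population=row["#affected+idps"],
                reference_period_start=row["#date+reported"],
                reference_period_end=row["#date+reported"],
            )
        )
    session.commit()
```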

admin2_code = admins.get_admin2_code_based_on_level(
admin_code=admin_code, admin_level=admin_level
)
duplicate_rows = set()
for row in zip(
*[
values[hxl_tags.index(tag)][admin_code]
for tag in hxl_tags
]
):
# These are defined outside of the DBIDPs row construction so they
# can be used in the duplicate check below
admin2_ref = self._admins.admin2_data[admin2_code]
assessment_type = row[hxl_tags.index("#assessment+type")]
date_reported = row[hxl_tags.index("#date+reported")]
reporting_round = row[hxl_tags.index("#round+code")]
operation = row[hxl_tags.index("#operation+name")]
duplicate_row_check = (
admin2_ref,
assessment_type,
date_reported,
reporting_round,
operation,
)
if duplicate_row_check in duplicate_rows:
Contributor commented:

Out of curiosity, why are there duplicate rows?

Member Author replied:

I have the same question, I plan on emailing DTM to ask

text = (
f"Duplicate row for admin code {admin2_code}, assessment type {assessment_type}, "
f"date reported {date_reported}, reporting round {reporting_round}, "
f"operation {operation}"
)
add_message(errors, dataset_name, text)
continue
idps_row = DBIDPs(
resource_hdx_id=resource_id,
admin2_ref=admin2_ref,
assessment_type=assessment_type,
reporting_round=reporting_round,
operation=operation,
population=row[hxl_tags.index("#affected+idps")],
reference_period_start=date_reported,
reference_period_end=date_reported,
)
self._session.add(idps_row)
duplicate_rows.add(duplicate_row_check)
self._session.commit()
for error in sorted(errors):
logger.error(error)