Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add class task management + csv path from config #138

Open
wants to merge 25 commits into
base: main
Choose a base branch
from

Conversation

rv2931
Copy link
Collaborator

@rv2931 rv2931 commented Mar 24, 2024

L'idée est d'intégrer la proposition de gestion des tâches sur la base d'une classe BaseTask permettent de lancer des tâches et groupes de tâches de manière simple mais tracées avec gestion des erreurs et envoi d'email

Pour une plus grande flexibilité, le nom des fichiers CSV est migré vers un paramètre de config bloom.config, ceux-ci étant du coup surchargeable via le fichier de config .env.*

  • AMP_DATA_CSV_PATH=
  • PORT_DATA_CSV_PATH=
  • VESSEL_DATA_CSV_PATH=
  • VESSEL_POSITIONS_DATA_CSV_PATH=

Utilisation des tasks unitaires

  • /venv/bin/python src/tasks/facts/load_spire_data_from_json.py /path/to/file.json
  • /venv/bin/python src/tasks/facts/load_spire_data_from_api.py -d /path/to/dump

Utilisation des groupes de tasks (Pipeline)

  • /venv/bin/python src/tasks/load_dimensions.py
(venv) root@75af1fe86f04:/project# python src/tasks/load_dimensions.py
[bloom INFO @ 21:08:34] Starting task LoadDimensions
[bloom INFO @ 21:08:34] Starting task LoadAmpDataTask
[bloom INFO @ 21:08:42] Task LoadAmpDataTask finished
[bloom INFO @ 21:08:42] Starting task LoadPortDataTask
[bloom INFO @ 21:08:42] Task LoadPortDataTask finished
[bloom INFO @ 21:08:42] Starting task LoadVesselsDataTask
[bloom INFO @ 21:08:43] Task LoadVesselsDataTask finished
[bloom INFO @ 21:08:43] Task LoadDimensions finished
  • /venv/bin/python src/tasks/load_facts.py
(venv) root@75af1fe86f04:/project# python src/tasks/load_facts.py
[bloom INFO @ 21:10:11] Starting task LoadFacts
[bloom INFO @ 21:10:11] Starting task LoadVesselPositionsDataTask
[bloom INFO @ 21:10:12] Task LoadVesselPositionsDataTask finished
[bloom INFO @ 21:10:12] Task LoadFacts finished
  • /venv/bin/python src/tasks/pipeline_ports.py
(venv) root@75af1fe86f04:/project# python src/tasks/pipeline_ports.py
[bloom INFO @ 20:59:48] Starting task PipelinePorts
[bloom INFO @ 20:59:48] Starting task ComputePortGeometryBuffer
[bloom INFO @ 20:59:48] Point in time=None
[bloom INFO @ 20:59:49] 0 buffer de ports mis à jour
[bloom INFO @ 20:59:49] Task ComputePortGeometryBuffer finished
[bloom INFO @ 20:59:49] Starting task UpdateVesselDataVoyage
[bloom INFO @ 20:59:49] Point in time=2024-03-26 20:52:15.294403+00:00
[bloom INFO @ 20:59:49] 0 bateaux à traiter
[bloom INFO @ 20:59:49] 0 données SPIRE traitées
[bloom INFO @ 20:59:49] 0 données statiques mises à jour
[bloom INFO @ 20:59:49] 0 données de voyage mises à jour
[bloom INFO @ 20:59:49] Task UpdateVesselDataVoyage finished
[bloom INFO @ 20:59:49] Task PipelinePorts finished

Passage de paramètres

Les paramètres des tâches sont passés directement à la création

task=MyTask(param,mykey=value)
task.start()

Le paramètre "param" sera accessible via l'argument args de la fonction run(self,*args,**kwargs)
print(args['param'])
Le paramètre "mykey" sera accessible via l'argument kwargs de la fonction run(self,*args,**kwargs)
print(kwargs['mykey'])

Si la tâche est correctement écrite il est possible de passer les paramètres directement en ligne de commande

Les paramètres sont passés de la tâche parente à la tâche enfant en les précisant explicitement

class LoadDimensions(BaseTask):
    
    def run(self,*args,**kwargs):
        LoadAmpDataTask(*args,**kwargs).start()
        LoadPortDataTask(*args,**kwargs).start()
        LoadVesselsDataTask(*args,**kwargs).start()


if __name__ == "__main__":
    task= LoadDimensions(*list(arg for arg in sys.argv[1:] if arg.find('=') <= 0 ),
                         **dict(arg.split('=') for arg in sys.argv[1:] if arg.find('=') > 0))
    task.start()

@rv2931 rv2931 force-pushed the task_mgmt branch 3 times, most recently from 9a6e8b0 to 8eb4337 Compare March 26, 2024 20:08
@rv2931 rv2931 requested a review from njouanin March 26, 2024 21:06
@njouanin
Copy link
Collaborator

j'ai l'erreur suivante lorsque je lance le traitement init.py:

Traceback (most recent call last):
  File "/Users/nicolasjouanin/Dev/python/bloom/12_bloom/src/tasks/init.py", line 2, in <module>
    from tasks.data import LoadAmpDataTask, LoadPortDataTask, LoadVesselsDataTask, LoadVesselPositionsDataTask
ImportError: cannot import name 'LoadVesselPositionsDataTask' from 'tasks.data' (/Users/nicolasjouanin/Dev/python/bloom/12_bloom/src/tasks/data/__init__.py)

src/tasks/base.py Outdated Show resolved Hide resolved
src/tasks/data/load_amp_data.py Outdated Show resolved Hide resolved
src/tasks/data/load_port_data.py Outdated Show resolved Hide resolved
src/tasks/data/load_vessels_data.py Outdated Show resolved Hide resolved
src/tasks/facts/load_vessel_positions_data.py Outdated Show resolved Hide resolved
src/tasks/load_dimensions.py Outdated Show resolved Hide resolved
src/tasks/dimensions/load_dim_zone_amp_from_csv.py Outdated Show resolved Hide resolved
src/tasks/load_dimensions.py Show resolved Hide resolved
@rv2931
Copy link
Collaborator Author

rv2931 commented Mar 26, 2024

corrigé pour la tâche init, en fait je voulais la supprimer au profit de LoadDimensions et LoadFacts, mais au final ça peut avoir un sens que Init appelle Loaddimensions et LoadFacts donc je l'ai laissée et mise à jour

@rv2931
Copy link
Collaborator Author

rv2931 commented Mar 26, 2024

Oui, je voulais que tu regardes pour me dire quels traitement étaient toujours d'actualité et lesquels potentiellement pourrait grouper dans un LoadDimensions, LoadFacts, PipelineXXX et éventuel Init
Merci pour la review je regarde tout ça demain et je corrige

@njouanin
Copy link
Collaborator

Oui, je voulais que tu regardes pour me dire quels traitement étaient toujours d'actualité et lesquels potentiellement pourrait grouper dans un LoadDimensions, LoadFacts, PipelineXXX et éventuel Init

pour l'instant j'avais laissé les anciens traitements. Mais en gros tout ce qui charge directement du CSV vers des tables en passant par Dataframe est à supprimer.

@rv2931
Copy link
Collaborator Author

rv2931 commented Mar 26, 2024

Hésite pas à propose une autre répartition logique
Là j'ai fait

  • tasks/dimensions
  • tasks/facts
  • tasks/transformations
    et mis les pipelines groupés à la racine de tasks mais c'est qu'une proposition

@njouanin
Copy link
Collaborator

et convert_spire_vessels_to_spire_ais_data.py ne devrait servir qu'une fois (en prod) pour convertir les données de l'ancienne table de positions vers la nouvelle.

@njouanin
Copy link
Collaborator

Hésite pas à propose une autre répartition logique Là j'ai fait

* tasks/dimensions

* tasks/facts

* tasks/transformations
  et mis les pipelines groupés à la racine de tasks mais c'est qu'une proposition

OK, je te laisse terminer, je regarderai demain 🥱... et on reparle demain à 19h en visio si tu es dispo.

@rv2931 rv2931 marked this pull request as ready for review March 28, 2024 20:47
@rv2931 rv2931 force-pushed the task_mgmt branch 4 times, most recently from 56c1164 to fc93bb3 Compare March 28, 2024 21:42
@rv2931
Copy link
Collaborator Author

rv2931 commented Mar 28, 2024

J'ai corrigé des erreurs de syntaxe Union
Mis à jour la dock et le chargement de via docker-compose-load-data pour les tasks init & pipeline_ais_data

Par contre je termine mon chargement avec une erreur
bloom-load-data | (psycopg2.errors.ForeignKeyViolation) insert or update on table "spire_vessel_positions" violates foreign key constraint "fk_spire_vessel_positions_vessels"
bloom-load-data | DETAIL: Key (vessel_id)=(1431) is not present in table "vessels".

Y aurait un problème dans l'ordonnancement des tâches ou bien dans les fichier CSV ? @njouanin a ton avis ?

bloom-load-data  | [bloom INFO @ 22:52:48] Starting task InitTask
bloom-load-data  | [bloom INFO @ 22:52:48] Starting task LoadDimensions
bloom-load-data  | [bloom INFO @ 22:52:48] Starting task LoadDimZoneAmpFromCsv
bloom-load-data  | [bloom INFO @ 22:52:54] 720 zone AMP créés
bloom-load-data  | [bloom INFO @ 22:52:54] Task LoadDimZoneAmpFromCsv finished
bloom-load-data  | [bloom INFO @ 22:52:54] Starting task LoadDimPortFromCsv
bloom-load-data  | [bloom INFO @ 22:52:56] 5873 ports(s) créés
bloom-load-data  | [bloom INFO @ 22:52:56] Task LoadDimPortFromCsv finished
bloom-load-data  | [bloom INFO @ 22:52:56] Starting task ComputePortGeometryBuffer
bloom-load-data  | [bloom INFO @ 22:52:56] Point in time=None
bloom-load-data  | [bloom INFO @ 22:52:59] 5873 buffer de ports mis à jour
bloom-load-data  | [bloom INFO @ 22:52:59] Task ComputePortGeometryBuffer finished
bloom-load-data  | [bloom INFO @ 22:52:59] Starting task LoadDimVesselFromCsv
bloom-load-data  | [bloom INFO @ 22:53:00] 1787 bateau(x) créés
bloom-load-data  | [bloom INFO @ 22:53:00] Task LoadDimVesselFromCsv finished
bloom-load-data  | [bloom INFO @ 22:53:00] Task LoadDimensions finished
bloom-load-data  | [bloom INFO @ 22:53:00] Starting task LoadFacts
bloom-load-data  | [bloom INFO @ 22:53:00] Starting task LoadVesselPositionsDataTask
postgres_bloom   | 2024-03-28 22:53:00.462 UTC [659] ERROR:  insert or update on table "spire_vessel_positions" violates foreign key constraint "fk_spire_vessel_positions_vessels"
postgres_bloom   | 2024-03-28 22:53:00.462 UTC [659] DETAIL:  Key (vessel_id)=(1431) is not present in table "vessels".

@rv2931
Copy link
Collaborator Author

rv2931 commented Mar 28, 2024

je pense qu'il y a un micmac entre la table vessel et la table dim_vessel
On est d'accord:

  • qu'il faut utiliser vessel_subset.csv ?
  • que ce fichier est chargé directement dans dim_vessel ?
  • qu'on charge les positions avec load_spire_data_from_api ?
  • que load_spire_data_from_api doit chercher ses vessel dans dim_vessel et non dans la table vessel ? => si oui ce n'est pas le ca actuellement donc le chargement des positions se fini avec une erreur que le vessel 1431 n'existe pas dans vessel, effectivement il est dans dim_vessel
    J'ai bon ou j'ai faux ?

@njouanin
Copy link
Collaborator

je pense qu'il y a un micmac entre la table vessel et la table dim_vessel On est d'accord:

* qu'il faut utiliser vessel_subset.csv ?

Oui

* que ce fichier est chargé directement dans dim_vessel ?

oui, via le traitement load_dim_vessel_from_csv.py

* qu'on charge les positions avec load_spire_data_from_api ?

oui, via le traitement load_spire_data_from_api.py

* que load_spire_data_from_api doit chercher ses vessel dans dim_vessel et non dans la table vessel ? => si oui ce n'est pas le ca actuellement donc le chargement des positions se fini avec une erreur que le vessel 1431 n'existe pas dans vessel, effectivement il est dans dim_vessel
  J'ai bon ou j'ai faux ?

@njouanin
Copy link
Collaborator

Peut-être pour une prochaine évolution, mais pourrait-on améliorer la visibilité du logging:

  • rendre plus visible l'exécution des différentes étapes
  • marquer le début/fin de chaque étape (+ temps d'exécution)

Pourquoi pas ? pousser les log dans un channel slack....

herve.le-bars added 2 commits April 7, 2024 13:46
herve.le-bars added 23 commits April 7, 2024 13:46
# Conflicts:
#	src/bloom/tasks/load_spire_data_from_api.py
# Conflicts:
#	src/bloom/tasks/update_vessel_data_voyage.py
Usage:
/venv/bin/python src/task/pipeline_ports.py
# Conflicts:
#	src/tasks/dimensions/load_dim_vessel_from_csv.py
ré-organisation des tâches.

# Conflicts:
#	src/tasks/dimensions/load_dim_vessel_from_csv.py
# Conflicts:
#	src/bloom/domain/excursion.py
#	src/bloom/domain/segment.py
# Conflicts:
#	src/tasks/dimensions/load_dim_vessel_from_csv.py
# Conflicts:
#	src/bloom/domain/segment.py
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants