Skip to content

Commit

Permalink
Make DBConnection context manager more flexible (#190)
Browse files Browse the repository at this point in the history
* Make DBConnection context manager more flexible

enabling it to return a connection, hook or SQL Alchemy Engine object

* Fix CI test workflow
* Update info on Docker installation
* Remove deprecated version attribute from docker compose

Refactor `test_db_to_db_operator.py`:
* Add missing docstrings
* Apply black formatting
* Reorder imports
* Refactor file opening, solve pylint warning
* Add missing import
* Disable pylint warning on long lines of lorem ipsum
* Remove unnecessary parenthesis
* Split too long string
  • Loading branch information
augusto-herrmann committed Apr 8, 2024
1 parent a215529 commit 0b1ced7
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 98 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
.PHONY: setup
setup:
docker-compose -f tests/docker-compose.yml up -d --force-recreate --remove-orphans
docker compose -f tests/docker-compose.yml up -d --force-recreate --remove-orphans

.PHONY: down
down:
docker-compose -f tests/docker-compose.yml down
docker compose -f tests/docker-compose.yml down

.PHONY: tests
tests:
Expand Down
8 changes: 1 addition & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,7 @@ The test suite uses Docker containers to simulate a complete use
environment, including Airflow and the databases. For that reason, to
execute the tests, you first need to install Docker and docker-compose.

For people using Ubuntu 20.04, you can just type on the terminal:

```bash
snap install docker
```

For other versions and operating systems, see the
For instructions on how to do this, see the
[official Docker documentation](https://docs.docker.com/get-docker/).


Expand Down
37 changes: 21 additions & 16 deletions fastetl/custom_functions/utils/db_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,45 @@


class DbConnection:
"""Gera as conexões origem e destino dependendo do tipo de provider.
Providers disponíveis: 'mssql', 'postgres' e 'mysql'.
"""
Gera as conexões origem e destino dependendo do tipo de provider.
Providers disponíveis: 'mssql', 'postgres' e 'mysql'
"""

def __init__(self, conn_id: str):
def __init__(self, conn_id: str, use: str = "connection"):
"""Cria uma instância do context manager DBConnection.
Args:
conn_id (str): id da conexão no Airflow.
use (str, optional): "connection", "hook" ou "engine".
Determina o que será retornado pelo context manager
(cláusula "with"). Defaults to "connection".
"""
self.conn_type = get_conn_type(conn_id)
self.use = use
self.conn = None

if self.conn_type == "mssql":
self.mssql_conn_string = get_mssql_odbc_conn_str(
conn_id=conn_id, raw_str=True
)
else:
self.hook, _ = get_hook_and_engine_by_provider(conn_id)
self.hook, self.engine = get_hook_and_engine_by_provider(conn_id)

def __enter__(self):
if self.conn_type == "mssql":
try:
self.conn = pyodbc.connect(self.mssql_conn_string)
except Exception as exc:
raise Exception(
f"{self.conn_type} connection failed."
) from exc
raise IOError(f"{self.conn_type} connection failed.") from exc
else:
try:
self.conn = self.hook.get_conn()
except Exception as exc:
raise Exception(
f"{self.conn_type} connection failed."
) from exc
raise IOError(f"{self.conn_type} connection failed.") from exc

if self.use == "engine":
return self.engine
if self.use == "hook":
return self.hook
return self.conn

def __exit__(self, exc_type, exc_value, traceback):
Expand Down Expand Up @@ -154,9 +161,7 @@ def get_mssql_odbc_conn_str(conn_id: str, raw_str: bool = False) -> str:
if raw_str:
return mssql_conn_str

connection_url = URL.create(
"mssql+pyodbc", query={"odbc_connect": mssql_conn_str}
)
connection_url = URL.create("mssql+pyodbc", query={"odbc_connect": mssql_conn_str})

return connection_url

Expand All @@ -168,7 +173,7 @@ def get_mssql_odbc_engine(conn_id: str, **kwargs):

return create_engine(get_mssql_odbc_conn_str(conn_id), **kwargs)


def get_hook_and_engine_by_provider(conn_id: str) -> Tuple[DbApiHook, Engine]:
"""
Creates connection hook and engine by connection type/provider.
Expand Down
1 change: 0 additions & 1 deletion tests/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: '3'
services:
airflow:
build: ../.
Expand Down
Loading

0 comments on commit 0b1ced7

Please sign in to comment.