Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use sqlalchemy_url property in get_uri for postgresql provider #38831

Merged
merged 23 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
f2c3fb7
update get_uri
rawwar Apr 8, 2024
fb4568e
update get_uri
rawwar Apr 8, 2024
718c42d
update docstring
rawwar Apr 8, 2024
a7c3956
Merge branch 'main' into kalyan/connections/38187
rawwar Apr 8, 2024
7bb8687
add and use sa_uri property
rawwar Apr 8, 2024
2a7765e
Merge branch 'main' into kalyan/connections/38187
rawwar Apr 8, 2024
bf48b0d
update database in sa_uri
rawwar Apr 8, 2024
c80a439
Merge branch 'main' into kalyan/connections/38187
rawwar Apr 9, 2024
45a51a6
update tests
rawwar Apr 9, 2024
2a41dff
remove client_encoding from test_get_uri
rawwar Apr 9, 2024
9ac3a35
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Apr 14, 2024
bf10733
use sqlalchemy_url property
rawwar Apr 14, 2024
6eea77d
add default port
rawwar Apr 14, 2024
b332c78
update tests
rawwar Apr 14, 2024
ed4c2a2
update usage of ports
rawwar Apr 14, 2024
7587ae2
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Apr 19, 2024
d101c28
Merge branch 'main' into kalyan/connections/38187
rawwar Apr 27, 2024
d59b56b
Merge branch 'main' into kalyan/connections/38187
rawwar Apr 30, 2024
702d963
Merge branch 'main' into kalyan/connections/38187
rawwar May 3, 2024
28801e8
revert client_encoding updates
rawwar May 6, 2024
4d48b2f
Merge branch 'main' into kalyan/connections/38187
rawwar May 6, 2024
3a99f5c
Merge branch 'main' into kalyan/connections/38187
rawwar May 6, 2024
fc827f7
Merge branch 'main' into kalyan/connections/38187
rawwar May 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions airflow/providers/postgres/hooks/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import psycopg2.extras
from deprecated import deprecated
from psycopg2.extras import DictCursor, NamedTupleCursor, RealDictCursor
from sqlalchemy.engine import URL

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.common.sql.hooks.sql import DbApiHook
Expand Down Expand Up @@ -113,6 +114,18 @@ def schema(self):
def schema(self, value):
self.database = value

@property
def sqlalchemy_url(self) -> URL:
conn = self.get_connection(getattr(self, self.conn_name_attr))
return URL.create(
drivername="postgresql",
rawwar marked this conversation as resolved.
Show resolved Hide resolved
username=conn.login,
password=conn.password,
host=conn.host,
port=conn.port,
database=self.database or conn.schema,
rawwar marked this conversation as resolved.
Show resolved Hide resolved
)

def _get_cursor(self, raw_cursor: str) -> CursorType:
_cursor = raw_cursor.lower()
cursor_types = {
Expand Down Expand Up @@ -159,6 +172,10 @@ def get_conn(self) -> connection:
]:
conn_args[arg_name] = arg_val

client_encoding = conn.extra_dejson.get("client_encoding")
if isinstance(client_encoding, str):
conn_args["client_encoding"] = client_encoding

rawwar marked this conversation as resolved.
Show resolved Hide resolved
self.conn = psycopg2.connect(**conn_args)
return self.conn

Expand Down Expand Up @@ -186,12 +203,9 @@ def copy_expert(self, sql: str, filename: str) -> None:
def get_uri(self) -> str:
"""Extract the URI from the connection.

:return: the extracted uri.
:return: the extracted URI in Sqlalchemy URI format.
"""
conn = self.get_connection(getattr(self, self.conn_name_attr))
conn.schema = self.database or conn.schema
uri = conn.get_uri().replace("postgres://", "postgresql://")
return uri
return self.sqlalchemy_url.render_as_string(hide_password=False)

def bulk_load(self, table: str, tmp_file: str) -> None:
"""Load a tab-delimited file into a database table."""
Expand Down
14 changes: 10 additions & 4 deletions tests/providers/postgres/hooks/test_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ def test_get_conn(self, mock_connect):

@mock.patch("airflow.providers.postgres.hooks.postgres.psycopg2.connect")
def test_get_uri(self, mock_connect):
self.connection.extra = json.dumps({"client_encoding": "utf-8"})
self.connection.conn_type = "postgres"
self.connection.port = 5432
self.db_hook.get_conn()
assert mock_connect.call_count == 1
assert self.db_hook.get_uri() == "postgresql://login:password@host/database?client_encoding=utf-8"
assert self.db_hook.get_uri() == "postgresql://login:password@host:5432/database"

@mock.patch("airflow.providers.postgres.hooks.postgres.psycopg2.connect")
def test_get_conn_cursor(self, mock_connect):
Expand Down Expand Up @@ -155,10 +155,16 @@ def test_get_conn_rds_iam_postgres(self, mock_aws_hook_class, mock_connect, aws_

@mock.patch("airflow.providers.postgres.hooks.postgres.psycopg2.connect")
def test_get_conn_extra(self, mock_connect):
self.connection.extra = '{"connect_timeout": 3}'
self.connection.extra = json.dumps({"client_encoding": "utf-8", "connect_timeout": 3})
self.db_hook.get_conn()
mock_connect.assert_called_once_with(
user="login", password="password", host="host", dbname="database", port=None, connect_timeout=3
user="login",
password="password",
host="host",
dbname="database",
port=None,
connect_timeout=3,
client_encoding="utf-8",
)

@mock.patch("airflow.providers.postgres.hooks.postgres.psycopg2.connect")
Expand Down