Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDK: SingleUseRefreshTokenOauth2Authenticator update config with access tokens and expiration date #20923

Merged
merged 11 commits into from
Jan 3, 2023
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.16.3
Do not eagerly refresh access token in `SingleUseRefreshTokenOauth2Authenticator` [#20923](https://github.com/airbytehq/airbyte/pull/20923)

## 0.16.2
Fix the naming of OAuthAuthenticator

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,36 +104,39 @@ def __init__(
connector_config: Mapping[str, Any],
token_refresh_endpoint: str,
scopes: List[str] = None,
token_expiry_date: pendulum.DateTime = None,
token_expiry_date_format: str = None,
access_token_name: str = "access_token",
expires_in_name: str = "expires_in",
refresh_token_name: str = "refresh_token",
refresh_request_body: Mapping[str, Any] = None,
grant_type: str = "refresh_token",
client_id_config_path: Sequence[str] = ("credentials", "client_id"),
client_secret_config_path: Sequence[str] = ("credentials", "client_secret"),
access_token_config_path: Sequence[str] = ("credentials", "access_token"),
refresh_token_config_path: Sequence[str] = ("credentials", "refresh_token"),
access_token_expiration_datetime_config_path: Sequence[str] = ("credentials", "access_token_expiration_datetime"),
):
"""

Args:
connector_config (Mapping[str, Any]): The full connector configuration
token_refresh_endpoint (str): Full URL to the token refresh endpoint
scopes (List[str], optional): List of OAuth scopes to pass in the refresh token request body. Defaults to None.
token_expiry_date (pendulum.DateTime, optional): Datetime at which the current token will expire. Defaults to None.
access_token_name (str, optional): Name of the access token field, used to parse the refresh token response. Defaults to "access_token".
expires_in_name (str, optional): Name of the name of the field that characterizes when the current access token will expire, used to parse the refresh token response. Defaults to "expires_in".
refresh_token_name (str, optional): Name of the name of the refresh token field, used to parse the refresh token response. Defaults to "refresh_token".
refresh_request_body (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request body. Defaults to None.
grant_type (str, optional): OAuth grant type. Defaults to "refresh_token".
client_id_config_path (Sequence[str]): Dpath to the client_id field in the connector configuration. Defaults to ("credentials", "client_id").
client_secret_config_path (Sequence[str]): Dpath to the client_secret field in the connector configuration. Defaults to ("credentials", "client_secret").
access_token_config_path (Sequence[str]): Dpath to the access_token field in the connector configuration. Defaults to ("credentials", "access_token").
refresh_token_config_path (Sequence[str]): Dpath to the refresh_token field in the connector configuration. Defaults to ("credentials", "refresh_token").
access_token_expiration_datetime_config_path (Sequence[str]): Dpath to the access_token_expiration_datetime field in the connector configuration. Defaults to ("credentials", "access_token_expiration_datetime").
"""
self._client_id_config_path = client_id_config_path
self._client_secret_config_path = client_secret_config_path
self._access_token_config_path = access_token_config_path
self._refresh_token_config_path = refresh_token_config_path
self._access_token_expiration_datetime_config_path = access_token_expiration_datetime_config_path
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also set the initial access_token from the config as part of the initialization - otherwise I believe we wouldn't have an access token and require it to refresh, right?

GitLab was doing this manually:

now = pendulum.now()
self.access_token = access_token
self.set_token_expiry_date(now.add(seconds=access_token_info["expires_in"]))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made the change and added a test to ensure access_token attribute is set on init.

self._refresh_token_name = refresh_token_name
self._connector_config = observe_connector_config(connector_config)
self._validate_connector_config()
Expand All @@ -142,13 +145,12 @@ def __init__(
self.get_client_id(),
self.get_client_secret(),
self.get_refresh_token(),
scopes,
token_expiry_date,
token_expiry_date_format,
access_token_name,
expires_in_name,
refresh_request_body,
grant_type,
scopes=scopes,
token_expiry_date=self.get_access_token_expiration_datetime(),
access_token_name=access_token_name,
expires_in_name=expires_in_name,
refresh_request_body=refresh_request_body,
grant_type=grant_type
)

def _validate_connector_config(self):
Expand All @@ -161,6 +163,7 @@ def _validate_connector_config(self):
(self._client_id_config_path, self.get_client_id, "client_id_config_path"),
(self._client_secret_config_path, self.get_client_secret, "client_secret_config_path"),
(self._refresh_token_config_path, self.get_refresh_token, "refresh_token_config_path"),
(self._access_token_expiration_datetime_config_path, self.get_access_token_expiration_datetime, "access_token_expiration_datetime_config_path"),
]:
try:
assert getter()
Expand All @@ -181,13 +184,25 @@ def get_client_secret(self) -> str:
def get_refresh_token(self) -> str:
return dpath.util.get(self._connector_config, self._refresh_token_config_path)

def set_refresh_token(self, new_refresh_token: str):
"""Set the new refresh token value. The mutation of the connector_config object will emit an Airbyte control message.
def get_access_token_expiration_datetime(self) -> pendulum.DateTime:
return pendulum.parse(dpath.util.get(self._connector_config, self._access_token_expiration_datetime_config_path))


def _update_config_with_access_and_refresh_tokens(self, new_access_token: str, new_refresh_token: str, new_access_token_expiration_datetime: pendulum.DateTime):
"""Update the connector configuration with new access and refresh token values.
The mutation of the connector_config object will emit Airbyte control messages.

Args:
new_access_token (str): The new access token value.
new_refresh_token (str): The new refresh token value.
new_access_token_expiration_datetime (pendulum.DateTime): The new access token expiration date.
"""
# TODO alafanechere this will sequentially emit three control messages.
# We should rework the observer/config mutation logic if we want to have atomic config updates in a single control message.
dpath.util.set(self._connector_config, self._access_token_config_path, new_access_token)
dpath.util.set(self._connector_config, self._refresh_token_config_path, new_refresh_token)
dpath.util.set(self._connector_config, self._access_token_expiration_datetime_config_path, new_access_token_expiration_datetime)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: this means as currently implemented during the sync the platform will perform 3 separate config updates (api calls) for each of these pieces

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I discarded the use of ObservedDict and directly call a function from this method to emit the control message.



def get_access_token(self) -> str:
"""Retrieve new access and refresh token if the access token has expired.
Expand All @@ -196,11 +211,10 @@ def get_access_token(self) -> str:
str: The current access_token, updated if it was previously expired.
"""
if self.token_has_expired():
t0 = pendulum.now()
new_access_token, access_token_expires_in, new_refresh_token = self.refresh_access_token()
self.access_token = new_access_token
self.set_token_expiry_date(t0, access_token_expires_in)
self.set_refresh_token(new_refresh_token)
self.set_token_expiry_date(pendulum.now("UTC"), access_token_expires_in)
self._update_config_with_access_and_refresh_tokens(new_access_token, new_refresh_token, self.get_token_expiry_date())
return self.access_token

def refresh_access_token(self) -> Tuple[str, int, str]:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.16.2",
version="0.16.3",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import json
import logging

import freezegun
import pendulum
import pytest
import requests
Expand Down Expand Up @@ -188,6 +189,7 @@ def connector_config(self):
"refresh_token": "my_refresh_token",
"client_id": "my_client_id",
"client_secret": "my_client_secret",
"access_token_expiration_datetime": "2022-12-31T00:00:00+00:00"
}
}

Expand All @@ -209,6 +211,7 @@ def test_init_with_invalid_config(self, invalid_connector_config):
token_refresh_endpoint="foobar",
)

@freezegun.freeze_time("2022-12-31")
def test_get_access_token(self, capsys, mocker, connector_config):
authenticator = SingleUseRefreshTokenOauth2Authenticator(
connector_config,
Expand All @@ -218,9 +221,11 @@ def test_get_access_token(self, capsys, mocker, connector_config):
authenticator.token_has_expired = mocker.Mock(return_value=True)
access_token = authenticator.get_access_token()
captured = capsys.readouterr()
airbyte_message = json.loads(captured.out)
airbyte_message = json.loads(captured.out.split("\n")[-2])
expected_new_config = connector_config.copy()
expected_new_config["credentials"]["access_token"] = "new_access_token"
expected_new_config["credentials"]["refresh_token"] = "new_refresh_token"
expected_new_config["credentials"]["access_token_expiration_datetime"] = "2022-12-31T00:00:42+00:00"
assert airbyte_message["control"]["connectorConfig"]["config"] == expected_new_config
assert authenticator.access_token == access_token == "new_access_token"
assert authenticator.get_refresh_token() == "new_refresh_token"
Expand Down