Skip to content

Commit

Permalink
[CT-352] catch and retry malformed json (#4982)
Browse files Browse the repository at this point in the history
* catch None and malformed json reponses

* add json.dumps for format

* format

* Cache registry request results. Avoid one request per version

* updated to be direct in type checking

* add changelog entry

* add back logic for none check

* PR feedback: memoize > global

* add checks for expected types and keys

* consolidated cache and retry logic

* minor cleanup for clarity/consistency

* add pr review suggestions

* update unit test

Co-authored-by: Jeremy Cohen <[email protected]>
  • Loading branch information
emmyoop and jtcohen6 authored Apr 5, 2022
1 parent 0b92f04 commit 7f953a6
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 36 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20220331-143923.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Catch more cases to retry package retrieval for deps pointing to the hub. Also start to cache the package requests.
time: 2022-03-31T14:39:23.952705-05:00
custom:
Author: emmyoop
Issue: "4849"
PR: "4982"
146 changes: 111 additions & 35 deletions core/dbt/clients/registry.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
import functools
from typing import Any, Dict, List
import requests
from dbt.events.functions import fire_event
from dbt.events.types import RegistryProgressMakingGETRequest, RegistryProgressGETResponse
from dbt.events.types import (
RegistryProgressMakingGETRequest,
RegistryProgressGETResponse,
RegistryIndexProgressMakingGETRequest,
RegistryIndexProgressGETResponse,
RegistryResponseUnexpectedType,
RegistryResponseMissingTopKeys,
RegistryResponseMissingNestedKeys,
RegistryResponseExtraNestedKeys,
)
from dbt.utils import memoized, _connection_exception_retry as connection_exception_retry
from dbt import deprecations
import os
Expand All @@ -12,55 +22,77 @@
DEFAULT_REGISTRY_BASE_URL = "https://hub.getdbt.com/"


def _get_url(url, registry_base_url=None):
def _get_url(name, registry_base_url=None):
if registry_base_url is None:
registry_base_url = DEFAULT_REGISTRY_BASE_URL
url = "api/v1/{}.json".format(name)

return "{}{}".format(registry_base_url, url)


def _get_with_retries(path, registry_base_url=None):
get_fn = functools.partial(_get, path, registry_base_url)
def _get_with_retries(package_name, registry_base_url=None):
get_fn = functools.partial(_get_cached, package_name, registry_base_url)
return connection_exception_retry(get_fn, 5)


def _get(path, registry_base_url=None):
url = _get_url(path, registry_base_url)
def _get(package_name, registry_base_url=None):
url = _get_url(package_name, registry_base_url)
fire_event(RegistryProgressMakingGETRequest(url=url))
# all exceptions from requests get caught in the retry logic so no need to wrap this here
resp = requests.get(url, timeout=30)
fire_event(RegistryProgressGETResponse(url=url, resp_code=resp.status_code))
resp.raise_for_status()

# It is unexpected for the content of the response to be None so if it is, raising this error
# will cause this function to retry (if called within _get_with_retries) and hopefully get
# a response. This seems to happen when there's an issue with the Hub.
# The response should always be a dictionary. Anything else is unexpected, raise error.
# Raising this error will cause this function to retry (if called within _get_with_retries)
# and hopefully get a valid response. This seems to happen when there's an issue with the Hub.
# Since we control what we expect the HUB to return, this is safe.
# See https://github.com/dbt-labs/dbt-core/issues/4577
if resp.json() is None:
raise requests.exceptions.ContentDecodingError(
"Request error: The response is None", response=resp
)
return resp.json()


def index(registry_base_url=None):
return _get_with_retries("api/v1/index.json", registry_base_url)


index_cached = memoized(index)


def packages(registry_base_url=None):
return _get_with_retries("api/v1/packages.json", registry_base_url)

# and https://github.com/dbt-labs/dbt-core/issues/4849
response = resp.json()

def package(name, registry_base_url=None):
response = _get_with_retries("api/v1/{}.json".format(name), registry_base_url)
if not isinstance(response, dict): # This will also catch Nonetype
error_msg = (
f"Request error: Expected a response type of <dict> but got {type(response)} instead"
)
fire_event(RegistryResponseUnexpectedType(response=response))
raise requests.exceptions.ContentDecodingError(error_msg, response=resp)

# check for expected top level keys
expected_keys = {"name", "versions"}
if not expected_keys.issubset(response):
error_msg = (
f"Request error: Expected the response to contain keys {expected_keys} "
f"but is missing {expected_keys.difference(set(response))}"
)
fire_event(RegistryResponseMissingTopKeys(response=response))
raise requests.exceptions.ContentDecodingError(error_msg, response=resp)

# check for the keys we need nested under each version
expected_version_keys = {"name", "packages", "downloads"}
all_keys = set().union(*(response["versions"][d] for d in response["versions"]))
if not expected_version_keys.issubset(all_keys):
error_msg = (
"Request error: Expected the response for the version to contain keys "
f"{expected_version_keys} but is missing {expected_version_keys.difference(all_keys)}"
)
fire_event(RegistryResponseMissingNestedKeys(response=response))
raise requests.exceptions.ContentDecodingError(error_msg, response=resp)

# all version responses should contain identical keys.
has_extra_keys = set().difference(*(response["versions"][d] for d in response["versions"]))
if has_extra_keys:
error_msg = (
"Request error: Keys for all versions do not match. Found extra key(s) "
f"of {has_extra_keys}."
)
fire_event(RegistryResponseExtraNestedKeys(response=response))
raise requests.exceptions.ContentDecodingError(error_msg, response=resp)

# Either redirectnamespace or redirectname in the JSON response indicate a redirect
# redirectnamespace redirects based on package ownership
# redirectname redirects based on package name
# Both can be present at the same time, or neither. Fails gracefully to old name

if ("redirectnamespace" in response) or ("redirectname" in response):

if ("redirectnamespace" in response) and response["redirectnamespace"] is not None:
Expand All @@ -74,15 +106,59 @@ def package(name, registry_base_url=None):
use_name = response["name"]

new_nwo = use_namespace + "/" + use_name
deprecations.warn("package-redirect", old_name=name, new_name=new_nwo)
deprecations.warn("package-redirect", old_name=package_name, new_name=new_nwo)

return response


_get_cached = memoized(_get)


def package(package_name, registry_base_url=None) -> Dict[str, Any]:
# returns a dictionary of metadata for all versions of a package
response = _get_with_retries(package_name, registry_base_url)
return response["versions"]


def package_version(package_name, version, registry_base_url=None) -> Dict[str, Any]:
# returns the metadata of a specific version of a package
response = package(package_name, registry_base_url)
return response[version]


def get_available_versions(package_name) -> List["str"]:
# returns a list of all available versions of a package
response = package(package_name)
return list(response)


def _get_index(registry_base_url=None):

url = _get_url("index", registry_base_url)
fire_event(RegistryIndexProgressMakingGETRequest(url=url))
# all exceptions from requests get caught in the retry logic so no need to wrap this here
resp = requests.get(url, timeout=30)
fire_event(RegistryIndexProgressGETResponse(url=url, resp_code=resp.status_code))
resp.raise_for_status()

# The response should be a list. Anything else is unexpected, raise an error.
# Raising this error will cause this function to retry and hopefully get a valid response.

response = resp.json()

if not isinstance(response, list): # This will also catch Nonetype
error_msg = (
f"Request error: The response type of {type(response)} is not valid: {resp.text}"
)
raise requests.exceptions.ContentDecodingError(error_msg, response=resp)

return response


def package_version(name, version, registry_base_url=None):
return _get_with_retries("api/v1/{}/{}.json".format(name, version), registry_base_url)
def index(registry_base_url=None) -> List[str]:
# this returns a list of all packages on the Hub
get_index_fn = functools.partial(_get_index, registry_base_url)
return connection_exception_retry(get_index_fn, 5)


def get_available_versions(name):
response = package(name)
return list(response["versions"])
index_cached = memoized(index)
66 changes: 66 additions & 0 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,25 @@ def message(self) -> str:
return f" Checked out at {self.end_sha}."


@dataclass
class RegistryIndexProgressMakingGETRequest(DebugLevel):
url: str
code: str = "M022"

def message(self) -> str:
return f"Making package index registry request: GET {self.url}"


@dataclass
class RegistryIndexProgressGETResponse(DebugLevel):
url: str
resp_code: int
code: str = "M023"

def message(self) -> str:
return f"Response from registry index: GET {self.url} {self.resp_code}"


@dataclass
class RegistryProgressMakingGETRequest(DebugLevel):
url: str
Expand All @@ -310,6 +329,45 @@ def message(self) -> str:
return f"Response from registry: GET {self.url} {self.resp_code}"


@dataclass
class RegistryResponseUnexpectedType(DebugLevel):
response: str
code: str = "M024"

def message(self) -> str:
return f"Response was None: {self.response}"


@dataclass
class RegistryResponseMissingTopKeys(DebugLevel):
response: str
code: str = "M025"

def message(self) -> str:
# expected/actual keys logged in exception
return f"Response missing top level keys: {self.response}"


@dataclass
class RegistryResponseMissingNestedKeys(DebugLevel):
response: str
code: str = "M026"

def message(self) -> str:
# expected/actual keys logged in exception
return f"Response missing nested keys: {self.response}"


@dataclass
class RegistryResponseExtraNestedKeys(DebugLevel):
response: str
code: str = "M027"

def message(self) -> str:
# expected/actual keys logged in exception
return f"Response contained inconsistent keys: {self.response}"


# TODO this was actually `logger.exception(...)` not `logger.error(...)`
@dataclass
class SystemErrorRetrievingModTime(ErrorLevel):
Expand Down Expand Up @@ -2422,6 +2480,14 @@ def message(self) -> str:
GitNothingToDo(sha="")
GitProgressUpdatedCheckoutRange(start_sha="", end_sha="")
GitProgressCheckedOutAt(end_sha="")
RegistryIndexProgressMakingGETRequest(url="")
RegistryIndexProgressGETResponse(url="", resp_code=1234)
RegistryProgressMakingGETRequest(url="")
RegistryProgressGETResponse(url="", resp_code=1234)
RegistryResponseUnexpectedType(response=""),
RegistryResponseMissingTopKeys(response=""),
RegistryResponseMissingNestedKeys(response=""),
RegistryResponseExtraNestedKeys(response=""),
SystemErrorRetrievingModTime(path="")
SystemCouldNotWrite(path="", reason="", exc=Exception(""))
SystemExecutingCmd(cmd=[""])
Expand Down
6 changes: 6 additions & 0 deletions test/unit/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,12 @@ def MockNode():
PrintDebugStackTrace(),
MainReportArgs(args={}),
RegistryProgressMakingGETRequest(url=''),
RegistryIndexProgressMakingGETRequest(url=""),
RegistryIndexProgressGETResponse(url="", resp_code=1),
RegistryResponseUnexpectedType(response=""),
RegistryResponseMissingTopKeys(response=""),
RegistryResponseMissingNestedKeys(response=""),
RegistryResponseExtraNestedKeys(response=""),
DepsUTD(),
PartialParsingNotEnabled(),
SQlRunnerException(exc=Exception('')),
Expand Down
1 change: 0 additions & 1 deletion tests/functional/permission/test_permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ def test_create_schema_permissions(
project,
):
# now it should work!
# breakpoint()
project.run_sql("grant create on database {} to noaccess".format(project.database))
project.run_sql(
'grant usage, create on schema "{}" to noaccess'.format(project.test_schema)
Expand Down

0 comments on commit 7f953a6

Please sign in to comment.