From 9472c2e2247b8540f83a7ab6a562170bba53aadf Mon Sep 17 00:00:00 2001 From: Peter Oostewechel Date: Fri, 19 Nov 2021 22:36:24 +0100 Subject: [PATCH 1/8] Remove deprecated API V2 --- README.md | 44 ------- digikey/__init__.py | 1 - digikey/oauth/oauth2.py | 8 +- digikey/v2/__init__.py | 0 digikey/v2/api.py | 54 -------- digikey/v2/client.py | 145 --------------------- digikey/v2/directives.py | 58 --------- digikey/v2/models.py | 267 --------------------------------------- 8 files changed, 1 insertion(+), 576 deletions(-) delete mode 100644 digikey/v2/__init__.py delete mode 100644 digikey/v2/api.py delete mode 100644 digikey/v2/client.py delete mode 100644 digikey/v2/directives.py delete mode 100644 digikey/v2/models.py diff --git a/README.md b/README.md index bff63e5..954f5aa 100644 --- a/README.md +++ b/README.md @@ -114,47 +114,3 @@ The dict will be filled with the information returned from the API: } ``` Sometimes the API does not return any rate limit data, the values will then be set to None. - -# API V2 [Deprecated] -**NOTE: API V2 is not supported anymore by Digi-Key and you cannot register new applications** - -See API V3 above to use the new API. - -## Register -Register an app on the Digikey API portal: [Digi-Key API V2](https://api-portal.digikey.com/start). You will need the client -ID and the client secret to use the API. You will also need a Digi-Key account to authenticate, using the Oauth2 process. - -## Use -Python will automatically spawn a browser to allow you to authenticate using the Oauth2 process. After obtaining a token -the library will cache the access token and use the refresh token to automatically refresh your credentials. - -```python -import os -import digikey - -os.environ['DIGIKEY_CLIENT_ID'] = 'client_id' -os.environ['DIGIKEY_CLIENT_SECRET'] = 'client_secret' -os.environ['DIGIKEY_STORAGE_PATH'] = 'cache_dir' - -dkpn = '296-6501-1-ND' -part = digikey.part(dkpn) -print(part) -# - -print(part.manufacturer) -# 'Texas Instruments' -``` - -## Test -```sh -python -m pytest --cov=digikey --doctest-modules --ignore=setup.py -python -m mypy digikey --ignore-missing-imports -``` - -## Top-level API -* `digikey.search()` -* `digikey.part()` - -## Data models -* `digikey.models.KeywordSearchResult` -* `digikey.models.Part` \ No newline at end of file diff --git a/digikey/__init__.py b/digikey/__init__.py index 4a0387c..566f840 100644 --- a/digikey/__init__.py +++ b/digikey/__init__.py @@ -1,4 +1,3 @@ -from digikey.v2.api import (search, part) from digikey.v3.api import (keyword_search, product_details, digi_reel_pricing, suggested_parts, manufacturer_product_details) from digikey.v3.api import (status_salesorder_id, salesorder_history) diff --git a/digikey/oauth/oauth2.py b/digikey/oauth/oauth2.py index 809d65f..1568e18 100644 --- a/digikey/oauth/oauth2.py +++ b/digikey/oauth/oauth2.py @@ -19,9 +19,6 @@ CA_CERT = 'digikey-api.pem' TOKEN_STORAGE = 'token_storage.json' -AUTH_URL_V2 = 'https://sso.digikey.com/as/authorization.oauth2' -TOKEN_URL_V2 = 'https://sso.digikey.com/as/token.oauth2' - AUTH_URL_V3_PROD = 'https://api.digikey.com/v1/oauth2/authorize' TOKEN_URL_V3_PROD = 'https://api.digikey.com/v1/oauth2/token' @@ -108,10 +105,7 @@ def __init__(self, version: int = 2, sandbox: bool = False): - if version == 2: - self.auth_url = AUTH_URL_V2 - self.token_url = TOKEN_URL_V2 - elif version == 3: + if version == 3: if sandbox: self.auth_url = AUTH_URL_V3_SB self.token_url = TOKEN_URL_V3_SB diff --git a/digikey/v2/__init__.py b/digikey/v2/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/digikey/v2/api.py b/digikey/v2/api.py deleted file mode 100644 index d383625..0000000 --- a/digikey/v2/api.py +++ /dev/null @@ -1,54 +0,0 @@ -""" -Top-level API, provides access to the Digikey API -without directly instantiating a client object. -Also wraps the response JSON in types that provide easier access -to various fields. -""" -from digikey.v2 import models -from digikey.v2.client import DigikeyClient - - -def search(query: str, - start: int = 0, - limit: int = 10, - ) -> models.KeywordSearchResult: - """ - Search Digikey for a general keyword (and optional filters). - Args: - query (str): Free-form keyword query - start: Ordinal position of first result - limit: Maximum number of results to return - Returns: - list of `models.KeywordSearchResult` objects. - """ - - client = DigikeyClient() - response = client.search( - query, - start=start, - limit=limit, - ) - return models.KeywordSearchResult(response) - - -def part(partnr: str, - include_associated: bool = False, - include_for_use_with: bool = False, - ) -> models.Part: - """ - Query part by unique ID - Args: - partnr (str): Part number. Works best with Digi-Key part numbers. - include_associated (bool): The option to include all Associated products - include_for_use_with (bool): The option to include all For Use With product - Kwargs: - Returns: - dict. See `models.Part` for exact fields. - """ - client = DigikeyClient() - response = client.part( - partnr, - include_associated=include_associated, - include_for_use_with=include_for_use_with, - ) - return models.Part(response['PartDetails']) diff --git a/digikey/v2/client.py b/digikey/v2/client.py deleted file mode 100644 index b35bfbf..0000000 --- a/digikey/v2/client.py +++ /dev/null @@ -1,145 +0,0 @@ -import logging -import os -import re -import typing as t -from pathlib import Path - -import requests - -from digikey.v2 import models -from digikey.constants import USER_AGENT -from digikey.decorators import retry -from digikey.exceptions import DigikeyError -from digikey.oauth.oauth2 import TokenHandler - -logger = logging.getLogger(__name__) - -DEFAULT_BASE_URL = 'https://api.digikey.com/services/partsearch/v2' - - -class DigikeyClient(object): - """Client object for Digikey API - Visit https://api-portal.digikey.com/ to get an app key and secret, then set it as - an environment variable or pass the key directly to this constructor. - """ - - def __init__(self, - a_id: t.Optional[str] = None, - a_secret: t.Optional[str] = None, - a_token_storage_path: t.Optional[str] = None, - base_url: t.Optional[str] = DEFAULT_BASE_URL - ) -> None: - - a_id = a_id or os.getenv('DIGIKEY_CLIENT_ID') - a_secret = a_secret or os.getenv('DIGIKEY_CLIENT_SECRET') - if not a_id or not a_secret: - raise ValueError( - "CLIENT ID and SECRET must be set. " - "Set 'DIGIKEY_CLIENT_ID' and 'DIGIKEY_CLIENT_SECRET' " - "as an environment variable, or pass your keys directly to the client." - ) - - a_token_storage_path = a_token_storage_path or os.getenv('DIGIKEY_STORAGE_PATH') - if not a_token_storage_path or not Path(a_token_storage_path).exists(): - raise ValueError( - "STORAGE PATH must be set and must exist." - "Set 'DIGIKEY_STORAGE_PATH' as an environment variable, " - "or pass your keys directly to the client." - ) - - self._id = a_id - self._secret = a_secret - self._token_storage_path = Path(a_token_storage_path).joinpath('token_storage.json') - self.base_url = base_url - self.oauth2 = TokenHandler().get_access_token() - - @property - def client_key_param(self) -> t.Dict[str, str]: - return {'clientid': self._id, - 'clientsecret': self._secret} - - @retry - def _request(self, - path: str, - data: t.Dict[str, t.Any]=None - ) -> t.Any: - headers = {'user-agent': USER_AGENT, - 'x-ibm-client-id': self._id, - 'authorization': self.oauth2.get_authorization()} - - response = requests.post('%s%s' % (self.base_url, path), json=data, headers=headers) - try: - rate_limit = re.split('[,;]+', response.headers['x-ratelimit-limit'])[1] - rate_limit_rem = re.split('[,;]+', response.headers['x-ratelimit-remaining'])[1] - logger.debug('Requested Digikey URI: {} [{}/{}]'.format(response.url, rate_limit_rem, rate_limit)) - except KeyError as e: - logger.debug('Requested Digikey URI: {}'.format(response.url)) - - response.raise_for_status() - return response.json() - - def search(self, - query: str, # maps to "keyword" parameter in Digikey API - start: int = 0, - limit: int = 10, - ) -> dict: - """ - Search for parts, using more fields and filter options than 'match'. - This calls the /parts/search endpoint of the Octopart API: - https://octopart.com/api/docs/v3/rest-api#endpoints-parts-search - Args: - query (str): free-form keyword query - start (int): ordinal position of first result - limit (int): maximum number of results to return - Kwargs: - Returns: - dict. See `models.PartsSearchResponse` for exact fields. - """ - data = { - 'keywords': query, - 'search_options': None, - 'record_count': limit, - 'record_start_pos': start, - 'filters': None, - 'sort': None, - 'requested_quantity': 1 - } - - if not models.KeywordSearchRequest.is_valid(data): - errors = models.KeywordSearchRequest.errors(data) - raise DigikeyError('Query is malformed: %s' % errors) - - # Convert `query` to format that Octopart accepts. - params = models.KeywordSearchRequest.camelize(models.KeywordSearchRequest(data).to_primitive()) - - return self._request('/keywordsearch', data=params) - - def part(self, - partnr: str, - include_associated: bool = False, - include_for_use_with: bool = False, - ) -> dict: - """ - Query part by unique ID - Args: - partnr (str): Part number. Works best with Digi-Key part numbers. - include_associated (bool): The option to include all Associated products - include_for_use_with (bool): The option to include all For Use With product - Kwargs: - Returns: - dict. See `models.Part` for exact fields. - """ - data = { - 'part': partnr, - 'include_all_associated_products': include_associated, - 'include_all_for_use_with_products': include_for_use_with - } - - if not models.PartDetailPostRequest.is_valid(data): - errors = models.PartDetailPostRequest.errors(data) - raise DigikeyError('Query is malformed: %s' % errors) - - # Convert `query` to format that Octopart accepts. - params = models.PartDetailPostRequest.camelize(models.PartDetailPostRequest(data).to_primitive()) - - return self._request('/partdetails', data=params) diff --git a/digikey/v2/directives.py b/digikey/v2/directives.py deleted file mode 100644 index 9104e7c..0000000 --- a/digikey/v2/directives.py +++ /dev/null @@ -1,58 +0,0 @@ -import enum -from typing import List - - -class IncludeDirectives(str, enum.Enum): - """Categories of information that can optionally be requested - These categories may be included in the include[] request parameter to - request additional information in the response content. - API docs: https://octopart.com/api/docs/v3/rest-api#include-directives - """ - short_description = 'short_description' - datasheets = 'datasheets' - compliance_documents = 'compliance_documents' - descriptions = 'descriptions' - imagesets = 'imagesets' - specs = 'specs' - category_uids = 'category_uids' - external_links = 'external_links' - reference_designs = 'reference_designs' - cad_models = 'cad_models' - - -def include_directives_from_kwargs(**kwargs) -> List[str]: - """Turn "include_"-prefixed kwargs into list of strings for the request - Arguments: - All keyword arguments whose name consists of "include_*" and an - entry of the INCLUDE enum are used to construct the output. All - others are ignored. - Known directives are included in the output if their value is truthy: - >>> include_directives_from_kwargs( - ... include_datasheets=True, include_specs=True, - ... include_imagesets=False) - ['datasheets', 'specs'] - Keyword args whose name starts with "include_" but don't match known - directives trigger an exception: - >>> include_directives_from_kwargs(include_abcdefg=True) - Traceback (most recent call last): - ... - ValueError: abcdefg is not a known include directive - However, keyword arguments not starting with "include_" are ignored - silently: - >>> include_directives_from_kwargs(abcdefg=True, include_specs=True) - ['specs'] - """ - includes = [] - - for kw_key, kw_val in kwargs.items(): - # filter for kwargs named include_* and value True - if kw_key.startswith('include_') and kw_val: - _, incl_key = kw_key.split('include_') - # only accept documented values for the include directive - if hasattr(IncludeDirectives, incl_key): - includes.append(incl_key) - else: - raise ValueError( - f"{incl_key} is not a known include directive") - - return includes \ No newline at end of file diff --git a/digikey/v2/models.py b/digikey/v2/models.py deleted file mode 100644 index 6759296..0000000 --- a/digikey/v2/models.py +++ /dev/null @@ -1,267 +0,0 @@ -""" -Types that wrap responses from the Octopart API -and make various attributes easier to access. -""" - -import inflection - -from schematics.exceptions import ConversionError, DataError, ValidationError -from schematics.models import Model -from schematics.types import BooleanType, IntType, StringType -from schematics.types.compound import ListType, ModelType - - -class BaseModel(Model): - @classmethod - def errors(cls, dict_): - """ - Wraps `schematics` validate method to return an error list instead of - having to catch an exception in the caller. - Returns: - list of validation errors, or None. - """ - try: - cls(dict_).validate() - return None - except (DataError, ValidationError) as err: - return err.messages - - @classmethod - def errors_list(cls, list_): - """ - Return any validation errors in list of dicts. - Args: - list_ (list): dicts to be validated. - Returns: - list of errors, if any, otherwise None. - """ - try: - errors = [cls(dict_).errors for dict_ in list_] - if any(errors): - return [_f for _f in errors if _f] - return None - except (ConversionError, DataError) as err: - return err.messages - - @classmethod - def is_valid(cls, dict_): - return not cls.errors(dict_) - - @classmethod - def is_valid_list(cls, list_): - try: - return all([cls(dict_).is_valid for dict_ in list_]) - except (ConversionError, DataError): - return False - - @classmethod - def camelize(cls, dict_): - return {inflection.camelize(k): v for k, v in dict_.items()} - - -class Filters(BaseModel): - """Query format sent to the search endpoint - https://api-portal.digikey.com/node/8517 - """ - - -class Sort(BaseModel): - """Query format sent to the search endpoint - https://api-portal.digikey.com/node/8517 - """ - - -class KeywordSearchRequest(BaseModel): - """Query format sent to the search endpoint - https://api-portal.digikey.com/node/8517 - """ - # Keywords to search on - keywords = StringType(required=True) - # Filters the search results by the included SearchOptions - search_options = ListType(StringType) - # Maximum number of items to return - record_count = IntType(default=10, min_value=1, max_value=50, required=True) - # Ordinal position of first returned item - record_start_pos = IntType(default=0) - # Set Filters to narrow down search response - filters = ModelType(Filters) - # Sort Parameters - sort = ModelType(Sort) - # The RequestedQuantity is used with the SortByUnitPrice Sort Option to sort by unit price at the RequestedQuantity - requested_quantity = IntType(default=1) - - -class PartDetailPostRequest(BaseModel): - """Query format sent to the partdetails endpoint - https://api-portal.digikey.com/node/8517 - """ - # Part number. Works best with Digi-Key part numbers. - part = StringType(required=True) - # The option to include all Associated products - include_all_associated_products = BooleanType() - # The option to include all For Use With products - include_all_for_use_with_products = BooleanType() - - -class KeywordSearchResult: - def __init__(self, result): - self._result = result - - @property - def parts(self): - return [ - Part(result) - for result in self._result.get('Parts', []) - ] - - def __repr__(self): - return '' % self._result['Results'] - - def pretty_print(self): - print(self) - for part in self.parts: - print('\t%s' % part) - - -''' -Helper classes for responses -''' - - -class PriceBreak: - def __init__(self, pricebreak: dict): - self._pricebreak = pricebreak - - @property - def breakquantity(self) -> int: - return self._pricebreak.get('BreakQuantity', 0) - - @property - def unitprice(self) -> float: - return self._pricebreak.get('UnitPrice', 0.0) - - @property - def totalprice(self) -> float: - return self._pricebreak.get('TotalPrice', 0.0) - - -class IdTextPair: - def __init__(self, idtextpair: dict): - self._idtextpair = idtextpair - - @property - def id(self) -> str: - return self._idtextpair.get('Id', '') - - @property - def text(self) -> str: - return self._idtextpair.get('Text', '') - - -class PidVid: - def __init__(self, pidvid: dict): - self._pidvid = pidvid - - @property - def parameter_id(self) -> int: - return self._pidvid.get('ParameterId', 0) - - @property - def value_id(self) -> int: - return self._pidvid.get('ValueId', 0) - - @property - def parameter(self) -> str: - return self._pidvid.get('Parameter', '') - - @property - def value(self) -> str: - return self._pidvid.get('Value', '') - - def __repr__(self): - return ''.format(self.parameter, self.value) - - -class Family: - def __init__(self, family: dict): - self._family = family - - @property - def id(self) -> str: - return self._family.get('Id', '') - - @property - def name(self) -> str: - return self._family.get('Name', '') - - @property - def part_count(self) -> int: - return self._family.get('PartCount', 0) - - -class Part: - def __init__(self, part: dict): - self._part = part - - @property - def standard_pricing(self) -> list: - return [ - PriceBreak(part) - for part in self._part.get('StandardPricing', []) - ] - - @property - def category(self) -> IdTextPair: - return IdTextPair(self._part.get('Category', {})) - - @property - def family(self) -> IdTextPair: - return IdTextPair(self._part.get('Family', {})) - - @property - def manufacturer(self) -> str: - return IdTextPair(self._part.get('ManufacturerName', {})).text - - @property - def mpn(self) -> str: - return self._part.get('ManufacturerPartNumber', None) - - @property - def part_status(self) -> str: - return self._part.get('PartStatus', None) - - @property - def digikey_pn(self) -> str: - return self._part.get('DigiKeyPartNumber', None) - - @property - def digikey_url(self) -> str: - return 'https://www.digikey.com' + self._part.get('PartUrl', '') - - @property - def in_stock(self) -> int: - return self._part.get('QuantityOnHand', None) - - @property - def moq(self) -> int: - return self._part.get('MinimumOrderQuantity', None) - - @property - def parameters(self) -> dict: - _params = [PidVid(param) for param in self._part.get('Parameters', [])] - return {p.parameter: p.value for p in _params} - - @property - def description_product(self) -> str: - return self._part.get('ProductDescription', None) - - @property - def description_detailed(self) -> str: - return self._part.get('DetailedDescription', None) - - @property - def datasheet(self) -> str: - return self._part.get('PrimaryDatasheet', None) - - def __repr__(self): - return '' % self.mpn From 636fa5973fcb86c00bd512f20d61a554ab5e3310 Mon Sep 17 00:00:00 2001 From: Electro707 Date: Sun, 30 May 2021 20:53:14 -0400 Subject: [PATCH 2/8] Changed so that a json config is fed in instead of environment variables --- digikey/__init__.py | 21 ++- digikey/configfile.py | 25 +++ digikey/oauth/oauth2.py | 19 +-- digikey/v3/api.py | 338 ++++++++++++++++++++++------------------ 4 files changed, 232 insertions(+), 171 deletions(-) create mode 100644 digikey/configfile.py diff --git a/digikey/__init__.py b/digikey/__init__.py index 566f840..1bd7614 100644 --- a/digikey/__init__.py +++ b/digikey/__init__.py @@ -1,6 +1,19 @@ -from digikey.v3.api import (keyword_search, product_details, digi_reel_pricing, suggested_parts, - manufacturer_product_details) -from digikey.v3.api import (status_salesorder_id, salesorder_history) -from digikey.v3.api import (batch_product_details) +import logging +from digikey.v2.api import (search, part) +from digikey.v3.api import DigikeyAPI + +logger = logging.getLogger(__name__) + + +def setup_logger(logger_ref): + logger_ref.setLevel(logging.DEBUG) + formatter = logging.Formatter( + '%(asctime)s - %(name)20.20s - %(levelname)8s: %(message)s') + handler = logging.StreamHandler() + handler.setFormatter(formatter) + logger_ref.addHandler(handler) + + +setup_logger(logger) name = 'digikey' diff --git a/digikey/configfile.py b/digikey/configfile.py new file mode 100644 index 0000000..6ef08e6 --- /dev/null +++ b/digikey/configfile.py @@ -0,0 +1,25 @@ +import os +import json + + +class DigikeyApiConfig: + def __init__(self, file_name): + self.file_name = file_name + # Get config from file if it exists + if os.path.exists(self.file_name): + with open(self.file_name, 'r') as f: + self.config = json.load(f) + else: + self.config = {} + + def save(self): + with open(self.file_name, 'w') as f: + json.dump(self.config, f) + + def get(self, what: str): + if what in self.config: + return self.config[what] + return None + + def set(self, what: str, to): + self.config[what] = to diff --git a/digikey/oauth/oauth2.py b/digikey/oauth/oauth2.py index 1568e18..0a1303e 100644 --- a/digikey/oauth/oauth2.py +++ b/digikey/oauth/oauth2.py @@ -15,9 +15,11 @@ from digikey.constants import USER_AGENT from digikey.exceptions import DigikeyOauthException +from digikey import configfile CA_CERT = 'digikey-api.pem' TOKEN_STORAGE = 'token_storage.json' +CONFIGURATION_TEMPORARY_PATH = '/tmp/dk_config' AUTH_URL_V3_PROD = 'https://api.digikey.com/v1/oauth2/authorize' TOKEN_URL_V3_PROD = 'https://api.digikey.com/v1/oauth2/token' @@ -99,9 +101,10 @@ class TokenHandler: Functions used to handle Digikey oAuth """ def __init__(self, + dg_config: configfile.DigikeyApiConfig, a_id: t.Optional[str] = None, a_secret: t.Optional[str] = None, - a_token_storage_path: t.Optional[str] = None, + # a_token_storage_path: t.Optional[str] = None, version: int = 2, sandbox: bool = False): @@ -117,8 +120,8 @@ def __init__(self, logger.debug(f'Using API V{version}') - a_id = a_id or os.getenv('DIGIKEY_CLIENT_ID') - a_secret = a_secret or os.getenv('DIGIKEY_CLIENT_SECRET') + a_id = a_id or dg_config.get('client-id') + a_secret = a_secret or dg_config.get('client-secret') if not a_id or not a_secret: raise ValueError( 'CLIENT ID and SECRET must be set. ' @@ -126,17 +129,9 @@ def __init__(self, 'as an environment variable, or pass your keys directly to the client.' ) - a_token_storage_path = a_token_storage_path or os.getenv('DIGIKEY_STORAGE_PATH') - if not a_token_storage_path or not Path(a_token_storage_path).exists(): - raise ValueError( - 'STORAGE PATH must be set and must exist.' - 'Set "DIGIKEY_STORAGE_PATH" as an environment variable, ' - 'or pass your keys directly to the client.' - ) - self._id = a_id self._secret = a_secret - self._storage_path = Path(a_token_storage_path) + self._storage_path = Path(CONFIGURATION_TEMPORARY_PATH) self._token_storage_path = self._storage_path.joinpath(TOKEN_STORAGE) self._ca_cert = self._storage_path.joinpath(CA_CERT) diff --git a/digikey/v3/api.py b/digikey/v3/api.py index 188da7a..2d31d73 100644 --- a/digikey/v3/api.py +++ b/digikey/v3/api.py @@ -2,6 +2,7 @@ import logging from distutils.util import strtobool import digikey.oauth.oauth2 +import digikey.configfile from digikey.exceptions import DigikeyError from digikey.v3.productinformation import (KeywordSearchRequest, KeywordSearchResponse, ProductDetails, DigiReelPricing, ManufacturerProductDetailsRequest) @@ -12,10 +13,8 @@ logger = logging.getLogger(__name__) -class DigikeyApiWrapper(object): - def __init__(self, wrapped_function, module): - self.sandbox = False - +class DigikeyAPI: + class DigikeyApiWrapper(object): apinames = { digikey.v3.productinformation: 'Search', digikey.v3.ordersupport: 'OrderDetails', @@ -28,154 +27,183 @@ def __init__(self, wrapped_function, module): digikey.v3.batchproductdetails: digikey.v3.batchproductdetails.BatchSearchApi } - apiname = apinames[module] - apiclass = apiclasses[module] - - # Configure API key authorization: apiKeySecurity - configuration = module.Configuration() - configuration.api_key['X-DIGIKEY-Client-Id'] = os.getenv('DIGIKEY_CLIENT_ID') - - # Return quietly if no clientid has been set to prevent errors when importing the module - if os.getenv('DIGIKEY_CLIENT_ID') is None or os.getenv('DIGIKEY_CLIENT_SECRET') is None: - raise DigikeyError('Please provide a valid DIGIKEY_CLIENT_ID and DIGIKEY_CLIENT_SECRET in your env setup') - - # Use normal API by default, if DIGIKEY_CLIENT_SANDBOX is True use sandbox API - configuration.host = 'https://api.digikey.com/' + apiname + '/v3' - try: - if bool(strtobool(os.getenv('DIGIKEY_CLIENT_SANDBOX'))): - configuration.host = 'https://sandbox-api.digikey.com/' + apiname + '/v3' - self.sandbox = True - except (ValueError, AttributeError): - pass - - # Uncomment below to setup prefix (e.g. Bearer) for API key, if needed - # configuration.api_key_prefix['X-DIGIKEY-Client-Id'] = 'Bearer' - - # Configure OAuth2 access token for authorization: oauth2AccessCodeSecurity - self._digikeyApiToken = digikey.oauth.oauth2.TokenHandler(version=3, sandbox=self.sandbox).get_access_token() - configuration.access_token = self._digikeyApiToken.access_token - - # create an instance of the API class - self._api_instance = apiclass(module.ApiClient(configuration)) - - # Populate reused ids - self.authorization = self._digikeyApiToken.get_authorization() - self.x_digikey_client_id = os.getenv('DIGIKEY_CLIENT_ID') - - self.wrapped_function = wrapped_function - - @staticmethod - def _remaining_requests(header, api_limits): - try: - rate_limit = header['X-RateLimit-Limit'] - rate_limit_rem = header['X-RateLimit-Remaining'] - - if api_limits is not None and type(api_limits) == dict: - api_limits['api_requests_limit'] = int(rate_limit) - api_limits['api_requests_remaining'] = int(rate_limit_rem) - - logger.debug('Requests remaining: [{}/{}]'.format(rate_limit_rem, rate_limit)) - except (KeyError, ValueError) as e: - logger.debug(f'No api limits returned -> {e.__class__.__name__}: {e}') - if api_limits is not None and type(api_limits) == dict: - api_limits['api_requests_limit'] = None - api_limits['api_requests_remaining'] = None - - @staticmethod - def _store_api_statuscode(statuscode, status): - if status is not None and type(status) == dict: - status['code'] = int(statuscode) - - logger.debug('API returned code: {}'.format(statuscode)) - - def call_api_function(self, *args, **kwargs): - try: - # If optional api_limits, status mutable object is passed use it to store API limits and status code - api_limits = kwargs.pop('api_limits', None) - status = kwargs.pop('status', None) - - func = getattr(self._api_instance, self.wrapped_function) - logger.debug(f'CALL wrapped -> {func.__qualname__}') - api_response = func(*args, self.authorization, self.x_digikey_client_id, **kwargs) - self._remaining_requests(api_response[2], api_limits) - self._store_api_statuscode(api_response[1], status) - - return api_response[0] - except ApiException as e: - logger.error(f'Exception when calling {self.wrapped_function}: {e}') - self._store_api_statuscode(e.status, status) - - -def keyword_search(*args, **kwargs) -> KeywordSearchResponse: - client = DigikeyApiWrapper('keyword_search_with_http_info', digikey.v3.productinformation) - - if 'body' in kwargs and type(kwargs['body']) == KeywordSearchRequest: - logger.info(f'Search for: {kwargs["body"].keywords}') - logger.debug('CALL -> keyword_search') - return client.call_api_function(*args, **kwargs) - else: - raise DigikeyError('Please provide a valid KeywordSearchRequest argument') - - -def product_details(*args, **kwargs) -> ProductDetails: - client = DigikeyApiWrapper('product_details_with_http_info', digikey.v3.productinformation) - - if len(args): - logger.info(f'Get product details for: {args[0]}') - return client.call_api_function(*args, **kwargs) - - -def digi_reel_pricing(*args, **kwargs) -> DigiReelPricing: - client = DigikeyApiWrapper('digi_reel_pricing_with_http_info', digikey.v3.productinformation) - - if len(args): - logger.info(f'Calculate the DigiReel pricing for {args[0]} with quantity {args[1]}') - return client.call_api_function(*args, **kwargs) - - -def suggested_parts(*args, **kwargs) -> ProductDetails: - client = DigikeyApiWrapper('suggested_parts_with_http_info', digikey.v3.productinformation) - - if len(args): - logger.info(f'Retrieve detailed product information and two suggested products for: {args[0]}') - return client.call_api_function(*args, **kwargs) - - -def manufacturer_product_details(*args, **kwargs) -> KeywordSearchResponse: - client = DigikeyApiWrapper('manufacturer_product_details_with_http_info', digikey.v3.productinformation) - - if 'body' in kwargs and type(kwargs['body']) == ManufacturerProductDetailsRequest: - logger.info(f'Search for: {kwargs["body"].manufacturer_product}') - return client.call_api_function(*args, **kwargs) - else: - raise DigikeyError('Please provide a valid ManufacturerProductDetailsRequest argument') - - -def status_salesorder_id(*args, **kwargs) -> OrderStatusResponse: - client = DigikeyApiWrapper('order_status_with_http_info', digikey.v3.ordersupport) - - if len(args): - logger.info(f'Get order details for: {args[0]}') - return client.call_api_function(*args, **kwargs) - - -def salesorder_history(*args, **kwargs) -> [SalesOrderHistoryItem]: - client = DigikeyApiWrapper('order_history_with_http_info', digikey.v3.ordersupport) - - if 'start_date' in kwargs and type(kwargs['start_date']) == str \ - and 'end_date' in kwargs and type(kwargs['end_date']) == str: - logger.info(f'Searching for orders in date range ' + kwargs['start_date'] + ' to ' + kwargs['end_date']) - return client.call_api_function(*args, **kwargs) - else: - raise DigikeyError('Please provide valid start_date and end_date strings') - - -def batch_product_details(*args, **kwargs) -> BatchProductDetailsResponse: - client = DigikeyApiWrapper('batch_product_details_with_http_info', digikey.v3.batchproductdetails) - - if 'body' in kwargs and type(kwargs['body']) == BatchProductDetailsRequest: - logger.info(f'Batch product search: {kwargs["body"].products}') - logger.debug('CALL -> batch_product_details') - return client.call_api_function(*args, **kwargs) - else: - raise DigikeyError('Please provide a valid BatchProductDetailsRequest argument') + def __init__(self, config_file: digikey.configfile.DigikeyApiConfig, is_sandbox: bool = False): + self.sandbox = is_sandbox + self.apiname = None + self._api_instance = None + self.wrapped_function = None + self.x_digikey_client_id = None + self.config = config_file + + # Return quietly if no clientid has been set to prevent errors when importing the module + # if os.getenv('DIGIKEY_CLIENT_ID') is None or os.getenv('DIGIKEY_CLIENT_SECRET') is None: + # raise DigikeyError('Please provide a valid DIGIKEY_CLIENT_ID and DIGIKEY_CLIENT_SECRET in your env setup') + + # Uncomment below to setup prefix (e.g. Bearer) for API key, if needed + # configuration.api_key_prefix['X-DIGIKEY-Client-Id'] = 'Bearer' + + self._digikeyApiToken = None + self.authorization = None + + def get_authorization(self): + """ + Function that get Oauth2 authorization. This is not implement in __init__ to give the user + a chance to set the client secret and client id if it's not in the config file already + """ + # Configure OAuth2 access token for authorization: oauth2AccessCodeSecurity + self._digikeyApiToken = digikey.oauth.oauth2.TokenHandler(self.config, version=3, sandbox=self.sandbox).get_access_token() + + # Populate reused ids + self.authorization = self._digikeyApiToken.get_authorization() + + def change_api(self, wrapped_function, module): + apiname = self.apinames[module] + apiclass = self.apiclasses[module] + # Only change the API when it's different than the current API + if self.apiname != apiname: + + if self.config.get('client-id') is None or self.config.get('client-secret') is None: + raise DigikeyError('Please provide a valid DIGIKEY_CLIENT_ID and DIGIKEY_CLIENT_SECRET in your env setup') + + if self._digikeyApiToken is None: + self.get_authorization() + + self.apiname = apiname + self.x_digikey_client_id = self.config.get('client-id') + + # Configure API key authorization: apiKeySecurity + configuration = module.Configuration() + configuration.api_key['X-DIGIKEY-Client-Id'] = self.config.get('client-id') + + # Use normal API by default, if DIGIKEY_CLIENT_SANDBOX is True use sandbox API + configuration.host = 'https://api.digikey.com/' + apiname + '/v3' + try: + if self.sandbox: + configuration.host = 'https://sandbox-api.digikey.com/' + apiname + '/v3' + except (ValueError, AttributeError): + pass + + configuration.access_token = self._digikeyApiToken.access_token + + # create an instance of the API class + self._api_instance = apiclass(module.ApiClient(configuration)) + self.wrapped_function = wrapped_function + + @staticmethod + def _remaining_requests(header, api_limits): + try: + rate_limit = header['X-RateLimit-Limit'] + rate_limit_rem = header['X-RateLimit-Remaining'] + + if api_limits is not None and type(api_limits) == dict: + api_limits['api_requests_limit'] = int(rate_limit) + api_limits['api_requests_remaining'] = int(rate_limit_rem) + + logger.debug('Requests remaining: [{}/{}]'.format(rate_limit_rem, rate_limit)) + except (KeyError, ValueError) as e: + logger.debug(f'No api limits returned -> {e.__class__.__name__}: {e}') + if api_limits is not None and type(api_limits) == dict: + api_limits['api_requests_limit'] = None + api_limits['api_requests_remaining'] = None + + def call_api_function(self, *args, **kwargs): + try: + # If optional api_limits mutable object is passed use it to store API limits + api_limits = kwargs.pop('api_limits', None) + + func = getattr(self._api_instance, self.wrapped_function) + logger.debug(f'CALL wrapped -> {func.__qualname__}') + api_response = func(*args, self.authorization, self.x_digikey_client_id, **kwargs) + self._remaining_requests(api_response[2], api_limits) + + return api_response[0] + except ApiException as e: + logger.error(f'Exception when calling {self.wrapped_function}: {e}') + + def __init__(self, config_file, is_sandbox: bool = False): + self.config = digikey.configfile.DigikeyApiConfig(config_file) + self.client = self.DigikeyApiWrapper(self.config, is_sandbox) + + def needs_client_id(self) -> bool: + if self.config.get('client-id') is None: + return True + return False + + def needs_client_secret(self) -> bool: + if self.config.get('client-secret') is None: + return True + return False + + def set_client_info(self, client_id=None, client_secret=None): + if client_id is not None: + self.config.set('client-id', client_id) + if client_secret is not None: + self.config.set('client-secret', client_secret) + self.config.save() + + def keyword_search(self, *args, **kwargs) -> KeywordSearchResponse: + self.client.change_api('keyword_search_with_http_info', digikey.v3.productinformation) + + if 'body' in kwargs and type(kwargs['body']) == KeywordSearchRequest: + logger.info(f'Search for: {kwargs["body"].keywords}') + logger.debug('CALL -> keyword_search') + return self.client.call_api_function(*args, **kwargs) + else: + raise DigikeyError('Please provide a valid KeywordSearchRequest argument') + + def product_details(self, *args, **kwargs) -> ProductDetails: + self.client.change_api('product_details_with_http_info', digikey.v3.productinformation) + + if len(args): + logger.info(f'Get product details for: {args[0]}') + return self.client.call_api_function(*args, **kwargs) + + def digi_reel_pricing(self, *args, **kwargs) -> DigiReelPricing: + self.client.change_api('digi_reel_pricing_with_http_info', digikey.v3.productinformation) + + if len(args): + logger.info(f'Calculate the DigiReel pricing for {args[0]} with quantity {args[1]}') + return self.client.call_api_function(*args, **kwargs) + + def suggested_parts(self, *args, **kwargs) -> ProductDetails: + self.client.change_api('suggested_parts_with_http_info', digikey.v3.productinformation) + + if len(args): + logger.info(f'Retrieve detailed product information and two suggested products for: {args[0]}') + return self.client.call_api_function(*args, **kwargs) + + def manufacturer_product_details(self, *args, **kwargs) -> KeywordSearchResponse: + self.client.change_api('manufacturer_product_details_with_http_info', digikey.v3.productinformation) + + if 'body' in kwargs and type(kwargs['body']) == ManufacturerProductDetailsRequest: + logger.info(f'Search for: {kwargs["body"].manufacturer_product}') + return self.client.call_api_function(*args, **kwargs) + else: + raise DigikeyError('Please provide a valid ManufacturerProductDetailsRequest argument') + + def status_salesorder_id(self, *args, **kwargs) -> OrderStatusResponse: + self.client.change_api('status_salesorder_id_get_with_http_info', digikey.v3.ordersupport) + + if len(args): + logger.info(f'Get order details for: {args[0]}') + return self.client.call_api_function(*args, **kwargs) + + def salesorder_history(self, *args, **kwargs) -> [SalesOrderHistoryItem]: + self.client.change_api('history_get_with_http_info', digikey.v3.ordersupport) + + if 'start_date' in kwargs and type(kwargs['start_date']) == str \ + and 'end_date' in kwargs and type(kwargs['end_date']) == str: + logger.info(f'Searching for orders in date range ' + kwargs['start_date'] + ' to ' + kwargs['end_date']) + return self.client.call_api_function(*args, **kwargs) + else: + raise DigikeyError('Please provide valid start_date and end_date strings') + + def batch_product_details(self, *args, **kwargs) -> BatchProductDetailsResponse: + self.client.change_api('batch_product_details_with_http_info', digikey.v3.batchproductdetails) + + if 'body' in kwargs and type(kwargs['body']) == BatchProductDetailsRequest: + logger.info(f'Batch product search: {kwargs["body"].products}') + logger.debug('CALL -> batch_product_details') + return self.client.call_api_function(*args, **kwargs) + else: + raise DigikeyError('Please provide a valid BatchProductDetailsRequest argument') From acea92859a67ee425c6aeea9cfb489e65a38c381 Mon Sep 17 00:00:00 2001 From: Electro707 Date: Sun, 30 May 2021 23:57:27 -0400 Subject: [PATCH 3/8] Added so that oauth2 uses the json config file --- digikey/oauth/oauth2.py | 84 +++++++++++++++++++++++------------------ 1 file changed, 48 insertions(+), 36 deletions(-) diff --git a/digikey/oauth/oauth2.py b/digikey/oauth/oauth2.py index 0a1303e..6f3cfe5 100644 --- a/digikey/oauth/oauth2.py +++ b/digikey/oauth/oauth2.py @@ -19,7 +19,7 @@ CA_CERT = 'digikey-api.pem' TOKEN_STORAGE = 'token_storage.json' -CONFIGURATION_TEMPORARY_PATH = '/tmp/dk_config' +CERTIFICATION_TEMPORARY_PATH = '/tmp/dk_cert' AUTH_URL_V3_PROD = 'https://api.digikey.com/v1/oauth2/authorize' TOKEN_URL_V3_PROD = 'https://api.digikey.com/v1/oauth2/token' @@ -34,26 +34,31 @@ class Oauth2Token: - def __init__(self, token): - self._token = token + def __init__(self, config: configfile.DigikeyApiConfig): + self._config = config @property def access_token(self): - return self._token.get('access_token') + return self._config.get('access_token') @property def refresh_token(self): - return self._token.get('refresh_token') + return self._config.get('refresh_token') @property def expires(self): - return datetime.fromtimestamp(self._token.get('expires'), timezone.utc) + if self._config.get('expires') is not None: + return datetime.fromtimestamp(self._config.get('expires'), timezone.utc) + return None @property def type(self): - return self._token.get('token_type') + return self._config.get('token_type') def expired(self) -> bool: + expires = self.expires + if expires is None: + return True return datetime.now(timezone.utc) >= self.expires def get_authorization(self) -> str: @@ -101,7 +106,7 @@ class TokenHandler: Functions used to handle Digikey oAuth """ def __init__(self, - dg_config: configfile.DigikeyApiConfig, + dk_config: configfile.DigikeyApiConfig, a_id: t.Optional[str] = None, a_secret: t.Optional[str] = None, # a_token_storage_path: t.Optional[str] = None, @@ -120,8 +125,10 @@ def __init__(self, logger.debug(f'Using API V{version}') - a_id = a_id or dg_config.get('client-id') - a_secret = a_secret or dg_config.get('client-secret') + self.dk_config = dk_config + + a_id = a_id or self.dk_config.get('client-id') + a_secret = a_secret or self.dk_config.get('client-secret') if not a_id or not a_secret: raise ValueError( 'CLIENT ID and SECRET must be set. ' @@ -131,8 +138,9 @@ def __init__(self, self._id = a_id self._secret = a_secret - self._storage_path = Path(CONFIGURATION_TEMPORARY_PATH) - self._token_storage_path = self._storage_path.joinpath(TOKEN_STORAGE) + + self._storage_path = Path(CERTIFICATION_TEMPORARY_PATH) + # self._token_storage_path = self._storage_path.joinpath(TOKEN_STORAGE) self._ca_cert = self._storage_path.joinpath(CA_CERT) def __generate_certificate(self): @@ -175,7 +183,9 @@ def __exchange_for_token(self, code): # Create epoch timestamp from expires in, with 1 minute margin token_json['expires'] = int(token_json['expires_in']) + datetime.now(timezone.utc).timestamp() - 60 - return token_json + # TODO: Perhaps expand out for loop to only write what's nessesary + for key in token_json: + self.dk_config.set(key, token_json[key]) def __refresh_token(self, refresh_token: str): headers = {'user-agent': USER_AGENT, @@ -201,12 +211,15 @@ def __refresh_token(self, refresh_token: str): # Create epoch timestamp from expires in, with 1 minute margin token_json['expires'] = int(token_json['expires_in']) + datetime.now(timezone.utc).timestamp() - 60 - return token_json + # TODO: Perhaps expand out for loop to only write what's nessesary + for key in token_json: + self.dk_config.set(key, token_json[key]) - def save(self, json_data): - with open(self._token_storage_path, 'w') as f: - json.dump(json_data, f) - logger.debug('Saved token to: {}'.format(self._token_storage_path)) + def save(self): + # with open(self._token_storage_path, 'w') as f: + # json.dump(json_data, f) + # logger.debug('Saved token to: {}'.format(self._token_storage_path)) + self.dk_config.save() def get_access_token(self) -> Oauth2Token: """ @@ -218,29 +231,28 @@ def get_access_token(self) -> Oauth2Token: """ # Check if a token already exists on the storage - token_json = None - try: - with open(self._token_storage_path, 'r') as f: - token_json = json.load(f) - except (EnvironmentError, JSONDecodeError): - logger.warning('Oauth2 token storage does not exist or malformed, creating new.') + # token_json = None + # try: + # with open(self._token_storage_path, 'r') as f: + # token_json = json.load(f) + # except (EnvironmentError, JSONDecodeError): + # logger.warning('Oauth2 token storage does not exist or malformed, creating new.') - token = None - if token_json is not None: - token = Oauth2Token(token_json) + token_config = Oauth2Token(self.dk_config) + needs_authorization_token = False # Try to refresh the credentials with the stores refresh token - if token is not None and token.expired(): + if token_config.expired(): try: - logger.debug(f'REFRESH - Current token is stale, refresh using: {token.refresh_token}') - token_json = self.__refresh_token(token.refresh_token) - self.save(token_json) + logger.debug(f'REFRESH - Current token is stale, refresh using: {token_config.refresh_token}') + self.__refresh_token(token_config.refresh_token) + self.save() except DigikeyOauthException: logger.error('REFRESH - Failed to use refresh token, starting new authorization flow.') - token_json = None + needs_authorization_token = True # Obtain new credentials using the Oauth flow if no token stored or refresh fails - if token_json is None: + if needs_authorization_token is True: open_new(self.__build_authorization_url()) filename = self.__generate_certificate() httpd = HTTPServer( @@ -263,9 +275,9 @@ def get_access_token(self) -> Oauth2Token: logger.error('Cannot remove temporary certificates: {}'.format(e)) # Get the acccess token from the auth code - token_json = self.__exchange_for_token(httpd.auth_code) + self.__exchange_for_token(httpd.auth_code) # Save the newly obtained credentials to the filesystem - self.save(token_json) + self.save() - return Oauth2Token(token_json) + return Oauth2Token(self.dk_config) From e491d189c6542d6a51df571ab40014c69411ab04 Mon Sep 17 00:00:00 2001 From: Electro707 Date: Mon, 31 May 2021 16:36:53 -0400 Subject: [PATCH 4/8] Config file is now a generic class where more config implementations can be added --- digikey/configfile.py | 23 ++++++++++++++++++++++- digikey/oauth/oauth2.py | 4 ++-- digikey/v3/api.py | 10 +++------- 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/digikey/configfile.py b/digikey/configfile.py index 6ef08e6..05dff2d 100644 --- a/digikey/configfile.py +++ b/digikey/configfile.py @@ -2,8 +2,29 @@ import json -class DigikeyApiConfig: +class DigikeyBaseConfig: + """ + Base class for a configuration handler which saves info related to Digikey's API like the client-ID + This class must not be directly used, instead another class that inherits this class. The class that inherits + this class must override save(), get(), and set() with the same parameters and returns. + Check out DigikeyJsonConfig for more details as to how to do this. + """ + def __init__(self): + pass + + def save(self): + pass + + def get(self, what: str): + pass + + def set(self, what: str, to): + pass + + +class DigikeyJsonConfig(DigikeyBaseConfig): def __init__(self, file_name): + super().__init__() self.file_name = file_name # Get config from file if it exists if os.path.exists(self.file_name): diff --git a/digikey/oauth/oauth2.py b/digikey/oauth/oauth2.py index 6f3cfe5..44565a1 100644 --- a/digikey/oauth/oauth2.py +++ b/digikey/oauth/oauth2.py @@ -34,7 +34,7 @@ class Oauth2Token: - def __init__(self, config: configfile.DigikeyApiConfig): + def __init__(self, config: configfile.DigikeyBaseConfig): self._config = config @property @@ -106,7 +106,7 @@ class TokenHandler: Functions used to handle Digikey oAuth """ def __init__(self, - dk_config: configfile.DigikeyApiConfig, + dk_config: configfile.DigikeyBaseConfig, a_id: t.Optional[str] = None, a_secret: t.Optional[str] = None, # a_token_storage_path: t.Optional[str] = None, diff --git a/digikey/v3/api.py b/digikey/v3/api.py index 2d31d73..f607039 100644 --- a/digikey/v3/api.py +++ b/digikey/v3/api.py @@ -27,7 +27,7 @@ class DigikeyApiWrapper(object): digikey.v3.batchproductdetails: digikey.v3.batchproductdetails.BatchSearchApi } - def __init__(self, config_file: digikey.configfile.DigikeyApiConfig, is_sandbox: bool = False): + def __init__(self, config_file: digikey.configfile.DigikeyBaseConfig, is_sandbox: bool = False): self.sandbox = is_sandbox self.apiname = None self._api_instance = None @@ -35,10 +35,6 @@ def __init__(self, config_file: digikey.configfile.DigikeyApiConfig, is_sandbox: self.x_digikey_client_id = None self.config = config_file - # Return quietly if no clientid has been set to prevent errors when importing the module - # if os.getenv('DIGIKEY_CLIENT_ID') is None or os.getenv('DIGIKEY_CLIENT_SECRET') is None: - # raise DigikeyError('Please provide a valid DIGIKEY_CLIENT_ID and DIGIKEY_CLIENT_SECRET in your env setup') - # Uncomment below to setup prefix (e.g. Bearer) for API key, if needed # configuration.api_key_prefix['X-DIGIKEY-Client-Id'] = 'Bearer' @@ -120,8 +116,8 @@ def call_api_function(self, *args, **kwargs): except ApiException as e: logger.error(f'Exception when calling {self.wrapped_function}: {e}') - def __init__(self, config_file, is_sandbox: bool = False): - self.config = digikey.configfile.DigikeyApiConfig(config_file) + def __init__(self, config_constructor: digikey.configfile.DigikeyBaseConfig, is_sandbox: bool = False): + self.config = config_constructor self.client = self.DigikeyApiWrapper(self.config, is_sandbox) def needs_client_id(self) -> bool: From 5b6ed304e965ecbf6b468f64084cc95242d55ab8 Mon Sep 17 00:00:00 2001 From: Electro707 Date: Fri, 20 Aug 2021 02:04:36 -0400 Subject: [PATCH 5/8] Started adding some docs. Changed the config class's arguments name from to and to --- digikey/configfile.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/digikey/configfile.py b/digikey/configfile.py index 05dff2d..958f51c 100644 --- a/digikey/configfile.py +++ b/digikey/configfile.py @@ -15,10 +15,10 @@ def __init__(self): def save(self): pass - def get(self, what: str): + def get(self, key: str): pass - def set(self, what: str, to): + def set(self, key: str, val: str): pass @@ -37,10 +37,10 @@ def save(self): with open(self.file_name, 'w') as f: json.dump(self.config, f) - def get(self, what: str): - if what in self.config: - return self.config[what] + def get(self, key: str): + if key in self.config: + return self.config[key] return None - def set(self, what: str, to): - self.config[what] = to + def set(self, key: str, val: str): + self.config[key] = val From 94c2ca37c45706e5551ec22cdfd82570744bde91 Mon Sep 17 00:00:00 2001 From: Electro707 Date: Sun, 26 Sep 2021 17:23:46 -0400 Subject: [PATCH 6/8] Added docs to create a custom config class. Updated README.md to reflect new changes. Added the configurations to __init__ for importing digikey. Minor message fix --- README.md | 57 +++++++++++++++------------------------ digikey/__init__.py | 1 + digikey/oauth/oauth2.py | 4 +-- docs/DigikeyBaseConfig.md | 51 +++++++++++++++++++++++++++++++++++ 4 files changed, 75 insertions(+), 38 deletions(-) create mode 100644 docs/DigikeyBaseConfig.md diff --git a/README.md b/README.md index 954f5aa..4b3dfdc 100644 --- a/README.md +++ b/README.md @@ -15,10 +15,6 @@ manufacturers overlap other manufacturer part numbers. ## Install ```sh pip install digikey-api - -export DIGIKEY_CLIENT_ID="client_id" -export DIGIKEY_CLIENT_SECRET="client_secret" -export DIGIKEY_STORAGE_PATH="cache_dir" ``` # API V3 @@ -39,59 +35,47 @@ structure of the Sandbox API response will be a representation of what to expect For valid responses make sure you use the client ID and secret for a [Production App](https://developer.digikey.com/documentation/organization) ```python -import os import digikey from digikey.v3.productinformation import KeywordSearchRequest -os.environ['DIGIKEY_CLIENT_ID'] = 'client_id' -os.environ['DIGIKEY_CLIENT_SECRET'] = 'client_secret' -os.environ['DIGIKEY_CLIENT_SANDBOX'] = 'False' -os.environ['DIGIKEY_STORAGE_PATH'] = 'cache_dir' +dk_config = digikey.DigikeyJsonConfig(file_name='dk_conf.json') +dk_config.set('client-id', 'ENTER_CLIENT_ID') +dk_config.set('client-secret', 'ENTER_CLIENT_SECRET') + +dk_api = digikey.DigikeyAPI(dk_config, is_sandbox=False) # Query product number dkpn = '296-6501-1-ND' -part = digikey.product_details(dkpn) +part = dk_api.product_details(dkpn) # Search for parts search_request = KeywordSearchRequest(keywords='CRCW080510K0FKEA', record_count=10) -result = digikey.keyword_search(body=search_request) +result = dk_api.keyword_search(body=search_request) ``` -## Logging [API V3] -Logging is not forced upon the user but can be enabled according to convention: -```python -import logging - -logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) - -digikey_logger = logging.getLogger('digikey') -digikey_logger.setLevel(logging.DEBUG) - -handler = logging.StreamHandler() -handler.setLevel(logging.DEBUG) -logger.addHandler(handler) -digikey_logger.addHandler(handler) -``` +## API Configuration Storage +`DigikeyAPI` requires a configuration class that will handle getting, storing, and saving key-value pairs. Currently +only `DigikeyJsonConfig` is implemented for storing settings in a JSON file, but a custom configuration can be created. +See [docs/DigikeyBaseConfig.md](docs/DigikeyBaseConfig.md) for more details on that. ## Top-level APIs #### Product Information All functions from the [PartSearch](https://developer.digikey.com/products/product-information/partsearch/) API have been implemented. -* `digikey.keyword_search()` -* `digikey.product_details()` -* `digikey.digi_reel_pricing()` -* `digikey.suggested_parts()` -* `digikey.manufacturer_product_details()` +* `DigikeyAPI.keyword_search()` +* `DigikeyAPI.product_details()` +* `DigikeyAPI.digi_reel_pricing()` +* `DigikeyAPI.suggested_parts()` +* `DigikeyAPI.manufacturer_product_details()` #### Batch Product Details The one function from the [BatchProductDetailsAPI](https://developer.digikey.com/products/batch-productdetails/batchproductdetailsapi) API has been implemented. -* `digikey.batch_product_details()` +* `DigikeyAPI.batch_product_details()` #### Order Support All functions from the [OrderDetails](https://developer.digikey.com/products/order-support/orderdetails/) API have been implemented. -* `digikey.salesorder_history()` -* `digikey.status_salesorder_id()` +* `DigikeyAPI.salesorder_history()` +* `DigikeyAPI.status_salesorder_id()` #### Barcode TODO @@ -103,7 +87,7 @@ It is possible to retrieve the number of max requests and current requests by pa ```python api_limit = {} search_request = KeywordSearchRequest(keywords='CRCW080510K0FKEA', record_count=10) -result = digikey.keyword_search(body=search_request, api_limits=api_limit) +result = dk_api.keyword_search(body=search_request, api_limits=api_limit) ``` The dict will be filled with the information returned from the API: @@ -114,3 +98,4 @@ The dict will be filled with the information returned from the API: } ``` Sometimes the API does not return any rate limit data, the values will then be set to None. + diff --git a/digikey/__init__.py b/digikey/__init__.py index 1bd7614..f9edc9e 100644 --- a/digikey/__init__.py +++ b/digikey/__init__.py @@ -1,6 +1,7 @@ import logging from digikey.v2.api import (search, part) from digikey.v3.api import DigikeyAPI +from digikey.configfile import (DigikeyBaseConfig, DigikeyJsonConfig) logger = logging.getLogger(__name__) diff --git a/digikey/oauth/oauth2.py b/digikey/oauth/oauth2.py index 44565a1..587d5df 100644 --- a/digikey/oauth/oauth2.py +++ b/digikey/oauth/oauth2.py @@ -132,8 +132,8 @@ def __init__(self, if not a_id or not a_secret: raise ValueError( 'CLIENT ID and SECRET must be set. ' - 'Set "DIGIKEY_CLIENT_ID" and "DIGIKEY_CLIENT_SECRET" ' - 'as an environment variable, or pass your keys directly to the client.' + 'Set "client-id" and "client-secret" ' + 'in the configuration constructor.' ) self._id = a_id diff --git a/docs/DigikeyBaseConfig.md b/docs/DigikeyBaseConfig.md new file mode 100644 index 0000000..7e96a6f --- /dev/null +++ b/docs/DigikeyBaseConfig.md @@ -0,0 +1,51 @@ +# Digikey-API Settings Storage Configuration + +The `DigikeyAPI` class of the V3 API must be given a configuration class as one of it's arguments. +The purpose of this configuration class is to determine how the API will store some settings like the client-id, client-secret, and other stuff. + +As of right now, there is only 1 available configuration class which is `DigikeyJsonConfig`. Of course you can create your own configurator class and input that into `DigikeyAPI`, for example if you want the API to store it's configuration in a database. + +## DigikeyJsonConfig + +This configuration classes stores the API settings in a JSON file. When initializing this class, a file name/path must be given. + +Example: +```python +dk_config = digikey.DigikeyJsonConfig(file_name='test_conf.json') +dk_config.set('client-id', 'ENTER_CLIENT_ID') +dk_config.set('client-secret', 'ENTER_CLIENT_SECRET') +``` + +## Create your own storage configuration + +You can create your own storage configurator as mentioned to define how to store the API's settings. The `DigikeyBaseConfig` class can be inherited to create it. You must override and define 3 functions: + +- save(self): This function gets called when the API wants to save the settings. +- get(self, key: str): This function gets called when the API wants to retrieve a value for a given key. Return `None` if it doesn't exist +- set(self, key: str, val: str): This function gets called when the API wants to store a value for a given key. + +As an example, this is how `DigikeyJsonConfig` is implemented: +```python + class DigikeyJsonConfig(DigikeyBaseConfig): + def __init__(self, file_name): + super().__init__() + self.file_name = file_name + # Get config from file if it exists + if os.path.exists(self.file_name): + with open(self.file_name, 'r') as f: + self.config = json.load(f) + else: + self.config = {} + + def save(self): + with open(self.file_name, 'w') as f: + json.dump(self.config, f) + + def get(self, key: str): + if key in self.config: + return self.config[key] + return None + + def set(self, key: str, val: str): + self.config[key] = val +``` From 74ba3af3cc80cef369d42a7da8a8ff5aa5a67509 Mon Sep 17 00:00:00 2001 From: Electro707 Date: Mon, 27 Sep 2021 13:21:16 -0400 Subject: [PATCH 7/8] Updated README.md to document new config related API functions --- README.md | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 4b3dfdc..d32ae85 100644 --- a/README.md +++ b/README.md @@ -39,10 +39,8 @@ import digikey from digikey.v3.productinformation import KeywordSearchRequest dk_config = digikey.DigikeyJsonConfig(file_name='dk_conf.json') -dk_config.set('client-id', 'ENTER_CLIENT_ID') -dk_config.set('client-secret', 'ENTER_CLIENT_SECRET') - dk_api = digikey.DigikeyAPI(dk_config, is_sandbox=False) +dk_api.set_client_info(client_id='ENTER_CLIENT_ID', client_secret='ENTER_CLIENT_SECRET') # Query product number dkpn = '296-6501-1-ND' @@ -60,6 +58,14 @@ See [docs/DigikeyBaseConfig.md](docs/DigikeyBaseConfig.md) for more details on t ## Top-level APIs +#### Configuration Related Functions +* `set_client_info()` + * Arguments are `client_id` and `client_secret` +* `DigikeyAPI.needs_client_id()` + * Returns `True` if a client ID is needed/missing +* `DigikeyAPI.needs_client_secret()` + * Returns `True` if a client secret is needed/missing + #### Product Information All functions from the [PartSearch](https://developer.digikey.com/products/product-information/partsearch/) API have been implemented. * `DigikeyAPI.keyword_search()` From 7819e5daeecd62fbe5ab0e1dc5d80620edd1cb70 Mon Sep 17 00:00:00 2001 From: Electro707 Date: Mon, 22 Nov 2021 15:14:33 -0500 Subject: [PATCH 8/8] Removed forced logging --- digikey/__init__.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/digikey/__init__.py b/digikey/__init__.py index f9edc9e..e3c0620 100644 --- a/digikey/__init__.py +++ b/digikey/__init__.py @@ -1,20 +1,5 @@ import logging -from digikey.v2.api import (search, part) from digikey.v3.api import DigikeyAPI from digikey.configfile import (DigikeyBaseConfig, DigikeyJsonConfig) -logger = logging.getLogger(__name__) - - -def setup_logger(logger_ref): - logger_ref.setLevel(logging.DEBUG) - formatter = logging.Formatter( - '%(asctime)s - %(name)20.20s - %(levelname)8s: %(message)s') - handler = logging.StreamHandler() - handler.setFormatter(formatter) - logger_ref.addHandler(handler) - - -setup_logger(logger) - name = 'digikey'