Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add improvements to "Building a connector the hard way" #19093

Merged
merged 8 commits into from
Nov 8, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
def read(config, catalog):
# Assert required configuration was provided
if "api_key" not in config or "stock_ticker" not in config:
log("Input config must contain the properties 'api_key' and 'stock_ticker'")
log_error("Input config must contain the properties 'api_key' and 'stock_ticker'")
sys.exit(1)

# Find the stock_prices stream if it is present in the input catalog
Expand All @@ -43,22 +43,22 @@ def read(config, catalog):
stock_prices_stream = configured_stream

if stock_prices_stream is None:
log("No streams selected")
log_error("No streams selected")
return

# We only support full_refresh at the moment, so verify the user didn't ask for another sync mode
if stock_prices_stream["sync_mode"] != "full_refresh":
log("This connector only supports full refresh syncs! (for now)")
log_error("This connector only supports full refresh syncs! (for now)")
sys.exit(1)

# If we've made it this far, all the configuration is good and we can pull the last 7 days of market data
response = _call_api(ticker=config["stock_ticker"], token = config["api_key"])
if response.status_code != 200:
# In a real scenario we'd handle this error better :)
log("Failure occurred when calling Polygon.io API")
log_error("Failure occurred when calling Polygon.io API")
sys.exit(1)
else:
# Stock prices are returned sorted by by date in ascending order
# Stock prices are returned sorted by date in ascending order
# We want to output them one by one as AirbyteMessages
results = response.json()["results"]
for result in results:
Expand All @@ -83,7 +83,7 @@ def _call_api(ticker, token):
def check(config):
# Assert required configuration was provided
if "api_key" not in config or "stock_ticker" not in config:
log("Input config must contain the properties 'api_key' and 'stock_ticker'")
log_error("Input config must contain the properties 'api_key' and 'stock_ticker'")
sys.exit(1)
else:
# Validate input configuration by attempting to get the daily closing prices of the input stock ticker
Expand All @@ -107,6 +107,12 @@ def log(message):
print(json.dumps(log_json))


def log_error(error_message):
current_time_in_ms = int(datetime.datetime.now().timestamp()) * 1000
log_json = {"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": current_time_in_ms, "error": {"message": error_message}}}
print(json.dumps(log_json))
erohmensing marked this conversation as resolved.
Show resolved Hide resolved


def discover():
catalog = {
"streams": [{
Expand Down
103 changes: 73 additions & 30 deletions docs/connector-development/tutorials/build-a-connector-the-hard-way.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ README.md
acceptance-test-config.yml
acceptance-test-docker.sh
build.gradle
source.py
spec.json
```

Expand Down Expand Up @@ -261,6 +262,9 @@ Then we'll add the `check_method`:

```python
import requests
import datetime
from datetime import date
from datetime import timedelta

def _call_api(ticker, token):
today = date.today()
Expand All @@ -284,6 +288,8 @@ def check(config):
print(json.dumps(output_message))
```

In order to run this code later, be sure to `pip install requests` in your local virtual environment. `datetime` should already be included in your Python installation.
erohmensing marked this conversation as resolved.
Show resolved Hide resolved

Lastly we'll extend the `run` method to accept the `check` command and call the `check` method. First we'll add a helper method for reading input:

```python
Expand Down Expand Up @@ -314,6 +320,12 @@ elif command == "check":
check(config)
```

Then we need to update our list of available commands:

```python
log("Invalid command. Allowable commands: [spec, check]")
```

This results in the following `run` method.

```python
Expand Down Expand Up @@ -349,7 +361,9 @@ def run(args):
sys.exit(0)
```

and that should be it. Let's test our new method:
and that should be it.

Let's test our new method:

```bash
$ python source.py check --config secrets/valid_config.json
Expand Down Expand Up @@ -416,6 +430,12 @@ elif command == "discover":
discover()
```

We need to update our list of available commands:

```python
log("Invalid command. Allowable commands: [spec, check, discover]")
```

You may be wondering why `config` is a required input to `discover` if it's not used. This is done for consistency: the Airbyte Specification requires `--config` as an input to `discover` because many sources require it \(e.g: to discover the tables available in a Postgres database, you must supply a password\). So instead of guessing whether the flag is required depending on the connector, we always assume it is required, and the connector can choose whether to use it.

The full run method is now below:
Expand Down Expand Up @@ -526,14 +546,16 @@ First, let's create a configured catalog `fullrefresh_configured_catalog.json` t
Then we'll define the `read` method in `source.py`:

```python
import datetime
from datetime import date
from datetime import timedelta
def log_error(error_message):
current_time_in_ms = int(datetime.datetime.now().timestamp()) * 1000
log_json = {"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": current_time_in_ms, "error": {"message": error_message}}}
print(json.dumps(log_json))


def read(config, catalog):
# Assert required configuration was provided
if "api_key" not in config or "stock_ticker" not in config:
log("Input config must contain the properties 'api_key' and 'stock_ticker'")
log_error("Input config must contain the properties 'api_key' and 'stock_ticker'")
sys.exit(1)

# Find the stock_prices stream if it is present in the input catalog
Expand All @@ -543,19 +565,19 @@ def read(config, catalog):
stock_prices_stream = configured_stream

if stock_prices_stream is None:
log("No streams selected")
log_error("No stream selected.")
return

# We only support full_refresh at the moment, so verify the user didn't ask for another sync mode
if stock_prices_stream["sync_mode"] != "full_refresh":
log("This connector only supports full refresh syncs! (for now)")
log_error("This connector only supports full refresh syncs! (for now)")
sys.exit(1)

# If we've made it this far, all the configuration is good and we can pull the last 7 days of market data
response = _call_api(ticker=config["stock_ticker"], token = config["api_key"])
if response.status_code != 200:
# In a real scenario we'd handle this error better :)
log("Failure occurred when calling Polygon.io API")
log_error("Failure occurred when calling Polygon.io API")
sys.exit(1)
else:
# Stock prices are returned sorted by date in ascending order
Expand All @@ -568,6 +590,8 @@ def read(config, catalog):
print(json.dumps(output_message))
```

Note we've added a `log_error()` function to simplify formatting error messages from within connector functions as [`AirbyteTraceMessage`](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol#airbytetracemessage)s, specifically `AirbyteErrorTraceMessage`s.

After doing some input validation, the code above calls the API to obtain daily prices for the input stock ticker, then outputs the prices. As always, our output is formatted according to the Airbyte Specification. Let's update our args parser with the following blocks:

```python
Expand All @@ -590,6 +614,12 @@ elif command == "read":
read(config, configured_catalog)
```

and:

```python
log("Invalid command. Allowable commands: [spec, check, discover, read]")
```

this yields the following `run` method:

```python
Expand Down Expand Up @@ -696,7 +726,7 @@ from datetime import timedelta
def read(config, catalog):
# Assert required configuration was provided
if "api_key" not in config or "stock_ticker" not in config:
log("Input config must contain the properties 'api_key' and 'stock_ticker'")
log_error("Input config must contain the properties 'api_key' and 'stock_ticker'")
sys.exit(1)

# Find the stock_prices stream if it is present in the input catalog
Expand All @@ -706,19 +736,19 @@ def read(config, catalog):
stock_prices_stream = configured_stream

if stock_prices_stream is None:
log("No streams selected")
log_error("No streams selected")
return

# We only support full_refresh at the moment, so verify the user didn't ask for another sync mode
if stock_prices_stream["sync_mode"] != "full_refresh":
log("This connector only supports full refresh syncs! (for now)")
log_error("This connector only supports full refresh syncs! (for now)")
sys.exit(1)

# If we've made it this far, all the configuration is good and we can pull the last 7 days of market data
response = _call_api(ticker=config["stock_ticker"], token = config["api_key"])
if response.status_code != 200:
# In a real scenario we'd handle this error better :)
log("Failure occurred when calling Polygon.io API")
log_error("Failure occurred when calling Polygon.io API")
sys.exit(1)
else:
# Stock prices are returned sorted by date in ascending order
Expand Down Expand Up @@ -746,7 +776,7 @@ def _call_api(ticker, token):
def check(config):
# Assert required configuration was provided
if "api_key" not in config or "stock_ticker" not in config:
log("Input config must contain the properties 'api_key' and 'stock_ticker'")
log_error("Input config must contain the properties 'api_key' and 'stock_ticker'")
sys.exit(1)
else:
# Validate input configuration by attempting to get the daily closing prices of the input stock ticker
Expand All @@ -770,6 +800,12 @@ def log(message):
print(json.dumps(log_json))


def log_error(error_message):
current_time_in_ms = int(datetime.datetime.now().timestamp()) * 1000
log_json = {"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": current_time_in_ms, "error": {"message": error_message}}}
print(json.dumps(log_json))


def discover():
catalog = {
"streams": [{
Expand Down Expand Up @@ -915,7 +951,7 @@ Then we can run the image using:
docker run airbyte/source-stock-ticker-api:dev
```

to run any of our commands, we'll need to mount all the inputs into the Docker container first, then refer to their _mounted_ paths when invoking the connector. For example, we'd run `check` or `read` as follows:
To run any of our commands, we'll need to mount all the inputs into the Docker container first, then refer to their _mounted_ paths when invoking the connector. This allows the connector to access your secrets without having to build them into the container. For example, we'd run `check` or `read` as follows:

```bash
$ docker run airbyte/source-stock-ticker-api:dev spec
Expand Down Expand Up @@ -948,25 +984,31 @@ The code generator should have already generated a YAML file which configures th
# See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-stock-ticker-api:dev
tests:
spec:
- spec_path: "spec.json"
config_path: "secrets/valid_config.json"
connection:
- config_path: "secrets/valid_config.json"
status: "succeed"
- config_path: "secrets/invalid_config.json"
status: "failed"
discovery:
- config_path: "secrets/valid_config.json"
acceptance_tests:
basic_read:
- config_path: "secrets/valid_config.json"
configured_catalog_path: "fullrefresh_configured_catalog.json"
tests:
- config_path: secrets/valid_config.json
configured_catalog_path: fullrefresh_configured_catalog.json
empty_streams: []
connection:
tests:
- config_path: secrets/valid_config.json
status: succeed
- config_path: secrets/invalid_config.json
status: failed
discovery:
tests:
- config_path: secrets/valid_config.json
full_refresh:
- config_path: "secrets/valid_config.json"
configured_catalog_path: "fullrefresh_configured_catalog.json"
tests:
- config_path: secrets/valid_config.json
configured_catalog_path: fullrefresh_configured_catalog.json
spec:
tests:
- config_path: secrets/valid_config.json
spec_path: spec.json
# incremental: # TODO uncomment this once you implement incremental sync in part 2 of the tutorial
# tests:
# - config_path: "secrets/config.json"
# configured_catalog_path: "integration_tests/configured_catalog.json"
# future_state_path: "integration_tests/abnormal_state.json"
Expand All @@ -975,6 +1017,7 @@ tests:
Then from the connector module directory run

```bash
chmod 755 acceptance-test-docker.sh
erohmensing marked this conversation as resolved.
Show resolved Hide resolved
./acceptance-test-docker.sh
```

Expand Down Expand Up @@ -1058,7 +1101,7 @@ airbyte-server | Version: dev
airbyte-server |
```

After you see the above banner printed out in the terminal window where you are running `docker-compose up`, visit [http://localhost:8000](http://localhost:8000) in your browser.
After you see the above banner printed out in the terminal window where you are running `docker-compose up`, visit [http://localhost:8000](http://localhost:8000) in your browser and log in with the default credentials: username `airbyte` and password `password`.

If this is the first time using the Airbyte UI, then you will be prompted to go through a first-time wizard. To skip it, click the "Skip Onboarding" button.

Expand Down