Skip to content

Commit

Permalink
Reset connection on excessive keepalive pitr drift (#35)
Browse files Browse the repository at this point in the history
Keep a count of how many times the same pitr is seen in
consecutive keepalive messages. If that count reaches the configured
limit (KEEPALIVE_STALE_PITRS), there must be a connection issue and
we should restart the connection.

Move defaults into Python, document them in the yml, and update tests to emit keepalives
  • Loading branch information
polastre authored Apr 1, 2022
1 parent d0012e9 commit 6f64230
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 27 deletions.
1 change: 1 addition & 0 deletions .github/workflows/dockerimage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ on:

env:
KEEPALIVE: 60
KEEPALIVE_STALE_PITRS: 5
SERVER: firehose-test.flightaware.com
PRINT_STATS_PERIOD: 0
FH_USERNAME: ${{ secrets.FH_USERNAME }}
Expand Down
34 changes: 25 additions & 9 deletions connector/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@

CONNECTION_ERROR_LIMIT = 3

COMPRESSION: str
USERNAME: str
APIKEY: str
KEEPALIVE: int

COMPRESSION: str
INIT_CMD_ARGS: str
INIT_CMD_TIME: str
KEEPALIVE: int
KEEPALIVE_STALE_PITRS: int
SERVERNAME: str
STATS_PERIOD: int

Expand Down Expand Up @@ -95,20 +97,21 @@ def parse_script_args() -> None:
"""Sets global variables based on the environment variables provided in docker-compose"""
# pylint: disable=global-statement
# pylint: disable=line-too-long
global USERNAME, APIKEY, SERVERNAME, COMPRESSION, STATS_PERIOD, KEEPALIVE, INIT_CMD_TIME, INIT_CMD_ARGS
global USERNAME, APIKEY, SERVERNAME, COMPRESSION, STATS_PERIOD, KEEPALIVE, KEEPALIVE_STALE_PITRS, INIT_CMD_TIME, INIT_CMD_ARGS

# **** REQUIRED ****
USERNAME = os.environ["FH_USERNAME"]
APIKEY = os.environ["FH_APIKEY"]
# **** NOT REQUIRED ****
SERVERNAME = os.environ["SERVER"]
COMPRESSION = os.environ["COMPRESSION"]
STATS_PERIOD = int(os.environ["PRINT_STATS_PERIOD"])
KEEPALIVE = int(os.environ["KEEPALIVE"])
INIT_CMD_TIME = os.environ["INIT_CMD_TIME"]
SERVERNAME = os.environ.get("SERVER", "firehose-test.flightaware.com")
COMPRESSION = os.environ.get("COMPRESSION", "")
STATS_PERIOD = int(os.environ.get("PRINT_STATS_PERIOD", "10"))
KEEPALIVE = int(os.environ.get("KEEPALIVE", "60"))
KEEPALIVE_STALE_PITRS = int(os.environ.get("KEEPALIVE_STALE_PITRS", "5"))
INIT_CMD_TIME = os.environ.get("INIT_CMD_TIME", "live")
if INIT_CMD_TIME.split()[0] not in ["live", "pitr"]:
raise ValueError(f'$INIT_CMD_TIME value is invalid, should be "live" or "pitr <pitr>"')
INIT_CMD_ARGS = os.environ["INIT_CMD_ARGS"]
INIT_CMD_ARGS = os.environ.get("INIT_CMD_ARGS", "")
for command in ["live", "pitr", "compression", "keepalive", "username", "password"]:
if command in INIT_CMD_ARGS.split():
raise ValueError(
Expand Down Expand Up @@ -203,6 +206,7 @@ async def read_firehose(time_mode: str) -> Optional[str]:
await fh_writer.drain()

pitr = None
num_keepalives, last_good_keepalive_pitr = 0, 0
while True:
timeout = (KEEPALIVE + 10) if KEEPALIVE else None
try:
Expand All @@ -221,6 +225,18 @@ async def read_firehose(time_mode: str) -> Optional[str]:
print(f'Error: {message["error_msg"]}')
break

if message["type"] == "keepalive":
# if the pitr is the same as the last keepalive pitr, count how many consecutive keepalives have repeated it
if last_good_keepalive_pitr == message["pitr"]:
num_keepalives += 1
else:
num_keepalives = 0
if num_keepalives >= KEEPALIVE_STALE_PITRS:
break
last_good_keepalive_pitr = message["pitr"]
else:
num_keepalives = 0

last_good_pitr = pitr = message["pitr"]

async with stats_lock:
Expand Down
59 changes: 50 additions & 9 deletions connector/test/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def setUp(self):
"FH_USERNAME": "testuser",
"FH_APIKEY": "testapikey",
"KEEPALIVE": "60",
"KEEPALIVE_STALE_PITRS": "5",
"INIT_CMD_ARGS": "",
"INIT_CMD_TIME": "live",
"SERVER": "testserver",
Expand All @@ -50,13 +51,14 @@ def reconnect_after_error(
self, test_reconnect_live, mock_kafkaproducer, mock_openconnection, error
):
# mock setup
if not isinstance(error, list):
error = [error]
if test_reconnect_live:
self.mock_reader.readline.side_effect = [error]
self.mock_reader.readline.side_effect = error
else:
self.mock_reader.readline.side_effect = [
b'{"pitr":"1584126630","type":"arrival","id":"KPVD-1588929046-hexid-ADF994"}',
error,
]
] + error
mock_openconnection.return_value = self.mock_reader, self.mock_writer

# run test
Expand Down Expand Up @@ -84,12 +86,15 @@ def reconnect_after_error(
],
)
# verify expect output to kafka
mock_kafkaproducer.return_value.produce.assert_called_once_with(
"topic1",
key=b"KPVD-1588929046-hexid-ADF994",
value=b'{"pitr":"1584126630","type":"arrival","id":"KPVD-1588929046-hexid-ADF994"}',
callback=ANY,
)
if len(error) == 1:
mock_kafkaproducer.return_value.produce.assert_called_once_with(
"topic1",
key=b"KPVD-1588929046-hexid-ADF994",
value=b'{"pitr":"1584126630","type":"arrival","id":"KPVD-1588929046-hexid-ADF994"}',
callback=ANY,
)
else:
self.assertEqual(mock_kafkaproducer.return_value.produce.call_count, len(error))

@patch("main.open_connection", new_callable=AsyncMock)
@patch("main.Producer", new_callable=Mock)
Expand Down Expand Up @@ -145,6 +150,42 @@ def test_live_error_msg(self, mock_kafkaproducer, mock_openconnection):
b'{"pitr":"1584126630","type":"error","error_msg":"test error"}',
)

# Stale-keepalive case: the stream carries six keepalive messages that all
# report the same pitr. setUp configures KEEPALIVE_STALE_PITRS=5, so the
# repeat count should reach that limit on the sixth keepalive, at which point
# the connector is expected to drop the connection and reconnect.
@patch("main.open_connection", new_callable=AsyncMock)
@patch("main.Producer", new_callable=Mock)
def test_pitr_drift_exceeded(self, mock_kafkaproducer, mock_openconnection):
self.reconnect_after_error(
# test_reconnect_live=False: the helper places an arrival message ahead of this list
False,
mock_kafkaproducer,
mock_openconnection,
[
b'{"pitr":"1584126630","type":"keepalive"}',
b'{"pitr":"1584126630","type":"keepalive"}',
b'{"pitr":"1584126630","type":"keepalive"}',
b'{"pitr":"1584126630","type":"keepalive"}',
b'{"pitr":"1584126630","type":"keepalive"}',
b'{"pitr":"1584126630","type":"keepalive"}',
]
)

# Reset case: five keepalives repeat one pitr, then a sixth reports a newer
# pitr, which should clear the repeat count before the stale limit is reached.
@patch("main.open_connection", new_callable=AsyncMock)
@patch("main.Producer", new_callable=Mock)
def test_pitr_drift_reset(self, mock_kafkaproducer, mock_openconnection):
# does not reconnect and is waiting for the next message
# NOTE(review): StopAsyncIteration presumably comes from the mocked readline
# running out of side_effect entries while the connector is still reading — confirm.
with self.assertRaises(StopAsyncIteration), self.env:
self.reconnect_after_error(
False,
mock_kafkaproducer,
mock_openconnection,
[
b'{"pitr":"1584126630","type":"keepalive"}',
b'{"pitr":"1584126630","type":"keepalive"}',
b'{"pitr":"1584126630","type":"keepalive"}',
b'{"pitr":"1584126630","type":"keepalive"}',
b'{"pitr":"1584126630","type":"keepalive"}',
# the pitr advances here, so this keepalive is not a repeat
b'{"pitr":"1584126631","type":"keepalive"}',
]
)


# THIS TEST ONLY RUNS WHEN FH_APIKEY IS SET (E.G. IN CI WITH LOGIN CREDENTIALS)
@unittest.skipIf(not os.getenv("FH_APIKEY"), "No login credentials")
Expand Down
27 changes: 18 additions & 9 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,45 @@ services:
# by non-dockerized applications
- "${STREAMING_PORT:-127.0.0.1:1601}:1601"
environment:
# REQUIRED environment variables
# Firehose account username
- FH_USERNAME=${FH_USERNAME:?FH_USERNAME variable must be set}
# Firehose account key
- FH_APIKEY=${FH_APIKEY:?FH_APIKEY variable must be set}
# Firehose URL, firehose.flightaware.com can also be used
- SERVER=${SERVER:-firehose-test.flightaware.com}
# Use a single topic for all events to ensure proper ordering per flight
- KAFKA_TOPIC_NAME=events

# OPTIONAL environment variables
# Firehose URL, defaults to firehose-test.flightaware.com.
# firehose.flightaware.com can also be used
# - SERVER=${SERVER:-}
# Streaming compression of incoming Firehose data. Valid values are gzip,
# deflate, or compress. Leave blank to disable compression.
- COMPRESSION=${COMPRESSION:-}
# - COMPRESSION=${COMPRESSION:-}
# Frequency in seconds to print stats about connection (messages/bytes
# per second). Set to 0 to disable.
- PRINT_STATS_PERIOD=${PRINT_STATS_PERIOD:-10}
# - PRINT_STATS_PERIOD=${PRINT_STATS_PERIOD:-}
# Frequency in seconds that Firehose should send a synthetic "keepalive"
# message to help connector ensure the connection is still alive. If no
# such message is received within roughly $keepalive seconds, connector
# will automatically reconnect to Firehose.
- KEEPALIVE=${KEEPALIVE:-60}
# - KEEPALIVE=${KEEPALIVE:-}
# The number of consecutive keepalive messages repeating the same pitr
# that should trigger a restart of the connection
# - KEEPALIVE_STALE_PITRS=${KEEPALIVE_STALE_PITRS:-}
# "Time mode" of Firehose init command. Can be "live" or "pitr <pitr>";
# range is currently not supported.
# See https://flightaware.com/commercial/firehose/documentation/commands
# for more details.
- INIT_CMD_TIME=${INIT_CMD_TIME:-live}
# - INIT_CMD_TIME=${INIT_CMD_TIME:-}
# The "optional" section of the Firehose init command. Mostly consists of
# filters for the data. Do not put username, password, keepalive, or
# compression commands here. Documentation at
# https://flightaware.com/commercial/firehose/documentation/commands
- INIT_CMD_ARGS=${INIT_CMD_ARGS:-}
# - INIT_CMD_ARGS=${INIT_CMD_ARGS:-}

# PYTHON settings
- PYTHONUNBUFFERED=1
# Use a single topic for all events to ensure proper ordering per flight
- KAFKA_TOPIC_NAME=events
logging:
driver: "json-file"
options:
Expand Down

0 comments on commit 6f64230

Please sign in to comment.