diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java index 2dc242e83d7c..7e8762e648a6 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -28,6 +28,7 @@ import io.airbyte.config.helpers.LogClientSingleton; import java.nio.file.Path; import java.util.Arrays; +import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.function.Function; @@ -213,9 +214,11 @@ public String getTemporalHost() { @Override public Set getTemporalWorkerPorts() { - return Arrays.stream(getEnvOrDefault(TEMPORAL_WORKER_PORTS, "").split(",")) - .map(Integer::valueOf) - .collect(Collectors.toSet()); + var ports = getEnvOrDefault(TEMPORAL_WORKER_PORTS, ""); + if (ports.isEmpty()) { + return new HashSet<>(); + } + return Arrays.stream(ports.split(",")).map(Integer::valueOf).collect(Collectors.toSet()); } @Override diff --git a/airbyte-integrations/connectors/source-jira/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-jira/integration_tests/configured_catalog.json index 4ca9b4d98170..b109549379f8 100644 --- a/airbyte-integrations/connectors/source-jira/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-jira/integration_tests/configured_catalog.json @@ -7406,53 +7406,30 @@ "name": "labels", "json_schema": { "$schema": "http://json-schema.org/draft-07/schema#", - "type": [ - "object", - "null" - ], + "type": ["object", "null"], "properties": { "id": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "key": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "value": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "name": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "desc": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "type": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] } }, "additionalProperties": true }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", diff --git a/airbyte-integrations/connectors/source-jira/integration_tests/full_configured_catalog.json b/airbyte-integrations/connectors/source-jira/integration_tests/full_configured_catalog.json index b046cca6ad40..ba70e74a4d04 100644 --- a/airbyte-integrations/connectors/source-jira/integration_tests/full_configured_catalog.json +++ b/airbyte-integrations/connectors/source-jira/integration_tests/full_configured_catalog.json @@ -9593,53 +9593,30 @@ "name": "labels", "json_schema": { "$schema": "http://json-schema.org/draft-07/schema#", - "type": [ - "object", - "null" - ], + "type": ["object", "null"], "properties": { "id": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "key": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "value": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "name": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "desc": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "type": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] } }, "additionalProperties": true }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", diff --git a/airbyte-integrations/connectors/source-jira/integration_tests/labels_catalog.json b/airbyte-integrations/connectors/source-jira/integration_tests/labels_catalog.json index b19557de2b82..1dcd2e27d680 100644 --- a/airbyte-integrations/connectors/source-jira/integration_tests/labels_catalog.json +++ b/airbyte-integrations/connectors/source-jira/integration_tests/labels_catalog.json @@ -1,39 +1,38 @@ { - "streams": [ - { - "stream": { - "name": "labels", - "json_schema": { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": ["object", "null"], - "properties": { - "id": { - "type": ["string", "null"] - }, - "key": { - "type": ["string", "null"] - }, - "value": { - "type": ["string", "null"] - }, - "name": { - "type": ["string", "null"] - }, - "desc": { - "type": ["string", "null"] - }, - "type": { - "type": ["string", "null"] - } + "streams": [ + { + "stream": { + "name": "labels", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": ["object", "null"], + "properties": { + "id": { + "type": ["string", "null"] }, - "additionalProperties": true + "key": { + "type": ["string", "null"] + }, + "value": { + "type": ["string", "null"] + }, + "name": { + "type": ["string", "null"] + }, + "desc": { + "type": ["string", "null"] + }, + "type": { + "type": ["string", "null"] + } }, - "supported_sync_modes": ["full_refresh"], - "source_defined_cursor": false + "additionalProperties": true }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - } - ] - } - \ No newline at end of file + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": false + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + } + ] +} diff --git a/airbyte-integrations/connectors/source-jira/sample_files/configured_catalog.json b/airbyte-integrations/connectors/source-jira/sample_files/configured_catalog.json index 456f49837ee6..c9d03e764db2 100644 --- a/airbyte-integrations/connectors/source-jira/sample_files/configured_catalog.json +++ b/airbyte-integrations/connectors/source-jira/sample_files/configured_catalog.json @@ -69,9 +69,7 @@ "additionalProperties": false, "description": "Details of an application role." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -137,9 +135,7 @@ "additionalProperties": false, "description": "List of system avatars." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -329,12 +325,7 @@ "type": "string", "description": "The user account type. Can take the following values:\n\n * `atlassian` regular Atlassian user account\n * `app` system account used for Connect applications and OAuth to represent external systems\n * `customer` Jira Service Desk account representing an external service desk", "readOnly": true, - "enum": [ - "atlassian", - "app", - "customer", - "unknown" - ] + "enum": ["atlassian", "app", "customer", "unknown"] }, "name": { "type": "string", @@ -1392,10 +1383,7 @@ "type": "string", "description": "The type of scope.", "readOnly": true, - "enum": [ - "PROJECT", - "TEMPLATE" - ] + "enum": ["PROJECT", "TEMPLATE"] }, "project": { "description": "The project the item has scope in.", @@ -1510,10 +1498,7 @@ "type": "string", "description": "The default assignee when creating issues for this project.", "readOnly": true, - "enum": [ - "PROJECT_LEAD", - "UNASSIGNED" - ] + "enum": ["PROJECT_LEAD", "UNASSIGNED"] }, "versions": { "type": "array", @@ -1733,11 +1718,7 @@ "type": "string", "description": "The [project type](https://confluence.atlassian.com/x/GwiiLQ#Jiraapplicationsoverview-Productfeaturesandprojecttypes) of the project.", "readOnly": true, - "enum": [ - "software", - "service_desk", - "business" - ] + "enum": ["software", "service_desk", "business"] }, "simplified": { "type": "boolean", @@ -1748,10 +1729,7 @@ "type": "string", "description": "The type of the project.", "readOnly": true, - "enum": [ - "classic", - "next-gen" - ] + "enum": ["classic", "next-gen"] }, "favourite": { "type": "boolean", @@ -1808,11 +1786,7 @@ }, "globalHierarchyLevel": { "type": "string", - "enum": [ - "SUBTASK", - "BASE", - "EPIC" - ] + "enum": ["SUBTASK", "BASE", "EPIC"] } } } @@ -1905,12 +1879,7 @@ "type": "string", "description": "The user account type. Can take the following values:\n\n * `atlassian` regular Atlassian user account\n * `app` system account used for Connect applications and OAuth to represent external systems\n * `customer` Jira Service Desk account representing an external service desk", "readOnly": true, - "enum": [ - "atlassian", - "app", - "customer", - "unknown" - ] + "enum": ["atlassian", "app", "customer", "unknown"] }, "name": { "type": "string", @@ -2153,12 +2122,7 @@ "type": "string", "description": "The user account type. Can take the following values:\n\n * `atlassian` regular Atlassian user account\n * `app` system account used for Connect applications and OAuth to represent external systems\n * `customer` Jira Service Desk account representing an external service desk", "readOnly": true, - "enum": [ - "atlassian", - "app", - "customer", - "unknown" - ] + "enum": ["atlassian", "app", "customer", "unknown"] }, "name": { "type": "string", @@ -2469,10 +2433,7 @@ "type": "string", "description": "The type of scope.", "readOnly": true, - "enum": [ - "PROJECT", - "TEMPLATE" - ] + "enum": ["PROJECT", "TEMPLATE"] }, "project": { "description": "The project the item has scope in.", @@ -2502,11 +2463,7 @@ "type": "string", "description": "The [project type](https://confluence.atlassian.com/x/GwiiLQ#Jiraapplicationsoverview-Productfeaturesandprojecttypes) of the project.", "readOnly": true, - "enum": [ - "software", - "service_desk", - "business" - ] + "enum": ["software", "service_desk", "business"] }, "simplified": { "type": "boolean", @@ -2624,9 +2581,7 @@ "additionalProperties": false, "description": "Details of a dashboard." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -2682,12 +2637,7 @@ "type": "string", "description": "The user account type. Can take the following values:\n\n * `atlassian` regular Atlassian user account\n * `app` system account used for Connect applications and OAuth to represent external systems\n * `customer` Jira Service Desk account representing an external service desk", "readOnly": true, - "enum": [ - "atlassian", - "app", - "customer", - "unknown" - ] + "enum": ["atlassian", "app", "customer", "unknown"] }, "name": { "type": "string", @@ -3004,12 +2954,7 @@ "type": "string", "description": "The user account type. Can take the following values:\n\n * `atlassian` regular Atlassian user account\n * `app` system account used for Connect applications and OAuth to represent external systems\n * `customer` Jira Service Desk account representing an external service desk", "readOnly": true, - "enum": [ - "atlassian", - "app", - "customer", - "unknown" - ] + "enum": ["atlassian", "app", "customer", "unknown"] }, "name": { "type": "string", @@ -4067,10 +4012,7 @@ "type": "string", "description": "The type of scope.", "readOnly": true, - "enum": [ - "PROJECT", - "TEMPLATE" - ] + "enum": ["PROJECT", "TEMPLATE"] }, "project": { "description": "The project the item has scope in.", @@ -4185,10 +4127,7 @@ "type": "string", "description": "The default assignee when creating issues for this project.", "readOnly": true, - "enum": [ - "PROJECT_LEAD", - "UNASSIGNED" - ] + "enum": ["PROJECT_LEAD", "UNASSIGNED"] }, "versions": { "type": "array", @@ -4408,11 +4347,7 @@ "type": "string", "description": "The [project type](https://confluence.atlassian.com/x/GwiiLQ#Jiraapplicationsoverview-Productfeaturesandprojecttypes) of the project.", "readOnly": true, - "enum": [ - "software", - "service_desk", - "business" - ] + "enum": ["software", "service_desk", "business"] }, "simplified": { "type": "boolean", @@ -4423,10 +4358,7 @@ "type": "string", "description": "The type of the project.", "readOnly": true, - "enum": [ - "classic", - "next-gen" - ] + "enum": ["classic", "next-gen"] }, "favourite": { "type": "boolean", @@ -4483,11 +4415,7 @@ }, "globalHierarchyLevel": { "type": "string", - "enum": [ - "SUBTASK", - "BASE", - "EPIC" - ] + "enum": ["SUBTASK", "BASE", "EPIC"] } } } @@ -4580,12 +4508,7 @@ "type": "string", "description": "The user account type. Can take the following values:\n\n * `atlassian` regular Atlassian user account\n * `app` system account used for Connect applications and OAuth to represent external systems\n * `customer` Jira Service Desk account representing an external service desk", "readOnly": true, - "enum": [ - "atlassian", - "app", - "customer", - "unknown" - ] + "enum": ["atlassian", "app", "customer", "unknown"] }, "name": { "type": "string", @@ -4828,12 +4751,7 @@ "type": "string", "description": "The user account type. Can take the following values:\n\n * `atlassian` regular Atlassian user account\n * `app` system account used for Connect applications and OAuth to represent external systems\n * `customer` Jira Service Desk account representing an external service desk", "readOnly": true, - "enum": [ - "atlassian", - "app", - "customer", - "unknown" - ] + "enum": ["atlassian", "app", "customer", "unknown"] }, "name": { "type": "string", @@ -5144,10 +5062,7 @@ "type": "string", "description": "The type of scope.", "readOnly": true, - "enum": [ - "PROJECT", - "TEMPLATE" - ] + "enum": ["PROJECT", "TEMPLATE"] }, "project": { "description": "The project the item has scope in.", @@ -5177,11 +5092,7 @@ "type": "string", "description": "The [project type](https://confluence.atlassian.com/x/GwiiLQ#Jiraapplicationsoverview-Productfeaturesandprojecttypes) of the project.", "readOnly": true, - "enum": [ - "software", - "service_desk", - "business" - ] + "enum": ["software", "service_desk", "business"] }, "simplified": { "type": "boolean", @@ -5327,12 +5238,7 @@ "type": "string", "description": "The user account type. Can take the following values:\n\n * `atlassian` regular Atlassian user account\n * `app` system account used for Connect applications and OAuth to represent external systems\n * `customer` Jira Service Desk account representing an external service desk", "readOnly": true, - "enum": [ - "atlassian", - "app", - "customer", - "unknown" - ] + "enum": ["atlassian", "app", "customer", "unknown"] }, "name": { "type": "string", @@ -5564,9 +5470,7 @@ "additionalProperties": false, "description": "Details of a filter." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -5654,12 +5558,7 @@ "type": "string", "description": "The user account type. Can take the following values:\n\n * `atlassian` regular Atlassian user account\n * `app` system account used for Connect applications and OAuth to represent external systems\n * `customer` Jira Service Desk account representing an external service desk", "readOnly": true, - "enum": [ - "atlassian", - "app", - "customer", - "unknown" - ] + "enum": ["atlassian", "app", "customer", "unknown"] }, "name": { "type": "string", @@ -5917,12 +5816,7 @@ "type": "string", "description": "The user account type. Can take the following values:\n\n * `atlassian` regular Atlassian user account\n * `app` system account used for Connect applications and OAuth to represent external systems\n * `customer` Jira Service Desk account representing an external service desk", "readOnly": true, - "enum": [ - "atlassian", - "app", - "customer", - "unknown" - ] + "enum": ["atlassian", "app", "customer", "unknown"] }, "name": { "type": "string", @@ -6174,12 +6068,7 @@ "type": "string", "description": "The user account type. Can take the following values:\n\n * `atlassian` regular Atlassian user account\n * `app` system account used for Connect applications and OAuth to represent external systems\n * `customer` Jira Service Desk account representing an external service desk", "readOnly": true, - "enum": [ - "atlassian", - "app", - "customer", - "unknown" - ] + "enum": ["atlassian", "app", "customer", "unknown"] }, "name": { "type": "string", @@ -6422,12 +6311,7 @@ "type": "string", "description": "The user account type. Can take the following values:\n\n * `atlassian` regular Atlassian user account\n * `app` system account used for Connect applications and OAuth to represent external systems\n * `customer` Jira Service Desk account representing an external service desk", "readOnly": true, - "enum": [ - "atlassian", - "app", - "customer", - "unknown" - ] + "enum": ["atlassian", "app", "customer", "unknown"] }, "name": { "type": "string", @@ -6717,10 +6601,7 @@ "type": "string", "description": "The type of scope.", "readOnly": true, - "enum": [ - "PROJECT", - "TEMPLATE" - ] + "enum": ["PROJECT", "TEMPLATE"] }, "project": { "description": "The project the item has scope in.", @@ -6750,11 +6631,7 @@ "type": "string", "description": "The [project type](https://confluence.atlassian.com/x/GwiiLQ#Jiraapplicationsoverview-Productfeaturesandprojecttypes) of the project.", "readOnly": true, - "enum": [ - "software", - "service_desk", - "business" - ] + "enum": ["software", "service_desk", "business"] }, "simplified": { "type": "boolean", @@ -6835,10 +6712,7 @@ "type": "string", "description": "The default assignee when creating issues for this project.", "readOnly": true, - "enum": [ - "PROJECT_LEAD", - "UNASSIGNED" - ] + "enum": ["PROJECT_LEAD", "UNASSIGNED"] }, "versions": { "type": "array", @@ -7058,11 +6932,7 @@ "type": "string", "description": "The [project type](https://confluence.atlassian.com/x/GwiiLQ#Jiraapplicationsoverview-Productfeaturesandprojecttypes) of the project.", "readOnly": true, - "enum": [ - "software", - "service_desk", - "business" - ] + "enum": ["software", "service_desk", "business"] }, "simplified": { "type": "boolean", @@ -7073,10 +6943,7 @@ "type": "string", "description": "The type of the project.", "readOnly": true, - "enum": [ - "classic", - "next-gen" - ] + "enum": ["classic", "next-gen"] }, "favourite": { "type": "boolean", @@ -7133,11 +7000,7 @@ }, "globalHierarchyLevel": { "type": "string", - "enum": [ - "SUBTASK", - "BASE", - "EPIC" - ] + "enum": ["SUBTASK", "BASE", "EPIC"] } } } @@ -7230,12 +7093,7 @@ "type": "string", "description": "The user account type. Can take the following values:\n\n * `atlassian` regular Atlassian user account\n * `app` system account used for Connect applications and OAuth to represent external systems\n * `customer` Jira Service Desk account representing an external service desk", "readOnly": true, - "enum": [ - "atlassian", - "app", - "customer", - "unknown" - ] + "enum": ["atlassian", "app", "customer", "unknown"] }, "name": { "type": "string", @@ -7478,12 +7336,7 @@ "type": "string", "description": "The user account type. Can take the following values:\n\n * `atlassian` regular Atlassian user account\n * `app` system account used for Connect applications and OAuth to represent external systems\n * `customer` Jira Service Desk account representing an external service desk", "readOnly": true, - "enum": [ - "atlassian", - "app", - "customer", - "unknown" - ] + "enum": ["atlassian", "app", "customer", "unknown"] }, "name": { "type": "string", @@ -7794,10 +7647,7 @@ "type": "string", "description": "The type of scope.", "readOnly": true, - "enum": [ - "PROJECT", - "TEMPLATE" - ] + "enum": ["PROJECT", "TEMPLATE"] }, "project": { "description": "The project the item has scope in.", @@ -7827,11 +7677,7 @@ "type": "string", "description": "The [project type](https://confluence.atlassian.com/x/GwiiLQ#Jiraapplicationsoverview-Productfeaturesandprojecttypes) of the project.", "readOnly": true, - "enum": [ - "software", - "service_desk", - "business" - ] + "enum": ["software", "service_desk", "business"] }, "simplified": { "type": "boolean", @@ -7941,9 +7787,7 @@ "additionalProperties": false, "description": "Details of a share permission for the filter." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -7994,11 +7838,7 @@ "type": { "type": "string", "description": "The type of the group label.", - "enum": [ - "ADMIN", - "SINGLE", - "MULTIPLE" - ] + "enum": ["ADMIN", "SINGLE", "MULTIPLE"] } } } @@ -8014,9 +7854,7 @@ "additionalProperties": false, "description": "The list of groups found in a search, including header text (Showing X of Y matching groups) and total of matched groups." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -8122,18 +7960,12 @@ }, "additionalProperties": false }, - "supported_sync_modes": [ - "incremental" - ], + "supported_sync_modes": ["incremental"], "source_defined_cursor": true, - "default_cursor_field": [ - "created" - ] + "default_cursor_field": ["created"] }, "sync_mode": "incremental", - "cursor_field": [ - "created" - ], + "cursor_field": ["created"], "destination_sync_mode": "append" }, { @@ -8197,9 +8029,7 @@ "additionalProperties": true, "description": "A comment." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -8256,10 +8086,7 @@ "type": "string", "description": "The type of scope.", "readOnly": true, - "enum": [ - "PROJECT", - "TEMPLATE" - ] + "enum": ["PROJECT", "TEMPLATE"] }, "project": { "description": "The project the item has scope in.", @@ -8289,11 +8116,7 @@ "type": "string", "description": "The [project type](https://confluence.atlassian.com/x/GwiiLQ#Jiraapplicationsoverview-Productfeaturesandprojecttypes) of the project.", "readOnly": true, - "enum": [ - "software", - "service_desk", - "business" - ] + "enum": ["software", "service_desk", "business"] }, "simplified": { "type": "boolean", @@ -8402,9 +8225,7 @@ "additionalProperties": false, "description": "Details about a field." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -8438,9 +8259,7 @@ "additionalProperties": false, "description": "Details of a field configuration." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -8477,9 +8296,7 @@ "additionalProperties": false, "description": "The details of a custom field context." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -8531,9 +8348,7 @@ "additionalProperties": false, "description": "A list of issue link type beans." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -8558,9 +8373,7 @@ "additionalProperties": false, "description": "Details of an issue navigator column item." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -8717,10 +8530,7 @@ "type": "string", "description": "The type of scope.", "readOnly": true, - "enum": [ - "PROJECT", - "TEMPLATE" - ] + "enum": ["PROJECT", "TEMPLATE"] }, "project": { "description": "The project the item has scope in.", @@ -8966,10 +8776,7 @@ "type": "string", "description": "The type of scope.", "readOnly": true, - "enum": [ - "PROJECT", - "TEMPLATE" - ] + "enum": ["PROJECT", "TEMPLATE"] }, "project": { "description": "The project the item has scope in.", @@ -9185,10 +8992,7 @@ "type": "string", "description": "The type of scope.", "readOnly": true, - "enum": [ - "PROJECT", - "TEMPLATE" - ] + "enum": ["PROJECT", "TEMPLATE"] }, "project": { "description": "The project the item has scope in.", @@ -9218,11 +9022,7 @@ "type": "string", "description": "The [project type](https://confluence.atlassian.com/x/GwiiLQ#Jiraapplicationsoverview-Productfeaturesandprojecttypes) of the project.", "readOnly": true, - "enum": [ - "software", - "service_desk", - "business" - ] + "enum": ["software", "service_desk", "business"] }, "simplified": { "type": "boolean", @@ -9291,9 +9091,7 @@ "additionalProperties": false, "description": "Details about a notification scheme." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -9334,9 +9132,7 @@ "additionalProperties": true, "description": "An issue priority." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -9360,9 +9156,7 @@ "additionalProperties": false, "description": "An entity property, for more information see [Entity properties](https://developer.atlassian.com/cloud/jira/platform/jira-entity-properties/)." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -9475,9 +9269,7 @@ "additionalProperties": false, "description": "Details of an issue remote link." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -9511,9 +9303,7 @@ "additionalProperties": false, "description": "Details of an issue resolution." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -9595,9 +9385,7 @@ "additionalProperties": false, "description": "List of security schemes." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -9634,9 +9422,7 @@ "additionalProperties": false, "description": "Details of an issue type scheme." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -9665,9 +9451,7 @@ "additionalProperties": false, "description": "Details of an issue type screen scheme." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -9723,12 +9507,7 @@ "type": "string", "description": "The user account type. Can take the following values:\n\n * `atlassian` regular Atlassian user account\n * `app` system account used for Connect applications and OAuth to represent external systems\n * `customer` Jira Service Desk account representing an external service desk", "readOnly": true, - "enum": [ - "atlassian", - "app", - "customer", - "unknown" - ] + "enum": ["atlassian", "app", "customer", "unknown"] }, "name": { "type": "string", @@ -9941,9 +9720,7 @@ "additionalProperties": false, "description": "The details of votes on an issue." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -10058,9 +9835,7 @@ "additionalProperties": false, "description": "The details of watchers on an issue." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -10259,10 +10034,7 @@ "type": { "type": "string", "description": "Whether visibility of this item is restricted to a group or role.", - "enum": [ - "group", - "role" - ] + "enum": ["group", "role"] }, "value": { "type": "string", @@ -10314,18 +10086,12 @@ "additionalProperties": true, "description": "Details of a worklog." }, - "supported_sync_modes": [ - "incremental" - ], + "supported_sync_modes": ["incremental"], "source_defined_cursor": true, - "default_cursor_field": [ - "startedAfter" - ] + "default_cursor_field": ["startedAfter"] }, "sync_mode": "incremental", - "cursor_field": [ - "startedAfter" - ], + "cursor_field": ["startedAfter"], "destination_sync_mode": "append" }, { @@ -10377,9 +10143,7 @@ "additionalProperties": false, "description": "Details of an application property." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -10390,53 +10154,30 @@ "name": "labels", "json_schema": { "$schema": "http://json-schema.org/draft-07/schema#", - "type": [ - "object", - "null" - ], + "type": ["object", "null"], "properties": { "id": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "key": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "value": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "name": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "desc": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "type": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] } }, "additionalProperties": true }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -10458,9 +10199,7 @@ "additionalProperties": false, "description": "Details about permissions." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -10513,10 +10252,7 @@ "type": "string", "description": "The type of scope.", "readOnly": true, - "enum": [ - "PROJECT", - "TEMPLATE" - ] + "enum": ["PROJECT", "TEMPLATE"] }, "project": { "description": "The project the item has scope in.", @@ -10546,11 +10282,7 @@ "type": "string", "description": "The [project type](https://confluence.atlassian.com/x/GwiiLQ#Jiraapplicationsoverview-Productfeaturesandprojecttypes) of the project.", "readOnly": true, - "enum": [ - "software", - "service_desk", - "business" - ] + "enum": ["software", "service_desk", "business"] }, "simplified": { "type": "boolean", @@ -10666,9 +10398,7 @@ "additionalProperties": false, "description": "List of all permission schemes." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -10736,10 +10466,7 @@ "type": "string", "description": "The default assignee when creating issues for this project.", "readOnly": true, - "enum": [ - "PROJECT_LEAD", - "UNASSIGNED" - ] + "enum": ["PROJECT_LEAD", "UNASSIGNED"] }, "versions": { "type": "array", @@ -10773,11 +10500,7 @@ "type": "string", "description": "The [project type](https://confluence.atlassian.com/x/GwiiLQ#Jiraapplicationsoverview-Productfeaturesandprojecttypes) of the project.", "readOnly": true, - "enum": [ - "software", - "service_desk", - "business" - ] + "enum": ["software", "service_desk", "business"] }, "simplified": { "type": "boolean", @@ -10788,10 +10511,7 @@ "type": "string", "description": "The type of the project.", "readOnly": true, - "enum": [ - "classic", - "next-gen" - ] + "enum": ["classic", "next-gen"] }, "favourite": { "type": "boolean", @@ -10868,9 +10588,7 @@ "additionalProperties": false, "description": "Details about a project." }, - "supported_sync_modes": [ - "full_refresh" - ] + "supported_sync_modes": ["full_refresh"] }, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite" @@ -10984,9 +10702,7 @@ "additionalProperties": false, "description": "List of project avatars." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -11022,9 +10738,7 @@ "additionalProperties": false, "description": "A project category." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -11096,12 +10810,7 @@ "type": "string", "description": "The user account type. Can take the following values:\n\n * `atlassian` regular Atlassian user account\n * `app` system account used for Connect applications and OAuth to represent external systems\n * `customer` Jira Service Desk account representing an external service desk", "readOnly": true, - "enum": [ - "atlassian", - "app", - "customer", - "unknown" - ] + "enum": ["atlassian", "app", "customer", "unknown"] }, "name": { "type": "string", @@ -11338,12 +11047,7 @@ "type": "string", "description": "The user account type. Can take the following values:\n\n * `atlassian` regular Atlassian user account\n * `app` system account used for Connect applications and OAuth to represent external systems\n * `customer` Jira Service Desk account representing an external service desk", "readOnly": true, - "enum": [ - "atlassian", - "app", - "customer", - "unknown" - ] + "enum": ["atlassian", "app", "customer", "unknown"] }, "name": { "type": "string", @@ -11579,12 +11283,7 @@ "type": "string", "description": "The user account type. Can take the following values:\n\n * `atlassian` regular Atlassian user account\n * `app` system account used for Connect applications and OAuth to represent external systems\n * `customer` Jira Service Desk account representing an external service desk", "readOnly": true, - "enum": [ - "atlassian", - "app", - "customer", - "unknown" - ] + "enum": ["atlassian", "app", "customer", "unknown"] }, "name": { "type": "string", @@ -11817,9 +11516,7 @@ "description": "Details about a component with a count of the issues it contains." } }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -11840,9 +11537,7 @@ "additionalProperties": false, "description": "A project's sender email address." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -11914,9 +11609,7 @@ "additionalProperties": false, "description": "Details about a security scheme." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -11958,9 +11651,7 @@ "additionalProperties": false, "description": "Details about a project type." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -12118,9 +11809,7 @@ } } }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -12160,10 +11849,7 @@ "type": "string", "description": "The type of scope.", "readOnly": true, - "enum": [ - "PROJECT", - "TEMPLATE" - ] + "enum": ["PROJECT", "TEMPLATE"] }, "project": { "description": "The project the item has scope in.", @@ -12193,11 +11879,7 @@ "type": "string", "description": "The [project type](https://confluence.atlassian.com/x/GwiiLQ#Jiraapplicationsoverview-Productfeaturesandprojecttypes) of the project.", "readOnly": true, - "enum": [ - "software", - "service_desk", - "business" - ] + "enum": ["software", "service_desk", "business"] }, "simplified": { "type": "boolean", @@ -12266,9 +11948,7 @@ "description": "A screen." } }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -12279,9 +11959,7 @@ "name": "screen_tabs", "json_schema": { "$schema": "http://json-schema.org/draft-07/schema#", - "required": [ - "name" - ], + "required": ["name"], "type": "object", "properties": { "id": { @@ -12298,9 +11976,7 @@ "additionalProperties": false, "description": "A screen tab." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -12326,9 +12002,7 @@ "additionalProperties": false, "description": "A screen tab field." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -12387,9 +12061,7 @@ "description": "A screen scheme." } }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -12400,9 +12072,7 @@ "name": "time_tracking", "json_schema": { "$schema": "http://json-schema.org/draft-07/schema#", - "required": [ - "key" - ], + "required": ["key"], "type": "object", "properties": { "key": { @@ -12422,9 +12092,7 @@ "additionalProperties": false, "description": "Details about the time tracking provider." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -12456,12 +12124,7 @@ "type": "string", "description": "The user account type. Can take the following values:\n\n * `atlassian` regular Atlassian user account\n * `app` system account used for Connect applications and OAuth to represent external systems\n * `customer` Jira Service Desk account representing an external service desk", "readOnly": true, - "enum": [ - "atlassian", - "app", - "customer", - "unknown" - ] + "enum": ["atlassian", "app", "customer", "unknown"] }, "name": { "type": "string", @@ -12516,9 +12179,7 @@ "additionalProperties": false, "description": "A user with details as permitted by the user's Atlassian Account privacy settings. However, be aware of these exceptions:\n\n * User record deleted from Atlassian: This occurs as the result of a right to be forgotten request. In this case, `displayName` provides an indication and other parameters have default values or are blank (for example, email is blank).\n * User record corrupted: This occurs as a results of events such as a server import and can only happen to deleted users. In this case, `accountId` returns *unknown* and all other parameters have fallback values.\n * User record unavailable: This usually occurs due to an internal service outage. In this case, all parameters have fallback values." }, - "supported_sync_modes": [ - "full_refresh" - ] + "supported_sync_modes": ["full_refresh"] }, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite" @@ -12579,11 +12240,7 @@ "type": { "type": "string", "description": "The type of the transition.", - "enum": [ - "global", - "initial", - "directed" - ] + "enum": ["global", "initial", "directed"] }, "screen": { "type": "object", @@ -12680,9 +12337,7 @@ "description": "Details about a workflow." } }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -12766,12 +12421,7 @@ "type": "string", "description": "The user account type. Can take the following values:\n\n * `atlassian` regular Atlassian user account\n * `app` system account used for Connect applications and OAuth to represent external systems\n * `customer` Jira Service Desk account representing an external service desk", "readOnly": true, - "enum": [ - "atlassian", - "app", - "customer", - "unknown" - ] + "enum": ["atlassian", "app", "customer", "unknown"] }, "name": { "type": "string", @@ -13002,9 +12652,7 @@ "description": "Details about a workflow scheme." } }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -13079,9 +12727,7 @@ "additionalProperties": true, "description": "A status." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", @@ -13124,13 +12770,11 @@ "additionalProperties": true, "description": "A status category." }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite" } ] -} \ No newline at end of file +} diff --git a/airbyte-integrations/connectors/source-jira/sample_files/full_configured_catalog.json b/airbyte-integrations/connectors/source-jira/sample_files/full_configured_catalog.json index b046cca6ad40..ba70e74a4d04 100644 --- a/airbyte-integrations/connectors/source-jira/sample_files/full_configured_catalog.json +++ b/airbyte-integrations/connectors/source-jira/sample_files/full_configured_catalog.json @@ -9593,53 +9593,30 @@ "name": "labels", "json_schema": { "$schema": "http://json-schema.org/draft-07/schema#", - "type": [ - "object", - "null" - ], + "type": ["object", "null"], "properties": { "id": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "key": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "value": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "name": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "desc": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "type": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] } }, "additionalProperties": true }, - "supported_sync_modes": [ - "full_refresh" - ], + "supported_sync_modes": ["full_refresh"], "source_defined_cursor": false }, "sync_mode": "full_refresh", diff --git a/airbyte-integrations/connectors/source-jira/source_jira/schemas/labels.json b/airbyte-integrations/connectors/source-jira/source_jira/schemas/labels.json index 309e12cba628..5430832a7379 100644 --- a/airbyte-integrations/connectors/source-jira/source_jira/schemas/labels.json +++ b/airbyte-integrations/connectors/source-jira/source_jira/schemas/labels.json @@ -1,44 +1,23 @@ { - "type": [ - "object", - "null" - ], + "type": ["object", "null"], "properties": { "id": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "key": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "value": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "name": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "desc": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] }, "type": { - "type": [ - "string", - "null" - ] + "type": ["string", "null"] } }, "additionalProperties": true diff --git a/airbyte-integrations/connectors/source-us-census/integration_tests/integration_test.py b/airbyte-integrations/connectors/source-us-census/integration_tests/integration_test.py index d5cc95f945ff..3bfe91c63765 100644 --- a/airbyte-integrations/connectors/source-us-census/integration_tests/integration_test.py +++ b/airbyte-integrations/connectors/source-us-census/integration_tests/integration_test.py @@ -25,4 +25,3 @@ def test_hello_world(): assert True - diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java index 54664d034204..1523b293fd88 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java @@ -44,6 +44,7 @@ import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.scheduler.persistence.job_tracker.JobTracker; import io.airbyte.workers.process.DockerProcessFactory; +import io.airbyte.workers.process.KubePortManagerSingleton; import io.airbyte.workers.process.KubeProcessFactory; import io.airbyte.workers.process.ProcessFactory; import io.airbyte.workers.process.WorkerHeartbeatServer; @@ -62,10 +63,8 @@ import java.time.Instant; import java.util.Map; import java.util.Optional; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -88,7 +87,7 @@ public class SchedulerApp { private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerApp.class); private static final long GRACEFUL_SHUTDOWN_SECONDS = 30; - private static final int SUBMITTER_NUM_THREADS = Integer.parseInt(new EnvConfigs().getSubmitterNumThreads()); + private static int SUBMITTER_NUM_THREADS = Integer.parseInt(new EnvConfigs().getSubmitterNumThreads()); private static final Duration SCHEDULING_DELAY = Duration.ofSeconds(5); private static final Duration CLEANING_DELAY = Duration.ofHours(2); private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("worker-%d").build(); @@ -177,11 +176,10 @@ private static ProcessFactory getProcessBuilderFactory(Configs configs) throws I if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) { final ApiClient officialClient = Config.defaultClient(); final KubernetesClient fabricClient = new DefaultKubernetesClient(); - final BlockingQueue workerPorts = new LinkedBlockingDeque<>(configs.getTemporalWorkerPorts()); final String localIp = InetAddress.getLocalHost().getHostAddress(); final String kubeHeartbeatUrl = localIp + ":" + KUBE_HEARTBEAT_PORT; LOGGER.info("Using Kubernetes namespace: {}", configs.getKubeNamespace()); - return new KubeProcessFactory(configs.getKubeNamespace(), officialClient, fabricClient, kubeHeartbeatUrl, workerPorts); + return new KubeProcessFactory(configs.getKubeNamespace(), officialClient, fabricClient, kubeHeartbeatUrl); } else { return new DockerProcessFactory( configs.getWorkspaceRoot(), @@ -222,6 +220,13 @@ public static void main(String[] args) throws IOException, InterruptedException final JobNotifier jobNotifier = new JobNotifier(configs.getWebappUrl(), configRepository); if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) { + var supportedWorkers = KubePortManagerSingleton.getSupportedWorkers(); + if (supportedWorkers < SUBMITTER_NUM_THREADS) { + LOGGER.warn("{} workers configured with only {} ports available. Insufficient ports. Setting workers to {}.", SUBMITTER_NUM_THREADS, + KubePortManagerSingleton.getNumAvailablePorts(), supportedWorkers); + SUBMITTER_NUM_THREADS = supportedWorkers; + } + Map mdc = MDC.getCopyOfContextMap(); Executors.newSingleThreadExecutor().submit( () -> { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java index 7c4b97a0a78f..f8cc0c506bee 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java @@ -108,12 +108,12 @@ static void gentleCloseWithHeartbeat(final Process process, final Duration checkHeartbeatDuration, final Duration forcedShutdownDuration, final BiConsumer forceShutdown) { - while (process.isAlive() && heartbeatMonitor.isBeating()) { try { if (new EnvConfigs().getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) { LOGGER.debug("Gently closing process {} with heartbeat..", process.info().commandLine().get()); } + process.waitFor(checkHeartbeatDuration.toMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { LOGGER.error("Exception while waiting for process to finish", e); @@ -125,6 +125,7 @@ static void gentleCloseWithHeartbeat(final Process process, if (new EnvConfigs().getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) { LOGGER.debug("Gently closing process {} without heartbeat..", process.info().commandLine().get()); } + process.waitFor(gracefulShutdownDuration.toMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { LOGGER.error("Exception during grace period for process to finish. This can happen when cancelling jobs."); @@ -136,6 +137,7 @@ static void gentleCloseWithHeartbeat(final Process process, if (new EnvConfigs().getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) { LOGGER.debug("Force shutdown process {}..", process.info().commandLine().get()); } + forceShutdown.accept(process, forcedShutdownDuration); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java index cd39f015d17f..91c574ce3cf1 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -63,7 +63,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import org.apache.commons.io.output.NullOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -136,7 +135,6 @@ public class KubePodProcess extends Process { private Integer returnCode = null; private Long lastStatusCheck = null; - private final Consumer portReleaser; private final ServerSocket stdoutServerSocket; private final int stdoutLocalPort; private final ServerSocket stderrServerSocket; @@ -286,7 +284,6 @@ private static void waitForInitPodToRun(KubernetesClient client, Pod podDefiniti public KubePodProcess(ApiClient officialClient, KubernetesClient fabricClient, - Consumer portReleaser, String podName, String namespace, String image, @@ -300,7 +297,6 @@ public KubePodProcess(ApiClient officialClient, final String... args) throws IOException, InterruptedException { this.fabricClient = fabricClient; - this.portReleaser = portReleaser; this.stdoutLocalPort = stdoutLocalPort; this.stderrLocalPort = stderrLocalPort; @@ -528,20 +524,40 @@ public Info info() { /** * Close all open resource in the opposite order of resource creation. + * + * Null checks exist because certain local Kube clusters (e.g. Docker for Desktop) back this + * implementation with OS processes and resources, which are automatically reaped by the OS. */ private void close() { - Exceptions.swallow(this.stdin::close); - Exceptions.swallow(this.stdout::close); - Exceptions.swallow(this.stderr::close); + if (this.stdin != null) { + Exceptions.swallow(this.stdin::close); + } + if (this.stdout != null) { + Exceptions.swallow(this.stdout::close); + } + if (this.stderr != null) { + Exceptions.swallow(this.stderr::close); + } Exceptions.swallow(this.stdoutServerSocket::close); Exceptions.swallow(this.stderrServerSocket::close); Exceptions.swallow(this.executorService::shutdownNow); - try { - portReleaser.accept(stdoutLocalPort); - portReleaser.accept(stderrLocalPort); - } catch (Exception e) { - LOGGER.error("Error releasing ports ", e); + + var stdoutPortReleased = KubePortManagerSingleton.offer(stdoutLocalPort); + if (!stdoutPortReleased) { + LOGGER.warn( + "Error while releasing port {} from pod {}. This can cause the scheduler to run out of ports. Ignore this error if running Kubernetes on docker for desktop.", + stdoutLocalPort, + podDefinition.getMetadata().getName()); + } + + var stderrPortReleased = KubePortManagerSingleton.offer(stderrLocalPort); + if (!stderrPortReleased) { + LOGGER.warn( + "Error while releasing port {} from pod {}. This can cause the scheduler to run out of ports. Ignore this error if running Kubernetes on docker for desktop", + stderrLocalPort, + podDefinition.getMetadata().getName()); } + LOGGER.debug("Closed {}", podDefinition.getMetadata().getName()); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePortManagerSingleton.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePortManagerSingleton.java new file mode 100644 index 000000000000..42406e01c0a5 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePortManagerSingleton.java @@ -0,0 +1,77 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.workers.process; + +import com.google.common.annotations.VisibleForTesting; +import io.airbyte.config.EnvConfigs; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Convenience wrapper around a thread-safe BlockingQueue. Keeps track of available ports for Kube + * Pod Processes. + * + * Although this data structure can do without the wrapper class, this class allows easier testing + * via the {@link #getNumAvailablePorts()} function. + * + * The singleton pattern clarifies that only one copy of this class is intended to exists per + * scheduler deployment. + */ +public class KubePortManagerSingleton { + + private static final Logger LOGGER = LoggerFactory.getLogger(KubePortManagerSingleton.class); + private static final int MAX_PORTS_PER_WORKER = 4; // A sync has two workers. Each worker requires 2 ports. + private static BlockingQueue workerPorts = new LinkedBlockingDeque<>(new EnvConfigs().getTemporalWorkerPorts()); + + public static Integer take() throws InterruptedException { + return workerPorts.poll(10, TimeUnit.MINUTES); + } + + public static boolean offer(Integer port) { + if (!workerPorts.contains(port)) { + workerPorts.add(port); + return true; + } + return false; + } + + public static int getNumAvailablePorts() { + return workerPorts.size(); + } + + public static int getSupportedWorkers() { + return workerPorts.size() / MAX_PORTS_PER_WORKER; + } + + @VisibleForTesting + protected static void setWorkerPorts(Set ports) { + workerPorts = new LinkedBlockingDeque<>(ports); + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java index c5f7a502957b..7c6b07321510 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java @@ -31,8 +31,6 @@ import io.kubernetes.client.openapi.ApiClient; import java.nio.file.Path; import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.function.Consumer; import org.apache.commons.lang3.RandomStringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +43,6 @@ public class KubeProcessFactory implements ProcessFactory { private final ApiClient officialClient; private final KubernetesClient fabricClient; private final String kubeHeartbeatUrl; - private final BlockingQueue workerPorts; /** * @param namespace kubernetes namespace where spawned pods will live @@ -58,13 +55,11 @@ public class KubeProcessFactory implements ProcessFactory { public KubeProcessFactory(String namespace, ApiClient officialClient, KubernetesClient fabricClient, - String kubeHeartbeatUrl, - BlockingQueue workerPorts) { + String kubeHeartbeatUrl) { this.namespace = namespace; this.officialClient = officialClient; this.fabricClient = fabricClient; this.kubeHeartbeatUrl = kubeHeartbeatUrl; - this.workerPorts = workerPorts; } @Override @@ -80,27 +75,18 @@ public Process create(String jobId, throws WorkerException { try { // used to differentiate source and destination processes with the same id and attempt + final String podName = createPodName(imageName, jobId, attempt); - final int stdoutLocalPort = workerPorts.take(); + final int stdoutLocalPort = KubePortManagerSingleton.take(); LOGGER.info("{} stdoutLocalPort = {}", podName, stdoutLocalPort); - final int stderrLocalPort = workerPorts.take(); + final int stderrLocalPort = KubePortManagerSingleton.take(); LOGGER.info("{} stderrLocalPort = {}", podName, stderrLocalPort); - final Consumer portReleaser = port -> { - if (!workerPorts.contains(port)) { - workerPorts.add(port); - LOGGER.info("{} releasing: {}", podName, port); - } else { - LOGGER.info("{} skipping releasing: {}", podName, port); - } - }; - return new KubePodProcess( officialClient, fabricClient, - portReleaser, podName, namespace, imageName, diff --git a/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java b/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java index aa7858a3fedc..66bdf5a68b8d 100644 --- a/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java +++ b/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java @@ -46,8 +46,6 @@ import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; import org.apache.commons.lang.RandomStringUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -58,12 +56,10 @@ public class KubePodProcessIntegrationTest { private static final boolean IS_MINIKUBE = Boolean.parseBoolean(Optional.ofNullable(System.getenv("IS_MINIKUBE")).orElse("false")); private List openPorts; - private List openWorkerPorts; private int heartbeatPort; private String heartbeatUrl; private ApiClient officialClient; private KubernetesClient fabricClient; - private BlockingQueue workerPorts; private KubeProcessFactory processFactory; private static WorkerHeartbeatServer server; @@ -71,14 +67,14 @@ public class KubePodProcessIntegrationTest { @BeforeEach public void setup() throws Exception { openPorts = new ArrayList<>(getOpenPorts(5)); - openWorkerPorts = openPorts.subList(1, openPorts.size() - 1); + KubePortManagerSingleton.setWorkerPorts(new HashSet<>(openPorts.subList(1, openPorts.size() - 1))); + heartbeatPort = openPorts.get(0); heartbeatUrl = getHost() + ":" + heartbeatPort; officialClient = Config.defaultClient(); fabricClient = new DefaultKubernetesClient(); - workerPorts = new LinkedBlockingDeque<>(openWorkerPorts); - processFactory = new KubeProcessFactory("default", officialClient, fabricClient, heartbeatUrl, workerPorts); + processFactory = new KubeProcessFactory("default", officialClient, fabricClient, heartbeatUrl); server = new WorkerHeartbeatServer(heartbeatPort); server.startBackground(); @@ -92,50 +88,59 @@ public void teardown() throws Exception { @Test public void testSuccessfulSpawning() throws Exception { // start a finite process + var availablePortsBefore = KubePortManagerSingleton.getNumAvailablePorts(); final Process process = getProcess("echo hi; sleep 1; echo hi2"); process.waitFor(); // the pod should be dead and in a good state assertFalse(process.isAlive()); + assertEquals(availablePortsBefore, KubePortManagerSingleton.getNumAvailablePorts()); assertEquals(0, process.exitValue()); } @Test public void testPipeInEntrypoint() throws Exception { // start a process that has a pipe in the entrypoint + var availablePortsBefore = KubePortManagerSingleton.getNumAvailablePorts(); final Process process = getProcess("echo hi | cat"); process.waitFor(); // the pod should be dead and in a good state assertFalse(process.isAlive()); + assertEquals(availablePortsBefore, KubePortManagerSingleton.getNumAvailablePorts()); assertEquals(0, process.exitValue()); } @Test public void testExitCodeRetrieval() throws Exception { // start a process that requests + var availablePortsBefore = KubePortManagerSingleton.getNumAvailablePorts(); final Process process = getProcess("exit 10"); process.waitFor(); // the pod should be dead with the correct error code assertFalse(process.isAlive()); + assertEquals(availablePortsBefore, KubePortManagerSingleton.getNumAvailablePorts()); assertEquals(10, process.exitValue()); } @Test public void testMissingEntrypoint() throws WorkerException, InterruptedException { // start a process with an entrypoint that doesn't exist + var availablePortsBefore = KubePortManagerSingleton.getNumAvailablePorts(); final Process process = getProcess("ksaiiiasdfjklaslkei"); process.waitFor(); // the pod should be dead and in an error state assertFalse(process.isAlive()); + assertEquals(availablePortsBefore, KubePortManagerSingleton.getNumAvailablePorts()); assertEquals(127, process.exitValue()); } @Test public void testKillingWithoutHeartbeat() throws Exception { // start an infinite process + var availablePortsBefore = KubePortManagerSingleton.getNumAvailablePorts(); final Process process = getProcess("while true; do echo hi; sleep 1; done"); // kill the heartbeat server @@ -146,6 +151,7 @@ public void testKillingWithoutHeartbeat() throws Exception { // the pod should be dead and in an error state assertFalse(process.isAlive()); + assertEquals(availablePortsBefore, KubePortManagerSingleton.getNumAvailablePorts()); assertNotEquals(0, process.exitValue()); } diff --git a/kube/overlays/dev/.env b/kube/overlays/dev/.env index 0e78f4536f1d..ffaf8413e548 100644 --- a/kube/overlays/dev/.env +++ b/kube/overlays/dev/.env @@ -16,7 +16,7 @@ DB_DOCKER_MOUNT=airbyte_db # Temporal.io worker configuration TEMPORAL_HOST=airbyte-temporal-svc:7233 -TEMPORAL_WORKER_PORTS=9001,9002,9003,9004,9005,9006,9007,9008,9009,9010,9011,9012,9013,9014,9015,9016,9017,9018,9019,9020,9021,9022,9023,9024,9025,9026,9027,9028,9029,9030 +TEMPORAL_WORKER_PORTS=9001,9002,9003,9004,9005,9006,9007,9008,9009,9010,9011,9012,9013,9014,9015,9016,9017,9018,9019,9020,9021,9022,9023,9024,9025,9026,9027,9028,9029,9030,9031,9032,9033,9034,9035,9036,9037,9038,9039,9040 # Workspace storage for running jobs (logs, etc) WORKSPACE_ROOT=/workspace diff --git a/kube/overlays/stable-with-resource-limits/.env b/kube/overlays/stable-with-resource-limits/.env index e74606059d49..782bcdac8e17 100644 --- a/kube/overlays/stable-with-resource-limits/.env +++ b/kube/overlays/stable-with-resource-limits/.env @@ -16,7 +16,7 @@ DB_DOCKER_MOUNT=airbyte_db # Temporal.io worker configuration TEMPORAL_HOST=airbyte-temporal-svc:7233 -TEMPORAL_WORKER_PORTS=9001,9002,9003,9004,9005,9006,9007,9008,9009,9010,9011,9012,9013,9014,9015,9016,9017,9018,9019,9020,9021,9022,9023,9024,9025,9026,9027,9028,9029,9030 +TEMPORAL_WORKER_PORTS=9001,9002,9003,9004,9005,9006,9007,9008,9009,9010,9011,9012,9013,9014,9015,9016,9017,9018,9019,9020,9021,9022,9023,9024,9025,9026,9027,9028,9029,9030,9031,9032,9033,9034,9035,9036,9037,9038,9039,9040 # Workspace storage for running jobs (logs, etc) WORKSPACE_ROOT=/workspace diff --git a/kube/overlays/stable/.env b/kube/overlays/stable/.env index e74606059d49..782bcdac8e17 100644 --- a/kube/overlays/stable/.env +++ b/kube/overlays/stable/.env @@ -16,7 +16,7 @@ DB_DOCKER_MOUNT=airbyte_db # Temporal.io worker configuration TEMPORAL_HOST=airbyte-temporal-svc:7233 -TEMPORAL_WORKER_PORTS=9001,9002,9003,9004,9005,9006,9007,9008,9009,9010,9011,9012,9013,9014,9015,9016,9017,9018,9019,9020,9021,9022,9023,9024,9025,9026,9027,9028,9029,9030 +TEMPORAL_WORKER_PORTS=9001,9002,9003,9004,9005,9006,9007,9008,9009,9010,9011,9012,9013,9014,9015,9016,9017,9018,9019,9020,9021,9022,9023,9024,9025,9026,9027,9028,9029,9030,9031,9032,9033,9034,9035,9036,9037,9038,9039,9040 # Workspace storage for running jobs (logs, etc) WORKSPACE_ROOT=/workspace