Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ split fq/s3/test_s3.py tests #5798

Merged
merged 7 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions ydb/tests/fq/s3/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@
S3_PID_FILE = "s3.pid"


class TestCounter:
    """Guard that fails fast when a test module grows past a safe size.

    Module-scoped pytest fixtures hand one of these to each test (or kikimr
    start); once the configured limit is exceeded the counter raises, telling
    the author to split the file before it starts timing out on CI.
    """

    def __init__(self, tests_count_limit, error_string):
        # Maximum number of on_test_start() calls allowed for the module.
        self.tests_count_limit = tests_count_limit
        # Human-readable description of what is counted, used in the error text.
        self.error_string = error_string
        # Events registered so far.
        self.number_tests = 0

    def on_test_start(self):
        """Register one event; raise AssertionError once the limit is exceeded."""
        self.number_tests += 1
        # Raise explicitly instead of using `assert` so the guard still fires
        # when Python runs with -O (assert statements are stripped).
        if self.number_tests > self.tests_count_limit:
            raise AssertionError(
                f"{self.error_string} exceeded limit {self.number_tests} vs {self.tests_count_limit}, "
                "this may lead to timeouts on CI, please split this file"
            )


@pytest.fixture(scope="module")
def mvp_external_ydb_endpoint(request) -> str:
return request.param["endpoint"] if request is not None and hasattr(request, 'param') else None
Expand Down Expand Up @@ -86,14 +99,21 @@ def get_kikimr_extensions(s3: S3, yq_version: str, kikimr_settings, mvp_external


@pytest.fixture(scope="module")
def kikimr_starts_counter():
    """Module-scoped guard limiting kikimr restarts within a single test module."""
    restart_limit = 10
    return TestCounter(restart_limit, "Number kikimr restarts in one module")


@pytest.fixture(scope="module")
def kikimr_yqv1(kikimr_params: pytest.FixtureRequest, s3: S3, kikimr_settings, mvp_external_ydb_endpoint, kikimr_starts_counter):
    """Start a YQ v1 kikimr cluster for the module, counting each start."""
    kikimr_starts_counter.on_test_start()
    extensions = get_kikimr_extensions(s3, YQV1_VERSION_NAME, kikimr_settings, mvp_external_ydb_endpoint)
    with start_kikimr(kikimr_params, extensions) as started_kikimr:
        yield started_kikimr


@pytest.fixture(scope="module")
def kikimr_yqv2(kikimr_params: pytest.FixtureRequest, s3: S3, kikimr_settings, mvp_external_ydb_endpoint, kikimr_starts_counter):
    """Start a YQ v2 kikimr cluster for the module, counting each start."""
    kikimr_starts_counter.on_test_start()
    extensions = get_kikimr_extensions(s3, YQV2_VERSION_NAME, kikimr_settings, mvp_external_ydb_endpoint)
    with start_kikimr(kikimr_params, extensions) as started_kikimr:
        yield started_kikimr
Expand All @@ -115,8 +135,14 @@ def kikimr(yq_version: str, kikimr_yqv1, kikimr_yqv2):
return kikimr


@pytest.fixture(scope="module")
def tests_counter():
    """Module-scoped guard limiting how many tests may run from one module."""
    per_module_limit = 200
    return TestCounter(per_module_limit, "Number tests in one module")


@pytest.fixture
def client(kikimr, request=None):
def client(kikimr, tests_counter, request=None):
tests_counter.on_test_start()
client = FederatedQueryClient(
request.param["folder_id"] if request is not None else "my_folder", streaming_over_kikimr=kikimr
)
Expand All @@ -128,8 +154,5 @@ def client(kikimr, request=None):

@pytest.fixture
def unique_prefix(request: pytest.FixtureRequest):
    """Return a per-test-node name prefix for entities shared across a module.

    Built from the hash of the pytest node name plus the test function name,
    so concurrently-defined tests in one module don't collide on connection
    or binding names.
    """
    node_hash = abs(hash(request.node.name))
    return "h{}_{}".format(node_hash, request.function.__name__)
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# -*- coding: utf-8 -*-

import boto3
import logging
import pytest

import ydb.public.api.protos.ydb_value_pb2 as ydb
Expand Down Expand Up @@ -346,270 +345,3 @@ def test_name_uniqueness_constraint(self, kikimr, client: FederatedQueryClient,
== "Connection with the same name already exists. Please choose another name"
)
assert modify_binding_result.issues[0].severity == 1

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@pytest.mark.parametrize("kikimr_settings", [{"bindings_mode": "BM_DROP_WITH_WARNING"}], indirect=True)
def test_s3_insert(self, kikimr, s3, client, yq_version, unique_prefix):
    """Insert rows through an S3 object-storage binding and read them back.

    With bindings_mode=BM_DROP_WITH_WARNING, YQ v2 is additionally expected
    to attach a deprecation warning (severity 2) about the `bindings.` syntax
    to both the INSERT and the SELECT query.
    """
    resource = boto3.resource(
        "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
    )

    # Start from an empty bucket so the SELECT below sees only this test's rows.
    bucket = resource.Bucket("bindbucket")
    bucket.create(ACL='public-read')
    bucket.objects.all().delete()

    kikimr.control_plane.wait_bootstrap(1)
    connection_id = client.create_storage_connection(unique_prefix + "bb", "bindbucket").result.connection_id

    # Binding schema: foo INT32, bar UTF8 — matched by the inserted literals below.
    fooType = ydb.Column(name="foo", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32))
    barType = ydb.Column(name="bar", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8))
    storage_binding_name = unique_prefix + "s3binding"
    client.create_object_storage_binding(
        name=storage_binding_name,
        path="path1/",
        format="csv_with_names",
        connection_id=connection_id,
        columns=[fooType, barType],
    )

    # Write two rows via the binding.
    sql = fR'''
insert into bindings.`{storage_binding_name}`
select * from AS_TABLE([<|foo:123, bar:"xxx"u|>,<|foo:456, bar:"yyy"u|>]);
'''

    query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
    client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
    if yq_version == "v2":
        # v2 with BM_DROP_WITH_WARNING must warn about the `bindings.` syntax.
        issues = str(client.describe_query(query_id).result.query.issue)
        assert (
            "message: \"Please remove \\\'bindings.\\\' from your query, the support for this syntax will be dropped soon"
            in issues
        )
        assert "severity: 2" in issues

    # Read the rows back through the same binding.
    sql = fR'''
select foo, bar from bindings.`{storage_binding_name}`;
'''

    query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
    client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
    if yq_version == "v2":
        # The SELECT must carry the same deprecation warning.
        issues = str(client.describe_query(query_id).result.query.issue)
        assert (
            "message: \"Please remove \\\'bindings.\\\' from your query, the support for this syntax will be dropped soon"
            in issues
        )
        assert "severity: 2" in issues

    # Verify column metadata and both inserted rows round-tripped intact.
    data = client.get_result_data(query_id)
    result_set = data.result.result_set
    assert len(result_set.columns) == 2
    assert result_set.columns[0].name == "foo"
    assert result_set.columns[0].type.type_id == ydb.Type.INT32
    assert result_set.columns[1].name == "bar"
    assert result_set.columns[1].type.type_id == ydb.Type.UTF8
    assert len(result_set.rows) == 2
    assert result_set.rows[0].items[0].int32_value == 123
    assert result_set.rows[0].items[1].text_value == 'xxx'
    assert result_set.rows[1].items[0].int32_value == 456
    assert result_set.rows[1].items[1].text_value == 'yyy'

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_s3_format_mismatch(self, kikimr, s3, client, unique_prefix):
    """INSERT of rows whose types contradict the binding schema must fail.

    The binding declares foo as UTF8 and bar as INT32 while the inserted rows
    provide foo as an int and bar as a string, so the query is expected to
    fail with a schema type mismatch.
    """
    resource = boto3.resource(
        "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
    )

    bucket = resource.Bucket("bindbucket")
    bucket.create(ACL='public-read')

    kikimr.control_plane.wait_bootstrap(1)
    connection_id = client.create_storage_connection(unique_prefix + "bb", "bindbucket").result.connection_id

    # Column types intentionally swapped relative to the AS_TABLE literals below.
    fooType = ydb.Column(name="foo", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8))
    barType = ydb.Column(name="bar", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32))
    storage_binding_name = unique_prefix + "s3binding"
    client.create_object_storage_binding(
        name=storage_binding_name,
        path="path2/",
        format="csv_with_names",
        connection_id=connection_id,
        columns=[fooType, barType],
    )

    sql = fR'''
insert into bindings.`{storage_binding_name}`
select * from AS_TABLE([<|foo:123, bar:"xxx"u|>,<|foo:456, bar:"yyy"u|>]);
'''

    query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
    # Expect failure rather than completion.
    client.wait_query(query_id, statuses=[fq.QueryMeta.FAILED])

    describe_result = client.describe_query(query_id).result
    describe_string = "{}".format(describe_result)
    assert "Type mismatch between schema type" in describe_string, describe_string

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_pg_binding(self, kikimr, s3, client, unique_prefix):
    """Read a CSV file through a binding whose columns use PostgreSQL types.

    Uploads a small CSV to S3, binds it with pg-typed columns (oid 25 and 23
    — presumably text and int4; verify against the pg catalog), queries it
    with pg_syntax and checks every returned cell.
    """
    resource = boto3.resource(
        "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
    )

    bucket = resource.Bucket("fbucket")
    bucket.create(ACL='public-read')

    s3_client = boto3.client(
        "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
    )

    # Fixture data: header row plus three fruit/price rows.
    fruits = R'''Fruit,Price
Banana,3
Apple,2
Pear,15'''
    s3_client.put_object(Body=fruits, Bucket='fbucket', Key='a/fruits.csv', ContentType='text/plain')

    kikimr.control_plane.wait_bootstrap(1)
    connection_response = client.create_storage_connection(unique_prefix + "fruitbucket", "fbucket")

    fruitType = ydb.Column(name="Fruit", type=ydb.Type(pg_type=ydb.PgType(oid=25)))
    priceType = ydb.Column(name="Price", type=ydb.Type(pg_type=ydb.PgType(oid=23)))
    storage_binding_name = unique_prefix + "my_binding"
    client.create_object_storage_binding(
        name=storage_binding_name,
        path="a/",
        format="csv_with_names",
        connection_id=connection_response.result.connection_id,
        columns=[fruitType, priceType],
        # Restrict the binding to CSV files under the path.
        format_setting={"file_pattern": "*.csv"},
    )

    sql = fR'''
SELECT *
FROM bindings.{storage_binding_name};
'''

    query_id = client.create_query(
        "simple", sql, type=fq.QueryContent.QueryType.ANALYTICS, pg_syntax=True
    ).result.query_id
    client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

    # Result columns must keep their pg oids; all cells arrive as text values.
    data = client.get_result_data(query_id)
    result_set = data.result.result_set
    logging.debug(str(result_set))
    assert len(result_set.columns) == 2
    assert result_set.columns[0].name == "Fruit"
    assert result_set.columns[0].type.pg_type.oid == 25
    assert result_set.columns[1].name == "Price"
    assert result_set.columns[1].type.pg_type.oid == 23
    assert len(result_set.rows) == 3
    assert result_set.rows[0].items[0].text_value == "Banana"
    assert result_set.rows[0].items[1].text_value == "3"
    assert result_set.rows[1].items[0].text_value == "Apple"
    assert result_set.rows[1].items[1].text_value == "2"
    assert result_set.rows[2].items[0].text_value == "Pear"
    assert result_set.rows[2].items[1].text_value == "15"

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@pytest.mark.parametrize("pg_syntax", [False, True], ids=["yql_syntax", "pg_syntax"])
def test_count_for_pg_binding(self, kikimr, s3, client, pg_syntax, unique_prefix):
    """COUNT(*) over a pg-typed binding, in both YQL and pg syntax.

    The COUNT result type depends on the syntax: with pg_syntax the column is
    pg oid 20 returned as text, otherwise a native UINT64.
    """
    resource = boto3.resource(
        "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
    )

    bucket = resource.Bucket("count_for_pg_binding")
    bucket.create(ACL='public-read')

    s3_client = boto3.client(
        "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
    )

    # Single JSON row — COUNT(*) must therefore return 1.
    row = R'''{"a": 42, "b": 3.14, "c": "text"}'''
    s3_client.put_object(Body=row, Bucket='count_for_pg_binding', Key='abc.json', ContentType='text/json')

    kikimr.control_plane.wait_bootstrap(1)
    connection_response = client.create_storage_connection(unique_prefix + "abc", "count_for_pg_binding")

    # pg oids 23/701/25 — presumably int4/float8/text; verify against the pg catalog.
    aType = ydb.Column(name="a", type=ydb.Type(pg_type=ydb.PgType(oid=23)))
    bType = ydb.Column(name="b", type=ydb.Type(pg_type=ydb.PgType(oid=701)))
    cType = ydb.Column(name="c", type=ydb.Type(pg_type=ydb.PgType(oid=25)))
    storage_binding_name = unique_prefix + "binding_for_count"
    client.create_object_storage_binding(
        name=storage_binding_name,
        path="abc.json",
        format="json_each_row",
        connection_id=connection_response.result.connection_id,
        columns=[aType, bType, cType],
    )

    sql = fR'''
SELECT COUNT(*)
FROM bindings.{storage_binding_name};
'''

    query_id = client.create_query(
        "simple", sql, type=fq.QueryContent.QueryType.ANALYTICS, pg_syntax=pg_syntax
    ).result.query_id
    client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

    data = client.get_result_data(query_id)
    result_set = data.result.result_set
    logging.debug(str(result_set))
    assert len(result_set.columns) == 1
    assert len(result_set.rows) == 1
    if pg_syntax:
        # pg syntax: COUNT is pg oid 20, delivered as text.
        assert result_set.columns[0].type.pg_type.oid == 20
        assert result_set.rows[0].items[0].text_value == "1"
    else:
        # YQL syntax: COUNT is a native UINT64.
        assert result_set.columns[0].type.type_id == ydb.Type.UINT64
        assert result_set.rows[0].items[0].uint64_value == 1

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_ast_in_failed_query_compilation(self, kikimr, s3, client, unique_prefix):
    """A query that fails compilation must still expose its AST.

    Selects a nonexistent column through a raw binding, waits for FAILED, and
    checks that describe_query still returns the compiled AST containing the
    unknown column reference.
    """
    resource = boto3.resource(
        "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
    )

    # Empty the bucket; contents are irrelevant since compilation fails first.
    bucket = resource.Bucket("bindbucket")
    bucket.create(ACL='public-read')
    bucket.objects.all().delete()

    kikimr.control_plane.wait_bootstrap(1)
    connection_id = client.create_storage_connection(unique_prefix + "bb", "bindbucket").result.connection_id

    data_column = ydb.Column(name="data", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING))
    storage_binding_name = unique_prefix + "s3binding"
    client.create_object_storage_binding(
        name=storage_binding_name, path="/", format="raw", connection_id=connection_id, columns=[data_column]
    )

    # The binding only has a `data` column, so this column cannot resolve.
    sql = fR'''
SELECT some_unknown_column FROM bindings.`{storage_binding_name}`;
'''

    query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
    client.wait_query_status(query_id, fq.QueryMeta.FAILED)

    ast = client.describe_query(query_id).result.query.ast.data
    assert "(\'columns \'(\'\"some_unknown_column\"))" in ast, "Invalid query ast"

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_raw_empty_schema_binding(self, kikimr, client, unique_prefix):
    """Creating a raw-format binding with an empty column list must be rejected."""
    kikimr.control_plane.wait_bootstrap(1)
    conn_response = client.create_storage_connection(unique_prefix + "fruitbucket", "fbucket")
    # check_issues=False lets us inspect the rejection instead of raising on it.
    binding_response = client.create_object_storage_binding(
        name=unique_prefix + "my_binding",
        path="fruits.csv",
        format="raw",
        connection_id=conn_response.result.connection_id,
        columns=[],
        check_issues=False,
    )
    issues_text = str(binding_response.issues)
    assert "Only one column in schema supported in raw format" in issues_text, issues_text
Loading
Loading