Skip to content

Commit

Permalink
Merge 280f822 into d5dd2aa
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Jun 26, 2024
2 parents d5dd2aa + 280f822 commit 524707a
Show file tree
Hide file tree
Showing 10 changed files with 1,364 additions and 1,293 deletions.
39 changes: 31 additions & 8 deletions ydb/tests/fq/s3/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@
S3_PID_FILE = "s3.pid"


class TestCounter:
    """Counts fixture/test invocations and fails fast once a limit is exceeded.

    Used as a guard so a single test module cannot grow so large that it
    starts timing out on CI.
    """

    def __init__(self, tests_count_limit, error_string):
        # Maximum number of on_test_start() calls allowed for this counter.
        self.tests_count_limit = tests_count_limit
        # Human-readable subject used in the failure message.
        self.error_string = error_string
        # Number of on_test_start() calls observed so far.
        self.number_tests = 0

    def on_test_start(self):
        """Register one more start; raise AssertionError once over the limit."""
        self.number_tests += 1
        # Fix: "may lead timeouts" -> "may lead to timeouts" in the message.
        assert self.number_tests <= self.tests_count_limit, \
            f"{self.error_string} exceeded limit {self.number_tests} vs {self.tests_count_limit}, " \
            "this may lead to timeouts on CI, please split this file"


@pytest.fixture(scope="module")
def mvp_external_ydb_endpoint(request) -> str:
    """Return the external YDB endpoint from fixture params, or None when absent."""
    if request is not None and hasattr(request, 'param'):
        return request.param["endpoint"]
    return None
Expand Down Expand Up @@ -86,14 +99,21 @@ def get_kikimr_extensions(s3: S3, yq_version: str, kikimr_settings, mvp_external


@pytest.fixture(scope="module")
def kikimr_starts_counter():
    """Module-scoped guard limiting how many times kikimr may be (re)started."""
    return TestCounter(10, "Number kikimr restarts in one module")


@pytest.fixture(scope="module")
def kikimr_yqv1(kikimr_params: pytest.FixtureRequest, s3: S3, kikimr_settings, mvp_external_ydb_endpoint, kikimr_starts_counter):
    """Yield a module-scoped YQ v1 kikimr cluster; each start is counted against the restart limit."""
    kikimr_starts_counter.on_test_start()
    extensions = get_kikimr_extensions(s3, YQV1_VERSION_NAME, kikimr_settings, mvp_external_ydb_endpoint)
    with start_kikimr(kikimr_params, extensions) as cluster:
        yield cluster


@pytest.fixture(scope="module")
def kikimr_yqv2(kikimr_params: pytest.FixtureRequest, s3: S3, kikimr_settings, mvp_external_ydb_endpoint, kikimr_starts_counter):
    """Yield a module-scoped YQ v2 kikimr cluster; each start is counted against the restart limit."""
    kikimr_starts_counter.on_test_start()
    kikimr_extensions = get_kikimr_extensions(s3, YQV2_VERSION_NAME, kikimr_settings, mvp_external_ydb_endpoint)
    with start_kikimr(kikimr_params, kikimr_extensions) as kikimr:
        yield kikimr
Expand All @@ -115,8 +135,14 @@ def kikimr(yq_version: str, kikimr_yqv1, kikimr_yqv2):
return kikimr


@pytest.fixture(scope="module")
def tests_counter():
    """Module-scoped guard limiting how many tests one module may run (CI timeout protection)."""
    counter = TestCounter(200, "Number tests in one module")
    return counter


@pytest.fixture
def client(kikimr, request=None):
def client(kikimr, tests_counter, request=None):
tests_counter.on_test_start()
client = FederatedQueryClient(
request.param["folder_id"] if request is not None else "my_folder", streaming_over_kikimr=kikimr
)
Expand All @@ -128,8 +154,5 @@ def client(kikimr, request=None):

@pytest.fixture
def unique_prefix(request: pytest.FixtureRequest):
    """Build a per-test name prefix unique to the current test node.

    Combines the absolute hash of the pytest node name with the test
    function's name, so objects created by different tests (and different
    parametrizations) never collide.
    """
    name_hash = abs(hash(request.node.name))
    return f"h{name_hash}_{request.function.__name__}"
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# -*- coding: utf-8 -*-

import boto3
import logging
import pytest

import ydb.public.api.protos.ydb_value_pb2 as ydb
Expand Down Expand Up @@ -346,270 +345,3 @@ def test_name_uniqueness_constraint(self, kikimr, client: FederatedQueryClient,
== "Connection with the same name already exists. Please choose another name"
)
assert modify_binding_result.issues[0].severity == 1

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@pytest.mark.parametrize("kikimr_settings", [{"bindings_mode": "BM_DROP_WITH_WARNING"}], indirect=True)
def test_s3_insert(self, kikimr, s3, client, yq_version, unique_prefix):
    """INSERT through an S3 object-storage binding, then SELECT the rows back.

    With bindings_mode=BM_DROP_WITH_WARNING, YQ v2 is expected to attach a
    deprecation warning (severity 2) about the `bindings.` syntax to both
    queries, while the data itself must still round-trip unchanged.
    """
    resource = boto3.resource(
        "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
    )

    # Start from an empty bucket so leftover objects cannot affect the read-back.
    bucket = resource.Bucket("bindbucket")
    bucket.create(ACL='public-read')
    bucket.objects.all().delete()

    kikimr.control_plane.wait_bootstrap(1)
    connection_id = client.create_storage_connection(unique_prefix + "bb", "bindbucket").result.connection_id

    # Binding schema: foo INT32, bar UTF8, stored as csv_with_names under path1/.
    fooType = ydb.Column(name="foo", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32))
    barType = ydb.Column(name="bar", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8))
    storage_binding_name = unique_prefix + "s3binding"
    client.create_object_storage_binding(
        name=storage_binding_name,
        path="path1/",
        format="csv_with_names",
        connection_id=connection_id,
        columns=[fooType, barType],
    )

    sql = fR'''
        insert into bindings.`{storage_binding_name}`
        select * from AS_TABLE([<|foo:123, bar:"xxx"u|>,<|foo:456, bar:"yyy"u|>]);
        '''

    query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
    client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
    if yq_version == "v2":
        # BM_DROP_WITH_WARNING: v2 must warn (severity 2) about the bindings. syntax.
        issues = str(client.describe_query(query_id).result.query.issue)
        assert (
            "message: \"Please remove \\\'bindings.\\\' from your query, the support for this syntax will be dropped soon"
            in issues
        )
        assert "severity: 2" in issues

    sql = fR'''
        select foo, bar from bindings.`{storage_binding_name}`;
        '''

    query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
    client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
    if yq_version == "v2":
        # The read query must carry the same deprecation warning.
        issues = str(client.describe_query(query_id).result.query.issue)
        assert (
            "message: \"Please remove \\\'bindings.\\\' from your query, the support for this syntax will be dropped soon"
            in issues
        )
        assert "severity: 2" in issues

    # The inserted rows must come back with the declared column types and values.
    data = client.get_result_data(query_id)
    result_set = data.result.result_set
    assert len(result_set.columns) == 2
    assert result_set.columns[0].name == "foo"
    assert result_set.columns[0].type.type_id == ydb.Type.INT32
    assert result_set.columns[1].name == "bar"
    assert result_set.columns[1].type.type_id == ydb.Type.UTF8
    assert len(result_set.rows) == 2
    assert result_set.rows[0].items[0].int32_value == 123
    assert result_set.rows[0].items[1].text_value == 'xxx'
    assert result_set.rows[1].items[0].int32_value == 456
    assert result_set.rows[1].items[1].text_value == 'yyy'

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_s3_format_mismatch(self, kikimr, s3, client, unique_prefix):
    """INSERT whose value types contradict the binding schema must fail.

    The binding declares foo as UTF8 and bar as INT32, while the inserted
    rows provide foo as an int and bar as a utf8 string; the query must
    fail and report a schema type mismatch.
    """
    resource = boto3.resource(
        "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
    )

    bucket = resource.Bucket("bindbucket")
    bucket.create(ACL='public-read')

    kikimr.control_plane.wait_bootstrap(1)
    connection_id = client.create_storage_connection(unique_prefix + "bb", "bindbucket").result.connection_id

    # Column types are deliberately swapped relative to the inserted values below.
    fooType = ydb.Column(name="foo", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.UTF8))
    barType = ydb.Column(name="bar", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.INT32))
    storage_binding_name = unique_prefix + "s3binding"
    client.create_object_storage_binding(
        name=storage_binding_name,
        path="path2/",
        format="csv_with_names",
        connection_id=connection_id,
        columns=[fooType, barType],
    )

    sql = fR'''
        insert into bindings.`{storage_binding_name}`
        select * from AS_TABLE([<|foo:123, bar:"xxx"u|>,<|foo:456, bar:"yyy"u|>]);
        '''

    query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
    # The query is expected to end in FAILED, not COMPLETED.
    client.wait_query(query_id, statuses=[fq.QueryMeta.FAILED])

    describe_result = client.describe_query(query_id).result
    describe_string = "{}".format(describe_result)
    assert "Type mismatch between schema type" in describe_string, describe_string

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_pg_binding(self, kikimr, s3, client, unique_prefix):
    """Read a CSV file through a binding whose columns use PostgreSQL types.

    Columns are declared via pg oids (25 = text, 23 = int4); with
    pg_syntax=True the result values are returned as text.
    """
    resource = boto3.resource(
        "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
    )

    bucket = resource.Bucket("fbucket")
    bucket.create(ACL='public-read')

    s3_client = boto3.client(
        "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
    )

    # CSV payload; lines stay unindented because they are part of the data.
    fruits = R'''Fruit,Price
Banana,3
Apple,2
Pear,15'''
    s3_client.put_object(Body=fruits, Bucket='fbucket', Key='a/fruits.csv', ContentType='text/plain')

    kikimr.control_plane.wait_bootstrap(1)
    connection_response = client.create_storage_connection(unique_prefix + "fruitbucket", "fbucket")

    # pg oid 25 is text, oid 23 is int4.
    fruitType = ydb.Column(name="Fruit", type=ydb.Type(pg_type=ydb.PgType(oid=25)))
    priceType = ydb.Column(name="Price", type=ydb.Type(pg_type=ydb.PgType(oid=23)))
    storage_binding_name = unique_prefix + "my_binding"
    client.create_object_storage_binding(
        name=storage_binding_name,
        path="a/",
        format="csv_with_names",
        connection_id=connection_response.result.connection_id,
        columns=[fruitType, priceType],
        format_setting={"file_pattern": "*.csv"},
    )

    sql = fR'''
        SELECT *
        FROM bindings.{storage_binding_name};
        '''

    query_id = client.create_query(
        "simple", sql, type=fq.QueryContent.QueryType.ANALYTICS, pg_syntax=True
    ).result.query_id
    client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

    data = client.get_result_data(query_id)
    result_set = data.result.result_set
    logging.debug(str(result_set))
    assert len(result_set.columns) == 2
    assert result_set.columns[0].name == "Fruit"
    assert result_set.columns[0].type.pg_type.oid == 25
    assert result_set.columns[1].name == "Price"
    assert result_set.columns[1].type.pg_type.oid == 23
    # Under pg syntax all values come back as text, including the int column.
    assert len(result_set.rows) == 3
    assert result_set.rows[0].items[0].text_value == "Banana"
    assert result_set.rows[0].items[1].text_value == "3"
    assert result_set.rows[1].items[0].text_value == "Apple"
    assert result_set.rows[1].items[1].text_value == "2"
    assert result_set.rows[2].items[0].text_value == "Pear"
    assert result_set.rows[2].items[1].text_value == "15"

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@pytest.mark.parametrize("pg_syntax", [False, True], ids=["yql_syntax", "pg_syntax"])
def test_count_for_pg_binding(self, kikimr, s3, client, pg_syntax, unique_prefix):
    """COUNT(*) over a pg-typed binding under both YQL and PG syntax.

    A single JSON row must yield a count of 1: as pg int8 text ("1",
    oid 20) under pg syntax, and as a UINT64 value under YQL syntax.
    """
    resource = boto3.resource(
        "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
    )

    bucket = resource.Bucket("count_for_pg_binding")
    bucket.create(ACL='public-read')

    s3_client = boto3.client(
        "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
    )

    # Exactly one JSON object -> expected COUNT(*) == 1.
    row = R'''{"a": 42, "b": 3.14, "c": "text"}'''
    s3_client.put_object(Body=row, Bucket='count_for_pg_binding', Key='abc.json', ContentType='text/json')

    kikimr.control_plane.wait_bootstrap(1)
    connection_response = client.create_storage_connection(unique_prefix + "abc", "count_for_pg_binding")

    # pg oids: 23 = int4, 701 = float8, 25 = text.
    aType = ydb.Column(name="a", type=ydb.Type(pg_type=ydb.PgType(oid=23)))
    bType = ydb.Column(name="b", type=ydb.Type(pg_type=ydb.PgType(oid=701)))
    cType = ydb.Column(name="c", type=ydb.Type(pg_type=ydb.PgType(oid=25)))
    storage_binding_name = unique_prefix + "binding_for_count"
    client.create_object_storage_binding(
        name=storage_binding_name,
        path="abc.json",
        format="json_each_row",
        connection_id=connection_response.result.connection_id,
        columns=[aType, bType, cType],
    )

    sql = fR'''
        SELECT COUNT(*)
        FROM bindings.{storage_binding_name};
        '''

    query_id = client.create_query(
        "simple", sql, type=fq.QueryContent.QueryType.ANALYTICS, pg_syntax=pg_syntax
    ).result.query_id
    client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

    data = client.get_result_data(query_id)
    result_set = data.result.result_set
    logging.debug(str(result_set))
    assert len(result_set.columns) == 1
    assert len(result_set.rows) == 1
    if pg_syntax:
        # pg COUNT(*) is int8 (oid 20) and the value arrives as text.
        assert result_set.columns[0].type.pg_type.oid == 20
        assert result_set.rows[0].items[0].text_value == "1"
    else:
        assert result_set.columns[0].type.type_id == ydb.Type.UINT64
        assert result_set.rows[0].items[0].uint64_value == 1

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_ast_in_failed_query_compilation(self, kikimr, s3, client, unique_prefix):
    """A query that fails compilation must still expose its AST.

    Selecting a non-existent column makes the query FAIL, yet
    describe_query should return an AST that mentions the unknown column.
    """
    resource = boto3.resource(
        "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
    )

    bucket = resource.Bucket("bindbucket")
    bucket.create(ACL='public-read')
    bucket.objects.all().delete()

    kikimr.control_plane.wait_bootstrap(1)
    connection_id = client.create_storage_connection(unique_prefix + "bb", "bindbucket").result.connection_id

    # Minimal raw binding with a single STRING column named "data".
    data_column = ydb.Column(name="data", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING))
    storage_binding_name = unique_prefix + "s3binding"
    client.create_object_storage_binding(
        name=storage_binding_name, path="/", format="raw", connection_id=connection_id, columns=[data_column]
    )

    # "some_unknown_column" does not exist in the binding schema.
    sql = fR'''
        SELECT some_unknown_column FROM bindings.`{storage_binding_name}`;
        '''

    query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
    client.wait_query_status(query_id, fq.QueryMeta.FAILED)

    # Even for a failed query the recorded AST must reference the column.
    ast = client.describe_query(query_id).result.query.ast.data
    assert "(\'columns \'(\'\"some_unknown_column\"))" in ast, "Invalid query ast"

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_raw_empty_schema_binding(self, kikimr, client, unique_prefix):
    """Creating a raw-format binding with an empty column list must be rejected.

    The control plane should answer with an issue saying raw format
    supports exactly one column.
    """
    kikimr.control_plane.wait_bootstrap(1)
    connection_response = client.create_storage_connection(unique_prefix + "fruitbucket", "fbucket")
    # check_issues=False: we expect the call to produce issues instead of succeeding.
    binding_response = client.create_object_storage_binding(
        name=unique_prefix + "my_binding",
        path="fruits.csv",
        format="raw",
        connection_id=connection_response.result.connection_id,
        columns=[],
        check_issues=False,
    )
    assert "Only one column in schema supported in raw format" in str(binding_response.issues), str(
        binding_response.issues
    )
Loading

0 comments on commit 524707a

Please sign in to comment.