[WIP] Improve filesystems tests coverage #1497

Open · wants to merge 12 commits into base: branch-21.06
2 changes: 1 addition & 1 deletion dependencies.sh
@@ -36,7 +36,7 @@ echo -e "${GREEN}Install RAPIDS dependencies${ENDCOLOR}"
conda install --yes -c rapidsai$CHANNEL -c nvidia -c conda-forge -c defaults dask-cuda=$RAPIDS_VERSION dask-cudf=$RAPIDS_VERSION cudf=$RAPIDS_VERSION ucx-py=$RAPIDS_VERSION ucx-proc=*=gpu cudatoolkit=$CUDA_VERSION

echo -e "${GREEN}Install E2E test dependencies${ENDCOLOR}"
pip install openpyxl pymysql gitpython pynvml gspread oauth2client
pip install openpyxl pymysql gitpython pynvml gspread oauth2client pydrill docker

if [ $? -eq 0 ]; then
echo -e "${GREEN}Installation complete${ENDCOLOR}"
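
A quick sanity check for the two new E2E dependencies (a sketch; it only verifies that the packages installed above are importable):

    # If either import fails, the pip install above did not complete.
    import docker
    import pydrill
    print("docker and pydrill are importable")
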
2 changes: 2 additions & 0 deletions tests/BlazingSQLTest/Configuration/Settings.py
@@ -91,6 +91,7 @@ def create_json():
"BLAZINGSQL_E2E_COMPARE_BY_PERCENTAJE", "false")
acceptableDifference = os.getenv(
"BLAZINGSQL_E2E_ACCEPTABLE_DIFERENCE", 0.01)
hadoopDirectory = os.getenv("HADOOP_HOME")

data["TestSettings"] = {
"dataDirectory": dataDirectory,
@@ -106,6 +107,7 @@ def create_json():
"googleStorageProjectId": googleStorageProjectId,
"googleStorageBucketName": googleStorageBucketName,
"googleStorageAdcJsonFile": googleStorageAdcJsonFile,
"hadoopDirectory": hadoopDirectory
}

data["RunSettings"] = {
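
For reference, a minimal sketch of how a test could read the new setting (assumes create_json() populates the module-level Settings.data dict, as the diff suggests, and that HADOOP_HOME is exported; the path is illustrative):

    import os
    os.environ["HADOOP_HOME"] = "/opt/hadoop"  # illustrative value

    from Configuration import Settings
    Settings.create_json()  # assumption: this fills in Settings.data
    print(Settings.data["TestSettings"]["hadoopDirectory"])
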
16 changes: 7 additions & 9 deletions tests/BlazingSQLTest/EndToEndTests/fileSystemHdfsTest.py
@@ -20,29 +20,27 @@ def executionTest():
krbticket = os.path.abspath(ktoken)
hdfs_host = "172.22.0.3"
hdfs_port = 9000
hdfs_driver = "libhdfs"
#hdfs_driver = "libhdfs"
print("Using krb ticket: " + krbticket)
result, error_msg, fs = bc.hdfs(
result = bc.hdfs(
authority,
host=hdfs_host,
port=hdfs_port,
user="jhs",
driver=hdfs_driver,
kerb_ticket=krbticket,
#driver=hdfs_driver,
kerb_ticket=krbticket
)

if result is False:
msg = (
"""WARNING: Could not connect to HDFS instance %s:%d using
driver %s, error was: %s"""
% (hdfs_host, hdfs_port, hdfs_driver, error_msg)
"""WARNING: Could not connect to HDFS instance %s:%d """
% (hdfs_host, hdfs_port)
)
print(msg)
print("WARNING: Will ignore " + queryType)
return

print("Success connection to HDFS:")
print(fs)
print("Success connection to HDFS")

hdfs_dir_data_lc = "hdfs://" + authority + dir_data_lc
print("TPCH files at: " + hdfs_dir_data_lc)
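
For reference, a minimal sketch of the updated registration call (host, port, and user are taken from the test above; the authority name and ticket-cache path are illustrative, and a BlazingContext bc plus a reachable Kerberized HDFS are assumed):

    result = bc.hdfs(
        "tpch_hdfs",                     # authority used later in hdfs:// URIs
        host="172.22.0.3",
        port=9000,
        user="jhs",
        kerb_ticket="/tmp/krb5cc_1000",  # illustrative Kerberos ticket cache
    )
    if result is False:
        print("WARNING: could not register the HDFS file system")
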
38 changes: 33 additions & 5 deletions tests/BlazingSQLTest/EndToEndTests/fileSystemS3Test.py
@@ -1,10 +1,12 @@
import os

from blazingsql import DataType, S3EncryptionType
from Configuration import ExecutionMode
from Configuration import Settings as Settings
from DataBase import createSchema as cs
from pynvml import nvmlInit
from Runner import runTest
from Utils import Execution, gpuMemory, init_context, skip_test
from Utils import Execution, gpuMemory, init_context, skip_test, storageBackends


def main(dask_client, drill, dir_data_lc, bc, nRals):
@@ -13,25 +15,48 @@ def main(dask_client, drill, dir_data_lc, bc, nRals):

queryType = "File System S3"

def start_s3mock(access_key_id, secret_access_key):
print("Starting s3 mock server")
mount_path = Settings.data["TestSettings"]["dataDirectory"].replace('data/','')

container = storageBackends.start_backend(backendType="S3",
image='minio/minio',
environment=[f"MINIO_ACCESS_KEY={access_key_id}",
f"MINIO_SECRET_KEY={secret_access_key}"],
ports={ '9000/tcp': 9000 },
volumes={f'{mount_path}': {'bind': '/data', 'mode': 'rw'}},
command='server /data'
)

return container

def executionTest(queryType):
# Read Data TPCH------------------------------------------------------
authority = "tpch_s3"
authority = "tpch"

awsS3BucketName = Settings.data["TestSettings"]["awsS3BucketName"]
awsS3AccessKeyId = Settings.data["TestSettings"]["awsS3AccessKeyId"]
awsS3SecretKey = Settings.data["TestSettings"]["awsS3SecretKey"]
awsS3OverrideEndpoint = None
mock_server = None  # only set when falling back to the local MinIO mock

if not awsS3BucketName:
awsS3BucketName = "data"
awsS3OverrideEndpoint = "http://127.0.0.1:9000"

mock_server = start_s3mock(awsS3AccessKeyId, awsS3SecretKey)

bc.s3(
authority,
bucket_name=awsS3BucketName,
encryption_type=S3EncryptionType.NONE,
access_key_id=awsS3AccessKeyId,
secret_key=awsS3SecretKey,
endpoint_override=awsS3OverrideEndpoint
)

# dir_df = dir_data_lc[dir_data_lc.find("DataSet"):len(dir_data_lc)]

dir_data_lc = "s3://" + authority + "/" + "DataSet100Mb2part/"
dir_data_lc = "s3://" + authority + "/"

tables = ["nation", "region", "supplier", "customer",
"lineitem", "orders"]
@@ -56,7 +81,7 @@ def executionTest(queryType):
queryId = "TEST_01"
query = """select count(c_custkey) as c1, count(c_acctbal) as c2
from customer"""
query(
runTest.run_query(
bc,
drill,
query,
@@ -382,7 +407,7 @@ def executionTest(queryType):

queryId = "TEST_19"
query = """select sum(o_orderkey)/count(o_orderkey)
rom orders group by o_orderstatus"""
from orders group by o_orderstatus"""
runTest.run_query(
bc,
drill,
@@ -442,6 +467,9 @@ def executionTest(queryType):
print("==============================")
break

if mock_server:
storageBackends.stop_backend(mock_server)

executionTest(queryType)

end_mem = gpuMemory.capture_gpu_memory_usage()
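
Putting the pieces together, a sketch of the MinIO fallback path introduced above (the keys are illustrative; a running docker daemon and a local data directory with the TPCH files are assumed):

    mock_server = start_s3mock("testkey", "testsecret")  # helper defined above
    bc.s3(
        "tpch",
        bucket_name="data",
        encryption_type=S3EncryptionType.NONE,
        access_key_id="testkey",
        secret_key="testsecret",
        endpoint_override="http://127.0.0.1:9000",
    )
    # tables can now be registered from s3://tpch/... paths
    if mock_server:
        storageBackends.stop_backend(mock_server)
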
59 changes: 54 additions & 5 deletions tests/BlazingSQLTest/EndToEndTests/tablesFromSQL.py
@@ -1,11 +1,10 @@
from collections import OrderedDict

from blazingsql import DataType
from DataBase import createSchema
from Configuration import ExecutionMode
from Configuration import Settings as Settings
from Runner import runTest
from Utils import gpuMemory, skip_test
from Utils import gpuMemory, skip_test, storageBackends
from EndToEndTests import tpchQueries


@@ -90,6 +89,47 @@ def define_samples(sample_list: [Sample], table_mapper = None):
"partsupp": 40000,
}

tpch_queries = [
"TEST_13",
"TEST_07",
"TEST_12",
"TEST_04",
"TEST_01",
]

# Parameter to indicate whether it is necessary to order
# the result sets before comparing them
worder = 1
use_percentage = False
acceptable_difference = 0.01

# example: {csv: {tb1: tb1_csv, ...}, parquet: {tb1: tb1_parquet, ...}}
datasource_tables = {
    ds: {t: t + "_" + str(ds).split(".")[1] for t in tables}
    for ds in data_types
}

def start_database(databaseType, user, password, database, port):
    # Callers pass DataType enum names (e.g. "MYSQL", "POSTGRESQL"),
    # so normalize before matching.
    dbtype = databaseType.lower()
    if dbtype == "mysql":
        image = 'mysql:8.0.24'
        env = [f"MYSQL_USER={user}",
               f"MYSQL_ROOT_PASSWORD={password}",
               f"MYSQL_PASSWORD={password}",
               f"MYSQL_DATABASE={database}"]
        ports = {'3306/tcp': port}
        command = '--local-infile=1'
    elif dbtype.startswith("postgres"):
        image = 'postgres:9.6.21'
        env = [f"POSTGRES_USER={user}",
               f"POSTGRES_PASSWORD={password}",
               f"POSTGRES_DB={database}"]
        ports = {'5432/tcp': port}
        command = None
    else:
        raise ValueError(f"Unsupported database type: {databaseType}")
    container = storageBackends.start_backend(backendType=dbtype,
                                              image=image,
                                              environment=env,
                                              ports=ports,
                                              command=command)

    return container

def datasources(dask_client, nRals):
for fileSchemaType in data_types:
@@ -186,7 +226,6 @@ def run_queries(bc, dask_client, nRals, drill, spark, dir_data_lc, tables, **kwa
)
currrentFileSchemaType = fileSchemaType


def setup_test(data_type: DataType) -> createSchema.sql_connection:
sql = createSchema.get_sql_connection(data_type)
if not sql:
@@ -195,14 +234,20 @@ def setup_test(data_type: DataType) -> createSchema.sql_connection:

if data_type is DataType.MYSQL:
from DataBase import mysqlSchema
mysql_container = start_database(data_type.name, sql.username, sql.password, sql.database, sql.port)
mysqlSchema.create_and_load_tpch_schema(sql)
return sql
return sql, mysql_container

if data_type is DataType.SQLITE:
from DataBase import sqliteSchema
sqliteSchema.create_and_load_tpch_schema(sql)
return sql, None  # SQLite runs in-process; there is no container to stop

if data_type is DataType.POSTGRESQL:
from DataBase import postgreSQLSchema
postgres_container = start_database(data_type.name, sql.username, sql.password, sql.database, sql.port)
postgreSQLSchema.create_and_load_tpch_schema(sql)
return sql, postgres_container

def executionTest(dask_client, drill, spark, dir_data_lc, bc, nRals, sql):
extra_args = {
@@ -225,9 +270,13 @@ def main(dask_client, drill, spark, dir_data_lc, bc, nRals):
if data_type not in [DataType.MYSQL, DataType.POSTGRESQL, DataType.SQLITE]:
is_file_ds = True
container = None  # no database container is started for file-based sources
else:
sql = setup_test(data_type)
sql, container = setup_test(data_type)

if sql or is_file_ds:
start_mem = gpuMemory.capture_gpu_memory_usage()
executionTest(dask_client, drill, spark, dir_data_lc, bc, nRals, sql)
end_mem = gpuMemory.capture_gpu_memory_usage()
gpuMemory.log_memory_usage(queryType, start_mem, end_mem)

if container:
storageBackends.stop_backend(container)
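
As a usage sketch (credentials and port illustrative), the new helper spins up a disposable database for the duration of a run and tears it down afterwards:

    container = start_database("mysql", "blazing", "blazing", "tpch", 3306)
    # ... load the TPCH schema and run the queries against 127.0.0.1:3306 ...
    storageBackends.stop_backend(container)
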
3 changes: 2 additions & 1 deletion tests/BlazingSQLTest/Utils/startHadoop.py
@@ -61,7 +61,8 @@ def start_hdfs():


def stop_hdfs():
env = dict()
os.environ["PATH"] = os.getenv("PATH")
env = dict(os.environ)
command = ["docker-compose", "-f", COMPOSE_FILE, "down"]
print("Shutting down docker-compose ...")
proc = subprocess.Popen(" ".join(command), env=env, shell=True)
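
The change above matters because docker-compose is resolved through PATH, which an empty env dict does not carry. A minimal sketch of the corrected pattern (the compose file name is illustrative):

    import os
    import subprocess

    env = dict(os.environ)  # inherit PATH and the rest of the caller's environment
    subprocess.Popen("docker-compose -f docker-compose.yml down",
                     env=env, shell=True).wait()
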
23 changes: 23 additions & 0 deletions tests/BlazingSQLTest/Utils/storageBackends.py
@@ -0,0 +1,23 @@
import docker
from time import sleep

def start_backend(backendType: str, **kwargs):
    try:
        print(f"Starting backend for {backendType}")
        client = docker.from_env()
        container = client.containers.run(**kwargs, detach=True, auto_remove=True)
        # Give the containerized service time to come up before tests use it.
        sleep(20)
        print(f"{backendType} backend status: " + str(container.status))
        return container
    except docker.errors.DockerException as e:
        raise Exception(e)

def stop_backend(container):
    try:
        if container:
            print("Stopping backend...")
            container.kill()
    except docker.errors.DockerException as e:
        raise Exception(e)
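
A usage sketch for the new module (image, keys, and port are illustrative; the docker SDK and a reachable docker daemon are assumed):

    from Utils import storageBackends

    container = storageBackends.start_backend(
        backendType="S3",
        image="minio/minio",
        environment=["MINIO_ACCESS_KEY=testkey", "MINIO_SECRET_KEY=testsecret"],
        ports={"9000/tcp": 9000},
        command="server /data",
    )
    try:
        pass  # ... exercise the backend at http://127.0.0.1:9000 ...
    finally:
        storageBackends.stop_backend(container)
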
1 change: 0 additions & 1 deletion tests/BlazingSQLTest/manualTesting.py
@@ -72,7 +72,6 @@ def run_query(bc, engine, query, queryId, queryType, worder, orderBy, acceptabl
stringResult = print_query_results(pdf, pdf2, acceptable_difference, use_percentage, engine)

return stringResult


def print_query_results(pdf1, pdf2, acceptable_difference, use_percentage, engine):
