Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ls to RPC server #3384

Merged
merged 10 commits into from
May 27, 2021
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

### Under the hood
- Added logic for registry requests to raise a timeout error after a response hangs out for 30 seconds and 5 attempts have been made to reach the endpoint ([#3177](https://github.com/fishtown-analytics/dbt/issues/3177), [#3275](https://github.com/fishtown-analytics/dbt/pull/3275))
- Added support for invoking the `list` task via the RPC server ([#3311](https://github.com/fishtown-analytics/dbt/issues/3311), [#3384](https://github.com/fishtown-analytics/dbt/pull/3384))
- Added `unique_id` and `original_file_path` as keys to json responses from the `list` task ([#3356](https://github.com/fishtown-analytics/dbt/issues/3356), [#3384](https://github.com/fishtown-analytics/dbt/pull/3384))
- Use shutil.which so Windows can pick up git.bat as a git executable ([#3035](https://github.com/fishtown-analytics/dbt/issues/3035), [#3134](https://github.com/fishtown-analytics/dbt/issues/3134))
- Add `ssh-client` and update `git` version (using buster backports) in Docker image ([#3337](https://github.com/fishtown-analytics/dbt/issues/3337), [#3338](https://github.com/fishtown-analytics/dbt/pull/3338))

Expand Down
17 changes: 17 additions & 0 deletions core/dbt/contracts/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ class RPCCompileParameters(RPCParameters):
state: Optional[str] = None


@dataclass
class RPCListParameters(RPCParameters):
resource_types: Optional[List[str]] = None
models: Union[None, str, List[str]] = None
exclude: Union[None, str, List[str]] = None
select: Union[None, str, List[str]] = None
selector: Optional[str] = None
output: Optional[str] = 'json'


@dataclass
class RPCRunParameters(RPCParameters):
threads: Optional[int] = None
Expand Down Expand Up @@ -190,6 +200,13 @@ class RemoteResult(VersionedSchema):
logs: List[LogMessage]


@dataclass
@schema_version('remote-list-results', 1)
class RemoteListResults(RemoteResult):
output: List[Any]
generated_at: datetime = field(default_factory=datetime.utcnow)


@dataclass
@schema_version('remote-deps-result', 1)
class RemoteDepsResult(RemoteResult):
Expand Down
7 changes: 5 additions & 2 deletions core/dbt/rpc/task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
QueueTimeoutMessage,
)
from dbt.rpc.method import RemoteMethod

from dbt.task.rpc.project_commands import RemoteListTask

# we use this in typing only...
from queue import Queue # noqa
Expand Down Expand Up @@ -78,7 +78,10 @@ def _spawn_setup(self):

def task_exec(self) -> None:
"""task_exec runs first inside the child process"""
signal.signal(signal.SIGTERM, sigterm_handler)
if type(self.task) != RemoteListTask:
# TODO: find another solution for this.. in theory it stops us from
# being able to kill RemoteListTask processes
signal.signal(signal.SIGTERM, sigterm_handler)
# the first thing we do in a new process: push logging back over our
# queue
handler = QueueLogHandler(self.queue)
Expand Down
1 change: 1 addition & 0 deletions core/dbt/task/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class BaseTask(metaclass=ABCMeta):

def __init__(self, args, config):
self.args = args
self.args.single_threaded = False
self.config = config

@classmethod
Expand Down
16 changes: 11 additions & 5 deletions core/dbt/task/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from dbt.contracts.graph.parsed import (
ParsedExposure,
ParsedSourceDefinition,
ParsedSourceDefinition
)
from dbt.graph import (
parse_difference,
Expand Down Expand Up @@ -38,6 +38,8 @@ class ListTask(GraphRunnableTask):
'config',
'resource_type',
'source_name',
'original_file_path',
'unique_id'
))

def __init__(self, args, config):
Expand Down Expand Up @@ -120,7 +122,7 @@ def generate_paths(self):

def run(self):
ManifestTask._runtime_initialize(self)
output = self.config.args.output
output = self.args.output
if output == 'selector':
generator = self.generate_selectors
elif output == 'name':
Expand All @@ -133,7 +135,11 @@ def run(self):
raise InternalException(
'Invalid output {}'.format(output)
)
for result in generator():

return self.output_results(generator())

def output_results(self, results):
for result in results:
self.node_results.append(result)
print(result)
return self.node_results
Expand All @@ -143,10 +149,10 @@ def resource_types(self):
if self.args.models:
return [NodeType.Model]

values = set(self.config.args.resource_types)
if not values:
if not self.args.resource_types:
return list(self.DEFAULT_RESOURCE_VALUES)

values = set(self.args.resource_types)
if 'default' in values:
values.remove('default')
values.update(self.DEFAULT_RESOURCE_VALUES)
Expand Down
38 changes: 38 additions & 0 deletions core/dbt/task/rpc/project_commands.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Union
Expand All @@ -15,10 +16,13 @@
RPCTestParameters,
RemoteCatalogResults,
RemoteExecutionResult,
RemoteListResults,
RemoteRunOperationResult,
RPCSnapshotParameters,
RPCSourceFreshnessParameters,
RPCListParameters,
)
from dbt.exceptions import RuntimeException
from dbt.rpc.method import (
Parameters, RemoteManifestMethod
)
Expand All @@ -32,6 +36,7 @@
from dbt.task.seed import SeedTask
from dbt.task.snapshot import SnapshotTask
from dbt.task.test import TestTask
from dbt.task.list import ListTask

from .base import RPCTask
from .cli import HasCLI
Expand Down Expand Up @@ -258,3 +263,36 @@ def handle_request(self) -> GetManifestResult:

def interpret_results(self, results):
return results.manifest is not None


class RemoteListTask(
RPCCommandTask[RPCListParameters], ListTask
):
METHOD_NAME = 'list'

def set_args(self, params: RPCListParameters) -> None:

self.args.output = params.output
self.args.resource_types = self._listify(params.resource_types)
self.args.models = self._listify(params.models)
self.args.exclude = self._listify(params.exclude)
self.args.selector_name = params.selector
self.args.select = self._listify(params.select)

if self.args.models:
if self.args.select:
raise RuntimeException(
'"models" and "select" are mutually exclusive arguments'
)
if self.args.resource_types:
raise RuntimeException(
'"models" and "resource_type" are mutually exclusive '
'arguments'
)

@staticmethod
def output_results(results):
return RemoteListResults(
output=[json.loads(x) for x in results],
logs=None
)
25 changes: 25 additions & 0 deletions test/integration/047_dbt_ls_test/test_ls.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from test.integration.base import DBTIntegrationTest, use_profile
import os
from dbt.logger import log_manager
from test.integration.base import normalize

import json

Expand Down Expand Up @@ -94,6 +95,8 @@ def expect_snapshot_output(self):
'alias': None,
'check_cols': None,
},
'unique_id': 'snapshot.test.my_snapshot',
'original_file_path': normalize('snapshots/snapshot.sql'),
'alias': 'my_snapshot',
'resource_type': 'snapshot',
},
Expand Down Expand Up @@ -125,6 +128,8 @@ def expect_analyses_output(self):
'schema': None,
'alias': None,
},
'unique_id': 'analysis.test.a',
'original_file_path': normalize('analyses/a.sql'),
'alias': 'a',
'resource_type': 'analysis',
},
Expand Down Expand Up @@ -157,6 +162,8 @@ def expect_model_output(self):
'schema': None,
'alias': None,
},
'original_file_path': normalize('models/ephemeral.sql'),
'unique_id': 'model.test.ephemeral',
'alias': 'ephemeral',
'resource_type': 'model',
},
Expand All @@ -181,6 +188,8 @@ def expect_model_output(self):
'schema': None,
'alias': None,
},
'original_file_path': normalize('models/incremental.sql'),
'unique_id': 'model.test.incremental',
'alias': 'incremental',
'resource_type': 'model',
},
Expand All @@ -204,6 +213,8 @@ def expect_model_output(self):
'schema': None,
'alias': None,
},
'original_file_path': normalize('models/sub/inner.sql'),
'unique_id': 'model.test.inner',
'alias': 'inner',
'resource_type': 'model',
},
Expand All @@ -227,6 +238,8 @@ def expect_model_output(self):
'schema': None,
'alias': None,
},
'original_file_path': normalize('models/outer.sql'),
'unique_id': 'model.test.outer',
'alias': 'outer',
'resource_type': 'model',
},
Expand Down Expand Up @@ -261,6 +274,8 @@ def expect_model_ephemeral_output(self):
'schema': None,
'alias': None,
},
'unique_id': 'model.test.ephemeral',
'original_file_path': normalize('models/ephemeral.sql'),
'alias': 'outer',
'resource_type': 'model',
},
Expand All @@ -277,6 +292,8 @@ def expect_source_output(self):
'config': {
'enabled': True,
},
'unique_id': 'source.test.my_source.my_table',
'original_file_path': normalize('models/schema.yml'),
'package_name': 'test',
'name': 'my_table',
'source_name': 'my_source',
Expand Down Expand Up @@ -314,6 +331,8 @@ def expect_seed_output(self):
'schema': None,
'alias': None,
},
'unique_id': 'seed.test.seed',
'original_file_path': normalize('data/seed.csv'),
'alias': 'seed',
'resource_type': 'seed',
},
Expand Down Expand Up @@ -347,6 +366,8 @@ def expect_test_output(self):
'schema': None,
'alias': None,
},
'unique_id': 'test.test.not_null_outer_id.8f1c176a93',
'original_file_path': normalize('models/schema.yml'),
'alias': 'not_null_outer_id',
'resource_type': 'test',
},
Expand All @@ -371,6 +392,8 @@ def expect_test_output(self):
'schema': None,
'alias': None,
},
'unique_id': 'test.test.t',
'original_file_path': normalize('tests/t.sql'),
'alias': 't',
'resource_type': 'test',
},
Expand All @@ -395,6 +418,8 @@ def expect_test_output(self):
'schema': None,
'alias': None,
},
'unique_id': 'test.test.unique_outer_id.a653b29b17',
'original_file_path': normalize('models/schema.yml'),
'alias': 'unique_outer_id',
'resource_type': 'test',
},
Expand Down
Loading