Skip to content

Commit

Permalink
Merge pull request #3384 from fishtown-analytics/feature/ls_in_RPC
Browse files Browse the repository at this point in the history
Add `ls` to RPC server
  • Loading branch information
iknox-fa committed May 27, 2021
2 parents c0d757a + e68fd6e commit a565026
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 7 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

### Under the hood
- Added logic for registry requests to raise a timeout error after a response hangs out for 30 seconds and 5 attempts have been made to reach the endpoint ([#3177](https://github.com/fishtown-analytics/dbt/issues/3177), [#3275](https://github.com/fishtown-analytics/dbt/pull/3275))
- Added support for invoking the `list` task via the RPC server ([#3311](https://github.com/fishtown-analytics/dbt/issues/3311), [#3384](https://github.com/fishtown-analytics/dbt/pull/3384))
- Added `unique_id` and `original_file_path` as keys to json responses from the `list` task ([#3356](https://github.com/fishtown-analytics/dbt/issues/3356), [#3384](https://github.com/fishtown-analytics/dbt/pull/3384))
- Use shutil.which so Windows can pick up git.bat as a git executable ([#3035](https://github.com/fishtown-analytics/dbt/issues/3035), [#3134](https://github.com/fishtown-analytics/dbt/issues/3134))
- Add `ssh-client` and update `git` version (using buster backports) in Docker image ([#3337](https://github.com/fishtown-analytics/dbt/issues/3337), [#3338](https://github.com/fishtown-analytics/dbt/pull/3338))

Expand Down
17 changes: 17 additions & 0 deletions core/dbt/contracts/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ class RPCCompileParameters(RPCParameters):
state: Optional[str] = None


@dataclass
class RPCListParameters(RPCParameters):
resource_types: Optional[List[str]] = None
models: Union[None, str, List[str]] = None
exclude: Union[None, str, List[str]] = None
select: Union[None, str, List[str]] = None
selector: Optional[str] = None
output: Optional[str] = 'json'


@dataclass
class RPCRunParameters(RPCParameters):
threads: Optional[int] = None
Expand Down Expand Up @@ -190,6 +200,13 @@ class RemoteResult(VersionedSchema):
logs: List[LogMessage]


@dataclass
@schema_version('remote-list-results', 1)
class RemoteListResults(RemoteResult):
output: List[Any]
generated_at: datetime = field(default_factory=datetime.utcnow)


@dataclass
@schema_version('remote-deps-result', 1)
class RemoteDepsResult(RemoteResult):
Expand Down
7 changes: 5 additions & 2 deletions core/dbt/rpc/task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
QueueTimeoutMessage,
)
from dbt.rpc.method import RemoteMethod

from dbt.task.rpc.project_commands import RemoteListTask

# we use this in typing only...
from queue import Queue # noqa
Expand Down Expand Up @@ -78,7 +78,10 @@ def _spawn_setup(self):

def task_exec(self) -> None:
"""task_exec runs first inside the child process"""
signal.signal(signal.SIGTERM, sigterm_handler)
if type(self.task) != RemoteListTask:
# TODO: find another solution for this.. in theory it stops us from
# being able to kill RemoteListTask processes
signal.signal(signal.SIGTERM, sigterm_handler)
# the first thing we do in a new process: push logging back over our
# queue
handler = QueueLogHandler(self.queue)
Expand Down
1 change: 1 addition & 0 deletions core/dbt/task/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class BaseTask(metaclass=ABCMeta):

def __init__(self, args, config):
self.args = args
self.args.single_threaded = False
self.config = config

@classmethod
Expand Down
16 changes: 11 additions & 5 deletions core/dbt/task/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from dbt.contracts.graph.parsed import (
ParsedExposure,
ParsedSourceDefinition,
ParsedSourceDefinition
)
from dbt.graph import (
parse_difference,
Expand Down Expand Up @@ -38,6 +38,8 @@ class ListTask(GraphRunnableTask):
'config',
'resource_type',
'source_name',
'original_file_path',
'unique_id'
))

def __init__(self, args, config):
Expand Down Expand Up @@ -120,7 +122,7 @@ def generate_paths(self):

def run(self):
ManifestTask._runtime_initialize(self)
output = self.config.args.output
output = self.args.output
if output == 'selector':
generator = self.generate_selectors
elif output == 'name':
Expand All @@ -133,7 +135,11 @@ def run(self):
raise InternalException(
'Invalid output {}'.format(output)
)
for result in generator():

return self.output_results(generator())

def output_results(self, results):
for result in results:
self.node_results.append(result)
print(result)
return self.node_results
Expand All @@ -143,10 +149,10 @@ def resource_types(self):
if self.args.models:
return [NodeType.Model]

values = set(self.config.args.resource_types)
if not values:
if not self.args.resource_types:
return list(self.DEFAULT_RESOURCE_VALUES)

values = set(self.args.resource_types)
if 'default' in values:
values.remove('default')
values.update(self.DEFAULT_RESOURCE_VALUES)
Expand Down
38 changes: 38 additions & 0 deletions core/dbt/task/rpc/project_commands.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Union
Expand All @@ -15,10 +16,13 @@
RPCTestParameters,
RemoteCatalogResults,
RemoteExecutionResult,
RemoteListResults,
RemoteRunOperationResult,
RPCSnapshotParameters,
RPCSourceFreshnessParameters,
RPCListParameters,
)
from dbt.exceptions import RuntimeException
from dbt.rpc.method import (
Parameters, RemoteManifestMethod
)
Expand All @@ -32,6 +36,7 @@
from dbt.task.seed import SeedTask
from dbt.task.snapshot import SnapshotTask
from dbt.task.test import TestTask
from dbt.task.list import ListTask

from .base import RPCTask
from .cli import HasCLI
Expand Down Expand Up @@ -258,3 +263,36 @@ def handle_request(self) -> GetManifestResult:

def interpret_results(self, results):
return results.manifest is not None


class RemoteListTask(
RPCCommandTask[RPCListParameters], ListTask
):
METHOD_NAME = 'list'

def set_args(self, params: RPCListParameters) -> None:

self.args.output = params.output
self.args.resource_types = self._listify(params.resource_types)
self.args.models = self._listify(params.models)
self.args.exclude = self._listify(params.exclude)
self.args.selector_name = params.selector
self.args.select = self._listify(params.select)

if self.args.models:
if self.args.select:
raise RuntimeException(
'"models" and "select" are mutually exclusive arguments'
)
if self.args.resource_types:
raise RuntimeException(
'"models" and "resource_type" are mutually exclusive '
'arguments'
)

@staticmethod
def output_results(results):
return RemoteListResults(
output=[json.loads(x) for x in results],
logs=None
)
25 changes: 25 additions & 0 deletions test/integration/047_dbt_ls_test/test_ls.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from test.integration.base import DBTIntegrationTest, use_profile
import os
from dbt.logger import log_manager
from test.integration.base import normalize

import json

Expand Down Expand Up @@ -94,6 +95,8 @@ def expect_snapshot_output(self):
'alias': None,
'check_cols': None,
},
'unique_id': 'snapshot.test.my_snapshot',
'original_file_path': normalize('snapshots/snapshot.sql'),
'alias': 'my_snapshot',
'resource_type': 'snapshot',
},
Expand Down Expand Up @@ -125,6 +128,8 @@ def expect_analyses_output(self):
'schema': None,
'alias': None,
},
'unique_id': 'analysis.test.a',
'original_file_path': normalize('analyses/a.sql'),
'alias': 'a',
'resource_type': 'analysis',
},
Expand Down Expand Up @@ -157,6 +162,8 @@ def expect_model_output(self):
'schema': None,
'alias': None,
},
'original_file_path': normalize('models/ephemeral.sql'),
'unique_id': 'model.test.ephemeral',
'alias': 'ephemeral',
'resource_type': 'model',
},
Expand All @@ -181,6 +188,8 @@ def expect_model_output(self):
'schema': None,
'alias': None,
},
'original_file_path': normalize('models/incremental.sql'),
'unique_id': 'model.test.incremental',
'alias': 'incremental',
'resource_type': 'model',
},
Expand All @@ -204,6 +213,8 @@ def expect_model_output(self):
'schema': None,
'alias': None,
},
'original_file_path': normalize('models/sub/inner.sql'),
'unique_id': 'model.test.inner',
'alias': 'inner',
'resource_type': 'model',
},
Expand All @@ -227,6 +238,8 @@ def expect_model_output(self):
'schema': None,
'alias': None,
},
'original_file_path': normalize('models/outer.sql'),
'unique_id': 'model.test.outer',
'alias': 'outer',
'resource_type': 'model',
},
Expand Down Expand Up @@ -261,6 +274,8 @@ def expect_model_ephemeral_output(self):
'schema': None,
'alias': None,
},
'unique_id': 'model.test.ephemeral',
'original_file_path': normalize('models/ephemeral.sql'),
'alias': 'outer',
'resource_type': 'model',
},
Expand All @@ -277,6 +292,8 @@ def expect_source_output(self):
'config': {
'enabled': True,
},
'unique_id': 'source.test.my_source.my_table',
'original_file_path': normalize('models/schema.yml'),
'package_name': 'test',
'name': 'my_table',
'source_name': 'my_source',
Expand Down Expand Up @@ -314,6 +331,8 @@ def expect_seed_output(self):
'schema': None,
'alias': None,
},
'unique_id': 'seed.test.seed',
'original_file_path': normalize('data/seed.csv'),
'alias': 'seed',
'resource_type': 'seed',
},
Expand Down Expand Up @@ -348,6 +367,8 @@ def expect_test_output(self):
'schema': 'dbt_test__audit',
'alias': None,
},
'unique_id': 'test.test.not_null_outer_id.8f1c176a93',
'original_file_path': normalize('models/schema.yml'),
'alias': 'not_null_outer_id',
'resource_type': 'test',
},
Expand All @@ -373,6 +394,8 @@ def expect_test_output(self):
'schema': 'dbt_test__audit',
'alias': None,
},
'unique_id': 'test.test.t',
'original_file_path': normalize('tests/t.sql'),
'alias': 't',
'resource_type': 'test',
},
Expand All @@ -398,6 +421,8 @@ def expect_test_output(self):
'schema': 'dbt_test__audit',
'alias': None,
},
'unique_id': 'test.test.unique_outer_id.a653b29b17',
'original_file_path': normalize('models/schema.yml'),
'alias': 'unique_outer_id',
'resource_type': 'test',
},
Expand Down
Loading

0 comments on commit a565026

Please sign in to comment.