diff --git a/granulate_utils/config_feeder/client/client.py b/granulate_utils/config_feeder/client/client.py index f2b5bdb6..4eeefcaa 100644 --- a/granulate_utils/config_feeder/client/client.py +++ b/granulate_utils/config_feeder/client/client.py @@ -22,6 +22,7 @@ CreateClusterRequest, CreateClusterResponse, ) +from granulate_utils.config_feeder.core.models.collection import CollectorType from granulate_utils.config_feeder.core.models.node import CreateNodeRequest, CreateNodeResponse, NodeCreate, NodeInfo from granulate_utils.config_feeder.core.models.yarn import NodeYarnConfigCreate @@ -38,6 +39,7 @@ def __init__( logger: Union[logging.Logger, logging.LoggerAdapter] = None, server_address: Optional[str] = None, yarn: bool = True, + collector=CollectorType.SAGENT, ) -> None: if not token or not service: raise ClientError("Token and service must be provided") @@ -45,6 +47,7 @@ def __init__( self._token = token self._service = service self._cluster_id: Optional[str] = None + self._collector = collector self._server_address: str = server_address.rstrip("/") if server_address else DEFAULT_API_SERVER_ADDRESS self._is_yarn_enabled = yarn self._yarn_collector = YarnConfigCollector() @@ -106,6 +109,7 @@ def _register_node( self.logger.debug(f"registering node {node.external_id}") request = CreateNodeRequest( node=NodeCreate( + collector=self._collector, external_id=node.external_id, is_master=node.is_master, ), @@ -118,6 +122,7 @@ def _register_cluster(self, provider: CloudProvider, external_id: str) -> None: self.logger.debug(f"registering cluster {external_id}") request = CreateClusterRequest( cluster=ClusterCreate( + collector=self._collector, service=self._service, provider=provider, external_id=external_id, @@ -143,7 +148,7 @@ def _get_yarn_config_if_changed(self, configs: CollectionResult) -> Optional[Nod if self._last_hash[ConfigType.YARN].get(configs.node.external_id) == configs.yarn_config_hash: self.logger.debug("YARN config is up to date") return None - return NodeYarnConfigCreate(config_json=json.dumps(configs.yarn_config.config)) + return NodeYarnConfigCreate(collector=self._collector, config_json=json.dumps(configs.yarn_config.config)) def _api_request( self, diff --git a/granulate_utils/config_feeder/core/models/cluster.py b/granulate_utils/config_feeder/core/models/cluster.py index e0eb0ed5..b3e1660e 100644 --- a/granulate_utils/config_feeder/core/models/cluster.py +++ b/granulate_utils/config_feeder/core/models/cluster.py @@ -4,6 +4,8 @@ from pydantic import BaseModel +from granulate_utils.config_feeder.core.models.collection import CollectorType + class BigDataPlatform(str, Enum): UNKOWN = "unknown" @@ -19,6 +21,7 @@ class CloudProvider(str, Enum): class ClusterBase(BaseModel): + collector: CollectorType provider: CloudProvider external_id: str diff --git a/granulate_utils/config_feeder/core/models/node.py b/granulate_utils/config_feeder/core/models/node.py index be16a345..f17098ce 100644 --- a/granulate_utils/config_feeder/core/models/node.py +++ b/granulate_utils/config_feeder/core/models/node.py @@ -4,6 +4,7 @@ from pydantic import BaseModel from granulate_utils.config_feeder.core.models.cluster import BigDataPlatform, CloudProvider +from granulate_utils.config_feeder.core.models.collection import CollectorType class NodeInfo(BaseModel): @@ -16,6 +17,7 @@ class NodeInfo(BaseModel): class NodeBase(BaseModel): + collector: CollectorType external_id: str is_master: bool = False diff --git a/granulate_utils/config_feeder/core/models/yarn.py b/granulate_utils/config_feeder/core/models/yarn.py index 5ec492da..f2524d39 100644 --- a/granulate_utils/config_feeder/core/models/yarn.py +++ b/granulate_utils/config_feeder/core/models/yarn.py @@ -3,8 +3,11 @@ from pydantic import BaseModel +from granulate_utils.config_feeder.core.models.collection import CollectorType + class NodeYarnConfigCreate(BaseModel): + collector: CollectorType config_json: str diff --git a/tests/granulate_utils/config_feeder/fixtures/api.py b/tests/granulate_utils/config_feeder/fixtures/api.py index eae2244c..8f24305b 100644 --- a/tests/granulate_utils/config_feeder/fixtures/api.py +++ b/tests/granulate_utils/config_feeder/fixtures/api.py @@ -48,6 +48,7 @@ def _configure_api_mock(self, mock: Any) -> None: "json": { "cluster": { "id": "cluster-1", + "collector": "sagent", "provider": "aws", "external_id": "j-1234567890", "ts": "2021-10-01T00:00:00Z", @@ -63,6 +64,7 @@ def _configure_api_mock(self, mock: Any) -> None: "json": { "node": { "id": "node-1", + "collector": "sagent", "external_id": "i-1234567890", "ts": "2021-10-01T00:00:00Z", } diff --git a/tests/granulate_utils/config_feeder/test_client.py b/tests/granulate_utils/config_feeder/test_client.py index 1c8ce904..adf42cff 100644 --- a/tests/granulate_utils/config_feeder/test_client.py +++ b/tests/granulate_utils/config_feeder/test_client.py @@ -28,6 +28,7 @@ def test_should_send_config_only_once_when_not_changed() -> None: assert len(requests[f"{API_URL}/clusters"]) == 1 assert requests[f"{API_URL}/clusters"][0].json() == { "cluster": { + "collector": "sagent", "service": "service1", "provider": "aws", "external_id": "j-1234567890", @@ -38,6 +39,7 @@ def test_should_send_config_only_once_when_not_changed() -> None: assert len(requests[f"{API_URL}/clusters/cluster-1/nodes"]) == 1 assert requests[f"{API_URL}/clusters/cluster-1/nodes"][0].json() == { "node": { + "collector": "sagent", "external_id": "i-1234567890", "is_master": True, }, @@ -46,7 +48,7 @@ def test_should_send_config_only_once_when_not_changed() -> None: assert len(requests[f"{API_URL}/nodes/node-1/configs"]) == 1 assert requests[f"{API_URL}/nodes/node-1/configs"][0].json() == { - "yarn_config": {"config_json": json.dumps(mock_yarn_config().config)}, + "yarn_config": {"collector": "sagent", "config_json": json.dumps(mock_yarn_config().config)}, }