-
Notifications
You must be signed in to change notification settings - Fork 421
/
tasks_endpoint.py
97 lines (81 loc) · 3.96 KB
/
tasks_endpoint.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import logging
from typing import List, Optional, Tuple, TYPE_CHECKING
from tableauserverclient.server.endpoint.endpoint import Endpoint, api
from tableauserverclient.server.endpoint.exceptions import MissingRequiredFieldError
from tableauserverclient.models import TaskItem, PaginationItem
from tableauserverclient.server import RequestFactory
from tableauserverclient.helpers.logging import logger
if TYPE_CHECKING:
from tableauserverclient.server.request_options import RequestOptions
class Tasks(Endpoint):
    """Endpoint for querying, creating, running and deleting the tasks of a site.

    Every URL used here is built under ``<server baseurl>/sites/<site id>/tasks``.

    NOTE(review): the ``@api(version=...)`` decorator is imported from the
    endpoint module and presumably gates each call on the server's REST API
    version -- confirm against its definition.
    """

    @property
    def baseurl(self) -> str:
        """Return the URL of the tasks collection for the parent server's site."""
        return "{0}/sites/{1}/tasks".format(self.parent_srv.baseurl, self.parent_srv.site_id)

    def __normalize_task_type(self, task_type: str) -> str:
        """Return the URL path segment to use for ``task_type``.

        The word for extract refresh used in API URL is "extractRefreshes".
        It is different than the tag "extractRefresh" used in the request body.
        Any other task type is returned unchanged.
        """
        if task_type == TaskItem.Type.ExtractRefresh:
            return "{}es".format(task_type)
        return task_type

    @api(version="2.6")
    def get(
        self, req_options: Optional["RequestOptions"] = None, task_type: str = TaskItem.Type.ExtractRefresh
    ) -> Tuple[List[TaskItem], PaginationItem]:
        """Query the tasks of one type on the site.

        Args:
            req_options: Optional request options, forwarded unchanged to the GET request.
            task_type: Kind of task to list; defaults to ``TaskItem.Type.ExtractRefresh``.

        Returns:
            A ``(tasks, pagination_item)`` tuple, both parsed from the same response body.
        """
        if task_type == TaskItem.Type.DataAcceleration:
            # Data acceleration tasks carry their own version requirement;
            # presumably this raises when the server is older -- TODO confirm.
            self.parent_srv.assert_at_least_version("3.8", "Data Acceleration Tasks")

        logger.info("Querying all %s tasks for the site", task_type)

        url = "{0}/{1}".format(self.baseurl, self.__normalize_task_type(task_type))
        server_response = self.get_request(url, req_options)

        pagination_item = PaginationItem.from_response(server_response.content, self.parent_srv.namespace)
        all_tasks = TaskItem.from_response(server_response.content, self.parent_srv.namespace, task_type)
        return all_tasks, pagination_item

    @api(version="2.6")
    def get_by_id(self, task_id: str) -> TaskItem:
        """Query a single task by its ID.

        The request is always issued under the extract-refresh URL segment.

        Args:
            task_id: ID of the task to fetch.

        Returns:
            The first task item parsed from the response.

        Raises:
            ValueError: If ``task_id`` is empty or otherwise falsy.
        """
        if not task_id:
            raise ValueError("No Task ID provided")

        logger.info("Querying a single task by id %s", task_id)

        url = "{}/{}/{}".format(
            self.baseurl,
            self.__normalize_task_type(TaskItem.Type.ExtractRefresh),
            task_id,
        )
        server_response = self.get_request(url)
        return TaskItem.from_response(server_response.content, self.parent_srv.namespace)[0]

    @api(version="3.19")
    def create(self, extract_item: TaskItem) -> bytes:
        """Create an extract refresh task.

        Args:
            extract_item: The task item describing the extract refresh to create.

        Returns:
            The raw content of the server response. It is NOT parsed into a
            ``TaskItem``; the annotation mirrors ``run``, which returns the same
            attribute.

        Raises:
            ValueError: If ``extract_item`` is ``None`` or otherwise falsy.
        """
        if not extract_item:
            raise ValueError("No extract refresh provided")

        logger.info("Creating an extract refresh %s", extract_item)

        url = "{0}/{1}".format(self.baseurl, self.__normalize_task_type(TaskItem.Type.ExtractRefresh))
        create_req = RequestFactory.Task.create_extract_req(extract_item)
        server_response = self.post_request(url, create_req)
        return server_response.content

    @api(version="2.6")
    def run(self, task_item: TaskItem) -> bytes:
        """Trigger an immediate run of a task.

        The ``runNow`` URL is always built under the extract-refresh segment,
        regardless of the type of ``task_item``.

        Args:
            task_item: The task to run; only its ``id`` is used to build the URL.

        Returns:
            The raw content of the server response.

        Raises:
            MissingRequiredFieldError: If ``task_item.id`` is empty or otherwise falsy.
        """
        if not task_item.id:
            raise MissingRequiredFieldError("Task item missing ID.")

        url = "{0}/{1}/{2}/runNow".format(
            self.baseurl,
            self.__normalize_task_type(TaskItem.Type.ExtractRefresh),
            task_item.id,
        )
        run_req = RequestFactory.Task.run_req(task_item)
        server_response = self.post_request(url, run_req)
        return server_response.content  # Todo add typing

    # Delete 1 task by id
    @api(version="3.6")
    def delete(self, task_id: str, task_type: str = TaskItem.Type.ExtractRefresh) -> None:
        """Delete a single task by its ID.

        Args:
            task_id: ID of the task to delete.
            task_type: Kind of task being deleted; defaults to
                ``TaskItem.Type.ExtractRefresh``.

        Raises:
            ValueError: If ``task_id`` is empty or otherwise falsy.
        """
        # The version check for data acceleration runs before the ID is validated.
        if task_type == TaskItem.Type.DataAcceleration:
            self.parent_srv.assert_at_least_version("3.8", "Data Acceleration Tasks")

        if not task_id:
            raise ValueError("No Task ID provided")

        url = "{0}/{1}/{2}".format(self.baseurl, self.__normalize_task_type(task_type), task_id)
        self.delete_request(url)
        logger.info("Deleted single task (ID: %s)", task_id)