-
Notifications
You must be signed in to change notification settings - Fork 4.1k
/
KubeProcessFactory.java
164 lines (145 loc) · 6.26 KB
/
KubeProcessFactory.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.workers.process;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.config.AllowedHosts;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.exception.WorkerException;
import io.fabric8.kubernetes.client.KubernetesClient;
import java.net.InetAddress;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KubeProcessFactory implements ProcessFactory {
@VisibleForTesting
public static final int KUBE_NAME_LEN_LIMIT = 63;
private static final Logger LOGGER = LoggerFactory.getLogger(KubeProcessFactory.class);
private final WorkerConfigs workerConfigs;
private final String namespace;
private final KubernetesClient fabricClient;
private final String kubeHeartbeatUrl;
private final String processRunnerHost;
private final boolean isOrchestrator;
/**
* Sets up a process factory with the default processRunnerHost.
*/
public KubeProcessFactory(final WorkerConfigs workerConfigs,
final String namespace,
final KubernetesClient fabricClient,
final String kubeHeartbeatUrl,
final boolean isOrchestrator) {
this(
workerConfigs,
namespace,
fabricClient,
kubeHeartbeatUrl,
Exceptions.toRuntime(() -> InetAddress.getLocalHost().getHostAddress()),
isOrchestrator);
}
/**
* @param namespace kubernetes namespace where spawned pods will live
* @param fabricClient fabric8 kubernetes client
* @param kubeHeartbeatUrl a url where if the response is not 200 the spawned process will fail
* itself
* @param processRunnerHost is the local host or ip of the machine running the process factory.
* injectable for testing.
* @param isOrchestrator determines if this should run as airbyte-admin
*/
@VisibleForTesting
public KubeProcessFactory(final WorkerConfigs workerConfigs,
final String namespace,
final KubernetesClient fabricClient,
final String kubeHeartbeatUrl,
final String processRunnerHost,
final boolean isOrchestrator) {
this.workerConfigs = workerConfigs;
this.namespace = namespace;
this.fabricClient = fabricClient;
this.kubeHeartbeatUrl = kubeHeartbeatUrl;
this.processRunnerHost = processRunnerHost;
this.isOrchestrator = isOrchestrator;
}
@Override
public Process create(
final String jobType,
final String jobId,
final int attempt,
final Path jobRoot,
final String imageName,
final boolean isCustomConnector,
final boolean usesStdin,
final Map<String, String> files,
final String entrypoint,
final ResourceRequirements resourceRequirements,
final AllowedHosts allowedHosts,
final Map<String, String> customLabels,
final Map<String, String> jobMetadata,
final Map<Integer, Integer> internalToExternalPorts,
final String... args)
throws WorkerException {
try {
// used to differentiate source and destination processes with the same id and attempt
final String podName = ProcessFactory.createProcessName(imageName, jobType, jobId, attempt, KUBE_NAME_LEN_LIMIT);
LOGGER.info("Attempting to start pod = {} for {} with resources {} and allowed hosts {}", podName, imageName, resourceRequirements,
allowedHosts);
final int stdoutLocalPort = KubePortManagerSingleton.getInstance().take();
LOGGER.info("{} stdoutLocalPort = {}", podName, stdoutLocalPort);
final int stderrLocalPort = KubePortManagerSingleton.getInstance().take();
LOGGER.info("{} stderrLocalPort = {}", podName, stderrLocalPort);
final var allLabels = getLabels(jobId, attempt, customLabels);
// If using isolated pool, check workerConfigs has isolated pool set. If not set, fall back to use
// regular node pool.
final var nodeSelectors =
isCustomConnector ? workerConfigs.getWorkerIsolatedKubeNodeSelectors().orElse(workerConfigs.getworkerKubeNodeSelectors())
: workerConfigs.getworkerKubeNodeSelectors();
return new KubePodProcess(
isOrchestrator,
processRunnerHost,
fabricClient,
podName,
namespace,
imageName,
workerConfigs.getJobImagePullPolicy(),
workerConfigs.getSidecarImagePullPolicy(),
stdoutLocalPort,
stderrLocalPort,
kubeHeartbeatUrl,
usesStdin,
files,
entrypoint,
resourceRequirements,
workerConfigs.getJobImagePullSecrets(),
workerConfigs.getWorkerKubeTolerations(),
nodeSelectors,
allLabels,
workerConfigs.getWorkerKubeAnnotations(),
workerConfigs.getJobSocatImage(),
workerConfigs.getJobBusyboxImage(),
workerConfigs.getJobCurlImage(),
MoreMaps.merge(jobMetadata, workerConfigs.getEnvMap()),
internalToExternalPorts,
args).toProcess();
} catch (final Exception e) {
throw new WorkerException(e.getMessage(), e);
}
}
/**
* Returns general labels to be applied to all Kubernetes pods. All general labels should be added
* here.
*/
public static Map<String, String> getLabels(final String jobId, final int attemptId, final Map<String, String> customLabels) {
final var allLabels = new HashMap<>(customLabels);
final var generalKubeLabels = Map.of(
Metadata.JOB_LABEL_KEY, jobId,
Metadata.ATTEMPT_LABEL_KEY, String.valueOf(attemptId),
Metadata.WORKER_POD_LABEL_KEY, Metadata.WORKER_POD_LABEL_VALUE);
allLabels.putAll(generalKubeLabels);
return allLabels;
}
}