Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Allow using pod start time in kubeflow plugin log links (#362)
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio Graetz <[email protected]>
  • Loading branch information
fg91 authored Jun 15, 2023
1 parent 318fa6b commit dfdf6f9
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 16 deletions.
31 changes: 25 additions & 6 deletions go/tasks/plugins/k8s/kfoperators/common/common_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
commonOp "github.com/kubeflow/common/pkg/apis/common/v1"
v1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
Expand Down Expand Up @@ -104,8 +105,11 @@ func GetMPIPhaseInfo(currentCondition commonOp.JobCondition, occurredAt time.Tim
}

// GetLogs will return the logs for kubeflow job
func GetLogs(taskType string, name string, namespace string, hasMaster bool,
func GetLogs(taskType string, objectMeta meta_v1.ObjectMeta, hasMaster bool,
workersCount int32, psReplicasCount int32, chiefReplicasCount int32) ([]*core.TaskLog, error) {
name := objectMeta.Name
namespace := objectMeta.Namespace

taskLogs := make([]*core.TaskLog, 0, 10)

logPlugin, err := logs.InitializeLogPlugins(logs.GetLogConfig())
Expand All @@ -118,12 +122,23 @@ func GetLogs(taskType string, name string, namespace string, hasMaster bool,
return nil, nil
}

// We use the creation timestamp of the Kubeflow Job as a proxy for the start time of the pods
startTime := objectMeta.CreationTimestamp.Time.Unix()
// Don't have a good mechanism for this yet, but approximating with time.Now for now
finishTime := time.Now().Unix()
RFC3999StartTime := time.Unix(startTime, 0).Format(time.RFC3339)
RFC3999FinishTime := time.Unix(finishTime, 0).Format(time.RFC3339)

if taskType == PytorchTaskType && hasMaster {
masterTaskLog, masterErr := logPlugin.GetTaskLogs(
tasklog.Input{
PodName: name + "-master-0",
Namespace: namespace,
LogName: "master",
PodName: name + "-master-0",
Namespace: namespace,
LogName: "master",
PodRFC3339StartTime: RFC3999StartTime,
PodRFC3339FinishTime: RFC3999FinishTime,
PodUnixStartTime: startTime,
PodUnixFinishTime: finishTime,
},
)
if masterErr != nil {
Expand All @@ -135,8 +150,12 @@ func GetLogs(taskType string, name string, namespace string, hasMaster bool,
// get all workers log
for workerIndex := int32(0); workerIndex < workersCount; workerIndex++ {
workerLog, err := logPlugin.GetTaskLogs(tasklog.Input{
PodName: name + fmt.Sprintf("-worker-%d", workerIndex),
Namespace: namespace,
PodName: name + fmt.Sprintf("-worker-%d", workerIndex),
Namespace: namespace,
PodRFC3339StartTime: RFC3999StartTime,
PodRFC3339FinishTime: RFC3999FinishTime,
PodUnixStartTime: startTime,
PodUnixFinishTime: finishTime,
})
if err != nil {
return nil, err
Expand Down
40 changes: 37 additions & 3 deletions go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestExtractMPICurrentCondition(t *testing.T) {
Expand Down Expand Up @@ -167,18 +168,31 @@ func TestGetLogs(t *testing.T) {
workers := int32(1)
launcher := int32(1)

jobLogs, err := GetLogs(MPITaskType, "test", "mpi-namespace", false, workers, launcher, 0)
mpiJobObjectMeta := meta_v1.ObjectMeta{
Name: "test",
Namespace: "mpi-namespace",
}
jobLogs, err := GetLogs(MPITaskType, mpiJobObjectMeta, false, workers, launcher, 0)
assert.NoError(t, err)
assert.Equal(t, 1, len(jobLogs))
assert.Equal(t, fmt.Sprintf("k8s.com/#!/log/%s/%s-worker-0/pod?namespace=mpi-namespace", "mpi-namespace", "test"), jobLogs[0].Uri)

jobLogs, err = GetLogs(PytorchTaskType, "test", "pytorch-namespace", true, workers, launcher, 0)
pytorchJobObjectMeta := meta_v1.ObjectMeta{
Name: "test",
Namespace: "pytorch-namespace",
}
jobLogs, err = GetLogs(PytorchTaskType, pytorchJobObjectMeta, true, workers, launcher, 0)
assert.NoError(t, err)
assert.Equal(t, 2, len(jobLogs))
assert.Equal(t, fmt.Sprintf("k8s.com/#!/log/%s/%s-master-0/pod?namespace=pytorch-namespace", "pytorch-namespace", "test"), jobLogs[0].Uri)
assert.Equal(t, fmt.Sprintf("k8s.com/#!/log/%s/%s-worker-0/pod?namespace=pytorch-namespace", "pytorch-namespace", "test"), jobLogs[1].Uri)

jobLogs, err = GetLogs(TensorflowTaskType, "test", "tensorflow-namespace", false, workers, launcher, 1)
tensorflowJobObjectMeta := meta_v1.ObjectMeta{
Name: "test",
Namespace: "tensorflow-namespace",
}

jobLogs, err = GetLogs(TensorflowTaskType, tensorflowJobObjectMeta, false, workers, launcher, 1)
assert.NoError(t, err)
assert.Equal(t, 3, len(jobLogs))
assert.Equal(t, fmt.Sprintf("k8s.com/#!/log/%s/%s-worker-0/pod?namespace=tensorflow-namespace", "tensorflow-namespace", "test"), jobLogs[0].Uri)
Expand All @@ -187,6 +201,26 @@ func TestGetLogs(t *testing.T) {

}

func TestGetLogsTemplateUri(t *testing.T) {
assert.NoError(t, logs.SetLogConfig(&logs.LogConfig{
IsStackDriverEnabled: true,
StackDriverTemplateURI: "https://console.cloud.google.com/logs/query;query=resource.labels.pod_name={{.podName}}&timestamp>{{.podRFC3339StartTime}}",
}))

pytorchJobObjectMeta := meta_v1.ObjectMeta{
Name: "test",
Namespace: "pytorch-namespace",
CreationTimestamp: meta_v1.Time{
Time: time.Date(2022, time.January, 1, 12, 0, 0, 0, time.UTC),
},
}
jobLogs, err := GetLogs(PytorchTaskType, pytorchJobObjectMeta, true, 1, 0, 0)
assert.NoError(t, err)
assert.Equal(t, 2, len(jobLogs))
assert.Equal(t, fmt.Sprintf("https://console.cloud.google.com/logs/query;query=resource.labels.pod_name=%s-master-0&timestamp>%s", "test", "2022-01-01T12:00:00Z"), jobLogs[0].Uri)
assert.Equal(t, fmt.Sprintf("https://console.cloud.google.com/logs/query;query=resource.labels.pod_name=%s-worker-0&timestamp>%s", "test", "2022-01-01T12:00:00Z"), jobLogs[1].Uri)
}

func dummyPodSpec() v1.PodSpec {
return v1.PodSpec{
Containers: []v1.Container{
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/k8s/kfoperators/mpi/mpi.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (mpiOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginContext
numWorkers = app.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeWorker].Replicas
numLauncherReplicas = app.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeLauncher].Replicas

taskLogs, err := common.GetLogs(common.MPITaskType, app.Name, app.Namespace, false,
taskLogs, err := common.GetLogs(common.MPITaskType, app.ObjectMeta, false,
*numWorkers, *numLauncherReplicas, 0)
if err != nil {
return pluginsCore.PhaseInfoUndefined, err
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func TestGetLogs(t *testing.T) {

mpiResourceHandler := mpiOperatorResourceHandler{}
mpiJob := dummyMPIJobResource(mpiResourceHandler, workers, launcher, slots, mpiOp.JobRunning)
jobLogs, err := common.GetLogs(common.MPITaskType, mpiJob.Name, mpiJob.Namespace, false, workers, launcher, 0)
jobLogs, err := common.GetLogs(common.MPITaskType, mpiJob.ObjectMeta, false, workers, launcher, 0)
assert.NoError(t, err)
assert.Equal(t, 2, len(jobLogs))
assert.Equal(t, fmt.Sprintf("k8s.com/#!/log/%s/%s-worker-0/pod?namespace=mpi-namespace", jobNamespace, jobName), jobLogs[0].Uri)
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (pytorchOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginCont

workersCount := app.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Replicas

taskLogs, err := common.GetLogs(common.PytorchTaskType, app.Name, app.Namespace, hasMaster, *workersCount, 0, 0)
taskLogs, err := common.GetLogs(common.PytorchTaskType, app.ObjectMeta, hasMaster, *workersCount, 0, 0)
if err != nil {
return pluginsCore.PhaseInfoUndefined, err
}
Expand Down
4 changes: 2 additions & 2 deletions go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func TestGetLogs(t *testing.T) {

pytorchResourceHandler := pytorchOperatorResourceHandler{}
pytorchJob := dummyPytorchJobResource(pytorchResourceHandler, workers, commonOp.JobRunning)
jobLogs, err := common.GetLogs(common.PytorchTaskType, pytorchJob.Name, pytorchJob.Namespace, hasMaster, workers, 0, 0)
jobLogs, err := common.GetLogs(common.PytorchTaskType, pytorchJob.ObjectMeta, hasMaster, workers, 0, 0)
assert.NoError(t, err)
assert.Equal(t, 3, len(jobLogs))
assert.Equal(t, fmt.Sprintf("k8s.com/#!/log/%s/%s-master-0/pod?namespace=pytorch-namespace", jobNamespace, jobName), jobLogs[0].Uri)
Expand All @@ -440,7 +440,7 @@ func TestGetLogsElastic(t *testing.T) {

pytorchResourceHandler := pytorchOperatorResourceHandler{}
pytorchJob := dummyPytorchJobResource(pytorchResourceHandler, workers, commonOp.JobRunning)
jobLogs, err := common.GetLogs(common.PytorchTaskType, pytorchJob.Name, pytorchJob.Namespace, hasMaster, workers, 0, 0)
jobLogs, err := common.GetLogs(common.PytorchTaskType, pytorchJob.ObjectMeta, hasMaster, workers, 0, 0)
assert.NoError(t, err)
assert.Equal(t, 2, len(jobLogs))
assert.Equal(t, fmt.Sprintf("k8s.com/#!/log/%s/%s-worker-0/pod?namespace=pytorch-namespace", jobNamespace, jobName), jobLogs[0].Uri)
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (tensorflowOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginC
psReplicasCount := app.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypePS].Replicas
chiefCount := app.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypeChief].Replicas

taskLogs, err := common.GetLogs(common.TensorflowTaskType, app.Name, app.Namespace, false,
taskLogs, err := common.GetLogs(common.TensorflowTaskType, app.ObjectMeta, false,
*workersCount, *psReplicasCount, *chiefCount)
if err != nil {
return pluginsCore.PhaseInfoUndefined, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func TestGetLogs(t *testing.T) {

tensorflowResourceHandler := tensorflowOperatorResourceHandler{}
tensorFlowJob := dummyTensorFlowJobResource(tensorflowResourceHandler, workers, psReplicas, chiefReplicas, commonOp.JobRunning)
jobLogs, err := common.GetLogs(common.TensorflowTaskType, tensorFlowJob.Name, tensorFlowJob.Namespace, false,
jobLogs, err := common.GetLogs(common.TensorflowTaskType, tensorFlowJob.ObjectMeta, false,
workers, psReplicas, chiefReplicas)
assert.NoError(t, err)
assert.Equal(t, 4, len(jobLogs))
Expand Down

0 comments on commit dfdf6f9

Please sign in to comment.