Skip to content

Commit

Permalink
Make function not access outer-state (#265)
Browse files Browse the repository at this point in the history
Co-authored-by: Filipe Regadas <[email protected]>
  • Loading branch information
live-wire and regadas authored Feb 11, 2023
1 parent 5c171d5 commit 31b7063
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions pkg/flink/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,14 @@ func (h flinkResourceHandler) OnAbort(ctx context.Context, tCtx pluginsCore.Task
return abortBehavior, nil
}

func FlinkClusterTaskLogs(ctx context.Context, flinkClusterName string, flinkClusterNamespace string) ([]*core.TaskLog, error) {
type FlinkTaskLogsInput struct {
ClusterName string
Namespace string
}

func FlinkClusterTaskLogs(ctx context.Context, config *Config, fi FlinkTaskLogsInput) ([]*core.TaskLog, error) {
var taskLogs []*core.TaskLog

config := GetFlinkConfig()
p, err := logs.InitializeLogPlugins(&config.LogConfig)
if err != nil {
return nil, err
Expand All @@ -172,8 +176,8 @@ func FlinkClusterTaskLogs(ctx context.Context, flinkClusterName string, flinkClu
}

jobLog, err := p.GetTaskLogs(tasklog.Input{
PodName: flinkClusterName,
Namespace: flinkClusterNamespace,
PodName: fi.ClusterName,
Namespace: fi.Namespace,
LogName: "(Job)",
})
if err != nil {
Expand All @@ -187,7 +191,10 @@ func FlinkClusterTaskLogs(ctx context.Context, flinkClusterName string, flinkClu
func flinkClusterTaskInfo(ctx context.Context, flinkCluster *flinkOp.FlinkCluster) (*pluginsCore.TaskInfo, error) {
var taskLogs []*core.TaskLog

tl, err := FlinkClusterTaskLogs(ctx, flinkCluster.Name, flinkCluster.Namespace)
tl, err := FlinkClusterTaskLogs(ctx, GetFlinkConfig(), FlinkTaskLogsInput{
ClusterName: flinkCluster.Name,
Namespace: flinkCluster.Namespace,
})
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 31b7063

Please sign in to comment.