From 27806d018fec56c2fa69efece6d4774d6f1ef24c Mon Sep 17 00:00:00 2001 From: "songyang.song" Date: Wed, 8 Feb 2023 09:41:02 +0800 Subject: [PATCH] Feat: enhance saectl exec cmd Signed-off-by: songyang.song --- saectl/internal/cmd/exec/exec.go | 20 ++++-- saectl/internal/cmd/exec/stream/stream.go | 80 +++++++++++++++++------ saectl/internal/cmd/scale/scale.go | 2 +- 3 files changed, 75 insertions(+), 27 deletions(-) diff --git a/saectl/internal/cmd/exec/exec.go b/saectl/internal/cmd/exec/exec.go index 0311fb7..d8cdbb8 100644 --- a/saectl/internal/cmd/exec/exec.go +++ b/saectl/internal/cmd/exec/exec.go @@ -65,6 +65,8 @@ type Options struct { podName string namespace string podClient coreclient.PodsGetter + + cmd []string } func (o *Options) Complete(f util.AliCloudFactory, argsIn []string, ioStreams genericclioptions.IOStreams) error { @@ -122,12 +124,19 @@ func (o *Options) Run() error { return err } fn := func() error { - return o.Stream(tokenId) + e, err := o.NewExecutor(tokenId) + if err != nil { + return err + } + //_, _ = e.Exec("curl 127.0.0.1:5555") + _, _ = e.Exec("c") + return nil + //return e.Stream() } return o.tty.Safe(fn) } -func (o *Options) Stream(tokenId string) error { +func (o *Options) NewExecutor(tokenId string) (*stream.Executor, error) { wsUrl := &url.URL{ Scheme: "wss", Host: "sae-webshell.console.aliyun.com", @@ -139,15 +148,14 @@ func (o *Options) Stream(tokenId string) error { } c, err := websocket.Dial(wsUrl.String(), "", "https://sae.console.aliyun.com") if err != nil { - return err + return nil, err } - e := stream.NewExecutor(c, stream.Option{ + return stream.NewExecutor(c, stream.Option{ Stdin: o.In, Stdout: o.Out, StdErr: nil, TTY: o.tty.Raw, - }) - return e.Stream() + }), nil } func (o *Options) GetWebShellToken(appId string, podName string) (string, error) { diff --git a/saectl/internal/cmd/exec/stream/stream.go b/saectl/internal/cmd/exec/stream/stream.go index 084067f..e0c51ee 100644 --- a/saectl/internal/cmd/exec/stream/stream.go +++ b/saectl/internal/cmd/exec/stream/stream.go @@ -2,6 +2,7 @@ package stream import ( "context" + "fmt" "io" "strings" "sync" @@ -31,12 +32,36 @@ func NewExecutor(conn *websocket.Conn, op Option) *Executor { } } +func (e *Executor) Exec(cmd string) (string, error) { + stop := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + + go func(ctx context.Context) { + websocket.Message.Send(e.Conn, cmd+"\n") + for i := 1; i <= 2; i++ { + websocket.Message.Send(e.Conn, "exit\n") + } + }(ctx) + + stdOutBuffer := NewStdOutBuffer() + e.copyStdout(stop, stdOutBuffer) + + select { + case <-stop: + cancel() + } + + r := stdOutBuffer.String() + fmt.Println(r) + return r, nil +} + func (e *Executor) Stream() error { stop := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) - e.copyStdin(ctx) - e.copyStdout(stop) + e.copyStdin(ctx, e.Stdin) + e.copyStdout(stop, e.Stdout) select { case <-stop: @@ -45,28 +70,22 @@ func (e *Executor) Stream() error { } } -func (e *Executor) copyStdin(ctx context.Context) { - if e.Stdin != nil { - var once sync.Once - // copy from client's stdin to container's stdin - go func() { - defer runtime.HandleCrash() - defer once.Do(func() { e.Conn.Close() }) - if _, err := io.Copy(e.Conn, NewGuardStdIn(ctx, e.Stdin)); err != nil { - runtime.HandleError(err) - } - }() - } +func (e *Executor) copyStdin(ctx context.Context, r io.Reader) { + var once sync.Once + go func() { + defer runtime.HandleCrash() + defer once.Do(func() { e.Conn.Close() }) + if _, err := io.Copy(e.Conn, NewGuardStdIn(ctx, r)); err != nil { + runtime.HandleError(err) + } + }() } -func (e *Executor) copyStdout(stop chan struct{}) { - if e.Stdout == nil { - return - } +func (e *Executor) copyStdout(stop chan struct{}, w io.Writer) { go func() { defer runtime.HandleCrash() defer io.Copy(io.Discard, e.Conn) - if _, err := io.Copy(e.Stdout, NewGuardStdOut(e.Conn, stop)); err != nil { + if _, err := io.Copy(w, NewGuardStdOut(e.Conn, stop)); err != nil { runtime.HandleError(err) } }() @@ -86,7 +105,8 @@ func NewGuardStdOut(r io.Reader, stop chan struct{}) io.Reader { func (g *guardStdOut) Read(p []byte) (n int, err error) { n, err = g.Reader.Read(p) - if strings.Contains(string(p), "{\"metadata\":{},\"status\":\"Success\"}") { + if strings.Contains(string(p), "{\"metadata\":{},\"status\":\"Failure\"") || + strings.Contains(string(p), "{\"metadata\":{},\"status\":\"Success\"") { g.stop <- struct{}{} return 0, io.EOF } @@ -113,3 +133,23 @@ func (g *guardStdIn) Read(p []byte) (n int, err error) { return g.Reader.Read(p) } } + +type StdOutBuffer struct { + builder strings.Builder +} + +func NewStdOutBuffer() *StdOutBuffer { + return &StdOutBuffer{ + builder: strings.Builder{}, + } +} + +func (s *StdOutBuffer) Write(p []byte) (n int, err error) { + s.builder.WriteString(string(p)) + //fmt.Printf("\n<--%s-->\n", s.builder.String()) + return len(p), nil +} + +func (s *StdOutBuffer) String() string { + return s.builder.String() +} diff --git a/saectl/internal/cmd/scale/scale.go b/saectl/internal/cmd/scale/scale.go index ed168c3..b97f942 100644 --- a/saectl/internal/cmd/scale/scale.go +++ b/saectl/internal/cmd/scale/scale.go @@ -113,7 +113,7 @@ func NewCmdScale(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobr cmd.MarkFlagRequired("replicas") cmd.Flags().DurationVar(&o.Timeout, "timeout", 0, "The length of time to wait before giving up on a scale operation, zero means don't wait. Any other values should contain a corresponding time unit (e.g. 1s, 2m, 3h).") cmdutil.AddFilenameOptionFlags(cmd, &o.FilenameOptions, "identifying the resource to set a new size") - //cmdutil.AddDryRunFlag(cmd) + cmdutil.AddDryRunFlag(cmd) cmdutil.AddLabelSelectorFlagVar(cmd, &o.Selector) return cmd }