Skip to content

Commit

Permalink
Feat: enhance saectl exec cmd
Browse files Browse the repository at this point in the history
Signed-off-by: songyang.song <[email protected]>
  • Loading branch information
songyang.song committed Feb 8, 2023
1 parent 063ac43 commit 27806d0
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 27 deletions.
20 changes: 14 additions & 6 deletions saectl/internal/cmd/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type Options struct {
podName string
namespace string
podClient coreclient.PodsGetter

cmd []string
}

func (o *Options) Complete(f util.AliCloudFactory, argsIn []string, ioStreams genericclioptions.IOStreams) error {
Expand Down Expand Up @@ -122,12 +124,19 @@ func (o *Options) Run() error {
return err
}
fn := func() error {
return o.Stream(tokenId)
e, err := o.NewExecutor(tokenId)
if err != nil {
return err
}
//_, _ = e.Exec("curl 127.0.0.1:5555")
_, _ = e.Exec("c")
return nil
//return e.Stream()
}
return o.tty.Safe(fn)
}

func (o *Options) Stream(tokenId string) error {
func (o *Options) NewExecutor(tokenId string) (*stream.Executor, error) {
wsUrl := &url.URL{
Scheme: "wss",
Host: "sae-webshell.console.aliyun.com",
Expand All @@ -139,15 +148,14 @@ func (o *Options) Stream(tokenId string) error {
}
c, err := websocket.Dial(wsUrl.String(), "", "https://sae.console.aliyun.com")
if err != nil {
return err
return nil, err
}
e := stream.NewExecutor(c, stream.Option{
return stream.NewExecutor(c, stream.Option{
Stdin: o.In,
Stdout: o.Out,
StdErr: nil,
TTY: o.tty.Raw,
})
return e.Stream()
}), nil
}

func (o *Options) GetWebShellToken(appId string, podName string) (string, error) {
Expand Down
80 changes: 60 additions & 20 deletions saectl/internal/cmd/exec/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stream

import (
"context"
"fmt"
"io"
"strings"
"sync"
Expand Down Expand Up @@ -31,12 +32,36 @@ func NewExecutor(conn *websocket.Conn, op Option) *Executor {
}
}

func (e *Executor) Exec(cmd string) (string, error) {
stop := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())

go func(ctx context.Context) {
websocket.Message.Send(e.Conn, cmd+"\n")
for i := 1; i <= 2; i++ {
websocket.Message.Send(e.Conn, "exit\n")
}
}(ctx)

stdOutBuffer := NewStdOutBuffer()
e.copyStdout(stop, stdOutBuffer)

select {
case <-stop:
cancel()
}

r := stdOutBuffer.String()
fmt.Println(r)
return r, nil
}

func (e *Executor) Stream() error {
stop := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())

e.copyStdin(ctx)
e.copyStdout(stop)
e.copyStdin(ctx, e.Stdin)
e.copyStdout(stop, e.Stdout)

select {
case <-stop:
Expand All @@ -45,28 +70,22 @@ func (e *Executor) Stream() error {
}
}

func (e *Executor) copyStdin(ctx context.Context) {
if e.Stdin != nil {
var once sync.Once
// copy from client's stdin to container's stdin
go func() {
defer runtime.HandleCrash()
defer once.Do(func() { e.Conn.Close() })
if _, err := io.Copy(e.Conn, NewGuardStdIn(ctx, e.Stdin)); err != nil {
runtime.HandleError(err)
}
}()
}
func (e *Executor) copyStdin(ctx context.Context, r io.Reader) {
var once sync.Once
go func() {
defer runtime.HandleCrash()
defer once.Do(func() { e.Conn.Close() })
if _, err := io.Copy(e.Conn, NewGuardStdIn(ctx, r)); err != nil {
runtime.HandleError(err)
}
}()
}

func (e *Executor) copyStdout(stop chan struct{}) {
if e.Stdout == nil {
return
}
func (e *Executor) copyStdout(stop chan struct{}, w io.Writer) {
go func() {
defer runtime.HandleCrash()
defer io.Copy(io.Discard, e.Conn)
if _, err := io.Copy(e.Stdout, NewGuardStdOut(e.Conn, stop)); err != nil {
if _, err := io.Copy(w, NewGuardStdOut(e.Conn, stop)); err != nil {
runtime.HandleError(err)
}
}()
Expand All @@ -86,7 +105,8 @@ func NewGuardStdOut(r io.Reader, stop chan struct{}) io.Reader {

func (g *guardStdOut) Read(p []byte) (n int, err error) {
n, err = g.Reader.Read(p)
if strings.Contains(string(p), "{\"metadata\":{},\"status\":\"Success\"}") {
if strings.Contains(string(p), "{\"metadata\":{},\"status\":\"Failure\"") ||
strings.Contains(string(p), "{\"metadata\":{},\"status\":\"Success\"") {
g.stop <- struct{}{}
return 0, io.EOF
}
Expand All @@ -113,3 +133,23 @@ func (g *guardStdIn) Read(p []byte) (n int, err error) {
return g.Reader.Read(p)
}
}

type StdOutBuffer struct {
builder strings.Builder
}

func NewStdOutBuffer() *StdOutBuffer {
return &StdOutBuffer{
builder: strings.Builder{},
}
}

func (s *StdOutBuffer) Write(p []byte) (n int, err error) {
s.builder.WriteString(string(p))
//fmt.Printf("\n<--%s-->\n", s.builder.String())
return len(p), nil
}

func (s *StdOutBuffer) String() string {
return s.builder.String()
}
2 changes: 1 addition & 1 deletion saectl/internal/cmd/scale/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func NewCmdScale(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobr
cmd.MarkFlagRequired("replicas")
cmd.Flags().DurationVar(&o.Timeout, "timeout", 0, "The length of time to wait before giving up on a scale operation, zero means don't wait. Any other values should contain a corresponding time unit (e.g. 1s, 2m, 3h).")
cmdutil.AddFilenameOptionFlags(cmd, &o.FilenameOptions, "identifying the resource to set a new size")
//cmdutil.AddDryRunFlag(cmd)
cmdutil.AddDryRunFlag(cmd)
cmdutil.AddLabelSelectorFlagVar(cmd, &o.Selector)
return cmd
}
Expand Down

0 comments on commit 27806d0

Please sign in to comment.