You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			417 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
			
		
		
	
	
			417 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
package monitor
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"sort"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"sync/atomic"
 | 
						|
	"text/tabwriter"
 | 
						|
 | 
						|
	"github.com/containerd/console"
 | 
						|
	"github.com/docker/buildx/controller/control"
 | 
						|
	controllererrors "github.com/docker/buildx/controller/errdefs"
 | 
						|
	controllerapi "github.com/docker/buildx/controller/pb"
 | 
						|
	"github.com/docker/buildx/util/ioset"
 | 
						|
	"github.com/docker/buildx/util/progress"
 | 
						|
	"github.com/moby/buildkit/identity"
 | 
						|
	"github.com/pkg/errors"
 | 
						|
	"github.com/sirupsen/logrus"
 | 
						|
	"golang.org/x/term"
 | 
						|
)
 | 
						|
 | 
						|
const helpMessage = `
 | 
						|
Available commands are:
 | 
						|
  reload     reloads the context and build it.
 | 
						|
  rollback   re-runs the interactive container with initial rootfs contents.
 | 
						|
  list       list buildx sessions.
 | 
						|
  attach     attach to a buildx server or a process in the container.
 | 
						|
  exec       execute a process in the interactive container.
 | 
						|
  ps         list processes invoked by "exec". Use "attach" to attach IO to that process.
 | 
						|
  disconnect disconnect a client from a buildx server. Specific session ID can be specified an arg.
 | 
						|
  kill       kill buildx server.
 | 
						|
  exit       exits monitor.
 | 
						|
  help       shows this message.
 | 
						|
`
 | 
						|
 | 
						|
// RunMonitor provides an interactive session for running and managing containers via specified IO.
 | 
						|
func RunMonitor(ctx context.Context, curRef string, options *controllerapi.BuildOptions, invokeConfig controllerapi.InvokeConfig, c control.BuildxController, stdin io.ReadCloser, stdout io.WriteCloser, stderr console.File, progress *progress.Printer) error {
 | 
						|
	defer func() {
 | 
						|
		if err := c.Disconnect(ctx, curRef); err != nil {
 | 
						|
			logrus.Warnf("disconnect error: %v", err)
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	if err := progress.Pause(); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	defer progress.Unpause()
 | 
						|
 | 
						|
	monitorIn, monitorOut := ioset.Pipe()
 | 
						|
	defer func() {
 | 
						|
		monitorIn.Close()
 | 
						|
	}()
 | 
						|
	monitorEnableCh := make(chan struct{})
 | 
						|
	monitorDisableCh := make(chan struct{})
 | 
						|
	monitorOutCtx := ioset.MuxOut{
 | 
						|
		Out:         monitorOut,
 | 
						|
		EnableHook:  func() { monitorEnableCh <- struct{}{} },
 | 
						|
		DisableHook: func() { monitorDisableCh <- struct{}{} },
 | 
						|
	}
 | 
						|
 | 
						|
	containerIn, containerOut := ioset.Pipe()
 | 
						|
	defer func() {
 | 
						|
		containerIn.Close()
 | 
						|
	}()
 | 
						|
	containerOutCtx := ioset.MuxOut{
 | 
						|
		Out: containerOut,
 | 
						|
		// send newline to hopefully get the prompt; TODO: better UI (e.g. reprinting the last line)
 | 
						|
		EnableHook:  func() { containerOut.Stdin.Write([]byte("\n")) },
 | 
						|
		DisableHook: func() {},
 | 
						|
	}
 | 
						|
 | 
						|
	invokeForwarder := ioset.NewForwarder()
 | 
						|
	invokeForwarder.SetIn(&containerIn)
 | 
						|
	m := &monitor{
 | 
						|
		invokeIO: invokeForwarder,
 | 
						|
		muxIO: ioset.NewMuxIO(ioset.In{
 | 
						|
			Stdin:  io.NopCloser(stdin),
 | 
						|
			Stdout: nopCloser{stdout},
 | 
						|
			Stderr: nopCloser{stderr},
 | 
						|
		}, []ioset.MuxOut{monitorOutCtx, containerOutCtx}, 1, func(prev int, res int) string {
 | 
						|
			if prev == 0 && res == 0 {
 | 
						|
				// No toggle happened because container I/O isn't enabled.
 | 
						|
				return "Process isn't attached (previous \"exec\" exited?). Use \"attach\" for attaching or \"rollback\" or \"exec\" for running new one.\n"
 | 
						|
			}
 | 
						|
			return "Switched IO\n"
 | 
						|
		}),
 | 
						|
		invokeFunc: c.Invoke,
 | 
						|
	}
 | 
						|
 | 
						|
	// Start container automatically
 | 
						|
	fmt.Fprintf(stdout, "Launching interactive container. Press Ctrl-a-c to switch to monitor console\n")
 | 
						|
	invokeConfig.Rollback = false
 | 
						|
	invokeConfig.Initial = false
 | 
						|
	id := m.rollback(ctx, curRef, invokeConfig)
 | 
						|
	fmt.Fprintf(stdout, "Interactive container was restarted with process %q. Press Ctrl-a-c to switch to the new container\n", id)
 | 
						|
 | 
						|
	// Serve monitor commands
 | 
						|
	monitorForwarder := ioset.NewForwarder()
 | 
						|
	monitorForwarder.SetIn(&monitorIn)
 | 
						|
	for {
 | 
						|
		<-monitorEnableCh
 | 
						|
		in, out := ioset.Pipe()
 | 
						|
		monitorForwarder.SetOut(&out)
 | 
						|
		doneCh, errCh := make(chan struct{}), make(chan error)
 | 
						|
		go func() {
 | 
						|
			defer close(doneCh)
 | 
						|
			defer in.Close()
 | 
						|
			go func() {
 | 
						|
				<-ctx.Done()
 | 
						|
				in.Close()
 | 
						|
			}()
 | 
						|
			t := term.NewTerminal(readWriter{in.Stdin, in.Stdout}, "(buildx) ")
 | 
						|
			for {
 | 
						|
				l, err := t.ReadLine()
 | 
						|
				if err != nil {
 | 
						|
					if err != io.EOF {
 | 
						|
						errCh <- err
 | 
						|
						return
 | 
						|
					}
 | 
						|
					return
 | 
						|
				}
 | 
						|
				args := strings.Fields(l) // TODO: use shlex
 | 
						|
				if len(args) == 0 {
 | 
						|
					continue
 | 
						|
				}
 | 
						|
				switch args[0] {
 | 
						|
				case "":
 | 
						|
					// nop
 | 
						|
				case "reload":
 | 
						|
					var bo *controllerapi.BuildOptions
 | 
						|
					if curRef != "" {
 | 
						|
						// Rebuilding an existing session; Restore the build option used for building this session.
 | 
						|
						res, err := c.Inspect(ctx, curRef)
 | 
						|
						if err != nil {
 | 
						|
							fmt.Printf("failed to inspect the current build session: %v\n", err)
 | 
						|
						} else {
 | 
						|
							bo = res.Options
 | 
						|
						}
 | 
						|
					} else {
 | 
						|
						bo = options
 | 
						|
					}
 | 
						|
					if bo == nil {
 | 
						|
						fmt.Println("reload: no build option is provided")
 | 
						|
						continue
 | 
						|
					}
 | 
						|
					if curRef != "" {
 | 
						|
						if err := c.Disconnect(ctx, curRef); err != nil {
 | 
						|
							fmt.Println("disconnect error", err)
 | 
						|
						}
 | 
						|
					}
 | 
						|
					var resultUpdated bool
 | 
						|
					progress.Unpause()
 | 
						|
					ref, _, err := c.Build(ctx, *bo, nil, progress) // TODO: support stdin, hold build ref
 | 
						|
					progress.Pause()
 | 
						|
					if err != nil {
 | 
						|
						var be *controllererrors.BuildError
 | 
						|
						if errors.As(err, &be) {
 | 
						|
							curRef = be.Ref
 | 
						|
							resultUpdated = true
 | 
						|
						} else {
 | 
						|
							fmt.Printf("failed to reload: %v\n", err)
 | 
						|
						}
 | 
						|
					} else {
 | 
						|
						curRef = ref
 | 
						|
						resultUpdated = true
 | 
						|
					}
 | 
						|
					if resultUpdated {
 | 
						|
						// rollback the running container with the new result
 | 
						|
						id := m.rollback(ctx, curRef, invokeConfig)
 | 
						|
						fmt.Fprintf(stdout, "Interactive container was restarted with process %q. Press Ctrl-a-c to switch to the new container\n", id)
 | 
						|
					}
 | 
						|
				case "rollback":
 | 
						|
					cfg := invokeConfig
 | 
						|
					if len(args) >= 2 {
 | 
						|
						cmds := args[1:]
 | 
						|
						if cmds[0] == "--init" {
 | 
						|
							cfg.Initial = true
 | 
						|
							cmds = cmds[1:]
 | 
						|
						}
 | 
						|
						if len(cmds) > 0 {
 | 
						|
							cfg.Entrypoint = []string{cmds[0]}
 | 
						|
							cfg.Cmd = cmds[1:]
 | 
						|
						}
 | 
						|
					}
 | 
						|
					id := m.rollback(ctx, curRef, cfg)
 | 
						|
					fmt.Fprintf(stdout, "Interactive container was restarted with process %q. Press Ctrl-a-c to switch to the new container\n", id)
 | 
						|
				case "list":
 | 
						|
					refs, err := c.List(ctx)
 | 
						|
					if err != nil {
 | 
						|
						fmt.Printf("failed to list: %v\n", err)
 | 
						|
					}
 | 
						|
					sort.Strings(refs)
 | 
						|
					tw := tabwriter.NewWriter(stdout, 1, 8, 1, '\t', 0)
 | 
						|
					fmt.Fprintln(tw, "ID\tCURRENT_SESSION")
 | 
						|
					for _, k := range refs {
 | 
						|
						fmt.Fprintf(tw, "%-20s\t%v\n", k, k == curRef)
 | 
						|
					}
 | 
						|
					tw.Flush()
 | 
						|
				case "disconnect":
 | 
						|
					target := curRef
 | 
						|
					if len(args) >= 2 {
 | 
						|
						target = args[1]
 | 
						|
					}
 | 
						|
					isProcess, err := isProcessID(ctx, c, curRef, target)
 | 
						|
					if err == nil && isProcess {
 | 
						|
						if err := c.DisconnectProcess(ctx, curRef, target); err != nil {
 | 
						|
							fmt.Printf("disconnect process failed %v\n", target)
 | 
						|
							continue
 | 
						|
						}
 | 
						|
						continue
 | 
						|
					}
 | 
						|
					if err := c.Disconnect(ctx, target); err != nil {
 | 
						|
						fmt.Println("disconnect error", err)
 | 
						|
					}
 | 
						|
				case "kill":
 | 
						|
					if err := c.Kill(ctx); err != nil {
 | 
						|
						fmt.Printf("failed to kill: %v\n", err)
 | 
						|
					}
 | 
						|
				case "attach":
 | 
						|
					if len(args) < 2 {
 | 
						|
						fmt.Println("attach: server name must be passed")
 | 
						|
						continue
 | 
						|
					}
 | 
						|
					ref := args[1]
 | 
						|
					var id string
 | 
						|
 | 
						|
					isProcess, err := isProcessID(ctx, c, curRef, ref)
 | 
						|
					if err == nil && isProcess {
 | 
						|
						m.attach(ctx, curRef, ref)
 | 
						|
						id = ref
 | 
						|
					}
 | 
						|
					if id == "" {
 | 
						|
						refs, err := c.List(ctx)
 | 
						|
						if err != nil {
 | 
						|
							fmt.Printf("failed to get the list of sessions: %v\n", err)
 | 
						|
							continue
 | 
						|
						}
 | 
						|
						found := false
 | 
						|
						for _, s := range refs {
 | 
						|
							if s == ref {
 | 
						|
								found = true
 | 
						|
								break
 | 
						|
							}
 | 
						|
						}
 | 
						|
						if !found {
 | 
						|
							fmt.Printf("unknown ID: %q\n", ref)
 | 
						|
							continue
 | 
						|
						}
 | 
						|
						if m.invokeCancel != nil {
 | 
						|
							m.invokeCancel() // Finish existing attach
 | 
						|
						}
 | 
						|
						curRef = ref
 | 
						|
					}
 | 
						|
					fmt.Fprintf(stdout, "Attached to process %q. Press Ctrl-a-c to switch to the new container\n", id)
 | 
						|
				case "exec":
 | 
						|
					if len(args) < 2 {
 | 
						|
						fmt.Println("attach: command must be passed")
 | 
						|
						continue
 | 
						|
					}
 | 
						|
					if curRef == "" {
 | 
						|
						fmt.Println("attach to a session first")
 | 
						|
						continue
 | 
						|
					}
 | 
						|
					cfg := controllerapi.InvokeConfig{
 | 
						|
						Entrypoint: []string{args[1]},
 | 
						|
						Cmd:        args[2:],
 | 
						|
						// TODO: support other options as well via flags
 | 
						|
						Env:  invokeConfig.Env,
 | 
						|
						User: invokeConfig.User,
 | 
						|
						Cwd:  invokeConfig.Cwd,
 | 
						|
						Tty:  true,
 | 
						|
					}
 | 
						|
					pid := m.exec(ctx, curRef, cfg)
 | 
						|
					fmt.Fprintf(stdout, "Process %q started. Press Ctrl-a-c to switch to that process.\n", pid)
 | 
						|
				case "ps":
 | 
						|
					plist, err := c.ListProcesses(ctx, curRef)
 | 
						|
					if err != nil {
 | 
						|
						fmt.Println("cannot list process:", err)
 | 
						|
						continue
 | 
						|
					}
 | 
						|
					tw := tabwriter.NewWriter(stdout, 1, 8, 1, '\t', 0)
 | 
						|
					fmt.Fprintln(tw, "PID\tCURRENT_SESSION\tCOMMAND")
 | 
						|
					for _, p := range plist {
 | 
						|
						fmt.Fprintf(tw, "%-20s\t%v\t%v\n", p.ProcessID, p.ProcessID == m.attachedPid.Load(), append(p.InvokeConfig.Entrypoint, p.InvokeConfig.Cmd...))
 | 
						|
					}
 | 
						|
					tw.Flush()
 | 
						|
				case "exit":
 | 
						|
					return
 | 
						|
				case "help":
 | 
						|
					fmt.Fprint(stdout, helpMessage)
 | 
						|
				default:
 | 
						|
					fmt.Printf("unknown command: %q\n", l)
 | 
						|
					fmt.Fprint(stdout, helpMessage)
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}()
 | 
						|
		select {
 | 
						|
		case <-doneCh:
 | 
						|
			m.close()
 | 
						|
			return nil
 | 
						|
		case err := <-errCh:
 | 
						|
			m.close()
 | 
						|
			return err
 | 
						|
		case <-monitorDisableCh:
 | 
						|
		}
 | 
						|
		monitorForwarder.SetOut(nil)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
type readWriter struct {
 | 
						|
	io.Reader
 | 
						|
	io.Writer
 | 
						|
}
 | 
						|
 | 
						|
type monitor struct {
 | 
						|
	muxIO        *ioset.MuxIO
 | 
						|
	invokeIO     *ioset.Forwarder
 | 
						|
	invokeFunc   func(ctx context.Context, ref, pid string, cfg controllerapi.InvokeConfig, in io.ReadCloser, out io.WriteCloser, err io.WriteCloser) error
 | 
						|
	invokeCancel func()
 | 
						|
	attachedPid  atomic.Value
 | 
						|
}
 | 
						|
 | 
						|
func (m *monitor) rollback(ctx context.Context, ref string, cfg controllerapi.InvokeConfig) string {
 | 
						|
	pid := identity.NewID()
 | 
						|
	cfg1 := cfg
 | 
						|
	cfg1.Rollback = true
 | 
						|
	return m.startInvoke(ctx, ref, pid, cfg1)
 | 
						|
}
 | 
						|
 | 
						|
func (m *monitor) exec(ctx context.Context, ref string, cfg controllerapi.InvokeConfig) string {
 | 
						|
	return m.startInvoke(ctx, ref, identity.NewID(), cfg)
 | 
						|
}
 | 
						|
 | 
						|
func (m *monitor) attach(ctx context.Context, ref, pid string) {
 | 
						|
	m.startInvoke(ctx, ref, pid, controllerapi.InvokeConfig{})
 | 
						|
}
 | 
						|
 | 
						|
func (m *monitor) close() {
 | 
						|
	if m.invokeCancel != nil {
 | 
						|
		m.invokeCancel()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (m *monitor) startInvoke(ctx context.Context, ref, pid string, cfg controllerapi.InvokeConfig) string {
 | 
						|
	if m.invokeCancel != nil {
 | 
						|
		m.invokeCancel() // Finish existing attach
 | 
						|
	}
 | 
						|
	if len(cfg.Entrypoint) == 0 && len(cfg.Cmd) == 0 {
 | 
						|
		cfg.Entrypoint = []string{"sh"} // launch shell by default
 | 
						|
	}
 | 
						|
	go func() {
 | 
						|
		// Start a new invoke
 | 
						|
		if err := m.invoke(ctx, ref, pid, cfg); err != nil {
 | 
						|
			logrus.Debugf("invoke error: %v", err)
 | 
						|
		}
 | 
						|
		if pid == m.attachedPid.Load() {
 | 
						|
			m.attachedPid.Store("")
 | 
						|
		}
 | 
						|
	}()
 | 
						|
	m.attachedPid.Store(pid)
 | 
						|
	return pid
 | 
						|
}
 | 
						|
 | 
						|
func (m *monitor) invoke(ctx context.Context, ref, pid string, cfg controllerapi.InvokeConfig) error {
 | 
						|
	m.muxIO.Enable(1)
 | 
						|
	defer m.muxIO.Disable(1)
 | 
						|
	if err := m.muxIO.SwitchTo(1); err != nil {
 | 
						|
		return errors.Errorf("failed to switch to process IO: %v", err)
 | 
						|
	}
 | 
						|
	if ref == "" {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	invokeCtx, invokeCancel := context.WithCancel(ctx)
 | 
						|
 | 
						|
	containerIn, containerOut := ioset.Pipe()
 | 
						|
	m.invokeIO.SetOut(&containerOut)
 | 
						|
	waitInvokeDoneCh := make(chan struct{})
 | 
						|
	var cancelOnce sync.Once
 | 
						|
	invokeCancelAndDetachFn := func() {
 | 
						|
		cancelOnce.Do(func() {
 | 
						|
			containerIn.Close()
 | 
						|
			m.invokeIO.SetOut(nil)
 | 
						|
			invokeCancel()
 | 
						|
		})
 | 
						|
		<-waitInvokeDoneCh
 | 
						|
	}
 | 
						|
	defer invokeCancelAndDetachFn()
 | 
						|
	m.invokeCancel = invokeCancelAndDetachFn
 | 
						|
 | 
						|
	err := m.invokeFunc(invokeCtx, ref, pid, cfg, containerIn.Stdin, containerIn.Stdout, containerIn.Stderr)
 | 
						|
	close(waitInvokeDoneCh)
 | 
						|
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
type nopCloser struct {
 | 
						|
	io.Writer
 | 
						|
}
 | 
						|
 | 
						|
func (c nopCloser) Close() error { return nil }
 | 
						|
 | 
						|
func isProcessID(ctx context.Context, c control.BuildxController, curRef, ref string) (bool, error) {
 | 
						|
	infos, err := c.ListProcesses(ctx, curRef)
 | 
						|
	if err != nil {
 | 
						|
		return false, err
 | 
						|
	}
 | 
						|
	for _, p := range infos {
 | 
						|
		if p.ProcessID == ref {
 | 
						|
			return true, nil
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return false, nil
 | 
						|
}
 |