package monitor import ( "context" "fmt" "io" "sort" "sync" "sync/atomic" "text/tabwriter" "github.com/containerd/console" "github.com/docker/buildx/controller/control" controllerapi "github.com/docker/buildx/controller/pb" "github.com/docker/buildx/monitor/commands" "github.com/docker/buildx/monitor/types" "github.com/docker/buildx/util/ioset" "github.com/docker/buildx/util/progress" "github.com/google/shlex" "github.com/moby/buildkit/identity" "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/term" ) // RunMonitor provides an interactive session for running and managing containers via specified IO. func RunMonitor(ctx context.Context, curRef string, options *controllerapi.BuildOptions, invokeConfig controllerapi.InvokeConfig, c control.BuildxController, stdin io.ReadCloser, stdout io.WriteCloser, stderr console.File, progress *progress.Printer) error { defer func() { if err := c.Disconnect(ctx, curRef); err != nil { logrus.Warnf("disconnect error: %v", err) } }() if err := progress.Pause(); err != nil { return err } defer progress.Unpause() monitorIn, monitorOut := ioset.Pipe() defer func() { monitorIn.Close() }() monitorEnableCh := make(chan struct{}) monitorDisableCh := make(chan struct{}) monitorOutCtx := ioset.MuxOut{ Out: monitorOut, EnableHook: func() { monitorEnableCh <- struct{}{} }, DisableHook: func() { monitorDisableCh <- struct{}{} }, } containerIn, containerOut := ioset.Pipe() defer func() { containerIn.Close() }() containerOutCtx := ioset.MuxOut{ Out: containerOut, // send newline to hopefully get the prompt; TODO: better UI (e.g. reprinting the last line) EnableHook: func() { containerOut.Stdin.Write([]byte("\n")) }, DisableHook: func() {}, } invokeForwarder := ioset.NewForwarder() invokeForwarder.SetIn(&containerIn) m := &monitor{ BuildxController: c, invokeIO: invokeForwarder, muxIO: ioset.NewMuxIO(ioset.In{ Stdin: io.NopCloser(stdin), Stdout: nopCloser{stdout}, Stderr: nopCloser{stderr}, }, []ioset.MuxOut{monitorOutCtx, containerOutCtx}, 1, func(prev int, res int) string { if prev == 0 && res == 0 { // No toggle happened because container I/O isn't enabled. return "Process isn't attached (previous \"exec\" exited?). Use \"attach\" for attaching or \"rollback\" or \"exec\" for running new one.\n" } return "Switched IO\n" }), } m.ref.Store(curRef) // Start container automatically fmt.Fprintf(stdout, "Launching interactive container. Press Ctrl-a-c to switch to monitor console\n") invokeConfig.Rollback = false invokeConfig.Initial = false id := m.Rollback(ctx, invokeConfig) fmt.Fprintf(stdout, "Interactive container was restarted with process %q. Press Ctrl-a-c to switch to the new container\n", id) availableCommands := []types.Command{ commands.NewReloadCmd(m, stdout, progress, options, invokeConfig), commands.NewRollbackCmd(m, invokeConfig, stdout), commands.NewListCmd(m, stdout), commands.NewDisconnectCmd(m), commands.NewKillCmd(m), commands.NewAttachCmd(m, stdout), commands.NewExecCmd(m, invokeConfig, stdout), commands.NewPsCmd(m, stdout), } registeredCommands := make(map[string]types.Command) for _, c := range availableCommands { registeredCommands[c.Info().Name] = c } additionalHelpMessages := map[string]string{ "help": "shows this message. Optionally pass a command name as an argument to print the detailed usage.", "exit": "exits monitor", } // Serve monitor commands monitorForwarder := ioset.NewForwarder() monitorForwarder.SetIn(&monitorIn) for { <-monitorEnableCh in, out := ioset.Pipe() monitorForwarder.SetOut(&out) doneCh, errCh := make(chan struct{}), make(chan error) go func() { defer close(doneCh) defer in.Close() go func() { <-ctx.Done() in.Close() }() t := term.NewTerminal(readWriter{in.Stdin, in.Stdout}, "(buildx) ") for { l, err := t.ReadLine() if err != nil { if err != io.EOF { errCh <- err return } return } args, err := shlex.Split(l) if err != nil { fmt.Fprintf(stdout, "monitor: failed to parse command: %v", err) continue } else if len(args) == 0 { continue } // Builtin commands switch args[0] { case "": // nop continue case "exit": return case "help": if len(args) >= 2 { printHelpMessageOfCommand(stdout, args[1], registeredCommands, additionalHelpMessages) continue } printHelpMessage(stdout, registeredCommands, additionalHelpMessages) continue default: } // Registered commands cmdname := args[0] if cm, ok := registeredCommands[cmdname]; ok { if err := cm.Exec(ctx, args); err != nil { fmt.Fprintf(stdout, "%s: %v\n", cmdname, err) } } else { fmt.Fprintf(stdout, "monitor: unknown command: %q\n", l) printHelpMessage(stdout, registeredCommands, additionalHelpMessages) } } }() select { case <-doneCh: m.close() return nil case err := <-errCh: m.close() return err case <-monitorDisableCh: } monitorForwarder.SetOut(nil) } } func printHelpMessageOfCommand(out io.Writer, name string, registeredCommands map[string]types.Command, additional map[string]string) { var target types.Command if c, ok := registeredCommands[name]; ok { target = c } else { fmt.Fprintf(out, "monitor: no help message for %q\n", name) printHelpMessage(out, registeredCommands, additional) return } fmt.Fprintln(out, target.Info().HelpMessage) if h := target.Info().HelpMessageLong; h != "" { fmt.Fprintln(out, h) } } func printHelpMessage(out io.Writer, registeredCommands map[string]types.Command, additional map[string]string) { var names []string for name := range registeredCommands { names = append(names, name) } for name := range additional { names = append(names, name) } sort.Strings(names) fmt.Fprint(out, "Available commands are:\n") w := new(tabwriter.Writer) w.Init(out, 0, 8, 0, '\t', 0) for _, name := range names { var mes string if c, ok := registeredCommands[name]; ok { mes = c.Info().HelpMessage } else if m, ok := additional[name]; ok { mes = m } else { continue } fmt.Fprintln(w, " "+name+"\t"+mes) } w.Flush() } type readWriter struct { io.Reader io.Writer } type monitor struct { control.BuildxController ref atomic.Value muxIO *ioset.MuxIO invokeIO *ioset.Forwarder invokeCancel func() attachedPid atomic.Value } func (m *monitor) DisconnectSession(ctx context.Context, targetID string) error { return m.Disconnect(ctx, targetID) } func (m *monitor) AttachSession(ref string) { m.ref.Store(ref) } func (m *monitor) AttachedSessionID() string { return m.ref.Load().(string) } func (m *monitor) Rollback(ctx context.Context, cfg controllerapi.InvokeConfig) string { pid := identity.NewID() cfg1 := cfg cfg1.Rollback = true return m.startInvoke(ctx, pid, cfg1) } func (m *monitor) Exec(ctx context.Context, cfg controllerapi.InvokeConfig) string { return m.startInvoke(ctx, identity.NewID(), cfg) } func (m *monitor) Attach(ctx context.Context, pid string) { m.startInvoke(ctx, pid, controllerapi.InvokeConfig{}) } func (m *monitor) Detach() { if m.invokeCancel != nil { m.invokeCancel() // Finish existing attach } } func (m *monitor) AttachedPID() string { return m.attachedPid.Load().(string) } func (m *monitor) close() { m.Detach() } func (m *monitor) startInvoke(ctx context.Context, pid string, cfg controllerapi.InvokeConfig) string { if m.invokeCancel != nil { m.invokeCancel() // Finish existing attach } if len(cfg.Entrypoint) == 0 && len(cfg.Cmd) == 0 { cfg.Entrypoint = []string{"sh"} // launch shell by default cfg.Cmd = []string{} } go func() { // Start a new invoke if err := m.invoke(ctx, pid, cfg); err != nil { logrus.Debugf("invoke error: %v", err) } if pid == m.attachedPid.Load() { m.attachedPid.Store("") } }() m.attachedPid.Store(pid) return pid } func (m *monitor) invoke(ctx context.Context, pid string, cfg controllerapi.InvokeConfig) error { m.muxIO.Enable(1) defer m.muxIO.Disable(1) if err := m.muxIO.SwitchTo(1); err != nil { return errors.Errorf("failed to switch to process IO: %v", err) } if m.AttachedSessionID() == "" { return nil } invokeCtx, invokeCancel := context.WithCancel(ctx) containerIn, containerOut := ioset.Pipe() m.invokeIO.SetOut(&containerOut) waitInvokeDoneCh := make(chan struct{}) var cancelOnce sync.Once invokeCancelAndDetachFn := func() { cancelOnce.Do(func() { containerIn.Close() m.invokeIO.SetOut(nil) invokeCancel() }) <-waitInvokeDoneCh } defer invokeCancelAndDetachFn() m.invokeCancel = invokeCancelAndDetachFn err := m.Invoke(invokeCtx, m.AttachedSessionID(), pid, cfg, containerIn.Stdin, containerIn.Stdout, containerIn.Stderr) close(waitInvokeDoneCh) return err } type nopCloser struct { io.Writer } func (c nopCloser) Close() error { return nil }