package monitor

import (

	controllerapi ""

// helpMessage is the usage text listing the monitor commands. RunMonitor
// writes it to its stdout argument for the help command and right after the
// unknown command message.
const helpMessage = `
Available commands are:
  reload     reloads the context and build it.
  rollback   re-runs the interactive container with initial rootfs contents.
  list       list buildx sessions.
  attach     attach to a buildx server or a process in the container.
  exec       execute a process in the interactive container.
  ps         list processes invoked by "exec". Use "attach" to attach IO to that process.
  disconnect disconnect a client from a buildx server. Specific session ID can be specified an arg.
  kill       kill buildx server.
  exit       exits monitor.
  help       shows this message.

// RunMonitor provides an interactive session for running and managing containers via specified IO.
//
// It builds a multiplexed IO over stdin, stdout and stderr with two outputs
// (the monitor console and the container process), starts a process through
// m.rollback with curRef and invokeConfig, and then serves the commands typed
// at the (buildx) prompt. Each command line is split on whitespace and
// dispatched on its first word.
//
// The visible return paths are nil once the command goroutine closes doneCh,
// and the error received from errCh.
func RunMonitor(ctx context.Context, curRef string, options controllerapi.BuildOptions, invokeConfig controllerapi.InvokeConfig, c control.BuildxController, progressMode string, stdin io.ReadCloser, stdout io.WriteCloser, stderr console.File) error {
	// On return, disconnect whatever curRef holds at that time (the reload and
	// attach handlers below reassign it). A failure is only logged as a warning.
	defer func() {
		if err := c.Disconnect(ctx, curRef); err != nil {
			logrus.Warnf("disconnect error: %v", err)
	// Pipe pair for the monitor console side of the multiplexer.
	monitorIn, monitorOut := ioset.Pipe()
	defer func() {
	// Both channels are unbuffered, so each hook below blocks until a receiver
	// takes the signal.
	monitorEnableCh := make(chan struct{})
	monitorDisableCh := make(chan struct{})
	monitorOutCtx := ioset.MuxOut{
		Out:         monitorOut,
		EnableHook:  func() { monitorEnableCh <- struct{}{} },
		DisableHook: func() { monitorDisableCh <- struct{}{} },

	// Pipe pair for the container process side of the multiplexer.
	containerIn, containerOut := ioset.Pipe()
	defer func() {
	containerOutCtx := ioset.MuxOut{
		Out: containerOut,
		// send newline to hopefully get the prompt; TODO: better UI (e.g. reprinting the last line)
		EnableHook:  func() { containerOut.Stdin.Write([]byte("\n")) },
		DisableHook: func() {},

	invokeForwarder := ioset.NewForwarder()
	m := &monitor{
		invokeIO: invokeForwarder,
		// Output 0 is the monitor console and output 1 is the container. The
		// literal 1 passed after the slice is presumably the initially selected
		// output; confirm against ioset.NewMuxIO.
		muxIO: ioset.NewMuxIO(ioset.In{
			Stdin:  io.NopCloser(stdin),
			Stdout: nopCloser{stdout},
			Stderr: nopCloser{stderr},
		}, []ioset.MuxOut{monitorOutCtx, containerOutCtx}, 1, func(prev int, res int) string {
			// The returned string is the message for a switch request. prev and
			// res are presumably the output indexes before and after the
			// request; confirm against ioset.NewMuxIO.
			if prev == 0 && res == 0 {
				// No toggle happened because container I/O isn't enabled.
				return "Process isn't attached (previous \"exec\" exited?). Use \"attach\" for attaching or \"rollback\" or \"exec\" for running new one.\n"
			return "Switched IO\n"
		invokeFunc: c.Invoke,

	// Start container automatically
	fmt.Fprintf(stdout, "Launching interactive container. Press Ctrl-a-c to switch to monitor console\n")
	id := m.rollback(ctx, curRef, invokeConfig)
	fmt.Fprintf(stdout, "Interactive container was restarted with process %q. Press Ctrl-a-c to switch to the new container\n", id)

	// Serve monitor commands
	monitorForwarder := ioset.NewForwarder()
	for {
		// Every iteration gets a fresh pipe plus a done channel and an error
		// channel for the command goroutine started below.
		in, out := ioset.Pipe()
		doneCh, errCh := make(chan struct{}), make(chan error)
		go func() {
			// Closing doneCh on exit is what lets the select at the bottom of
			// the loop return nil.
			defer close(doneCh)
			defer in.Close()
			go func() {
			// Line editor with the (buildx) prompt on top of the pipe streams.
			t := term.NewTerminal(readWriter{in.Stdin, in.Stdout}, "(buildx) ")
			for {
				l, err := t.ReadLine()
				if err != nil {
					// Only errors other than io.EOF are reported through errCh.
					if err != io.EOF {
						errCh <- err
				args := strings.Fields(l) // TODO: use shlex
				if len(args) == 0 {
				// NOTE(review): fmt.Println and fmt.Printf in the handlers
				// below write to the process standard output rather than the
				// stdout argument used by the Fprintf calls; confirm intended.
				switch args[0] {
				// NOTE(review): strings.Fields never yields an empty field, so
				// this case looks unreachable.
				case "":
					// nop
				// reload: drop the current session if there is one, build again
				// with options, and on success make the new ref current and
				// restart the container from it.
				case "reload":
					if curRef != "" {
						if err := c.Disconnect(ctx, curRef); err != nil {
							fmt.Println("disconnect error", err)
					ref, _, err := c.Build(ctx, options, nil, stdout, stderr, progressMode) // TODO: support stdin, hold build ref
					if err != nil {
						fmt.Printf("failed to reload: %v\n", err)
					} else {
						curRef = ref
						// rollback the running container with the new result
						id := m.rollback(ctx, curRef, invokeConfig)
						fmt.Fprintf(stdout, "Interactive container was restarted with process %q. Press Ctrl-a-c to switch to the new container\n", id)
				// rollback: restart with invokeConfig; when arguments are given
				// the first replaces Entrypoint and the rest replace Cmd.
				case "rollback":
					cfg := invokeConfig
					if len(args) >= 2 {
						cfg.Entrypoint = []string{args[1]}
						cfg.Cmd = args[2:]
					id := m.rollback(ctx, curRef, cfg)
					fmt.Fprintf(stdout, "Interactive container was restarted with process %q. Press Ctrl-a-c to switch to the new container\n", id)
				// list: print every ref from the controller in a table together
				// with whether it equals curRef.
				case "list":
					refs, err := c.List(ctx)
					if err != nil {
						fmt.Printf("failed to list: %v\n", err)
					tw := tabwriter.NewWriter(stdout, 1, 8, 1, '\t', 0)
					fmt.Fprintln(tw, "ID\tCURRENT_SESSION")
					for _, k := range refs {
						fmt.Fprintf(tw, "%-20s\t%v\n", k, k == curRef)
				// disconnect: the target defaults to the current session and is
				// overridden by the first argument. A target that is a process
				// ID of the current session goes through DisconnectProcess.
				case "disconnect":
					target := curRef
					if len(args) >= 2 {
						target = args[1]
					isProcess, err := isProcessID(ctx, c, curRef, target)
					if err == nil && isProcess {
						if err := c.DisconnectProcess(ctx, curRef, target); err != nil {
							fmt.Printf("disconnect process failed %v\n", target)
					if err := c.Disconnect(ctx, target); err != nil {
						fmt.Println("disconnect error", err)
				// kill: ask the controller to kill the server.
				case "kill":
					if err := c.Kill(ctx); err != nil {
						fmt.Printf("failed to kill: %v\n", err)
				// attach: the argument is first checked against the process IDs
				// of the current session; when it is not one (id stays empty) it
				// is looked up in the session list and becomes the new curRef.
				case "attach":
					if len(args) < 2 {
						fmt.Println("attach: server name must be passed")
					ref := args[1]
					var id string

					isProcess, err := isProcessID(ctx, c, curRef, ref)
					if err == nil && isProcess {
						m.attach(ctx, curRef, ref)
						id = ref
					if id == "" {
						refs, err := c.List(ctx)
						if err != nil {
							fmt.Printf("failed to get the list of sessions: %v\n", err)
						found := false
						for _, s := range refs {
							if s == ref {
								found = true
						if !found {
							fmt.Printf("unknown ID: %q\n", ref)
						if m.invokeCancel != nil {
							m.invokeCancel() // Finish existing attach
						curRef = ref
					// NOTE(review): id appears to still be empty here when ref
					// named a session rather than a process, so the message
					// would show an empty process ID; confirm intended.
					fmt.Fprintf(stdout, "Attached to process %q. Press Ctrl-a-c to switch to the new container\n", id)
				// exec: run args[1] with the remaining arguments in the current
				// session, reusing Env, User and Cwd from invokeConfig and
				// forcing a TTY.
				case "exec":
					if len(args) < 2 {
						// NOTE(review): the message mentions a server name, but
						// args[1] is used as the Entrypoint below; the text
						// looks copied from the attach handler.
						fmt.Println("exec: server name must be passed")
					if curRef == "" {
						fmt.Println("attach to a session first")
					cfg := controllerapi.InvokeConfig{
						Entrypoint: []string{args[1]},
						Cmd:        args[2:],
						// TODO: support other options as well via flags
						Env:  invokeConfig.Env,
						User: invokeConfig.User,
						Cwd:  invokeConfig.Cwd,
						Tty:  true,
					pid := m.exec(ctx, curRef, cfg)
					fmt.Fprintf(stdout, "Process %q started. Press Ctrl-a-c to switch to that process.\n", pid)
				// ps: print the processes of the current session in a table,
				// marking the one whose ID equals m.attachedPid.
				case "ps":
					plist, err := c.ListProcesses(ctx, curRef)
					if err != nil {
						fmt.Println("cannot list process:", err)
					tw := tabwriter.NewWriter(stdout, 1, 8, 1, '\t', 0)
					fmt.Fprintln(tw, "PID\tCURRENT_SESSION\tCOMMAND")
					for _, p := range plist {
						// NOTE(review): append may write into the backing array
						// of Entrypoint when it has spare capacity; confirm this
						// aliasing is harmless here.
						fmt.Fprintf(tw, "%-20s\t%v\t%v\n", p.ProcessID, p.ProcessID == m.attachedPid.Load(), append(p.InvokeConfig.Entrypoint, p.InvokeConfig.Cmd...))
				case "exit":
				case "help":
					fmt.Fprint(stdout, helpMessage)
					fmt.Printf("unknown command: %q\n", l)
					fmt.Fprint(stdout, helpMessage)
		// Wait for the command goroutine to finish, for a read error from it,
		// or for the disable signal sent by the monitor output DisableHook.
		select {
		case <-doneCh:
			return nil
		case err := <-errCh:
			return err
		case <-monitorDisableCh:

// readWriter is built in RunMonitor from the Stdin and Stdout of a pipe end
// and handed to term.NewTerminal.
// NOTE(review): presumably it pairs an io.Reader with an io.Writer; confirm
// against the field list.
type readWriter struct {

// monitor holds the state shared by the command handlers in RunMonitor and
// the invoke helpers defined on it.
type monitor struct {
	// muxIO is the multiplexed IO; invoke switches it to output 1 and
	// disables output 1 again when it returns.
	muxIO        *ioset.MuxIO
	// invokeIO is the forwarder created with ioset.NewForwarder in RunMonitor.
	invokeIO     *ioset.Forwarder
	// invokeFunc runs a process for the given ref and pid with the supplied
	// streams; RunMonitor sets it to the Invoke method of the controller.
	invokeFunc   func(ctx context.Context, ref, pid string, cfg controllerapi.InvokeConfig, in io.ReadCloser, out io.WriteCloser, err io.WriteCloser) error
	// invokeCancel ends the current invoke. It is assigned by invoke, and
	// callers check it for nil before calling it.
	invokeCancel func()
	// attachedPid is compared against process IDs by the ps command and by
	// startInvoke.
	// NOTE(review): presumably holds the ID of the attached process as a
	// string; confirm where it is stored.
	attachedPid  atomic.Value

// rollback starts an invoke for ref under a freshly generated process ID,
// using a copy of cfg with Rollback set to true so the caller's cfg is left
// unchanged. It returns the value returned by startInvoke.
func (m *monitor) rollback(ctx context.Context, ref string, cfg controllerapi.InvokeConfig) string {
	pid := identity.NewID()
	// cfg was passed by value; the flag is set on a further local copy.
	cfg1 := cfg
	cfg1.Rollback = true
	return m.startInvoke(ctx, ref, pid, cfg1)

// exec starts an invoke for ref with cfg under a freshly generated process ID
// and returns the value returned by startInvoke.
func (m *monitor) exec(ctx context.Context, ref string, cfg controllerapi.InvokeConfig) string {
	return m.startInvoke(ctx, ref, identity.NewID(), cfg)

// attach starts an invoke for the given pid of ref with a zero-value
// InvokeConfig. The result of startInvoke is discarded.
func (m *monitor) attach(ctx context.Context, ref, pid string) {
	m.startInvoke(ctx, ref, pid, controllerapi.InvokeConfig{})

// startInvoke cancels the current invoke when one is registered in
// m.invokeCancel, runs m.invoke for ref, pid and cfg in a new goroutine, and
// returns pid. An error from m.invoke is only logged at debug level.
func (m *monitor) startInvoke(ctx context.Context, ref, pid string, cfg controllerapi.InvokeConfig) string {
	if m.invokeCancel != nil {
		m.invokeCancel() // Finish existing attach
	go func() {
		// Start a new invoke
		if err := m.invoke(ctx, ref, pid, cfg); err != nil {
			logrus.Debugf("invoke error: %v", err)
		// Checks whether this invoke is the one recorded in attachedPid.
		if pid == m.attachedPid.Load() {
	return pid

// invoke switches the multiplexed IO to output 1, runs m.invokeFunc for ref,
// pid and cfg on the streams of a new pipe, and returns the error from
// m.invokeFunc. Output 1 is disabled again when invoke returns. A failed
// switch is returned as an error before anything is started.
func (m *monitor) invoke(ctx context.Context, ref, pid string, cfg controllerapi.InvokeConfig) error {
	defer m.muxIO.Disable(1)
	if err := m.muxIO.SwitchTo(1); err != nil {
		return errors.Errorf("failed to switch to process IO: %v", err)
	// The process runs under a context derived from ctx that can be cancelled
	// on its own.
	invokeCtx, invokeCancel := context.WithCancel(ctx)

	containerIn, containerOut := ioset.Pipe()
	waitInvokeDoneCh := make(chan struct{})
	// cancelOnce makes the cancel and detach body run at most once, whether
	// it is triggered by the defer below or through m.invokeCancel.
	var cancelOnce sync.Once
	invokeCancelAndDetachFn := func() {
		cancelOnce.Do(func() {
	defer invokeCancelAndDetachFn()
	// NOTE(review): this field is also read by startInvoke and by the attach
	// handler in RunMonitor; no synchronization is visible here, so confirm
	// the accesses cannot race.
	m.invokeCancel = invokeCancelAndDetachFn

	err := m.invokeFunc(invokeCtx, ref, pid, cfg, containerIn.Stdin, containerIn.Stdout, containerIn.Stderr)

	return err

// nopCloser is the wrapper RunMonitor puts around its stdout and stderr
// arguments before handing them to the multiplexed IO. Its Close method does
// nothing and returns nil.
type nopCloser struct {

// Close does nothing and always reports success.
func (c nopCloser) Close() error {
	return nil
}

// isProcessID reports whether ref equals the ProcessID of one of the
// processes that c.ListProcesses returns for curRef. An error from
// ListProcesses is returned together with false.
func isProcessID(ctx context.Context, c control.BuildxController, curRef, ref string) (bool, error) {
	infos, err := c.ListProcesses(ctx, curRef)
	if err != nil {
		return false, err
	// Linear scan over the listed processes for an exact ID match.
	for _, p := range infos {
		if p.ProcessID == ref {
			return true, nil
	return false, nil