Merge pull request #1760 from ktock/monitor-commands

monitor: Move commands to a separated package
pull/1772/head
Justin Chadwell 2 years ago committed by GitHub
commit c7c37c3591
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,71 @@
package commands
import (
"context"
"fmt"
"io"
"github.com/docker/buildx/monitor/types"
"github.com/pkg/errors"
)
type AttachCmd struct {
m types.Monitor
stdout io.WriteCloser
}
func NewAttachCmd(m types.Monitor, stdout io.WriteCloser) types.Command {
return &AttachCmd{m, stdout}
}
func (cm *AttachCmd) Info() types.CommandInfo {
return types.CommandInfo{HelpMessage: "attach to a buildx server or a process in the container"}
}
func (cm *AttachCmd) Exec(ctx context.Context, args []string) error {
if len(args) < 2 {
return errors.Errorf("server name must be passed")
}
ref := args[1]
var id string
isProcess, err := isProcessID(ctx, cm.m, ref)
if err == nil && isProcess {
cm.m.Attach(ctx, ref)
id = ref
}
if id == "" {
refs, err := cm.m.List(ctx)
if err != nil {
return errors.Errorf("failed to get the list of sessions: %v", err)
}
found := false
for _, s := range refs {
if s == ref {
found = true
break
}
}
if !found {
return errors.Errorf("unknown ID: %q", ref)
}
cm.m.Detach() // Finish existing attach
cm.m.AttachSession(ref)
}
fmt.Fprintf(cm.stdout, "Attached to process %q. Press Ctrl-a-c to switch to the new container\n", id)
return nil
}
func isProcessID(ctx context.Context, c types.Monitor, ref string) (bool, error) {
infos, err := c.ListProcesses(ctx, c.AttachedSessionID())
if err != nil {
return false, err
}
for _, p := range infos {
if p.ProcessID == ref {
return true, nil
}
}
return false, nil
}

@ -0,0 +1,38 @@
package commands
import (
"context"
"github.com/docker/buildx/monitor/types"
"github.com/pkg/errors"
)
type DisconnectCmd struct {
m types.Monitor
}
func NewDisconnectCmd(m types.Monitor) types.Command {
return &DisconnectCmd{m}
}
func (cm *DisconnectCmd) Info() types.CommandInfo {
return types.CommandInfo{HelpMessage: "disconnect a client from a buildx server. Specific session ID can be specified an arg"}
}
func (cm *DisconnectCmd) Exec(ctx context.Context, args []string) error {
target := cm.m.AttachedSessionID()
if len(args) >= 2 {
target = args[1]
}
isProcess, err := isProcessID(ctx, cm.m, target)
if err == nil && isProcess {
if err := cm.m.DisconnectProcess(ctx, cm.m.AttachedSessionID(), target); err != nil {
return errors.Errorf("disconnecting from process failed %v", target)
}
return nil
}
if err := cm.m.DisconnectSession(ctx, target); err != nil {
return errors.Errorf("disconnecting from session failed: %v", err)
}
return nil
}

@ -0,0 +1,44 @@
package commands
import (
"context"
"fmt"
"io"
controllerapi "github.com/docker/buildx/controller/pb"
"github.com/docker/buildx/monitor/types"
"github.com/pkg/errors"
)
type ExecCmd struct {
m types.Monitor
invokeConfig controllerapi.InvokeConfig
stdout io.WriteCloser
}
func NewExecCmd(m types.Monitor, invokeConfig controllerapi.InvokeConfig, stdout io.WriteCloser) types.Command {
return &ExecCmd{m, invokeConfig, stdout}
}
func (cm *ExecCmd) Info() types.CommandInfo {
return types.CommandInfo{HelpMessage: "execute a process in the interactive container"}
}
func (cm *ExecCmd) Exec(ctx context.Context, args []string) error {
if len(args) < 2 {
return errors.Errorf("command must be passed")
}
cfg := controllerapi.InvokeConfig{
Entrypoint: []string{args[1]},
Cmd: args[2:],
// TODO: support other options as well via flags
Env: cm.invokeConfig.Env,
User: cm.invokeConfig.User,
Cwd: cm.invokeConfig.Cwd,
Tty: true,
}
pid := cm.m.Exec(ctx, cfg)
fmt.Fprintf(cm.stdout, "Process %q started. Press Ctrl-a-c to switch to that process.\n", pid)
return nil
}

@ -0,0 +1,27 @@
package commands
import (
"context"
"github.com/docker/buildx/monitor/types"
"github.com/pkg/errors"
)
type KillCmd struct {
m types.Monitor
}
func NewKillCmd(m types.Monitor) types.Command {
return &KillCmd{m}
}
func (cm *KillCmd) Info() types.CommandInfo {
return types.CommandInfo{HelpMessage: "kill buildx server"}
}
func (cm *KillCmd) Exec(ctx context.Context, args []string) error {
if err := cm.m.Kill(ctx); err != nil {
return errors.Errorf("failed to kill: %v", err)
}
return nil
}

@ -0,0 +1,40 @@
package commands
import (
"context"
"fmt"
"io"
"sort"
"text/tabwriter"
"github.com/docker/buildx/monitor/types"
)
type ListCmd struct {
m types.Monitor
stdout io.WriteCloser
}
func NewListCmd(m types.Monitor, stdout io.WriteCloser) types.Command {
return &ListCmd{m, stdout}
}
func (cm *ListCmd) Info() types.CommandInfo {
return types.CommandInfo{HelpMessage: "list buildx sessions"}
}
func (cm *ListCmd) Exec(ctx context.Context, args []string) error {
refs, err := cm.m.List(ctx)
if err != nil {
return err
}
sort.Strings(refs)
tw := tabwriter.NewWriter(cm.stdout, 1, 8, 1, '\t', 0)
fmt.Fprintln(tw, "ID\tCURRENT_SESSION")
for _, k := range refs {
fmt.Fprintf(tw, "%-20s\t%v\n", k, k == cm.m.AttachedSessionID())
}
tw.Flush()
return nil
}

@ -0,0 +1,38 @@
package commands
import (
"context"
"fmt"
"io"
"text/tabwriter"
"github.com/docker/buildx/monitor/types"
)
type PsCmd struct {
m types.Monitor
stdout io.WriteCloser
}
func NewPsCmd(m types.Monitor, stdout io.WriteCloser) types.Command {
return &PsCmd{m, stdout}
}
func (cm *PsCmd) Info() types.CommandInfo {
return types.CommandInfo{HelpMessage: `list processes invoked by "exec". Use "attach" to attach IO to that process`}
}
func (cm *PsCmd) Exec(ctx context.Context, args []string) error {
ref := cm.m.AttachedSessionID()
plist, err := cm.m.ListProcesses(ctx, ref)
if err != nil {
return err
}
tw := tabwriter.NewWriter(cm.stdout, 1, 8, 1, '\t', 0)
fmt.Fprintln(tw, "PID\tCURRENT_SESSION\tCOMMAND")
for _, p := range plist {
fmt.Fprintf(tw, "%-20s\t%v\t%v\n", p.ProcessID, p.ProcessID == cm.m.AttachedPID(), append(p.InvokeConfig.Entrypoint, p.InvokeConfig.Cmd...))
}
tw.Flush()
return nil
}

@ -0,0 +1,76 @@
package commands
import (
"context"
"fmt"
"io"
controllererrors "github.com/docker/buildx/controller/errdefs"
controllerapi "github.com/docker/buildx/controller/pb"
"github.com/docker/buildx/monitor/types"
"github.com/docker/buildx/util/progress"
"github.com/pkg/errors"
)
type ReloadCmd struct {
m types.Monitor
stdout io.WriteCloser
progress *progress.Printer
options *controllerapi.BuildOptions
invokeConfig controllerapi.InvokeConfig
}
func NewReloadCmd(m types.Monitor, stdout io.WriteCloser, progress *progress.Printer, options *controllerapi.BuildOptions, invokeConfig controllerapi.InvokeConfig) types.Command {
return &ReloadCmd{m, stdout, progress, options, invokeConfig}
}
func (cm *ReloadCmd) Info() types.CommandInfo {
return types.CommandInfo{HelpMessage: "reloads the context and build it"}
}
func (cm *ReloadCmd) Exec(ctx context.Context, args []string) error {
var bo *controllerapi.BuildOptions
if ref := cm.m.AttachedSessionID(); ref != "" {
// Rebuilding an existing session; Restore the build option used for building this session.
res, err := cm.m.Inspect(ctx, ref)
if err != nil {
fmt.Printf("failed to inspect the current build session: %v\n", err)
} else {
bo = res.Options
}
} else {
bo = cm.options
}
if bo == nil {
return errors.Errorf("no build option is provided")
}
if ref := cm.m.AttachedSessionID(); ref != "" {
if err := cm.m.Disconnect(ctx, ref); err != nil {
fmt.Println("disconnect error", err)
}
}
var resultUpdated bool
cm.progress.Unpause()
ref, _, err := cm.m.Build(ctx, *bo, nil, cm.progress) // TODO: support stdin, hold build ref
cm.progress.Pause()
if err != nil {
var be *controllererrors.BuildError
if errors.As(err, &be) {
ref = be.Ref
resultUpdated = true
} else {
fmt.Printf("failed to reload: %v\n", err)
}
} else {
resultUpdated = true
}
cm.m.AttachSession(ref)
if resultUpdated {
// rollback the running container with the new result
id := cm.m.Rollback(ctx, cm.invokeConfig)
fmt.Fprintf(cm.stdout, "Interactive container was restarted with process %q. Press Ctrl-a-c to switch to the new container\n", id)
}
return nil
}

@ -0,0 +1,43 @@
package commands
import (
"context"
"fmt"
"io"
controllerapi "github.com/docker/buildx/controller/pb"
"github.com/docker/buildx/monitor/types"
)
type RollbackCmd struct {
m types.Monitor
invokeConfig controllerapi.InvokeConfig
stdout io.WriteCloser
}
func NewRollbackCmd(m types.Monitor, invokeConfig controllerapi.InvokeConfig, stdout io.WriteCloser) types.Command {
return &RollbackCmd{m, invokeConfig, stdout}
}
func (cm *RollbackCmd) Info() types.CommandInfo {
return types.CommandInfo{HelpMessage: "re-runs the interactive container with initial rootfs contents"}
}
func (cm *RollbackCmd) Exec(ctx context.Context, args []string) error {
cfg := cm.invokeConfig
if len(args) >= 2 {
cmds := args[1:]
if cmds[0] == "--init" {
cfg.Initial = true
cmds = cmds[1:]
}
if len(cmds) > 0 {
cfg.Entrypoint = []string{cmds[0]}
cfg.Cmd = cmds[1:]
}
}
id := cm.m.Rollback(ctx, cfg)
fmt.Fprintf(cm.stdout, "Interactive container was restarted with process %q. Press Ctrl-a-c to switch to the new container\n", id)
return nil
}

@ -5,37 +5,24 @@ import (
"fmt" "fmt"
"io" "io"
"sort" "sort"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"text/tabwriter" "text/tabwriter"
"github.com/containerd/console" "github.com/containerd/console"
"github.com/docker/buildx/controller/control" "github.com/docker/buildx/controller/control"
controllererrors "github.com/docker/buildx/controller/errdefs"
controllerapi "github.com/docker/buildx/controller/pb" controllerapi "github.com/docker/buildx/controller/pb"
"github.com/docker/buildx/monitor/commands"
"github.com/docker/buildx/monitor/types"
"github.com/docker/buildx/util/ioset" "github.com/docker/buildx/util/ioset"
"github.com/docker/buildx/util/progress" "github.com/docker/buildx/util/progress"
"github.com/google/shlex"
"github.com/moby/buildkit/identity" "github.com/moby/buildkit/identity"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"golang.org/x/term" "golang.org/x/term"
) )
const helpMessage = `
Available commands are:
reload reloads the context and build it.
rollback re-runs the interactive container with initial rootfs contents.
list list buildx sessions.
attach attach to a buildx server or a process in the container.
exec execute a process in the interactive container.
ps list processes invoked by "exec". Use "attach" to attach IO to that process.
disconnect disconnect a client from a buildx server. Specific session ID can be specified an arg.
kill kill buildx server.
exit exits monitor.
help shows this message.
`
// RunMonitor provides an interactive session for running and managing containers via specified IO. // RunMonitor provides an interactive session for running and managing containers via specified IO.
func RunMonitor(ctx context.Context, curRef string, options *controllerapi.BuildOptions, invokeConfig controllerapi.InvokeConfig, c control.BuildxController, stdin io.ReadCloser, stdout io.WriteCloser, stderr console.File, progress *progress.Printer) error { func RunMonitor(ctx context.Context, curRef string, options *controllerapi.BuildOptions, invokeConfig controllerapi.InvokeConfig, c control.BuildxController, stdin io.ReadCloser, stdout io.WriteCloser, stderr console.File, progress *progress.Printer) error {
defer func() { defer func() {
@ -75,7 +62,8 @@ func RunMonitor(ctx context.Context, curRef string, options *controllerapi.Build
invokeForwarder := ioset.NewForwarder() invokeForwarder := ioset.NewForwarder()
invokeForwarder.SetIn(&containerIn) invokeForwarder.SetIn(&containerIn)
m := &monitor{ m := &monitor{
invokeIO: invokeForwarder, BuildxController: c,
invokeIO: invokeForwarder,
muxIO: ioset.NewMuxIO(ioset.In{ muxIO: ioset.NewMuxIO(ioset.In{
Stdin: io.NopCloser(stdin), Stdin: io.NopCloser(stdin),
Stdout: nopCloser{stdout}, Stdout: nopCloser{stdout},
@ -87,16 +75,31 @@ func RunMonitor(ctx context.Context, curRef string, options *controllerapi.Build
} }
return "Switched IO\n" return "Switched IO\n"
}), }),
invokeFunc: c.Invoke,
} }
m.ref.Store(curRef)
// Start container automatically // Start container automatically
fmt.Fprintf(stdout, "Launching interactive container. Press Ctrl-a-c to switch to monitor console\n") fmt.Fprintf(stdout, "Launching interactive container. Press Ctrl-a-c to switch to monitor console\n")
invokeConfig.Rollback = false invokeConfig.Rollback = false
invokeConfig.Initial = false invokeConfig.Initial = false
id := m.rollback(ctx, curRef, invokeConfig) id := m.Rollback(ctx, invokeConfig)
fmt.Fprintf(stdout, "Interactive container was restarted with process %q. Press Ctrl-a-c to switch to the new container\n", id) fmt.Fprintf(stdout, "Interactive container was restarted with process %q. Press Ctrl-a-c to switch to the new container\n", id)
registeredCommands := map[string]types.Command{
"reload": commands.NewReloadCmd(m, stdout, progress, options, invokeConfig),
"rollback": commands.NewRollbackCmd(m, invokeConfig, stdout),
"list": commands.NewListCmd(m, stdout),
"disconnect": commands.NewDisconnectCmd(m),
"kill": commands.NewKillCmd(m),
"attach": commands.NewAttachCmd(m, stdout),
"exec": commands.NewExecCmd(m, invokeConfig, stdout),
"ps": commands.NewPsCmd(m, stdout),
}
additionalHelpMessages := map[string]string{
"help": "shows this message",
"exit": "exits monitor",
}
// Serve monitor commands // Serve monitor commands
monitorForwarder := ioset.NewForwarder() monitorForwarder := ioset.NewForwarder()
monitorForwarder.SetIn(&monitorIn) monitorForwarder.SetIn(&monitorIn)
@ -122,178 +125,36 @@ func RunMonitor(ctx context.Context, curRef string, options *controllerapi.Build
} }
return return
} }
args := strings.Fields(l) // TODO: use shlex args, err := shlex.Split(l)
if len(args) == 0 { if err != nil {
fmt.Fprintf(stdout, "monitor: failed to parse command: %v", err)
continue
} else if len(args) == 0 {
continue continue
} }
// Builtin commands
switch args[0] { switch args[0] {
case "": case "":
// nop // nop
case "reload": continue
var bo *controllerapi.BuildOptions
if curRef != "" {
// Rebuilding an existing session; Restore the build option used for building this session.
res, err := c.Inspect(ctx, curRef)
if err != nil {
fmt.Printf("failed to inspect the current build session: %v\n", err)
} else {
bo = res.Options
}
} else {
bo = options
}
if bo == nil {
fmt.Println("reload: no build option is provided")
continue
}
if curRef != "" {
if err := c.Disconnect(ctx, curRef); err != nil {
fmt.Println("disconnect error", err)
}
}
var resultUpdated bool
progress.Unpause()
ref, _, err := c.Build(ctx, *bo, nil, progress) // TODO: support stdin, hold build ref
progress.Pause()
if err != nil {
var be *controllererrors.BuildError
if errors.As(err, &be) {
curRef = be.Ref
resultUpdated = true
} else {
fmt.Printf("failed to reload: %v\n", err)
}
} else {
curRef = ref
resultUpdated = true
}
if resultUpdated {
// rollback the running container with the new result
id := m.rollback(ctx, curRef, invokeConfig)
fmt.Fprintf(stdout, "Interactive container was restarted with process %q. Press Ctrl-a-c to switch to the new container\n", id)
}
case "rollback":
cfg := invokeConfig
if len(args) >= 2 {
cmds := args[1:]
if cmds[0] == "--init" {
cfg.Initial = true
cmds = cmds[1:]
}
if len(cmds) > 0 {
cfg.Entrypoint = []string{cmds[0]}
cfg.Cmd = cmds[1:]
}
}
id := m.rollback(ctx, curRef, cfg)
fmt.Fprintf(stdout, "Interactive container was restarted with process %q. Press Ctrl-a-c to switch to the new container\n", id)
case "list":
refs, err := c.List(ctx)
if err != nil {
fmt.Printf("failed to list: %v\n", err)
}
sort.Strings(refs)
tw := tabwriter.NewWriter(stdout, 1, 8, 1, '\t', 0)
fmt.Fprintln(tw, "ID\tCURRENT_SESSION")
for _, k := range refs {
fmt.Fprintf(tw, "%-20s\t%v\n", k, k == curRef)
}
tw.Flush()
case "disconnect":
target := curRef
if len(args) >= 2 {
target = args[1]
}
isProcess, err := isProcessID(ctx, c, curRef, target)
if err == nil && isProcess {
if err := c.DisconnectProcess(ctx, curRef, target); err != nil {
fmt.Printf("disconnect process failed %v\n", target)
continue
}
continue
}
if err := c.Disconnect(ctx, target); err != nil {
fmt.Println("disconnect error", err)
}
case "kill":
if err := c.Kill(ctx); err != nil {
fmt.Printf("failed to kill: %v\n", err)
}
case "attach":
if len(args) < 2 {
fmt.Println("attach: server name must be passed")
continue
}
ref := args[1]
var id string
isProcess, err := isProcessID(ctx, c, curRef, ref)
if err == nil && isProcess {
m.attach(ctx, curRef, ref)
id = ref
}
if id == "" {
refs, err := c.List(ctx)
if err != nil {
fmt.Printf("failed to get the list of sessions: %v\n", err)
continue
}
found := false
for _, s := range refs {
if s == ref {
found = true
break
}
}
if !found {
fmt.Printf("unknown ID: %q\n", ref)
continue
}
if m.invokeCancel != nil {
m.invokeCancel() // Finish existing attach
}
curRef = ref
}
fmt.Fprintf(stdout, "Attached to process %q. Press Ctrl-a-c to switch to the new container\n", id)
case "exec":
if len(args) < 2 {
fmt.Println("attach: command must be passed")
continue
}
if curRef == "" {
fmt.Println("attach to a session first")
continue
}
cfg := controllerapi.InvokeConfig{
Entrypoint: []string{args[1]},
Cmd: args[2:],
// TODO: support other options as well via flags
Env: invokeConfig.Env,
User: invokeConfig.User,
Cwd: invokeConfig.Cwd,
Tty: true,
}
pid := m.exec(ctx, curRef, cfg)
fmt.Fprintf(stdout, "Process %q started. Press Ctrl-a-c to switch to that process.\n", pid)
case "ps":
plist, err := c.ListProcesses(ctx, curRef)
if err != nil {
fmt.Println("cannot list process:", err)
continue
}
tw := tabwriter.NewWriter(stdout, 1, 8, 1, '\t', 0)
fmt.Fprintln(tw, "PID\tCURRENT_SESSION\tCOMMAND")
for _, p := range plist {
fmt.Fprintf(tw, "%-20s\t%v\t%v\n", p.ProcessID, p.ProcessID == m.attachedPid.Load(), append(p.InvokeConfig.Entrypoint, p.InvokeConfig.Cmd...))
}
tw.Flush()
case "exit": case "exit":
return return
case "help": case "help":
fmt.Fprint(stdout, helpMessage) printHelpMessage(stdout, registeredCommands, additionalHelpMessages)
continue
default: default:
fmt.Printf("unknown command: %q\n", l) }
fmt.Fprint(stdout, helpMessage)
// Registered commands
cmdname := args[0]
if cm, ok := registeredCommands[cmdname]; ok {
if err := cm.Exec(ctx, args); err != nil {
fmt.Fprintf(stdout, "%s: %v\n", cmdname, err)
}
} else {
fmt.Fprintf(stdout, "monitor: unknown command: %q\n", l)
printHelpMessage(stdout, registeredCommands, additionalHelpMessages)
} }
} }
}() }()
@ -310,41 +171,89 @@ func RunMonitor(ctx context.Context, curRef string, options *controllerapi.Build
} }
} }
func printHelpMessage(out io.Writer, registeredCommands map[string]types.Command, additional map[string]string) {
var names []string
for name := range registeredCommands {
names = append(names, name)
}
for name := range additional {
names = append(names, name)
}
sort.Strings(names)
fmt.Fprint(out, "Available commands are:\n")
w := new(tabwriter.Writer)
w.Init(out, 0, 8, 0, '\t', 0)
for _, name := range names {
var mes string
if c, ok := registeredCommands[name]; ok {
mes = c.Info().HelpMessage
} else if m, ok := additional[name]; ok {
mes = m
} else {
continue
}
fmt.Fprintln(w, " "+name+"\t"+mes)
}
w.Flush()
}
type readWriter struct { type readWriter struct {
io.Reader io.Reader
io.Writer io.Writer
} }
type monitor struct { type monitor struct {
control.BuildxController
ref atomic.Value
muxIO *ioset.MuxIO muxIO *ioset.MuxIO
invokeIO *ioset.Forwarder invokeIO *ioset.Forwarder
invokeFunc func(ctx context.Context, ref, pid string, cfg controllerapi.InvokeConfig, in io.ReadCloser, out io.WriteCloser, err io.WriteCloser) error
invokeCancel func() invokeCancel func()
attachedPid atomic.Value attachedPid atomic.Value
} }
func (m *monitor) rollback(ctx context.Context, ref string, cfg controllerapi.InvokeConfig) string { func (m *monitor) DisconnectSession(ctx context.Context, targetID string) error {
return m.Disconnect(ctx, targetID)
}
func (m *monitor) AttachSession(ref string) {
m.ref.Store(ref)
}
func (m *monitor) AttachedSessionID() string {
return m.ref.Load().(string)
}
func (m *monitor) Rollback(ctx context.Context, cfg controllerapi.InvokeConfig) string {
pid := identity.NewID() pid := identity.NewID()
cfg1 := cfg cfg1 := cfg
cfg1.Rollback = true cfg1.Rollback = true
return m.startInvoke(ctx, ref, pid, cfg1) return m.startInvoke(ctx, pid, cfg1)
} }
func (m *monitor) exec(ctx context.Context, ref string, cfg controllerapi.InvokeConfig) string { func (m *monitor) Exec(ctx context.Context, cfg controllerapi.InvokeConfig) string {
return m.startInvoke(ctx, ref, identity.NewID(), cfg) return m.startInvoke(ctx, identity.NewID(), cfg)
} }
func (m *monitor) attach(ctx context.Context, ref, pid string) { func (m *monitor) Attach(ctx context.Context, pid string) {
m.startInvoke(ctx, ref, pid, controllerapi.InvokeConfig{}) m.startInvoke(ctx, pid, controllerapi.InvokeConfig{})
} }
func (m *monitor) close() { func (m *monitor) Detach() {
if m.invokeCancel != nil { if m.invokeCancel != nil {
m.invokeCancel() m.invokeCancel() // Finish existing attach
} }
} }
func (m *monitor) startInvoke(ctx context.Context, ref, pid string, cfg controllerapi.InvokeConfig) string { func (m *monitor) AttachedPID() string {
return m.attachedPid.Load().(string)
}
func (m *monitor) close() {
m.Detach()
}
func (m *monitor) startInvoke(ctx context.Context, pid string, cfg controllerapi.InvokeConfig) string {
if m.invokeCancel != nil { if m.invokeCancel != nil {
m.invokeCancel() // Finish existing attach m.invokeCancel() // Finish existing attach
} }
@ -353,7 +262,7 @@ func (m *monitor) startInvoke(ctx context.Context, ref, pid string, cfg controll
} }
go func() { go func() {
// Start a new invoke // Start a new invoke
if err := m.invoke(ctx, ref, pid, cfg); err != nil { if err := m.invoke(ctx, pid, cfg); err != nil {
logrus.Debugf("invoke error: %v", err) logrus.Debugf("invoke error: %v", err)
} }
if pid == m.attachedPid.Load() { if pid == m.attachedPid.Load() {
@ -364,13 +273,13 @@ func (m *monitor) startInvoke(ctx context.Context, ref, pid string, cfg controll
return pid return pid
} }
func (m *monitor) invoke(ctx context.Context, ref, pid string, cfg controllerapi.InvokeConfig) error { func (m *monitor) invoke(ctx context.Context, pid string, cfg controllerapi.InvokeConfig) error {
m.muxIO.Enable(1) m.muxIO.Enable(1)
defer m.muxIO.Disable(1) defer m.muxIO.Disable(1)
if err := m.muxIO.SwitchTo(1); err != nil { if err := m.muxIO.SwitchTo(1); err != nil {
return errors.Errorf("failed to switch to process IO: %v", err) return errors.Errorf("failed to switch to process IO: %v", err)
} }
if ref == "" { if m.AttachedSessionID() == "" {
return nil return nil
} }
invokeCtx, invokeCancel := context.WithCancel(ctx) invokeCtx, invokeCancel := context.WithCancel(ctx)
@ -390,7 +299,7 @@ func (m *monitor) invoke(ctx context.Context, ref, pid string, cfg controllerapi
defer invokeCancelAndDetachFn() defer invokeCancelAndDetachFn()
m.invokeCancel = invokeCancelAndDetachFn m.invokeCancel = invokeCancelAndDetachFn
err := m.invokeFunc(invokeCtx, ref, pid, cfg, containerIn.Stdin, containerIn.Stdout, containerIn.Stderr) err := m.Invoke(invokeCtx, m.AttachedSessionID(), pid, cfg, containerIn.Stdin, containerIn.Stdout, containerIn.Stderr)
close(waitInvokeDoneCh) close(waitInvokeDoneCh)
return err return err
@ -401,16 +310,3 @@ type nopCloser struct {
} }
func (c nopCloser) Close() error { return nil } func (c nopCloser) Close() error { return nil }
func isProcessID(ctx context.Context, c control.BuildxController, curRef, ref string) (bool, error) {
infos, err := c.ListProcesses(ctx, curRef)
if err != nil {
return false, err
}
for _, p := range infos {
if p.ProcessID == ref {
return true, nil
}
}
return false, nil
}

@ -0,0 +1,54 @@
package types
import (
"context"
"github.com/docker/buildx/controller/control"
controllerapi "github.com/docker/buildx/controller/pb"
)
// Monitor provides APIs for attaching and controlling the buildx server.
type Monitor interface {
control.BuildxController
// Rollback re-runs the interactive container with initial rootfs contents.
Rollback(ctx context.Context, cfg controllerapi.InvokeConfig) string
// Rollback executes a process in the interactive container.
Exec(ctx context.Context, cfg controllerapi.InvokeConfig) string
// Attach attaches IO to a process in the container.
Attach(ctx context.Context, pid string)
// AttachedPID returns ID of the attached process.
AttachedPID() string
// Detach detaches IO from the container.
Detach()
// DisconnectSession finishes the specified session.
DisconnectSession(ctx context.Context, targetID string) error
// AttachSession attaches the monitor to the specified session.
AttachSession(ref string)
// AttachedSessionID returns the ID of the attached session.
AttachedSessionID() string
}
// CommandInfo is information about a command.
type CommandInfo struct {
// HelpMessage is the message printed to the console when "help" command is invoked.
HelpMessage string
}
// Command represents a command for debugging.
type Command interface {
// Exec executes the command.
Exec(ctx context.Context, args []string) error
// Info returns information of the command.
Info() CommandInfo
}
Loading…
Cancel
Save