diff --git a/build/build.go b/build/build.go index 86095371..f479aa65 100644 --- a/build/build.go +++ b/build/build.go @@ -920,7 +920,12 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s } results.Set(resultKey(dp.driverIndex, k), res) if resultHandleFunc != nil { - resultHandleFunc(dp.driverIndex, &ResultContext{cc, res}) + resultCtx, err := NewResultContext(cc, so, res) + if err == nil { + resultHandleFunc(dp.driverIndex, resultCtx) + } else { + logrus.Warnf("failed to record result: %s", err) + } } return res, nil } diff --git a/build/invoke.go b/build/invoke.go index 427b3e1c..38425ec3 100644 --- a/build/invoke.go +++ b/build/invoke.go @@ -3,110 +3,68 @@ package build import ( "context" _ "crypto/sha256" // ensure digests can be computed - "encoding/json" - "fmt" "io" "sync" "sync/atomic" "syscall" controllerapi "github.com/docker/buildx/controller/pb" - "github.com/moby/buildkit/client" - "github.com/moby/buildkit/exporter/containerimage/exptypes" gateway "github.com/moby/buildkit/frontend/gateway/client" - "github.com/moby/buildkit/solver/pb" - specs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) -// ResultContext is a build result with the client that built it. -type ResultContext struct { - Client *client.Client - Res *gateway.Result -} - type Container struct { cancelOnce sync.Once containerCancel func() - - isUnavailable atomic.Bool - - initStarted atomic.Bool - - container gateway.Container - image *specs.Image - - releaseCh chan struct{} + isUnavailable atomic.Bool + initStarted atomic.Bool + container gateway.Container + releaseCh chan struct{} + resultCtx *ResultContext } -func NewContainer(ctx context.Context, resultCtx *ResultContext) (*Container, error) { - c, res := resultCtx.Client, resultCtx.Res - +func NewContainer(ctx context.Context, resultCtx *ResultContext, cfg *controllerapi.InvokeConfig) (*Container, error) { mainCtx := ctx ctrCh := make(chan *Container) errCh := make(chan error) go func() { - _, err := c.Build(context.TODO(), client.SolveOpt{}, "buildx", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) { + err := resultCtx.build(func(ctx context.Context, c gateway.Client) (*gateway.Result, error) { ctx, cancel := context.WithCancel(ctx) go func() { <-mainCtx.Done() cancel() }() - if res.Ref == nil { - return nil, errors.Errorf("no reference is registered") - } - st, err := res.Ref.ToState() - if err != nil { - return nil, err - } - def, err := st.Marshal(ctx) - if err != nil { - return nil, err - } - imgRef, err := c.Solve(ctx, gateway.SolveRequest{ - Definition: def.ToPB(), - }) + containerCfg, err := resultCtx.getContainerConfig(ctx, c, cfg) if err != nil { return nil, err } containerCtx, containerCancel := context.WithCancel(ctx) defer containerCancel() - bkContainer, err := c.NewContainer(containerCtx, gateway.NewContainerRequest{ - Mounts: []gateway.Mount{ - { - Dest: "/", - MountType: pb.MountType_BIND, - Ref: imgRef.Ref, - }, - }, - }) + bkContainer, err := c.NewContainer(containerCtx, containerCfg) if err != nil { return nil, err } - imgData := res.Metadata[exptypes.ExporterImageConfigKey] - var img *specs.Image - if len(imgData) > 0 { - img = &specs.Image{} - if err := json.Unmarshal(imgData, img); err != nil { - fmt.Println(err) - return nil, err - } - } releaseCh := make(chan struct{}) container := &Container{ containerCancel: containerCancel, container: bkContainer, - image: img, releaseCh: releaseCh, + resultCtx: resultCtx, } + doneCh := make(chan struct{}) + defer close(doneCh) + resultCtx.registerCleanup(func() { + container.Cancel() + <-doneCh + }) ctrCh <- container <-container.releaseCh return nil, bkContainer.Release(ctx) - }, nil) + }) if err != nil { errCh <- err } @@ -146,7 +104,7 @@ func (c *Container) Exec(ctx context.Context, cfg *controllerapi.InvokeConfig, s c.markUnavailable() }() } - err := exec(ctx, cfg, c.container, c.image, stdin, stdout, stderr) + err := exec(ctx, c.resultCtx, cfg, c.container, stdin, stdout, stderr) if err != nil { // Container becomes unavailable if one of the processes fails in it. c.markUnavailable() @@ -154,48 +112,12 @@ func (c *Container) Exec(ctx context.Context, cfg *controllerapi.InvokeConfig, s return err } -func exec(ctx context.Context, cfg *controllerapi.InvokeConfig, ctr gateway.Container, img *specs.Image, stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) error { - user := "" - if !cfg.NoUser { - user = cfg.User - } else if img != nil { - user = img.Config.User - } - - cwd := "" - if !cfg.NoCwd { - cwd = cfg.Cwd - } else if img != nil { - cwd = img.Config.WorkingDir - } - - env := []string{} - if img != nil { - env = append(env, img.Config.Env...) - } - env = append(env, cfg.Env...) - - args := []string{} - if cfg.Entrypoint != nil { - args = append(args, cfg.Entrypoint...) - } - if cfg.Cmd != nil { - args = append(args, cfg.Cmd...) - } - // caller should always set args - if len(args) == 0 { - return errors.Errorf("specify args to execute") +func exec(ctx context.Context, resultCtx *ResultContext, cfg *controllerapi.InvokeConfig, ctr gateway.Container, stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) error { + processCfg, err := resultCtx.getProcessConfig(cfg, stdin, stdout, stderr) + if err != nil { + return err } - proc, err := ctr.Start(ctx, gateway.StartRequest{ - Args: args, - Env: env, - User: user, - Cwd: cwd, - Tty: cfg.Tty, - Stdin: stdin, - Stdout: stdout, - Stderr: stderr, - }) + proc, err := ctr.Start(ctx, processCfg) if err != nil { return errors.Errorf("failed to start container: %v", err) } diff --git a/build/result.go b/build/result.go new file mode 100644 index 00000000..a999da33 --- /dev/null +++ b/build/result.go @@ -0,0 +1,365 @@ +package build + +import ( + "context" + _ "crypto/sha256" // ensure digests can be computed + "encoding/json" + "io" + "sync" + "sync/atomic" + + controllerapi "github.com/docker/buildx/controller/pb" + "github.com/moby/buildkit/client" + "github.com/moby/buildkit/exporter/containerimage/exptypes" + gateway "github.com/moby/buildkit/frontend/gateway/client" + "github.com/moby/buildkit/solver/errdefs" + "github.com/moby/buildkit/solver/pb" + specs "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +func NewResultContext(c *client.Client, solveOpt client.SolveOpt, res *gateway.Result) (*ResultContext, error) { + ctx := context.Background() + def, err := getDefinition(ctx, res) + if err != nil { + return nil, err + } + return getResultAt(ctx, c, solveOpt, def, nil) +} + +func getDefinition(ctx context.Context, res *gateway.Result) (*pb.Definition, error) { + ref, err := res.SingleRef() + if err != nil { + return nil, err + } + st, err := ref.ToState() + if err != nil { + return nil, err + } + def, err := st.Marshal(ctx) + if err != nil { + return nil, err + } + return def.ToPB(), nil +} + +func getResultAt(ctx context.Context, c *client.Client, solveOpt client.SolveOpt, target *pb.Definition, statusChan chan *client.SolveStatus) (*ResultContext, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // forward SolveStatus + done := new(atomic.Bool) + defer done.Store(true) + ch := make(chan *client.SolveStatus) + go func() { + for { + s := <-ch + if s == nil { + return + } + if done.Load() { + // Do not forward if the function returned because statusChan is possibly closed + continue + } + select { + case statusChan <- s: + case <-ctx.Done(): + } + } + }() + + // get result + resultCtxCh := make(chan *ResultContext) + errCh := make(chan error) + go func() { + resultCtx := ResultContext{ + client: c, + solveOpt: solveOpt, + } + _, err := c.Build(context.Background(), solveOpt, "buildx", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + res2, err := c.Solve(ctx, gateway.SolveRequest{ + Evaluate: true, + Definition: target, + }) + if err != nil { + var se *errdefs.SolveError + if errors.As(err, &se) { + resultCtx.solveErr = se + } else { + return nil, err + } + } + // Record the client and ctx as well so that containers can be created from the SolveError. + resultCtx.res = res2 + resultCtx.gwClient = c + resultCtx.gwCtx = ctx + resultCtx.gwDone = cancel + select { + case resultCtxCh <- &resultCtx: + case <-ctx.Done(): + return nil, ctx.Err() + } + <-ctx.Done() + return nil, nil + }, ch) + if err != nil { + errCh <- err + } + }() + + select { + case resultCtx := <-resultCtxCh: + return resultCtx, nil + case err := <-errCh: + return nil, err + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +// ResultContext is a build result with the client that built it. +type ResultContext struct { + client *client.Client + res *gateway.Result + solveOpt client.SolveOpt + + solveErr *errdefs.SolveError + + gwClient gateway.Client + gwCtx context.Context + gwDone func() + gwDoneOnce sync.Once + + cleanups []func() + cleanupsMu sync.Mutex +} + +func (r *ResultContext) Done() { + r.gwDoneOnce.Do(func() { + r.cleanupsMu.Lock() + cleanups := r.cleanups + r.cleanups = nil + r.cleanupsMu.Unlock() + for _, f := range cleanups { + f() + } + r.gwDone() + }) +} + +func (r *ResultContext) registerCleanup(f func()) { + r.cleanupsMu.Lock() + r.cleanups = append(r.cleanups, f) + r.cleanupsMu.Unlock() +} + +func (r *ResultContext) build(buildFunc gateway.BuildFunc) (err error) { + _, err = buildFunc(r.gwCtx, r.gwClient) + return err +} + +func (r *ResultContext) getContainerConfig(ctx context.Context, c gateway.Client, cfg *controllerapi.InvokeConfig) (containerCfg gateway.NewContainerRequest, _ error) { + if r.res != nil && r.solveErr == nil { + logrus.Debugf("creating container from successful build") + ccfg, err := containerConfigFromResult(ctx, r.res, c, *cfg) + if err != nil { + return containerCfg, err + } + containerCfg = *ccfg + } else { + logrus.Debugf("creating container from failed build %+v", cfg) + ccfg, err := containerConfigFromError(r.solveErr, *cfg) + if err != nil { + return containerCfg, errors.Wrapf(err, "no result nor error is available") + } + containerCfg = *ccfg + } + return containerCfg, nil +} + +func (r *ResultContext) getProcessConfig(cfg *controllerapi.InvokeConfig, stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) (_ gateway.StartRequest, err error) { + processCfg := newStartRequest(stdin, stdout, stderr) + if r.res != nil && r.solveErr == nil { + logrus.Debugf("creating container from successful build") + if err := populateProcessConfigFromResult(&processCfg, r.res, *cfg); err != nil { + return processCfg, err + } + } else { + logrus.Debugf("creating container from failed build %+v", cfg) + if err := populateProcessConfigFromError(&processCfg, r.solveErr, *cfg); err != nil { + return processCfg, err + } + } + return processCfg, nil +} + +func containerConfigFromResult(ctx context.Context, res *gateway.Result, c gateway.Client, cfg controllerapi.InvokeConfig) (*gateway.NewContainerRequest, error) { + if res.Ref == nil { + return nil, errors.Errorf("no reference is registered") + } + st, err := res.Ref.ToState() + if err != nil { + return nil, err + } + def, err := st.Marshal(ctx) + if err != nil { + return nil, err + } + imgRef, err := c.Solve(ctx, gateway.SolveRequest{ + Definition: def.ToPB(), + }) + if err != nil { + return nil, err + } + return &gateway.NewContainerRequest{ + Mounts: []gateway.Mount{ + { + Dest: "/", + MountType: pb.MountType_BIND, + Ref: imgRef.Ref, + }, + }, + }, nil +} + +func populateProcessConfigFromResult(req *gateway.StartRequest, res *gateway.Result, cfg controllerapi.InvokeConfig) error { + imgData := res.Metadata[exptypes.ExporterImageConfigKey] + var img *specs.Image + if len(imgData) > 0 { + img = &specs.Image{} + if err := json.Unmarshal(imgData, img); err != nil { + return err + } + } + + user := "" + if !cfg.NoUser { + user = cfg.User + } else if img != nil { + user = img.Config.User + } + + cwd := "" + if !cfg.NoCwd { + cwd = cfg.Cwd + } else if img != nil { + cwd = img.Config.WorkingDir + } + + env := []string{} + if img != nil { + env = append(env, img.Config.Env...) + } + env = append(env, cfg.Env...) + + args := []string{} + if cfg.Entrypoint != nil { + args = append(args, cfg.Entrypoint...) + } else if img != nil { + args = append(args, img.Config.Entrypoint...) + } + if cfg.Cmd != nil { + args = append(args, cfg.Cmd...) + } else if img != nil { + args = append(args, img.Config.Cmd...) + } + + req.Args = args + req.Env = env + req.User = user + req.Cwd = cwd + req.Tty = cfg.Tty + + return nil +} + +func containerConfigFromError(solveErr *errdefs.SolveError, cfg controllerapi.InvokeConfig) (*gateway.NewContainerRequest, error) { + exec, err := execOpFromError(solveErr) + if err != nil { + return nil, err + } + var mounts []gateway.Mount + for i, mnt := range exec.Mounts { + rid := solveErr.Solve.MountIDs[i] + mounts = append(mounts, gateway.Mount{ + Selector: mnt.Selector, + Dest: mnt.Dest, + ResultID: rid, + Readonly: mnt.Readonly, + MountType: mnt.MountType, + CacheOpt: mnt.CacheOpt, + SecretOpt: mnt.SecretOpt, + SSHOpt: mnt.SSHOpt, + }) + } + return &gateway.NewContainerRequest{ + Mounts: mounts, + NetMode: exec.Network, + }, nil +} + +func populateProcessConfigFromError(req *gateway.StartRequest, solveErr *errdefs.SolveError, cfg controllerapi.InvokeConfig) error { + exec, err := execOpFromError(solveErr) + if err != nil { + return err + } + meta := exec.Meta + user := "" + if !cfg.NoUser { + user = cfg.User + } else { + user = meta.User + } + + cwd := "" + if !cfg.NoCwd { + cwd = cfg.Cwd + } else { + cwd = meta.Cwd + } + + env := append(meta.Env, cfg.Env...) + + args := []string{} + if cfg.Entrypoint != nil { + args = append(args, cfg.Entrypoint...) + } + if cfg.Cmd != nil { + args = append(args, cfg.Cmd...) + } + if len(args) == 0 { + args = meta.Args + } + + req.Args = args + req.Env = env + req.User = user + req.Cwd = cwd + req.Tty = cfg.Tty + + return nil +} + +func execOpFromError(solveErr *errdefs.SolveError) (*pb.ExecOp, error) { + if solveErr == nil { + return nil, errors.Errorf("no error is available") + } + switch op := solveErr.Solve.Op.GetOp().(type) { + case *pb.Op_Exec: + return op.Exec, nil + default: + return nil, errors.Errorf("invoke: unsupported error type") + } + // TODO: support other ops +} + +func newStartRequest(stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) gateway.StartRequest { + return gateway.StartRequest{ + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + } +} diff --git a/controller/local/controller.go b/controller/local/controller.go index b7eae6ad..3bda61fb 100644 --- a/controller/local/controller.go +++ b/controller/local/controller.go @@ -93,7 +93,7 @@ func (b *localController) Invoke(ctx context.Context, ref string, pid string, cf return errors.New("no build result is registered") } var err error - proc, err = b.processes.StartProcess(pid, b.resultCtx, &cfg) + proc, err = b.processes.StartProcess(pid, b.buildConfig.resultCtx, &cfg) if err != nil { return err } diff --git a/controller/processes/processes.go b/controller/processes/processes.go index 51f164fe..d0c0df42 100644 --- a/controller/processes/processes.go +++ b/controller/processes/processes.go @@ -111,7 +111,7 @@ func (m *Manager) StartProcess(pid string, resultCtx *build.ResultContext, cfg * go ctr.Cancel() // Finish the existing container } var err error - ctr, err = build.NewContainer(context.TODO(), resultCtx) + ctr, err = build.NewContainer(context.TODO(), resultCtx, cfg) if err != nil { return nil, errors.Errorf("failed to create container %v", err) }