build: rework build package to use only a single Build call

Signed-off-by: Justin Chadwell <me@jedevc.com>
pull/1750/head
Justin Chadwell 2 years ago
parent c6a78d216c
commit 1571c137bc

@ -653,11 +653,166 @@ func toSolveOpt(ctx context.Context, node builder.Node, multiDriver bool, opt Op
return &so, releaseF, nil
}
func build(ctx context.Context, c *client.Client, opt Options, so client.SolveOpt, pw progress.Writer) (*ResultContext, error) {
frontendInputs := make(map[string]*pb.Definition)
for key, st := range so.FrontendInputs {
def, err := st.Marshal(ctx)
if err != nil {
return nil, err
}
frontendInputs[key] = def.ToPB()
}
req := gateway.SolveRequest{
Frontend: so.Frontend,
FrontendInputs: frontendInputs,
FrontendOpt: make(map[string]string),
}
for k, v := range so.FrontendAttrs {
req.FrontendOpt[k] = v
}
so.Frontend = ""
so.FrontendInputs = nil
ch, chdone := progress.NewChannel(pw)
rcs := make(chan *ResultContext)
suspend := make(chan struct{})
suspendDone := make(chan struct{})
go func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var rc *ResultContext
var printRes map[string][]byte
rr, err := c.Build(ctx, so, "buildx", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) {
var isFallback bool
var origErr error
for {
if opt.PrintFunc != nil {
if _, ok := req.FrontendOpt["frontend.caps"]; !ok {
req.FrontendOpt["frontend.caps"] = "moby.buildkit.frontend.subrequests+forward"
} else {
req.FrontendOpt["frontend.caps"] += ",moby.buildkit.frontend.subrequests+forward"
}
req.FrontendOpt["requestid"] = "frontend." + opt.PrintFunc.Name
if isFallback {
req.FrontendOpt["build-arg:BUILDKIT_SYNTAX"] = printFallbackImage
}
}
res, err := c.Solve(ctx, req)
if err != nil {
if origErr != nil {
return nil, err
}
var reqErr *errdefs.UnsupportedSubrequestError
if !isFallback {
if errors.As(err, &reqErr) {
switch reqErr.Name {
case "frontend.outline", "frontend.targets":
isFallback = true
origErr = err
continue
}
return nil, err
}
// buildkit v0.8 vendored in Docker 20.10 does not support typed errors
if strings.Contains(err.Error(), "unsupported request frontend.outline") || strings.Contains(err.Error(), "unsupported request frontend.targets") {
isFallback = true
origErr = err
continue
}
}
}
// TODO: would we want to compute this on demand instead of blocking?
eg, ctx2 := errgroup.WithContext(ctx)
res.EachRef(func(r gateway.Reference) error {
eg.Go(func() error {
return r.Evaluate(ctx2)
})
return nil
})
err = eg.Wait()
if opt.PrintFunc != nil {
printRes = res.Metadata
}
rc = &ResultContext{
gwClient: c,
gwCtx: ctx,
gwDone: cancel,
gwRef: res,
suspend: suspend,
suspendDone: suspendDone,
}
var se *errdefs.SolveError
if errors.As(err, &se) {
rc.gwErr = se
} else if err != nil {
return nil, err
}
rcs <- rc
<-suspend
return res, err
}
}, ch)
<-chdone
if rr != nil {
if rr.ExporterResponse == nil {
rr.ExporterResponse = map[string]string{}
}
for k, v := range printRes {
rr.ExporterResponse[k] = string(v)
}
}
rc.resp = rr
rc.err = err
close(suspendDone)
}()
select {
case <-ctx.Done():
return nil, ctx.Err()
case rc := <-rcs:
return rc, nil
}
}
func Build(ctx context.Context, nodes []builder.Node, opt map[string]Options, docker *dockerutil.Client, configDir string, w progress.Writer) (resp map[string]*client.SolveResponse, err error) {
return BuildWithResultHandler(ctx, nodes, opt, docker, configDir, w, nil)
resp = map[string]*client.SolveResponse{}
rcs, err := BuildResults(ctx, nodes, opt, docker, configDir, w)
if err != nil {
return nil, err
}
eg, ctx := errgroup.WithContext(ctx)
for k, v := range rcs {
k, v := k, v
eg.Go(func() error {
v2, err := v.Wait(ctx)
if err != nil {
return err
}
resp[k] = v2
return nil
})
}
if err := eg.Wait(); err != nil {
return nil, err
}
return resp, nil
}
func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[string]Options, docker *dockerutil.Client, configDir string, w progress.Writer, resultHandleFunc func(driverIndex int, rCtx *ResultContext)) (resp map[string]*client.SolveResponse, err error) {
func BuildResults(ctx context.Context, nodes []builder.Node, opt map[string]Options, docker *dockerutil.Client, configDir string, w progress.Writer) (resp map[string]*ResultContext, err error) {
if len(nodes) == 0 {
return nil, errors.Errorf("driver required for build")
}
@ -708,8 +863,6 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
}
}()
eg, ctx := errgroup.WithContext(ctx)
for k, opt := range opt {
multiDriver := len(m[k]) > 1
hasMobyDriver := false
@ -785,8 +938,9 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
}
}
resp = map[string]*client.SolveResponse{}
eg, ctx := errgroup.WithContext(ctx)
var respMu sync.Mutex
resp = map[string]*ResultContext{}
results := waitmap.New()
multiTarget := len(opt) > 1
@ -801,15 +955,17 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
if multiTarget {
span, ctx = tracing.StartSpan(ctx, k)
}
baseCtx := ctx
res := make([]*client.SolveResponse, len(dps))
eg2, ctx := errgroup.WithContext(ctx)
res := make([]*ResultContext, len(dps))
var pushNames string
var insecurePush bool
wg := sync.WaitGroup{}
for i, dp := range dps {
wg.Add(1)
i, dp, so := i, dp, *dp.so
if multiDriver {
for i, e := range so.Exports {
@ -842,263 +998,188 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
pw := progress.WithPrefix(w, k, multiTarget)
c := clients[dp.driverIndex]
eg2.Go(func() error {
eg.Go(func() error {
pw = progress.ResetTime(pw)
if err := waitContextDeps(ctx, dp.driverIndex, results, &so); err != nil {
return err
}
frontendInputs := make(map[string]*pb.Definition)
for key, st := range so.FrontendInputs {
def, err := st.Marshal(ctx)
if err != nil {
return err
}
frontendInputs[key] = def.ToPB()
}
req := gateway.SolveRequest{
Frontend: so.Frontend,
FrontendInputs: frontendInputs,
FrontendOpt: make(map[string]string),
}
for k, v := range so.FrontendAttrs {
req.FrontendOpt[k] = v
}
so.Frontend = ""
so.FrontendInputs = nil
ch, done := progress.NewChannel(pw)
defer func() { <-done }()
cc := c
var printRes map[string][]byte
rr, err := c.Build(ctx, so, "buildx", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) {
var isFallback bool
var origErr error
for {
if opt.PrintFunc != nil {
if _, ok := req.FrontendOpt["frontend.caps"]; !ok {
req.FrontendOpt["frontend.caps"] = "moby.buildkit.frontend.subrequests+forward"
} else {
req.FrontendOpt["frontend.caps"] += ",moby.buildkit.frontend.subrequests+forward"
}
req.FrontendOpt["requestid"] = "frontend." + opt.PrintFunc.Name
if isFallback {
req.FrontendOpt["build-arg:BUILDKIT_SYNTAX"] = printFallbackImage
}
}
res, err := c.Solve(ctx, req)
if err != nil {
if origErr != nil {
return nil, err
}
var reqErr *errdefs.UnsupportedSubrequestError
if !isFallback {
if errors.As(err, &reqErr) {
switch reqErr.Name {
case "frontend.outline", "frontend.targets":
isFallback = true
origErr = err
continue
}
return nil, err
}
// buildkit v0.8 vendored in Docker 20.10 does not support typed errors
if strings.Contains(err.Error(), "unsupported request frontend.outline") || strings.Contains(err.Error(), "unsupported request frontend.targets") {
isFallback = true
origErr = err
continue
}
}
return nil, err
}
if opt.PrintFunc != nil {
printRes = res.Metadata
}
results.Set(resultKey(dp.driverIndex, k), res)
if resultHandleFunc != nil {
resultCtx, err := NewResultContext(cc, so, res)
if err == nil {
resultHandleFunc(dp.driverIndex, resultCtx)
} else {
logrus.Warnf("failed to record result: %s", err)
}
}
return res, nil
}
}, ch)
result, err := build(ctx, c, opt, so, pw)
if err != nil {
return err
}
res[i] = rr
if rr.ExporterResponse == nil {
rr.ExporterResponse = map[string]string{}
}
for k, v := range printRes {
rr.ExporterResponse[k] = string(v)
}
node := nodes[dp.driverIndex].Driver
if node.IsMobyDriver() {
for _, e := range so.Exports {
if e.Type == "moby" && e.Attrs["push"] != "" {
if ok, _ := strconv.ParseBool(e.Attrs["push"]); ok {
pushNames = e.Attrs["name"]
if pushNames == "" {
return errors.Errorf("tag is needed when pushing to registry")
}
pw := progress.ResetTime(pw)
pushList := strings.Split(pushNames, ",")
for _, name := range pushList {
if err := progress.Wrap(fmt.Sprintf("pushing %s with docker", name), pw.Write, func(l progress.SubLogger) error {
return pushWithMoby(ctx, node, name, l)
}); err != nil {
return err
results.Set(resultKey(dp.driverIndex, k), result.gwRef)
res[i] = result
resp[k] = result
result.hook(func(ctx context.Context) error {
defer wg.Done()
node := nodes[dp.driverIndex].Driver
if node.IsMobyDriver() {
for _, e := range so.Exports {
if e.Type == "moby" && e.Attrs["push"] != "" {
if ok, _ := strconv.ParseBool(e.Attrs["push"]); ok {
pushNames = e.Attrs["name"]
if pushNames == "" {
return errors.Errorf("tag is needed when pushing to registry")
}
}
remoteDigest, err := remoteDigestWithMoby(ctx, node, pushList[0])
if err == nil && remoteDigest != "" {
// old daemons might not have containerimage.config.digest set
// in response so use containerimage.digest value for it if available
if _, ok := rr.ExporterResponse[exptypes.ExporterImageConfigDigestKey]; !ok {
if v, ok := rr.ExporterResponse[exptypes.ExporterImageDigestKey]; ok {
rr.ExporterResponse[exptypes.ExporterImageConfigDigestKey] = v
pw := progress.ResetTime(pw)
pushList := strings.Split(pushNames, ",")
for _, name := range pushList {
if err := progress.Wrap(fmt.Sprintf("pushing %s with docker", name), pw.Write, func(l progress.SubLogger) error {
return pushWithMoby(ctx, node, name, l)
}); err != nil {
return err
}
}
rr.ExporterResponse[exptypes.ExporterImageDigestKey] = remoteDigest
} else if err != nil {
return err
remoteDigest, err := remoteDigestWithMoby(ctx, node, pushList[0])
if err == nil && remoteDigest != "" {
// old daemons might not have containerimage.config.digest set
// in response so use containerimage.digest value for it if available
if _, ok := result.resp.ExporterResponse[exptypes.ExporterImageConfigDigestKey]; !ok {
if v, ok := result.resp.ExporterResponse[exptypes.ExporterImageDigestKey]; ok {
result.resp.ExporterResponse[exptypes.ExporterImageConfigDigestKey] = v
}
}
result.resp.ExporterResponse[exptypes.ExporterImageDigestKey] = remoteDigest
} else if err != nil {
return err
}
}
}
}
}
}
return nil
})
}
eg.Go(func() (err error) {
ctx := baseCtx
defer func() {
if span != nil {
tracing.FinishWithError(span, err)
}
}()
pw := progress.WithPrefix(w, "default", false)
if err := eg2.Wait(); err != nil {
return err
}
respMu.Lock()
resp[k] = res[0]
respMu.Unlock()
if len(res) == 1 {
return nil
}
return nil
})
if pushNames != "" {
progress.Write(pw, fmt.Sprintf("merging manifest list %s", pushNames), func() error {
descs := make([]specs.Descriptor, 0, len(res))
if i == 0 {
result.hook(func(ctx context.Context) error {
wg.Wait()
for _, r := range res {
s, ok := r.ExporterResponse[exptypes.ExporterImageDescriptorKey]
if ok {
dt, err := base64.StdEncoding.DecodeString(s)
if err != nil {
return err
}
var desc specs.Descriptor
if err := json.Unmarshal(dt, &desc); err != nil {
return errors.Wrapf(err, "failed to unmarshal descriptor %s", s)
defer func() {
if span != nil {
tracing.FinishWithError(span, err)
}
descs = append(descs, desc)
continue
}
// This is fallback for some very old buildkit versions.
// Note that the mediatype isn't really correct as most of the time it is image manifest and
// not manifest list but actually both are handled because for Docker mediatypes the
// mediatype value in the Accpet header does not seem to matter.
s, ok = r.ExporterResponse[exptypes.ExporterImageDigestKey]
if ok {
descs = append(descs, specs.Descriptor{
Digest: digest.Digest(s),
MediaType: images.MediaTypeDockerSchema2ManifestList,
Size: -1,
})
}()
pw := progress.WithPrefix(w, "default", false)
respMu.Lock()
resp[k] = res[0]
respMu.Unlock()
if len(res) == 1 {
return nil
}
}
if len(descs) > 0 {
var imageopt imagetools.Opt
for _, dp := range dps {
imageopt = nodes[dp.driverIndex].ImageOpt
break
if pushNames == "" {
return nil
}
names := strings.Split(pushNames, ",")
if insecurePush {
insecureTrue := true
httpTrue := true
nn, err := reference.ParseNormalizedNamed(names[0])
if err != nil {
return err
}
imageopt.RegistryConfig = map[string]resolver.RegistryConfig{
reference.Domain(nn): {
Insecure: &insecureTrue,
PlainHTTP: &httpTrue,
},
progress.Write(pw, fmt.Sprintf("merging manifest list %s", pushNames), func() error {
descs := make([]specs.Descriptor, 0, len(res))
for _, r := range res {
s, ok := r.resp.ExporterResponse[exptypes.ExporterImageDescriptorKey]
if ok {
dt, err := base64.StdEncoding.DecodeString(s)
if err != nil {
return err
}
var desc specs.Descriptor
if err := json.Unmarshal(dt, &desc); err != nil {
return errors.Wrapf(err, "failed to unmarshal descriptor %s", s)
}
descs = append(descs, desc)
continue
}
// This is fallback for some very old buildkit versions.
// Note that the mediatype isn't really correct as most of the time it is image manifest and
// not manifest list but actually both are handled because for Docker mediatypes the
// mediatype value in the Accpet header does not seem to matter.
s, ok = r.resp.ExporterResponse[exptypes.ExporterImageDigestKey]
if ok {
descs = append(descs, specs.Descriptor{
Digest: digest.Digest(s),
MediaType: images.MediaTypeDockerSchema2ManifestList,
Size: -1,
})
}
}
}
if len(descs) > 0 {
var imageopt imagetools.Opt
for _, dp := range dps {
imageopt = nodes[dp.driverIndex].ImageOpt
break
}
names := strings.Split(pushNames, ",")
itpull := imagetools.New(imageopt)
if insecurePush {
insecureTrue := true
httpTrue := true
nn, err := reference.ParseNormalizedNamed(names[0])
if err != nil {
return err
}
imageopt.RegistryConfig = map[string]resolver.RegistryConfig{
reference.Domain(nn): {
Insecure: &insecureTrue,
PlainHTTP: &httpTrue,
},
}
}
ref, err := reference.ParseNormalizedNamed(names[0])
if err != nil {
return err
}
ref = reference.TagNameOnly(ref)
itpull := imagetools.New(imageopt)
srcs := make([]*imagetools.Source, len(descs))
for i, desc := range descs {
srcs[i] = &imagetools.Source{
Desc: desc,
Ref: ref,
}
}
ref, err := reference.ParseNormalizedNamed(names[0])
if err != nil {
return err
}
ref = reference.TagNameOnly(ref)
dt, desc, err := itpull.Combine(ctx, srcs)
if err != nil {
return err
}
srcs := make([]*imagetools.Source, len(descs))
for i, desc := range descs {
srcs[i] = &imagetools.Source{
Desc: desc,
Ref: ref,
}
}
itpush := imagetools.New(imageopt)
dt, desc, err := itpull.Combine(ctx, srcs)
if err != nil {
return err
}
for _, n := range names {
nn, err := reference.ParseNormalizedNamed(n)
if err != nil {
return err
}
if err := itpush.Push(ctx, nn, desc, dt); err != nil {
return err
}
}
itpush := imagetools.New(imageopt)
respMu.Lock()
resp[k] = &client.SolveResponse{
ExporterResponse: map[string]string{
exptypes.ExporterImageDigestKey: desc.Digest.String(),
},
}
respMu.Unlock()
}
return nil
})
}
return nil
})
for _, n := range names {
nn, err := reference.ParseNormalizedNamed(n)
if err != nil {
return err
}
if err := itpush.Push(ctx, nn, desc, dt); err != nil {
return err
}
}
respMu.Lock()
resp[k].resp = &client.SolveResponse{
ExporterResponse: map[string]string{
exptypes.ExporterImageDigestKey: desc.Digest.String(),
},
}
respMu.Unlock()
}
return nil
})
return nil
})
}
return nil
})
}
}
if err := eg.Wait(); err != nil {

@ -6,7 +6,6 @@ import (
"encoding/json"
"io"
"sync"
"sync/atomic"
controllerapi "github.com/docker/buildx/controller/pb"
"github.com/moby/buildkit/client"
@ -17,110 +16,20 @@ import (
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
func NewResultContext(c *client.Client, solveOpt client.SolveOpt, res *gateway.Result) (*ResultContext, error) {
ctx := context.Background()
def, err := getDefinition(ctx, res)
if err != nil {
return nil, err
}
return getResultAt(ctx, c, solveOpt, def, nil)
}
func getDefinition(ctx context.Context, res *gateway.Result) (*pb.Definition, error) {
ref, err := res.SingleRef()
if err != nil {
return nil, err
}
st, err := ref.ToState()
if err != nil {
return nil, err
}
def, err := st.Marshal(ctx)
if err != nil {
return nil, err
}
return def.ToPB(), nil
}
func getResultAt(ctx context.Context, c *client.Client, solveOpt client.SolveOpt, target *pb.Definition, statusChan chan *client.SolveStatus) (*ResultContext, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// forward SolveStatus
done := new(atomic.Bool)
defer done.Store(true)
ch := make(chan *client.SolveStatus)
go func() {
for {
s := <-ch
if s == nil {
return
}
if done.Load() {
// Do not forward if the function returned because statusChan is possibly closed
continue
}
select {
case statusChan <- s:
case <-ctx.Done():
}
}
}()
// get result
resultCtxCh := make(chan *ResultContext)
errCh := make(chan error)
go func() {
_, err := c.Build(context.Background(), solveOpt, "buildx", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
resultCtx := ResultContext{}
res2, err := c.Solve(ctx, gateway.SolveRequest{
Evaluate: true,
Definition: target,
})
if err != nil {
var se *errdefs.SolveError
if errors.As(err, &se) {
resultCtx.solveErr = se
} else {
return nil, err
}
}
// Record the client and ctx as well so that containers can be created from the SolveError.
resultCtx.res = res2
resultCtx.gwClient = c
resultCtx.gwCtx = ctx
resultCtx.gwDone = cancel
select {
case resultCtxCh <- &resultCtx:
case <-ctx.Done():
return nil, ctx.Err()
}
<-ctx.Done()
return nil, nil
}, ch)
if err != nil {
errCh <- err
}
}()
select {
case resultCtx := <-resultCtxCh:
return resultCtx, nil
case err := <-errCh:
return nil, err
case <-ctx.Done():
return nil, ctx.Err()
}
}
// ResultContext is a build result with the client that built it.
type ResultContext struct {
res *gateway.Result
solveErr *errdefs.SolveError
resp *client.SolveResponse
err error
suspend chan<- struct{}
suspendDone <-chan struct{}
hooks []func(ctx context.Context) error
gwRef *gateway.Result
gwErr *errdefs.SolveError
gwClient gateway.Client
gwCtx context.Context
@ -131,7 +40,39 @@ type ResultContext struct {
cleanupsMu sync.Mutex
}
func (r *ResultContext) Done() {
func (r *ResultContext) hook(hook func(ctx context.Context) error) {
r.hooks = append(r.hooks, hook)
}
func (r *ResultContext) Result(ctx context.Context) (*gateway.Result, *errdefs.SolveError) {
return r.gwRef, r.gwErr
}
func (r *ResultContext) Wait(ctx context.Context) (*client.SolveResponse, error) {
defer r.Close()
close(r.suspend)
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-r.suspendDone:
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
for _, f := range r.hooks {
if err := f(ctx); err != nil {
return err
}
}
return nil
})
if err := eg.Wait(); err != nil {
return nil, err
}
return r.resp, r.err
}
}
func (r *ResultContext) Close() {
r.gwDoneOnce.Do(func() {
r.cleanupsMu.Lock()
cleanups := r.cleanups
@ -156,16 +97,16 @@ func (r *ResultContext) build(buildFunc gateway.BuildFunc) (err error) {
}
func (r *ResultContext) getContainerConfig(ctx context.Context, c gateway.Client, cfg *controllerapi.InvokeConfig) (containerCfg gateway.NewContainerRequest, _ error) {
if r.res != nil && r.solveErr == nil {
if r.gwRef != nil && r.gwErr == nil {
logrus.Debugf("creating container from successful build")
ccfg, err := containerConfigFromResult(ctx, r.res, c, *cfg)
ccfg, err := containerConfigFromResult(ctx, r.gwRef, c, *cfg)
if err != nil {
return containerCfg, err
}
containerCfg = *ccfg
} else {
logrus.Debugf("creating container from failed build %+v", cfg)
ccfg, err := containerConfigFromError(r.solveErr, *cfg)
ccfg, err := containerConfigFromError(r.gwErr, *cfg)
if err != nil {
return containerCfg, errors.Wrapf(err, "no result nor error is available")
}
@ -176,14 +117,14 @@ func (r *ResultContext) getContainerConfig(ctx context.Context, c gateway.Client
func (r *ResultContext) getProcessConfig(cfg *controllerapi.InvokeConfig, stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) (_ gateway.StartRequest, err error) {
processCfg := newStartRequest(stdin, stdout, stderr)
if r.res != nil && r.solveErr == nil {
if r.gwRef != nil && r.gwErr == nil {
logrus.Debugf("creating container from successful build")
if err := populateProcessConfigFromResult(&processCfg, r.res, *cfg); err != nil {
if err := populateProcessConfigFromResult(&processCfg, r.gwRef, *cfg); err != nil {
return processCfg, err
}
} else {
logrus.Debugf("creating container from failed build %+v", cfg)
if err := populateProcessConfigFromError(&processCfg, r.solveErr, *cfg); err != nil {
if err := populateProcessConfigFromError(&processCfg, r.gwErr, *cfg); err != nil {
return processCfg, err
}
}

@ -8,11 +8,13 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"github.com/containerd/console"
"github.com/docker/buildx/build"
@ -33,10 +35,14 @@ import (
"github.com/docker/cli/cli"
"github.com/docker/cli/cli/command"
dockeropts "github.com/docker/cli/opts"
"github.com/docker/docker/api/types/versions"
"github.com/docker/docker/builder/remotecontext/urlutil"
"github.com/docker/docker/pkg/ioutils"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/exporter/containerimage/exptypes"
"github.com/moby/buildkit/frontend/subrequests"
"github.com/moby/buildkit/frontend/subrequests/outline"
"github.com/moby/buildkit/frontend/subrequests/targets"
"github.com/moby/buildkit/solver/errdefs"
"github.com/moby/buildkit/util/appcontext"
"github.com/moby/buildkit/util/grpcerrors"
@ -106,7 +112,6 @@ func (o *buildOptions) toControllerOptions() (controllerapi.BuildOptions, error)
NetworkMode: o.networkMode,
NoCacheFilter: o.noCacheFilter,
Platforms: o.platforms,
PrintFunc: o.printFunc,
ShmSize: int64(o.shmSize),
Tags: o.tags,
Target: o.target,
@ -153,6 +158,11 @@ func (o *buildOptions) toControllerOptions() (controllerapi.BuildOptions, error)
}
}
opts.PrintFunc, err = parsePrintFunc(o.printFunc)
if err != nil {
return controllerapi.BuildOptions{}, err
}
opts.CacheFrom, err = buildflags.ParseCacheEntry(o.cacheFrom)
if err != nil {
return controllerapi.BuildOptions{}, err
@ -231,9 +241,13 @@ func runBuild(dockerCli command.Cli, in buildOptions) error {
return errors.Wrap(err, "removing image ID file")
}
}
resp, _, err := cbuild.RunBuild(ctx, dockerCli, opts, os.Stdin, printer, false)
if err1 := printer.Wait(); err == nil {
err = err1
res, err := cbuild.RunBuild(ctx, dockerCli, opts, os.Stdin, printer)
resp, err2 := res.Wait(ctx)
if err == nil {
err = err2
}
if err2 := printer.Wait(); err == nil {
err = err2
}
if err != nil {
return err
@ -249,6 +263,16 @@ func runBuild(dockerCli command.Cli, in buildOptions) error {
}
return os.WriteFile(in.imageIDFile, []byte(dgst), 0644)
}
if in.metadataFile != "" {
if err := writeMetadataFile(in.metadataFile, decodeExporterResponse(resp.ExporterResponse)); err != nil {
return err
}
}
if opts.PrintFunc != nil {
if err := printResult(opts.PrintFunc, resp.ExporterResponse); err != nil {
return err
}
}
return nil
}
@ -518,7 +542,7 @@ func updateLastActivity(dockerCli command.Cli, ng *store.NodeGroup) error {
return txn.UpdateLastActivity(ng)
}
func launchControllerAndRunBuild(dockerCli command.Cli, options buildOptions) error {
func launchControllerAndRunBuild(dockerCli command.Cli, options buildOptions) (err error) {
ctx := context.TODO()
if options.invoke != nil && (options.dockerfileName == "-" || options.contextPath == "-") {
@ -560,6 +584,14 @@ func launchControllerAndRunBuild(dockerCli command.Cli, options buildOptions) er
if err != nil {
return err
}
printerCloseOnce := sync.Once{}
defer func() {
printerCloseOnce.Do(func() {
if err1 := printer.Wait(); err == nil {
err = err1
}
})
}()
// NOTE: buildx server has the current working directory different from the client
// so we need to resolve paths to abosolute ones in the client.
@ -588,11 +620,7 @@ func launchControllerAndRunBuild(dockerCli command.Cli, options buildOptions) er
}
}
var resp *client.SolveResponse
ref, resp, err = c.Build(ctx, opts, pr, printer)
if err1 := printer.Wait(); err == nil {
err = err1
}
ref, err = c.Build(ctx, opts, pr, printer)
if err != nil {
var be *controllererrors.BuildError
if errors.As(err, &be) {
@ -610,22 +638,13 @@ func launchControllerAndRunBuild(dockerCli command.Cli, options buildOptions) er
if err := pr.Close(); err != nil {
logrus.Debug("failed to close stdin pipe reader")
}
if options.quiet {
fmt.Println(resp.ExporterResponse[exptypes.ExporterImageDigestKey])
}
if options.imageIDFile != "" {
dgst := resp.ExporterResponse[exptypes.ExporterImageDigestKey]
if v, ok := resp.ExporterResponse[exptypes.ExporterImageConfigDigestKey]; ok {
dgst = v
}
return os.WriteFile(options.imageIDFile, []byte(dgst), 0644)
}
}
// post-build operations
if options.invoke != nil && options.invoke.needsMonitor(retErr) {
// HACK: pause the printer to prevent interference with monitor
printer.Pause(true)
pr2, pw2 := io.Pipe()
f.SetWriter(pw2, func() io.WriteCloser {
pw2.Close() // propagate EOF
@ -646,11 +665,40 @@ func launchControllerAndRunBuild(dockerCli command.Cli, options buildOptions) er
if err != nil {
logrus.Warnf("failed to run monitor: %v", err)
}
} else {
if err := c.Disconnect(ctx, ref); err != nil {
logrus.Warnf("disconnect error: %v", err)
printer.Pause(false)
}
resp, err := c.Finalize(ctx, ref)
printerCloseOnce.Do(func() {
if err1 := printer.Wait(); err == nil {
err = err1
}
})
if err != nil {
return err
}
if options.quiet {
fmt.Println(resp.ExporterResponse[exptypes.ExporterImageDigestKey])
}
if options.imageIDFile != "" {
dgst := resp.ExporterResponse[exptypes.ExporterImageDigestKey]
if v, ok := resp.ExporterResponse[exptypes.ExporterImageConfigDigestKey]; ok {
dgst = v
}
return os.WriteFile(options.imageIDFile, []byte(dgst), 0644)
}
if options.metadataFile != "" {
if err := writeMetadataFile(options.metadataFile, decodeExporterResponse(resp.ExporterResponse)); err != nil {
return err
}
}
if opts.PrintFunc != nil {
if err := printResult(opts.PrintFunc, resp.ExporterResponse); err != nil {
return err
}
}
return nil
}
@ -964,3 +1012,65 @@ func printWarnings(w io.Writer, warnings []client.VertexWarning, mode string) {
}
}
func printResult(f *controllerapi.PrintFunc, res map[string]string) error {
switch f.Name {
case "outline":
return printValue(outline.PrintOutline, outline.SubrequestsOutlineDefinition.Version, f.Format, res)
case "targets":
return printValue(targets.PrintTargets, targets.SubrequestsTargetsDefinition.Version, f.Format, res)
case "subrequests.describe":
return printValue(subrequests.PrintDescribe, subrequests.SubrequestsDescribeDefinition.Version, f.Format, res)
default:
if dt, ok := res["result.txt"]; ok {
fmt.Print(dt)
} else {
log.Printf("%s %+v", f, res)
}
}
return nil
}
type printFunc func([]byte, io.Writer) error
func printValue(printer printFunc, version string, format string, res map[string]string) error {
if format == "json" {
fmt.Fprintln(os.Stdout, res["result.json"])
return nil
}
if res["version"] != "" && versions.LessThan(version, res["version"]) && res["result.txt"] != "" {
// structure is too new and we don't know how to print it
fmt.Fprint(os.Stdout, res["result.txt"])
return nil
}
return printer([]byte(res["result.json"]), os.Stdout)
}
func parsePrintFunc(str string) (*controllerapi.PrintFunc, error) {
if str == "" {
return nil, nil
}
csvReader := csv.NewReader(strings.NewReader(str))
fields, err := csvReader.Read()
if err != nil {
return nil, err
}
f := &controllerapi.PrintFunc{}
for _, field := range fields {
parts := strings.SplitN(field, "=", 2)
if len(parts) == 2 {
if parts[0] == "format" {
f.Format = parts[1]
} else {
return nil, errors.Errorf("invalid print field: %s", field)
}
} else {
if f.Name != "" {
return nil, errors.Errorf("invalid print value: %s", str)
}
f.Name = field
}
}
return f, nil
}

@ -2,14 +2,10 @@ package build
import (
"context"
"encoding/base64"
"encoding/csv"
"encoding/json"
"io"
"os"
"path/filepath"
"strings"
"sync"
"github.com/docker/buildx/build"
"github.com/docker/buildx/builder"
@ -24,7 +20,6 @@ import (
"github.com/docker/cli/cli/command"
"github.com/docker/cli/cli/config"
dockeropts "github.com/docker/cli/opts"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/go-units"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/session/auth/authprovider"
@ -36,13 +31,9 @@ import (
const defaultTargetName = "default"
// RunBuild runs the specified build and returns the result.
//
// NOTE: When an error happens during the build and this function acquires the debuggable *build.ResultContext,
// this function returns it in addition to the error (i.e. it does "return nil, res, err"). The caller can
// inspect the result and debug the cause of that error.
func RunBuild(ctx context.Context, dockerCli command.Cli, in controllerapi.BuildOptions, inStream io.Reader, progress progress.Writer, generateResult bool) (*client.SolveResponse, *build.ResultContext, error) {
func RunBuild(ctx context.Context, dockerCli command.Cli, in controllerapi.BuildOptions, inStream io.Reader, progress progress.Writer) (*build.ResultContext, error) {
if in.NoCache && len(in.NoCacheFilter) > 0 {
return nil, nil, errors.Errorf("--no-cache and --no-cache-filter cannot currently be used together")
return nil, errors.Errorf("--no-cache and --no-cache-filter cannot currently be used together")
}
contexts := map[string]build.NamedContext{}
@ -50,11 +41,6 @@ func RunBuild(ctx context.Context, dockerCli command.Cli, in controllerapi.Build
contexts[name] = build.NamedContext{Path: path}
}
printFunc, err := parsePrintFunc(in.PrintFunc)
if err != nil {
return nil, nil, err
}
opts := build.Options{
Inputs: build.Inputs{
ContextPath: in.ContextPath,
@ -73,12 +59,11 @@ func RunBuild(ctx context.Context, dockerCli command.Cli, in controllerapi.Build
Tags: in.Tags,
Target: in.Target,
Ulimits: controllerUlimitOpt2DockerUlimit(in.Ulimits),
PrintFunc: printFunc,
}
platforms, err := platformutil.Parse(in.Platforms)
if err != nil {
return nil, nil, err
return nil, err
}
opts.Platforms = platforms
@ -87,7 +72,7 @@ func RunBuild(ctx context.Context, dockerCli command.Cli, in controllerapi.Build
secrets, err := controllerapi.CreateSecrets(in.Secrets)
if err != nil {
return nil, nil, err
return nil, err
}
opts.Session = append(opts.Session, secrets)
@ -97,17 +82,17 @@ func RunBuild(ctx context.Context, dockerCli command.Cli, in controllerapi.Build
}
ssh, err := controllerapi.CreateSSH(sshSpecs)
if err != nil {
return nil, nil, err
return nil, err
}
opts.Session = append(opts.Session, ssh)
outputs, err := controllerapi.CreateExports(in.Exports)
if err != nil {
return nil, nil, err
return nil, err
}
if in.ExportPush {
if in.ExportLoad {
return nil, nil, errors.Errorf("push and load may not be set together at the moment")
return nil, errors.Errorf("push and load may not be set together at the moment")
}
if len(outputs) == 0 {
outputs = []client.ExportEntry{{
@ -121,7 +106,7 @@ func RunBuild(ctx context.Context, dockerCli command.Cli, in controllerapi.Build
case "image":
outputs[0].Attrs["push"] = "true"
default:
return nil, nil, errors.Errorf("push and %q output can't be used together", outputs[0].Type)
return nil, errors.Errorf("push and %q output can't be used together", outputs[0].Type)
}
}
}
@ -135,12 +120,19 @@ func RunBuild(ctx context.Context, dockerCli command.Cli, in controllerapi.Build
switch outputs[0].Type {
case "docker":
default:
return nil, nil, errors.Errorf("load and %q output can't be used together", outputs[0].Type)
return nil, errors.Errorf("load and %q output can't be used together", outputs[0].Type)
}
}
}
opts.Exports = outputs
if in.PrintFunc != nil {
opts.PrintFunc = &build.PrintFunc{
Name: in.PrintFunc.Name,
Format: in.PrintFunc.Format,
}
}
opts.CacheFrom = controllerapi.CreateCaches(in.CacheFrom)
opts.CacheTo = controllerapi.CreateCaches(in.CacheTo)
@ -148,7 +140,7 @@ func RunBuild(ctx context.Context, dockerCli command.Cli, in controllerapi.Build
allow, err := buildflags.ParseEntitlements(in.Allow)
if err != nil {
return nil, nil, err
return nil, err
}
opts.Allow = allow
@ -164,120 +156,32 @@ func RunBuild(ctx context.Context, dockerCli command.Cli, in controllerapi.Build
builder.WithContextPathHash(contextPathHash),
)
if err != nil {
return nil, nil, err
return nil, err
}
if err = updateLastActivity(dockerCli, b.NodeGroup); err != nil {
return nil, nil, errors.Wrapf(err, "failed to update builder last activity time")
return nil, errors.Wrapf(err, "failed to update builder last activity time")
}
nodes, err := b.LoadNodes(ctx, false)
if err != nil {
return nil, nil, err
return nil, err
}
resp, res, err := buildTargets(ctx, dockerCli, b.NodeGroup, nodes, map[string]build.Options{defaultTargetName: opts}, progress, in.MetadataFile, generateResult)
res, err := buildTargets(ctx, dockerCli, b.NodeGroup, nodes, map[string]build.Options{defaultTargetName: opts}, progress, in.MetadataFile)
err = wrapBuildError(err, false)
if err != nil {
// NOTE: buildTargets can return *build.ResultContext even on error.
return nil, res, err
return nil, err
}
return resp, res, nil
return res[defaultTargetName], nil
}
// buildTargets runs the specified build and returns the result.
//
// NOTE: When an error happens during the build and this function acquires the debuggable *build.ResultContext,
// this function returns it in addition to the error (i.e. it does "return nil, res, err"). The caller can
// inspect the result and debug the cause of that error.
func buildTargets(ctx context.Context, dockerCli command.Cli, ng *store.NodeGroup, nodes []builder.Node, opts map[string]build.Options, progress progress.Writer, metadataFile string, generateResult bool) (*client.SolveResponse, *build.ResultContext, error) {
var res *build.ResultContext
var resp map[string]*client.SolveResponse
var err error
if generateResult {
var mu sync.Mutex
var idx int
resp, err = build.BuildWithResultHandler(ctx, nodes, opts, dockerutil.NewClient(dockerCli), confutil.ConfigDir(dockerCli), progress, func(driverIndex int, gotRes *build.ResultContext) {
mu.Lock()
defer mu.Unlock()
if res == nil || driverIndex < idx {
idx, res = driverIndex, gotRes
}
})
} else {
resp, err = build.Build(ctx, nodes, opts, dockerutil.NewClient(dockerCli), confutil.ConfigDir(dockerCli), progress)
}
if err != nil {
return nil, res, err
}
if len(metadataFile) > 0 && resp != nil {
if err := writeMetadataFile(metadataFile, decodeExporterResponse(resp[defaultTargetName].ExporterResponse)); err != nil {
return nil, nil, err
}
}
for k := range resp {
if opts[k].PrintFunc != nil {
if err := printResult(opts[k].PrintFunc, resp[k].ExporterResponse); err != nil {
return nil, nil, err
}
}
}
return resp[defaultTargetName], res, err
}
func parsePrintFunc(str string) (*build.PrintFunc, error) {
if str == "" {
return nil, nil
}
csvReader := csv.NewReader(strings.NewReader(str))
fields, err := csvReader.Read()
func buildTargets(ctx context.Context, dockerCli command.Cli, ng *store.NodeGroup, nodes []builder.Node, opts map[string]build.Options, progress progress.Writer, metadataFile string) (map[string]*build.ResultContext, error) {
res, err := build.BuildResults(ctx, nodes, opts, dockerutil.NewClient(dockerCli), confutil.ConfigDir(dockerCli), progress)
if err != nil {
return nil, err
}
f := &build.PrintFunc{}
for _, field := range fields {
parts := strings.SplitN(field, "=", 2)
if len(parts) == 2 {
if parts[0] == "format" {
f.Format = parts[1]
} else {
return nil, errors.Errorf("invalid print field: %s", field)
}
} else {
if f.Name != "" {
return nil, errors.Errorf("invalid print value: %s", str)
}
f.Name = field
}
}
return f, nil
}
func writeMetadataFile(filename string, dt interface{}) error {
b, err := json.MarshalIndent(dt, "", " ")
if err != nil {
return err
}
return ioutils.AtomicWriteFile(filename, b, 0644)
}
func decodeExporterResponse(exporterResponse map[string]string) map[string]interface{} {
out := make(map[string]interface{})
for k, v := range exporterResponse {
dt, err := base64.StdEncoding.DecodeString(v)
if err != nil {
out[k] = v
continue
}
var raw map[string]interface{}
if err = json.Unmarshal(dt, &raw); err != nil || len(raw) == 0 {
out[k] = v
continue
}
out[k] = json.RawMessage(dt)
}
return out
return res, err
}
func wrapBuildError(err error, bake bool) error {

@ -1,48 +0,0 @@
package build
import (
"fmt"
"io"
"log"
"os"
"github.com/docker/buildx/build"
"github.com/docker/docker/api/types/versions"
"github.com/moby/buildkit/frontend/subrequests"
"github.com/moby/buildkit/frontend/subrequests/outline"
"github.com/moby/buildkit/frontend/subrequests/targets"
)
func printResult(f *build.PrintFunc, res map[string]string) error {
switch f.Name {
case "outline":
return printValue(outline.PrintOutline, outline.SubrequestsOutlineDefinition.Version, f.Format, res)
case "targets":
return printValue(targets.PrintTargets, targets.SubrequestsTargetsDefinition.Version, f.Format, res)
case "subrequests.describe":
return printValue(subrequests.PrintDescribe, subrequests.SubrequestsDescribeDefinition.Version, f.Format, res)
default:
if dt, ok := res["result.txt"]; ok {
fmt.Print(dt)
} else {
log.Printf("%s %+v", f, res)
}
}
return nil
}
type printFunc func([]byte, io.Writer) error
func printValue(printer printFunc, version string, format string, res map[string]string) error {
if format == "json" {
fmt.Fprintln(os.Stdout, res["result.json"])
return nil
}
if res["version"] != "" && versions.LessThan(version, res["version"]) && res["result.txt"] != "" {
// structure is too new and we don't know how to print it
fmt.Fprint(os.Stdout, res["result.txt"])
return nil
}
return printer([]byte(res["result.json"]), os.Stdout)
}

@ -10,19 +10,23 @@ import (
)
type BuildxController interface {
Build(ctx context.Context, options controllerapi.BuildOptions, in io.ReadCloser, progress progress.Writer) (ref string, resp *client.SolveResponse, err error)
Build(ctx context.Context, options controllerapi.BuildOptions, in io.ReadCloser, progress progress.Writer) (string, error)
// Invoke starts an IO session into the specified process.
// If pid doesn't matche to any running processes, it starts a new process with the specified config.
// If there is no container running or InvokeConfig.Rollback is speicfied, the process will start in a newly created container.
// NOTE: If needed, in the future, we can split this API into three APIs (NewContainer, NewProcess and Attach).
Invoke(ctx context.Context, ref, pid string, options controllerapi.InvokeConfig, ioIn io.ReadCloser, ioOut io.WriteCloser, ioErr io.WriteCloser) error
Kill(ctx context.Context) error
Close() error
List(ctx context.Context) (refs []string, _ error)
Finalize(ctx context.Context, ref string) (*client.SolveResponse, error)
Disconnect(ctx context.Context, ref string) error
ListProcesses(ctx context.Context, ref string) (infos []*controllerapi.ProcessInfo, retErr error)
DisconnectProcess(ctx context.Context, ref, pid string) error
Inspect(ctx context.Context, ref string) (*controllerapi.InspectResponse, error)
List(ctx context.Context) ([]string, error)
ListProcesses(ctx context.Context, ref string) ([]*controllerapi.ProcessInfo, error)
DisconnectProcess(ctx context.Context, ref, pid string) error
Kill(ctx context.Context) error
Close() error
}
type ControlOptions struct {

@ -42,27 +42,27 @@ type localController struct {
buildOnGoing atomic.Bool
}
func (b *localController) Build(ctx context.Context, options controllerapi.BuildOptions, in io.ReadCloser, progress progress.Writer) (string, *client.SolveResponse, error) {
func (b *localController) Build(ctx context.Context, options controllerapi.BuildOptions, in io.ReadCloser, progress progress.Writer) (string, error) {
if !b.buildOnGoing.CompareAndSwap(false, true) {
return "", nil, errors.New("build ongoing")
return "", errors.New("build ongoing")
}
defer b.buildOnGoing.Store(false)
resp, res, buildErr := cbuild.RunBuild(ctx, b.dockerCli, options, in, progress, true)
// NOTE: RunBuild can return *build.ResultContext even on error.
res, err := cbuild.RunBuild(ctx, b.dockerCli, options, in, progress)
if err != nil {
return "", err
}
if res != nil {
b.buildConfig = buildConfig{
resultCtx: res,
buildOptions: &options,
}
_, buildErr := b.buildConfig.resultCtx.Result(ctx)
if buildErr != nil {
buildErr = controllererrors.WrapBuild(buildErr, b.ref)
return "", controllererrors.WrapBuild(buildErr, b.ref)
}
}
if buildErr != nil {
return "", nil, buildErr
}
return b.ref, resp, nil
return b.ref, nil
}
func (b *localController) ListProcesses(ctx context.Context, ref string) (infos []*controllerapi.ProcessInfo, retErr error) {
@ -123,7 +123,7 @@ func (b *localController) Kill(context.Context) error {
func (b *localController) Close() error {
b.cancelRunningProcesses()
if b.buildConfig.resultCtx != nil {
b.buildConfig.resultCtx.Done()
b.buildConfig.resultCtx.Close()
}
// TODO: cancel ongoing builds?
return nil
@ -133,9 +133,17 @@ func (b *localController) List(ctx context.Context) (res []string, _ error) {
return []string{b.ref}, nil
}
func (b *localController) Finalize(ctx context.Context, key string) (*client.SolveResponse, error) {
b.cancelRunningProcesses()
if b.buildConfig.resultCtx != nil {
return b.buildConfig.resultCtx.Wait(ctx)
}
return nil, nil
}
func (b *localController) Disconnect(ctx context.Context, key string) error {
b.Close()
return nil
return b.Close()
}
func (b *localController) Inspect(ctx context.Context, ref string) (*controllerapi.InspectResponse, error) {

File diff suppressed because it is too large Load Diff

@ -7,14 +7,19 @@ import "github.com/moby/buildkit/api/services/control/control.proto";
option go_package = "pb";
service Controller {
rpc Info(InfoRequest) returns (InfoResponse);
rpc Build(BuildRequest) returns (BuildResponse);
rpc Inspect(InspectRequest) returns (InspectResponse);
rpc Status(StatusRequest) returns (stream StatusResponse);
rpc Input(stream InputMessage) returns (InputResponse);
rpc Finalize(FinalizeRequest) returns (FinalizeResponse);
rpc Invoke(stream Message) returns (stream Message);
rpc List(ListRequest) returns (ListResponse);
rpc Disconnect(DisconnectRequest) returns (DisconnectResponse);
rpc Info(InfoRequest) returns (InfoResponse);
rpc Status(StatusRequest) returns (stream StatusResponse);
rpc Inspect(InspectRequest) returns (InspectResponse);
rpc List(ListRequest) returns (ListResponse);
rpc Input(stream InputMessage) returns (InputResponse);
rpc ListProcesses(ListProcessesRequest) returns (ListProcessesResponse);
rpc DisconnectProcess(DisconnectProcessRequest) returns (DisconnectProcessResponse);
}
@ -48,7 +53,7 @@ message BuildRequest {
message BuildOptions {
string ContextPath = 1;
string DockerfileName = 2;
string PrintFunc = 3;
PrintFunc PrintFunc = 3;
map<string, string> NamedContexts = 4;
repeated string Allow = 5;
@ -78,6 +83,19 @@ message BuildOptions {
bool ExportLoad = 28;
}
message FinalizeRequest {
string Ref = 1;
}
message FinalizeResponse {
map<string, string> ExporterResponse = 1;
}
message PrintFunc {
string Name = 1;
string Format = 2;
}
message ExportEntry {
string Type = 1;
map<string, string> Attrs = 2;
@ -125,7 +143,6 @@ message Ulimit {
}
message BuildResponse {
map<string, string> ExporterResponse = 1;
}
message DisconnectRequest {

@ -113,49 +113,30 @@ func (c *Client) Inspect(ctx context.Context, ref string) (*pb.InspectResponse,
return c.client().Inspect(ctx, &pb.InspectRequest{Ref: ref})
}
func (c *Client) Build(ctx context.Context, options pb.BuildOptions, in io.ReadCloser, progress progress.Writer) (string, *client.SolveResponse, error) {
func (c *Client) Build(ctx context.Context, options pb.BuildOptions, in io.ReadCloser, progress progress.Writer) (string, error) {
ref := identity.NewID()
statusChan := make(chan *client.SolveStatus)
eg, egCtx := errgroup.WithContext(ctx)
var resp *client.SolveResponse
eg.Go(func() error {
defer close(statusChan)
var err error
resp, err = c.build(egCtx, ref, options, in, statusChan)
return err
})
eg.Go(func() error {
for s := range statusChan {
st := s
progress.Write(st)
}
return nil
})
return ref, resp, eg.Wait()
if err := c.build(ctx, ref, options, in, progress); err != nil {
return "", err
}
return ref, nil
}
func (c *Client) build(ctx context.Context, ref string, options pb.BuildOptions, in io.ReadCloser, statusChan chan *client.SolveStatus) (*client.SolveResponse, error) {
func (c *Client) Finalize(ctx context.Context, ref string) (*client.SolveResponse, error) {
resp, err := c.client().Finalize(ctx, &pb.FinalizeRequest{Ref: ref})
if err != nil {
return nil, err
}
return &client.SolveResponse{
ExporterResponse: resp.ExporterResponse,
}, nil
}
func (c *Client) build(ctx context.Context, ref string, options pb.BuildOptions, in io.ReadCloser, writer progress.Writer) error {
eg, egCtx := errgroup.WithContext(ctx)
done := make(chan struct{})
var resp *client.SolveResponse
eg.Go(func() error {
defer close(done)
pbResp, err := c.client().Build(egCtx, &pb.BuildRequest{
Ref: ref,
Options: &options,
})
if err != nil {
return err
}
resp = &client.SolveResponse{
ExporterResponse: pbResp.ExporterResponse,
}
return nil
})
eg.Go(func() error {
stream, err := c.client().Status(egCtx, &pb.StatusRequest{
go func() error {
stream, err := c.client().Status(ctx, &pb.StatusRequest{
Ref: ref,
})
if err != nil {
@ -169,8 +150,20 @@ func (c *Client) build(ctx context.Context, ref string, options pb.BuildOptions,
}
return errors.Wrap(err, "failed to receive status")
}
statusChan <- pb.FromControlStatus(resp)
writer.Write(pb.FromControlStatus(resp))
}
}()
eg.Go(func() error {
defer close(done)
_, err := c.client().Build(egCtx, &pb.BuildRequest{
Ref: ref,
Options: &options,
})
if err != nil {
return err
}
return nil
})
if in != nil {
eg.Go(func() error {
@ -232,7 +225,7 @@ func (c *Client) build(ctx context.Context, ref string, options pb.BuildOptions,
return eg2.Wait()
})
}
return resp, eg.Wait()
return eg.Wait()
}
func (c *Client) client() pb.ControllerClient {

@ -24,7 +24,6 @@ import (
"github.com/docker/buildx/util/progress"
"github.com/docker/buildx/version"
"github.com/docker/cli/cli/command"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/util/grpcerrors"
"github.com/pelletier/go-toml"
"github.com/pkg/errors"
@ -143,8 +142,8 @@ func serveCmd(dockerCli command.Cli) *cobra.Command {
}()
// prepare server
b := NewServer(func(ctx context.Context, options *controllerapi.BuildOptions, stdin io.Reader, progress progress.Writer) (*client.SolveResponse, *build.ResultContext, error) {
return cbuild.RunBuild(ctx, dockerCli, *options, stdin, progress, true)
b := NewServer(func(ctx context.Context, options *controllerapi.BuildOptions, stdin io.Reader, progress progress.Writer) (*build.ResultContext, error) {
return cbuild.RunBuild(ctx, dockerCli, *options, stdin, progress)
})
defer b.Close()

@ -4,7 +4,6 @@ import (
"context"
"io"
"sync"
"sync/atomic"
"time"
"github.com/docker/buildx/build"
@ -14,12 +13,11 @@ import (
"github.com/docker/buildx/util/ioset"
"github.com/docker/buildx/util/progress"
"github.com/docker/buildx/version"
"github.com/moby/buildkit/client"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
type BuildFunc func(ctx context.Context, options *pb.BuildOptions, stdin io.Reader, progress progress.Writer) (resp *client.SolveResponse, res *build.ResultContext, err error)
type BuildFunc func(ctx context.Context, options *pb.BuildOptions, stdin io.Reader, progress progress.Writer) (res *build.ResultContext, err error)
func NewServer(buildFunc BuildFunc) *Server {
return &Server{
@ -34,7 +32,6 @@ type Server struct {
}
type session struct {
buildOnGoing atomic.Bool
statusChan chan *pb.StatusResponse
cancelBuild func()
buildOptions *pb.BuildOptions
@ -101,28 +98,6 @@ func (m *Server) List(ctx context.Context, req *pb.ListRequest) (res *pb.ListRes
}, nil
}
func (m *Server) Disconnect(ctx context.Context, req *pb.DisconnectRequest) (res *pb.DisconnectResponse, err error) {
key := req.Ref
if key == "" {
return nil, errors.New("disconnect: empty key")
}
m.sessionMu.Lock()
if s, ok := m.session[key]; ok {
if s.cancelBuild != nil {
s.cancelBuild()
}
s.cancelRunningProcesses()
if s.result != nil {
s.result.Done()
}
}
delete(m.session, key)
m.sessionMu.Unlock()
return &pb.DisconnectResponse{}, nil
}
func (m *Server) Close() error {
m.sessionMu.Lock()
for k := range m.session {
@ -167,15 +142,10 @@ func (m *Server) Build(ctx context.Context, req *pb.BuildRequest) (*pb.BuildResp
}
s, ok := m.session[ref]
if ok {
if !s.buildOnGoing.CompareAndSwap(false, true) {
m.sessionMu.Unlock()
return &pb.BuildResponse{}, errors.New("build ongoing")
}
s.cancelRunningProcesses()
s.result = nil
} else {
s = &session{}
s.buildOnGoing.Store(true)
}
s.processes = processes.NewManager()
@ -186,33 +156,24 @@ func (m *Server) Build(ctx context.Context, req *pb.BuildRequest) (*pb.BuildResp
s.inputPipe = inW
m.session[ref] = s
m.sessionMu.Unlock()
defer func() {
close(statusChan)
m.sessionMu.Lock()
s, ok := m.session[ref]
if ok {
s.statusChan = nil
s.buildOnGoing.Store(false)
}
m.sessionMu.Unlock()
}()
pw := pb.NewProgressWriter(statusChan)
// Build the specified request
ctx, cancel := context.WithCancel(ctx)
defer cancel()
resp, res, buildErr := m.buildFunc(ctx, req.Options, inR, pw)
res, err := m.buildFunc(ctx, req.Options, inR, pw)
m.sessionMu.Lock()
if s, ok := m.session[ref]; ok {
// NOTE: buildFunc can return *build.ResultContext even on error (e.g. when it's implemented using (github.com/docker/buildx/controller/build).RunBuild).
if res != nil {
s.result = res
s.cancelBuild = cancel
s.buildOptions = req.Options
m.session[ref] = s
_, buildErr := s.result.Result(ctx)
if buildErr != nil {
buildErr = controllererrors.WrapBuild(buildErr, ref)
err = controllererrors.WrapBuild(buildErr, ref)
}
}
} else {
@ -221,18 +182,69 @@ func (m *Server) Build(ctx context.Context, req *pb.BuildRequest) (*pb.BuildResp
}
m.sessionMu.Unlock()
if buildErr != nil {
return nil, buildErr
if err != nil {
return nil, err
}
return &pb.BuildResponse{}, nil
}
func (m *Server) Finalize(ctx context.Context, req *pb.FinalizeRequest) (*pb.FinalizeResponse, error) {
key := req.Ref
if key == "" {
return nil, errors.New("finalize: empty key")
}
if resp == nil {
resp = &client.SolveResponse{}
m.sessionMu.Lock()
defer m.sessionMu.Unlock()
s, ok := m.session[key]
if !ok {
return nil, errors.Errorf("unknown ref %q", key)
}
if s.result == nil {
return nil, errors.Errorf("no build ongoing for %q", key)
}
resp, err := s.result.Wait(ctx)
if err != nil {
return nil, err
}
if s.cancelBuild != nil {
s.cancelBuild()
}
return &pb.BuildResponse{
s.cancelRunningProcesses()
close(s.statusChan)
delete(m.session, key)
return &pb.FinalizeResponse{
ExporterResponse: resp.ExporterResponse,
}, nil
}
func (m *Server) Disconnect(ctx context.Context, req *pb.DisconnectRequest) (res *pb.DisconnectResponse, err error) {
key := req.Ref
if key == "" {
return nil, errors.New("disconnect: empty key")
}
m.sessionMu.Lock()
defer m.sessionMu.Unlock()
if s, ok := m.session[key]; ok {
if s.cancelBuild != nil {
s.cancelBuild()
}
close(s.statusChan)
s.cancelRunningProcesses()
if s.result != nil {
s.result.Close()
}
}
delete(m.session, key)
return &pb.DisconnectResponse{}, nil
}
func (m *Server) Status(req *pb.StatusRequest, stream pb.Controller_StatusServer) error {
ref := req.Ref
if ref == "" {

@ -38,11 +38,6 @@ Available commands are:
// RunMonitor provides an interactive session for running and managing containers via specified IO.
func RunMonitor(ctx context.Context, curRef string, options *controllerapi.BuildOptions, invokeConfig controllerapi.InvokeConfig, c control.BuildxController, stdin io.ReadCloser, stdout io.WriteCloser, stderr console.File, progress progress.Writer) error {
defer func() {
if err := c.Disconnect(ctx, curRef); err != nil {
logrus.Warnf("disconnect error: %v", err)
}
}()
monitorIn, monitorOut := ioset.Pipe()
defer func() {
monitorIn.Close()
@ -146,7 +141,7 @@ func RunMonitor(ctx context.Context, curRef string, options *controllerapi.Build
}
}
var resultUpdated bool
ref, _, err := c.Build(ctx, *bo, nil, progress) // TODO: support stdin, hold build ref
ref, err := c.Build(ctx, *bo, nil, progress) // TODO: support stdin, hold build ref
if err != nil {
var be *controllererrors.BuildError
if errors.As(err, &be) {

@ -29,6 +29,7 @@ type Printer struct {
warnings []client.VertexWarning
logMu sync.Mutex
logSourceMap map[digest.Digest]interface{}
paused *bool
}
func (p *Printer) Wait() error {
@ -45,6 +46,10 @@ func (p *Printer) Warnings() []client.VertexWarning {
return p.warnings
}
func (p *Printer) Pause(v bool) {
*p.paused = v
}
func (p *Printer) ValidateLogSource(dgst digest.Digest, v interface{}) bool {
p.logMu.Lock()
defer p.logMu.Unlock()
@ -74,10 +79,12 @@ func NewPrinter(ctx context.Context, w io.Writer, out console.File, mode string,
statusCh := make(chan *client.SolveStatus)
doneCh := make(chan struct{})
paused := false
pw := &Printer{
status: statusCh,
done: doneCh,
logSourceMap: map[digest.Digest]interface{}{},
paused: &paused,
}
if v := os.Getenv("BUILDKIT_PROGRESS"); v != "" && mode == PrinterModeAuto {
@ -101,7 +108,7 @@ func NewPrinter(ctx context.Context, w io.Writer, out console.File, mode string,
go func() {
resumeLogs := logutil.Pause(logrus.StandardLogger())
// not using shared context to not disrupt display but let is finish reporting errors
pw.warnings, pw.err = progressui.DisplaySolveStatus(ctx, c, w, statusCh, solveStatusOpt...)
pw.warnings, pw.err = progressui.DisplaySolveStatus(ctx, c, w, statusCh, pw.paused, solveStatusOpt...)
resumeLogs()
close(doneCh)
}()

@ -42,7 +42,7 @@ func WithDesc(text string, console string) DisplaySolveStatusOpt {
}
}
func DisplaySolveStatus(ctx context.Context, c console.Console, w io.Writer, ch chan *client.SolveStatus, opts ...DisplaySolveStatusOpt) ([]client.VertexWarning, error) {
func DisplaySolveStatus(ctx context.Context, c console.Console, w io.Writer, ch chan *client.SolveStatus, paused *bool, opts ...DisplaySolveStatusOpt) ([]client.VertexWarning, error) {
modeConsole := c != nil
dsso := &displaySolveStatusOpts{}
@ -78,6 +78,11 @@ func DisplaySolveStatus(ctx context.Context, c console.Console, w io.Writer, ch
displayLimiter := rate.NewLimiter(rate.Every(displayTimeout), 1)
ispaused := false
if paused != nil {
ispaused = *paused
}
var height int
width, _ := disp.getSize()
for {
@ -93,6 +98,10 @@ func DisplaySolveStatus(ctx context.Context, c console.Console, w io.Writer, ch
}
}
if paused != nil && *paused && ispaused {
continue
}
if modeConsole {
width, height = disp.getSize()
if done {
@ -115,6 +124,10 @@ func DisplaySolveStatus(ctx context.Context, c console.Console, w io.Writer, ch
ticker = time.NewTicker(tickerTimeout)
}
}
if paused != nil {
ispaused = *paused
}
}
}

@ -87,7 +87,7 @@ func NewPrinter(ctx context.Context, out console.File, mode string) (Writer, err
go func() {
// not using shared context to not disrupt display but let is finish reporting errors
_, pw.err = progressui.DisplaySolveStatus(ctx, c, out, statusCh)
_, pw.err = progressui.DisplaySolveStatus(ctx, c, out, statusCh, nil)
close(doneCh)
}()
return pw, nil

Loading…
Cancel
Save