From 0398fa337b5ce8c895f990a95817774a0611ab54 Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Fri, 19 Apr 2019 10:00:35 -0700 Subject: [PATCH] build: multi-node build support Signed-off-by: Tonis Tiigi --- build/build.go | 703 +++++++++++++++++++++++++++-------- commands/build.go | 2 +- util/imagetools/create.go | 10 +- util/progress/multiwriter.go | 18 +- util/progress/writer.go | 32 ++ vendor/modules.txt | 6 +- 6 files changed, 600 insertions(+), 171 deletions(-) diff --git a/build/build.go b/build/build.go index cdee48ec..3e4f79cb 100644 --- a/build/build.go +++ b/build/build.go @@ -11,17 +11,21 @@ import ( "strings" "sync" + "github.com/containerd/containerd/images" "github.com/containerd/containerd/platforms" + clitypes "github.com/docker/cli/cli/config/types" "github.com/docker/distribution/reference" dockerclient "github.com/docker/docker/client" "github.com/docker/docker/pkg/urlutil" "github.com/moby/buildkit/client" "github.com/moby/buildkit/session" "github.com/moby/buildkit/session/upload/uploadprovider" + "github.com/opencontainers/go-digest" specs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/tonistiigi/buildx/driver" + "github.com/tonistiigi/buildx/util/imagetools" "github.com/tonistiigi/buildx/util/progress" "golang.org/x/sync/errgroup" ) @@ -66,245 +70,618 @@ type DriverInfo struct { Err error } +type Auth interface { + GetAuthConfig(registryHostname string) (clitypes.AuthConfig, error) +} + type DockerAPI interface { DockerAPI(name string) (dockerclient.APIClient, error) } -func getFirstDriver(drivers []DriverInfo) (driver.Driver, error) { +func filterAvailableDrivers(drivers []DriverInfo) ([]DriverInfo, error) { + out := make([]DriverInfo, 0, len(drivers)) err := errors.Errorf("no drivers found") for _, di := range drivers { - if di.Driver != nil { - return di.Driver, nil + if di.Err == nil && di.Driver != nil { + out = append(out, di) } if di.Err != nil { err = di.Err } } + if len(out) > 0 { + return out, nil + } return nil, err } -func Build(ctx context.Context, drivers []DriverInfo, opt map[string]Options, docker DockerAPI, pw progress.Writer) (map[string]*client.SolveResponse, error) { - if len(drivers) == 0 { - return nil, errors.Errorf("driver required for build") +type driverPair struct { + driverIndex int + platforms []specs.Platform + so *client.SolveOpt +} + +func driverIndexes(m map[string][]driverPair) []int { + out := make([]int, 0, len(m)) + visited := map[int]struct{}{} + for _, dp := range m { + for _, d := range dp { + if _, ok := visited[d.driverIndex]; ok { + continue + } + visited[d.driverIndex] = struct{}{} + out = append(out, d.driverIndex) + } } + return out +} - if len(drivers) > 1 { - return nil, errors.Errorf("multiple drivers currently not supported") +func allIndexes(l int) []int { + out := make([]int, 0, l) + for i := 0; i < l; i++ { + out = append(out, i) } + return out +} - d, err := getFirstDriver(drivers) - if err != nil { +func ensureBooted(ctx context.Context, drivers []DriverInfo, idxs []int, pw progress.Writer) ([]*client.Client, error) { + clients := make([]*client.Client, len(drivers)) + + eg, ctx := errgroup.WithContext(ctx) + + for _, i := range idxs { + func(i int) { + eg.Go(func() error { + c, err := driver.Boot(ctx, drivers[i].Driver, pw) + if err != nil { + return err + } + clients[i] = c + return nil + }) + }(i) + } + + if err := eg.Wait(); err != nil { return nil, err } - _, isDefaultMobyDriver := d.(interface { - IsDefaultMobyDriver() - }) + return clients, nil +} + +func splitToDriverPairs(availablePlatforms map[string]int, opt map[string]Options) map[string][]driverPair { + m := map[string][]driverPair{} + for k, opt := range opt { + mm := map[int][]specs.Platform{} + for _, p := range opt.Platforms { + k := platforms.Format(p) + idx := availablePlatforms[k] // default 0 + pp := mm[idx] + pp = append(pp, p) + mm[idx] = pp + } + dps := make([]driverPair, 0, 2) + for idx, pp := range mm { + dps = append(dps, driverPair{driverIndex: idx, platforms: pp}) + } + m[k] = dps + } + return m +} + +func resolveDrivers(ctx context.Context, drivers []DriverInfo, opt map[string]Options, pw progress.Writer) (map[string][]driverPair, []*client.Client, error) { + + availablePlatforms := map[string]int{} + for i, d := range drivers { + for _, p := range d.Platform { + availablePlatforms[platforms.Format(p)] = i + } + } + + undetectedPlatform := false + allPlatforms := map[string]int{} for _, opt := range opt { - if !isDefaultMobyDriver && len(opt.Exports) == 0 { - logrus.Warnf("No output specified for %s driver. Build result will only remain in the build cache. To push result image into registry use --push or to load image into docker use --load", d.Factory().Name()) - break + for _, p := range opt.Platforms { + k := platforms.Format(p) + allPlatforms[k] = -1 + if _, ok := availablePlatforms[k]; !ok { + undetectedPlatform = true + } } } - c, err := driver.Boot(ctx, d, pw) - if err != nil { - close(pw.Status()) - <-pw.Done() - return nil, err + // fast path + if len(drivers) == 1 || len(allPlatforms) == 0 { + m := map[string][]driverPair{} + for k, opt := range opt { + m[k] = []driverPair{{driverIndex: 0, platforms: opt.Platforms}} + } + clients, err := ensureBooted(ctx, drivers, driverIndexes(m), pw) + if err != nil { + return nil, nil, err + } + return m, clients, nil } - withPrefix := len(opt) > 1 + // map based on existing platforms + if !undetectedPlatform { + m := splitToDriverPairs(availablePlatforms, opt) + clients, err := ensureBooted(ctx, drivers, driverIndexes(m), pw) + if err != nil { + return nil, nil, err + } + return m, clients, nil + } - mw := progress.NewMultiWriter(pw) + // boot all drivers in k + clients, err := ensureBooted(ctx, drivers, allIndexes(len(drivers)), pw) + if err != nil { + return nil, nil, err + } eg, ctx := errgroup.WithContext(ctx) + workers := make([][]*client.WorkerInfo, len(clients)) - resp := map[string]*client.SolveResponse{} - var mu sync.Mutex + for i, c := range clients { + if c == nil { + continue + } + func(i int) { + eg.Go(func() error { + ww, err := clients[i].ListWorkers(ctx) + if err != nil { + return errors.Wrap(err, "listing workers") + } + workers[i] = ww + return nil + }) + }(i) + } - for k, opt := range opt { - pw := mw.WithPrefix(k, withPrefix) + if err := eg.Wait(); err != nil { + return nil, nil, err + } - if opt.ImageIDFile != "" { - if len(opt.Platforms) != 0 { - return nil, errors.Errorf("image ID file cannot be specified when building for multiple platforms") - } - // Avoid leaving a stale file if we eventually fail - if err := os.Remove(opt.ImageIDFile); err != nil && !os.IsNotExist(err) { - return nil, errors.Wrap(err, "removing image ID file") + for i, ww := range workers { + for _, w := range ww { + for _, p := range w.Platforms { + p = platforms.Normalize(p) + ps := platforms.Format(p) + + if _, ok := availablePlatforms[ps]; !ok { + availablePlatforms[ps] = i + } } } + } - if v, ok := opt.BuildArgs["BUILDKIT_INLINE_CACHE"]; ok { - if v, _ := strconv.ParseBool(v); v { - opt.CacheTo = append(opt.CacheTo, client.CacheOptionsEntry{ - Type: "inline", - Attrs: map[string]string{}, - }) - } + return splitToDriverPairs(availablePlatforms, opt), clients, nil +} + +func toRepoOnly(in string) (string, error) { + m := map[string]struct{}{} + p := strings.Split(in, ",") + for _, pp := range p { + n, err := reference.ParseNormalizedNamed(pp) + if err != nil { + return "", err } + m[n.Name()] = struct{}{} + } + out := make([]string, 0, len(m)) + for k := range m { + out = append(out, k) + } + return strings.Join(out, ","), nil +} - for _, e := range opt.CacheTo { - if e.Type != "inline" && !d.Features()[driver.CacheExport] { - return nil, notSupported(d, driver.CacheExport) - } +func isDefaultMobyDriver(d driver.Driver) bool { + _, ok := d.(interface { + IsDefaultMobyDriver() + }) + return ok +} + +func toSolveOpt(d driver.Driver, multiDriver bool, opt Options, dl dockerLoadCallback) (solveOpt *client.SolveOpt, release func(), err error) { + defers := make([]func(), 0, 2) + release = func() { + for _, f := range defers { + f() } + } - so := client.SolveOpt{ - Frontend: "dockerfile.v0", - FrontendAttrs: map[string]string{}, - LocalDirs: map[string]string{}, - CacheExports: opt.CacheTo, - CacheImports: opt.CacheFrom, + defer func() { + if err != nil { + release() } + }() - switch len(opt.Exports) { - case 1: - // valid - case 0: - if isDefaultMobyDriver { - // backwards compat for docker driver only: - // this ensures the build results in a docker image. - opt.Exports = []client.ExportEntry{{Type: "image", Attrs: map[string]string{}}} - } - default: - return nil, errors.Errorf("multiple outputs currently unsupported") + if opt.ImageIDFile != "" { + if multiDriver || len(opt.Platforms) != 0 { + return nil, nil, errors.Errorf("image ID file cannot be specified when building for multiple platforms") + } + // Avoid leaving a stale file if we eventually fail + if err := os.Remove(opt.ImageIDFile); err != nil && !os.IsNotExist(err) { + return nil, nil, errors.Wrap(err, "removing image ID file") } + } - if len(opt.Tags) > 0 { - tags := make([]string, len(opt.Tags)) - for i, tag := range opt.Tags { - ref, err := reference.Parse(tag) - if err != nil { - return nil, errors.Wrapf(err, "invalid tag %q", tag) - } - tags[i] = ref.String() + // inline cache from build arg + if v, ok := opt.BuildArgs["BUILDKIT_INLINE_CACHE"]; ok { + if v, _ := strconv.ParseBool(v); v { + opt.CacheTo = append(opt.CacheTo, client.CacheOptionsEntry{ + Type: "inline", + Attrs: map[string]string{}, + }) + } + } + + for _, e := range opt.CacheTo { + if e.Type != "inline" && !d.Features()[driver.CacheExport] { + return nil, nil, notSupported(d, driver.CacheExport) + } + } + + so := client.SolveOpt{ + Frontend: "dockerfile.v0", + FrontendAttrs: map[string]string{}, + LocalDirs: map[string]string{}, + CacheExports: opt.CacheTo, + CacheImports: opt.CacheFrom, + } + + if multiDriver { + // force creation of manifest list + so.FrontendAttrs["multi-platform"] = "true" + } + + _, isDefaultMobyDriver := d.(interface { + IsDefaultMobyDriver() + }) + + switch len(opt.Exports) { + case 1: + // valid + case 0: + if isDefaultMobyDriver { + // backwards compat for docker driver only: + // this ensures the build results in a docker image. + opt.Exports = []client.ExportEntry{{Type: "image", Attrs: map[string]string{}}} + } + default: + return nil, nil, errors.Errorf("multiple outputs currently unsupported") + } + + // fill in image exporter names from tags + if len(opt.Tags) > 0 { + tags := make([]string, len(opt.Tags)) + for i, tag := range opt.Tags { + ref, err := reference.Parse(tag) + if err != nil { + return nil, nil, errors.Wrapf(err, "invalid tag %q", tag) } - for i, e := range opt.Exports { - switch e.Type { - case "image", "oci", "docker": - opt.Exports[i].Attrs["name"] = strings.Join(tags, ",") - } + tags[i] = ref.String() + } + for i, e := range opt.Exports { + switch e.Type { + case "image", "oci", "docker": + opt.Exports[i].Attrs["name"] = strings.Join(tags, ",") } - } else { - for _, e := range opt.Exports { - if e.Type == "image" && e.Attrs["name"] == "" && e.Attrs["push"] != "" { - if ok, _ := strconv.ParseBool(e.Attrs["push"]); ok { - return nil, errors.Errorf("tag is needed when pushing to registry") - } + } + } else { + for _, e := range opt.Exports { + if e.Type == "image" && e.Attrs["name"] == "" && e.Attrs["push"] != "" { + if ok, _ := strconv.ParseBool(e.Attrs["push"]); ok { + return nil, nil, errors.Errorf("tag is needed when pushing to registry") } } } + } - for i, e := range opt.Exports { - if (e.Type == "local" || e.Type == "tar") && opt.ImageIDFile != "" { - return nil, errors.Errorf("local and tar exporters are incompatible with image ID file") - } - if e.Type == "oci" && !d.Features()[driver.OCIExporter] { - return nil, notSupported(d, driver.OCIExporter) - } - if e.Type == "docker" { - if e.Output == nil { - if isDefaultMobyDriver { - e.Type = "image" - } else { - w, cancel, err := newDockerLoader(ctx, docker, e.Attrs["context"], mw) - if err != nil { - return nil, err - } - defer cancel() - opt.Exports[i].Output = w + // set up exporters + for i, e := range opt.Exports { + if (e.Type == "local" || e.Type == "tar") && opt.ImageIDFile != "" { + return nil, nil, errors.Errorf("local and tar exporters are incompatible with image ID file") + } + if e.Type == "oci" && !d.Features()[driver.OCIExporter] { + return nil, nil, notSupported(d, driver.OCIExporter) + } + if e.Type == "docker" { + if e.Output == nil { + if isDefaultMobyDriver { + e.Type = "image" + } else { + w, cancel, err := dl(e.Attrs["context"]) + if err != nil { + return nil, nil, err } - } else if !d.Features()[driver.DockerExporter] { - return nil, notSupported(d, driver.DockerExporter) + defers = append(defers, cancel) + opt.Exports[i].Output = w } + } else if !d.Features()[driver.DockerExporter] { + return nil, nil, notSupported(d, driver.DockerExporter) } - if e.Type == "image" && isDefaultMobyDriver { - opt.Exports[i].Type = "moby" - if e.Attrs["push"] != "" { - if ok, _ := strconv.ParseBool(e.Attrs["push"]); ok { - return nil, errors.Errorf("auto-push is currently not implemented for docker driver") - } + } + if e.Type == "image" && isDefaultMobyDriver { + opt.Exports[i].Type = "moby" + if e.Attrs["push"] != "" { + if ok, _ := strconv.ParseBool(e.Attrs["push"]); ok { + return nil, nil, errors.Errorf("auto-push is currently not implemented for docker driver") } } } + } - // TODO: handle loading to docker daemon + so.Exports = opt.Exports + so.Session = opt.Session - so.Exports = opt.Exports - so.Session = opt.Session + releaseLoad, err := LoadInputs(opt.Inputs, &so) + if err != nil { + return nil, nil, err + } + defers = append(defers, releaseLoad) - release, err := LoadInputs(opt.Inputs, &so) - if err != nil { - return nil, err - } - defer release() + if opt.Pull { + so.FrontendAttrs["image-resolve-mode"] = "pull" + } + if opt.Target != "" { + so.FrontendAttrs["target"] = opt.Target + } + if opt.NoCache { + so.FrontendAttrs["no-cache"] = "" + } + for k, v := range opt.BuildArgs { + so.FrontendAttrs["build-arg:"+k] = v + } + for k, v := range opt.Labels { + so.FrontendAttrs["label:"+k] = v + } - if opt.Pull { - so.FrontendAttrs["image-resolve-mode"] = "pull" - } - if opt.Target != "" { - so.FrontendAttrs["target"] = opt.Target - } - if opt.NoCache { - so.FrontendAttrs["no-cache"] = "" + // set platforms + if len(opt.Platforms) != 0 { + pp := make([]string, len(opt.Platforms)) + for i, p := range opt.Platforms { + pp[i] = platforms.Format(p) } - for k, v := range opt.BuildArgs { - so.FrontendAttrs["build-arg:"+k] = v + if len(pp) > 1 && !d.Features()[driver.MultiPlatform] { + return nil, nil, notSupported(d, driver.MultiPlatform) } - for k, v := range opt.Labels { - so.FrontendAttrs["label:"+k] = v + so.FrontendAttrs["platform"] = strings.Join(pp, ",") + } + + // setup networkmode + switch opt.NetworkMode { + case "host", "none": + so.FrontendAttrs["force-network-mode"] = opt.NetworkMode + case "", "default": + default: + return nil, nil, errors.Errorf("network mode %q not supported by buildkit", opt.NetworkMode) + } + + // setup extrahosts + extraHosts, err := toBuildkitExtraHosts(opt.ExtraHosts) + if err != nil { + return nil, nil, err + } + so.FrontendAttrs["add-hosts"] = extraHosts + + return &so, release, nil +} + +func Build(ctx context.Context, drivers []DriverInfo, opt map[string]Options, docker DockerAPI, auth Auth, pw progress.Writer) (resp map[string]*client.SolveResponse, err error) { + if len(drivers) == 0 { + return nil, errors.Errorf("driver required for build") + } + + drivers, err = filterAvailableDrivers(drivers) + if err != nil { + return nil, errors.Wrapf(err, "no valid drivers found") + } + + var noMobyDriver driver.Driver + for _, d := range drivers { + if !isDefaultMobyDriver(d.Driver) { + noMobyDriver = d.Driver + break } + } - if len(opt.Platforms) != 0 { - pp := make([]string, len(opt.Platforms)) - for i, p := range opt.Platforms { - pp[i] = platforms.Format(p) - } - if len(pp) > 1 && !d.Features()[driver.MultiPlatform] { - return nil, notSupported(d, driver.MultiPlatform) + if noMobyDriver != nil { + for _, opt := range opt { + if len(opt.Exports) == 0 { + logrus.Warnf("No output specified for %s driver. Build result will only remain in the build cache. To push result image into registry use --push or to load image into docker use --load", noMobyDriver.Factory().Name()) + break } - so.FrontendAttrs["platform"] = strings.Join(pp, ",") } + } - switch opt.NetworkMode { - case "host", "none": - so.FrontendAttrs["force-network-mode"] = opt.NetworkMode - case "", "default": - default: - return nil, errors.Errorf("network mode %q not supported by buildkit", opt.NetworkMode) - } + m, clients, err := resolveDrivers(ctx, drivers, opt, pw) + if err != nil { + close(pw.Status()) + <-pw.Done() + return nil, err + } - extraHosts, err := toBuildkitExtraHosts(opt.ExtraHosts) + defers := make([]func(), 0, 2) + defer func() { if err != nil { - return nil, err + for _, f := range defers { + f() + } } - so.FrontendAttrs["add-hosts"] = extraHosts + }() - var statusCh chan *client.SolveStatus - if pw != nil { - pw = progress.ResetTime(pw) - statusCh = pw.Status() - eg.Go(func() error { - <-pw.Done() - return pw.Err() + mw := progress.NewMultiWriter(pw) + eg, ctx := errgroup.WithContext(ctx) + + for k, opt := range opt { + multiDriver := len(m[k]) > 1 + for i, dp := range m[k] { + d := drivers[dp.driverIndex].Driver + opt.Platforms = dp.platforms + so, release, err := toSolveOpt(d, multiDriver, opt, func(name string) (io.WriteCloser, func(), error) { + return newDockerLoader(ctx, docker, name, mw) }) + if err != nil { + return nil, err + } + defers = append(defers, release) + m[k][i].so = so } + } - eg.Go(func() error { - rr, err := c.Solve(ctx, nil, so, statusCh) - if err != nil { - return err + resp = map[string]*client.SolveResponse{} + var respMu sync.Mutex + + multiTarget := len(opt) > 1 + + for k, opt := range opt { + err := func() error { + opt := opt + dps := m[k] + multiDriver := len(m[k]) > 1 + + res := make([]*client.SolveResponse, len(dps)) + wg := &sync.WaitGroup{} + wg.Add(len(dps)) + + var pushNames string + + if multiDriver { + for i, e := range opt.Exports { + switch e.Type { + case "oci", "tar": + return errors.Errorf("%s for multi-node builds currently not supported", e.Type) + case "image": + if e.Attrs["push"] != "" { + if ok, _ := strconv.ParseBool(e.Attrs["push"]); ok { + pushNames = e.Attrs["name"] + if pushNames == "" { + return errors.Errorf("tag is needed when pushing to registry") + } + names, err := toRepoOnly(e.Attrs["name"]) + if err != nil { + return err + } + e.Attrs["name"] = names + e.Attrs["push-by-digest"] = "true" + opt.Exports[i].Attrs = e.Attrs + } + } + } + } } - mu.Lock() - resp[k] = rr - mu.Unlock() - if opt.ImageIDFile != "" { - return ioutil.WriteFile(opt.ImageIDFile, []byte(rr.ExporterResponse["containerimage.digest"]), 0644) + + eg.Go(func() error { + pw := mw.WithPrefix("default", false) + defer close(pw.Status()) + wg.Wait() + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + respMu.Lock() + resp[k] = res[0] + respMu.Unlock() + if len(res) == 1 { + if opt.ImageIDFile != "" { + return ioutil.WriteFile(opt.ImageIDFile, []byte(res[0].ExporterResponse["containerimage.digest"]), 0644) + } + return nil + } + + if pushNames != "" { + progress.Write(pw, "merging manifest list", func() error { + descs := make([]specs.Descriptor, 0, len(res)) + + for _, r := range res { + s, ok := r.ExporterResponse["containerimage.digest"] + if ok { + descs = append(descs, specs.Descriptor{ + Digest: digest.Digest(s), + MediaType: images.MediaTypeDockerSchema2ManifestList, + Size: -1, + }) + } + } + if len(descs) > 0 { + itpull := imagetools.New(imagetools.Opt{ + Auth: auth, + }) + + names := strings.Split(pushNames, ",") + dt, desc, err := itpull.Combine(ctx, names[0], descs) + if err != nil { + return err + } + if opt.ImageIDFile != "" { + return ioutil.WriteFile(opt.ImageIDFile, []byte(desc.Digest), 0644) + } + + itpush := imagetools.New(imagetools.Opt{ + Auth: auth, + }) + + for _, n := range names { + nn, err := reference.ParseNormalizedNamed(n) + if err != nil { + return err + } + if err := itpush.Push(ctx, nn, desc, dt); err != nil { + return err + } + } + + respMu.Lock() + resp[k] = &client.SolveResponse{ + ExporterResponse: map[string]string{ + "containerimage.digest": desc.Digest.String(), + }, + } + respMu.Unlock() + } + return nil + }) + } + return nil + }) + + for i, dp := range dps { + func(i int, dp driverPair) { + pw := mw.WithPrefix(k, multiTarget) + + c := clients[dp.driverIndex] + + var statusCh chan *client.SolveStatus + if pw != nil { + pw = progress.ResetTime(pw) + statusCh = pw.Status() + eg.Go(func() error { + <-pw.Done() + return pw.Err() + }) + } + + eg.Go(func() error { + defer wg.Done() + rr, err := c.Solve(ctx, nil, *dp.so, statusCh) + if err != nil { + return err + } + res[i] = rr + return nil + }) + + }(i, dp) } + return nil - }) + }() + if err != nil { + return nil, err + } } if err := eg.Wait(); err != nil { @@ -423,6 +800,8 @@ func notSupported(d driver.Driver, f driver.Feature) error { return errors.Errorf("%s feature is currently not supported for %s driver. Please switch to a different driver (eg. \"docker buildx create --use\")", f, d.Factory().Name()) } +type dockerLoadCallback func(name string) (io.WriteCloser, func(), error) + func newDockerLoader(ctx context.Context, d DockerAPI, name string, mw *progress.MultiWriter) (io.WriteCloser, func(), error) { c, err := d.DockerAPI(name) if err != nil { diff --git a/commands/build.go b/commands/build.go index a242a9a8..bf82a705 100644 --- a/commands/build.go +++ b/commands/build.go @@ -180,7 +180,7 @@ func buildTargets(ctx context.Context, dockerCli command.Cli, opts map[string]bu defer cancel() pw := progress.NewPrinter(ctx2, os.Stderr, progressMode) - _, err = build.Build(ctx, dis, opts, dockerAPI(dockerCli), pw) + _, err = build.Build(ctx, dis, opts, dockerAPI(dockerCli), dockerCli.ConfigFile(), pw) return err } diff --git a/util/imagetools/create.go b/util/imagetools/create.go index 8ecf7d1b..9dc511b2 100644 --- a/util/imagetools/create.go +++ b/util/imagetools/create.go @@ -6,6 +6,7 @@ import ( "encoding/json" "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" "github.com/docker/distribution/reference" "github.com/opencontainers/go-digest" @@ -152,10 +153,17 @@ func (r *Resolver) Push(ctx context.Context, ref reference.Named, desc ocispec.D } cw, err := p.Push(ctx, desc) if err != nil { + if errdefs.IsAlreadyExists(err) { + return nil + } return err } - return content.Copy(ctx, cw, bytes.NewReader(dt), desc.Size, desc.Digest) + err = content.Copy(ctx, cw, bytes.NewReader(dt), desc.Size, desc.Digest) + if errdefs.IsAlreadyExists(err) { + return nil + } + return err } func (r *Resolver) loadConfig(ctx context.Context, in string, dt []byte) (*ocispec.Image, error) { diff --git a/util/progress/multiwriter.go b/util/progress/multiwriter.go index 75fc6ab9..51b2c877 100644 --- a/util/progress/multiwriter.go +++ b/util/progress/multiwriter.go @@ -3,14 +3,17 @@ package progress import ( "context" "strings" + "sync" "github.com/moby/buildkit/client" "golang.org/x/sync/errgroup" ) type MultiWriter struct { - w Writer - eg *errgroup.Group + w Writer + eg *errgroup.Group + once sync.Once + ready chan struct{} } func (mw *MultiWriter) WithPrefix(pfx string, force bool) Writer { @@ -21,6 +24,9 @@ func (mw *MultiWriter) WithPrefix(pfx string, force bool) Writer { in: in, } mw.eg.Go(func() error { + mw.once.Do(func() { + close(mw.ready) + }) for { select { case v, ok := <-in: @@ -77,14 +83,18 @@ func NewMultiWriter(pw Writer) *MultiWriter { } eg, _ := errgroup.WithContext(context.TODO()) + ready := make(chan struct{}) + go func() { + <-ready eg.Wait() close(pw.Status()) }() return &MultiWriter{ - w: pw, - eg: eg, + w: pw, + eg: eg, + ready: ready, } } diff --git a/util/progress/writer.go b/util/progress/writer.go index 12907669..7e7ab265 100644 --- a/util/progress/writer.go +++ b/util/progress/writer.go @@ -1,7 +1,11 @@ package progress import ( + "time" + "github.com/moby/buildkit/client" + "github.com/moby/buildkit/identity" + "github.com/opencontainers/go-digest" ) type Writer interface { @@ -9,3 +13,31 @@ type Writer interface { Err() error Status() chan *client.SolveStatus } + +func Write(w Writer, name string, f func() error) { + status := w.Status() + dgst := digest.FromBytes([]byte(identity.NewID())) + tm := time.Now() + + vtx := client.Vertex{ + Digest: dgst, + Name: name, + Started: &tm, + } + + status <- &client.SolveStatus{ + Vertexes: []*client.Vertex{&vtx}, + } + + err := f() + + tm2 := time.Now() + vtx2 := vtx + vtx2.Completed = &tm2 + if err != nil { + vtx2.Error = err.Error() + } + status <- &client.SolveStatus{ + Vertexes: []*client.Vertex{&vtx2}, + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index da13f0a5..19a174e9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -27,12 +27,12 @@ github.com/beorn7/perks/quantile # github.com/containerd/console v0.0.0-20181022165439-0650fd9eeb50 github.com/containerd/console # github.com/containerd/containerd v1.3.0-0.20190321141026-ceba56893a76 -github.com/containerd/containerd/platforms github.com/containerd/containerd/images +github.com/containerd/containerd/platforms github.com/containerd/containerd/content +github.com/containerd/containerd/errdefs github.com/containerd/containerd/remotes github.com/containerd/containerd/remotes/docker -github.com/containerd/containerd/errdefs github.com/containerd/containerd/log github.com/containerd/containerd/content/local github.com/containerd/containerd/labels @@ -98,6 +98,7 @@ github.com/davecgh/go-spew/spew # github.com/docker/cli v0.0.0-20190321234815-f40f9c240ab0 => github.com/tiborvass/cli v0.0.0-20190419012645-1ed02c40fe68 github.com/docker/cli/cli/compose/loader github.com/docker/cli/cli/compose/types +github.com/docker/cli/cli/config/types github.com/docker/cli/cli-plugins/manager github.com/docker/cli/cli-plugins/plugin github.com/docker/cli/cli/command @@ -106,7 +107,6 @@ github.com/docker/cli/cli github.com/docker/cli/cli/config github.com/docker/cli/cli/context/docker github.com/docker/cli/opts -github.com/docker/cli/cli/config/types github.com/docker/cli/cli/compose/interpolation github.com/docker/cli/cli/compose/schema github.com/docker/cli/cli/compose/template