refactor: move some docker logic to storeutil

Signed-off-by: CrazyMax <crazy-max@users.noreply.github.com>
pull/865/head
CrazyMax 4 years ago
parent ab73275f58
commit a3a4e48f25
No known key found for this signature in database
GPG Key ID: 3248E46B6BB8C7F7

@ -19,13 +19,13 @@ import (
"github.com/containerd/containerd/images" "github.com/containerd/containerd/images"
"github.com/containerd/containerd/platforms" "github.com/containerd/containerd/platforms"
"github.com/docker/buildx/driver" "github.com/docker/buildx/driver"
"github.com/docker/buildx/store/storeutil"
"github.com/docker/buildx/util/imagetools" "github.com/docker/buildx/util/imagetools"
"github.com/docker/buildx/util/progress" "github.com/docker/buildx/util/progress"
"github.com/docker/buildx/util/resolver" "github.com/docker/buildx/util/resolver"
"github.com/docker/cli/opts" "github.com/docker/cli/opts"
"github.com/docker/distribution/reference" "github.com/docker/distribution/reference"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
dockerclient "github.com/docker/docker/client"
"github.com/docker/docker/pkg/jsonmessage" "github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/urlutil" "github.com/docker/docker/pkg/urlutil"
"github.com/moby/buildkit/client" "github.com/moby/buildkit/client"
@ -89,10 +89,6 @@ type DriverInfo struct {
ImageOpt imagetools.Opt ImageOpt imagetools.Opt
} }
type DockerAPI interface {
DockerAPI(name string) (dockerclient.APIClient, error)
}
func filterAvailableDrivers(drivers []DriverInfo) ([]DriverInfo, error) { func filterAvailableDrivers(drivers []DriverInfo) ([]DriverInfo, error) {
out := make([]DriverInfo, 0, len(drivers)) out := make([]DriverInfo, 0, len(drivers))
err := errors.Errorf("no drivers found") err := errors.Errorf("no drivers found")
@ -580,7 +576,7 @@ func toSolveOpt(ctx context.Context, d driver.Driver, multiDriver bool, opt Opti
return &so, releaseF, nil return &so, releaseF, nil
} }
func Build(ctx context.Context, drivers []DriverInfo, opt map[string]Options, docker DockerAPI, configDir string, w progress.Writer) (resp map[string]*client.SolveResponse, err error) { func Build(ctx context.Context, drivers []DriverInfo, opt map[string]Options, docker *storeutil.DockerClient, configDir string, w progress.Writer) (resp map[string]*client.SolveResponse, err error) {
if len(drivers) == 0 { if len(drivers) == 0 {
return nil, errors.Errorf("driver required for build") return nil, errors.Errorf("driver required for build")
} }
@ -633,7 +629,7 @@ func Build(ctx context.Context, drivers []DriverInfo, opt map[string]Options, do
} }
opt.Platforms = dp.platforms opt.Platforms = dp.platforms
so, release, err := toSolveOpt(ctx, d, multiDriver, opt, dp.bopts, configDir, w, func(name string) (io.WriteCloser, func(), error) { so, release, err := toSolveOpt(ctx, d, multiDriver, opt, dp.bopts, configDir, w, func(name string) (io.WriteCloser, func(), error) {
return newDockerLoader(ctx, docker, name, w) return docker.LoadImage(ctx, name, w)
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -1094,40 +1090,6 @@ func notSupported(d driver.Driver, f driver.Feature) error {
type dockerLoadCallback func(name string) (io.WriteCloser, func(), error) type dockerLoadCallback func(name string) (io.WriteCloser, func(), error)
func newDockerLoader(ctx context.Context, d DockerAPI, name string, status progress.Writer) (io.WriteCloser, func(), error) {
c, err := d.DockerAPI(name)
if err != nil {
return nil, nil, err
}
pr, pw := io.Pipe()
done := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
var w *waitingWriter
w = &waitingWriter{
PipeWriter: pw,
f: func() {
resp, err := c.ImageLoad(ctx, pr, false)
defer close(done)
if err != nil {
pr.CloseWithError(err)
w.mu.Lock()
w.err = err
w.mu.Unlock()
return
}
prog := progress.WithPrefix(status, "", false)
progress.FromReader(prog, "importing to docker", resp.Body)
},
done: done,
cancel: cancel,
}
return w, func() {
pr.Close()
}, nil
}
func noDefaultLoad() bool { func noDefaultLoad() bool {
v, ok := os.LookupEnv("BUILDX_NO_DEFAULT_LOAD") v, ok := os.LookupEnv("BUILDX_NO_DEFAULT_LOAD")
if !ok { if !ok {
@ -1140,34 +1102,6 @@ func noDefaultLoad() bool {
return b return b
} }
type waitingWriter struct {
*io.PipeWriter
f func()
once sync.Once
mu sync.Mutex
err error
done chan struct{}
cancel func()
}
func (w *waitingWriter) Write(dt []byte) (int, error) {
w.once.Do(func() {
go w.f()
})
return w.PipeWriter.Write(dt)
}
func (w *waitingWriter) Close() error {
err := w.PipeWriter.Close()
<-w.done
if err == nil {
w.mu.Lock()
defer w.mu.Unlock()
return w.err
}
return err
}
// handle https://github.com/moby/moby/pull/10858 // handle https://github.com/moby/moby/pull/10858
func handleLowercaseDockerfile(dir, p string) string { func handleLowercaseDockerfile(dir, p string) string {
if filepath.Base(p) != "Dockerfile" { if filepath.Base(p) != "Dockerfile" {

@ -9,6 +9,7 @@ import (
"github.com/containerd/containerd/platforms" "github.com/containerd/containerd/platforms"
"github.com/docker/buildx/bake" "github.com/docker/buildx/bake"
"github.com/docker/buildx/build" "github.com/docker/buildx/build"
"github.com/docker/buildx/store/storeutil"
"github.com/docker/buildx/util/confutil" "github.com/docker/buildx/util/confutil"
"github.com/docker/buildx/util/progress" "github.com/docker/buildx/util/progress"
"github.com/docker/buildx/util/tracing" "github.com/docker/buildx/util/tracing"
@ -146,7 +147,7 @@ func runBake(dockerCli command.Cli, targets []string, in bakeOptions) (err error
return nil return nil
} }
resp, err := build.Build(ctx, dis, bo, dockerAPI(dockerCli), confutil.ConfigDir(dockerCli), printer) resp, err := build.Build(ctx, dis, bo, storeutil.NewDockerClient(dockerCli), confutil.ConfigDir(dockerCli), printer)
if err != nil { if err != nil {
return err return err
} }

@ -9,6 +9,7 @@ import (
"strings" "strings"
"github.com/docker/buildx/build" "github.com/docker/buildx/build"
"github.com/docker/buildx/store/storeutil"
"github.com/docker/buildx/util/buildflags" "github.com/docker/buildx/util/buildflags"
"github.com/docker/buildx/util/confutil" "github.com/docker/buildx/util/confutil"
"github.com/docker/buildx/util/platformutil" "github.com/docker/buildx/util/platformutil"
@ -226,7 +227,7 @@ func buildTargets(ctx context.Context, dockerCli command.Cli, opts map[string]bu
printer := progress.NewPrinter(ctx2, os.Stderr, progressMode) printer := progress.NewPrinter(ctx2, os.Stderr, progressMode)
resp, err := build.Build(ctx, dis, opts, dockerAPI(dockerCli), confutil.ConfigDir(dockerCli), printer) resp, err := build.Build(ctx, dis, opts, storeutil.NewDockerClient(dockerCli), confutil.ConfigDir(dockerCli), printer)
err1 := printer.Wait() err1 := printer.Wait()
if err == nil { if err == nil {
err = err1 err = err1

@ -16,6 +16,7 @@ import (
"github.com/docker/buildx/util/cobrautil" "github.com/docker/buildx/util/cobrautil"
"github.com/docker/cli/cli" "github.com/docker/cli/cli"
"github.com/docker/cli/cli/command" "github.com/docker/cli/cli/command"
"github.com/docker/cli/opts"
"github.com/google/shlex" "github.com/google/shlex"
"github.com/moby/buildkit/util/appcontext" "github.com/moby/buildkit/util/appcontext"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -143,7 +144,6 @@ func runCreate(dockerCli command.Cli, in createOptions, args []string) error {
if dockerCli.CurrentContext() == "default" && dockerCli.DockerEndpoint().TLSData != nil { if dockerCli.CurrentContext() == "default" && dockerCli.DockerEndpoint().TLSData != nil {
return errors.Errorf("could not create a builder instance with TLS data loaded from environment. Please use `docker context create <context-name>` to create a context for current environment and then create a builder instance with `docker buildx create <context-name>`") return errors.Errorf("could not create a builder instance with TLS data loaded from environment. Please use `docker context create <context-name>` to create a context for current environment and then create a builder instance with `docker buildx create <context-name>`")
} }
ep, err = storeutil.GetCurrentEndpoint(dockerCli) ep, err = storeutil.GetCurrentEndpoint(dockerCli)
if err != nil { if err != nil {
return err return err
@ -263,3 +263,18 @@ func csvToMap(in []string) (map[string]string, error) {
} }
return m, nil return m, nil
} }
func validateEndpoint(dockerCli command.Cli, ep string) (string, error) {
dem, err := storeutil.GetDockerEndpoint(dockerCli, ep)
if err == nil && dem != nil {
if ep == "default" {
return dem.Host, nil
}
return ep, nil
}
h, err := opts.ParseHost(true, ep)
if err != nil {
return "", errors.Wrapf(err, "failed to parse endpoint %s", ep)
}
return h, nil
}

@ -13,11 +13,8 @@ import (
"github.com/docker/buildx/util/platformutil" "github.com/docker/buildx/util/platformutil"
"github.com/docker/buildx/util/progress" "github.com/docker/buildx/util/progress"
"github.com/docker/cli/cli/command" "github.com/docker/cli/cli/command"
"github.com/docker/cli/cli/context/docker"
"github.com/docker/cli/cli/context/kubernetes" "github.com/docker/cli/cli/context/kubernetes"
ctxstore "github.com/docker/cli/cli/context/store" ctxstore "github.com/docker/cli/cli/context/store"
dopts "github.com/docker/cli/opts"
dockerclient "github.com/docker/docker/client"
specs "github.com/opencontainers/image-spec/specs-go/v1" specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -25,22 +22,6 @@ import (
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
) )
// validateEndpoint validates that endpoint is either a context or a docker host
func validateEndpoint(dockerCli command.Cli, ep string) (string, error) {
de, err := storeutil.GetDockerEndpoint(dockerCli, ep)
if err == nil && de != "" {
if ep == "default" {
return de, nil
}
return ep, nil
}
h, err := dopts.ParseHost(true, ep)
if err != nil {
return "", errors.Wrapf(err, "failed to parse endpoint %s", ep)
}
return h, nil
}
// driversForNodeGroup returns drivers for a nodegroup instance // driversForNodeGroup returns drivers for a nodegroup instance
func driversForNodeGroup(ctx context.Context, dockerCli command.Cli, ng *store.NodeGroup, contextPathHash string) ([]build.DriverInfo, error) { func driversForNodeGroup(ctx context.Context, dockerCli command.Cli, ng *store.NodeGroup, contextPathHash string) ([]build.DriverInfo, error) {
eg, _ := errgroup.WithContext(ctx) eg, _ := errgroup.WithContext(ctx)
@ -54,7 +35,7 @@ func driversForNodeGroup(ctx context.Context, dockerCli command.Cli, ng *store.N
return nil, errors.Errorf("failed to find driver %q", f) return nil, errors.Errorf("failed to find driver %q", f)
} }
} else { } else {
dockerapi, err := clientForEndpoint(dockerCli, ng.Nodes[0].Endpoint) dockerapi, err := storeutil.ClientForEndpoint(dockerCli, ng.Nodes[0].Endpoint)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -79,13 +60,12 @@ func driversForNodeGroup(ctx context.Context, dockerCli command.Cli, ng *store.N
defer func() { defer func() {
dis[i] = di dis[i] = di
}() }()
dockerapi, err := clientForEndpoint(dockerCli, n.Endpoint)
dockerapi, err := storeutil.ClientForEndpoint(dockerCli, n.Endpoint)
if err != nil { if err != nil {
di.Err = err di.Err = err
return nil return nil
} }
// TODO: replace the following line with dockerclient.WithAPIVersionNegotiation option in clientForEndpoint
dockerapi.NegotiateAPIVersion(ctx)
contextStore := dockerCli.ContextStore() contextStore := dockerCli.ContextStore()
@ -152,48 +132,6 @@ func configFromContext(endpointName string, s ctxstore.Reader) (clientcmd.Client
return kubernetes.ConfigFromContext(endpointName, s) return kubernetes.ConfigFromContext(endpointName, s)
} }
// clientForEndpoint returns a docker client for an endpoint
func clientForEndpoint(dockerCli command.Cli, name string) (dockerclient.APIClient, error) {
list, err := dockerCli.ContextStore().List()
if err != nil {
return nil, err
}
for _, l := range list {
if l.Name == name {
dep, ok := l.Endpoints["docker"]
if !ok {
return nil, errors.Errorf("context %q does not have a Docker endpoint", name)
}
epm, ok := dep.(docker.EndpointMeta)
if !ok {
return nil, errors.Errorf("endpoint %q is not of type EndpointMeta, %T", dep, dep)
}
ep, err := docker.WithTLSData(dockerCli.ContextStore(), name, epm)
if err != nil {
return nil, err
}
clientOpts, err := ep.ClientOpts()
if err != nil {
return nil, err
}
return dockerclient.NewClientWithOpts(clientOpts...)
}
}
ep := docker.Endpoint{
EndpointMeta: docker.EndpointMeta{
Host: name,
},
}
clientOpts, err := ep.ClientOpts()
if err != nil {
return nil, err
}
return dockerclient.NewClientWithOpts(clientOpts...)
}
func getInstanceOrDefault(ctx context.Context, dockerCli command.Cli, instance, contextPathHash string) ([]build.DriverInfo, error) { func getInstanceOrDefault(ctx context.Context, dockerCli command.Cli, instance, contextPathHash string) ([]build.DriverInfo, error) {
var defaultOnly bool var defaultOnly bool
@ -361,21 +299,6 @@ func loadNodeGroupData(ctx context.Context, dockerCli command.Cli, ngi *nginfo)
return nil return nil
} }
func dockerAPI(dockerCli command.Cli) *api {
return &api{dockerCli: dockerCli}
}
type api struct {
dockerCli command.Cli
}
func (a *api) DockerAPI(name string) (dockerclient.APIClient, error) {
if name == "" {
name = a.dockerCli.CurrentContext()
}
return clientForEndpoint(a.dockerCli, name)
}
type dinfo struct { type dinfo struct {
di *build.DriverInfo di *build.DriverInfo
info *driver.Info info *driver.Info

@ -0,0 +1,119 @@
package storeutil
import (
"context"
"io"
"sync"
"github.com/docker/buildx/util/progress"
"github.com/docker/cli/cli/command"
"github.com/docker/cli/cli/context/docker"
"github.com/docker/docker/client"
)
// DockerClient represents an active docker object.
type DockerClient struct {
cli command.Cli
}
// NewDockerClient initializes a new docker client.
func NewDockerClient(cli command.Cli) *DockerClient {
return &DockerClient{cli: cli}
}
// API returns a new docker API client.
func (c *DockerClient) API(name string) (client.APIClient, error) {
if name == "" {
name = c.cli.CurrentContext()
}
return ClientForEndpoint(c.cli, name)
}
// LoadImage imports an image to docker.
func (c *DockerClient) LoadImage(ctx context.Context, name string, status progress.Writer) (io.WriteCloser, func(), error) {
dapi, err := c.API(name)
if err != nil {
return nil, nil, err
}
pr, pw := io.Pipe()
done := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
var w *waitingWriter
w = &waitingWriter{
PipeWriter: pw,
f: func() {
resp, err := dapi.ImageLoad(ctx, pr, false)
defer close(done)
if err != nil {
pr.CloseWithError(err)
w.mu.Lock()
w.err = err
w.mu.Unlock()
return
}
prog := progress.WithPrefix(status, "", false)
progress.FromReader(prog, "importing to docker", resp.Body)
},
done: done,
cancel: cancel,
}
return w, func() {
pr.Close()
}, nil
}
type waitingWriter struct {
*io.PipeWriter
f func()
once sync.Once
mu sync.Mutex
err error
done chan struct{}
cancel func()
}
func (w *waitingWriter) Write(dt []byte) (int, error) {
w.once.Do(func() {
go w.f()
})
return w.PipeWriter.Write(dt)
}
func (w *waitingWriter) Close() error {
err := w.PipeWriter.Close()
<-w.done
if err == nil {
w.mu.Lock()
defer w.mu.Unlock()
return w.err
}
return err
}
// ClientForEndpoint returns a docker client for an endpoint
func ClientForEndpoint(dockerCli command.Cli, name string) (client.APIClient, error) {
dem, err := GetDockerEndpoint(dockerCli, name)
if err == nil && dem != nil {
ep, err := docker.WithTLSData(dockerCli.ContextStore(), name, *dem)
if err != nil {
return nil, err
}
clientOpts, err := ep.ClientOpts()
if err != nil {
return nil, err
}
return client.NewClientWithOpts(append(clientOpts, client.WithAPIVersionNegotiation())...)
}
ep := docker.Endpoint{
EndpointMeta: docker.EndpointMeta{
Host: name,
},
}
clientOpts, err := ep.ClientOpts()
if err != nil {
return nil, err
}
return client.NewClientWithOpts(append(clientOpts, client.WithAPIVersionNegotiation())...)
}

@ -30,33 +30,35 @@ func GetCurrentEndpoint(dockerCli command.Cli) (string, error) {
if name != "default" { if name != "default" {
return name, nil return name, nil
} }
de, err := GetDockerEndpoint(dockerCli, name) dem, err := GetDockerEndpoint(dockerCli, name)
if err != nil { if err != nil {
return "", errors.Errorf("docker endpoint for %q not found", name) return "", errors.Errorf("docker endpoint for %q not found", name)
} else if dem != nil {
return dem.Host, nil
} }
return de, nil return "", nil
} }
// GetDockerEndpoint returns docker endpoint string for given context // GetDockerEndpoint returns docker endpoint meta for given context
func GetDockerEndpoint(dockerCli command.Cli, name string) (string, error) { func GetDockerEndpoint(dockerCli command.Cli, name string) (*docker.EndpointMeta, error) {
list, err := dockerCli.ContextStore().List() list, err := dockerCli.ContextStore().List()
if err != nil { if err != nil {
return "", err return nil, err
} }
for _, l := range list { for _, l := range list {
if l.Name == name { if l.Name == name {
ep, ok := l.Endpoints["docker"] ep, ok := l.Endpoints["docker"]
if !ok { if !ok {
return "", errors.Errorf("context %q does not have a Docker endpoint", name) return nil, errors.Errorf("context %q does not have a Docker endpoint", name)
} }
typed, ok := ep.(docker.EndpointMeta) typed, ok := ep.(docker.EndpointMeta)
if !ok { if !ok {
return "", errors.Errorf("endpoint %q is not of type EndpointMeta, %T", ep, ep) return nil, errors.Errorf("endpoint %q is not of type EndpointMeta, %T", ep, ep)
} }
return typed.Host, nil return &typed, nil
} }
} }
return "", nil return nil, nil
} }
// GetCurrentInstance finds the current builder instance // GetCurrentInstance finds the current builder instance

Loading…
Cancel
Save