controller: refactor progress api

Refactor the progress printer creation to the caller-side of the
controller api. Then, instead of passing around status channels, we can
simply pass around the higher level interface progress.Writer, which
allows us to correctly pull warnings out of the remote controller.

Signed-off-by: Justin Chadwell <me@jedevc.com>
This commit is contained in:
Justin Chadwell
2023-04-03 13:04:28 +01:00
parent 3d49bbd23a
commit 1fd23933c2
10 changed files with 251 additions and 199 deletions

View File

@@ -6,7 +6,6 @@ import (
"sync"
"time"
"github.com/containerd/console"
"github.com/containerd/containerd/defaults"
"github.com/containerd/containerd/pkg/dialer"
"github.com/docker/buildx/controller/pb"
@@ -114,14 +113,9 @@ func (c *Client) Inspect(ctx context.Context, ref string) (*pb.InspectResponse,
return c.client().Inspect(ctx, &pb.InspectRequest{Ref: ref})
}
func (c *Client) Build(ctx context.Context, options pb.BuildOptions, in io.ReadCloser, w io.Writer, out console.File, progressMode string) (string, *client.SolveResponse, error) {
func (c *Client) Build(ctx context.Context, options pb.BuildOptions, in io.ReadCloser, progress progress.Writer) (string, *client.SolveResponse, error) {
ref := identity.NewID()
pw, err := progress.NewPrinter(context.TODO(), w, out, progressMode)
if err != nil {
return "", nil, err
}
statusChan := make(chan *client.SolveStatus)
statusDone := make(chan struct{})
eg, egCtx := errgroup.WithContext(ctx)
var resp *client.SolveResponse
eg.Go(func() error {
@@ -131,17 +125,12 @@ func (c *Client) Build(ctx context.Context, options pb.BuildOptions, in io.ReadC
return err
})
eg.Go(func() error {
defer close(statusDone)
for s := range statusChan {
st := s
pw.Write(st)
progress.Write(st)
}
return nil
})
eg.Go(func() error {
<-statusDone
return pw.Wait()
})
return ref, resp, eg.Wait()
}
@@ -180,51 +169,7 @@ func (c *Client) build(ctx context.Context, ref string, options pb.BuildOptions,
}
return errors.Wrap(err, "failed to receive status")
}
s := client.SolveStatus{}
for _, v := range resp.Vertexes {
s.Vertexes = append(s.Vertexes, &client.Vertex{
Digest: v.Digest,
Inputs: v.Inputs,
Name: v.Name,
Started: v.Started,
Completed: v.Completed,
Error: v.Error,
Cached: v.Cached,
ProgressGroup: v.ProgressGroup,
})
}
for _, v := range resp.Statuses {
s.Statuses = append(s.Statuses, &client.VertexStatus{
ID: v.ID,
Vertex: v.Vertex,
Name: v.Name,
Total: v.Total,
Current: v.Current,
Timestamp: v.Timestamp,
Started: v.Started,
Completed: v.Completed,
})
}
for _, v := range resp.Logs {
s.Logs = append(s.Logs, &client.VertexLog{
Vertex: v.Vertex,
Stream: int(v.Stream),
Data: v.Msg,
Timestamp: v.Timestamp,
})
}
for _, v := range resp.Warnings {
s.Warnings = append(s.Warnings, &client.VertexWarning{
Vertex: v.Vertex,
Level: int(v.Level),
Short: v.Short,
Detail: v.Detail,
URL: v.Url,
SourceInfo: v.Info,
Range: v.Ranges,
})
}
statusChan <- &s
statusChan <- pb.FromControlStatus(resp)
}
})
if in != nil {

View File

@@ -21,6 +21,7 @@ import (
"github.com/docker/buildx/controller/control"
controllerapi "github.com/docker/buildx/controller/pb"
"github.com/docker/buildx/util/confutil"
"github.com/docker/buildx/util/progress"
"github.com/docker/buildx/version"
"github.com/docker/cli/cli/command"
"github.com/moby/buildkit/client"
@@ -142,8 +143,8 @@ func serveCmd(dockerCli command.Cli) *cobra.Command {
}()
// prepare server
b := NewServer(func(ctx context.Context, options *controllerapi.BuildOptions, stdin io.Reader, statusChan chan *client.SolveStatus) (*client.SolveResponse, *build.ResultContext, error) {
return cbuild.RunBuild(ctx, dockerCli, *options, stdin, "quiet", statusChan, true)
b := NewServer(func(ctx context.Context, options *controllerapi.BuildOptions, stdin io.Reader, progress progress.Writer) (*client.SolveResponse, *build.ResultContext, error) {
return cbuild.RunBuild(ctx, dockerCli, *options, stdin, progress, true)
})
defer b.Close()

View File

@@ -12,14 +12,14 @@ import (
"github.com/docker/buildx/controller/pb"
"github.com/docker/buildx/controller/processes"
"github.com/docker/buildx/util/ioset"
"github.com/docker/buildx/util/progress"
"github.com/docker/buildx/version"
controlapi "github.com/moby/buildkit/api/services/control"
"github.com/moby/buildkit/client"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
type BuildFunc func(ctx context.Context, options *pb.BuildOptions, stdin io.Reader, statusChan chan *client.SolveStatus) (resp *client.SolveResponse, res *build.ResultContext, err error)
type BuildFunc func(ctx context.Context, options *pb.BuildOptions, stdin io.Reader, progress progress.Writer) (resp *client.SolveResponse, res *build.ResultContext, err error)
func NewServer(buildFunc BuildFunc) *Server {
return &Server{
@@ -35,7 +35,7 @@ type Server struct {
type session struct {
buildOnGoing atomic.Bool
statusChan chan *client.SolveStatus
statusChan chan *pb.StatusResponse
cancelBuild func()
buildOptions *pb.BuildOptions
inputPipe *io.PipeWriter
@@ -177,8 +177,9 @@ func (m *Server) Build(ctx context.Context, req *pb.BuildRequest) (*pb.BuildResp
s = &session{}
s.buildOnGoing.Store(true)
}
s.processes = processes.NewManager()
statusChan := make(chan *client.SolveStatus)
statusChan := make(chan *pb.StatusResponse)
s.statusChan = statusChan
inR, inW := io.Pipe()
defer inR.Close()
@@ -196,10 +197,12 @@ func (m *Server) Build(ctx context.Context, req *pb.BuildRequest) (*pb.BuildResp
m.sessionMu.Unlock()
}()
pw := pb.NewProgressWriter(statusChan)
// Build the specified request
ctx, cancel := context.WithCancel(ctx)
defer cancel()
resp, res, buildErr := m.buildFunc(ctx, req.Options, inR, statusChan)
resp, res, buildErr := m.buildFunc(ctx, req.Options, inR, pw)
m.sessionMu.Lock()
if s, ok := m.session[ref]; ok {
// NOTE: buildFunc can return *build.ResultContext even on error (e.g. when it's implemented using (github.com/docker/buildx/controller/build).RunBuild).
@@ -237,7 +240,7 @@ func (m *Server) Status(req *pb.StatusRequest, stream pb.Controller_StatusServer
}
// Wait and get status channel prepared by Build()
var statusChan <-chan *client.SolveStatus
var statusChan <-chan *pb.StatusResponse
for {
// TODO: timeout?
m.sessionMu.Lock()
@@ -256,8 +259,7 @@ func (m *Server) Status(req *pb.StatusRequest, stream pb.Controller_StatusServer
if ss == nil {
break
}
cs := toControlStatus(ss)
if err := stream.Send(cs); err != nil {
if err := stream.Send(ss); err != nil {
return err
}
}
@@ -437,51 +439,3 @@ func (m *Server) Invoke(srv pb.Controller_InvokeServer) error {
return eg.Wait()
}
func toControlStatus(s *client.SolveStatus) *pb.StatusResponse {
resp := pb.StatusResponse{}
for _, v := range s.Vertexes {
resp.Vertexes = append(resp.Vertexes, &controlapi.Vertex{
Digest: v.Digest,
Inputs: v.Inputs,
Name: v.Name,
Started: v.Started,
Completed: v.Completed,
Error: v.Error,
Cached: v.Cached,
ProgressGroup: v.ProgressGroup,
})
}
for _, v := range s.Statuses {
resp.Statuses = append(resp.Statuses, &controlapi.VertexStatus{
ID: v.ID,
Vertex: v.Vertex,
Name: v.Name,
Total: v.Total,
Current: v.Current,
Timestamp: v.Timestamp,
Started: v.Started,
Completed: v.Completed,
})
}
for _, v := range s.Logs {
resp.Logs = append(resp.Logs, &controlapi.VertexLog{
Vertex: v.Vertex,
Stream: int64(v.Stream),
Msg: v.Data,
Timestamp: v.Timestamp,
})
}
for _, v := range s.Warnings {
resp.Warnings = append(resp.Warnings, &controlapi.VertexWarning{
Vertex: v.Vertex,
Level: int64(v.Level),
Short: v.Short,
Detail: v.Detail,
Url: v.URL,
Info: v.SourceInfo,
Ranges: v.Range,
})
}
return &resp
}