controller: followup refactoring on Build API
Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
This commit is contained in:
@@ -6,11 +6,9 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/console"
|
||||
"github.com/containerd/containerd/defaults"
|
||||
"github.com/containerd/containerd/pkg/dialer"
|
||||
"github.com/docker/buildx/controller/pb"
|
||||
"github.com/docker/buildx/util/progress"
|
||||
"github.com/moby/buildkit/client"
|
||||
"github.com/moby/buildkit/identity"
|
||||
"github.com/pkg/errors"
|
||||
@@ -104,38 +102,13 @@ func (c *Client) Invoke(ctx context.Context, ref string, pid string, invokeConfi
|
||||
})
|
||||
}
|
||||
|
||||
func (c *Client) Build(ctx context.Context, options pb.BuildOptions, in io.ReadCloser, w io.Writer, out console.File, progressMode string) (string, *client.SolveResponse, error) {
|
||||
func (c *Client) Build(ctx context.Context, options pb.BuildOptions, in io.ReadCloser, statusChan chan *pb.StatusResponse) (string, *client.SolveResponse, error) {
|
||||
ref := identity.NewID()
|
||||
pw, err := progress.NewPrinter(context.TODO(), w, out, progressMode)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
statusChan := make(chan *client.SolveStatus)
|
||||
statusDone := make(chan struct{})
|
||||
eg, egCtx := errgroup.WithContext(ctx)
|
||||
var resp *client.SolveResponse
|
||||
eg.Go(func() error {
|
||||
defer close(statusChan)
|
||||
var err error
|
||||
resp, err = c.build(egCtx, ref, options, in, statusChan)
|
||||
return err
|
||||
})
|
||||
eg.Go(func() error {
|
||||
defer close(statusDone)
|
||||
for s := range statusChan {
|
||||
st := s
|
||||
pw.Write(st)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
eg.Go(func() error {
|
||||
<-statusDone
|
||||
return pw.Wait()
|
||||
})
|
||||
return ref, resp, eg.Wait()
|
||||
resp, err := c.build(ctx, ref, options, in, statusChan)
|
||||
return ref, resp, err
|
||||
}
|
||||
|
||||
func (c *Client) build(ctx context.Context, ref string, options pb.BuildOptions, in io.ReadCloser, statusChan chan *client.SolveStatus) (*client.SolveResponse, error) {
|
||||
func (c *Client) build(ctx context.Context, ref string, options pb.BuildOptions, in io.ReadCloser, statusChan chan *pb.StatusResponse) (*client.SolveResponse, error) {
|
||||
eg, egCtx := errgroup.WithContext(ctx)
|
||||
done := make(chan struct{})
|
||||
|
||||
@@ -170,51 +143,7 @@ func (c *Client) build(ctx context.Context, ref string, options pb.BuildOptions,
|
||||
}
|
||||
return errors.Wrap(err, "failed to receive status")
|
||||
}
|
||||
s := client.SolveStatus{}
|
||||
for _, v := range resp.Vertexes {
|
||||
s.Vertexes = append(s.Vertexes, &client.Vertex{
|
||||
Digest: v.Digest,
|
||||
Inputs: v.Inputs,
|
||||
Name: v.Name,
|
||||
Started: v.Started,
|
||||
Completed: v.Completed,
|
||||
Error: v.Error,
|
||||
Cached: v.Cached,
|
||||
ProgressGroup: v.ProgressGroup,
|
||||
})
|
||||
}
|
||||
for _, v := range resp.Statuses {
|
||||
s.Statuses = append(s.Statuses, &client.VertexStatus{
|
||||
ID: v.ID,
|
||||
Vertex: v.Vertex,
|
||||
Name: v.Name,
|
||||
Total: v.Total,
|
||||
Current: v.Current,
|
||||
Timestamp: v.Timestamp,
|
||||
Started: v.Started,
|
||||
Completed: v.Completed,
|
||||
})
|
||||
}
|
||||
for _, v := range resp.Logs {
|
||||
s.Logs = append(s.Logs, &client.VertexLog{
|
||||
Vertex: v.Vertex,
|
||||
Stream: int(v.Stream),
|
||||
Data: v.Msg,
|
||||
Timestamp: v.Timestamp,
|
||||
})
|
||||
}
|
||||
for _, v := range resp.Warnings {
|
||||
s.Warnings = append(s.Warnings, &client.VertexWarning{
|
||||
Vertex: v.Vertex,
|
||||
Level: int(v.Level),
|
||||
Short: v.Short,
|
||||
Detail: v.Detail,
|
||||
URL: v.Url,
|
||||
SourceInfo: v.Info,
|
||||
Range: v.Ranges,
|
||||
})
|
||||
}
|
||||
statusChan <- &s
|
||||
statusChan <- resp
|
||||
}
|
||||
})
|
||||
if in != nil {
|
||||
|
||||
@@ -141,8 +141,8 @@ func serveCmd(dockerCli command.Cli) *cobra.Command {
|
||||
}()
|
||||
|
||||
// prepare server
|
||||
b := NewServer(func(ctx context.Context, options *controllerapi.BuildOptions, stdin io.Reader, statusChan chan *client.SolveStatus) (*client.SolveResponse, *build.ResultContext, error) {
|
||||
return cbuild.RunBuild(ctx, dockerCli, *options, stdin, "quiet", statusChan)
|
||||
b := NewServer(func(ctx context.Context, options *controllerapi.BuildOptions, stdin io.Reader, statusChan chan *controllerapi.StatusResponse) (*client.SolveResponse, *build.ResultContext, error) {
|
||||
return cbuild.RunBuild(ctx, dockerCli, *options, stdin, control.ForwardProgress(statusChan))
|
||||
})
|
||||
defer b.Close()
|
||||
|
||||
|
||||
@@ -12,13 +12,12 @@ import (
|
||||
"github.com/docker/buildx/controller/processes"
|
||||
"github.com/docker/buildx/util/ioset"
|
||||
"github.com/docker/buildx/version"
|
||||
controlapi "github.com/moby/buildkit/api/services/control"
|
||||
"github.com/moby/buildkit/client"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
type BuildFunc func(ctx context.Context, options *pb.BuildOptions, stdin io.Reader, statusChan chan *client.SolveStatus) (resp *client.SolveResponse, res *build.ResultContext, err error)
|
||||
type BuildFunc func(ctx context.Context, options *pb.BuildOptions, stdin io.Reader, statusChan chan *pb.StatusResponse) (resp *client.SolveResponse, res *build.ResultContext, err error)
|
||||
|
||||
func NewServer(buildFunc BuildFunc) *Server {
|
||||
return &Server{
|
||||
@@ -34,7 +33,7 @@ type Server struct {
|
||||
|
||||
type session struct {
|
||||
buildOnGoing atomic.Bool
|
||||
statusChan chan *client.SolveStatus
|
||||
statusChan chan *pb.StatusResponse
|
||||
cancelBuild func()
|
||||
inputPipe *io.PipeWriter
|
||||
|
||||
@@ -156,7 +155,7 @@ func (m *Server) Build(ctx context.Context, req *pb.BuildRequest) (*pb.BuildResp
|
||||
s.buildOnGoing.Store(true)
|
||||
}
|
||||
s.processes = processes.NewManager()
|
||||
statusChan := make(chan *client.SolveStatus)
|
||||
statusChan := make(chan *pb.StatusResponse)
|
||||
s.statusChan = statusChan
|
||||
inR, inW := io.Pipe()
|
||||
defer inR.Close()
|
||||
@@ -204,7 +203,7 @@ func (m *Server) Status(req *pb.StatusRequest, stream pb.Controller_StatusServer
|
||||
}
|
||||
|
||||
// Wait and get status channel prepared by Build()
|
||||
var statusChan <-chan *client.SolveStatus
|
||||
var statusChan <-chan *pb.StatusResponse
|
||||
for {
|
||||
// TODO: timeout?
|
||||
m.sessionMu.Lock()
|
||||
@@ -223,7 +222,7 @@ func (m *Server) Status(req *pb.StatusRequest, stream pb.Controller_StatusServer
|
||||
if ss == nil {
|
||||
break
|
||||
}
|
||||
cs := toControlStatus(ss)
|
||||
cs := ss
|
||||
if err := stream.Send(cs); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -404,51 +403,3 @@ func (m *Server) Invoke(srv pb.Controller_InvokeServer) error {
|
||||
|
||||
return eg.Wait()
|
||||
}
|
||||
|
||||
func toControlStatus(s *client.SolveStatus) *pb.StatusResponse {
|
||||
resp := pb.StatusResponse{}
|
||||
for _, v := range s.Vertexes {
|
||||
resp.Vertexes = append(resp.Vertexes, &controlapi.Vertex{
|
||||
Digest: v.Digest,
|
||||
Inputs: v.Inputs,
|
||||
Name: v.Name,
|
||||
Started: v.Started,
|
||||
Completed: v.Completed,
|
||||
Error: v.Error,
|
||||
Cached: v.Cached,
|
||||
ProgressGroup: v.ProgressGroup,
|
||||
})
|
||||
}
|
||||
for _, v := range s.Statuses {
|
||||
resp.Statuses = append(resp.Statuses, &controlapi.VertexStatus{
|
||||
ID: v.ID,
|
||||
Vertex: v.Vertex,
|
||||
Name: v.Name,
|
||||
Total: v.Total,
|
||||
Current: v.Current,
|
||||
Timestamp: v.Timestamp,
|
||||
Started: v.Started,
|
||||
Completed: v.Completed,
|
||||
})
|
||||
}
|
||||
for _, v := range s.Logs {
|
||||
resp.Logs = append(resp.Logs, &controlapi.VertexLog{
|
||||
Vertex: v.Vertex,
|
||||
Stream: int64(v.Stream),
|
||||
Msg: v.Data,
|
||||
Timestamp: v.Timestamp,
|
||||
})
|
||||
}
|
||||
for _, v := range s.Warnings {
|
||||
resp.Warnings = append(resp.Warnings, &controlapi.VertexWarning{
|
||||
Vertex: v.Vertex,
|
||||
Level: int64(v.Level),
|
||||
Short: v.Short,
|
||||
Detail: v.Detail,
|
||||
Url: v.URL,
|
||||
Info: v.SourceInfo,
|
||||
Ranges: v.Range,
|
||||
})
|
||||
}
|
||||
return &resp
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user