util: simplify progress syncronization

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
pull/398/head
Tonis Tiigi 4 years ago
parent 290e25917c
commit 1496ac9b55

@ -173,7 +173,6 @@ func splitToDriverPairs(availablePlatforms map[string]int, opt map[string]Option
} }
func resolveDrivers(ctx context.Context, drivers []DriverInfo, auth Auth, opt map[string]Options, pw progress.Writer) (map[string][]driverPair, []*client.Client, error) { func resolveDrivers(ctx context.Context, drivers []DriverInfo, auth Auth, opt map[string]Options, pw progress.Writer) (map[string][]driverPair, []*client.Client, error) {
availablePlatforms := map[string]int{} availablePlatforms := map[string]int{}
for i, d := range drivers { for i, d := range drivers {
for _, p := range d.Platform { for _, p := range d.Platform {
@ -479,7 +478,7 @@ func toSolveOpt(ctx context.Context, d driver.Driver, multiDriver bool, opt Opti
return &so, releaseF, nil return &so, releaseF, nil
} }
func Build(ctx context.Context, drivers []DriverInfo, opt map[string]Options, docker DockerAPI, auth Auth, pw progress.Writer) (resp map[string]*client.SolveResponse, err error) { func Build(ctx context.Context, drivers []DriverInfo, opt map[string]Options, docker DockerAPI, auth Auth, w progress.Writer) (resp map[string]*client.SolveResponse, err error) {
if len(drivers) == 0 { if len(drivers) == 0 {
return nil, errors.Errorf("driver required for build") return nil, errors.Errorf("driver required for build")
} }
@ -506,10 +505,8 @@ func Build(ctx context.Context, drivers []DriverInfo, opt map[string]Options, do
} }
} }
m, clients, err := resolveDrivers(ctx, drivers, auth, opt, pw) m, clients, err := resolveDrivers(ctx, drivers, auth, opt, w)
if err != nil { if err != nil {
close(pw.Status())
<-pw.Done()
return nil, err return nil, err
} }
@ -522,7 +519,6 @@ func Build(ctx context.Context, drivers []DriverInfo, opt map[string]Options, do
} }
}() }()
mw := progress.NewMultiWriter(pw)
eg, ctx := errgroup.WithContext(ctx) eg, ctx := errgroup.WithContext(ctx)
for k, opt := range opt { for k, opt := range opt {
@ -530,8 +526,8 @@ func Build(ctx context.Context, drivers []DriverInfo, opt map[string]Options, do
for i, dp := range m[k] { for i, dp := range m[k] {
d := drivers[dp.driverIndex].Driver d := drivers[dp.driverIndex].Driver
opt.Platforms = dp.platforms opt.Platforms = dp.platforms
so, release, err := toSolveOpt(ctx, d, multiDriver, opt, pw, func(name string) (io.WriteCloser, func(), error) { so, release, err := toSolveOpt(ctx, d, multiDriver, opt, w, func(name string) (io.WriteCloser, func(), error) {
return newDockerLoader(ctx, docker, name, mw) return newDockerLoader(ctx, docker, name, w)
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -559,8 +555,7 @@ func Build(ctx context.Context, drivers []DriverInfo, opt map[string]Options, do
var pushNames string var pushNames string
eg.Go(func() error { eg.Go(func() error {
pw := mw.WithPrefix("default", false) pw := progress.WithPrefix(w, "default", false)
defer close(pw.Status())
wg.Wait() wg.Wait()
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -663,23 +658,15 @@ func Build(ctx context.Context, drivers []DriverInfo, opt map[string]Options, do
} }
func(i int, dp driverPair, so client.SolveOpt) { func(i int, dp driverPair, so client.SolveOpt) {
pw := mw.WithPrefix(k, multiTarget) pw := progress.WithPrefix(w, k, multiTarget)
c := clients[dp.driverIndex] c := clients[dp.driverIndex]
var statusCh chan *client.SolveStatus pw = progress.ResetTime(pw)
if pw != nil {
pw = progress.ResetTime(pw)
statusCh = pw.Status()
eg.Go(func() error {
<-pw.Done()
return pw.Err()
})
}
eg.Go(func() error { eg.Go(func() error {
defer wg.Done() defer wg.Done()
rr, err := c.Solve(ctx, nil, so, statusCh) rr, err := c.Solve(ctx, nil, so, progress.NewChannel(pw))
if err != nil { if err != nil {
return err return err
} }
@ -829,7 +816,7 @@ func notSupported(d driver.Driver, f driver.Feature) error {
type dockerLoadCallback func(name string) (io.WriteCloser, func(), error) type dockerLoadCallback func(name string) (io.WriteCloser, func(), error)
func newDockerLoader(ctx context.Context, d DockerAPI, name string, mw *progress.MultiWriter) (io.WriteCloser, func(), error) { func newDockerLoader(ctx context.Context, d DockerAPI, name string, status progress.Writer) (io.WriteCloser, func(), error) {
c, err := d.DockerAPI(name) c, err := d.DockerAPI(name)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
@ -852,7 +839,7 @@ func newDockerLoader(ctx context.Context, d DockerAPI, name string, mw *progress
w.mu.Unlock() w.mu.Unlock()
return return
} }
prog := mw.WithPrefix("", false) prog := progress.WithPrefix(status, "", false)
progress.FromReader(prog, "importing to docker", resp.Body) progress.FromReader(prog, "importing to docker", resp.Body)
}, },
done: done, done: done,

@ -60,7 +60,7 @@ func createTempDockerfileFromURL(ctx context.Context, d driver.Driver, url strin
} }
out = dir out = dir
return nil, nil return nil, nil
}, pw.Status()) }, progress.NewChannel(pw))
if err != nil { if err != nil {
return "", err return "", err

@ -203,9 +203,14 @@ func buildTargets(ctx context.Context, dockerCli command.Cli, opts map[string]bu
ctx2, cancel := context.WithCancel(context.TODO()) ctx2, cancel := context.WithCancel(context.TODO())
defer cancel() defer cancel()
pw := progress.NewPrinter(ctx2, os.Stderr, progressMode) printer := progress.NewPrinter(ctx2, os.Stderr, progressMode)
_, err = build.Build(ctx, dis, opts, dockerAPI(dockerCli), dockerCli.ConfigFile(), printer)
err1 := printer.Wait()
if err == nil {
err = err1
}
_, err = build.Build(ctx, dis, opts, dockerAPI(dockerCli), dockerCli.ConfigFile(), pw)
return err return err
} }

@ -171,25 +171,27 @@ func boot(ctx context.Context, ngi *nginfo, dockerCli command.Cli) (bool, error)
return false, nil return false, nil
} }
pw := progress.NewPrinter(context.TODO(), os.Stderr, "auto") printer := progress.NewPrinter(context.TODO(), os.Stderr, "auto")
mw := progress.NewMultiWriter(pw)
eg, _ := errgroup.WithContext(ctx) eg, _ := errgroup.WithContext(ctx)
for _, idx := range toBoot { for _, idx := range toBoot {
func(idx int) { func(idx int) {
eg.Go(func() error { eg.Go(func() error {
pw := mw.WithPrefix(ngi.ng.Nodes[idx].Name, len(toBoot) > 1) pw := progress.WithPrefix(printer, ngi.ng.Nodes[idx].Name, len(toBoot) > 1)
_, err := driver.Boot(ctx, ngi.drivers[idx].di.Driver, pw) _, err := driver.Boot(ctx, ngi.drivers[idx].di.Driver, pw)
if err != nil { if err != nil {
ngi.drivers[idx].err = err ngi.drivers[idx].err = err
} }
close(pw.Status())
<-pw.Done()
return nil return nil
}) })
}(idx) }(idx)
} }
return true, eg.Wait() err := eg.Wait()
err1 := printer.Wait()
if err == nil {
err = err1
}
return true, err
} }

@ -71,11 +71,7 @@ func Boot(ctx context.Context, d Driver, pw progress.Writer) (*client.Client, er
if try > 2 { if try > 2 {
return nil, errors.Errorf("failed to bootstrap %T driver in attempts", d) return nil, errors.Errorf("failed to bootstrap %T driver in attempts", d)
} }
if err := d.Bootstrap(ctx, func(s *client.SolveStatus) { if err := d.Bootstrap(ctx, pw.Write); err != nil {
if pw != nil {
pw.Status() <- s
}
}); err != nil {
return nil, err return nil, err
} }
} }

@ -11,7 +11,6 @@ import (
) )
func FromReader(w Writer, name string, rc io.ReadCloser) { func FromReader(w Writer, name string, rc io.ReadCloser) {
status := w.Status()
dgst := digest.FromBytes([]byte(identity.NewID())) dgst := digest.FromBytes([]byte(identity.NewID()))
tm := time.Now() tm := time.Now()
@ -21,9 +20,9 @@ func FromReader(w Writer, name string, rc io.ReadCloser) {
Started: &tm, Started: &tm,
} }
status <- &client.SolveStatus{ w.Write(&client.SolveStatus{
Vertexes: []*client.Vertex{&vtx}, Vertexes: []*client.Vertex{&vtx},
} })
_, err := io.Copy(ioutil.Discard, rc) _, err := io.Copy(ioutil.Discard, rc)
@ -33,8 +32,7 @@ func FromReader(w Writer, name string, rc io.ReadCloser) {
if err != nil { if err != nil {
vtx2.Error = err.Error() vtx2.Error = err.Error()
} }
status <- &client.SolveStatus{ w.Write(&client.SolveStatus{
Vertexes: []*client.Vertex{&vtx2}, Vertexes: []*client.Vertex{&vtx2},
} })
close(status)
} }

@ -1,101 +1,32 @@
package progress package progress
import ( import (
"context"
"strings" "strings"
"sync"
"github.com/moby/buildkit/client" "github.com/moby/buildkit/client"
"golang.org/x/sync/errgroup"
) )
type MultiWriter struct { func WithPrefix(w Writer, pfx string, force bool) Writer {
w Writer return &prefixed{
eg *errgroup.Group main: w,
once sync.Once pfx: pfx,
ready chan struct{} force: force,
}
func (mw *MultiWriter) WithPrefix(pfx string, force bool) Writer {
in := make(chan *client.SolveStatus)
out := mw.w.Status()
p := &prefixed{
main: mw.w,
in: in,
} }
mw.eg.Go(func() error {
mw.once.Do(func() {
close(mw.ready)
})
for {
select {
case v, ok := <-in:
if ok {
if force {
for _, v := range v.Vertexes {
v.Name = addPrefix(pfx, v.Name)
}
}
out <- v
} else {
return nil
}
case <-mw.Done():
return mw.Err()
}
}
})
return p
}
func (mw *MultiWriter) Done() <-chan struct{} {
return mw.w.Done()
}
func (mw *MultiWriter) Err() error {
return mw.w.Err()
}
func (mw *MultiWriter) Status() chan *client.SolveStatus {
return nil
} }
type prefixed struct { type prefixed struct {
main Writer main Writer
in chan *client.SolveStatus pfx string
} force bool
func (p *prefixed) Done() <-chan struct{} {
return p.main.Done()
}
func (p *prefixed) Err() error {
return p.main.Err()
}
func (p *prefixed) Status() chan *client.SolveStatus {
return p.in
} }
func NewMultiWriter(pw Writer) *MultiWriter { func (p *prefixed) Write(v *client.SolveStatus) {
if pw == nil { if p.force {
return nil for _, v := range v.Vertexes {
} v.Name = addPrefix(p.pfx, v.Name)
eg, _ := errgroup.WithContext(context.TODO()) }
ready := make(chan struct{})
go func() {
<-ready
eg.Wait()
close(pw.Status())
}()
return &MultiWriter{
w: pw,
eg: eg,
ready: ready,
} }
p.main.Write(v)
} }
func addPrefix(pfx, name string) string { func addPrefix(pfx, name string) string {

@ -9,32 +9,27 @@ import (
"github.com/moby/buildkit/util/progress/progressui" "github.com/moby/buildkit/util/progress/progressui"
) )
type printer struct { type Printer struct {
status chan *client.SolveStatus status chan *client.SolveStatus
done <-chan struct{} done <-chan struct{}
err error err error
} }
func (p *printer) Done() <-chan struct{} { func (p *Printer) Wait() error {
return p.done close(p.status)
} <-p.done
func (p *printer) Err() error {
return p.err return p.err
} }
func (p *printer) Status() chan *client.SolveStatus { func (p *Printer) Write(s *client.SolveStatus) {
if p == nil { p.status <- s
return nil
}
return p.status
} }
func NewPrinter(ctx context.Context, out console.File, mode string) Writer { func NewPrinter(ctx context.Context, out console.File, mode string) *Printer {
statusCh := make(chan *client.SolveStatus) statusCh := make(chan *client.SolveStatus)
doneCh := make(chan struct{}) doneCh := make(chan struct{})
pw := &printer{ pw := &Printer{
status: statusCh, status: statusCh,
done: doneCh, done: doneCh,
} }

@ -7,56 +7,45 @@ import (
) )
func ResetTime(in Writer) Writer { func ResetTime(in Writer) Writer {
w := &pw{Writer: in, status: make(chan *client.SolveStatus), tm: time.Now()} return &pw{Writer: in, status: make(chan *client.SolveStatus), tm: time.Now()}
go func() { }
for {
select { func (w *pw) Write(st *client.SolveStatus) {
case <-in.Done(): if w.diff == nil {
return for _, v := range st.Vertexes {
case st, ok := <-w.status: if v.Started != nil {
if !ok { d := v.Started.Sub(w.tm)
close(in.Status()) w.diff = &d
return }
} }
if w.diff == nil { }
for _, v := range st.Vertexes { if w.diff != nil {
if v.Started != nil { for _, v := range st.Vertexes {
d := v.Started.Sub(w.tm) if v.Started != nil {
w.diff = &d d := v.Started.Add(-*w.diff)
} v.Started = &d
} }
} if v.Completed != nil {
if w.diff != nil { d := v.Completed.Add(-*w.diff)
for _, v := range st.Vertexes { v.Completed = &d
if v.Started != nil {
d := v.Started.Add(-*w.diff)
v.Started = &d
}
if v.Completed != nil {
d := v.Completed.Add(-*w.diff)
v.Completed = &d
}
}
for _, v := range st.Statuses {
if v.Started != nil {
d := v.Started.Add(-*w.diff)
v.Started = &d
}
if v.Completed != nil {
d := v.Completed.Add(-*w.diff)
v.Completed = &d
}
v.Timestamp = v.Timestamp.Add(-*w.diff)
}
for _, v := range st.Logs {
v.Timestamp = v.Timestamp.Add(-*w.diff)
}
}
in.Status() <- st
} }
} }
}() for _, v := range st.Statuses {
return w if v.Started != nil {
d := v.Started.Add(-*w.diff)
v.Started = &d
}
if v.Completed != nil {
d := v.Completed.Add(-*w.diff)
v.Completed = &d
}
v.Timestamp = v.Timestamp.Add(-*w.diff)
}
for _, v := range st.Logs {
v.Timestamp = v.Timestamp.Add(-*w.diff)
}
}
w.Writer.Write(st)
} }
type pw struct { type pw struct {

@ -9,13 +9,10 @@ import (
) )
type Writer interface { type Writer interface {
Done() <-chan struct{} Write(*client.SolveStatus)
Err() error
Status() chan *client.SolveStatus
} }
func Write(w Writer, name string, f func() error) { func Write(w Writer, name string, f func() error) {
status := w.Status()
dgst := digest.FromBytes([]byte(identity.NewID())) dgst := digest.FromBytes([]byte(identity.NewID()))
tm := time.Now() tm := time.Now()
@ -25,9 +22,9 @@ func Write(w Writer, name string, f func() error) {
Started: &tm, Started: &tm,
} }
status <- &client.SolveStatus{ w.Write(&client.SolveStatus{
Vertexes: []*client.Vertex{&vtx}, Vertexes: []*client.Vertex{&vtx},
} })
err := f() err := f()
@ -37,7 +34,21 @@ func Write(w Writer, name string, f func() error) {
if err != nil { if err != nil {
vtx2.Error = err.Error() vtx2.Error = err.Error()
} }
status <- &client.SolveStatus{ w.Write(&client.SolveStatus{
Vertexes: []*client.Vertex{&vtx2}, Vertexes: []*client.Vertex{&vtx2},
} })
}
func NewChannel(w Writer) chan *client.SolveStatus {
ch := make(chan *client.SolveStatus)
go func() {
for {
v, ok := <-ch
if !ok {
return
}
w.Write(v)
}
}()
return ch
} }

Loading…
Cancel
Save