From 1496ac9b5505e050f79fe655dec95a1c532cd2f5 Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Sun, 20 Sep 2020 19:46:39 -0700 Subject: [PATCH] util: simplify progress syncronization Signed-off-by: Tonis Tiigi --- build/build.go | 33 ++++-------- build/url.go | 2 +- commands/build.go | 9 +++- commands/inspect.go | 16 +++--- driver/driver.go | 6 +-- util/progress/fromreader.go | 10 ++-- util/progress/multiwriter.go | 97 ++++++------------------------------ util/progress/printer.go | 21 +++----- util/progress/reset.go | 85 ++++++++++++++----------------- util/progress/writer.go | 27 +++++++--- 10 files changed, 110 insertions(+), 196 deletions(-) diff --git a/build/build.go b/build/build.go index e468eb61..1b6e09cf 100644 --- a/build/build.go +++ b/build/build.go @@ -173,7 +173,6 @@ func splitToDriverPairs(availablePlatforms map[string]int, opt map[string]Option } func resolveDrivers(ctx context.Context, drivers []DriverInfo, auth Auth, opt map[string]Options, pw progress.Writer) (map[string][]driverPair, []*client.Client, error) { - availablePlatforms := map[string]int{} for i, d := range drivers { for _, p := range d.Platform { @@ -479,7 +478,7 @@ func toSolveOpt(ctx context.Context, d driver.Driver, multiDriver bool, opt Opti return &so, releaseF, nil } -func Build(ctx context.Context, drivers []DriverInfo, opt map[string]Options, docker DockerAPI, auth Auth, pw progress.Writer) (resp map[string]*client.SolveResponse, err error) { +func Build(ctx context.Context, drivers []DriverInfo, opt map[string]Options, docker DockerAPI, auth Auth, w progress.Writer) (resp map[string]*client.SolveResponse, err error) { if len(drivers) == 0 { return nil, errors.Errorf("driver required for build") } @@ -506,10 +505,8 @@ func Build(ctx context.Context, drivers []DriverInfo, opt map[string]Options, do } } - m, clients, err := resolveDrivers(ctx, drivers, auth, opt, pw) + m, clients, err := resolveDrivers(ctx, drivers, auth, opt, w) if err != nil { - close(pw.Status()) - <-pw.Done() return nil, err } @@ -522,7 +519,6 @@ func Build(ctx context.Context, drivers []DriverInfo, opt map[string]Options, do } }() - mw := progress.NewMultiWriter(pw) eg, ctx := errgroup.WithContext(ctx) for k, opt := range opt { @@ -530,8 +526,8 @@ func Build(ctx context.Context, drivers []DriverInfo, opt map[string]Options, do for i, dp := range m[k] { d := drivers[dp.driverIndex].Driver opt.Platforms = dp.platforms - so, release, err := toSolveOpt(ctx, d, multiDriver, opt, pw, func(name string) (io.WriteCloser, func(), error) { - return newDockerLoader(ctx, docker, name, mw) + so, release, err := toSolveOpt(ctx, d, multiDriver, opt, w, func(name string) (io.WriteCloser, func(), error) { + return newDockerLoader(ctx, docker, name, w) }) if err != nil { return nil, err @@ -559,8 +555,7 @@ func Build(ctx context.Context, drivers []DriverInfo, opt map[string]Options, do var pushNames string eg.Go(func() error { - pw := mw.WithPrefix("default", false) - defer close(pw.Status()) + pw := progress.WithPrefix(w, "default", false) wg.Wait() select { case <-ctx.Done(): @@ -663,23 +658,15 @@ func Build(ctx context.Context, drivers []DriverInfo, opt map[string]Options, do } func(i int, dp driverPair, so client.SolveOpt) { - pw := mw.WithPrefix(k, multiTarget) + pw := progress.WithPrefix(w, k, multiTarget) c := clients[dp.driverIndex] - var statusCh chan *client.SolveStatus - if pw != nil { - pw = progress.ResetTime(pw) - statusCh = pw.Status() - eg.Go(func() error { - <-pw.Done() - return pw.Err() - }) - } + pw = progress.ResetTime(pw) eg.Go(func() error { defer wg.Done() - rr, err := c.Solve(ctx, nil, so, statusCh) + rr, err := c.Solve(ctx, nil, so, progress.NewChannel(pw)) if err != nil { return err } @@ -829,7 +816,7 @@ func notSupported(d driver.Driver, f driver.Feature) error { type dockerLoadCallback func(name string) (io.WriteCloser, func(), error) -func newDockerLoader(ctx context.Context, d DockerAPI, name string, mw *progress.MultiWriter) (io.WriteCloser, func(), error) { +func newDockerLoader(ctx context.Context, d DockerAPI, name string, status progress.Writer) (io.WriteCloser, func(), error) { c, err := d.DockerAPI(name) if err != nil { return nil, nil, err @@ -852,7 +839,7 @@ func newDockerLoader(ctx context.Context, d DockerAPI, name string, mw *progress w.mu.Unlock() return } - prog := mw.WithPrefix("", false) + prog := progress.WithPrefix(status, "", false) progress.FromReader(prog, "importing to docker", resp.Body) }, done: done, diff --git a/build/url.go b/build/url.go index e369e9ba..451b0146 100644 --- a/build/url.go +++ b/build/url.go @@ -60,7 +60,7 @@ func createTempDockerfileFromURL(ctx context.Context, d driver.Driver, url strin } out = dir return nil, nil - }, pw.Status()) + }, progress.NewChannel(pw)) if err != nil { return "", err diff --git a/commands/build.go b/commands/build.go index fe1c9803..262d57b4 100644 --- a/commands/build.go +++ b/commands/build.go @@ -203,9 +203,14 @@ func buildTargets(ctx context.Context, dockerCli command.Cli, opts map[string]bu ctx2, cancel := context.WithCancel(context.TODO()) defer cancel() - pw := progress.NewPrinter(ctx2, os.Stderr, progressMode) + printer := progress.NewPrinter(ctx2, os.Stderr, progressMode) + + _, err = build.Build(ctx, dis, opts, dockerAPI(dockerCli), dockerCli.ConfigFile(), printer) + err1 := printer.Wait() + if err == nil { + err = err1 + } - _, err = build.Build(ctx, dis, opts, dockerAPI(dockerCli), dockerCli.ConfigFile(), pw) return err } diff --git a/commands/inspect.go b/commands/inspect.go index 6f0db761..fb9d9535 100644 --- a/commands/inspect.go +++ b/commands/inspect.go @@ -171,25 +171,27 @@ func boot(ctx context.Context, ngi *nginfo, dockerCli command.Cli) (bool, error) return false, nil } - pw := progress.NewPrinter(context.TODO(), os.Stderr, "auto") - - mw := progress.NewMultiWriter(pw) + printer := progress.NewPrinter(context.TODO(), os.Stderr, "auto") eg, _ := errgroup.WithContext(ctx) for _, idx := range toBoot { func(idx int) { eg.Go(func() error { - pw := mw.WithPrefix(ngi.ng.Nodes[idx].Name, len(toBoot) > 1) + pw := progress.WithPrefix(printer, ngi.ng.Nodes[idx].Name, len(toBoot) > 1) _, err := driver.Boot(ctx, ngi.drivers[idx].di.Driver, pw) if err != nil { ngi.drivers[idx].err = err } - close(pw.Status()) - <-pw.Done() return nil }) }(idx) } - return true, eg.Wait() + err := eg.Wait() + err1 := printer.Wait() + if err == nil { + err = err1 + } + + return true, err } diff --git a/driver/driver.go b/driver/driver.go index 12dca3d2..688ec835 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -71,11 +71,7 @@ func Boot(ctx context.Context, d Driver, pw progress.Writer) (*client.Client, er if try > 2 { return nil, errors.Errorf("failed to bootstrap %T driver in attempts", d) } - if err := d.Bootstrap(ctx, func(s *client.SolveStatus) { - if pw != nil { - pw.Status() <- s - } - }); err != nil { + if err := d.Bootstrap(ctx, pw.Write); err != nil { return nil, err } } diff --git a/util/progress/fromreader.go b/util/progress/fromreader.go index 18c8c2da..06f57b83 100644 --- a/util/progress/fromreader.go +++ b/util/progress/fromreader.go @@ -11,7 +11,6 @@ import ( ) func FromReader(w Writer, name string, rc io.ReadCloser) { - status := w.Status() dgst := digest.FromBytes([]byte(identity.NewID())) tm := time.Now() @@ -21,9 +20,9 @@ func FromReader(w Writer, name string, rc io.ReadCloser) { Started: &tm, } - status <- &client.SolveStatus{ + w.Write(&client.SolveStatus{ Vertexes: []*client.Vertex{&vtx}, - } + }) _, err := io.Copy(ioutil.Discard, rc) @@ -33,8 +32,7 @@ func FromReader(w Writer, name string, rc io.ReadCloser) { if err != nil { vtx2.Error = err.Error() } - status <- &client.SolveStatus{ + w.Write(&client.SolveStatus{ Vertexes: []*client.Vertex{&vtx2}, - } - close(status) + }) } diff --git a/util/progress/multiwriter.go b/util/progress/multiwriter.go index 51b2c877..340e5694 100644 --- a/util/progress/multiwriter.go +++ b/util/progress/multiwriter.go @@ -1,101 +1,32 @@ package progress import ( - "context" "strings" - "sync" "github.com/moby/buildkit/client" - "golang.org/x/sync/errgroup" ) -type MultiWriter struct { - w Writer - eg *errgroup.Group - once sync.Once - ready chan struct{} -} - -func (mw *MultiWriter) WithPrefix(pfx string, force bool) Writer { - in := make(chan *client.SolveStatus) - out := mw.w.Status() - p := &prefixed{ - main: mw.w, - in: in, +func WithPrefix(w Writer, pfx string, force bool) Writer { + return &prefixed{ + main: w, + pfx: pfx, + force: force, } - mw.eg.Go(func() error { - mw.once.Do(func() { - close(mw.ready) - }) - for { - select { - case v, ok := <-in: - if ok { - if force { - for _, v := range v.Vertexes { - v.Name = addPrefix(pfx, v.Name) - } - } - out <- v - } else { - return nil - } - case <-mw.Done(): - return mw.Err() - } - } - }) - return p -} - -func (mw *MultiWriter) Done() <-chan struct{} { - return mw.w.Done() -} - -func (mw *MultiWriter) Err() error { - return mw.w.Err() -} - -func (mw *MultiWriter) Status() chan *client.SolveStatus { - return nil } type prefixed struct { - main Writer - in chan *client.SolveStatus -} - -func (p *prefixed) Done() <-chan struct{} { - return p.main.Done() -} - -func (p *prefixed) Err() error { - return p.main.Err() -} - -func (p *prefixed) Status() chan *client.SolveStatus { - return p.in + main Writer + pfx string + force bool } -func NewMultiWriter(pw Writer) *MultiWriter { - if pw == nil { - return nil - } - eg, _ := errgroup.WithContext(context.TODO()) - - ready := make(chan struct{}) - - go func() { - <-ready - eg.Wait() - close(pw.Status()) - }() - - return &MultiWriter{ - w: pw, - eg: eg, - ready: ready, +func (p *prefixed) Write(v *client.SolveStatus) { + if p.force { + for _, v := range v.Vertexes { + v.Name = addPrefix(p.pfx, v.Name) + } } + p.main.Write(v) } func addPrefix(pfx, name string) string { diff --git a/util/progress/printer.go b/util/progress/printer.go index b4a76d7f..ec627191 100644 --- a/util/progress/printer.go +++ b/util/progress/printer.go @@ -9,32 +9,27 @@ import ( "github.com/moby/buildkit/util/progress/progressui" ) -type printer struct { +type Printer struct { status chan *client.SolveStatus done <-chan struct{} err error } -func (p *printer) Done() <-chan struct{} { - return p.done -} - -func (p *printer) Err() error { +func (p *Printer) Wait() error { + close(p.status) + <-p.done return p.err } -func (p *printer) Status() chan *client.SolveStatus { - if p == nil { - return nil - } - return p.status +func (p *Printer) Write(s *client.SolveStatus) { + p.status <- s } -func NewPrinter(ctx context.Context, out console.File, mode string) Writer { +func NewPrinter(ctx context.Context, out console.File, mode string) *Printer { statusCh := make(chan *client.SolveStatus) doneCh := make(chan struct{}) - pw := &printer{ + pw := &Printer{ status: statusCh, done: doneCh, } diff --git a/util/progress/reset.go b/util/progress/reset.go index c2dfcbf9..f8ce39b6 100644 --- a/util/progress/reset.go +++ b/util/progress/reset.go @@ -7,56 +7,45 @@ import ( ) func ResetTime(in Writer) Writer { - w := &pw{Writer: in, status: make(chan *client.SolveStatus), tm: time.Now()} - go func() { - for { - select { - case <-in.Done(): - return - case st, ok := <-w.status: - if !ok { - close(in.Status()) - return - } - if w.diff == nil { - for _, v := range st.Vertexes { - if v.Started != nil { - d := v.Started.Sub(w.tm) - w.diff = &d - } - } - } - if w.diff != nil { - for _, v := range st.Vertexes { - if v.Started != nil { - d := v.Started.Add(-*w.diff) - v.Started = &d - } - if v.Completed != nil { - d := v.Completed.Add(-*w.diff) - v.Completed = &d - } - } - for _, v := range st.Statuses { - if v.Started != nil { - d := v.Started.Add(-*w.diff) - v.Started = &d - } - if v.Completed != nil { - d := v.Completed.Add(-*w.diff) - v.Completed = &d - } - v.Timestamp = v.Timestamp.Add(-*w.diff) - } - for _, v := range st.Logs { - v.Timestamp = v.Timestamp.Add(-*w.diff) - } - } - in.Status() <- st + return &pw{Writer: in, status: make(chan *client.SolveStatus), tm: time.Now()} +} + +func (w *pw) Write(st *client.SolveStatus) { + if w.diff == nil { + for _, v := range st.Vertexes { + if v.Started != nil { + d := v.Started.Sub(w.tm) + w.diff = &d + } + } + } + if w.diff != nil { + for _, v := range st.Vertexes { + if v.Started != nil { + d := v.Started.Add(-*w.diff) + v.Started = &d + } + if v.Completed != nil { + d := v.Completed.Add(-*w.diff) + v.Completed = &d } } - }() - return w + for _, v := range st.Statuses { + if v.Started != nil { + d := v.Started.Add(-*w.diff) + v.Started = &d + } + if v.Completed != nil { + d := v.Completed.Add(-*w.diff) + v.Completed = &d + } + v.Timestamp = v.Timestamp.Add(-*w.diff) + } + for _, v := range st.Logs { + v.Timestamp = v.Timestamp.Add(-*w.diff) + } + } + w.Writer.Write(st) } type pw struct { diff --git a/util/progress/writer.go b/util/progress/writer.go index 7e7ab265..d2a77e17 100644 --- a/util/progress/writer.go +++ b/util/progress/writer.go @@ -9,13 +9,10 @@ import ( ) type Writer interface { - Done() <-chan struct{} - Err() error - Status() chan *client.SolveStatus + Write(*client.SolveStatus) } func Write(w Writer, name string, f func() error) { - status := w.Status() dgst := digest.FromBytes([]byte(identity.NewID())) tm := time.Now() @@ -25,9 +22,9 @@ func Write(w Writer, name string, f func() error) { Started: &tm, } - status <- &client.SolveStatus{ + w.Write(&client.SolveStatus{ Vertexes: []*client.Vertex{&vtx}, - } + }) err := f() @@ -37,7 +34,21 @@ func Write(w Writer, name string, f func() error) { if err != nil { vtx2.Error = err.Error() } - status <- &client.SolveStatus{ + w.Write(&client.SolveStatus{ Vertexes: []*client.Vertex{&vtx2}, - } + }) +} + +func NewChannel(w Writer) chan *client.SolveStatus { + ch := make(chan *client.SolveStatus) + go func() { + for { + v, ok := <-ch + if !ok { + return + } + w.Write(v) + } + }() + return ch }