package progressui import ( "bytes" "container/ring" "context" "fmt" "io" "os" "sort" "strconv" "strings" "time" "github.com/containerd/console" "github.com/moby/buildkit/client" "github.com/morikuni/aec" digest "github.com/opencontainers/go-digest" "github.com/tonistiigi/units" "github.com/tonistiigi/vt100" "golang.org/x/time/rate" ) func DisplaySolveStatus(ctx context.Context, phase string, c console.Console, w io.Writer, ch chan *client.SolveStatus) ([]client.VertexWarning, error) { modeConsole := c != nil disp := &display{c: c, phase: phase} printer := &textMux{w: w} if disp.phase == "" { disp.phase = "Building" } t := newTrace(w, modeConsole) tickerTimeout := 150 * time.Millisecond displayTimeout := 100 * time.Millisecond if v := os.Getenv("TTY_DISPLAY_RATE"); v != "" { if r, err := strconv.ParseInt(v, 10, 64); err == nil { tickerTimeout = time.Duration(r) * time.Millisecond displayTimeout = time.Duration(r) * time.Millisecond } } var done bool ticker := time.NewTicker(tickerTimeout) // implemented as closure because "ticker" can change defer func() { ticker.Stop() }() displayLimiter := rate.NewLimiter(rate.Every(displayTimeout), 1) var height int width, _ := disp.getSize() for { select { case <-ctx.Done(): return nil, ctx.Err() case <-ticker.C: case ss, ok := <-ch: if ok { t.update(ss, width) } else { done = true } } if modeConsole { width, height = disp.getSize() if done { disp.print(t.displayInfo(), width, height, true) t.printErrorLogs(c) return t.warnings(), nil } else if displayLimiter.Allow() { ticker.Stop() ticker = time.NewTicker(tickerTimeout) disp.print(t.displayInfo(), width, height, false) } } else { if done || displayLimiter.Allow() { printer.print(t) if done { t.printErrorLogs(w) return t.warnings(), nil } ticker.Stop() ticker = time.NewTicker(tickerTimeout) } } } } const termHeight = 6 const termPad = 10 type displayInfo struct { startTime time.Time jobs []*job countTotal int countCompleted int } type job struct { intervals []interval isCompleted bool name string status string hasError bool isCanceled bool vertex *vertex showTerm bool } type trace struct { w io.Writer startTime *time.Time localTimeDiff time.Duration vertexes []*vertex byDigest map[digest.Digest]*vertex nextIndex int updates map[digest.Digest]struct{} modeConsole bool groups map[string]*vertexGroup // group id -> group } type vertex struct { *client.Vertex statuses []*status byID map[string]*status indent string index int logs [][]byte logsPartial bool logsOffset int logsBuffer *ring.Ring // stores last logs to print them on error prev *client.Vertex events []string lastBlockTime *time.Time count int statusUpdates map[string]struct{} warnings []client.VertexWarning warningIdx int jobs []*job jobCached bool term *vt100.VT100 termBytes int termCount int // Interval start time in unix nano -> interval. Using a map ensures // that updates for the same interval overwrite their previous updates. intervals map[int64]interval mostRecentStart *time.Time // whether the vertex should be hidden due to being in a progress group // that doesn't have any non-weak members that have started hidden bool } func (v *vertex) update(c int) { if v.count == 0 { now := time.Now() v.lastBlockTime = &now } v.count += c } func (v *vertex) isStarted() bool { return len(v.intervals) > 0 } func (v *vertex) isCompleted() bool { for _, ival := range v.intervals { if ival.stop == nil { return false } } return true } type vertexGroup struct { *vertex subVtxs map[digest.Digest]client.Vertex } func (vg *vertexGroup) refresh() (changed, newlyStarted, newlyRevealed bool) { newVtx := *vg.Vertex newVtx.Cached = true alreadyStarted := vg.isStarted() wasHidden := vg.hidden for _, subVtx := range vg.subVtxs { if subVtx.Started != nil { newInterval := interval{ start: subVtx.Started, stop: subVtx.Completed, } prevInterval := vg.intervals[subVtx.Started.UnixNano()] if !newInterval.isEqual(prevInterval) { changed = true } if !alreadyStarted { newlyStarted = true } vg.intervals[subVtx.Started.UnixNano()] = newInterval if vg.mostRecentStart == nil || subVtx.Started.After(*vg.mostRecentStart) { vg.mostRecentStart = subVtx.Started } if !subVtx.ProgressGroup.Weak { vg.hidden = false } } // Group is considered cached iff all subvtxs are cached newVtx.Cached = newVtx.Cached && subVtx.Cached // Group error is set to the first error found in subvtxs, if any if newVtx.Error == "" { newVtx.Error = subVtx.Error } else { vg.hidden = false } } if vg.Cached != newVtx.Cached { changed = true } if vg.Error != newVtx.Error { changed = true } vg.Vertex = &newVtx if !vg.hidden && wasHidden { changed = true newlyRevealed = true } return changed, newlyStarted, newlyRevealed } type interval struct { start *time.Time stop *time.Time } func (ival interval) duration() time.Duration { if ival.start == nil { return 0 } if ival.stop == nil { return time.Since(*ival.start) } return ival.stop.Sub(*ival.start) } func (ival interval) isEqual(other interval) (isEqual bool) { return equalTimes(ival.start, other.start) && equalTimes(ival.stop, other.stop) } func equalTimes(t1, t2 *time.Time) bool { if t2 == nil { return t1 == nil } if t1 == nil { return false } return t1.Equal(*t2) } // mergeIntervals takes a slice of (start, stop) pairs and returns a slice where // any intervals that overlap in time are combined into a single interval. If an // interval's stop time is nil, it is treated as positive infinity and consumes // any intervals after it. Intervals with nil start times are ignored and not // returned. func mergeIntervals(intervals []interval) []interval { // remove any intervals that have not started var filtered []interval for _, interval := range intervals { if interval.start != nil { filtered = append(filtered, interval) } } intervals = filtered if len(intervals) == 0 { return nil } // sort intervals by start time sort.Slice(intervals, func(i, j int) bool { return intervals[i].start.Before(*intervals[j].start) }) var merged []interval cur := intervals[0] for i := 1; i < len(intervals); i++ { next := intervals[i] if cur.stop == nil { // if cur doesn't stop, all intervals after it will be merged into it merged = append(merged, cur) return merged } if cur.stop.Before(*next.start) { // if cur stops before next starts, no intervals after cur will be // merged into it; cur stands on its own merged = append(merged, cur) cur = next continue } if next.stop == nil { // cur and next partially overlap, but next also never stops, so all // subsequent intervals will be merged with both cur and next merged = append(merged, interval{ start: cur.start, stop: nil, }) return merged } if cur.stop.After(*next.stop) || cur.stop.Equal(*next.stop) { // cur fully subsumes next continue } // cur partially overlaps with next, merge them together into cur cur = interval{ start: cur.start, stop: next.stop, } } // append anything we are left with merged = append(merged, cur) return merged } type status struct { *client.VertexStatus } func newTrace(w io.Writer, modeConsole bool) *trace { return &trace{ byDigest: make(map[digest.Digest]*vertex), updates: make(map[digest.Digest]struct{}), w: w, modeConsole: modeConsole, groups: make(map[string]*vertexGroup), } } func (t *trace) warnings() []client.VertexWarning { var out []client.VertexWarning for _, v := range t.vertexes { out = append(out, v.warnings...) } return out } func (t *trace) triggerVertexEvent(v *client.Vertex) { if v.Started == nil { return } var old client.Vertex vtx := t.byDigest[v.Digest] if v := vtx.prev; v != nil { old = *v } changed := false if v.Digest != old.Digest { changed = true } if v.Name != old.Name { changed = true } if v.Started != old.Started { if v.Started != nil && old.Started == nil || !v.Started.Equal(*old.Started) { changed = true } } if v.Completed != old.Completed && v.Completed != nil { changed = true } if v.Cached != old.Cached { changed = true } if v.Error != old.Error { changed = true } if changed { vtx.update(1) t.updates[v.Digest] = struct{}{} } t.byDigest[v.Digest].prev = v } func (t *trace) update(s *client.SolveStatus, termWidth int) { seenGroups := make(map[string]struct{}) var groups []string for _, v := range s.Vertexes { if t.startTime == nil { t.startTime = v.Started } if v.ProgressGroup != nil { group, ok := t.groups[v.ProgressGroup.Id] if !ok { t.nextIndex++ group = &vertexGroup{ vertex: &vertex{ Vertex: &client.Vertex{ Digest: digest.Digest(v.ProgressGroup.Id), Name: v.ProgressGroup.Name, }, byID: make(map[string]*status), statusUpdates: make(map[string]struct{}), index: t.nextIndex, intervals: make(map[int64]interval), hidden: true, }, subVtxs: make(map[digest.Digest]client.Vertex), } if t.modeConsole { group.term = vt100.NewVT100(termHeight, termWidth-termPad) } t.groups[v.ProgressGroup.Id] = group t.byDigest[group.Digest] = group.vertex } if _, ok := seenGroups[v.ProgressGroup.Id]; !ok { groups = append(groups, v.ProgressGroup.Id) seenGroups[v.ProgressGroup.Id] = struct{}{} } group.subVtxs[v.Digest] = *v t.byDigest[v.Digest] = group.vertex continue } prev, ok := t.byDigest[v.Digest] if !ok { t.nextIndex++ t.byDigest[v.Digest] = &vertex{ byID: make(map[string]*status), statusUpdates: make(map[string]struct{}), index: t.nextIndex, intervals: make(map[int64]interval), } if t.modeConsole { t.byDigest[v.Digest].term = vt100.NewVT100(termHeight, termWidth-termPad) } } t.triggerVertexEvent(v) if v.Started != nil && (prev == nil || !prev.isStarted()) { if t.localTimeDiff == 0 { t.localTimeDiff = time.Since(*v.Started) } t.vertexes = append(t.vertexes, t.byDigest[v.Digest]) } // allow a duplicate initial vertex that shouldn't reset state if !(prev != nil && prev.isStarted() && v.Started == nil) { t.byDigest[v.Digest].Vertex = v } if v.Started != nil { t.byDigest[v.Digest].intervals[v.Started.UnixNano()] = interval{ start: v.Started, stop: v.Completed, } if t.byDigest[v.Digest].mostRecentStart == nil || v.Started.After(*t.byDigest[v.Digest].mostRecentStart) { t.byDigest[v.Digest].mostRecentStart = v.Started } } t.byDigest[v.Digest].jobCached = false } for _, groupID := range groups { group := t.groups[groupID] changed, newlyStarted, newlyRevealed := group.refresh() if newlyStarted { if t.localTimeDiff == 0 { t.localTimeDiff = time.Since(*group.mostRecentStart) } } if group.hidden { continue } if newlyRevealed { t.vertexes = append(t.vertexes, group.vertex) } if changed { group.update(1) t.updates[group.Digest] = struct{}{} } group.jobCached = false } for _, s := range s.Statuses { v, ok := t.byDigest[s.Vertex] if !ok { continue // shouldn't happen } v.jobCached = false prev, ok := v.byID[s.ID] if !ok { v.byID[s.ID] = &status{VertexStatus: s} } if s.Started != nil && (prev == nil || prev.Started == nil) { v.statuses = append(v.statuses, v.byID[s.ID]) } v.byID[s.ID].VertexStatus = s v.statusUpdates[s.ID] = struct{}{} t.updates[v.Digest] = struct{}{} v.update(1) } for _, w := range s.Warnings { v, ok := t.byDigest[w.Vertex] if !ok { continue // shouldn't happen } v.warnings = append(v.warnings, *w) v.update(1) } for _, l := range s.Logs { v, ok := t.byDigest[l.Vertex] if !ok { continue // shouldn't happen } v.jobCached = false if v.term != nil { if v.term.Width != termWidth { v.term.Resize(termHeight, termWidth-termPad) } v.termBytes += len(l.Data) v.term.Write(l.Data) // error unhandled on purpose. don't trust vt100 } i := 0 complete := split(l.Data, byte('\n'), func(dt []byte) { if v.logsPartial && len(v.logs) != 0 && i == 0 { v.logs[len(v.logs)-1] = append(v.logs[len(v.logs)-1], dt...) } else { ts := time.Duration(0) if v.isStarted() { ts = l.Timestamp.Sub(*v.mostRecentStart) } prec := 1 sec := ts.Seconds() if sec < 10 { prec = 3 } else if sec < 100 { prec = 2 } v.logs = append(v.logs, []byte(fmt.Sprintf("#%d %s %s", v.index, fmt.Sprintf("%.[2]*[1]f", sec, prec), dt))) } i++ }) v.logsPartial = !complete t.updates[v.Digest] = struct{}{} v.update(1) } } func (t *trace) printErrorLogs(f io.Writer) { for _, v := range t.vertexes { if v.Error != "" && !strings.HasSuffix(v.Error, context.Canceled.Error()) { fmt.Fprintln(f, "------") fmt.Fprintf(f, " > %s:\n", v.Name) // tty keeps original logs for _, l := range v.logs { f.Write(l) fmt.Fprintln(f) } // printer keeps last logs buffer if v.logsBuffer != nil { for i := 0; i < v.logsBuffer.Len(); i++ { if v.logsBuffer.Value != nil { fmt.Fprintln(f, string(v.logsBuffer.Value.([]byte))) } v.logsBuffer = v.logsBuffer.Next() } } fmt.Fprintln(f, "------") } } } func (t *trace) displayInfo() (d displayInfo) { d.startTime = time.Now() if t.startTime != nil { d.startTime = t.startTime.Add(t.localTimeDiff) } d.countTotal = len(t.byDigest) for _, v := range t.byDigest { if v.ProgressGroup != nil || v.hidden { // don't count vtxs in a group, they are merged into a single vtx d.countTotal-- continue } if v.isCompleted() { d.countCompleted++ } } for _, v := range t.vertexes { if v.jobCached { d.jobs = append(d.jobs, v.jobs...) continue } var jobs []*job j := &job{ name: strings.Replace(v.Name, "\t", " ", -1), vertex: v, isCompleted: true, } for _, ival := range v.intervals { j.intervals = append(j.intervals, interval{ start: addTime(ival.start, t.localTimeDiff), stop: addTime(ival.stop, t.localTimeDiff), }) if ival.stop == nil { j.isCompleted = false } } j.intervals = mergeIntervals(j.intervals) if v.Error != "" { if strings.HasSuffix(v.Error, context.Canceled.Error()) { j.isCanceled = true j.name = "CANCELED " + j.name } else { j.hasError = true j.name = "ERROR " + j.name } } if v.Cached { j.name = "CACHED " + j.name } j.name = v.indent + j.name jobs = append(jobs, j) for _, s := range v.statuses { j := &job{ intervals: []interval{{ start: addTime(s.Started, t.localTimeDiff), stop: addTime(s.Completed, t.localTimeDiff), }}, isCompleted: s.Completed != nil, name: v.indent + "=> " + s.ID, } if s.Total != 0 { j.status = fmt.Sprintf("%.2f / %.2f", units.Bytes(s.Current), units.Bytes(s.Total)) } else if s.Current != 0 { j.status = fmt.Sprintf("%.2f", units.Bytes(s.Current)) } jobs = append(jobs, j) } for _, w := range v.warnings { msg := "WARN: " + string(w.Short) mostRecentStart := v.mostRecentStart var mostRecentStop *time.Time if mostRecentStart != nil { mostRecentStop = v.intervals[mostRecentStart.UnixNano()].stop } j := &job{ intervals: []interval{{ start: addTime(mostRecentStart, t.localTimeDiff), stop: addTime(mostRecentStop, t.localTimeDiff), }}, name: msg, isCanceled: true, } jobs = append(jobs, j) } d.jobs = append(d.jobs, jobs...) v.jobs = jobs v.jobCached = true } return d } func split(dt []byte, sep byte, fn func([]byte)) bool { if len(dt) == 0 { return false } for { if len(dt) == 0 { return true } idx := bytes.IndexByte(dt, sep) if idx == -1 { fn(dt) return false } fn(dt[:idx]) dt = dt[idx+1:] } } func addTime(tm *time.Time, d time.Duration) *time.Time { if tm == nil { return nil } t := (*tm).Add(d) return &t } type display struct { c console.Console phase string lineCount int repeated bool } func (disp *display) getSize() (int, int) { width := 80 height := 10 if disp.c != nil { size, err := disp.c.Size() if err == nil && size.Width > 0 && size.Height > 0 { width = int(size.Width) height = int(size.Height) } } return width, height } func setupTerminals(jobs []*job, height int, all bool) []*job { var candidates []*job numInUse := 0 for _, j := range jobs { if j.vertex != nil && j.vertex.termBytes > 0 && !j.isCompleted { candidates = append(candidates, j) } if !j.isCompleted { numInUse++ } } sort.Slice(candidates, func(i, j int) bool { idxI := candidates[i].vertex.termBytes + candidates[i].vertex.termCount*50 idxJ := candidates[j].vertex.termBytes + candidates[j].vertex.termCount*50 return idxI > idxJ }) numFree := height - 2 - numInUse numToHide := 0 termLimit := termHeight + 3 for i := 0; numFree > termLimit && i < len(candidates); i++ { candidates[i].showTerm = true numToHide += candidates[i].vertex.term.UsedHeight() numFree -= termLimit } if !all { jobs = wrapHeight(jobs, height-2-numToHide) } return jobs } func (disp *display) print(d displayInfo, width, height int, all bool) { // this output is inspired by Buck d.jobs = setupTerminals(d.jobs, height, all) b := aec.EmptyBuilder for i := 0; i <= disp.lineCount; i++ { b = b.Up(1) } if !disp.repeated { b = b.Down(1) } disp.repeated = true fmt.Fprint(disp.c, b.Column(0).ANSI) statusStr := "" if d.countCompleted > 0 && d.countCompleted == d.countTotal && all { statusStr = "FINISHED" } fmt.Fprint(disp.c, aec.Hide) defer fmt.Fprint(disp.c, aec.Show) out := fmt.Sprintf("[+] %s %.1fs (%d/%d) %s", disp.phase, time.Since(d.startTime).Seconds(), d.countCompleted, d.countTotal, statusStr) out = align(out, "", width) fmt.Fprintln(disp.c, out) lineCount := 0 for _, j := range d.jobs { if len(j.intervals) == 0 { continue } var dt float64 for _, ival := range j.intervals { dt += ival.duration().Seconds() } if dt < 0.05 { dt = 0 } pfx := " => " timer := fmt.Sprintf(" %3.1fs\n", dt) status := j.status showStatus := false left := width - len(pfx) - len(timer) - 1 if status != "" { if left+len(status) > 20 { showStatus = true left -= len(status) + 1 } } if left < 12 { // too small screen to show progress continue } name := j.name if len(name) > left { name = name[:left] } out := pfx + name if showStatus { out += " " + status } out = align(out, timer, width) if j.isCompleted { color := colorRun if j.isCanceled { color = colorCancel } else if j.hasError { color = colorError } out = aec.Apply(out, color) } fmt.Fprint(disp.c, out) lineCount++ if j.showTerm { term := j.vertex.term term.Resize(termHeight, width-termPad) for _, l := range term.Content { if !isEmpty(l) { out := aec.Apply(fmt.Sprintf(" => => # %s\n", string(l)), aec.Faint) fmt.Fprint(disp.c, out) lineCount++ } } j.vertex.termCount++ j.showTerm = false } } // override previous content if diff := disp.lineCount - lineCount; diff > 0 { for i := 0; i < diff; i++ { fmt.Fprintln(disp.c, strings.Repeat(" ", width)) } fmt.Fprint(disp.c, aec.EmptyBuilder.Up(uint(diff)).Column(0).ANSI) } disp.lineCount = lineCount } func isEmpty(l []rune) bool { for _, r := range l { if r != ' ' { return false } } return true } func align(l, r string, w int) string { return fmt.Sprintf("%-[2]*[1]s %[3]s", l, w-len(r)-1, r) } func wrapHeight(j []*job, limit int) []*job { if limit < 0 { return nil } var wrapped []*job wrapped = append(wrapped, j...) if len(j) > limit { wrapped = wrapped[len(j)-limit:] // wrap things around if incomplete jobs were cut var invisible []*job for _, j := range j[:len(j)-limit] { if !j.isCompleted { invisible = append(invisible, j) } } if l := len(invisible); l > 0 { rewrapped := make([]*job, 0, len(wrapped)) for _, j := range wrapped { if !j.isCompleted || l <= 0 { rewrapped = append(rewrapped, j) } l-- } freespace := len(wrapped) - len(rewrapped) wrapped = append(invisible[len(invisible)-freespace:], rewrapped...) } } return wrapped }