You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
216 lines
4.6 KiB
Go
216 lines
4.6 KiB
Go
6 years ago
|
package containerizedengine
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/containerd/containerd/content"
|
||
|
"github.com/containerd/containerd/errdefs"
|
||
|
"github.com/containerd/containerd/remotes"
|
||
|
"github.com/docker/docker/pkg/jsonmessage"
|
||
|
digest "github.com/opencontainers/go-digest"
|
||
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||
|
"github.com/sirupsen/logrus"
|
||
|
)
|
||
|
|
||
|
func showProgress(ctx context.Context, ongoing *jobs, cs content.Store, out io.WriteCloser) {
|
||
|
var (
|
||
|
ticker = time.NewTicker(100 * time.Millisecond)
|
||
|
start = time.Now()
|
||
|
enc = json.NewEncoder(out)
|
||
|
statuses = map[string]statusInfo{}
|
||
|
done bool
|
||
|
)
|
||
|
defer ticker.Stop()
|
||
|
|
||
|
outer:
|
||
|
for {
|
||
|
select {
|
||
|
case <-ticker.C:
|
||
|
|
||
|
resolved := "resolved"
|
||
|
if !ongoing.isResolved() {
|
||
|
resolved = "resolving"
|
||
|
}
|
||
|
statuses[ongoing.name] = statusInfo{
|
||
|
Ref: ongoing.name,
|
||
|
Status: resolved,
|
||
|
}
|
||
|
keys := []string{ongoing.name}
|
||
|
|
||
|
activeSeen := map[string]struct{}{}
|
||
|
if !done {
|
||
|
active, err := cs.ListStatuses(ctx, "")
|
||
|
if err != nil {
|
||
|
logrus.Debugf("active check failed: %s", err)
|
||
|
continue
|
||
|
}
|
||
|
// update status of active entries!
|
||
|
for _, active := range active {
|
||
|
statuses[active.Ref] = statusInfo{
|
||
|
Ref: active.Ref,
|
||
|
Status: "downloading",
|
||
|
Offset: active.Offset,
|
||
|
Total: active.Total,
|
||
|
StartedAt: active.StartedAt,
|
||
|
UpdatedAt: active.UpdatedAt,
|
||
|
}
|
||
|
activeSeen[active.Ref] = struct{}{}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
err := updateNonActive(ctx, ongoing, cs, statuses, &keys, activeSeen, &done, start)
|
||
|
if err != nil {
|
||
|
continue outer
|
||
|
}
|
||
|
|
||
|
var ordered []statusInfo
|
||
|
for _, key := range keys {
|
||
|
ordered = append(ordered, statuses[key])
|
||
|
}
|
||
|
|
||
|
for _, si := range ordered {
|
||
|
jm := si.JSONMessage()
|
||
|
err := enc.Encode(jm)
|
||
|
if err != nil {
|
||
|
logrus.Debugf("failed to encode progress message: %s", err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if done {
|
||
|
out.Close()
|
||
|
return
|
||
|
}
|
||
|
case <-ctx.Done():
|
||
|
done = true // allow ui to update once more
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func updateNonActive(ctx context.Context, ongoing *jobs, cs content.Store, statuses map[string]statusInfo, keys *[]string, activeSeen map[string]struct{}, done *bool, start time.Time) error {
|
||
|
|
||
|
for _, j := range ongoing.jobs() {
|
||
|
key := remotes.MakeRefKey(ctx, j)
|
||
|
*keys = append(*keys, key)
|
||
|
if _, ok := activeSeen[key]; ok {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
status, ok := statuses[key]
|
||
|
if !*done && (!ok || status.Status == "downloading") {
|
||
|
info, err := cs.Info(ctx, j.Digest)
|
||
|
if err != nil {
|
||
|
if !errdefs.IsNotFound(err) {
|
||
|
logrus.Debugf("failed to get content info: %s", err)
|
||
|
return err
|
||
|
}
|
||
|
statuses[key] = statusInfo{
|
||
|
Ref: key,
|
||
|
Status: "waiting",
|
||
|
}
|
||
|
} else if info.CreatedAt.After(start) {
|
||
|
statuses[key] = statusInfo{
|
||
|
Ref: key,
|
||
|
Status: "done",
|
||
|
Offset: info.Size,
|
||
|
Total: info.Size,
|
||
|
UpdatedAt: info.CreatedAt,
|
||
|
}
|
||
|
} else {
|
||
|
statuses[key] = statusInfo{
|
||
|
Ref: key,
|
||
|
Status: "exists",
|
||
|
}
|
||
|
}
|
||
|
} else if *done {
|
||
|
if ok {
|
||
|
if status.Status != "done" && status.Status != "exists" {
|
||
|
status.Status = "done"
|
||
|
statuses[key] = status
|
||
|
}
|
||
|
} else {
|
||
|
statuses[key] = statusInfo{
|
||
|
Ref: key,
|
||
|
Status: "done",
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
type jobs struct {
|
||
|
name string
|
||
|
added map[digest.Digest]struct{}
|
||
|
descs []ocispec.Descriptor
|
||
|
mu sync.Mutex
|
||
|
resolved bool
|
||
|
}
|
||
|
|
||
|
func newJobs(name string) *jobs {
|
||
|
return &jobs{
|
||
|
name: name,
|
||
|
added: map[digest.Digest]struct{}{},
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (j *jobs) add(desc ocispec.Descriptor) {
|
||
|
j.mu.Lock()
|
||
|
defer j.mu.Unlock()
|
||
|
j.resolved = true
|
||
|
|
||
|
if _, ok := j.added[desc.Digest]; ok {
|
||
|
return
|
||
|
}
|
||
|
j.descs = append(j.descs, desc)
|
||
|
j.added[desc.Digest] = struct{}{}
|
||
|
}
|
||
|
|
||
|
func (j *jobs) jobs() []ocispec.Descriptor {
|
||
|
j.mu.Lock()
|
||
|
defer j.mu.Unlock()
|
||
|
|
||
|
var descs []ocispec.Descriptor
|
||
|
return append(descs, j.descs...)
|
||
|
}
|
||
|
|
||
|
func (j *jobs) isResolved() bool {
|
||
|
j.mu.Lock()
|
||
|
defer j.mu.Unlock()
|
||
|
return j.resolved
|
||
|
}
|
||
|
|
||
|
// statusInfo holds the status info for an upload or download.
type statusInfo struct {
	// Ref is the key the entry is tracked under; JSONMessage derives the
	// displayed ID from it.
	Ref string
	// Status is the state label shown to the user, e.g. "resolving",
	// "downloading", "waiting", "done" or "exists".
	Status string
	// Offset and Total are reported as the current and total progress values.
	Offset int64
	Total  int64
	// StartedAt and UpdatedAt carry the timestamps reported for the entry;
	// they are not included in the JSON message.
	StartedAt time.Time
	UpdatedAt time.Time
}
|
||
|
|
||
|
func (s statusInfo) JSONMessage() jsonmessage.JSONMessage {
|
||
|
// Shorten the ID to use up less width on the display
|
||
|
id := s.Ref
|
||
|
if strings.Contains(id, ":") {
|
||
|
split := strings.SplitN(id, ":", 2)
|
||
|
id = split[1]
|
||
|
}
|
||
|
id = fmt.Sprintf("%.12s", id)
|
||
|
|
||
|
return jsonmessage.JSONMessage{
|
||
|
ID: id,
|
||
|
Status: s.Status,
|
||
|
Progress: &jsonmessage.JSONProgress{
|
||
|
Current: s.Offset,
|
||
|
Total: s.Total,
|
||
|
},
|
||
|
}
|
||
|
}
|