You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
buildx/vendor/github.com/moby/buildkit/util/contentutil/buffer.go

211 lines
4.9 KiB
Go

package contentutil
import (
"bytes"
"context"
"io"
"strings"
"sync"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
digest "github.com/opencontainers/go-digest"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
// Buffer is a content provider and ingester that keeps data in memory
type Buffer interface {
content.Provider
content.Ingester
content.Manager
}
// NewBuffer returns a new buffer
func NewBuffer() Buffer {
return &buffer{
buffers: map[digest.Digest][]byte{},
infos: map[digest.Digest]content.Info{},
refs: map[string]struct{}{},
}
}
type buffer struct {
mu sync.Mutex
buffers map[digest.Digest][]byte
infos map[digest.Digest]content.Info
refs map[string]struct{}
}
func (b *buffer) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
b.mu.Lock()
v, ok := b.infos[dgst]
b.mu.Unlock()
if !ok {
return content.Info{}, errdefs.ErrNotFound
}
return v, nil
}
func (b *buffer) Update(ctx context.Context, new content.Info, fieldpaths ...string) (content.Info, error) {
b.mu.Lock()
defer b.mu.Unlock()
updated, ok := b.infos[new.Digest]
if !ok {
return content.Info{}, errdefs.ErrNotFound
}
if len(fieldpaths) == 0 {
fieldpaths = []string{"labels"}
}
for _, path := range fieldpaths {
if strings.HasPrefix(path, "labels.") {
if updated.Labels == nil {
updated.Labels = map[string]string{}
}
key := strings.TrimPrefix(path, "labels.")
updated.Labels[key] = new.Labels[key]
continue
}
if path == "labels" {
updated.Labels = new.Labels
}
}
b.infos[new.Digest] = updated
return updated, nil
}
func (b *buffer) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error {
return nil // not implemented
}
func (b *buffer) Delete(ctx context.Context, dgst digest.Digest) error {
return nil // not implemented
}
func (b *buffer) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
var wOpts content.WriterOpts
for _, opt := range opts {
if err := opt(&wOpts); err != nil {
return nil, err
}
}
b.mu.Lock()
if _, ok := b.refs[wOpts.Ref]; ok {
return nil, errors.Wrapf(errdefs.ErrUnavailable, "ref %s locked", wOpts.Ref)
}
b.mu.Unlock()
return &bufferedWriter{
main: b,
digester: digest.Canonical.Digester(),
buffer: bytes.NewBuffer(nil),
expected: wOpts.Desc.Digest,
releaseRef: func() {
b.mu.Lock()
delete(b.refs, wOpts.Ref)
b.mu.Unlock()
},
}, nil
}
func (b *buffer) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) {
r, err := b.getBytesReader(ctx, desc.Digest)
if err != nil {
return nil, err
}
return &readerAt{Reader: r, Closer: io.NopCloser(r), size: int64(r.Len())}, nil
}
func (b *buffer) getBytesReader(ctx context.Context, dgst digest.Digest) (*bytes.Reader, error) {
b.mu.Lock()
defer b.mu.Unlock()
if dt, ok := b.buffers[dgst]; ok {
return bytes.NewReader(dt), nil
}
return nil, errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst)
}
func (b *buffer) addValue(k digest.Digest, dt []byte) {
b.mu.Lock()
defer b.mu.Unlock()
b.buffers[k] = dt
b.infos[k] = content.Info{Digest: k, Size: int64(len(dt))}
}
type bufferedWriter struct {
main *buffer
ref string
offset int64
total int64
startedAt time.Time
updatedAt time.Time
buffer *bytes.Buffer
expected digest.Digest
digester digest.Digester
releaseRef func()
}
func (w *bufferedWriter) Write(p []byte) (n int, err error) {
n, err = w.buffer.Write(p)
w.digester.Hash().Write(p[:n])
w.offset += int64(len(p))
w.updatedAt = time.Now()
return n, err
}
func (w *bufferedWriter) Close() error {
if w.buffer != nil {
w.releaseRef()
w.buffer = nil
}
return nil
}
func (w *bufferedWriter) Status() (content.Status, error) {
return content.Status{
Ref: w.ref,
Offset: w.offset,
Total: w.total,
StartedAt: w.startedAt,
UpdatedAt: w.updatedAt,
}, nil
}
func (w *bufferedWriter) Digest() digest.Digest {
return w.digester.Digest()
}
func (w *bufferedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opt ...content.Opt) error {
if w.buffer == nil {
return errors.Errorf("can't commit already committed or closed")
}
if s := int64(w.buffer.Len()); size > 0 && size != s {
return errors.Errorf("unexpected commit size %d, expected %d", s, size)
}
dgst := w.digester.Digest()
if expected != "" && expected != dgst {
return errors.Errorf("unexpected digest: %v != %v", dgst, expected)
}
if w.expected != "" && w.expected != dgst {
return errors.Errorf("unexpected digest: %v != %v", dgst, w.expected)
}
w.main.addValue(dgst, w.buffer.Bytes())
return w.Close()
}
func (w *bufferedWriter) Truncate(size int64) error {
if size != 0 {
return errors.New("Truncate: unsupported size")
}
w.offset = 0
w.digester.Hash().Reset()
w.buffer.Reset()
return nil
}