package contentutil import ( "bytes" "context" "io" "sync" "time" "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" digest "github.com/opencontainers/go-digest" ocispecs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) // Buffer is a content provider and ingester that keeps data in memory type Buffer interface { content.Provider content.Ingester } // NewBuffer returns a new buffer func NewBuffer() Buffer { return &buffer{ buffers: map[digest.Digest][]byte{}, refs: map[string]struct{}{}, } } type buffer struct { mu sync.Mutex buffers map[digest.Digest][]byte refs map[string]struct{} } func (b *buffer) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) { var wOpts content.WriterOpts for _, opt := range opts { if err := opt(&wOpts); err != nil { return nil, err } } b.mu.Lock() if _, ok := b.refs[wOpts.Ref]; ok { return nil, errors.Wrapf(errdefs.ErrUnavailable, "ref %s locked", wOpts.Ref) } b.mu.Unlock() return &bufferedWriter{ main: b, digester: digest.Canonical.Digester(), buffer: bytes.NewBuffer(nil), expected: wOpts.Desc.Digest, releaseRef: func() { b.mu.Lock() delete(b.refs, wOpts.Ref) b.mu.Unlock() }, }, nil } func (b *buffer) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) { r, err := b.getBytesReader(ctx, desc.Digest) if err != nil { return nil, err } return &readerAt{Reader: r, Closer: io.NopCloser(r), size: int64(r.Len())}, nil } func (b *buffer) getBytesReader(ctx context.Context, dgst digest.Digest) (*bytes.Reader, error) { b.mu.Lock() defer b.mu.Unlock() if dt, ok := b.buffers[dgst]; ok { return bytes.NewReader(dt), nil } return nil, errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst) } func (b *buffer) addValue(k digest.Digest, dt []byte) { b.mu.Lock() defer b.mu.Unlock() b.buffers[k] = dt } type bufferedWriter struct { main *buffer ref string offset int64 total int64 startedAt time.Time updatedAt time.Time buffer *bytes.Buffer expected digest.Digest digester digest.Digester releaseRef func() } func (w *bufferedWriter) Write(p []byte) (n int, err error) { n, err = w.buffer.Write(p) w.digester.Hash().Write(p[:n]) w.offset += int64(len(p)) w.updatedAt = time.Now() return n, err } func (w *bufferedWriter) Close() error { if w.buffer != nil { w.releaseRef() w.buffer = nil } return nil } func (w *bufferedWriter) Status() (content.Status, error) { return content.Status{ Ref: w.ref, Offset: w.offset, Total: w.total, StartedAt: w.startedAt, UpdatedAt: w.updatedAt, }, nil } func (w *bufferedWriter) Digest() digest.Digest { return w.digester.Digest() } func (w *bufferedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opt ...content.Opt) error { if w.buffer == nil { return errors.Errorf("can't commit already committed or closed") } if s := int64(w.buffer.Len()); size > 0 && size != s { return errors.Errorf("unexpected commit size %d, expected %d", s, size) } dgst := w.digester.Digest() if expected != "" && expected != dgst { return errors.Errorf("unexpected digest: %v != %v", dgst, expected) } if w.expected != "" && w.expected != dgst { return errors.Errorf("unexpected digest: %v != %v", dgst, w.expected) } w.main.addValue(dgst, w.buffer.Bytes()) return w.Close() } func (w *bufferedWriter) Truncate(size int64) error { if size != 0 { return errors.New("Truncate: unsupported size") } w.offset = 0 w.digester.Hash().Reset() w.buffer.Reset() return nil }