/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package contentserver

import (
	"context"
	"fmt"
	"io"
	"sync"

	api "github.com/containerd/containerd/api/services/content/v1"
	"github.com/containerd/containerd/content"
	"github.com/containerd/containerd/errdefs"
	"github.com/containerd/containerd/log"
	ptypes "github.com/gogo/protobuf/types"
	digest "github.com/opencontainers/go-digest"
	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
	"github.com/sirupsen/logrus"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

type service struct {
	store content.Store
}

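// bufPool reuses 1MiB copy buffers across Read requests so that streaming a
// blob does not allocate a fresh buffer per call. A *[]byte is pooled rather
// than a []byte to avoid an extra allocation when the buffer is put back.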
var bufPool = sync.Pool{
	New: func() interface{} {
		buffer := make([]byte, 1<<20)
		return &buffer
	},
}

// New returns the content gRPC server
func New(cs content.Store) api.ContentServer {
	return &service{store: cs}
}

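// Register registers the content service with the given gRPC server.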
func (s *service) Register(server *grpc.Server) error {
	api.RegisterContentServer(server, s)
	return nil
}

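// Info returns metadata (digest, size, timestamps, and labels) for the blob
// identified by the request digest.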
func (s *service) Info(ctx context.Context, req *api.InfoRequest) (*api.InfoResponse, error) {
	if err := req.Digest.Validate(); err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "%q failed validation", req.Digest)
	}

	bi, err := s.store.Info(ctx, req.Digest)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}

	return &api.InfoResponse{
		Info: infoToGRPC(bi),
	}, nil
}

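// Update modifies a blob's mutable metadata (for example, its labels). When
// an update mask is supplied, only the named fields are changed.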
func (s *service) Update(ctx context.Context, req *api.UpdateRequest) (*api.UpdateResponse, error) {
	if err := req.Info.Digest.Validate(); err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "%q failed validation", req.Info.Digest)
	}

	info, err := s.store.Update(ctx, infoFromGRPC(req.Info), req.UpdateMask.GetPaths()...)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}

	return &api.UpdateResponse{
		Info: infoToGRPC(info),
	}, nil
}

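// List streams metadata for every blob matching the request filters. Results
// are batched into messages of up to 100 entries to bound message size.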
func (s *service) List(req *api.ListContentRequest, session api.Content_ListServer) error {
	var (
		buffer    []api.Info
		sendBlock = func(block []api.Info) error {
			// send a block of results to the client
			return session.Send(&api.ListContentResponse{
				Info: block,
			})
		}
	)

	if err := s.store.Walk(session.Context(), func(info content.Info) error {
		buffer = append(buffer, api.Info{
			Digest:    info.Digest,
			Size_:     info.Size,
			CreatedAt: info.CreatedAt,
			Labels:    info.Labels,
		})

		if len(buffer) >= 100 {
			if err := sendBlock(buffer); err != nil {
				return err
			}

			buffer = buffer[:0]
		}

		return nil
	}, req.Filters...); err != nil {
		return errdefs.ToGRPC(err)
	}

	if len(buffer) > 0 {
		// send last block
		if err := sendBlock(buffer); err != nil {
			return err
		}
	}

	return nil
}

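// Delete removes the blob identified by the request digest from the store.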
func (s *service) Delete(ctx context.Context, req *api.DeleteContentRequest) (*ptypes.Empty, error) {
	log.G(ctx).WithField("digest", req.Digest).Debugf("delete content")
	if err := req.Digest.Validate(); err != nil {
		// use status.Error: the error text must not be treated as a format string
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	if err := s.store.Delete(ctx, req.Digest); err != nil {
		return nil, errdefs.ToGRPC(err)
	}

	return &ptypes.Empty{}, nil
}

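// Read streams the content of a stored blob back to the client, honoring the
// requested offset and size. A negative offset is treated as zero, and a
// non-positive size means "read to the end of the blob".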
func (s *service) Read(req *api.ReadContentRequest, session api.Content_ReadServer) error {
	if err := req.Digest.Validate(); err != nil {
		return status.Errorf(codes.InvalidArgument, "%v: %v", req.Digest, err)
	}

	oi, err := s.store.Info(session.Context(), req.Digest)
	if err != nil {
		return errdefs.ToGRPC(err)
	}

	ra, err := s.store.ReaderAt(session.Context(), ocispec.Descriptor{Digest: req.Digest})
	if err != nil {
		return errdefs.ToGRPC(err)
	}
	defer ra.Close()

	var (
		offset = req.Offset
		// size is read size, not the expected size of the blob (oi.Size), which the caller might not be aware of.
		// offset+size can be larger than oi.Size.
		size = req.Size_

		// TODO(stevvooe): Using the global buffer pool, which hands out 1MB
		// buffers. This may be worth tuning for work over a fast network.
		p = bufPool.Get().(*[]byte)
	)
	defer bufPool.Put(p)

	if offset < 0 {
		offset = 0
	}

	if offset > oi.Size {
		return status.Errorf(codes.OutOfRange, "read past object length %v bytes", oi.Size)
	}

	if size <= 0 || offset+size > oi.Size {
		size = oi.Size - offset
	}

	_, err = io.CopyBuffer(
		&readResponseWriter{session: session},
		io.NewSectionReader(ra, offset, size), *p)
	return errdefs.ToGRPC(err)
}

// readResponseWriter is a writer that places the output into ReadContentResponse messages.
//
// This allows io.CopyBuffer to do the heavy lifting of chunking the responses
// into the buffer size.
type readResponseWriter struct {
	offset  int64
	session api.Content_ReadServer
}

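// Write sends p to the client as a single ReadContentResponse carrying the
// current stream offset, then advances the offset by the bytes sent.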
func (rw *readResponseWriter) Write(p []byte) (n int, err error) {
	if err := rw.session.Send(&api.ReadContentResponse{
		Offset: rw.offset,
		Data:   p,
	}); err != nil {
		return 0, err
	}

	rw.offset += int64(len(p))
	return len(p), nil
}

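// Status reports the progress of the in-flight write identified by ref.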
func (s *service) Status(ctx context.Context, req *api.StatusRequest) (*api.StatusResponse, error) {
	status, err := s.store.Status(ctx, req.Ref)
	if err != nil {
		return nil, errdefs.ToGRPCf(err, "could not get status for ref %q", req.Ref)
	}

	var resp api.StatusResponse
	resp.Status = &api.Status{
		StartedAt: status.StartedAt,
		UpdatedAt: status.UpdatedAt,
		Ref:       status.Ref,
		Offset:    status.Offset,
		Total:     status.Total,
		Expected:  status.Expected,
	}

	return &resp, nil
}

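// ListStatuses reports the progress of all in-flight writes matching the
// request filters.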
func (s *service) ListStatuses(ctx context.Context, req *api.ListStatusesRequest) (*api.ListStatusesResponse, error) {
	statuses, err := s.store.ListStatuses(ctx, req.Filters...)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}

	var resp api.ListStatusesResponse
	for _, status := range statuses {
		resp.Statuses = append(resp.Statuses, api.Status{
			StartedAt: status.StartedAt,
			UpdatedAt: status.UpdatedAt,
			Ref:       status.Ref,
			Offset:    status.Offset,
			Total:     status.Total,
			Expected:  status.Expected,
		})
	}

	return &resp, nil
}

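// Write drives a bidirectional write session. The first request must carry a
// ref naming the writer; subsequent requests stream data, query status, or
// commit. Each request is answered with a response echoing the action and the
// current offset, and the session ends after a successful commit or EOF.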
func (s *service) Write(session api.Content_WriteServer) (err error) {
	var (
		ctx      = session.Context()
		msg      api.WriteContentResponse
		req      *api.WriteContentRequest
		ref      string
		total    int64
		expected digest.Digest
	)

	defer func(msg *api.WriteContentResponse) {
		// pump through the last message if no error was encountered
		if err != nil {
			if s, ok := status.FromError(err); ok && s.Code() != codes.AlreadyExists {
				// TODO(stevvooe): Really need a log line here to track which
				// errors are actually causing failure on the server side. May want
				// to configure the service with an interceptor to make this work
				// identically across all GRPC methods.
				//
				// This is pretty noisy, so we can remove it but leave it for now.
				log.G(ctx).WithError(err).Error("(*service).Write failed")
			}

			return
		}

		err = session.Send(msg)
	}(&msg)

	// handle the very first request!
	req, err = session.Recv()
	if err != nil {
		return err
	}

	ref = req.Ref

	if ref == "" {
		return status.Errorf(codes.InvalidArgument, "first message must have a reference")
	}

	fields := logrus.Fields{
		"ref": ref,
	}
	total = req.Total
	expected = req.Expected
	if total > 0 {
		fields["total"] = total
	}

	if expected != "" {
		fields["expected"] = expected
	}

	ctx = log.WithLogger(ctx, log.G(ctx).WithFields(fields))

	log.G(ctx).Debug("(*service).Write started")
	// this action locks the writer for the session.
	wr, err := s.store.Writer(ctx,
		content.WithRef(ref),
		content.WithDescriptor(ocispec.Descriptor{Size: total, Digest: expected}))
	if err != nil {
		return errdefs.ToGRPC(err)
	}
	defer wr.Close()

	for {
		msg.Action = req.Action
		ws, err := wr.Status()
		if err != nil {
			return errdefs.ToGRPC(err)
		}

		msg.Offset = ws.Offset // always set the offset.

		// NOTE(stevvooe): In general, there are two cases under which a remote
		// writer is used.
		//
		// For pull, we almost always have this before fetching large content,
		// through descriptors. We allow predeclaration of the expected size
		// and digest.
		//
		// For push, it is more complex. If we want to cut through content into
		// storage, we may have no expectation until we are done processing the
		// content. The case here is the following:
		//
		// 	1. Start writing content.
		// 	2. Compress inline.
		// 	3. Validate digest and size (maybe).
		//
		// Supporting these two paths is quite awkward but it lets both API
		// users use the same writer style for each with a minimum of overhead.
		if req.Expected != "" {
			if expected != "" && expected != req.Expected {
				log.G(ctx).Debugf("commit digest differs from writer digest: %v != %v", req.Expected, expected)
			}
			expected = req.Expected

			if _, err := s.store.Info(session.Context(), req.Expected); err == nil {
				if err := wr.Close(); err != nil {
					log.G(ctx).WithError(err).Error("failed to close writer")
				}
				if err := s.store.Abort(session.Context(), ref); err != nil {
					log.G(ctx).WithError(err).Error("failed to abort write")
				}

				return status.Errorf(codes.AlreadyExists, "blob with expected digest %v exists", req.Expected)
			}
		}

		if req.Total > 0 {
			// Update the expected total. Typically, this could be seen at
			// negotiation time or on a commit message.
			if total > 0 && req.Total != total {
				log.G(ctx).Debugf("commit size differs from writer size: %v != %v", req.Total, total)
			}
			total = req.Total
		}

		switch req.Action {
		case api.WriteActionStat:
			msg.Digest = wr.Digest()
			msg.StartedAt = ws.StartedAt
			msg.UpdatedAt = ws.UpdatedAt
			msg.Total = total
		case api.WriteActionWrite, api.WriteActionCommit:
			if req.Offset > 0 {
				// validate the offset if provided
				if req.Offset != ws.Offset {
					return status.Errorf(codes.OutOfRange, "write @%v must occur at current offset %v", req.Offset, ws.Offset)
				}
			}

			if req.Offset == 0 && ws.Offset > 0 {
				if err := wr.Truncate(req.Offset); err != nil {
					return fmt.Errorf("truncate failed: %w", err)
				}
				msg.Offset = req.Offset
			}

			// issue the write if we actually have data.
			if len(req.Data) > 0 {
				// While this looks like we could use io.WriterAt here, because we
				// maintain the offset as append only, we just issue the write.
				n, err := wr.Write(req.Data)
				if err != nil {
					return errdefs.ToGRPC(err)
				}

				if n != len(req.Data) {
					// TODO(stevvooe): Perhaps, we can recover this by including it
					// in the offset on the write return.
					return status.Errorf(codes.DataLoss, "wrote %v of %v bytes", n, len(req.Data))
				}

				msg.Offset += int64(n)
			}

			if req.Action == api.WriteActionCommit {
				var opts []content.Opt
				if req.Labels != nil {
					opts = append(opts, content.WithLabels(req.Labels))
				}
				if err := wr.Commit(ctx, total, expected, opts...); err != nil {
					return errdefs.ToGRPC(err)
				}
			}

			msg.Digest = wr.Digest()
		}

		if err := session.Send(&msg); err != nil {
			return err
		}

		if req.Action == api.WriteActionCommit {
			return nil
		}

		req, err = session.Recv()
		if err != nil {
			if err == io.EOF {
				return nil
			}

			return err
		}
	}
}

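// Abort cancels the in-flight write identified by ref and releases its
// resources.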
func (s *service) Abort(ctx context.Context, req *api.AbortRequest) (*ptypes.Empty, error) {
	if err := s.store.Abort(ctx, req.Ref); err != nil {
		return nil, errdefs.ToGRPC(err)
	}

	return &ptypes.Empty{}, nil
}

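// infoToGRPC converts a content.Info into its gRPC API representation.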
func infoToGRPC(info content.Info) api.Info {
	return api.Info{
		Digest:    info.Digest,
		Size_:     info.Size,
		CreatedAt: info.CreatedAt,
		UpdatedAt: info.UpdatedAt,
		Labels:    info.Labels,
	}
}

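// infoFromGRPC converts a gRPC API Info message into a content.Info.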
func infoFromGRPC(info api.Info) content.Info {
	return content.Info{
		Digest:    info.Digest,
		Size:      info.Size_,
		CreatedAt: info.CreatedAt,
		UpdatedAt: info.UpdatedAt,
		Labels:    info.Labels,
	}
}