package client import ( "context" "encoding/json" "io" "os" "path/filepath" "strings" "time" "github.com/containerd/containerd/content" contentlocal "github.com/containerd/containerd/content/local" controlapi "github.com/moby/buildkit/api/services/control" "github.com/moby/buildkit/client/llb" "github.com/moby/buildkit/client/ociindex" "github.com/moby/buildkit/identity" "github.com/moby/buildkit/session" sessioncontent "github.com/moby/buildkit/session/content" "github.com/moby/buildkit/session/filesync" "github.com/moby/buildkit/session/grpchijack" "github.com/moby/buildkit/solver/pb" "github.com/moby/buildkit/util/bklog" "github.com/moby/buildkit/util/entitlements" ocispecs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/tonistiigi/fsutil" fstypes "github.com/tonistiigi/fsutil/types" "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" ) type SolveOpt struct { Exports []ExportEntry LocalDirs map[string]string OCIStores map[string]content.Store SharedKey string Frontend string FrontendAttrs map[string]string FrontendInputs map[string]llb.State CacheExports []CacheOptionsEntry CacheImports []CacheOptionsEntry Session []session.Attachable AllowedEntitlements []entitlements.Entitlement SharedSession *session.Session // TODO: refactor to better session syncing SessionPreInitialized bool // TODO: refactor to better session syncing } type ExportEntry struct { Type string Attrs map[string]string Output func(map[string]string) (io.WriteCloser, error) // for ExporterOCI and ExporterDocker OutputDir string // for ExporterLocal } type CacheOptionsEntry struct { Type string Attrs map[string]string } // Solve calls Solve on the controller. // def must be nil if (and only if) opt.Frontend is set. func (c *Client) Solve(ctx context.Context, def *llb.Definition, opt SolveOpt, statusChan chan *SolveStatus) (*SolveResponse, error) { defer func() { if statusChan != nil { close(statusChan) } }() if opt.Frontend == "" && def == nil { return nil, errors.New("invalid empty definition") } if opt.Frontend != "" && def != nil { return nil, errors.Errorf("invalid definition for frontend %s", opt.Frontend) } return c.solve(ctx, def, nil, opt, statusChan) } type runGatewayCB func(ref string, s *session.Session, opts map[string]string) error func (c *Client) solve(ctx context.Context, def *llb.Definition, runGateway runGatewayCB, opt SolveOpt, statusChan chan *SolveStatus) (*SolveResponse, error) { if def != nil && runGateway != nil { return nil, errors.New("invalid with def and cb") } syncedDirs, err := prepareSyncedDirs(def, opt.LocalDirs) if err != nil { return nil, err } ref := identity.NewID() eg, ctx := errgroup.WithContext(ctx) statusContext, cancelStatus := context.WithCancel(context.Background()) defer cancelStatus() if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() { statusContext = trace.ContextWithSpan(statusContext, span) } s := opt.SharedSession if s == nil { if opt.SessionPreInitialized { return nil, errors.Errorf("no session provided for preinitialized option") } s, err = session.NewSession(statusContext, defaultSessionName(), opt.SharedKey) if err != nil { return nil, errors.Wrap(err, "failed to create session") } } cacheOpt, err := parseCacheOptions(ctx, runGateway != nil, opt) if err != nil { return nil, err } var ex ExportEntry if len(opt.Exports) > 1 { return nil, errors.New("currently only single Exports can be specified") } if len(opt.Exports) == 1 { ex = opt.Exports[0] } if !opt.SessionPreInitialized { if len(syncedDirs) > 0 { s.Allow(filesync.NewFSSyncProvider(syncedDirs)) } for _, a := range opt.Session { s.Allow(a) } switch ex.Type { case ExporterLocal: if ex.Output != nil { return nil, errors.New("output file writer is not supported by local exporter") } if ex.OutputDir == "" { return nil, errors.New("output directory is required for local exporter") } s.Allow(filesync.NewFSSyncTargetDir(ex.OutputDir)) case ExporterOCI, ExporterDocker, ExporterTar: if ex.OutputDir != "" { return nil, errors.Errorf("output directory %s is not supported by %s exporter", ex.OutputDir, ex.Type) } if ex.Output == nil { return nil, errors.Errorf("output file writer is required for %s exporter", ex.Type) } s.Allow(filesync.NewFSSyncTarget(ex.Output)) default: if ex.Output != nil { return nil, errors.Errorf("output file writer is not supported by %s exporter", ex.Type) } if ex.OutputDir != "" { return nil, errors.Errorf("output directory %s is not supported by %s exporter", ex.OutputDir, ex.Type) } } // this is a new map that contains both cacheOpt stores and OCILayout stores contentStores := make(map[string]content.Store, len(cacheOpt.contentStores)+len(opt.OCIStores)) // copy over the stores references from cacheOpt for key, store := range cacheOpt.contentStores { contentStores[key] = store } // copy over the stores references from ociLayout opts for key, store := range opt.OCIStores { // conflicts are not allowed if _, ok := contentStores[key]; ok { // we probably should check if the store is identical, but given that // https://pkg.go.dev/github.com/containerd/containerd/content#Store // is just an interface, composing 4 others, that is rather hard to do. // For a future iteration. return nil, errors.Errorf("contentStore key %s exists in both cache and OCI layouts", key) } contentStores[key] = store } if len(contentStores) > 0 { s.Allow(sessioncontent.NewAttachable(contentStores)) } eg.Go(func() error { sd := c.sessionDialer if sd == nil { sd = grpchijack.Dialer(c.controlClient()) } return s.Run(statusContext, sd) }) } for k, v := range cacheOpt.frontendAttrs { if opt.FrontendAttrs == nil { opt.FrontendAttrs = map[string]string{} } opt.FrontendAttrs[k] = v } solveCtx, cancelSolve := context.WithCancel(ctx) var res *SolveResponse eg.Go(func() error { ctx := solveCtx defer cancelSolve() defer func() { // make sure the Status ends cleanly on build errors go func() { <-time.After(3 * time.Second) cancelStatus() }() bklog.G(ctx).Debugf("stopping session") s.Close() }() var pbd *pb.Definition if def != nil { pbd = def.ToPB() } frontendInputs := make(map[string]*pb.Definition) for key, st := range opt.FrontendInputs { def, err := st.Marshal(ctx) if err != nil { return err } frontendInputs[key] = def.ToPB() } resp, err := c.controlClient().Solve(ctx, &controlapi.SolveRequest{ Ref: ref, Definition: pbd, Exporter: ex.Type, ExporterAttrs: ex.Attrs, Session: s.ID(), Frontend: opt.Frontend, FrontendAttrs: opt.FrontendAttrs, FrontendInputs: frontendInputs, Cache: cacheOpt.options, Entitlements: opt.AllowedEntitlements, }) if err != nil { return errors.Wrap(err, "failed to solve") } res = &SolveResponse{ ExporterResponse: resp.ExporterResponse, } return nil }) if runGateway != nil { eg.Go(func() error { err := runGateway(ref, s, opt.FrontendAttrs) if err == nil { return nil } // If the callback failed then the main // `Solve` (called above) should error as // well. However as a fallback we wait up to // 5s for that to happen before failing this // goroutine. select { case <-solveCtx.Done(): case <-time.After(5 * time.Second): cancelSolve() } return err }) } eg.Go(func() error { stream, err := c.controlClient().Status(statusContext, &controlapi.StatusRequest{ Ref: ref, }) if err != nil { return errors.Wrap(err, "failed to get status") } for { resp, err := stream.Recv() if err != nil { if err == io.EOF { return nil } return errors.Wrap(err, "failed to receive status") } s := SolveStatus{} for _, v := range resp.Vertexes { s.Vertexes = append(s.Vertexes, &Vertex{ Digest: v.Digest, Inputs: v.Inputs, Name: v.Name, Started: v.Started, Completed: v.Completed, Error: v.Error, Cached: v.Cached, ProgressGroup: v.ProgressGroup, }) } for _, v := range resp.Statuses { s.Statuses = append(s.Statuses, &VertexStatus{ ID: v.ID, Vertex: v.Vertex, Name: v.Name, Total: v.Total, Current: v.Current, Timestamp: v.Timestamp, Started: v.Started, Completed: v.Completed, }) } for _, v := range resp.Logs { s.Logs = append(s.Logs, &VertexLog{ Vertex: v.Vertex, Stream: int(v.Stream), Data: v.Msg, Timestamp: v.Timestamp, }) } for _, v := range resp.Warnings { s.Warnings = append(s.Warnings, &VertexWarning{ Vertex: v.Vertex, Level: int(v.Level), Short: v.Short, Detail: v.Detail, URL: v.Url, SourceInfo: v.Info, Range: v.Ranges, }) } if statusChan != nil { statusChan <- &s } } }) if err := eg.Wait(); err != nil { return nil, err } // Update index.json of exported cache content store // FIXME(AkihiroSuda): dedupe const definition of cache/remotecache.ExporterResponseManifestDesc = "cache.manifest" if manifestDescJSON := res.ExporterResponse["cache.manifest"]; manifestDescJSON != "" { var manifestDesc ocispecs.Descriptor if err = json.Unmarshal([]byte(manifestDescJSON), &manifestDesc); err != nil { return nil, err } for indexJSONPath, tag := range cacheOpt.indicesToUpdate { if err = ociindex.PutDescToIndexJSONFileLocked(indexJSONPath, manifestDesc, tag); err != nil { return nil, err } } } return res, nil } func prepareSyncedDirs(def *llb.Definition, localDirs map[string]string) ([]filesync.SyncedDir, error) { for _, d := range localDirs { fi, err := os.Stat(d) if err != nil { return nil, errors.Wrapf(err, "could not find %s", d) } if !fi.IsDir() { return nil, errors.Errorf("%s not a directory", d) } } resetUIDAndGID := func(p string, st *fstypes.Stat) fsutil.MapResult { st.Uid = 0 st.Gid = 0 return fsutil.MapResultKeep } dirs := make([]filesync.SyncedDir, 0, len(localDirs)) if def == nil { for name, d := range localDirs { dirs = append(dirs, filesync.SyncedDir{Name: name, Dir: d, Map: resetUIDAndGID}) } } else { for _, dt := range def.Def { var op pb.Op if err := (&op).Unmarshal(dt); err != nil { return nil, errors.Wrap(err, "failed to parse llb proto op") } if src := op.GetSource(); src != nil { if strings.HasPrefix(src.Identifier, "local://") { name := strings.TrimPrefix(src.Identifier, "local://") d, ok := localDirs[name] if !ok { return nil, errors.Errorf("local directory %s not enabled", name) } dirs = append(dirs, filesync.SyncedDir{Name: name, Dir: d, Map: resetUIDAndGID}) } } } } return dirs, nil } func defaultSessionName() string { wd, err := os.Getwd() if err != nil { return "unknown" } return filepath.Base(wd) } type cacheOptions struct { options controlapi.CacheOptions contentStores map[string]content.Store // key: ID of content store ("local:" + csDir) indicesToUpdate map[string]string // key: index.JSON file name, value: tag frontendAttrs map[string]string } func parseCacheOptions(ctx context.Context, isGateway bool, opt SolveOpt) (*cacheOptions, error) { var ( cacheExports []*controlapi.CacheOptionsEntry cacheImports []*controlapi.CacheOptionsEntry // legacy API is used for registry caches, because the daemon might not support the new API legacyExportRef string legacyImportRefs []string ) contentStores := make(map[string]content.Store) indicesToUpdate := make(map[string]string) // key: index.JSON file name, value: tag frontendAttrs := make(map[string]string) legacyExportAttrs := make(map[string]string) for _, ex := range opt.CacheExports { if ex.Type == "local" { csDir := ex.Attrs["dest"] if csDir == "" { return nil, errors.New("local cache exporter requires dest") } if err := os.MkdirAll(csDir, 0755); err != nil { return nil, err } cs, err := contentlocal.NewStore(csDir) if err != nil { return nil, err } contentStores["local:"+csDir] = cs // TODO(AkihiroSuda): support custom index JSON path and tag indexJSONPath := filepath.Join(csDir, "index.json") indicesToUpdate[indexJSONPath] = "latest" } if ex.Type == "registry" && legacyExportRef == "" { legacyExportRef = ex.Attrs["ref"] for k, v := range ex.Attrs { if k != "ref" { legacyExportAttrs[k] = v } } } else { cacheExports = append(cacheExports, &controlapi.CacheOptionsEntry{ Type: ex.Type, Attrs: ex.Attrs, }) } } for _, im := range opt.CacheImports { attrs := im.Attrs if im.Type == "local" { csDir := im.Attrs["src"] if csDir == "" { return nil, errors.New("local cache importer requires src") } cs, err := contentlocal.NewStore(csDir) if err != nil { bklog.G(ctx).Warning("local cache import at " + csDir + " not found due to err: " + err.Error()) continue } // if digest is not specified, load from "latest" tag if attrs["digest"] == "" { idx, err := ociindex.ReadIndexJSONFileLocked(filepath.Join(csDir, "index.json")) if err != nil { bklog.G(ctx).Warning("local cache import at " + csDir + " not found due to err: " + err.Error()) continue } for _, m := range idx.Manifests { if (m.Annotations[ocispecs.AnnotationRefName] == "latest" && attrs["tag"] == "") || (attrs["tag"] != "" && m.Annotations[ocispecs.AnnotationRefName] == attrs["tag"]) { attrs["digest"] = string(m.Digest) break } } if attrs["digest"] == "" { return nil, errors.New("local cache importer requires either explicit digest, \"latest\" tag or custom tag on index.json") } } contentStores["local:"+csDir] = cs } if im.Type == "registry" { legacyImportRef := attrs["ref"] legacyImportRefs = append(legacyImportRefs, legacyImportRef) } else { cacheImports = append(cacheImports, &controlapi.CacheOptionsEntry{ Type: im.Type, Attrs: attrs, }) } } if opt.Frontend != "" || isGateway { // use legacy API for registry importers, because the frontend might not support the new API if len(legacyImportRefs) > 0 { frontendAttrs["cache-from"] = strings.Join(legacyImportRefs, ",") } // use new API for other importers if len(cacheImports) > 0 { s, err := json.Marshal(cacheImports) if err != nil { return nil, err } frontendAttrs["cache-imports"] = string(s) } } res := cacheOptions{ options: controlapi.CacheOptions{ // old API (for registry caches, planned to be removed in early 2019) ExportRefDeprecated: legacyExportRef, ExportAttrsDeprecated: legacyExportAttrs, ImportRefsDeprecated: legacyImportRefs, // new API Exports: cacheExports, Imports: cacheImports, }, contentStores: contentStores, indicesToUpdate: indicesToUpdate, frontendAttrs: frontendAttrs, } return &res, nil }