monitor: Allow launching container from non-exec op with `on-error`

Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
pull/1807/head
Kohei Tokunaga 2 years ago
parent 060ac842bb
commit 49e63b3265
No known key found for this signature in database
GPG Key ID: 6CE0A04690DB3FB3

@ -113,7 +113,7 @@ func (c *Container) Exec(ctx context.Context, cfg *controllerapi.InvokeConfig, s
} }
func exec(ctx context.Context, resultCtx *ResultContext, cfg *controllerapi.InvokeConfig, ctr gateway.Container, stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) error { func exec(ctx context.Context, resultCtx *ResultContext, cfg *controllerapi.InvokeConfig, ctr gateway.Container, stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) error {
processCfg, err := resultCtx.getProcessConfig(cfg, stdin, stdout, stderr) processCfg, err := resultCtx.getProcessConfig(ctx, cfg, stdin, stdout, stderr)
if err != nil { if err != nil {
return err return err
} }

@ -11,11 +11,13 @@ import (
controllerapi "github.com/docker/buildx/controller/pb" controllerapi "github.com/docker/buildx/controller/pb"
"github.com/moby/buildkit/client" "github.com/moby/buildkit/client"
"github.com/moby/buildkit/client/llb"
"github.com/moby/buildkit/exporter/containerimage/exptypes" "github.com/moby/buildkit/exporter/containerimage/exptypes"
gateway "github.com/moby/buildkit/frontend/gateway/client" gateway "github.com/moby/buildkit/frontend/gateway/client"
"github.com/moby/buildkit/solver/errdefs" "github.com/moby/buildkit/solver/errdefs"
"github.com/moby/buildkit/solver/pb" "github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/solver/result" "github.com/moby/buildkit/solver/result"
"github.com/opencontainers/go-digest"
specs "github.com/opencontainers/image-spec/specs-go/v1" specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -101,11 +103,12 @@ func getResultAt(ctx context.Context, c *client.Client, solveOpt client.SolveOpt
}) })
return nil return nil
}) })
resultCtx := ResultContext{} resultCtx := ResultContext{def: targets}
if err := eg.Wait(); err != nil { if err := eg.Wait(); err != nil {
var se *errdefs.SolveError var se *errdefs.SolveError
if errors.As(err, &se) { if errors.As(err, &se) {
resultCtx.solveErr = se resultCtx.solveErr = se
resultCtx.rawErr = err
} else { } else {
return nil, err return nil, err
} }
@ -170,6 +173,9 @@ type ResultContext struct {
cleanups []func() cleanups []func()
cleanupsMu sync.Mutex cleanupsMu sync.Mutex
def *result.Result[*pb.Definition]
rawErr error
} }
func (r *ResultContext) Done() { func (r *ResultContext) Done() {
@ -196,10 +202,115 @@ func (r *ResultContext) build(buildFunc gateway.BuildFunc) (err error) {
return err return err
} }
func (r *ResultContext) solve(ctx context.Context, def *result.Result[*pb.Definition]) (*gateway.Result, error) {
resultCh := make(chan *gateway.Result)
errCh := make(chan error)
go func() {
err := r.build(func(ctx context.Context, c gateway.Client) (*gateway.Result, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// force evaluation of all targets in parallel
results := make(map[*pb.Definition]*gateway.Result)
resultsMu := sync.Mutex{}
eg, egCtx := errgroup.WithContext(ctx)
def.EachRef(func(def *pb.Definition) error {
eg.Go(func() error {
res2, err := c.Solve(egCtx, gateway.SolveRequest{
Evaluate: true,
Definition: def,
})
if err != nil {
return err
}
resultsMu.Lock()
results[def] = res2
resultsMu.Unlock()
return nil
})
return nil
})
if err := eg.Wait(); err != nil {
return nil, err
}
res2, _ := result.ConvertResult(def, func(def *pb.Definition) (gateway.Reference, error) {
if res, ok := results[def]; ok {
return res.Ref, nil
}
return nil, nil
})
select {
case resultCh <- res2:
case <-ctx.Done():
return nil, ctx.Err()
}
<-ctx.Done()
return nil, nil
})
if err != nil {
errCh <- err
}
}()
select {
case req := <-resultCh:
return req, nil
case err := <-errCh:
return nil, err
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (r *ResultContext) getParentResultOfError(ctx context.Context) (*gateway.Result, error) {
if r.def == nil || r.rawErr == nil || r.solveErr == nil {
return nil, errors.Errorf("no definition is provided")
}
var dgst digest.Digest
var ve *errdefs.VertexError
if errors.As(r.rawErr, &ve) {
dgst = digest.Digest(ve.Digest)
} else {
return nil, errors.Errorf("unsupported vertex: cannot get parent of error")
}
var parentSelector func(inputs []llb.Output) llb.Output
switch r.solveErr.Solve.Op.GetOp().(type) {
case *pb.Op_File:
parentSelector = func(inputs []llb.Output) llb.Output { return inputs[0] } // TODO: allow user selecting one input?
default:
return nil, errors.Errorf("unsupported error type: cannot create parent selector")
}
targets, err := result.ConvertResult(r.def, func(def *pb.Definition) (*pb.Definition, error) {
op, err := llb.NewDefinitionOp(def)
if err != nil {
return nil, err
}
stP, err := getStateOfDigest(ctx, llb.NewState(op), dgst)
if err != nil {
return nil, err
} else if stP == nil {
return nil, errors.Errorf("vertex %v not found", dgst)
}
st := *stP
if inputs := st.Output().Vertex(ctx, nil).Inputs(); len(inputs) > 0 {
def, err := llb.NewState(parentSelector(inputs)).Marshal(ctx)
if err != nil {
return nil, err
}
return def.ToPB(), nil
}
return nil, errors.Errorf("no input")
})
if err != nil {
return nil, err
}
return r.solve(ctx, targets)
}
func (r *ResultContext) getContainerConfig(ctx context.Context, c gateway.Client, cfg *controllerapi.InvokeConfig) (containerCfg gateway.NewContainerRequest, _ error) { func (r *ResultContext) getContainerConfig(ctx context.Context, c gateway.Client, cfg *controllerapi.InvokeConfig) (containerCfg gateway.NewContainerRequest, _ error) {
if r.res != nil && r.solveErr == nil { if r.res != nil && r.solveErr == nil {
logrus.Debugf("creating container from successful build") logrus.Debugf("creating container from successful build")
ccfg, err := containerConfigFromResult(ctx, r.res, c, *cfg) ccfg, err := containerConfigFromResult(ctx, r.res, *cfg)
if err != nil { if err != nil {
return containerCfg, err return containerCfg, err
} }
@ -208,14 +319,22 @@ func (r *ResultContext) getContainerConfig(ctx context.Context, c gateway.Client
logrus.Debugf("creating container from failed build %+v", cfg) logrus.Debugf("creating container from failed build %+v", cfg)
ccfg, err := containerConfigFromError(r.solveErr, *cfg) ccfg, err := containerConfigFromError(r.solveErr, *cfg)
if err != nil { if err != nil {
return containerCfg, errors.Wrapf(err, "no result nor error is available") res, err := r.getParentResultOfError(ctx)
if err != nil {
return containerCfg, errors.Wrapf(err, "no result nor error is available")
}
cfg.Initial = false
ccfg, err = containerConfigFromResult(ctx, res, *cfg)
if err != nil {
return containerCfg, errors.Wrapf(err, "no result nor error is available. cannot fallback to parent of the error")
}
} }
containerCfg = *ccfg containerCfg = *ccfg
} }
return containerCfg, nil return containerCfg, nil
} }
func (r *ResultContext) getProcessConfig(cfg *controllerapi.InvokeConfig, stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) (_ gateway.StartRequest, err error) { func (r *ResultContext) getProcessConfig(ctx context.Context, cfg *controllerapi.InvokeConfig, stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) (_ gateway.StartRequest, err error) {
processCfg := newStartRequest(stdin, stdout, stderr) processCfg := newStartRequest(stdin, stdout, stderr)
if r.res != nil && r.solveErr == nil { if r.res != nil && r.solveErr == nil {
logrus.Debugf("creating container from successful build") logrus.Debugf("creating container from successful build")
@ -225,13 +344,20 @@ func (r *ResultContext) getProcessConfig(cfg *controllerapi.InvokeConfig, stdin
} else { } else {
logrus.Debugf("creating container from failed build %+v", cfg) logrus.Debugf("creating container from failed build %+v", cfg)
if err := populateProcessConfigFromError(&processCfg, r.solveErr, *cfg); err != nil { if err := populateProcessConfigFromError(&processCfg, r.solveErr, *cfg); err != nil {
return processCfg, err res, err := r.getParentResultOfError(ctx)
if err != nil {
return processCfg, err
}
cfg.Initial = false
if err := populateProcessConfigFromResult(&processCfg, res, *cfg); err != nil {
return processCfg, err
}
} }
} }
return processCfg, nil return processCfg, nil
} }
func containerConfigFromResult(ctx context.Context, res *gateway.Result, c gateway.Client, cfg controllerapi.InvokeConfig) (*gateway.NewContainerRequest, error) { func containerConfigFromResult(ctx context.Context, res *gateway.Result, cfg controllerapi.InvokeConfig) (*gateway.NewContainerRequest, error) {
if cfg.Initial { if cfg.Initial {
return nil, errors.Errorf("starting from the container from the initial state of the step is supported only on the failed steps") return nil, errors.Errorf("starting from the container from the initial state of the step is supported only on the failed steps")
} }
@ -397,3 +523,24 @@ func newStartRequest(stdin io.ReadCloser, stdout io.WriteCloser, stderr io.Write
Stderr: stderr, Stderr: stderr,
} }
} }
func getStateOfDigest(ctx context.Context, st llb.State, dgst digest.Digest) (*llb.State, error) {
vtxDgst, _, _, _, err := st.Output().Vertex(ctx, nil).Marshal(ctx, nil)
if err != nil {
return nil, err
}
if vtxDgst.String() == dgst.String() {
return &st, nil
}
inputs := st.Output().Vertex(ctx, nil).Inputs()
for _, in := range inputs {
got, err := getStateOfDigest(ctx, llb.NewState(in), dgst)
if err != nil {
return nil, err
}
if got != nil {
return got, nil
}
}
return nil, nil
}

Loading…
Cancel
Save