Enable to create container from error

Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
pull/1640/head
Kohei Tokunaga 2 years ago
parent 1303715aba
commit b3340cc7ba
No known key found for this signature in database
GPG Key ID: 6CE0A04690DB3FB3

@ -920,7 +920,12 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
} }
results.Set(resultKey(dp.driverIndex, k), res) results.Set(resultKey(dp.driverIndex, k), res)
if resultHandleFunc != nil { if resultHandleFunc != nil {
resultHandleFunc(dp.driverIndex, &ResultContext{cc, res}) resultCtx, err := NewResultContext(cc, so, res)
if err == nil {
resultHandleFunc(dp.driverIndex, resultCtx)
} else {
logrus.Warnf("failed to record result: %s", err)
}
} }
return res, nil return res, nil
} }

@ -3,110 +3,68 @@ package build
import ( import (
"context" "context"
_ "crypto/sha256" // ensure digests can be computed _ "crypto/sha256" // ensure digests can be computed
"encoding/json"
"fmt"
"io" "io"
"sync" "sync"
"sync/atomic" "sync/atomic"
"syscall" "syscall"
controllerapi "github.com/docker/buildx/controller/pb" controllerapi "github.com/docker/buildx/controller/pb"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/exporter/containerimage/exptypes"
gateway "github.com/moby/buildkit/frontend/gateway/client" gateway "github.com/moby/buildkit/frontend/gateway/client"
"github.com/moby/buildkit/solver/pb"
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
// ResultContext is a build result with the client that built it.
type ResultContext struct {
Client *client.Client
Res *gateway.Result
}
type Container struct { type Container struct {
cancelOnce sync.Once cancelOnce sync.Once
containerCancel func() containerCancel func()
isUnavailable atomic.Bool isUnavailable atomic.Bool
initStarted atomic.Bool initStarted atomic.Bool
container gateway.Container container gateway.Container
image *specs.Image
releaseCh chan struct{} releaseCh chan struct{}
resultCtx *ResultContext
} }
func NewContainer(ctx context.Context, resultCtx *ResultContext) (*Container, error) { func NewContainer(ctx context.Context, resultCtx *ResultContext, cfg *controllerapi.InvokeConfig) (*Container, error) {
c, res := resultCtx.Client, resultCtx.Res
mainCtx := ctx mainCtx := ctx
ctrCh := make(chan *Container) ctrCh := make(chan *Container)
errCh := make(chan error) errCh := make(chan error)
go func() { go func() {
_, err := c.Build(context.TODO(), client.SolveOpt{}, "buildx", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) { err := resultCtx.build(func(ctx context.Context, c gateway.Client) (*gateway.Result, error) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
go func() { go func() {
<-mainCtx.Done() <-mainCtx.Done()
cancel() cancel()
}() }()
if res.Ref == nil { containerCfg, err := resultCtx.getContainerConfig(ctx, c, cfg)
return nil, errors.Errorf("no reference is registered")
}
st, err := res.Ref.ToState()
if err != nil {
return nil, err
}
def, err := st.Marshal(ctx)
if err != nil {
return nil, err
}
imgRef, err := c.Solve(ctx, gateway.SolveRequest{
Definition: def.ToPB(),
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
containerCtx, containerCancel := context.WithCancel(ctx) containerCtx, containerCancel := context.WithCancel(ctx)
defer containerCancel() defer containerCancel()
bkContainer, err := c.NewContainer(containerCtx, gateway.NewContainerRequest{ bkContainer, err := c.NewContainer(containerCtx, containerCfg)
Mounts: []gateway.Mount{
{
Dest: "/",
MountType: pb.MountType_BIND,
Ref: imgRef.Ref,
},
},
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
imgData := res.Metadata[exptypes.ExporterImageConfigKey]
var img *specs.Image
if len(imgData) > 0 {
img = &specs.Image{}
if err := json.Unmarshal(imgData, img); err != nil {
fmt.Println(err)
return nil, err
}
}
releaseCh := make(chan struct{}) releaseCh := make(chan struct{})
container := &Container{ container := &Container{
containerCancel: containerCancel, containerCancel: containerCancel,
container: bkContainer, container: bkContainer,
image: img,
releaseCh: releaseCh, releaseCh: releaseCh,
resultCtx: resultCtx,
} }
doneCh := make(chan struct{})
defer close(doneCh)
resultCtx.registerCleanup(func() {
container.Cancel()
<-doneCh
})
ctrCh <- container ctrCh <- container
<-container.releaseCh <-container.releaseCh
return nil, bkContainer.Release(ctx) return nil, bkContainer.Release(ctx)
}, nil) })
if err != nil { if err != nil {
errCh <- err errCh <- err
} }
@ -146,7 +104,7 @@ func (c *Container) Exec(ctx context.Context, cfg *controllerapi.InvokeConfig, s
c.markUnavailable() c.markUnavailable()
}() }()
} }
err := exec(ctx, cfg, c.container, c.image, stdin, stdout, stderr) err := exec(ctx, c.resultCtx, cfg, c.container, stdin, stdout, stderr)
if err != nil { if err != nil {
// Container becomes unavailable if one of the processes fails in it. // Container becomes unavailable if one of the processes fails in it.
c.markUnavailable() c.markUnavailable()
@ -154,48 +112,12 @@ func (c *Container) Exec(ctx context.Context, cfg *controllerapi.InvokeConfig, s
return err return err
} }
func exec(ctx context.Context, cfg *controllerapi.InvokeConfig, ctr gateway.Container, img *specs.Image, stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) error { func exec(ctx context.Context, resultCtx *ResultContext, cfg *controllerapi.InvokeConfig, ctr gateway.Container, stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) error {
user := "" processCfg, err := resultCtx.getProcessConfig(cfg, stdin, stdout, stderr)
if !cfg.NoUser { if err != nil {
user = cfg.User return err
} else if img != nil {
user = img.Config.User
}
cwd := ""
if !cfg.NoCwd {
cwd = cfg.Cwd
} else if img != nil {
cwd = img.Config.WorkingDir
}
env := []string{}
if img != nil {
env = append(env, img.Config.Env...)
}
env = append(env, cfg.Env...)
args := []string{}
if cfg.Entrypoint != nil {
args = append(args, cfg.Entrypoint...)
}
if cfg.Cmd != nil {
args = append(args, cfg.Cmd...)
}
// caller should always set args
if len(args) == 0 {
return errors.Errorf("specify args to execute")
} }
proc, err := ctr.Start(ctx, gateway.StartRequest{ proc, err := ctr.Start(ctx, processCfg)
Args: args,
Env: env,
User: user,
Cwd: cwd,
Tty: cfg.Tty,
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
})
if err != nil { if err != nil {
return errors.Errorf("failed to start container: %v", err) return errors.Errorf("failed to start container: %v", err)
} }

@ -0,0 +1,365 @@
package build
import (
"context"
_ "crypto/sha256" // ensure digests can be computed
"encoding/json"
"io"
"sync"
"sync/atomic"
controllerapi "github.com/docker/buildx/controller/pb"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/exporter/containerimage/exptypes"
gateway "github.com/moby/buildkit/frontend/gateway/client"
"github.com/moby/buildkit/solver/errdefs"
"github.com/moby/buildkit/solver/pb"
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
func NewResultContext(c *client.Client, solveOpt client.SolveOpt, res *gateway.Result) (*ResultContext, error) {
ctx := context.Background()
def, err := getDefinition(ctx, res)
if err != nil {
return nil, err
}
return getResultAt(ctx, c, solveOpt, def, nil)
}
func getDefinition(ctx context.Context, res *gateway.Result) (*pb.Definition, error) {
ref, err := res.SingleRef()
if err != nil {
return nil, err
}
st, err := ref.ToState()
if err != nil {
return nil, err
}
def, err := st.Marshal(ctx)
if err != nil {
return nil, err
}
return def.ToPB(), nil
}
func getResultAt(ctx context.Context, c *client.Client, solveOpt client.SolveOpt, target *pb.Definition, statusChan chan *client.SolveStatus) (*ResultContext, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// forward SolveStatus
done := new(atomic.Bool)
defer done.Store(true)
ch := make(chan *client.SolveStatus)
go func() {
for {
s := <-ch
if s == nil {
return
}
if done.Load() {
// Do not forward if the function returned because statusChan is possibly closed
continue
}
select {
case statusChan <- s:
case <-ctx.Done():
}
}
}()
// get result
resultCtxCh := make(chan *ResultContext)
errCh := make(chan error)
go func() {
resultCtx := ResultContext{
client: c,
solveOpt: solveOpt,
}
_, err := c.Build(context.Background(), solveOpt, "buildx", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
res2, err := c.Solve(ctx, gateway.SolveRequest{
Evaluate: true,
Definition: target,
})
if err != nil {
var se *errdefs.SolveError
if errors.As(err, &se) {
resultCtx.solveErr = se
} else {
return nil, err
}
}
// Record the client and ctx as well so that containers can be created from the SolveError.
resultCtx.res = res2
resultCtx.gwClient = c
resultCtx.gwCtx = ctx
resultCtx.gwDone = cancel
select {
case resultCtxCh <- &resultCtx:
case <-ctx.Done():
return nil, ctx.Err()
}
<-ctx.Done()
return nil, nil
}, ch)
if err != nil {
errCh <- err
}
}()
select {
case resultCtx := <-resultCtxCh:
return resultCtx, nil
case err := <-errCh:
return nil, err
case <-ctx.Done():
return nil, ctx.Err()
}
}
// ResultContext is a build result with the client that built it.
type ResultContext struct {
client *client.Client
res *gateway.Result
solveOpt client.SolveOpt
solveErr *errdefs.SolveError
gwClient gateway.Client
gwCtx context.Context
gwDone func()
gwDoneOnce sync.Once
cleanups []func()
cleanupsMu sync.Mutex
}
func (r *ResultContext) Done() {
r.gwDoneOnce.Do(func() {
r.cleanupsMu.Lock()
cleanups := r.cleanups
r.cleanups = nil
r.cleanupsMu.Unlock()
for _, f := range cleanups {
f()
}
r.gwDone()
})
}
func (r *ResultContext) registerCleanup(f func()) {
r.cleanupsMu.Lock()
r.cleanups = append(r.cleanups, f)
r.cleanupsMu.Unlock()
}
func (r *ResultContext) build(buildFunc gateway.BuildFunc) (err error) {
_, err = buildFunc(r.gwCtx, r.gwClient)
return err
}
func (r *ResultContext) getContainerConfig(ctx context.Context, c gateway.Client, cfg *controllerapi.InvokeConfig) (containerCfg gateway.NewContainerRequest, _ error) {
if r.res != nil && r.solveErr == nil {
logrus.Debugf("creating container from successful build")
ccfg, err := containerConfigFromResult(ctx, r.res, c, *cfg)
if err != nil {
return containerCfg, err
}
containerCfg = *ccfg
} else {
logrus.Debugf("creating container from failed build %+v", cfg)
ccfg, err := containerConfigFromError(r.solveErr, *cfg)
if err != nil {
return containerCfg, errors.Wrapf(err, "no result nor error is available")
}
containerCfg = *ccfg
}
return containerCfg, nil
}
func (r *ResultContext) getProcessConfig(cfg *controllerapi.InvokeConfig, stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) (_ gateway.StartRequest, err error) {
processCfg := newStartRequest(stdin, stdout, stderr)
if r.res != nil && r.solveErr == nil {
logrus.Debugf("creating container from successful build")
if err := populateProcessConfigFromResult(&processCfg, r.res, *cfg); err != nil {
return processCfg, err
}
} else {
logrus.Debugf("creating container from failed build %+v", cfg)
if err := populateProcessConfigFromError(&processCfg, r.solveErr, *cfg); err != nil {
return processCfg, err
}
}
return processCfg, nil
}
func containerConfigFromResult(ctx context.Context, res *gateway.Result, c gateway.Client, cfg controllerapi.InvokeConfig) (*gateway.NewContainerRequest, error) {
if res.Ref == nil {
return nil, errors.Errorf("no reference is registered")
}
st, err := res.Ref.ToState()
if err != nil {
return nil, err
}
def, err := st.Marshal(ctx)
if err != nil {
return nil, err
}
imgRef, err := c.Solve(ctx, gateway.SolveRequest{
Definition: def.ToPB(),
})
if err != nil {
return nil, err
}
return &gateway.NewContainerRequest{
Mounts: []gateway.Mount{
{
Dest: "/",
MountType: pb.MountType_BIND,
Ref: imgRef.Ref,
},
},
}, nil
}
func populateProcessConfigFromResult(req *gateway.StartRequest, res *gateway.Result, cfg controllerapi.InvokeConfig) error {
imgData := res.Metadata[exptypes.ExporterImageConfigKey]
var img *specs.Image
if len(imgData) > 0 {
img = &specs.Image{}
if err := json.Unmarshal(imgData, img); err != nil {
return err
}
}
user := ""
if !cfg.NoUser {
user = cfg.User
} else if img != nil {
user = img.Config.User
}
cwd := ""
if !cfg.NoCwd {
cwd = cfg.Cwd
} else if img != nil {
cwd = img.Config.WorkingDir
}
env := []string{}
if img != nil {
env = append(env, img.Config.Env...)
}
env = append(env, cfg.Env...)
args := []string{}
if cfg.Entrypoint != nil {
args = append(args, cfg.Entrypoint...)
} else if img != nil {
args = append(args, img.Config.Entrypoint...)
}
if cfg.Cmd != nil {
args = append(args, cfg.Cmd...)
} else if img != nil {
args = append(args, img.Config.Cmd...)
}
req.Args = args
req.Env = env
req.User = user
req.Cwd = cwd
req.Tty = cfg.Tty
return nil
}
func containerConfigFromError(solveErr *errdefs.SolveError, cfg controllerapi.InvokeConfig) (*gateway.NewContainerRequest, error) {
exec, err := execOpFromError(solveErr)
if err != nil {
return nil, err
}
var mounts []gateway.Mount
for i, mnt := range exec.Mounts {
rid := solveErr.Solve.MountIDs[i]
mounts = append(mounts, gateway.Mount{
Selector: mnt.Selector,
Dest: mnt.Dest,
ResultID: rid,
Readonly: mnt.Readonly,
MountType: mnt.MountType,
CacheOpt: mnt.CacheOpt,
SecretOpt: mnt.SecretOpt,
SSHOpt: mnt.SSHOpt,
})
}
return &gateway.NewContainerRequest{
Mounts: mounts,
NetMode: exec.Network,
}, nil
}
func populateProcessConfigFromError(req *gateway.StartRequest, solveErr *errdefs.SolveError, cfg controllerapi.InvokeConfig) error {
exec, err := execOpFromError(solveErr)
if err != nil {
return err
}
meta := exec.Meta
user := ""
if !cfg.NoUser {
user = cfg.User
} else {
user = meta.User
}
cwd := ""
if !cfg.NoCwd {
cwd = cfg.Cwd
} else {
cwd = meta.Cwd
}
env := append(meta.Env, cfg.Env...)
args := []string{}
if cfg.Entrypoint != nil {
args = append(args, cfg.Entrypoint...)
}
if cfg.Cmd != nil {
args = append(args, cfg.Cmd...)
}
if len(args) == 0 {
args = meta.Args
}
req.Args = args
req.Env = env
req.User = user
req.Cwd = cwd
req.Tty = cfg.Tty
return nil
}
func execOpFromError(solveErr *errdefs.SolveError) (*pb.ExecOp, error) {
if solveErr == nil {
return nil, errors.Errorf("no error is available")
}
switch op := solveErr.Solve.Op.GetOp().(type) {
case *pb.Op_Exec:
return op.Exec, nil
default:
return nil, errors.Errorf("invoke: unsupported error type")
}
// TODO: support other ops
}
func newStartRequest(stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) gateway.StartRequest {
return gateway.StartRequest{
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
}
}

@ -93,7 +93,7 @@ func (b *localController) Invoke(ctx context.Context, ref string, pid string, cf
return errors.New("no build result is registered") return errors.New("no build result is registered")
} }
var err error var err error
proc, err = b.processes.StartProcess(pid, b.resultCtx, &cfg) proc, err = b.processes.StartProcess(pid, b.buildConfig.resultCtx, &cfg)
if err != nil { if err != nil {
return err return err
} }

@ -111,7 +111,7 @@ func (m *Manager) StartProcess(pid string, resultCtx *build.ResultContext, cfg *
go ctr.Cancel() // Finish the existing container go ctr.Cancel() // Finish the existing container
} }
var err error var err error
ctr, err = build.NewContainer(context.TODO(), resultCtx) ctr, err = build.NewContainer(context.TODO(), resultCtx, cfg)
if err != nil { if err != nil {
return nil, errors.Errorf("failed to create container %v", err) return nil, errors.Errorf("failed to create container %v", err)
} }

Loading…
Cancel
Save