@ -6,8 +6,6 @@ import (
"encoding/json"
"io"
"sync"
"sync/atomic"
"time"
controllerapi "github.com/docker/buildx/controller/pb"
"github.com/moby/buildkit/client"
@ -22,72 +20,211 @@ import (
"golang.org/x/sync/errgroup"
)
func NewResultContext ( ctx context . Context , c * client . Client , solveOpt client . SolveOpt , res * gateway . Result ) ( * ResultContext , error ) {
def , err := getDefinition ( ctx , res )
if err != nil {
return nil , err
// NewResultHandle makes a call to client.Build, additionally returning a
// opaque ResultHandle alongside the standard response and error.
//
// This ResultHandle can be used to execute additional build steps in the same
// context as the build occurred, which can allow easy debugging of build
// failures and successes.
//
// If the returned ResultHandle is not nil, the caller must call Done() on it.
func NewResultHandle ( ctx context . Context , cc * client . Client , opt client . SolveOpt , product string , buildFunc gateway . BuildFunc , ch chan * client . SolveStatus ) ( * ResultHandle , * client . SolveResponse , error ) {
// Create a new context to wrap the original, and cancel it when the
// caller-provided context is cancelled.
//
// We derive the context from the background context so that we can forbid
// cancellation of the build request after <-done is closed (which we do
// before returning the ResultHandle).
baseCtx := ctx
ctx , cancel := context . WithCancelCause ( context . Background ( ) )
done := make ( chan struct { } )
go func ( ) {
select {
case <- baseCtx . Done ( ) :
cancel ( baseCtx . Err ( ) )
case <- done :
// Once done is closed, we've recorded a ResultHandle, so we
// shouldn't allow cancelling the underlying build request anymore.
}
} ( )
// Create a new channel to forward status messages to the original.
//
// We do this so that we can discard status messages after the main portion
// of the build is complete. This is necessary for the solve error case,
// where the original gateway is kept open until the ResultHandle is
// closed - we don't want progress messages from operations in that
// ResultHandle to display after this function exits.
//
// Additionally, callers should wait for the progress channel to be closed.
// If we keep the session open and never close the progress channel, the
// caller will likely hang.
baseCh := ch
ch = make ( chan * client . SolveStatus )
go func ( ) {
for {
s , ok := <- ch
if ! ok {
return
}
select {
case <- baseCh :
// base channel is closed, discard status messages
default :
baseCh <- s
}
return getResultAt ( ctx , c , solveOpt , def , nil )
}
} ( )
defer close ( baseCh )
func getDefinition ( ctx context . Context , res * gateway . Result ) ( * result . Result [ * pb . Definition ] , error ) {
return result . ConvertResult ( res , func ( ref gateway . Reference ) ( * pb . Definition , error ) {
st , err := ref . ToState ( )
if err != nil {
return nil , err
var resp * client . SolveResponse
var respErr error
var respHandle * ResultHandle
go func ( ) {
defer cancel ( context . Canceled ) // ensure no dangling processes
var res * gateway . Result
var err error
resp , err = cc . Build ( ctx , opt , product , func ( ctx context . Context , c gateway . Client ) ( * gateway . Result , error ) {
var err error
res , err = buildFunc ( ctx , c )
if res != nil && err == nil {
// Force evaluation of the build result (otherwise, we likely
// won't get a solve error)
def , err2 := getDefinition ( ctx , res )
if err2 != nil {
return nil , err2
}
def , err := st . Marshal ( ctx )
res , err = evalDefinition ( ctx , c , def )
}
if err != nil {
return nil , err
// Scenario 1: we failed to evaluate a node somewhere in the
// build graph.
//
// In this case, we construct a ResultHandle from this
// original Build session, and return it alongside the original
// build error. We then need to keep the gateway session open
// until the caller explicitly closes the ResultHandle.
var se * errdefs . SolveError
if errors . As ( err , & se ) {
respHandle = & ResultHandle {
done : make ( chan struct { } ) ,
solveErr : se ,
gwClient : c ,
gwCtx : ctx ,
}
respErr = se
close ( done )
// Block until the caller closes the ResultHandle.
select {
case <- respHandle . done :
case <- ctx . Done ( ) :
}
}
return def . ToPB ( ) , nil
} )
}
return res , err
} , ch )
if respHandle != nil {
return
}
if err != nil {
// Something unexpected failed during the build, we didn't succeed,
// but we also didn't make it far enough to create a ResultHandle.
respErr = err
close ( done )
return
}
func getResultAt ( ctx context . Context , c * client . Client , solveOpt client . SolveOpt , targets * result . Result [ * pb . Definition ] , statusChan chan * client . SolveStatus ) ( * ResultContext , error ) {
ctx , cancel := context . WithCancel ( ctx )
defer cancel ( )
// Scenario 2: we successfully built the image with no errors.
//
// In this case, the original gateway session has now been closed
// since the Build has been completed. So, we need to create a new
// gateway session to populate the ResultHandle. To do this, we
// need to re-evaluate the target result, in this new session. This
// should be instantaneous since the result should be cached.
// forward SolveStatus
done := new ( atomic . Bool )
defer done . Store ( true )
ch := make ( chan * client . SolveStatus )
go func ( ) {
for {
s := <- ch
if s == nil {
def , err := getDefinition ( ctx , res )
if err != nil {
respErr = err
close ( done )
return
}
if done . Load ( ) {
// Do not forward if the function returned because statusChan is possibly closed
continue
// NOTE: ideally this second connection should be lazily opened
opt := opt
opt . Ref = ""
opt . Exports = nil
opt . CacheExports = nil
_ , respErr = cc . Build ( ctx , opt , "buildx" , func ( ctx context . Context , c gateway . Client ) ( * gateway . Result , error ) {
res , err := evalDefinition ( ctx , c , def )
if err != nil {
// This should probably not happen, since we've previously
// successfully evaluated the same result with no issues.
return nil , errors . Wrap ( err , "inconsistent solve result" )
}
respHandle = & ResultHandle {
done : make ( chan struct { } ) ,
res : res ,
gwClient : c ,
gwCtx : ctx ,
}
close ( done )
// Block until the caller closes the ResultHandle.
select {
case statusChan <- s :
case <- respHandle . done :
case <- ctx . Done ( ) :
}
return nil , ctx . Err ( )
} , nil )
if respHandle != nil {
return
}
close ( done )
} ( )
// get result
resultCtxCh := make ( chan * ResultContext )
errCh := make ( chan error )
go func ( ) {
solveOpt := solveOpt
solveOpt . Ref = ""
buildDoneCh := make ( chan struct { } )
_ , err := c . Build ( context . Background ( ) , solveOpt , "buildx" , func ( ctx context . Context , c gateway . Client ) ( * gateway . Result , error ) {
doneErr := errors . Errorf ( "done" )
ctx , cancel := context . WithCancelCause ( ctx )
defer cancel ( doneErr )
// Block until the other thread signals that it's completed the build.
select {
case <- done :
case <- baseCtx . Done ( ) :
if respErr == nil {
respErr = baseCtx . Err ( )
}
}
return respHandle , resp , respErr
}
// getDefinition converts a gateway result into a collection of definitions for
// each ref in the result.
func getDefinition ( ctx context . Context , res * gateway . Result ) ( * result . Result [ * pb . Definition ] , error ) {
return result . ConvertResult ( res , func ( ref gateway . Reference ) ( * pb . Definition , error ) {
st , err := ref . ToState ( )
if err != nil {
return nil , err
}
def , err := st . Marshal ( ctx )
if err != nil {
return nil , err
}
return def . ToPB ( ) , nil
} )
}
// evalDefinition performs the reverse of getDefinition, converting a
// collection of definitions into a gateway result.
func evalDefinition ( ctx context . Context , c gateway . Client , defs * result . Result [ * pb . Definition ] ) ( * gateway . Result , error ) {
// force evaluation of all targets in parallel
results := make ( map [ * pb . Definition ] * gateway . Result )
resultsMu := sync . Mutex { }
eg , egCtx := errgroup . WithContext ( ctx )
targets . EachRef ( func ( def * pb . Definition ) error {
def s. EachRef ( func ( def * pb . Definition ) error {
eg . Go ( func ( ) error {
res2 , err := c . Solve ( egCtx , gateway . SolveRequest {
res , err := c . Solve ( egCtx , gateway . SolveRequest {
Evaluate : true ,
Definition : def ,
} )
@ -95,85 +232,41 @@ func getResultAt(ctx context.Context, c *client.Client, solveOpt client.SolveOpt
return err
}
resultsMu . Lock ( )
results [ def ] = res 2
results [ def ] = res
resultsMu . Unlock ( )
return nil
} )
return nil
} )
resultCtx := ResultContext { }
if err := eg . Wait ( ) ; err != nil {
var se * errdefs . SolveError
if errors . As ( err , & se ) {
resultCtx . solveErr = se
} else {
return nil , err
}
}
res2 , _ := result . ConvertResult ( targets , func ( def * pb . Definition ) ( gateway . Reference , error ) {
res , _ := result . ConvertResult ( defs , func ( def * pb . Definition ) ( gateway . Reference , error ) {
if res , ok := results [ def ] ; ok {
return res . Ref , nil
}
return nil , nil
} )
// Record the client and ctx as well so that containers can be created from the SolveError.
resultCtx . res = res2
resultCtx . gwClient = c
resultCtx . gwCtx = ctx
resultCtx . gwDone = func ( ) {
cancel ( doneErr )
// wait for Build() completion(or timeout) to ensure the Build's finalizing and avoiding an error "context canceled"
select {
case <- buildDoneCh :
case <- time . After ( 5 * time . Second ) :
}
}
select {
case resultCtxCh <- & resultCtx :
case <- ctx . Done ( ) :
return nil , ctx . Err ( )
return res , nil
}
// wait for cleanup or cancel
<- ctx . Done ( )
if context . Cause ( ctx ) != doneErr { // doneErr is not an error.
return nil , ctx . Err ( )
}
return nil , nil
} , ch )
close ( buildDoneCh )
if err != nil {
errCh <- err
}
} ( )
select {
case resultCtx := <- resultCtxCh :
return resultCtx , nil
case err := <- errCh :
return nil , err
case <- ctx . Done ( ) :
return nil , ctx . Err ( )
}
}
// ResultContext is a build result with the client that built it.
type ResultContext struct {
// ResultHandle is a build result with the client that built it.
type ResultHandle struct {
res * gateway . Result
solveErr * errdefs . SolveError
done chan struct { }
doneOnce sync . Once
gwClient gateway . Client
gwCtx context . Context
gwDone func ( )
gwDoneOnce sync . Once
cleanups [ ] func ( )
cleanupsMu sync . Mutex
}
func ( r * Result Context ) Done ( ) {
r . gwD oneOnce. Do ( func ( ) {
func ( r * ResultHandle ) Done ( ) {
r . doneOnce . Do ( func ( ) {
r . cleanupsMu . Lock ( )
cleanups := r . cleanups
r . cleanups = nil
@ -181,22 +274,24 @@ func (r *ResultContext) Done() {
for _ , f := range cleanups {
f ( )
}
r . gwDone ( )
close ( r . done )
<- r . gwCtx . Done ( )
} )
}
func ( r * Result Context ) registerCleanup ( f func ( ) ) {
func ( r * Result Handle ) registerCleanup ( f func ( ) ) {
r . cleanupsMu . Lock ( )
r . cleanups = append ( r . cleanups , f )
r . cleanupsMu . Unlock ( )
}
func ( r * Result Context ) build ( buildFunc gateway . BuildFunc ) ( err error ) {
func ( r * Result Handle ) build ( buildFunc gateway . BuildFunc ) ( err error ) {
_ , err = buildFunc ( r . gwCtx , r . gwClient )
return err
}
func ( r * Result Context ) getContainerConfig ( ctx context . Context , c gateway . Client , cfg * controllerapi . InvokeConfig ) ( containerCfg gateway . NewContainerRequest , _ error ) {
func ( r * Result Handle ) getContainerConfig ( ctx context . Context , c gateway . Client , cfg * controllerapi . InvokeConfig ) ( containerCfg gateway . NewContainerRequest , _ error ) {
if r . res != nil && r . solveErr == nil {
logrus . Debugf ( "creating container from successful build" )
ccfg , err := containerConfigFromResult ( ctx , r . res , c , * cfg )
@ -215,7 +310,7 @@ func (r *ResultContext) getContainerConfig(ctx context.Context, c gateway.Client
return containerCfg , nil
}
func ( r * Result Context ) getProcessConfig ( cfg * controllerapi . InvokeConfig , stdin io . ReadCloser , stdout io . WriteCloser , stderr io . WriteCloser ) ( _ gateway . StartRequest , err error ) {
func ( r * Result Handle ) getProcessConfig ( cfg * controllerapi . InvokeConfig , stdin io . ReadCloser , stdout io . WriteCloser , stderr io . WriteCloser ) ( _ gateway . StartRequest , err error ) {
processCfg := newStartRequest ( stdin , stdout , stderr )
if r . res != nil && r . solveErr == nil {
logrus . Debugf ( "creating container from successful build" )