debug: finish server when all session finished

Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
pull/2010/head
Kohei Tokunaga 2 years ago
parent 86ae8ea854
commit caa83a6d84
No known key found for this signature in database
GPG Key ID: 6CE0A04690DB3FB3

@ -152,6 +152,16 @@ func serveCmd(dockerCli command.Cli) *cobra.Command {
return cbuild.RunBuild(ctx, dockerCli, *options, stdin, progress, true) return cbuild.RunBuild(ctx, dockerCli, *options, stdin, progress, true)
}) })
defer b.Close() defer b.Close()
sessionsDoneCh := make(chan struct{})
go func() {
for {
if !b.isUsed() {
sessionsDoneCh <- struct{}{}
break
}
time.Sleep(10 * time.Second)
}
}()
// serve server // serve server
addr := filepath.Join(root, defaultSocketFilename) addr := filepath.Join(root, defaultSocketFilename)
@ -187,6 +197,9 @@ func serveCmd(dockerCli command.Cli) *cobra.Command {
signal.Notify(sigCh, syscall.SIGINT) signal.Notify(sigCh, syscall.SIGINT)
signal.Notify(sigCh, syscall.SIGTERM) signal.Notify(sigCh, syscall.SIGTERM)
select { select {
case <-sessionsDoneCh:
logrus.Infof("all sessions finished")
return nil
case err := <-errCh: case err := <-errCh:
logrus.Errorf("got error %s, exiting", err) logrus.Errorf("got error %s, exiting", err)
return err return err

@ -3,8 +3,6 @@ package remote
import ( import (
"context" "context"
"io" "io"
"sync"
"sync/atomic"
"time" "time"
"github.com/docker/buildx/build" "github.com/docker/buildx/build"
@ -23,54 +21,39 @@ type BuildFunc func(ctx context.Context, options *pb.BuildOptions, stdin io.Read
func NewServer(buildFunc BuildFunc) *Server { func NewServer(buildFunc BuildFunc) *Server {
return &Server{ return &Server{
buildFunc: buildFunc, sessions: newSessionManager(buildFunc),
} }
} }
type Server struct { type Server struct {
buildFunc BuildFunc sessions *sessionManager
session map[string]*session
sessionMu sync.Mutex
} }
type session struct { func (m *Server) isUsed() bool {
buildOnGoing atomic.Bool if m.sessions.sessionCreated() && len(m.sessions.list()) == 0 {
statusChan chan *pb.StatusResponse return false
cancelBuild func() }
buildOptions *pb.BuildOptions return true
inputPipe *io.PipeWriter
result *build.ResultHandle
processes *processes.Manager
}
func (s *session) cancelRunningProcesses() {
s.processes.CancelRunningProcesses()
} }
func (m *Server) ListProcesses(ctx context.Context, req *pb.ListProcessesRequest) (res *pb.ListProcessesResponse, err error) { func (m *Server) ListProcesses(ctx context.Context, req *pb.ListProcessesRequest) (res *pb.ListProcessesResponse, err error) {
m.sessionMu.Lock() s := m.sessions.get(req.Ref)
defer m.sessionMu.Unlock() if s == nil {
s, ok := m.session[req.Ref]
if !ok {
return nil, errors.Errorf("unknown ref %q", req.Ref) return nil, errors.Errorf("unknown ref %q", req.Ref)
} }
res = new(pb.ListProcessesResponse) res = new(pb.ListProcessesResponse)
for _, p := range s.processes.ListProcesses() { for _, p := range s.listProcesses() {
res.Infos = append(res.Infos, p) res.Infos = append(res.Infos, p)
} }
return res, nil return res, nil
} }
func (m *Server) DisconnectProcess(ctx context.Context, req *pb.DisconnectProcessRequest) (res *pb.DisconnectProcessResponse, err error) { func (m *Server) DisconnectProcess(ctx context.Context, req *pb.DisconnectProcessRequest) (res *pb.DisconnectProcessResponse, err error) {
m.sessionMu.Lock() s := m.sessions.get(req.Ref)
defer m.sessionMu.Unlock() if s == nil {
s, ok := m.session[req.Ref]
if !ok {
return nil, errors.Errorf("unknown ref %q", req.Ref) return nil, errors.Errorf("unknown ref %q", req.Ref)
} }
return res, s.processes.DeleteProcess(req.ProcessID) return res, s.deleteProcess(req.ProcessID)
} }
func (m *Server) Info(ctx context.Context, req *pb.InfoRequest) (res *pb.InfoResponse, err error) { func (m *Server) Info(ctx context.Context, req *pb.InfoRequest) (res *pb.InfoResponse, err error) {
@ -84,56 +67,20 @@ func (m *Server) Info(ctx context.Context, req *pb.InfoRequest) (res *pb.InfoRes
} }
func (m *Server) List(ctx context.Context, req *pb.ListRequest) (res *pb.ListResponse, err error) { func (m *Server) List(ctx context.Context, req *pb.ListRequest) (res *pb.ListResponse, err error) {
keys := make(map[string]struct{})
m.sessionMu.Lock()
for k := range m.session {
keys[k] = struct{}{}
}
m.sessionMu.Unlock()
var keysL []string
for k := range keys {
keysL = append(keysL, k)
}
return &pb.ListResponse{ return &pb.ListResponse{
Keys: keysL, Keys: m.sessions.list(),
}, nil }, nil
} }
func (m *Server) Disconnect(ctx context.Context, req *pb.DisconnectRequest) (res *pb.DisconnectResponse, err error) { func (m *Server) Disconnect(ctx context.Context, req *pb.DisconnectRequest) (res *pb.DisconnectResponse, err error) {
key := req.Ref if err := m.sessions.delete(req.Ref); err != nil {
if key == "" { return nil, errors.Errorf("failed to delete session %q: %v", req.Ref, err)
return nil, errors.New("disconnect: empty key")
} }
m.sessionMu.Lock()
if s, ok := m.session[key]; ok {
if s.cancelBuild != nil {
s.cancelBuild()
}
s.cancelRunningProcesses()
if s.result != nil {
s.result.Done()
}
}
delete(m.session, key)
m.sessionMu.Unlock()
return &pb.DisconnectResponse{}, nil return &pb.DisconnectResponse{}, nil
} }
func (m *Server) Close() error { func (m *Server) Close() error {
m.sessionMu.Lock() m.sessions.close()
for k := range m.session {
if s, ok := m.session[k]; ok {
if s.cancelBuild != nil {
s.cancelBuild()
}
s.cancelRunningProcesses()
}
}
m.sessionMu.Unlock()
return nil return nil
} }
@ -142,15 +89,11 @@ func (m *Server) Inspect(ctx context.Context, req *pb.InspectRequest) (*pb.Inspe
if ref == "" { if ref == "" {
return nil, errors.New("inspect: empty key") return nil, errors.New("inspect: empty key")
} }
var bo *pb.BuildOptions s := m.sessions.get(ref)
m.sessionMu.Lock() if s == nil {
if s, ok := m.session[ref]; ok {
bo = s.buildOptions
} else {
m.sessionMu.Unlock()
return nil, errors.Errorf("inspect: unknown key %v", ref) return nil, errors.Errorf("inspect: unknown key %v", ref)
} }
m.sessionMu.Unlock() bo := s.getBuildOptions()
return &pb.InspectResponse{Options: bo}, nil return &pb.InspectResponse{Options: bo}, nil
} }
@ -160,77 +103,17 @@ func (m *Server) Build(ctx context.Context, req *pb.BuildRequest) (*pb.BuildResp
return nil, errors.New("build: empty key") return nil, errors.New("build: empty key")
} }
// Prepare status channel and session s := m.sessions.get(ref)
m.sessionMu.Lock() if s == nil {
if m.session == nil { s = m.sessions.newSession(ref)
m.session = make(map[string]*session)
}
s, ok := m.session[ref]
if ok {
if !s.buildOnGoing.CompareAndSwap(false, true) {
m.sessionMu.Unlock()
return &pb.BuildResponse{}, errors.New("build ongoing")
}
s.cancelRunningProcesses()
s.result = nil
} else {
s = &session{}
s.buildOnGoing.Store(true)
}
s.processes = processes.NewManager()
statusChan := make(chan *pb.StatusResponse)
s.statusChan = statusChan
inR, inW := io.Pipe()
defer inR.Close()
s.inputPipe = inW
m.session[ref] = s
m.sessionMu.Unlock()
defer func() {
close(statusChan)
m.sessionMu.Lock()
s, ok := m.session[ref]
if ok {
s.statusChan = nil
s.buildOnGoing.Store(false)
}
m.sessionMu.Unlock()
}()
pw := pb.NewProgressWriter(statusChan)
// Build the specified request
ctx, cancel := context.WithCancel(ctx)
defer cancel()
resp, res, buildErr := m.buildFunc(ctx, req.Options, inR, pw)
m.sessionMu.Lock()
if s, ok := m.session[ref]; ok {
// NOTE: buildFunc can return *build.ResultHandle even on error (e.g. when it's implemented using (github.com/docker/buildx/controller/build).RunBuild).
if res != nil {
s.result = res
s.cancelBuild = cancel
s.buildOptions = req.Options
m.session[ref] = s
if buildErr != nil {
buildErr = controllererrors.WrapBuild(buildErr, ref)
}
}
} else {
m.sessionMu.Unlock()
return nil, errors.Errorf("build: unknown key %v", ref)
}
m.sessionMu.Unlock()
if buildErr != nil {
return nil, buildErr
} }
resp, err := s.build(ctx, req.Options, func(err error) error { return controllererrors.WrapBuild(err, ref) })
if resp == nil { if resp == nil {
resp = &client.SolveResponse{} resp = &client.SolveResponse{}
} }
return &pb.BuildResponse{ return &pb.BuildResponse{
ExporterResponse: resp.ExporterResponse, ExporterResponse: resp.ExporterResponse,
}, nil }, err
} }
func (m *Server) Status(req *pb.StatusRequest, stream pb.Controller_StatusServer) error { func (m *Server) Status(req *pb.StatusRequest, stream pb.Controller_StatusServer) error {
@ -240,18 +123,15 @@ func (m *Server) Status(req *pb.StatusRequest, stream pb.Controller_StatusServer
} }
// Wait and get status channel prepared by Build() // Wait and get status channel prepared by Build()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var statusChan <-chan *pb.StatusResponse var statusChan <-chan *pb.StatusResponse
for { _, err := m.sessions.waitAndGet(ctx, ref, func(s *session) bool {
// TODO: timeout? statusChan = s.getStatusChan()
m.sessionMu.Lock() return statusChan != nil
if _, ok := m.session[ref]; !ok || m.session[ref].statusChan == nil { })
m.sessionMu.Unlock() if err != nil {
time.Sleep(time.Millisecond) // TODO: wait Build without busy loop and make it cancellable return err
continue
}
statusChan = m.session[ref].statusChan
m.sessionMu.Unlock()
break
} }
// forward status // forward status
@ -286,23 +166,19 @@ func (m *Server) Input(stream pb.Controller_InputServer) (err error) {
} }
// Wait and get input stream pipe prepared by Build() // Wait and get input stream pipe prepared by Build()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var inputPipeW *io.PipeWriter var inputPipeW *io.PipeWriter
for { if _, err := m.sessions.waitAndGet(ctx, ref, func(s *session) bool {
// TODO: timeout? inputPipeW = s.getInputWriter()
m.sessionMu.Lock() return inputPipeW != nil
if _, ok := m.session[ref]; !ok || m.session[ref].inputPipe == nil { }); err != nil {
m.sessionMu.Unlock() return err
time.Sleep(time.Millisecond) // TODO: wait Build without busy loop and make it cancellable
continue
}
inputPipeW = m.session[ref].inputPipe
m.sessionMu.Unlock()
break
} }
// Forward input stream // Forward input stream
eg, ctx := errgroup.WithContext(context.TODO()) eg, ctx := errgroup.WithContext(context.TODO())
done := make(chan struct{}) doneCh := make(chan struct{})
msgCh := make(chan *pb.InputMessage) msgCh := make(chan *pb.InputMessage)
eg.Go(func() error { eg.Go(func() error {
defer close(msgCh) defer close(msgCh)
@ -316,7 +192,7 @@ func (m *Server) Input(stream pb.Controller_InputServer) (err error) {
} }
select { select {
case msgCh <- msg: case msgCh <- msg:
case <-done: case <-doneCh:
return nil return nil
case <-ctx.Done(): case <-ctx.Done():
return nil return nil
@ -324,7 +200,7 @@ func (m *Server) Input(stream pb.Controller_InputServer) (err error) {
} }
}) })
eg.Go(func() (retErr error) { eg.Go(func() (retErr error) {
defer close(done) defer close(doneCh)
defer func() { defer func() {
if retErr != nil { if retErr != nil {
inputPipeW.CloseWithError(retErr) inputPipeW.CloseWithError(retErr)
@ -378,26 +254,23 @@ func (m *Server) Invoke(srv pb.Controller_InvokeServer) error {
ref := initMessage.Ref ref := initMessage.Ref
cfg := initMessage.InvokeConfig cfg := initMessage.InvokeConfig
m.sessionMu.Lock() s := m.sessions.get(ref)
s, ok := m.session[ref] if s == nil {
if !ok {
m.sessionMu.Unlock()
return errors.Errorf("invoke: unknown key %v", ref) return errors.Errorf("invoke: unknown key %v", ref)
} }
m.sessionMu.Unlock()
pid := initMessage.ProcessID pid := initMessage.ProcessID
if pid == "" { if pid == "" {
return errors.Errorf("invoke: specify process ID") return errors.Errorf("invoke: specify process ID")
} }
proc, ok := s.processes.Get(pid) proc, ok := s.getProcess(pid)
if !ok { if !ok {
// Start a new process. // Start a new process.
if cfg == nil { if cfg == nil {
return errors.New("no container config is provided") return errors.New("no container config is provided")
} }
var err error var err error
proc, err = s.processes.StartProcess(pid, s.result, cfg) proc, err = s.startProcess(pid, cfg)
if err != nil { if err != nil {
return err return err
} }

@ -0,0 +1,217 @@
package remote
import (
"context"
"io"
"sync"
"sync/atomic"
"github.com/docker/buildx/build"
"github.com/docker/buildx/controller/pb"
"github.com/docker/buildx/controller/processes"
"github.com/moby/buildkit/client"
"github.com/pkg/errors"
)
type session struct {
manager *sessionManager
buildOnGoing atomic.Bool
statusChan chan *pb.StatusResponse
cancelBuild func()
buildOptions *pb.BuildOptions
inputPipe *io.PipeWriter
result *build.ResultHandle
processes *processes.Manager
mu sync.Mutex
}
func (s *session) close() {
s.mu.Lock()
defer s.mu.Unlock()
s.processes.CancelRunningProcesses()
if s.cancelBuild != nil {
s.cancelBuild()
}
if s.result != nil {
s.result.Done()
}
}
func (s *session) getProcess(id string) (*processes.Process, bool) {
return s.processes.Get(id)
}
func (s *session) listProcesses() []*pb.ProcessInfo {
return s.processes.ListProcesses()
}
func (s *session) deleteProcess(id string) error {
return s.processes.DeleteProcess(id)
}
func (s *session) startProcess(id string, cfg *pb.InvokeConfig) (*processes.Process, error) {
res := s.result
return s.processes.StartProcess(id, res, cfg)
}
func (s *session) build(ctx context.Context, options *pb.BuildOptions, wrapBuildError func(error) error) (*client.SolveResponse, error) {
s.mu.Lock()
if !s.buildOnGoing.CompareAndSwap(false, true) {
s.mu.Unlock()
return nil, errors.New("build ongoing")
}
if s.processes != nil {
s.processes.CancelRunningProcesses()
}
s.processes = processes.NewManager()
s.result = nil
s.statusChan = make(chan *pb.StatusResponse)
inR, inW := io.Pipe()
s.inputPipe = inW
bCtx, cancel := context.WithCancel(ctx)
var doneOnce sync.Once
s.cancelBuild = func() {
doneOnce.Do(func() {
s.mu.Lock()
cancel()
close(s.statusChan)
s.statusChan = nil
s.buildOnGoing.Store(false)
s.mu.Unlock()
})
}
defer s.cancelBuild()
s.manager.updateCond.Broadcast()
s.mu.Unlock()
resp, res, buildErr := s.manager.buildFunc(bCtx, options, inR, pb.NewProgressWriter(s.statusChan))
s.mu.Lock()
// NOTE: buildFunc can return *build.ResultHandle even on error (e.g. when it's implemented using (github.com/docker/buildx/controller/build).RunBuild).
if res != nil {
s.result = res
s.buildOptions = options
s.manager.updateCond.Broadcast()
if buildErr != nil {
buildErr = wrapBuildError(buildErr)
}
}
s.mu.Unlock()
return resp, buildErr
}
func (s *session) getStatusChan() chan *pb.StatusResponse {
s.mu.Lock()
defer s.mu.Unlock()
return s.statusChan
}
func (s *session) getInputWriter() *io.PipeWriter {
s.mu.Lock()
defer s.mu.Unlock()
return s.inputPipe
}
func (s *session) getBuildOptions() *pb.BuildOptions {
s.mu.Lock()
defer s.mu.Unlock()
return s.buildOptions
}
func newSessionManager(buildFunc BuildFunc) *sessionManager {
var mu sync.Mutex
return &sessionManager{
updateCond: sync.NewCond(&mu),
updateCondMu: &mu,
buildFunc: buildFunc,
}
}
type sessionManager struct {
sessions sync.Map
buildFunc BuildFunc
updateCond *sync.Cond
updateCondMu *sync.Mutex
created atomic.Bool
}
func (m *sessionManager) newSession(ref string) *session {
s := &session{manager: m, processes: processes.NewManager()}
m.add(ref, s)
return s
}
func (m *sessionManager) add(id string, v *session) {
m.sessions.Store(id, v)
m.updateCond.Broadcast()
m.created.Store(true)
}
func (m *sessionManager) get(id string) *session {
v, ok := m.sessions.Load(id)
if !ok {
return nil
}
return v.(*session)
}
func (m *sessionManager) delete(id string) error {
v, ok := m.sessions.LoadAndDelete(id)
if !ok {
return errors.Errorf("unknown session %q", id)
}
v.(*session).close()
return nil
}
func (m *sessionManager) list() (res []string) {
m.sessions.Range(func(key, value any) bool {
res = append(res, key.(string))
return true
})
return
}
func (m *sessionManager) close() error {
m.sessions.Range(func(key, value any) bool {
value.(*session).close()
return true
})
return nil
}
func (m *sessionManager) waitAndGet(ctx context.Context, id string, f func(s *session) bool) (*session, error) {
go func() {
<-ctx.Done()
m.updateCond.Broadcast()
}()
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
s := m.get(id)
if s == nil || !f(s) {
m.updateCondMu.Lock()
m.updateCond.Wait()
m.updateCondMu.Unlock()
continue
}
return s, nil
}
}
func (m *sessionManager) sessionCreated() bool {
return m.created.Load()
}
Loading…
Cancel
Save