diff --git a/controller/remote/controller.go b/controller/remote/controller.go
index 28840998..b31cb53f 100644
--- a/controller/remote/controller.go
+++ b/controller/remote/controller.go
@@ -152,6 +152,19 @@ func serveCmd(dockerCli command.Cli) *cobra.Command {
 				return cbuild.RunBuild(ctx, dockerCli, *options, stdin, progress, true)
 			})
 			defer b.Close()
+			// Exit once every session created on this server is gone.
+			// The channel is closed rather than sent on, so this goroutine
+			// never blocks if serveCmd has already returned.
+			sessionsDoneCh := make(chan struct{})
+			go func() {
+				for {
+					if !b.isUsed() {
+						close(sessionsDoneCh)
+						return
+					}
+					time.Sleep(10 * time.Second)
+				}
+			}()
 
 			// serve server
 			addr := filepath.Join(root, defaultSocketFilename)
@@ -187,6 +200,9 @@ func serveCmd(dockerCli command.Cli) *cobra.Command {
 			signal.Notify(sigCh, syscall.SIGINT)
 			signal.Notify(sigCh, syscall.SIGTERM)
 			select {
+			case <-sessionsDoneCh:
+				logrus.Infof("all sessions finished")
+				return nil
 			case err := <-errCh:
 				logrus.Errorf("got error %s, exiting", err)
 				return err
diff --git a/controller/remote/server.go b/controller/remote/server.go
index a39fea51..dee741f5 100644
--- a/controller/remote/server.go
+++ b/controller/remote/server.go
@@ -3,8 +3,6 @@ package remote
 import (
 	"context"
 	"io"
-	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/docker/buildx/build"
@@ -23,54 +21,41 @@ type BuildFunc func(ctx context.Context, options *pb.BuildOptions, stdin io.Read
 
 func NewServer(buildFunc BuildFunc) *Server {
 	return &Server{
-		buildFunc: buildFunc,
+		sessions: newSessionManager(buildFunc),
 	}
 }
 
 type Server struct {
-	buildFunc BuildFunc
-	session   map[string]*session
-	sessionMu sync.Mutex
+	sessions *sessionManager
 }
 
-type session struct {
-	buildOnGoing atomic.Bool
-	statusChan   chan *pb.StatusResponse
-	cancelBuild  func()
-	buildOptions *pb.BuildOptions
-	inputPipe    *io.PipeWriter
-
-	result *build.ResultHandle
-
-	processes *processes.Manager
-}
-
-func (s *session) cancelRunningProcesses() {
-	s.processes.CancelRunningProcesses()
+// isUsed reports whether the server should keep running: it returns false
+// only once at least one session has been created and none remain.
+func (m *Server) isUsed() bool {
+	if m.sessions.sessionCreated() && len(m.sessions.list()) == 0 {
+		return false
+	}
+	return true
 }
 
 func (m *Server) ListProcesses(ctx context.Context, req *pb.ListProcessesRequest) (res *pb.ListProcessesResponse, err error) {
-	m.sessionMu.Lock()
-	defer m.sessionMu.Unlock()
-	s, ok := m.session[req.Ref]
-	if !ok {
+	s := m.sessions.get(req.Ref)
+	if s == nil {
 		return nil, errors.Errorf("unknown ref %q", req.Ref)
 	}
 	res = new(pb.ListProcessesResponse)
-	for _, p := range s.processes.ListProcesses() {
+	for _, p := range s.listProcesses() {
 		res.Infos = append(res.Infos, p)
 	}
 	return res, nil
 }
 
 func (m *Server) DisconnectProcess(ctx context.Context, req *pb.DisconnectProcessRequest) (res *pb.DisconnectProcessResponse, err error) {
-	m.sessionMu.Lock()
-	defer m.sessionMu.Unlock()
-	s, ok := m.session[req.Ref]
-	if !ok {
+	s := m.sessions.get(req.Ref)
+	if s == nil {
 		return nil, errors.Errorf("unknown ref %q", req.Ref)
 	}
-	return res, s.processes.DeleteProcess(req.ProcessID)
+	return res, s.deleteProcess(req.ProcessID)
 }
 
 func (m *Server) Info(ctx context.Context, req *pb.InfoRequest) (res *pb.InfoResponse, err error) {
@@ -84,56 +69,20 @@ func (m *Server) Info(ctx context.Context, req *pb.InfoRequest) (res *pb.InfoRes
 }
 
 func (m *Server) List(ctx context.Context, req *pb.ListRequest) (res *pb.ListResponse, err error) {
-	keys := make(map[string]struct{})
-
-	m.sessionMu.Lock()
-	for k := range m.session {
-		keys[k] = struct{}{}
-	}
-	m.sessionMu.Unlock()
-
-	var keysL []string
-	for k := range keys {
-		keysL = append(keysL, k)
-	}
 	return &pb.ListResponse{
-		Keys: keysL,
+		Keys: m.sessions.list(),
 	}, nil
 }
 
 func (m *Server) Disconnect(ctx context.Context, req *pb.DisconnectRequest) (res *pb.DisconnectResponse, err error) {
-	key := req.Ref
-	if key == "" {
-		return nil, errors.New("disconnect: empty key")
+	if err := m.sessions.delete(req.Ref); err != nil {
+		return nil, errors.Errorf("failed to delete session %q: %v", req.Ref, err)
 	}
-
-	m.sessionMu.Lock()
-	if s, ok := m.session[key]; ok {
-		if s.cancelBuild != nil {
-			s.cancelBuild()
-		}
-		s.cancelRunningProcesses()
-		if s.result != nil {
-			s.result.Done()
-		}
-	}
-	delete(m.session, key)
-	m.sessionMu.Unlock()
-
 	return &pb.DisconnectResponse{}, nil
 }
 
 func (m *Server) Close() error {
-	m.sessionMu.Lock()
-	for k := range m.session {
-		if s, ok := m.session[k]; ok {
-			if s.cancelBuild != nil {
-				s.cancelBuild()
-			}
-			s.cancelRunningProcesses()
-		}
-	}
-	m.sessionMu.Unlock()
+	m.sessions.close()
 	return nil
 }
 
@@ -142,15 +91,11 @@ func (m *Server) Inspect(ctx context.Context, req *pb.InspectRequest) (*pb.Inspe
 	if ref == "" {
 		return nil, errors.New("inspect: empty key")
 	}
-	var bo *pb.BuildOptions
-	m.sessionMu.Lock()
-	if s, ok := m.session[ref]; ok {
-		bo = s.buildOptions
-	} else {
-		m.sessionMu.Unlock()
+	s := m.sessions.get(ref)
+	if s == nil {
 		return nil, errors.Errorf("inspect: unknown key %v", ref)
 	}
-	m.sessionMu.Unlock()
+	bo := s.getBuildOptions()
 	return &pb.InspectResponse{Options: bo}, nil
 }
 
@@ -160,77 +105,17 @@ func (m *Server) Build(ctx context.Context, req *pb.BuildRequest) (*pb.BuildResp
 		return nil, errors.New("build: empty key")
 	}
 
-	// Prepare status channel and session
-	m.sessionMu.Lock()
-	if m.session == nil {
-		m.session = make(map[string]*session)
-	}
-	s, ok := m.session[ref]
-	if ok {
-		if !s.buildOnGoing.CompareAndSwap(false, true) {
-			m.sessionMu.Unlock()
-			return &pb.BuildResponse{}, errors.New("build ongoing")
-		}
-		s.cancelRunningProcesses()
-		s.result = nil
-	} else {
-		s = &session{}
-		s.buildOnGoing.Store(true)
-	}
-
-	s.processes = processes.NewManager()
-	statusChan := make(chan *pb.StatusResponse)
-	s.statusChan = statusChan
-	inR, inW := io.Pipe()
-	defer inR.Close()
-	s.inputPipe = inW
-	m.session[ref] = s
-	m.sessionMu.Unlock()
-	defer func() {
-		close(statusChan)
-		m.sessionMu.Lock()
-		s, ok := m.session[ref]
-		if ok {
-			s.statusChan = nil
-			s.buildOnGoing.Store(false)
-		}
-		m.sessionMu.Unlock()
-	}()
-
-	pw := pb.NewProgressWriter(statusChan)
-
-	// Build the specified request
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-	resp, res, buildErr := m.buildFunc(ctx, req.Options, inR, pw)
-	m.sessionMu.Lock()
-	if s, ok := m.session[ref]; ok {
-		// NOTE: buildFunc can return *build.ResultHandle even on error (e.g. when it's implemented using (github.com/docker/buildx/controller/build).RunBuild).
-		if res != nil {
-			s.result = res
-			s.cancelBuild = cancel
-			s.buildOptions = req.Options
-			m.session[ref] = s
-			if buildErr != nil {
-				buildErr = controllererrors.WrapBuild(buildErr, ref)
-			}
-		}
-	} else {
-		m.sessionMu.Unlock()
-		return nil, errors.Errorf("build: unknown key %v", ref)
-	}
-	m.sessionMu.Unlock()
-
-	if buildErr != nil {
-		return nil, buildErr
+	s := m.sessions.get(ref)
+	if s == nil {
+		s = m.sessions.newSession(ref)
 	}
-
+	resp, err := s.build(ctx, req.Options, func(err error) error { return controllererrors.WrapBuild(err, ref) })
 	if resp == nil {
 		resp = &client.SolveResponse{}
 	}
 	return &pb.BuildResponse{
 		ExporterResponse: resp.ExporterResponse,
-	}, nil
+	}, err
 }
 
 func (m *Server) Status(req *pb.StatusRequest, stream pb.Controller_StatusServer) error {
@@ -240,18 +125,15 @@ func (m *Server) Status(req *pb.StatusRequest, stream pb.Controller_StatusServer
 	}
 
 	// Wait and get status channel prepared by Build()
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
 	var statusChan <-chan *pb.StatusResponse
-	for {
-		// TODO: timeout?
-		m.sessionMu.Lock()
-		if _, ok := m.session[ref]; !ok || m.session[ref].statusChan == nil {
-			m.sessionMu.Unlock()
-			time.Sleep(time.Millisecond) // TODO: wait Build without busy loop and make it cancellable
-			continue
-		}
-		statusChan = m.session[ref].statusChan
-		m.sessionMu.Unlock()
-		break
+	_, err := m.sessions.waitAndGet(ctx, ref, func(s *session) bool {
+		statusChan = s.getStatusChan()
+		return statusChan != nil
+	})
+	if err != nil {
+		return err
 	}
 
 	// forward status
@@ -286,23 +168,19 @@ func (m *Server) Input(stream pb.Controller_InputServer) (err error) {
 	}
 
 	// Wait and get input stream pipe prepared by Build()
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
 	var inputPipeW *io.PipeWriter
-	for {
-		// TODO: timeout?
-		m.sessionMu.Lock()
-		if _, ok := m.session[ref]; !ok || m.session[ref].inputPipe == nil {
-			m.sessionMu.Unlock()
-			time.Sleep(time.Millisecond) // TODO: wait Build without busy loop and make it cancellable
-			continue
-		}
-		inputPipeW = m.session[ref].inputPipe
-		m.sessionMu.Unlock()
-		break
+	if _, err := m.sessions.waitAndGet(ctx, ref, func(s *session) bool {
+		inputPipeW = s.getInputWriter()
+		return inputPipeW != nil
+	}); err != nil {
+		return err
 	}
 
 	// Forward input stream
 	eg, ctx := errgroup.WithContext(context.TODO())
-	done := make(chan struct{})
+	doneCh := make(chan struct{})
 	msgCh := make(chan *pb.InputMessage)
 	eg.Go(func() error {
 		defer close(msgCh)
@@ -316,7 +194,7 @@ func (m *Server) Input(stream pb.Controller_InputServer) (err error) {
 			}
 			select {
 			case msgCh <- msg:
-			case <-done:
+			case <-doneCh:
 				return nil
 			case <-ctx.Done():
 				return nil
@@ -324,7 +202,7 @@ func (m *Server) Input(stream pb.Controller_InputServer) (err error) {
 		}
 	})
 	eg.Go(func() (retErr error) {
-		defer close(done)
+		defer close(doneCh)
 		defer func() {
 			if retErr != nil {
 				inputPipeW.CloseWithError(retErr)
@@ -378,26 +256,23 @@ func (m *Server) Invoke(srv pb.Controller_InvokeServer) error {
 			ref := initMessage.Ref
 			cfg := initMessage.InvokeConfig
 
-			m.sessionMu.Lock()
-			s, ok := m.session[ref]
-			if !ok {
-				m.sessionMu.Unlock()
+			s := m.sessions.get(ref)
+			if s == nil {
 				return errors.Errorf("invoke: unknown key %v", ref)
 			}
-			m.sessionMu.Unlock()
 
 			pid := initMessage.ProcessID
 			if pid == "" {
 				return errors.Errorf("invoke: specify process ID")
 			}
-			proc, ok := s.processes.Get(pid)
+			proc, ok := s.getProcess(pid)
 			if !ok {
 				// Start a new process.
 				if cfg == nil {
 					return errors.New("no container config is provided")
 				}
 				var err error
-				proc, err = s.processes.StartProcess(pid, s.result, cfg)
+				proc, err = s.startProcess(pid, cfg)
 				if err != nil {
 					return err
 				}
diff --git a/controller/remote/session.go b/controller/remote/session.go
new file mode 100644
index 00000000..587a93ed
--- /dev/null
+++ b/controller/remote/session.go
@@ -0,0 +1,275 @@
+package remote
+
+import (
+	"context"
+	"io"
+	"sync"
+	"sync/atomic"
+
+	"github.com/docker/buildx/build"
+	"github.com/docker/buildx/controller/pb"
+	"github.com/docker/buildx/controller/processes"
+	"github.com/moby/buildkit/client"
+	"github.com/pkg/errors"
+)
+
+// session holds the state of one build reference: the build in flight (if
+// any), its latest result and the processes started from that result.
+type session struct {
+	manager *sessionManager
+
+	buildOnGoing atomic.Bool
+	statusChan   chan *pb.StatusResponse
+	cancelBuild  func()
+	buildOptions *pb.BuildOptions
+	inputPipe    *io.PipeWriter
+
+	result *build.ResultHandle
+
+	processes *processes.Manager
+
+	mu sync.Mutex
+}
+
+// close cancels the session's running processes and in-flight build, and
+// releases the last build result if there is one.
+func (s *session) close() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.processes.CancelRunningProcesses()
+	if s.cancelBuild != nil {
+		s.cancelBuild()
+	}
+	if s.result != nil {
+		s.result.Done()
+	}
+}
+
+// getProcess looks up a process of this session by ID.
+func (s *session) getProcess(id string) (*processes.Process, bool) {
+	return s.processes.Get(id)
+}
+
+// listProcesses returns the processes known to this session.
+func (s *session) listProcesses() []*pb.ProcessInfo {
+	return s.processes.ListProcesses()
+}
+
+// deleteProcess removes the process with the given ID from this session.
+func (s *session) deleteProcess(id string) error {
+	return s.processes.DeleteProcess(id)
+}
+
+// startProcess starts a new process with the given ID on the session's
+// current build result.
+func (s *session) startProcess(id string, cfg *pb.InvokeConfig) (*processes.Process, error) {
+	res := s.result
+	return s.processes.StartProcess(id, res, cfg)
+}
+
+// build runs the manager's buildFunc for this session and records its
+// result. Only one build may run per session at a time; a concurrent call
+// fails with "build ongoing". wrapBuildError decorates a build error that
+// comes with a result handle.
+func (s *session) build(ctx context.Context, options *pb.BuildOptions, wrapBuildError func(error) error) (*client.SolveResponse, error) {
+	s.mu.Lock()
+	if !s.buildOnGoing.CompareAndSwap(false, true) {
+		s.mu.Unlock()
+		return nil, errors.New("build ongoing")
+	}
+	if s.processes != nil {
+		s.processes.CancelRunningProcesses()
+	}
+	s.processes = processes.NewManager()
+	s.result = nil
+	statusChan := make(chan *pb.StatusResponse)
+	s.statusChan = statusChan
+
+	inR, inW := io.Pipe()
+	defer inR.Close()
+	s.inputPipe = inW
+
+	bCtx, cancel := context.WithCancel(ctx)
+	// cancelBuild only cancels the build context, so close() can invoke it
+	// while holding s.mu without deadlocking.
+	s.cancelBuild = cancel
+	s.mu.Unlock()
+
+	// Tear the build state down only after buildFunc has returned, so the
+	// status channel is never closed while the build may still report to it.
+	defer func() {
+		cancel()
+		s.mu.Lock()
+		close(statusChan)
+		s.statusChan = nil
+		s.buildOnGoing.Store(false)
+		s.mu.Unlock()
+	}()
+
+	// Wake Status/Input callers waiting for the channels prepared above.
+	// notify takes updateCondMu, so it must run without s.mu held.
+	s.manager.notify()
+
+	resp, res, buildErr := s.manager.buildFunc(bCtx, options, inR, pb.NewProgressWriter(statusChan))
+
+	// NOTE: buildFunc can return *build.ResultHandle even on error (e.g. when it's implemented using (github.com/docker/buildx/controller/build).RunBuild).
+	if res != nil {
+		s.mu.Lock()
+		s.result = res
+		s.buildOptions = options
+		s.mu.Unlock()
+		s.manager.notify()
+		if buildErr != nil {
+			buildErr = wrapBuildError(buildErr)
+		}
+	}
+
+	return resp, buildErr
+}
+
+// getStatusChan returns the status channel of the build in flight, or nil
+// when no build is running.
+func (s *session) getStatusChan() chan *pb.StatusResponse {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.statusChan
+}
+
+// getInputWriter returns the write end of the stdin pipe created by the
+// most recent build, or nil if no build has been started yet.
+func (s *session) getInputWriter() *io.PipeWriter {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.inputPipe
+}
+
+// getBuildOptions returns the options of the last build that produced a
+// result handle.
+func (s *session) getBuildOptions() *pb.BuildOptions {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.buildOptions
+}
+
+// newSessionManager returns an empty sessionManager that runs builds with
+// buildFunc.
+func newSessionManager(buildFunc BuildFunc) *sessionManager {
+	var mu sync.Mutex
+	return &sessionManager{
+		updateCond:   sync.NewCond(&mu),
+		updateCondMu: &mu,
+		buildFunc:    buildFunc,
+	}
+}
+
+// sessionManager tracks sessions by ref and lets callers wait for a session
+// to reach a given state.
+type sessionManager struct {
+	sessions  sync.Map
+	buildFunc BuildFunc
+
+	// updateCond is broadcast (via notify) whenever session state that
+	// waitAndGet callers may be waiting on has changed.
+	updateCond   *sync.Cond
+	updateCondMu *sync.Mutex
+
+	// created records whether any session has ever been added.
+	created atomic.Bool
+}
+
+// newSession creates a session for ref and registers it with the manager.
+func (m *sessionManager) newSession(ref string) *session {
+	s := &session{manager: m, processes: processes.NewManager()}
+	m.add(ref, s)
+	return s
+}
+
+// add registers v under id and wakes waitAndGet callers.
+func (m *sessionManager) add(id string, v *session) {
+	m.sessions.Store(id, v)
+	m.notify()
+	m.created.Store(true)
+}
+
+// notify wakes every waitAndGet caller. It takes updateCondMu so that a
+// waiter which has checked its condition but not yet parked cannot miss the
+// wakeup; callers must not hold a session's mu (waitAndGet acquires
+// updateCondMu first and session.mu second).
+func (m *sessionManager) notify() {
+	m.updateCondMu.Lock()
+	m.updateCond.Broadcast()
+	m.updateCondMu.Unlock()
+}
+
+// get returns the session registered under id, or nil if there is none.
+func (m *sessionManager) get(id string) *session {
+	v, ok := m.sessions.Load(id)
+	if !ok {
+		return nil
+	}
+	return v.(*session)
+}
+
+// delete unregisters the session with the given id and closes it.
+func (m *sessionManager) delete(id string) error {
+	v, ok := m.sessions.LoadAndDelete(id)
+	if !ok {
+		return errors.Errorf("unknown session %q", id)
+	}
+	v.(*session).close()
+	return nil
+}
+
+// list returns the refs of all registered sessions, in no particular order.
+func (m *sessionManager) list() (res []string) {
+	m.sessions.Range(func(key, value any) bool {
+		res = append(res, key.(string))
+		return true
+	})
+	return
+}
+
+// close closes every registered session; the sessions stay registered.
+func (m *sessionManager) close() error {
+	m.sessions.Range(func(key, value any) bool {
+		value.(*session).close()
+		return true
+	})
+	return nil
+}
+
+// waitAndGet blocks until a session is registered under id and f reports
+// true for it, then returns that session. It returns ctx.Err() once ctx is
+// done. f is called with updateCondMu held and must not call notify.
+func (m *sessionManager) waitAndGet(ctx context.Context, id string, f func(s *session) bool) (*session, error) {
+	// sync.Cond cannot select on ctx, so a helper goroutine turns context
+	// cancellation into a wakeup. It exits as soon as waitAndGet returns.
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	go func() {
+		select {
+		case <-ctx.Done():
+			m.notify()
+		case <-stopCh:
+		}
+	}()
+
+	// Hold the lock from the condition check until Wait parks, so that a
+	// notify between the two cannot be lost.
+	m.updateCondMu.Lock()
+	defer m.updateCondMu.Unlock()
+	for {
+		if err := ctx.Err(); err != nil {
+			return nil, err
+		}
+		if s := m.get(id); s != nil && f(s) {
+			return s, nil
+		}
+		m.updateCond.Wait()
+	}
+}
+
+// sessionCreated reports whether a session has ever been added.
+func (m *sessionManager) sessionCreated() bool {
+	return m.created.Load()
+}