package grpcclient

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/gogo/googleapis/google/rpc"
	gogotypes "github.com/gogo/protobuf/types"
	"github.com/golang/protobuf/ptypes/any"
	"github.com/moby/buildkit/client/llb"
	"github.com/moby/buildkit/frontend/gateway/client"
	"github.com/moby/buildkit/frontend/gateway/errdefs"
	pb "github.com/moby/buildkit/frontend/gateway/pb"
	"github.com/moby/buildkit/identity"
	opspb "github.com/moby/buildkit/solver/pb"
	"github.com/moby/buildkit/util/apicaps"
	"github.com/moby/buildkit/util/grpcerrors"
	digest "github.com/opencontainers/go-digest"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	fstypes "github.com/tonistiigi/fsutil/types"
	"golang.org/x/sync/errgroup"
	spb "google.golang.org/genproto/googleapis/rpc/status"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// frontendPrefix is the prefix of the environment variable names that opts
// looks for when collecting frontend options.
const frontendPrefix = "BUILDKIT_FRONTEND_OPT_"

// GrpcClient is a gateway client.Client that can additionally execute a
// client.BuildFunc through Run.
type GrpcClient interface {
	client.Client
	// Run executes the given build function and returns its error, if any.
	Run(context.Context, client.BuildFunc) error
}

// New builds a GrpcClient on top of the bridge client c.
//
// The bridge is pinged first (bounded by a 15 second timeout) and the ping
// error, if any, is returned as is. The capabilities advertised in the ping
// response are used for the client; when the response carries no frontend API
// or no LLB capabilities, the corresponding frozen default list is used. The
// client's exec message forwarder is created from ctx.
func New(ctx context.Context, opts map[string]string, session, product string, c pb.LLBBridgeClient, w []client.WorkerInfo) (GrpcClient, error) {
	timeoutCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
	defer cancel()

	pong, err := c.Ping(timeoutCtx, &pb.PingRequest{})
	if err != nil {
		return nil, err
	}

	frontendCaps := pong.FrontendAPICaps
	if frontendCaps == nil {
		frontendCaps = defaultCaps()
	}

	llbCaps := pong.LLBCaps
	if llbCaps == nil {
		llbCaps = defaultLLBCaps()
	}

	gc := &grpcClient{
		client:    c,
		opts:      opts,
		sessionID: session,
		workers:   w,
		product:   product,
		caps:      pb.Caps.CapSet(frontendCaps),
		llbCaps:   opspb.Caps.CapSet(llbCaps),
		requests:  map[string]*pb.SolveRequest{},
		execMsgs:  newMessageForwarder(ctx, c),
	}
	return gc, nil
}

// current assembles a GrpcClient from the process environment: options,
// session ID, product name and workers are read through the helper functions
// in this file, and the bridge connection comes from grpcClientConn. A
// non-empty product name is also stored in apicaps.ExportedProduct.
func current() (GrpcClient, error) {
	exported := product()
	if exported != "" {
		apicaps.ExportedProduct = exported
	}

	ctx, conn, err := grpcClientConn(context.Background())
	if err != nil {
		return nil, err
	}

	bridge := pb.NewLLBBridgeClient(conn)
	return New(ctx, opts(), sessionID(), product(), bridge, workers())
}

// convertRef translates a client reference into its wire representation.
// A nil reference becomes an empty pb.Ref; a *reference contributes its id and
// definition; any other implementation is rejected with an error.
func convertRef(ref client.Reference) (*pb.Ref, error) {
	switch r := ref.(type) {
	case nil:
		return &pb.Ref{}, nil
	case *reference:
		return &pb.Ref{Id: r.id, Def: r.def}, nil
	default:
		return nil, errors.Errorf("invalid return reference type %T", ref)
	}
}

// RunFromEnvironment creates a client from the process environment (see
// current) and runs the build function f with it. A failure to create the
// client is wrapped; otherwise the result of Run is returned unchanged.
func RunFromEnvironment(ctx context.Context, f client.BuildFunc) error {
	c, err := current()
	if err == nil {
		return c.Run(ctx, f)
	}
	return errors.Wrapf(err, "failed to initialize client from environment")
}

// Run executes the build function f with c as its client and reports the
// outcome to the bridge.
//
// When the bridge supports pb.CapReturnResult, the result (or the error) is
// delivered with a Return call made from a deferred function after f has
// finished. Otherwise the legacy path is used: the solve request that produced
// the returned reference is sent again with Final set and the result metadata
// JSON-encoded into ExporterAttr.
//
// The exec message forwarder is released before the outcome is reported. An
// error from that release is surfaced only when nothing else has failed, so it
// never masks the error that actually failed the build.
func (c *grpcClient) Run(ctx context.Context, f client.BuildFunc) (retError error) {
	export := c.caps.Supports(pb.CapReturnResult) == nil

	var (
		res *client.Result
		err error
	)
	if export {
		// Registered first, so it runs last: by then retError already
		// reflects both the build and the forwarder release.
		defer func() {
			req := &pb.ReturnRequest{}
			if retError == nil {
				if res == nil {
					res = &client.Result{}
				}
				pbRes := &pb.Result{
					Metadata: res.Metadata,
				}
				if res.Refs != nil {
					if c.caps.Supports(pb.CapProtoRefArray) == nil {
						m := map[string]*pb.Ref{}
						for k, r := range res.Refs {
							pbRef, err := convertRef(r)
							if err != nil {
								retError = err
								continue
							}
							m[k] = pbRef
						}
						pbRes.Result = &pb.Result_Refs{Refs: &pb.RefMap{Refs: m}}
					} else {
						// Server doesn't support the new wire format for refs, so we construct
						// a deprecated result ref map.
						m := map[string]string{}
						for k, r := range res.Refs {
							pbRef, err := convertRef(r)
							if err != nil {
								retError = err
								continue
							}
							m[k] = pbRef.Id
						}
						pbRes.Result = &pb.Result_RefsDeprecated{RefsDeprecated: &pb.RefMapDeprecated{Refs: m}}
					}
				} else {
					pbRef, err := convertRef(res.Ref)
					if err != nil {
						retError = err
					} else {
						if c.caps.Supports(pb.CapProtoRefArray) == nil {
							pbRes.Result = &pb.Result_Ref{Ref: pbRef}
						} else {
							// Server doesn't support the new wire format for refs, so we construct
							// a deprecated result ref.
							pbRes.Result = &pb.Result_RefDeprecated{RefDeprecated: pbRef.Id}
						}
					}
				}
				// Only attach the result if every reference converted cleanly.
				if retError == nil {
					req.Result = pbRes
				}
			}
			if retError != nil {
				st, _ := status.FromError(grpcerrors.ToGRPC(retError))
				stp := st.Proto()
				req.Error = &rpc.Status{
					Code:    stp.Code,
					Message: stp.Message,
					Details: convertToGogoAny(stp.Details),
				}
			}
			// A failing Return call is reported only if nothing failed earlier.
			if _, err := c.client.Return(ctx, req); err != nil && retError == nil {
				retError = err
			}
		}()
	}

	defer func() {
		// Keep the first error: a release failure must not replace the error
		// that failed the build, but it must not be dropped silently either.
		if releaseErr := c.execMsgs.Release(); releaseErr != nil && retError == nil {
			retError = releaseErr
		}
	}()

	if res, err = f(ctx, c); err != nil {
		return err
	}

	if res == nil {
		return nil
	}

	// Returning more than one reference requires map support on the bridge.
	if err := c.caps.Supports(pb.CapReturnMap); len(res.Refs) > 1 && err != nil {
		return err
	}

	if !export {
		exportedAttrBytes, err := json.Marshal(res.Metadata)
		if err != nil {
			return errors.Wrapf(err, "failed to marshal return metadata")
		}

		req, err := c.requestForRef(res.Ref)
		if err != nil {
			return errors.Wrapf(err, "failed to find return ref")
		}

		req.Final = true
		req.ExporterAttr = exportedAttrBytes

		if _, err := c.client.Solve(ctx, req); err != nil {
			return errors.Wrapf(err, "failed to solve")
		}
	}

	return nil
}

// defaultCaps lists the frontend API capabilities that already existed when
// capability negotiation was introduced. The list is frozen and must never be
// changed.
func defaultCaps() []apicaps.PBCap {
	ids := []string{
		string(pb.CapSolveBase),
		string(pb.CapSolveInlineReturn),
		string(pb.CapResolveImage),
		string(pb.CapReadFile),
	}

	caps := make([]apicaps.PBCap, len(ids))
	for i, id := range ids {
		caps[i] = apicaps.PBCap{ID: id, Enabled: true}
	}
	return caps
}

// defaultLLBCaps lists the LLB capabilities that already existed when
// capability negotiation was introduced. The list is frozen and must never be
// changed.
func defaultLLBCaps() []apicaps.PBCap {
	ids := []string{
		string(opspb.CapSourceImage),
		string(opspb.CapSourceLocal),
		string(opspb.CapSourceLocalUnique),
		string(opspb.CapSourceLocalSessionID),
		string(opspb.CapSourceLocalIncludePatterns),
		string(opspb.CapSourceLocalFollowPaths),
		string(opspb.CapSourceLocalExcludePatterns),
		string(opspb.CapSourceLocalSharedKeyHint),
		string(opspb.CapSourceGit),
		string(opspb.CapSourceGitKeepDir),
		string(opspb.CapSourceGitFullURL),
		string(opspb.CapSourceHTTP),
		string(opspb.CapSourceHTTPChecksum),
		string(opspb.CapSourceHTTPPerm),
		string(opspb.CapSourceHTTPUIDGID),
		string(opspb.CapBuildOpLLBFileName),
		string(opspb.CapExecMetaBase),
		string(opspb.CapExecMetaProxy),
		string(opspb.CapExecMountBind),
		string(opspb.CapExecMountCache),
		string(opspb.CapExecMountCacheSharing),
		string(opspb.CapExecMountSelector),
		string(opspb.CapExecMountTmpfs),
		string(opspb.CapExecMountSecret),
		string(opspb.CapConstraints),
		string(opspb.CapPlatform),
		string(opspb.CapMetaIgnoreCache),
		string(opspb.CapMetaDescription),
		string(opspb.CapMetaExportCache),
	}

	caps := make([]apicaps.PBCap, len(ids))
	for i, id := range ids {
		caps[i] = apicaps.PBCap{ID: id, Enabled: true}
	}
	return caps
}

// grpcClient is the GrpcClient implementation backed by an LLB bridge gRPC
// client.
type grpcClient struct {
	// client is the bridge every RPC of this type goes through.
	client pb.LLBBridgeClient
	// opts, sessionID, product and workers are handed back verbatim by
	// BuildOpts.
	opts      map[string]string
	sessionID string
	product   string
	workers   []client.WorkerInfo
	// caps and llbCaps are the frontend API and LLB capability sets derived
	// from the ping response (or the frozen defaults) in New.
	caps    apicaps.CapSet
	llbCaps apicaps.CapSet
	// requests remembers, per returned reference ID, the solve request that
	// produced it; Solve fills it on the legacy (no inline result) path and
	// requestForRef reads it.
	requests map[string]*pb.SolveRequest
	// execMsgs multiplexes the exec message stream used by containers
	// created through NewContainer.
	execMsgs *messageForwarder
}

// requestForRef looks up the solve request that produced ref.
//
// A nil reference, or a *reference with an empty ID, maps to an empty request
// with an empty definition. A reference of a foreign type, or one whose ID was
// never recorded in c.requests, yields an error.
func (c *grpcClient) requestForRef(ref client.Reference) (*pb.SolveRequest, error) {
	empty := &pb.SolveRequest{
		Definition: &opspb.Definition{},
	}
	if ref == nil {
		return empty, nil
	}

	gatewayRef, isRef := ref.(*reference)
	switch {
	case !isRef:
		return nil, errors.Errorf("return reference has invalid type %T", ref)
	case gatewayRef.id == "":
		return empty, nil
	}

	if req, found := c.requests[gatewayRef.id]; found {
		return req, nil
	}
	return nil, errors.Errorf("did not find request for return reference %s", gatewayRef.id)
}

// Solve sends creq to the bridge and converts the response into a
// client.Result.
//
// Every capability listed in the definition metadata must be supported by the
// bridge's LLB capability set, otherwise the capability error is returned
// before anything is sent. Cache imports are sent in the new format unless the
// bridge lacks pb.CapImportCaches, in which case "registry" imports are sent
// through the deprecated ref list.
//
// When creq.Evaluate is set and the bridge does not support
// pb.CapGatewayEvaluateSolve, evaluation is triggered after a successful solve
// by calling StatFile(".") on every non-empty reference of the result; the
// first StatFile error is returned together with the result.
func (c *grpcClient) Solve(ctx context.Context, creq client.SolveRequest) (res *client.Result, err error) {
	if creq.Definition != nil {
		for _, md := range creq.Definition.Metadata {
			for cap := range md.Caps {
				if err := c.llbCaps.Supports(cap); err != nil {
					return nil, err
				}
			}
		}
	}
	var (
		// old API
		legacyRegistryCacheImports []string
		// new API (CapImportCaches)
		cacheImports []*pb.CacheOptionsEntry
	)
	supportCapImportCaches := c.caps.Supports(pb.CapImportCaches) == nil
	for _, im := range creq.CacheImports {
		if !supportCapImportCaches && im.Type == "registry" {
			legacyRegistryCacheImports = append(legacyRegistryCacheImports, im.Attrs["ref"])
		} else {
			cacheImports = append(cacheImports, &pb.CacheOptionsEntry{
				Type:  im.Type,
				Attrs: im.Attrs,
			})
		}
	}

	req := &pb.SolveRequest{
		Definition:          creq.Definition,
		Frontend:            creq.Frontend,
		FrontendOpt:         creq.FrontendOpt,
		FrontendInputs:      creq.FrontendInputs,
		AllowResultReturn:   true,
		AllowResultArrayRef: true,
		// old API
		ImportCacheRefsDeprecated: legacyRegistryCacheImports,
		// new API
		CacheImports: cacheImports,
	}

	// backwards compatibility with inline return
	if c.caps.Supports(pb.CapReturnResult) != nil {
		req.ExporterAttr = []byte("{}")
	}

	if creq.Evaluate {
		if c.caps.Supports(pb.CapGatewayEvaluateSolve) == nil {
			req.Evaluate = creq.Evaluate
		} else {
			// If evaluate is not supported, fallback to running Stat(".") in order to
			// trigger an evaluation of the result.
			defer func() {
				// res is only non-nil when the solve itself succeeded.
				if res == nil {
					return
				}
				err = c.statResultRefs(ctx, res)
			}()
		}
	}

	resp, err := c.client.Solve(ctx, req)
	if err != nil {
		return nil, err
	}

	res = &client.Result{}
	if resp.Result == nil {
		// Legacy response: remember the request so that Run can re-send it
		// as the final solve for this reference.
		if id := resp.Ref; id != "" {
			c.requests[id] = req
		}
		res.SetRef(&reference{id: resp.Ref, c: c})
	} else {
		res.Metadata = resp.Result.Metadata
		switch pbRes := resp.Result.Result.(type) {
		case *pb.Result_RefDeprecated:
			if id := pbRes.RefDeprecated; id != "" {
				res.SetRef(&reference{id: id, c: c})
			}
		case *pb.Result_RefsDeprecated:
			for k, v := range pbRes.RefsDeprecated.Refs {
				ref := &reference{id: v, c: c}
				if v == "" {
					ref = nil
				}
				res.AddRef(k, ref)
			}
		case *pb.Result_Ref:
			if pbRes.Ref.Id != "" {
				ref, err := newReference(c, pbRes.Ref)
				if err != nil {
					return nil, err
				}
				res.SetRef(ref)
			}
		case *pb.Result_Refs:
			for k, v := range pbRes.Refs.Refs {
				var ref *reference
				if v.Id != "" {
					ref, err = newReference(c, v)
					if err != nil {
						return nil, err
					}
				}
				res.AddRef(k, ref)
			}
		}
	}

	return res, nil
}

// statResultRefs calls StatFile(".") on the bridge for every reference held by
// res (the single Ref as well as every entry of Refs), using the reference's
// own ID. Entries that are nil, are not a *reference, or have an empty ID are
// skipped because there is nothing to stat. It stops at the first error.
func (c *grpcClient) statResultRefs(ctx context.Context, res *client.Result) error {
	refs := make([]client.Reference, 0, len(res.Refs)+1)
	if res.Ref != nil {
		refs = append(refs, res.Ref)
	}
	for _, r := range res.Refs {
		refs = append(refs, r)
	}

	for _, r := range refs {
		gatewayRef, ok := r.(*reference)
		if !ok || gatewayRef == nil || gatewayRef.id == "" {
			continue
		}
		_, err := c.client.StatFile(ctx, &pb.StatFileRequest{
			Ref:  gatewayRef.id,
			Path: ".",
		})
		if err != nil {
			return err
		}
	}
	return nil
}

// ResolveImageConfig asks the bridge to resolve the image reference ref and
// returns the digest and config bytes from the response. The platform from opt
// is forwarded only when one is set; resolve mode and log name are always
// forwarded.
func (c *grpcClient) ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt) (digest.Digest, []byte, error) {
	req := &pb.ResolveImageConfigRequest{
		Ref:         ref,
		ResolveMode: opt.ResolveMode,
		LogName:     opt.LogName,
	}
	if opt.Platform != nil {
		req.Platform = &opspb.Platform{
			OS:           opt.Platform.OS,
			Architecture: opt.Platform.Architecture,
			Variant:      opt.Platform.Variant,
			OSVersion:    opt.Platform.OSVersion,
			OSFeatures:   opt.Platform.OSFeatures,
		}
	}

	resp, err := c.client.ResolveImageConfig(ctx, req)
	if err != nil {
		return "", nil, err
	}
	return resp.Digest, resp.Config, nil
}

// BuildOpts reports the options, session ID, workers, product name and
// capability sets this client was created with.
func (c *grpcClient) BuildOpts() client.BuildOpts {
	var bo client.BuildOpts
	bo.Opts = c.opts
	bo.SessionID = c.sessionID
	bo.Workers = c.workers
	bo.Product = c.product
	bo.LLBCaps = c.llbCaps
	bo.Caps = c.caps
	return bo
}

// Inputs fetches the frontend input definitions from the bridge and returns
// them as LLB states keyed by input name. It fails early when the bridge does
// not support pb.CapFrontendInputs, and on the first definition that cannot be
// turned into a definition op.
func (c *grpcClient) Inputs(ctx context.Context) (map[string]llb.State, error) {
	if err := c.caps.Supports(pb.CapFrontendInputs); err != nil {
		return nil, err
	}

	resp, err := c.client.Inputs(ctx, &pb.InputsRequest{})
	if err != nil {
		return nil, err
	}

	states := make(map[string]llb.State)
	for name, def := range resp.Definitions {
		defOp, err := llb.NewDefinitionOp(def)
		if err != nil {
			return nil, err
		}
		states[name] = llb.NewState(defOp)
	}
	return states, nil
}

// procMessageForwarder is created per container process to act as the
// communication channel between the process and the ExecProcess message
// stream.
type procMessageForwarder struct {
	// done is closed by Close to signal that the forwarder is shut down.
	done chan struct{}
	// closeOnce guards the close of msgs performed by Send.
	closeOnce sync.Once
	// msgs is the unbuffered channel on which Send hands messages to Recv.
	msgs chan *pb.ExecMessage
}

// newProcMessageForwarder returns a forwarder with freshly allocated,
// unbuffered done and msgs channels.
func newProcMessageForwarder() *procMessageForwarder {
	fwd := new(procMessageForwarder)
	fwd.done = make(chan struct{})
	fwd.msgs = make(chan *pb.ExecMessage)
	return fwd
}

// Send hands m to a receiver on b.msgs. It blocks until one of three things
// happens: ctx is done (m is dropped), b.done is closed (m is dropped and
// b.msgs is closed, at most once thanks to closeOnce), or a receiver takes the
// message.
//
// NOTE(review): select chooses randomly among ready cases and a send on a
// closed channel panics, so a Send that runs after b.msgs has already been
// closed looks like it could panic — confirm callers cannot reach that state.
func (b *procMessageForwarder) Send(ctx context.Context, m *pb.ExecMessage) {
	select {
	case <-ctx.Done():
	case <-b.done:
		b.closeOnce.Do(func() {
			close(b.msgs)
		})
	case b.msgs <- m:
	}
}

// Recv waits for the next message.
//
// It returns (nil, true) when ctx is done, (nil, false) when the forwarder has
// been closed (b.done is closed), and (m, true) when a message was received
// from b.msgs. Callers therefore have to treat a nil message with ok == true
// as "no message, but not closed".
func (b *procMessageForwarder) Recv(ctx context.Context) (m *pb.ExecMessage, ok bool) {
	select {
	case <-ctx.Done():
		return nil, true
	case <-b.done:
		return nil, false
	case m = <-b.msgs:
		return m, true
	}
}

// Close shuts the forwarder down. It closes b.done, after which Recv can
// report ok == false, and then calls Recv and Send once each for the reasons
// given in the inline comments. Because b.done is closed unconditionally,
// Close must not be called more than once on the same forwarder.
func (b *procMessageForwarder) Close() {
	close(b.done)
	b.Recv(context.Background())      // flush any messages in queue
	b.Send(context.Background(), nil) // ensure channel is closed
}

// messageForwarder manages a single grpc stream for ExecProcess to facilitate
// a pub/sub message channel for each new process started from the client
// connection.
type messageForwarder struct {
	// client is used to open the ExecProcess stream in Start.
	client pb.LLBBridgeClient
	// ctx is the context the stream is opened with; cancel cancels the
	// context it was derived from and is called by Release.
	ctx    context.Context
	cancel func()
	// eg runs the goroutine that reads from the stream; Release waits on it.
	eg *errgroup.Group
	// mu guards pids.
	mu sync.Mutex
	// pids maps a process ID to the forwarder registered for that process.
	pids map[string]*procMessageForwarder
	// stream is the ExecProcess stream, set by Start.
	stream pb.LLBBridge_ExecProcessClient
	// startOnce used to only start the exec message forwarder once,
	// so we only have one exec stream per client
	startOnce sync.Once
	// startErr tracks the error when initializing the stream, it will
	// be returned on subsequent calls to Start
	startErr error
}

// newMessageForwarder prepares a forwarder for client. The forwarder keeps an
// errgroup context derived from a cancellable child of ctx, together with the
// cancel function of that child. No stream is opened until Start is called.
func newMessageForwarder(ctx context.Context, client pb.LLBBridgeClient) *messageForwarder {
	cancelCtx, cancel := context.WithCancel(ctx)
	group, groupCtx := errgroup.WithContext(cancelCtx)

	fwd := &messageForwarder{
		client: client,
		ctx:    groupCtx,
		cancel: cancel,
		eg:     group,
		pids:   map[string]*procMessageForwarder{},
	}
	return fwd
}

// Start opens the ExecProcess stream and launches the goroutine that routes
// incoming messages to the forwarder registered for their process ID. The
// stream is opened at most once (startOnce); later calls are no-ops. An error
// from opening the stream is remembered in startErr and returned by every
// subsequent call.
//
// The routing goroutine ends without error when the stream reports io.EOF or
// a Canceled status, and with the error for any other receive failure.
// Messages for process IDs that are not registered are logged and dropped.
func (m *messageForwarder) Start() (err error) {
	defer func() {
		if err != nil {
			m.startErr = err
		}
	}()

	if m.startErr != nil {
		return m.startErr
	}

	m.startOnce.Do(func() {
		m.stream, err = m.client.ExecProcess(m.ctx)
		if err != nil {
			return
		}
		m.eg.Go(func() error {
			for {
				msg, err := m.stream.Recv()
				if errors.Is(err, io.EOF) || grpcerrors.Code(err) == codes.Canceled {
					return nil
				}
				// Check the error before touching msg: on failure there is
				// no message to describe.
				if err != nil {
					return err
				}
				logrus.Debugf("|<--- %s", debugMessage(msg))

				m.mu.Lock()
				msgs, ok := m.pids[msg.ProcessID]
				m.mu.Unlock()

				if !ok {
					logrus.Debugf("Received exec message for unregistered process: %s", msg.String())
					continue
				}
				msgs.Send(m.ctx, msg)
			}
		})
	})
	return err
}

// debugMessage renders a short, human-readable description of msg for debug
// logging. The description names the kind of input the message carries and
// its process ID; file messages also show the fd and either EOF or the payload
// size, exit messages the exit code and error. Any other input is rendered
// through the message's String method.
func debugMessage(msg *pb.ExecMessage) string {
	var desc string
	switch in := msg.GetInput().(type) {
	case *pb.ExecMessage_Init:
		desc = fmt.Sprintf("Init Message %s", msg.ProcessID)
	case *pb.ExecMessage_File:
		if in.File.EOF {
			desc = fmt.Sprintf("File Message %s, fd=%d, EOF", msg.ProcessID, in.File.Fd)
		} else {
			desc = fmt.Sprintf("File Message %s, fd=%d, %d bytes", msg.ProcessID, in.File.Fd, len(in.File.Data))
		}
	case *pb.ExecMessage_Resize:
		desc = fmt.Sprintf("Resize Message %s", msg.ProcessID)
	case *pb.ExecMessage_Started:
		desc = fmt.Sprintf("Started Message %s", msg.ProcessID)
	case *pb.ExecMessage_Exit:
		desc = fmt.Sprintf("Exit Message %s, code=%d, err=%s", msg.ProcessID, in.Exit.Code, in.Exit.Error)
	case *pb.ExecMessage_Done:
		desc = fmt.Sprintf("Done Message %s", msg.ProcessID)
	default:
		desc = fmt.Sprintf("Unknown Message %s", msg.String())
	}
	return desc
}

// Send writes msg to the exec stream, but only while the process it belongs
// to is still registered; otherwise an error is returned and nothing is sent.
// The forwarder's mutex is held for the whole call, including the stream
// write.
func (m *messageForwarder) Send(msg *pb.ExecMessage) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if _, registered := m.pids[msg.ProcessID]; !registered {
		return errors.Errorf("process %s has ended, not sending message %#v", msg.ProcessID, msg.Input)
	}

	logrus.Debugf("|---> %s", debugMessage(msg))
	return m.stream.Send(msg)
}

// Release cancels the forwarder's context and waits for the goroutines of its
// errgroup to finish, returning the group's error.
func (m *messageForwarder) Release() error {
	m.cancel()
	err := m.eg.Wait()
	return err
}

// Register creates a new per-process forwarder, stores it under pid (replacing
// any previous entry for that ID) and returns it.
func (m *messageForwarder) Register(pid string) *procMessageForwarder {
	fwd := newProcMessageForwarder()

	m.mu.Lock()
	m.pids[pid] = fwd
	m.mu.Unlock()

	return fwd
}

// Deregister removes the forwarder registered under pid, if there is one, and
// closes it. Unknown process IDs are ignored. The mutex is held while the
// forwarder is closed.
func (m *messageForwarder) Deregister(pid string) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if fwd, found := m.pids[pid]; found {
		delete(m.pids, pid)
		fwd.Close()
	}
}

// msgWriter turns Write calls into file messages for one fd of one process,
// sent through the shared message forwarder.
type msgWriter struct {
	// mux is the forwarder the messages are sent through.
	mux *messageForwarder
	// fd is the file descriptor number placed in every message.
	fd uint32
	// processID identifies the process the data is addressed to.
	processID string
}

// Write sends msg as the data of a single file message for the writer's
// process and fd. It reports len(msg) bytes written on success and zero bytes
// together with the send error on failure.
func (w *msgWriter) Write(msg []byte) (int, error) {
	payload := &pb.FdMessage{
		Fd:   w.fd,
		Data: msg,
	}
	execMsg := &pb.ExecMessage{
		ProcessID: w.processID,
		Input:     &pb.ExecMessage_File{File: payload},
	}

	if err := w.mux.Send(execMsg); err != nil {
		return 0, err
	}
	return len(msg), nil
}

// NewContainer asks the bridge to create a container with the mounts,
// platform and constraints from req, under a freshly generated ID, and makes
// sure the exec message forwarder is running before returning the container
// handle.
//
// It fails when the bridge does not support pb.CapGatewayExec, when a mount
// carries a reference that is not a *reference, or when creating the container
// or starting the forwarder fails. A mount's reference, when set, takes
// precedence over its ResultID.
func (c *grpcClient) NewContainer(ctx context.Context, req client.NewContainerRequest) (client.Container, error) {
	if err := c.caps.Supports(pb.CapGatewayExec); err != nil {
		return nil, err
	}

	containerID := identity.NewID()

	var pbMounts []*opspb.Mount
	for _, mnt := range req.Mounts {
		resultID := mnt.ResultID
		if mnt.Ref != nil {
			gatewayRef, isRef := mnt.Ref.(*reference)
			if !isRef {
				return nil, errors.Errorf("unexpected type for reference, got %T", mnt.Ref)
			}
			resultID = gatewayRef.id
		}
		pbMount := &opspb.Mount{
			Dest:      mnt.Dest,
			Selector:  mnt.Selector,
			Readonly:  mnt.Readonly,
			MountType: mnt.MountType,
			ResultID:  resultID,
			CacheOpt:  mnt.CacheOpt,
			SecretOpt: mnt.SecretOpt,
			SSHOpt:    mnt.SSHOpt,
		}
		pbMounts = append(pbMounts, pbMount)
	}

	logrus.Debugf("|---> NewContainer %s", containerID)
	createReq := &pb.NewContainerRequest{
		ContainerID: containerID,
		Mounts:      pbMounts,
		Platform:    req.Platform,
		Constraints: req.Constraints,
	}
	if _, err := c.client.NewContainer(ctx, createReq); err != nil {
		return nil, err
	}

	// The forwarder sets up its stream only on the first call; later calls
	// just report the outcome of that first attempt.
	if err := c.execMsgs.Start(); err != nil {
		return nil, err
	}

	return &container{
		client:   c.client,
		id:       containerID,
		execMsgs: c.execMsgs,
	}, nil
}

// container is the handle returned by NewContainer for a container created on
// the bridge.
type container struct {
	// client is the bridge used to release the container.
	client pb.LLBBridgeClient
	// id is the container ID generated by NewContainer.
	id string
	// execMsgs is the client-wide forwarder used to exchange exec messages
	// with processes started in this container.
	execMsgs *messageForwarder
}

// Start launches a process in the container and returns a handle for it.
//
// A process ID is generated and registered with the message forwarder, an init
// message describing the process (args, env, cwd, user, tty, security mode and
// the fds for which req supplies a stream) is sent, and the call then waits
// for the bridge's started message. If any of these steps fails, the process
// ID is deregistered again before the error is returned.
//
// After a successful start, goroutines in the returned process's errgroup
// forward stdin to the bridge (when req.Stdin is set), write file messages for
// fd 1 and 2 to req.Stdout and req.Stderr, and turn a non-zero exit message
// into the error reported by Wait. Cancelling ctx closes the stdin side but
// the handler keeps waiting for the bridge's exit/done messages.
func (ctr *container) Start(ctx context.Context, req client.StartRequest) (_ client.ContainerProcess, retErr error) {
	pid := fmt.Sprintf("%s:%s", ctr.id, identity.NewID())
	msgs := ctr.execMsgs.Register(pid)
	defer func() {
		// Nobody will call Wait on a process that failed to start, so the
		// registration has to be undone here.
		if retErr != nil {
			ctr.execMsgs.Deregister(pid)
		}
	}()

	init := &pb.InitMessage{
		ContainerID: ctr.id,
		Meta: &opspb.Meta{
			Args: req.Args,
			Env:  req.Env,
			Cwd:  req.Cwd,
			User: req.User,
		},
		Tty:      req.Tty,
		Security: req.SecurityMode,
	}
	if req.Stdin != nil {
		init.Fds = append(init.Fds, 0)
	}
	if req.Stdout != nil {
		init.Fds = append(init.Fds, 1)
	}
	if req.Stderr != nil {
		init.Fds = append(init.Fds, 2)
	}

	err := ctr.execMsgs.Send(&pb.ExecMessage{
		ProcessID: pid,
		Input: &pb.ExecMessage_Init{
			Init: init,
		},
	})
	if err != nil {
		return nil, err
	}

	msg, _ := msgs.Recv(ctx)
	if msg == nil {
		return nil, errors.Errorf("failed to receive started message")
	}
	started := msg.GetStarted()
	if started == nil {
		return nil, errors.Errorf("expecting started message, got %T", msg.GetInput())
	}

	eg, ctx := errgroup.WithContext(ctx)
	done := make(chan struct{})

	ctrProc := &containerProcess{
		execMsgs: ctr.execMsgs,
		id:       pid,
		eg:       eg,
	}

	var stdinReader *io.PipeReader
	ctrProc.eg.Go(func() error {
		// Once the process is done (or ctx is cancelled), closing the read
		// side of the pipe unblocks the stdin forwarding goroutine below.
		<-done
		if stdinReader != nil {
			return stdinReader.Close()
		}
		return nil
	})

	if req.Stdin != nil {
		var stdinWriter io.WriteCloser
		stdinReader, stdinWriter = io.Pipe()
		// This go routine is intentionally not part of the errgroup because
		// if os.Stdin is used for req.Stdin then this will block until
		// the user closes the input, which will likely be after we are done
		// with the container, so we can't Wait on it.
		go func() {
			io.Copy(stdinWriter, req.Stdin)
			stdinWriter.Close()
		}()

		ctrProc.eg.Go(func() error {
			m := &msgWriter{
				mux:       ctr.execMsgs,
				processID: pid,
				fd:        0,
			}
			_, err := io.Copy(m, stdinReader)
			// ignore ErrClosedPipe, it is EOF for our usage.
			if err != nil && !errors.Is(err, io.ErrClosedPipe) {
				return err
			}
			// not an error so must be eof
			return ctr.execMsgs.Send(&pb.ExecMessage{
				ProcessID: pid,
				Input: &pb.ExecMessage_File{
					File: &pb.FdMessage{
						Fd:  0,
						EOF: true,
					},
				},
			})
		})
	}

	ctrProc.eg.Go(func() error {
		var closeDoneOnce sync.Once
		var exitError error
		// recvCtx starts as ctx and is swapped for a background context once
		// ctx has been cancelled; otherwise Recv would return immediately on
		// every iteration and this loop would spin.
		recvCtx := ctx
		for {
			msg, ok := msgs.Recv(recvCtx)
			if !ok {
				// no more messages, return
				return exitError
			}

			if msg == nil {
				// empty message from ctx cancel, so just start shutting down
				// input, but continue processing more exit/done messages
				closeDoneOnce.Do(func() {
					close(done)
				})
				recvCtx = context.Background()
				continue
			}

			if file := msg.GetFile(); file != nil {
				var out io.WriteCloser
				switch file.Fd {
				case 1:
					out = req.Stdout
				case 2:
					out = req.Stderr
				}
				if out == nil {
					// if things are plumbed correctly this should never happen
					return errors.Errorf("missing writer for output fd %d", file.Fd)
				}
				if len(file.Data) > 0 {
					_, err := out.Write(file.Data)
					if err != nil {
						return err
					}
				}
			} else if exit := msg.GetExit(); exit != nil {
				// capture exit message to exitError so we can return it after
				// the server sends the Done message
				closeDoneOnce.Do(func() {
					close(done)
				})
				if exit.Code == 0 {
					continue
				}
				if exit.Error != nil {
					exitError = grpcerrors.FromGRPC(status.ErrorProto(&spb.Status{
						Code:    exit.Error.Code,
						Message: exit.Error.Message,
						Details: convertGogoAny(exit.Error.Details),
					}))
				} else {
					// A non-zero exit without status details still has to be
					// reported as a failure.
					exitError = errors.Errorf("process exited with code %d", exit.Code)
				}
				if exit.Code != errdefs.UnknownExitStatus {
					exitError = &errdefs.ExitError{ExitCode: exit.Code, Err: exitError}
				}
			} else if serverDone := msg.GetDone(); serverDone != nil {
				return exitError
			} else {
				return errors.Errorf("unexpected Exec Message for pid %s: %T", pid, msg.GetInput())
			}
		}
	})

	return ctrProc, nil
}

// Release asks the bridge to release this container and returns the error of
// that call, if any.
func (ctr *container) Release(ctx context.Context) error {
	logrus.Debugf("|---> ReleaseContainer %s", ctr.id)

	req := &pb.ReleaseContainerRequest{ContainerID: ctr.id}
	if _, err := ctr.client.ReleaseContainer(ctx, req); err != nil {
		return err
	}
	return nil
}

// containerProcess is the handle for a process started through
// container.Start.
type containerProcess struct {
	// execMsgs is the forwarder the process is registered with.
	execMsgs *messageForwarder
	// id is the process ID used on the exec message stream.
	id string
	// eg groups the goroutines that serve the process's I/O and exit status.
	eg *errgroup.Group
}

// Wait blocks until all goroutines serving the process have finished, then
// deregisters the process from the message forwarder and returns the group's
// error.
func (ctrProc *containerProcess) Wait() error {
	err := ctrProc.eg.Wait()
	ctrProc.execMsgs.Deregister(ctrProc.id)
	return err
}

// Resize sends a resize message carrying the columns and rows of size to the
// process. The context argument is not used.
func (ctrProc *containerProcess) Resize(_ context.Context, size client.WinSize) error {
	resize := &pb.ResizeMessage{
		Cols: size.Cols,
		Rows: size.Rows,
	}
	msg := &pb.ExecMessage{
		ProcessID: ctrProc.id,
		Input:     &pb.ExecMessage_Resize{Resize: resize},
	}
	return ctrProc.execMsgs.Send(msg)
}

// reference is the client.Reference implementation handed out by grpcClient.
type reference struct {
	// c is the client the reference was obtained from; its bridge serves the
	// file operations on the reference.
	c *grpcClient
	// id is the reference ID assigned by the bridge.
	id string
	// def is the definition the bridge returned for the reference; it may be
	// nil, in which case ToState fails.
	def *opspb.Definition
}

// newReference wraps the wire reference ref in a *reference bound to c. The
// returned error is always nil.
func newReference(c *grpcClient, ref *pb.Ref) (*reference, error) {
	r := &reference{
		c:   c,
		id:  ref.Id,
		def: ref.Def,
	}
	return r, nil
}

// ToState converts the reference back into an LLB state built from the
// definition the bridge returned for it. It fails when the bridge does not
// support pb.CapReferenceOutput, when the reference carries no definition, or
// when the definition cannot be turned into a definition op.
func (r *reference) ToState() (st llb.State, err error) {
	if err = r.c.caps.Supports(pb.CapReferenceOutput); err != nil {
		return st, err
	}
	if r.def == nil {
		return st, errors.Errorf("gateway did not return reference with definition")
	}

	op, err := llb.NewDefinitionOp(r.def)
	if err != nil {
		return st, err
	}
	return llb.NewState(op), nil
}

// ReadFile reads the file named by req.Filename from this reference through
// the bridge and returns its data. When req.Range is set, its offset and
// length are forwarded with the request.
func (r *reference) ReadFile(ctx context.Context, req client.ReadRequest) ([]byte, error) {
	readReq := &pb.ReadFileRequest{
		FilePath: req.Filename,
		Ref:      r.id,
	}
	if req.Range != nil {
		readReq.Range = &pb.FileRange{
			Offset: int64(req.Range.Offset),
			Length: int64(req.Range.Length),
		}
	}

	resp, err := r.c.client.ReadFile(ctx, readReq)
	if err != nil {
		return nil, err
	}
	return resp.Data, nil
}

// ReadDir lists the directory req.Path of this reference through the bridge,
// forwarding req.IncludePattern, and returns the entries of the response. It
// requires the bridge to support pb.CapReadDir.
func (r *reference) ReadDir(ctx context.Context, req client.ReadDirRequest) ([]*fstypes.Stat, error) {
	err := r.c.caps.Supports(pb.CapReadDir)
	if err != nil {
		return nil, err
	}

	resp, err := r.c.client.ReadDir(ctx, &pb.ReadDirRequest{
		DirPath:        req.Path,
		IncludePattern: req.IncludePattern,
		Ref:            r.id,
	})
	if err != nil {
		return nil, err
	}
	return resp.Entries, nil
}

// StatFile stats req.Path on this reference through the bridge and returns
// the stat from the response. It requires the bridge to support
// pb.CapStatFile.
func (r *reference) StatFile(ctx context.Context, req client.StatRequest) (*fstypes.Stat, error) {
	err := r.c.caps.Supports(pb.CapStatFile)
	if err != nil {
		return nil, err
	}

	resp, err := r.c.client.StatFile(ctx, &pb.StatFileRequest{
		Path: req.Path,
		Ref:  r.id,
	})
	if err != nil {
		return nil, err
	}
	return resp.Stat, nil
}

// grpcClientConn dials a gRPC client connection whose transport is the
// process's stdio (see stdioConn), without transport security and with the
// grpcerrors client interceptors installed. It returns the connection together
// with a cancellable child of ctx; the cancel function is currently discarded
// because the health monitor that would use it is disabled.
func grpcClientConn(ctx context.Context) (context.Context, *grpc.ClientConn, error) {
	stdioDialer := func(ctx context.Context, addr string) (net.Conn, error) {
		return stdioConn(), nil
	}
	dialOpts := []grpc.DialOption{
		grpc.WithContextDialer(stdioDialer),
		grpc.WithInsecure(),
		grpc.WithUnaryInterceptor(grpcerrors.UnaryClientInterceptor),
		grpc.WithStreamInterceptor(grpcerrors.StreamClientInterceptor),
	}

	cc, err := grpc.DialContext(ctx, "localhost", dialOpts...)
	if err != nil {
		return nil, nil, errors.Wrap(err, "failed to create grpc client")
	}

	ctx, cancel := context.WithCancel(ctx)
	_ = cancel
	// go monitorHealth(ctx, cc, cancel)

	return ctx, cc, nil
}

// stdioConn returns a net.Conn that reads from os.Stdin, writes to os.Stdout
// and closes os.Stdout when closed.
func stdioConn() net.Conn {
	return &conn{
		Reader: os.Stdin,
		Writer: os.Stdout,
		Closer: os.Stdout,
	}
}

// conn adapts a reader, a writer and a closer to the net.Conn interface.
// Addresses are placeholders and deadlines are accepted but ignored.
type conn struct {
	io.Reader
	io.Writer
	io.Closer
}

// LocalAddr returns a placeholder address.
func (*conn) LocalAddr() net.Addr { return dummyAddr{} }

// RemoteAddr returns a placeholder address.
func (*conn) RemoteAddr() net.Addr { return dummyAddr{} }

// SetDeadline does nothing and always succeeds.
func (*conn) SetDeadline(time.Time) error { return nil }

// SetReadDeadline does nothing and always succeeds.
func (*conn) SetReadDeadline(time.Time) error { return nil }

// SetWriteDeadline does nothing and always succeeds.
func (*conn) SetWriteDeadline(time.Time) error { return nil }

// dummyAddr is a placeholder net.Addr with fixed network and address strings.
type dummyAddr struct{}

// Network always reports "pipe".
func (dummyAddr) Network() string { return "pipe" }

// String always reports "localhost".
func (dummyAddr) String() string { return "localhost" }

// opts collects the frontend options passed through the environment. Every
// environment variable whose name starts with frontendPrefix contributes one
// option: the variable's value is split at its first "=" into the option key
// and the option value, and a value without "=" becomes a key with an empty
// option value. Variables with other names are ignored.
func opts() map[string]string {
	result := make(map[string]string)
	for _, kv := range os.Environ() {
		name, raw := kv, ""
		if i := strings.Index(kv, "="); i >= 0 {
			name, raw = kv[:i], kv[i+1:]
		}
		if !strings.HasPrefix(name, frontendPrefix) {
			continue
		}

		optKey, optVal := raw, ""
		if i := strings.Index(raw, "="); i >= 0 {
			optKey, optVal = raw[:i], raw[i+1:]
		}
		result[optKey] = optVal
	}
	return result
}

// sessionID returns the value of the BUILDKIT_SESSION_ID environment
// variable, or the empty string when it is not set.
func sessionID() string {
	const envKey = "BUILDKIT_SESSION_ID"
	return os.Getenv(envKey)
}

// workers decodes the JSON list of workers stored in the BUILDKIT_WORKERS
// environment variable. It returns nil when the variable cannot be decoded,
// which includes the case where it is unset or empty.
func workers() []client.WorkerInfo {
	raw := os.Getenv("BUILDKIT_WORKERS")

	var infos []client.WorkerInfo
	if json.Unmarshal([]byte(raw), &infos) != nil {
		return nil
	}
	return infos
}

// product returns the value of the BUILDKIT_EXPORTEDPRODUCT environment
// variable, or the empty string when it is not set.
func product() string {
	const envKey = "BUILDKIT_EXPORTEDPRODUCT"
	return os.Getenv(envKey)
}

// convertGogoAny copies each gogo Any in in into a golang/protobuf Any with
// the same type URL and value. The result always has the same length as in and
// is never nil.
func convertGogoAny(in []*gogotypes.Any) []*any.Any {
	converted := make([]*any.Any, 0, len(in))
	for _, src := range in {
		converted = append(converted, &any.Any{
			TypeUrl: src.TypeUrl,
			Value:   src.Value,
		})
	}
	return converted
}

func convertToGogoAny(in []*any.Any) []*gogotypes.Any {
	out := make([]*gogotypes.Any, len(in))
	for i := range in {
		out[i] = &gogotypes.Any{TypeUrl: in[i].TypeUrl, Value: in[i].Value}
	}
	return out
}