diff --git a/driver/docker-container/driver.go b/driver/docker-container/driver.go index d42d233a..b4478ad4 100644 --- a/driver/docker-container/driver.go +++ b/driver/docker-container/driver.go @@ -29,6 +29,8 @@ import ( "github.com/moby/buildkit/client" "github.com/moby/buildkit/util/tracing/detect" "github.com/pkg/errors" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) const ( @@ -359,7 +361,12 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error { return nil } -func (d *Driver) Client(ctx context.Context) (*client.Client, error) { +func (d *Driver) Client(ctx context.Context, copts ...driver.ClientOption) (*client.Client, error) { + co := driver.ClientOptions{} + for _, opt := range copts { + opt(&co) + } + _, conn, err := d.exec(ctx, []string{"buildctl", "dial-stdio"}) if err != nil { return nil, err @@ -380,9 +387,19 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) { } return conn, nil })) + if td, ok := exp.(client.TracerDelegate); ok { opts = append(opts, client.WithTracerDelegate(td)) } + + if len(co.Meta) > 0 { + opts = append(opts, client.WithGRPCDialOption(grpc.WithChainUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + // merge the existing context with new metadata. + ctx = metadata.NewOutgoingContext(ctx, co.Meta) + return invoker(ctx, method, req, reply, cc, opts...) + }))) + } + return client.New(ctx, "", opts...) } diff --git a/driver/docker/driver.go b/driver/docker/driver.go index 64c0dd25..9441b71a 100644 --- a/driver/docker/driver.go +++ b/driver/docker/driver.go @@ -50,9 +50,14 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error { return nil } -func (d *Driver) Client(ctx context.Context) (*client.Client, error) { +func (d *Driver) Client(ctx context.Context, copts ...driver.ClientOption) (*client.Client, error) { + co := driver.ClientOptions{} + for _, opt := range copts { + opt(&co) + } + return client.New(ctx, "", client.WithContextDialer(func(context.Context, string) (net.Conn, error) { - return d.DockerAPI.DialHijack(ctx, "/grpc", "h2c", nil) + return d.DockerAPI.DialHijack(ctx, "/grpc", "h2c", co.Meta) }), client.WithSessionDialer(func(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error) { return d.DockerAPI.DialHijack(ctx, "/session", proto, meta) })) diff --git a/driver/driver.go b/driver/driver.go index 8642a543..137a6fb2 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -58,7 +58,7 @@ type Driver interface { Version(context.Context) (string, error) Stop(ctx context.Context, force bool) error Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error - Client(ctx context.Context) (*client.Client, error) + Client(ctx context.Context, opts ...ClientOption) (*client.Client, error) Features(ctx context.Context) map[Feature]bool IsMobyDriver() bool Config() InitConfig @@ -110,3 +110,15 @@ func historyAPISupported(ctx context.Context, c *client.Client) bool { } } } + +type ClientOption func(*ClientOptions) + +type ClientOptions struct { + Meta map[string][]string +} + +func WithMeta(meta map[string][]string) ClientOption { + return func(o *ClientOptions) { + o.Meta = meta + } +} diff --git a/driver/kubernetes/driver.go b/driver/kubernetes/driver.go index 55c7853d..66f729d3 100644 --- a/driver/kubernetes/driver.go +++ b/driver/kubernetes/driver.go @@ -17,6 +17,8 @@ import ( "github.com/moby/buildkit/client" "github.com/moby/buildkit/util/tracing/detect" "github.com/pkg/errors" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -189,7 +191,12 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error { return nil } -func (d *Driver) Client(ctx context.Context) (*client.Client, error) { +func (d *Driver) Client(ctx context.Context, copts ...driver.ClientOption) (*client.Client, error) { + co := driver.ClientOptions{} + for _, opt := range copts { + opt(&co) + } + restClient := d.clientset.CoreV1().RESTClient() restClientConfig, err := d.KubeClientConfig.ClientConfig() if err != nil { @@ -218,9 +225,19 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) { opts = append(opts, client.WithContextDialer(func(context.Context, string) (net.Conn, error) { return conn, nil })) + if td, ok := exp.(client.TracerDelegate); ok { opts = append(opts, client.WithTracerDelegate(td)) } + + if len(co.Meta) > 0 { + opts = append(opts, client.WithGRPCDialOption(grpc.WithChainUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + // merge the existing context with new metadata. + ctx = metadata.NewOutgoingContext(ctx, co.Meta) + return invoker(ctx, method, req, reply, cc, opts...) + }))) + } + return client.New(ctx, "", opts...) } diff --git a/driver/manager.go b/driver/manager.go index b4ec318f..67362434 100644 --- a/driver/manager.go +++ b/driver/manager.go @@ -159,9 +159,9 @@ type DriverHandle struct { hostGatewayIPErr error } -func (d *DriverHandle) Client(ctx context.Context) (*client.Client, error) { +func (d *DriverHandle) Client(ctx context.Context, copts ...ClientOption) (*client.Client, error) { d.once.Do(func() { - d.client, d.err = d.Driver.Client(ctx) + d.client, d.err = d.Driver.Client(ctx, copts...) }) return d.client, d.err } diff --git a/driver/remote/driver.go b/driver/remote/driver.go index 27ffd169..530240c6 100644 --- a/driver/remote/driver.go +++ b/driver/remote/driver.go @@ -9,6 +9,7 @@ import ( "github.com/moby/buildkit/client" "google.golang.org/grpc" "google.golang.org/grpc/backoff" + "google.golang.org/grpc/metadata" ) type Driver struct { @@ -63,8 +64,13 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error { return nil } -func (d *Driver) Client(ctx context.Context) (*client.Client, error) { - opts := []client.ClientOpt{} +func (d *Driver) Client(ctx context.Context, copts ...driver.ClientOption) (*client.Client, error) { + co := driver.ClientOptions{} + for _, opt := range copts { + opt(&co) + } + + var opts []client.ClientOpt backoffConfig := backoff.DefaultConfig backoffConfig.MaxDelay = 1 * time.Second @@ -79,6 +85,14 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) { }...) } + if len(co.Meta) > 0 { + opts = append(opts, client.WithGRPCDialOption(grpc.WithChainUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + // merge the existing context with new metadata. + ctx = metadata.NewOutgoingContext(ctx, co.Meta) + return invoker(ctx, method, req, reply, cc, opts...) + }))) + } + return client.New(ctx, d.InitConfig.EndpointAddr, opts...) }