driver: allow attaching additional metadata to the client

Signed-off-by: CrazyMax <crazy-max@users.noreply.github.com>
pull/2018/head
CrazyMax 1 year ago
parent 29a496cdab
commit 905bee450a
No known key found for this signature in database
GPG Key ID: 3248E46B6BB8C7F7

@ -29,6 +29,8 @@ import (
"github.com/moby/buildkit/client" "github.com/moby/buildkit/client"
"github.com/moby/buildkit/util/tracing/detect" "github.com/moby/buildkit/util/tracing/detect"
"github.com/pkg/errors" "github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
) )
const ( const (
@ -359,7 +361,12 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
return nil return nil
} }
func (d *Driver) Client(ctx context.Context) (*client.Client, error) { func (d *Driver) Client(ctx context.Context, copts ...driver.ClientOption) (*client.Client, error) {
co := driver.ClientOptions{}
for _, opt := range copts {
opt(&co)
}
_, conn, err := d.exec(ctx, []string{"buildctl", "dial-stdio"}) _, conn, err := d.exec(ctx, []string{"buildctl", "dial-stdio"})
if err != nil { if err != nil {
return nil, err return nil, err
@ -380,9 +387,19 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
} }
return conn, nil return conn, nil
})) }))
if td, ok := exp.(client.TracerDelegate); ok { if td, ok := exp.(client.TracerDelegate); ok {
opts = append(opts, client.WithTracerDelegate(td)) opts = append(opts, client.WithTracerDelegate(td))
} }
if len(co.Meta) > 0 {
opts = append(opts, client.WithGRPCDialOption(grpc.WithChainUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// merge the existing context with new metadata.
ctx = metadata.NewOutgoingContext(ctx, co.Meta)
return invoker(ctx, method, req, reply, cc, opts...)
})))
}
return client.New(ctx, "", opts...) return client.New(ctx, "", opts...)
} }

@ -50,9 +50,14 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
return nil return nil
} }
func (d *Driver) Client(ctx context.Context) (*client.Client, error) { func (d *Driver) Client(ctx context.Context, copts ...driver.ClientOption) (*client.Client, error) {
co := driver.ClientOptions{}
for _, opt := range copts {
opt(&co)
}
return client.New(ctx, "", client.WithContextDialer(func(context.Context, string) (net.Conn, error) { return client.New(ctx, "", client.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return d.DockerAPI.DialHijack(ctx, "/grpc", "h2c", nil) return d.DockerAPI.DialHijack(ctx, "/grpc", "h2c", co.Meta)
}), client.WithSessionDialer(func(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error) { }), client.WithSessionDialer(func(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error) {
return d.DockerAPI.DialHijack(ctx, "/session", proto, meta) return d.DockerAPI.DialHijack(ctx, "/session", proto, meta)
})) }))

@ -58,7 +58,7 @@ type Driver interface {
Version(context.Context) (string, error) Version(context.Context) (string, error)
Stop(ctx context.Context, force bool) error Stop(ctx context.Context, force bool) error
Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error
Client(ctx context.Context) (*client.Client, error) Client(ctx context.Context, opts ...ClientOption) (*client.Client, error)
Features(ctx context.Context) map[Feature]bool Features(ctx context.Context) map[Feature]bool
IsMobyDriver() bool IsMobyDriver() bool
Config() InitConfig Config() InitConfig
@ -110,3 +110,15 @@ func historyAPISupported(ctx context.Context, c *client.Client) bool {
} }
} }
} }
type ClientOption func(*ClientOptions)
type ClientOptions struct {
Meta map[string][]string
}
func WithMeta(meta map[string][]string) ClientOption {
return func(o *ClientOptions) {
o.Meta = meta
}
}

@ -17,6 +17,8 @@ import (
"github.com/moby/buildkit/client" "github.com/moby/buildkit/client"
"github.com/moby/buildkit/util/tracing/detect" "github.com/moby/buildkit/util/tracing/detect"
"github.com/pkg/errors" "github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
appsv1 "k8s.io/api/apps/v1" appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -189,7 +191,12 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
return nil return nil
} }
func (d *Driver) Client(ctx context.Context) (*client.Client, error) { func (d *Driver) Client(ctx context.Context, copts ...driver.ClientOption) (*client.Client, error) {
co := driver.ClientOptions{}
for _, opt := range copts {
opt(&co)
}
restClient := d.clientset.CoreV1().RESTClient() restClient := d.clientset.CoreV1().RESTClient()
restClientConfig, err := d.KubeClientConfig.ClientConfig() restClientConfig, err := d.KubeClientConfig.ClientConfig()
if err != nil { if err != nil {
@ -218,9 +225,19 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
opts = append(opts, client.WithContextDialer(func(context.Context, string) (net.Conn, error) { opts = append(opts, client.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return conn, nil return conn, nil
})) }))
if td, ok := exp.(client.TracerDelegate); ok { if td, ok := exp.(client.TracerDelegate); ok {
opts = append(opts, client.WithTracerDelegate(td)) opts = append(opts, client.WithTracerDelegate(td))
} }
if len(co.Meta) > 0 {
opts = append(opts, client.WithGRPCDialOption(grpc.WithChainUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// merge the existing context with new metadata.
ctx = metadata.NewOutgoingContext(ctx, co.Meta)
return invoker(ctx, method, req, reply, cc, opts...)
})))
}
return client.New(ctx, "", opts...) return client.New(ctx, "", opts...)
} }

@ -159,9 +159,9 @@ type DriverHandle struct {
hostGatewayIPErr error hostGatewayIPErr error
} }
func (d *DriverHandle) Client(ctx context.Context) (*client.Client, error) { func (d *DriverHandle) Client(ctx context.Context, copts ...ClientOption) (*client.Client, error) {
d.once.Do(func() { d.once.Do(func() {
d.client, d.err = d.Driver.Client(ctx) d.client, d.err = d.Driver.Client(ctx, copts...)
}) })
return d.client, d.err return d.client, d.err
} }

@ -9,6 +9,7 @@ import (
"github.com/moby/buildkit/client" "github.com/moby/buildkit/client"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/backoff" "google.golang.org/grpc/backoff"
"google.golang.org/grpc/metadata"
) )
type Driver struct { type Driver struct {
@ -63,8 +64,13 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
return nil return nil
} }
func (d *Driver) Client(ctx context.Context) (*client.Client, error) { func (d *Driver) Client(ctx context.Context, copts ...driver.ClientOption) (*client.Client, error) {
opts := []client.ClientOpt{} co := driver.ClientOptions{}
for _, opt := range copts {
opt(&co)
}
var opts []client.ClientOpt
backoffConfig := backoff.DefaultConfig backoffConfig := backoff.DefaultConfig
backoffConfig.MaxDelay = 1 * time.Second backoffConfig.MaxDelay = 1 * time.Second
@ -79,6 +85,14 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
}...) }...)
} }
if len(co.Meta) > 0 {
opts = append(opts, client.WithGRPCDialOption(grpc.WithChainUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// merge the existing context with new metadata.
ctx = metadata.NewOutgoingContext(ctx, co.Meta)
return invoker(ctx, method, req, reply, cc, opts...)
})))
}
return client.New(ctx, d.InitConfig.EndpointAddr, opts...) return client.New(ctx, d.InitConfig.EndpointAddr, opts...)
} }

Loading…
Cancel
Save