You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
240 lines
7.1 KiB
Go
240 lines
7.1 KiB
Go
package otgrpc
|
|
|
|
import (
|
|
"github.com/opentracing/opentracing-go"
|
|
"github.com/opentracing/opentracing-go/ext"
|
|
"github.com/opentracing/opentracing-go/log"
|
|
"golang.org/x/net/context"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/metadata"
|
|
"io"
|
|
"runtime"
|
|
"sync/atomic"
|
|
)
|
|
|
|
// OpenTracingClientInterceptor returns a grpc.UnaryClientInterceptor suitable
|
|
// for use in a grpc.Dial call.
|
|
//
|
|
// For example:
|
|
//
|
|
// conn, err := grpc.Dial(
|
|
// address,
|
|
// ..., // (existing DialOptions)
|
|
// grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(tracer)))
|
|
//
|
|
// All gRPC client spans will inject the OpenTracing SpanContext into the gRPC
|
|
// metadata; they will also look in the context.Context for an active
|
|
// in-process parent Span and establish a ChildOf reference if such a parent
|
|
// Span could be found.
|
|
func OpenTracingClientInterceptor(tracer opentracing.Tracer, optFuncs ...Option) grpc.UnaryClientInterceptor {
|
|
otgrpcOpts := newOptions()
|
|
otgrpcOpts.apply(optFuncs...)
|
|
return func(
|
|
ctx context.Context,
|
|
method string,
|
|
req, resp interface{},
|
|
cc *grpc.ClientConn,
|
|
invoker grpc.UnaryInvoker,
|
|
opts ...grpc.CallOption,
|
|
) error {
|
|
var err error
|
|
var parentCtx opentracing.SpanContext
|
|
if parent := opentracing.SpanFromContext(ctx); parent != nil {
|
|
parentCtx = parent.Context()
|
|
}
|
|
if otgrpcOpts.inclusionFunc != nil &&
|
|
!otgrpcOpts.inclusionFunc(parentCtx, method, req, resp) {
|
|
return invoker(ctx, method, req, resp, cc, opts...)
|
|
}
|
|
clientSpan := tracer.StartSpan(
|
|
method,
|
|
opentracing.ChildOf(parentCtx),
|
|
ext.SpanKindRPCClient,
|
|
gRPCComponentTag,
|
|
)
|
|
defer clientSpan.Finish()
|
|
ctx = injectSpanContext(ctx, tracer, clientSpan)
|
|
if otgrpcOpts.logPayloads {
|
|
clientSpan.LogFields(log.Object("gRPC request", req))
|
|
}
|
|
err = invoker(ctx, method, req, resp, cc, opts...)
|
|
if err == nil {
|
|
if otgrpcOpts.logPayloads {
|
|
clientSpan.LogFields(log.Object("gRPC response", resp))
|
|
}
|
|
} else {
|
|
SetSpanTags(clientSpan, err, true)
|
|
clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
|
|
}
|
|
if otgrpcOpts.decorator != nil {
|
|
otgrpcOpts.decorator(clientSpan, method, req, resp, err)
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
|
|
// OpenTracingStreamClientInterceptor returns a grpc.StreamClientInterceptor suitable
|
|
// for use in a grpc.Dial call. The interceptor instruments streaming RPCs by creating
|
|
// a single span to correspond to the lifetime of the RPC's stream.
|
|
//
|
|
// For example:
|
|
//
|
|
// conn, err := grpc.Dial(
|
|
// address,
|
|
// ..., // (existing DialOptions)
|
|
// grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(tracer)))
|
|
//
|
|
// All gRPC client spans will inject the OpenTracing SpanContext into the gRPC
|
|
// metadata; they will also look in the context.Context for an active
|
|
// in-process parent Span and establish a ChildOf reference if such a parent
|
|
// Span could be found.
|
|
func OpenTracingStreamClientInterceptor(tracer opentracing.Tracer, optFuncs ...Option) grpc.StreamClientInterceptor {
|
|
otgrpcOpts := newOptions()
|
|
otgrpcOpts.apply(optFuncs...)
|
|
return func(
|
|
ctx context.Context,
|
|
desc *grpc.StreamDesc,
|
|
cc *grpc.ClientConn,
|
|
method string,
|
|
streamer grpc.Streamer,
|
|
opts ...grpc.CallOption,
|
|
) (grpc.ClientStream, error) {
|
|
var err error
|
|
var parentCtx opentracing.SpanContext
|
|
if parent := opentracing.SpanFromContext(ctx); parent != nil {
|
|
parentCtx = parent.Context()
|
|
}
|
|
if otgrpcOpts.inclusionFunc != nil &&
|
|
!otgrpcOpts.inclusionFunc(parentCtx, method, nil, nil) {
|
|
return streamer(ctx, desc, cc, method, opts...)
|
|
}
|
|
|
|
clientSpan := tracer.StartSpan(
|
|
method,
|
|
opentracing.ChildOf(parentCtx),
|
|
ext.SpanKindRPCClient,
|
|
gRPCComponentTag,
|
|
)
|
|
ctx = injectSpanContext(ctx, tracer, clientSpan)
|
|
cs, err := streamer(ctx, desc, cc, method, opts...)
|
|
if err != nil {
|
|
clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
|
|
SetSpanTags(clientSpan, err, true)
|
|
clientSpan.Finish()
|
|
return cs, err
|
|
}
|
|
return newOpenTracingClientStream(cs, method, desc, clientSpan, otgrpcOpts), nil
|
|
}
|
|
}
|
|
|
|
func newOpenTracingClientStream(cs grpc.ClientStream, method string, desc *grpc.StreamDesc, clientSpan opentracing.Span, otgrpcOpts *options) grpc.ClientStream {
|
|
finishChan := make(chan struct{})
|
|
|
|
isFinished := new(int32)
|
|
*isFinished = 0
|
|
finishFunc := func(err error) {
|
|
// The current OpenTracing specification forbids finishing a span more than
|
|
// once. Since we have multiple code paths that could concurrently call
|
|
// `finishFunc`, we need to add some sort of synchronization to guard against
|
|
// multiple finishing.
|
|
if !atomic.CompareAndSwapInt32(isFinished, 0, 1) {
|
|
return
|
|
}
|
|
close(finishChan)
|
|
defer clientSpan.Finish()
|
|
if err != nil {
|
|
clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
|
|
SetSpanTags(clientSpan, err, true)
|
|
}
|
|
if otgrpcOpts.decorator != nil {
|
|
otgrpcOpts.decorator(clientSpan, method, nil, nil, err)
|
|
}
|
|
}
|
|
go func() {
|
|
select {
|
|
case <-finishChan:
|
|
// The client span is being finished by another code path; hence, no
|
|
// action is necessary.
|
|
case <-cs.Context().Done():
|
|
finishFunc(cs.Context().Err())
|
|
}
|
|
}()
|
|
otcs := &openTracingClientStream{
|
|
ClientStream: cs,
|
|
desc: desc,
|
|
finishFunc: finishFunc,
|
|
}
|
|
|
|
// The `ClientStream` interface allows one to omit calling `Recv` if it's
|
|
// known that the result will be `io.EOF`. See
|
|
// http://stackoverflow.com/q/42915337
|
|
// In such cases, there's nothing that triggers the span to finish. We,
|
|
// therefore, set a finalizer so that the span and the context goroutine will
|
|
// at least be cleaned up when the garbage collector is run.
|
|
runtime.SetFinalizer(otcs, func(otcs *openTracingClientStream) {
|
|
otcs.finishFunc(nil)
|
|
})
|
|
return otcs
|
|
}
|
|
|
|
type openTracingClientStream struct {
|
|
grpc.ClientStream
|
|
desc *grpc.StreamDesc
|
|
finishFunc func(error)
|
|
}
|
|
|
|
func (cs *openTracingClientStream) Header() (metadata.MD, error) {
|
|
md, err := cs.ClientStream.Header()
|
|
if err != nil {
|
|
cs.finishFunc(err)
|
|
}
|
|
return md, err
|
|
}
|
|
|
|
func (cs *openTracingClientStream) SendMsg(m interface{}) error {
|
|
err := cs.ClientStream.SendMsg(m)
|
|
if err != nil {
|
|
cs.finishFunc(err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (cs *openTracingClientStream) RecvMsg(m interface{}) error {
|
|
err := cs.ClientStream.RecvMsg(m)
|
|
if err == io.EOF {
|
|
cs.finishFunc(nil)
|
|
return err
|
|
} else if err != nil {
|
|
cs.finishFunc(err)
|
|
return err
|
|
}
|
|
if !cs.desc.ServerStreams {
|
|
cs.finishFunc(nil)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (cs *openTracingClientStream) CloseSend() error {
|
|
err := cs.ClientStream.CloseSend()
|
|
if err != nil {
|
|
cs.finishFunc(err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func injectSpanContext(ctx context.Context, tracer opentracing.Tracer, clientSpan opentracing.Span) context.Context {
|
|
md, ok := metadata.FromOutgoingContext(ctx)
|
|
if !ok {
|
|
md = metadata.New(nil)
|
|
} else {
|
|
md = md.Copy()
|
|
}
|
|
mdWriter := metadataReaderWriter{md}
|
|
err := tracer.Inject(clientSpan.Context(), opentracing.HTTPHeaders, mdWriter)
|
|
// We have no better place to record an error than the Span itself :-/
|
|
if err != nil {
|
|
clientSpan.LogFields(log.String("event", "Tracer.Inject() failed"), log.Error(err))
|
|
}
|
|
return metadata.NewOutgoingContext(ctx, md)
|
|
}
|