rpcx_server.go 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. // Copyright 2020 github.com. All rights reserved.
  2. // Use of this source code is governed by github.com.
  3. /**
  4. * @Author: mac
  5. * @Description: rpcx opentracing serverInterceptor
  6. * @Date: 2020/3/25 10:25
  7. */
  8. package trace
  9. import (
  10. "context"
  11. "github.com/opentracing/opentracing-go"
  12. "github.com/opentracing/opentracing-go/ext"
  13. "github.com/opentracing/opentracing-go/log"
  14. "github.com/smallnest/rpcx/protocol"
  15. "github.com/smallnest/rpcx/server"
  16. "github.com/smallnest/rpcx/share"
  17. "github.com/uber/jaeger-client-go"
  18. "net"
  19. )
  20. type RpcxOpenTracingServerPlugin struct{}
  21. func (p RpcxOpenTracingServerPlugin) Register(name string, rcvr interface{}, metadata string) error {
  22. span := opentracing.StartSpan(
  23. "rpcx.Register")
  24. defer span.Finish()
  25. span.LogFields(log.String("register_service", name))
  26. return nil
  27. }
  28. func (p RpcxOpenTracingServerPlugin) RegisterFunction(serviceName, fname string, fn interface{}, metadata string) error {
  29. span := opentracing.StartSpan(
  30. "rpcx.RegisterFunction")
  31. defer span.Finish()
  32. span.LogFields(log.String("register_function", serviceName+"."+fname))
  33. return nil
  34. }
  35. func (p RpcxOpenTracingServerPlugin) PostConnAccept(conn net.Conn) (net.Conn, bool) {
  36. span1 := opentracing.StartSpan(
  37. "rpcx.AcceptConn")
  38. defer span1.Finish()
  39. span1.LogFields(log.String("remote_addr", conn.RemoteAddr().String()))
  40. return conn, true
  41. }
  42. func (p RpcxOpenTracingServerPlugin) PreHandleRequest(ctx context.Context, r *protocol.Message) error {
  43. wireContext, err := share.GetSpanContextFromContext(ctx)
  44. if err != nil || wireContext == nil {
  45. return err
  46. }
  47. span := opentracing.StartSpan(
  48. "rpcx.service."+r.ServicePath+"."+r.ServiceMethod,
  49. ext.RPCServerOption(wireContext))
  50. clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn)
  51. span.LogFields(
  52. log.String("network", clientConn.RemoteAddr().Network()),
  53. log.String("remote_addr", clientConn.RemoteAddr().String()))
  54. if rpcxContext, ok := ctx.(*share.Context); ok {
  55. rpcxContext.SetValue(share.OpentracingSpanServerKey, span)
  56. }
  57. // 设置 request ID,可用于日志记录, 使用ctx.Value("RequestId")可以取出
  58. if sc, ok := span.Context().(jaeger.SpanContext); ok {
  59. traceId := sc.TraceID().String()
  60. ctx = context.WithValue(ctx, "RequestId", traceId)
  61. }
  62. return nil
  63. }
  64. func (p RpcxOpenTracingServerPlugin) PostWriteResponse(ctx context.Context, req *protocol.Message, res *protocol.Message, err error) error {
  65. if rpcxContext, ok := ctx.(*share.Context); ok {
  66. span := rpcxContext.Value(share.OpentracingSpanServerKey)
  67. if span != nil {
  68. sp := span.(opentracing.Span)
  69. sp.Finish()
  70. }
  71. }
  72. return nil
  73. }