Skip to content
Snippets Groups Projects
Commit fa8116ad authored by Juergen Enge's avatar Juergen Enge
Browse files

domain detection for client interceptor optimized

parent 48762940
No related merge requests found
......@@ -11,6 +11,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"io"
"os"
......@@ -67,7 +68,10 @@ func NewMiniresolverClient(serverAddr string, clientMap map[string]string, clien
logger: logger,
}
//res.SetServerOpts(grpc.ChainUnaryInterceptor(res.unaryServerInterceptor), grpc.ChainStreamInterceptor(res.streamServerInterceptor))
res.SetDialOpts(grpc.WithUnaryInterceptor(res.unaryClientInterceptor), grpc.WithStreamInterceptor(res.streamClientInterceptor))
res.SetDialOpts(
grpc.WithUnaryInterceptor(res.getUnaryClientInterceptor()),
grpc.WithStreamInterceptor(res.getStreamClientInterceptor()),
)
if serverAddr != "" {
res.MiniResolverClient, res.conn, err = newClient[pb.MiniResolverClient](pb.NewMiniResolverClient, serverAddr, clientTLSConfig, res.dialOpts...)
if err != nil {
......@@ -101,38 +105,62 @@ func (c *MiniResolver) SetServerOpts(options ...grpc.ServerOption) {
c.serverOpts = append(c.serverOpts, options...)
}
func (c *MiniResolver) streamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
start := time.Now()
clientStream, err := streamer(ctx, desc, cc, method, opts...)
end := time.Now()
c.logger.Debug().Msgf("RPC Stream: %s, duration: %s, err: %v", method, end.Sub(start).String(), err)
if err != nil {
if stat, ok := status.FromError(err); ok {
if stat.Code() == codes.Unavailable {
c.RefreshResolver(cc.Target())
var domainRegexp = regexp.MustCompile(`^miniresolver:([a-zA-Z0-9-]+)\.([a-zA-Z0-9-]+)\.([a-zA-Z0-9-]+)`)
func (c *MiniResolver) getStreamClientInterceptor() grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
var domain string
if matches := domainRegexp.FindStringSubmatch(cc.Target()); matches != nil {
domain = matches[1]
}
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(nil)
}
md.Set("domain", domain)
ctx = metadata.NewOutgoingContext(ctx, md)
start := time.Now()
clientStream, err := streamer(ctx, desc, cc, method, opts...)
end := time.Now()
c.logger.Debug().Msgf("RPC Stream: %s, duration: %s, err: %v", method, end.Sub(start).String(), err)
if err != nil {
if stat, ok := status.FromError(err); ok {
if stat.Code() == codes.Unavailable {
c.RefreshResolver(cc.Target())
}
}
return nil, errors.Wrapf(err, "RPC: %s", method)
}
return nil, errors.Wrapf(err, "RPC: %s", method)
return clientStream, nil
}
return clientStream, nil
}
var domainRegexp = regexp.MustCompile(`^([a-zA-Z0-9-]+)\.([a-zA-Z0-9-]+)\.([a-zA-Z0-9-]+)`)
func (c *MiniResolver) unaryClientInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
start := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
end := time.Now()
c.logger.Debug().Msgf("RPC Unary: %s, duration: %s, err: %v", method, end.Sub(start).String(), err)
if err != nil {
if status, ok := status.FromError(err); ok {
if status.Code() == codes.Unavailable {
c.RefreshResolver(cc.Target())
func (c *MiniResolver) getUnaryClientInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
start := time.Now()
var domain string
if matches := domainRegexp.FindStringSubmatch(cc.Target()); matches != nil {
domain = matches[1]
}
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(nil)
}
md.Set("domain", domain)
ctx = metadata.NewOutgoingContext(ctx, md)
err := invoker(ctx, method, req, reply, cc, opts...)
end := time.Now()
c.logger.Debug().Str("domain", domain).Str("method", method).Dur("duration", end.Sub(start)).Err(err)
if err != nil {
if status, ok := status.FromError(err); ok {
if status.Code() == codes.Unavailable {
c.RefreshResolver(cc.Target())
}
}
return errors.Wrapf(err, "RPC: %s", method)
}
return errors.Wrapf(err, "RPC: %s", method)
return nil
}
return nil
}
func NewClients[V any](c *MiniResolver, newClientFunc func(conn grpc.ClientConnInterface) V, serviceName string, domains []string) (map[string]V, error) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment