Skip to content
Snippets Groups Projects
Commit a4652d8a authored by Juergen Enge's avatar Juergen Enge
Browse files

more sophisticated client reconnection on lost connection

parent 045dd9a2
No related branches found
Tags v2.0.13
No related merge requests found
package builder
package resolver
import (
"context"
"emperror.dev/errors"
"fmt"
pb "github.com/je4/miniresolver/v2/pkg/miniresolverproto"
"github.com/je4/utils/v2/pkg/zLogger"
"google.golang.org/grpc/resolver"
......@@ -12,7 +13,7 @@ import (
const RESOLVERSCHEMA = "miniresolver"
func NewMiniResolverResolverBuilder(miniResolverclient pb.MiniResolverClient, checkTimeout time.Duration, notFoundTimeout time.Duration, logger zLogger.ZLogger) resolver.Builder {
func NewMiniResolverResolverBuilder(miniResolverclient *MiniResolver, checkTimeout time.Duration, notFoundTimeout time.Duration, logger zLogger.ZLogger) resolver.Builder {
if time.Duration(checkTimeout).Seconds() == 0 {
checkTimeout = 10 * time.Minute
}
......@@ -28,7 +29,7 @@ func NewMiniResolverResolverBuilder(miniResolverclient pb.MiniResolverClient, ch
}
type miniResolverResolverBuilder struct {
miniResolverclient pb.MiniResolverClient
miniResolverclient *MiniResolver
logger zLogger.ZLogger
checkTimeout time.Duration
notFoundTimeout time.Duration
......@@ -44,10 +45,18 @@ func (mrrb *miniResolverResolverBuilder) Build(target resolver.Target, cc resolv
checkTimeout: mrrb.checkTimeout,
notFoundTimeout: mrrb.notFoundTimeout,
}
go func() {
refreshTarget := make(chan bool)
defer close(refreshTarget)
tstr := fmt.Sprintf("%s:%s", target.URL.Scheme, target.Endpoint())
mrrb.miniResolverclient.WatchService(tstr, refreshTarget)
defer mrrb.miniResolverclient.UnwatchService(tstr)
for {
timeout := r.doIt()
select {
case <-refreshTarget:
mrrb.logger.Debug().Msgf("refresh target %s", target.Endpoint())
case <-r.done:
return
case <-time.After(timeout):
......
package builder
package resolver
import (
pb "github.com/je4/miniresolver/v2/pkg/miniresolverproto"
"github.com/je4/utils/v2/pkg/zLogger"
"google.golang.org/grpc/resolver"
"time"
)
func RegisterResolver(miniresolverClient pb.MiniResolverClient, checkTimeout, notFoundTimeout time.Duration, logger zLogger.ZLogger) {
func RegisterResolver(miniresolverClient *MiniResolver, checkTimeout, notFoundTimeout time.Duration, logger zLogger.ZLogger) {
resolver.Register(NewMiniResolverResolverBuilder(miniresolverClient, checkTimeout, notFoundTimeout, logger))
}
package resolver
import (
"context"
"crypto/tls"
"emperror.dev/errors"
"fmt"
"github.com/je4/miniresolver/v2/pkg/builder"
pb "github.com/je4/miniresolver/v2/pkg/miniresolverproto"
"github.com/je4/utils/v2/pkg/zLogger"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"io"
"sync"
"time"
)
func newClient[V any](newClientFunc func(conn grpc.ClientConnInterface) V, serverAddr string, tlsConfig *tls.Config, opts ...grpc.DialOption) (V, io.Closer, error) {
if tlsConfig != nil {
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
} else {
......@@ -50,6 +54,8 @@ func NewMiniresolverClient(serverAddr string, clientMap map[string]string, clien
dialOpts = []grpc.DialOption{}
}
res := &MiniResolver{
watchServices: map[string]chan<- bool{},
watchLock: sync.Mutex{},
clientMap: clientMap,
clientTLSConfig: clientTLSConfig,
serverTLSConfig: serverTLSConfig,
......@@ -57,12 +63,14 @@ func NewMiniresolverClient(serverAddr string, clientMap map[string]string, clien
serverOpts: []grpc.ServerOption{},
logger: logger,
}
res.SetDialOpts(grpc.WithUnaryInterceptor(res.unaryClientInterceptor))
res.SetDialOpts(grpc.WithStreamInterceptor(res.streamClientInterceptor))
if serverAddr != "" {
res.MiniResolverClient, res.conn, err = newClient[pb.MiniResolverClient](pb.NewMiniResolverClient, serverAddr, clientTLSConfig, dialOpts...)
if err != nil {
return nil, errors.Wrapf(err, "cannot create client for %s", serverAddr)
}
builder.RegisterResolver(res.MiniResolverClient, resolverTimeout, resolverNotFoundTimeout, logger)
RegisterResolver(res, resolverTimeout, resolverNotFoundTimeout, logger)
}
return res, nil
......@@ -71,6 +79,8 @@ func NewMiniresolverClient(serverAddr string, clientMap map[string]string, clien
type MiniResolver struct {
pb.MiniResolverClient
conn io.Closer
watchLock sync.Mutex
watchServices map[string]chan<- bool
clientCloser []io.Closer
clientTLSConfig *tls.Config
dialOpts []grpc.DialOption
......@@ -88,6 +98,38 @@ func (c *MiniResolver) SetServerOpts(options ...grpc.ServerOption) {
c.serverOpts = append(c.serverOpts, options...)
}
func (c *MiniResolver) streamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
start := time.Now()
clientStream, err := streamer(ctx, desc, cc, method, opts...)
end := time.Now()
c.logger.Debug().Msgf("RPC Stream: %s, duration: %s, err: %v", method, end.Sub(start).String(), err)
if err != nil {
if stat, ok := status.FromError(err); ok {
if stat.Code() == codes.Unavailable {
c.RefreshResolver(cc.Target())
}
}
return nil, errors.Wrapf(err, "RPC: %s", method)
}
return clientStream, nil
}
func (c *MiniResolver) unaryClientInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
start := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
end := time.Now()
c.logger.Debug().Msgf("RPC Unary: %s, duration: %s, err: %v", method, end.Sub(start).String(), err)
if err != nil {
if status, ok := status.FromError(err); ok {
if status.Code() == codes.Unavailable {
c.RefreshResolver(cc.Target())
}
}
return errors.Wrapf(err, "RPC: %s", method)
}
return nil
}
func NewClient[V any](c *MiniResolver, newClientFunc func(conn grpc.ClientConnInterface) V, serviceName string) (V, error) {
var n V
var clientAddr string
......@@ -134,3 +176,34 @@ func (c *MiniResolver) NewServer(addr string) (*Server, error) {
}
return server, nil
}
func (c *MiniResolver) WatchService(target string, reloadChannel chan<- bool) {
c.logger.Debug().Msgf("watch service %s", target)
c.watchLock.Lock()
defer c.watchLock.Unlock()
c.watchServices[target] = reloadChannel
}
func (c *MiniResolver) UnwatchService(target string) {
c.logger.Debug().Msgf("unwatch service %s", target)
c.watchLock.Lock()
defer c.watchLock.Unlock()
delete(c.watchServices, target)
}
func (c *MiniResolver) RefreshResolver(target string) {
c.watchLock.Lock()
ch, ok := c.watchServices[target]
c.watchLock.Unlock()
if ok {
select {
case ch <- true:
c.logger.Debug().Msgf("refresh service %s", target)
case <-time.After(2 * time.Second):
c.logger.Error().Msgf("cannot refresh resolver for %s", target)
c.logger.Debug().Msgf("timeout refresh service %s", target)
}
} else {
c.logger.Debug().Msgf("service %s not in watch map", target)
}
}
......@@ -65,7 +65,7 @@ func (s *Server) Startup() {
s.waitShutdown.Add(2)
go func() {
defer s.waitShutdown.Done()
s.logger.Info().Msgf("starting server at %s", s.listener.Addr().String())
s.logger.Info().Msg("starting server")
if err := s.Server.Serve(s.listener); err != nil {
s.logger.Error().Err(err).Msg("cannot serve")
} else {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment