Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix Panic caused by failure to connect to the peer when obtaining remote metadata #2651

Merged
merged 3 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions common/metadata_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,24 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
)

var IncludeKeys = gxset.NewSet(
constant.ApplicationKey,
constant.GroupKey,
constant.TimestampKey,
constant.SerializationKey,
constant.ClusterKey,
constant.LoadbalanceKey,
constant.PathKey,
constant.TimeoutKey,
constant.TokenKey,
constant.VersionKey,
constant.WarmupKey,
constant.WeightKey,
constant.ReleaseKey)
var (
IncludeKeys = gxset.NewSet(
constant.ApplicationKey,
constant.GroupKey,
constant.TimestampKey,
constant.SerializationKey,
constant.ClusterKey,
constant.LoadbalanceKey,
constant.PathKey,
constant.TimeoutKey,
constant.TokenKey,
constant.VersionKey,
constant.WarmupKey,
constant.WeightKey,
constant.ReleaseKey)

EmptyMetadataInfo = &MetadataInfo{}
)

// MetadataInfo the metadata information of instance
type MetadataInfo struct {
Expand Down
3 changes: 3 additions & 0 deletions metadata/service/local/metadata_service_proxy_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func createProxy(ins registry.ServiceInstance) service.MetadataService {
u := urls[0]
p := extension.GetProtocol(u.Protocol)
invoker := p.Refer(u)
if invoker == nil { // can't connect instance
return nil
}
return &MetadataServiceProxy{
Invoker: invoker,
}
Expand Down
7 changes: 5 additions & 2 deletions registry/servicediscovery/service_discovery_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,10 @@ func (s *ServiceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.No
}
logger.Infof("Find initial mapping applications %q for service %s.", services, url.ServiceKey())
// first notify
mappingListener.OnEvent(registry.NewServiceMappingChangedEvent(url.ServiceKey(), services))
err = mappingListener.OnEvent(registry.NewServiceMappingChangedEvent(url.ServiceKey(), services))
if err != nil {
logger.Errorf("[ServiceDiscoveryRegistry] ServiceInstancesChangedListenerImpl handle error:%v", err)
}
return nil
}

Expand All @@ -246,7 +249,7 @@ func (s *ServiceDiscoveryRegistry) SubscribeURL(url *common.URL, notify registry
Instances: instances,
})
if err != nil {
logger.Warnf("[ServiceDiscoveryRegistry] ServiceInstancesChangedListenerImpl handle error:%v", err)
logger.Errorf("[ServiceDiscoveryRegistry] ServiceInstancesChangedListenerImpl handle error:%v", err)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package servicediscovery

import (
"encoding/gob"
"errors"
"reflect"
"sync"
"time"
Expand Down Expand Up @@ -87,7 +88,6 @@ func (lstn *ServiceInstancesChangedListenerImpl) OnEvent(e observer.Event) error
if !ok {
return nil
}
var err error

lstn.mutex.Lock()
defer lstn.mutex.Unlock()
Expand Down Expand Up @@ -119,15 +119,7 @@ func (lstn *ServiceInstancesChangedListenerImpl) OnEvent(e observer.Event) error
revisionToInstances[revision] = append(subInstances, instance)
metadataInfo := lstn.revisionToMetadata[revision]
if metadataInfo == nil {
if val, ok := metaCache.Get(revision); ok {
metadataInfo = val.(*common.MetadataInfo)
} else {
metadataInfo, err = GetMetadataInfo(lstn.app, instance, revision)
if err != nil {
return err
}
metaCache.Set(revision, metadataInfo)
}
metadataInfo = GetMetadataInfo(lstn.app, instance, revision)
}
instance.SetServiceMetadata(metadataInfo)
for _, service := range metadataInfo.Services {
Expand Down Expand Up @@ -225,44 +217,49 @@ func (lstn *ServiceInstancesChangedListenerImpl) GetEventType() reflect.Type {
}

// GetMetadataInfo get metadata info when MetadataStorageTypePropertyName is null
func GetMetadataInfo(app string, instance registry.ServiceInstance, revision string) (*common.MetadataInfo, error) {
func GetMetadataInfo(app string, instance registry.ServiceInstance, revision string) *common.MetadataInfo {
cacheOnce.Do(func() {
initCache(app)
})
if metadataInfo, ok := metaCache.Get(revision); ok {
return metadataInfo.(*common.MetadataInfo), nil
return metadataInfo.(*common.MetadataInfo)
}

var metadataStorageType string
var metadataInfo *common.MetadataInfo
metadataInfo := common.EmptyMetadataInfo
var err error
if instance.GetMetadata() == nil {
metadataStorageType = constant.DefaultMetadataStorageType
} else {
metadataStorageType = instance.GetMetadata()[constant.MetadataStorageTypePropertyName]
}
if metadataStorageType == constant.RemoteMetadataStorageType {
remoteMetadataServiceImpl, err := extension.GetRemoteMetadataService()
if err != nil {
return nil, err
}
metadataInfo, err = remoteMetadataServiceImpl.GetMetadata(instance)
if err != nil {
return nil, err
remoteMetadataServiceImpl, remoteMetadataErr := extension.GetRemoteMetadataService()
if remoteMetadataErr == nil {
metadataInfo, err = remoteMetadataServiceImpl.GetMetadata(instance)
} else {
err = remoteMetadataErr
}
} else {
var err error
proxyFactory := extension.GetMetadataServiceProxyFactory(constant.DefaultKey)
metadataService := proxyFactory.GetProxy(instance)
defer destroyInvoker(metadataService)
metadataInfo, err = metadataService.GetMetadataInfo(revision)
if err != nil {
return nil, err
if metadataService != nil {
defer destroyInvoker(metadataService)
metadataInfo, err = metadataService.GetMetadataInfo(revision)
} else {
err = errors.New("get remote metadata error please check instance " + instance.GetHost() + " is alive")
}
}

metaCache.Set(revision, metadataInfo)
if err != nil {
logger.Errorf("get metadata of %s failed, %v", instance.GetHost(), err)
}

if metadataInfo != common.EmptyMetadataInfo {
metaCache.Set(revision, metadataInfo)
}

return metadataInfo, nil
return metadataInfo
}

func destroyInvoker(metadataService service.MetadataService) {
Expand Down
Loading