diff --git a/cluster/cluster_impl/base_cluster_invoker.go b/cluster/cluster_impl/base_cluster_invoker.go index 6cbed7748e..c693e75b6f 100644 --- a/cluster/cluster_impl/base_cluster_invoker.go +++ b/cluster/cluster_impl/base_cluster_invoker.go @@ -18,6 +18,8 @@ package cluster_impl import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" perrors "github.com/pkg/errors" "go.uber.org/atomic" ) @@ -115,12 +117,24 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p } func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) bool { - if len(invoked) > 0 { - for _, i := range invoked { - if i == selectedInvoker { - return true - } + for _, i := range invoked { + if i == selectedInvoker { + return true } } return false } + +func getLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) cluster.LoadBalance { + url := invoker.GetUrl() + + methodName := invocation.MethodName() + //Get the service loadbalance config + lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE) + + //Get the service method loadbalance config if have + if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" { + lb = v + } + return extension.GetLoadbalance(lb) +} diff --git a/cluster/cluster_impl/failfast_cluster.go b/cluster/cluster_impl/failfast_cluster.go new file mode 100644 index 0000000000..6301d94562 --- /dev/null +++ b/cluster/cluster_impl/failfast_cluster.go @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +type failfastCluster struct{} + +const failfast = "failfast" + +func init() { + extension.SetCluster(failfast, NewFailFastCluster) +} + +func NewFailFastCluster() cluster.Cluster { + return &failfastCluster{} +} + +func (cluster *failfastCluster) Join(directory cluster.Directory) protocol.Invoker { + return newFailFastClusterInvoker(directory) +} diff --git a/cluster/cluster_impl/failfast_cluster_invoker.go b/cluster/cluster_impl/failfast_cluster_invoker.go new file mode 100644 index 0000000000..f473ef5c28 --- /dev/null +++ b/cluster/cluster_impl/failfast_cluster_invoker.go @@ -0,0 +1,53 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/protocol" +) + +type failfastClusterInvoker struct { + baseClusterInvoker +} + +func newFailFastClusterInvoker(directory cluster.Directory) protocol.Invoker { + return &failfastClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } +} + +func (invoker *failfastClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + + invokers := invoker.directory.List(invocation) + err := invoker.checkInvokers(invokers, invocation) + + if err != nil { + return &protocol.RPCResult{Err: err} + } + + loadbalance := getLoadBalance(invokers[0], invocation) + + err = invoker.checkWhetherDestroyed() + if err != nil { + return &protocol.RPCResult{Err: err} + } + + ivk := invoker.doSelect(loadbalance, invocation, invokers, nil) + return ivk.Invoke(invocation) +} diff --git a/cluster/cluster_impl/failfast_cluster_test.go b/cluster/cluster_impl/failfast_cluster_test.go new file mode 100644 index 0000000000..094c7e8442 --- /dev/null +++ b/cluster/cluster_impl/failfast_cluster_test.go @@ -0,0 +1,96 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "context" + "testing" +) + +import ( + "github.com/golang/mock/gomock" + perrors "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/directory" + "github.com/apache/dubbo-go/cluster/loadbalance" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" +) + +var ( + failfastUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") +) + +// register_failfast register failfastCluster to cluster extension. +func register_failfast(t *testing.T, invoker *protocol.MockInvoker) protocol.Invoker { + extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) + failfastCluster := NewFailFastCluster() + + invokers := []protocol.Invoker{} + invokers = append(invokers, invoker) + + invoker.EXPECT().GetUrl().Return(failfastUrl) + + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := failfastCluster.Join(staticDir) + return clusterInvoker +} + +func Test_FailfastInvokeSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := protocol.NewMockInvoker(ctrl) + clusterInvoker := register_failfast(t, invoker) + + invoker.EXPECT().GetUrl().Return(failfastUrl) + + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + + assert.NoError(t, result.Error()) + res := result.Result().(rest) + assert.True(t, res.success) + assert.Equal(t, 0, res.tried) +} + +func Test_FailfastInvokeFail(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := protocol.NewMockInvoker(ctrl) + clusterInvoker := register_failfast(t, invoker) + + invoker.EXPECT().GetUrl().Return(failfastUrl) + + mockResult := &protocol.RPCResult{Err: perrors.New("error")} + + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + + assert.NotNil(t, result.Error()) + assert.Equal(t, "error", result.Error().Error()) + assert.Nil(t, result.Result()) +} diff --git a/cluster/cluster_impl/failover_cluster_invoker.go b/cluster/cluster_impl/failover_cluster_invoker.go index cd17a85e00..63b547691a 100644 --- a/cluster/cluster_impl/failover_cluster_invoker.go +++ b/cluster/cluster_impl/failover_cluster_invoker.go @@ -24,7 +24,6 @@ import ( import ( "github.com/apache/dubbo-go/cluster" "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/utils" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/version" @@ -48,17 +47,11 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr if err != nil { return &protocol.RPCResult{Err: err} } - url := invokers[0].GetUrl() - methodName := invocation.MethodName() - //Get the service loadbalance config - lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE) + loadbalance := getLoadBalance(invokers[0], invocation) - //Get the service method loadbalance config if have - if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" { - lb = v - } - loadbalance := extension.GetLoadbalance(lb) + methodName := invocation.MethodName() + url := invoker.GetUrl() //get reties retries := url.GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_RETRIES) diff --git a/go.mod b/go.mod index 217206796f..d9c893955c 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ require ( github.com/dubbogo/getty v1.2.0 github.com/dubbogo/gost v1.1.1 github.com/dubbogo/hessian2 v1.2.0 + github.com/golang/mock v1.3.1 github.com/magiconair/properties v1.8.1 github.com/pkg/errors v0.8.1 github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec diff --git a/go.sum b/go.sum index 41c0365c25..4a909efba0 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI= github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg= github.com/dubbogo/hessian2 v1.2.0 h1:5wFYuMzzRhneUAPbVBVKubIknrEjUM/B76vievYD0Vw= github.com/dubbogo/hessian2 v1.2.0/go.mod h1:7EohF3mE7xmZcj43nP172sapRHOEifcV/jwyHhG4SaY= +github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= +github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= @@ -30,12 +32,15 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/ go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53 h1:kcXqo9vE6fsZY5X5Rd7R1l7fTgnWaDCVmln65REefiE= golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= diff --git a/protocol/invoker.go b/protocol/invoker.go index fe6aab848c..c7a74b3255 100644 --- a/protocol/invoker.go +++ b/protocol/invoker.go @@ -22,6 +22,7 @@ import ( "github.com/apache/dubbo-go/common/logger" ) +//go:generate mockgen -source invoker.go -destination mock_invoker.go -self_package github.com/apache/dubbo-go/protocol --package protocol Invoker // Extension - Invoker type Invoker interface { common.Node diff --git a/protocol/mock_invoker.go b/protocol/mock_invoker.go new file mode 100644 index 0000000000..e31ec6cf48 --- /dev/null +++ b/protocol/mock_invoker.go @@ -0,0 +1,80 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: invoker.go + +// Package protocol is a generated GoMock package. +package protocol + +import ( + common "github.com/apache/dubbo-go/common" + gomock "github.com/golang/mock/gomock" + reflect "reflect" +) + +// MockInvoker is a mock of Invoker interface +type MockInvoker struct { + ctrl *gomock.Controller + recorder *MockInvokerMockRecorder +} + +// MockInvokerMockRecorder is the mock recorder for MockInvoker +type MockInvokerMockRecorder struct { + mock *MockInvoker +} + +// NewMockInvoker creates a new mock instance +func NewMockInvoker(ctrl *gomock.Controller) *MockInvoker { + mock := &MockInvoker{ctrl: ctrl} + mock.recorder = &MockInvokerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockInvoker) EXPECT() *MockInvokerMockRecorder { + return m.recorder +} + +// GetUrl mocks base method +func (m *MockInvoker) GetUrl() common.URL { + ret := m.ctrl.Call(m, "GetUrl") + ret0, _ := ret[0].(common.URL) + return ret0 +} + +// GetUrl indicates an expected call of GetUrl +func (mr *MockInvokerMockRecorder) GetUrl() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUrl", reflect.TypeOf((*MockInvoker)(nil).GetUrl)) +} + +// IsAvailable mocks base method +func (m *MockInvoker) IsAvailable() bool { + ret := m.ctrl.Call(m, "IsAvailable") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsAvailable indicates an expected call of IsAvailable +func (mr *MockInvokerMockRecorder) IsAvailable() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsAvailable", reflect.TypeOf((*MockInvoker)(nil).IsAvailable)) +} + +// Destroy mocks base method +func (m *MockInvoker) Destroy() { + m.ctrl.Call(m, "Destroy") +} + +// Destroy indicates an expected call of Destroy +func (mr *MockInvokerMockRecorder) Destroy() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Destroy", reflect.TypeOf((*MockInvoker)(nil).Destroy)) +} + +// Invoke mocks base method +func (m *MockInvoker) Invoke(arg0 Invocation) Result { + ret := m.ctrl.Call(m, "Invoke", arg0) + ret0, _ := ret[0].(Result) + return ret0 +} + +// Invoke indicates an expected call of Invoke +func (mr *MockInvokerMockRecorder) Invoke(arg0 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Invoke", reflect.TypeOf((*MockInvoker)(nil).Invoke), arg0) +}