Skip to content

Commit

Permalink
Merge pull request apache#1843, support implicit delivery of attachme…
Browse files Browse the repository at this point in the history
…nts from provider to consumer.

Fixes apache#889, apache#1466, apache#1834, apache#1466, apache#1524
  • Loading branch information
chickenlj authored May 31, 2018
1 parent a277b38 commit 912995a
Show file tree
Hide file tree
Showing 38 changed files with 353 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ private String toKey(Object[] args) {

private Invoker<T> selectForKey(long hash) {
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}

private long hash(byte[] digest, int number) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public class Constants {

public static final String LOADBALANCE_KEY = "loadbalance";

// key for router type, for e.g., "script"/"file", corresponding to ScriptRouterFactory.NAME, FileRouterFactory.NAME
// key for router type, for e.g., "script"/"file", corresponding to ScriptRouterFactory.NAME, FileRouterFactory.NAME
public static final String ROUTER_KEY = "router";

public static final String CLUSTER_KEY = "cluster";
Expand Down Expand Up @@ -624,7 +624,7 @@ public class Constants {
public static final String QOS_PORT = "qos.port";

public static final String ACCEPT_FOREIGN_IP = "qos.accept.foreign.ip";

public static final String HESSIAN2_REQUEST_KEY = "hessian2.request";

public static final boolean DEFAULT_HESSIAN2_REQUEST = false;
Expand Down
46 changes: 43 additions & 3 deletions dubbo-common/src/main/java/com/alibaba/dubbo/common/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,28 @@
import java.net.URL;
import java.security.CodeSource;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* Version
*/
public final class Version {

private static final String DEFAULT_DUBBO_VERSION = "2.0.0";
private static final Logger logger = LoggerFactory.getLogger(Version.class);
private static final String VERSION = getVersion(Version.class, DEFAULT_DUBBO_VERSION);

// Dubbo RPC protocol version
public static final String DEFAULT_DUBBO_PROTOCOL_VERSION = "2.0.2";
// Dubbo implementation version, usually is jar version.
private static final String VERSION = getVersion(Version.class, "");

/**
* For protocol compatibility purpose.
* Because {@link #isSupportResponseAttatchment} is checked for every call, int compare expect to has higher performance than string.
*/
private static final int LOWEST_VERSION_FOR_RESPONSE_ATTATCHMENT = 202; // 2.0.2
private static final Map<String, Integer> VERSION2INT = new HashMap<String, Integer>();

static {
// check if there's duplicated jar
Expand All @@ -43,10 +54,39 @@ public final class Version {
private Version() {
}

public static String getProtocolVersion() {
return DEFAULT_DUBBO_PROTOCOL_VERSION;
}

public static String getVersion() {
return VERSION;
}

public static boolean isSupportResponseAttatchment(String version) {
if (version == null || version.length() == 0) {
return false;
}
return getIntVersion(version) >= LOWEST_VERSION_FOR_RESPONSE_ATTATCHMENT;
}

public static int getIntVersion(String version) {
Integer v = VERSION2INT.get(version);
if (v == null) {
v = parseInt(version);
VERSION2INT.put(version, v);
}
return v;
}

private static int parseInt(String version) {
int v = 0;
String[] vArr = version.split("\\.");
int len = vArr.length;
for (int i = 1; i <= len; i++) {
v += Integer.parseInt(vArr[len - i]) * Math.pow(10, i - 1);
}
return v;
}

private static boolean hasResource(String path) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.common.version;


import com.alibaba.dubbo.common.Version;

import org.junit.Assert;
import org.junit.Test;

public class VersionTest {

@Test
public void testGetProtocolVersion() {
Assert.assertEquals(Version.getProtocolVersion(), Version.DEFAULT_DUBBO_PROTOCOL_VERSION);
}

@Test
public void testSupportResponseAttatchment() {
Assert.assertTrue(Version.isSupportResponseAttatchment("2.0.2"));
Assert.assertTrue(Version.isSupportResponseAttatchment("2.0.3"));
Assert.assertFalse(Version.isSupportResponseAttatchment("2.0.0"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ protected List<URL> loadRegistries(boolean provider) {
appendParameters(map, application);
appendParameters(map, config);
map.put("path", RegistryService.class.getName());
map.put("dubbo", Version.getVersion());
map.put("dubbo", Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
Expand Down Expand Up @@ -220,7 +220,7 @@ protected URL loadMonitor(URL registryURL) {
appendProperties(monitor);
Map<String, String> map = new HashMap<String, String>();
map.put(Constants.INTERFACE_KEY, MonitorService.class.getName());
map.put("dubbo", Version.getVersion());
map.put("dubbo", Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ private void init() {
Map<String, String> map = new HashMap<String, String>();
Map<Object, Object> attributes = new HashMap<Object, Object>();
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> r

Map<String, String> map = new HashMap<String, String>();
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.alibaba.dubbo.config.ServiceConfig;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.RpcInvocation;

import junit.framework.TestCase;
import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.alibaba.dubbo.config.spring.ReferenceBean;
import com.alibaba.dubbo.config.spring.ServiceBean;
import com.alibaba.dubbo.rpc.Protocol;

import org.springframework.beans.PropertyValue;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
Expand Down Expand Up @@ -213,7 +214,7 @@ private static BeanDefinition parse(Element element, ParserContext parserContext
String invokeRefMethod = value.substring(index + 1);
reference = new RuntimeBeanReference(invokeRef);
beanDefinition.getPropertyValues().addPropertyValue("oninvokeMethod", invokeRefMethod);
}else {
} else {
if ("ref".equals(property) && parserContext.getRegistry().containsBeanDefinition(value)) {
BeanDefinition refBean = parserContext.getRegistry().getBeanDefinition(value);
if (!refBean.isSingleton()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.alibaba.dubbo.remoting.exchange.codec;

import com.alibaba.dubbo.common.Version;
import com.alibaba.dubbo.common.io.Bytes;
import com.alibaba.dubbo.common.io.StreamUtils;
import com.alibaba.dubbo.common.logger.Logger;
Expand Down Expand Up @@ -173,7 +174,7 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
} else {
// decode request.
Request req = new Request(id);
req.setVersion("2.0.0");
req.setVersion(Version.getProtocolVersion());
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(Request.HEARTBEAT_EVENT);
Expand Down Expand Up @@ -231,7 +232,7 @@ protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req)
if (req.isEvent()) {
encodeEventData(channel, out, req.getData());
} else {
encodeRequestData(channel, out, req.getData());
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
Expand Down Expand Up @@ -274,7 +275,7 @@ protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response re
if (res.isHeartbeat()) {
encodeHeartbeatData(channel, out, res.getResult());
} else {
encodeResponseData(channel, out, res.getResult());
encodeResponseData(channel, out, res.getResult(), res.getVersion());
}
} else out.writeUTF(res.getErrorMessage());
out.flushBuffer();
Expand Down Expand Up @@ -442,4 +443,13 @@ protected void encodeResponseData(Channel channel, ObjectOutput out, Object data
encodeResponseData(out, data);
}

protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
encodeRequestData(out, data);
}

protected void encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
encodeResponseData(out, data);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.Version;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.remoting.Channel;
Expand Down Expand Up @@ -88,7 +89,7 @@ public void send(Object message, boolean sent) throws RemotingException {
channel.send(message, sent);
} else {
Request request = new Request();
request.setVersion("2.0.0");
request.setVersion(Version.getProtocolVersion());
request.setTwoWay(false);
request.setData(message);
channel.send(request, sent);
Expand All @@ -107,7 +108,7 @@ public ResponseFuture request(Object request, int timeout) throws RemotingExcept
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.alibaba.dubbo.remoting.exchange.ExchangeChannel;
import com.alibaba.dubbo.remoting.exchange.ExchangeServer;
import com.alibaba.dubbo.remoting.exchange.Request;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -133,7 +134,7 @@ private void sendChannelReadOnlyEvent() {
Request request = new Request();
request.setEvent(Request.READONLY_EVENT);
request.setTwoWay(false);
request.setVersion(Version.getVersion());
request.setVersion(Version.getProtocolVersion());

Collection<Channel> channels = getChannels();
for (Channel channel : channels) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.alibaba.dubbo.remoting.exchange.support.header;

import com.alibaba.dubbo.common.Version;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.remoting.Channel;
Expand Down Expand Up @@ -57,7 +58,7 @@ public void run() {
if ((lastRead != null && now - lastRead > heartbeat)
|| (lastWrite != null && now - lastWrite > heartbeat)) {
Request req = new Request();
req.setVersion("2.0.0");
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setEvent(Request.HEARTBEAT_EVENT);
channel.send(req);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@


import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.Version;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.common.io.Bytes;
import com.alibaba.dubbo.common.io.UnsafeByteArrayOutputStream;
Expand Down Expand Up @@ -216,7 +217,7 @@ public void test_Decode_Return_Request_Event_Object() throws IOException {
Assert.assertEquals(person, obj.getData());
Assert.assertEquals(true, obj.isTwoWay());
Assert.assertEquals(true, obj.isEvent());
Assert.assertEquals("2.0.0", obj.getVersion());
Assert.assertEquals(Version.getProtocolVersion(), obj.getVersion());
System.out.println(obj);
}

Expand All @@ -231,7 +232,7 @@ public void test_Decode_Return_Request_Event_String() throws IOException {
Assert.assertEquals(event, obj.getData());
Assert.assertEquals(true, obj.isTwoWay());
Assert.assertEquals(true, obj.isEvent());
Assert.assertEquals("2.0.0", obj.getVersion());
Assert.assertEquals(Version.getProtocolVersion(), obj.getVersion());
System.out.println(obj);
}

Expand All @@ -244,7 +245,7 @@ public void test_Decode_Return_Request_Heartbeat_Object() throws IOException {
Assert.assertEquals(null, obj.getData());
Assert.assertEquals(true, obj.isTwoWay());
Assert.assertEquals(true, obj.isHeartbeat());
Assert.assertEquals("2.0.0", obj.getVersion());
Assert.assertEquals(Version.getProtocolVersion(), obj.getVersion());
System.out.println(obj);
}

Expand All @@ -259,7 +260,7 @@ public void test_Decode_Return_Request_Object() throws IOException {
Assert.assertEquals(person, obj.getData());
Assert.assertEquals(true, obj.isTwoWay());
Assert.assertEquals(false, obj.isHeartbeat());
Assert.assertEquals("2.0.0", obj.getVersion());
Assert.assertEquals(Version.getProtocolVersion(), obj.getVersion());
System.out.println(obj);
}

Expand Down Expand Up @@ -350,7 +351,7 @@ public void test_Encode_Response() throws IOException {
Assert.assertEquals(response.isHeartbeat(), obj.isHeartbeat());
Assert.assertEquals(person, obj.getResult());
// encode response verson ??
// Assert.assertEquals(response.getVersion(), obj.getVersion());
// Assert.assertEquals(response.getProtocolVersion(), obj.getVersion());

}

Expand Down Expand Up @@ -380,15 +381,15 @@ public void test_Encode_Error_Response() throws IOException {
Assert.assertEquals(response.isHeartbeat(), obj.isHeartbeat());
Assert.assertEquals(badString, obj.getErrorMessage());
Assert.assertEquals(null, obj.getResult());
// Assert.assertEquals(response.getVersion(), obj.getVersion());
// Assert.assertEquals(response.getProtocolVersion(), obj.getVersion());
}

// http://code.alibabatech.com/jira/browse/DUBBO-392
@Test
public void testMessageLengthGreaterThanMessageActualLength() throws Exception {
Channel channel = getCliendSideChannel(url);
Request request = new Request(1L);
request.setVersion("2.0.0");
request.setVersion(Version.getProtocolVersion());
Date date = new Date();
request.setData(date);
ChannelBuffer encodeBuffer = ChannelBuffers.dynamicBuffer(1024);
Expand Down
Loading

0 comments on commit 912995a

Please sign in to comment.