Skip to content

Commit

Permalink
fix apache#3179 , apache#3205, apache#3218. modify review issue; when…
Browse files Browse the repository at this point in the history
… sharing zookeeper connection, it should judge zookeeperClient.isConnected()
  • Loading branch information
cvictory committed Jan 14, 2019
1 parent 8afbbf0 commit 4a10a81
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 23 deletions.
12 changes: 10 additions & 2 deletions dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@
* @see java.net.URL
* @see java.net.URI
*/
public /**final**/
class URL implements Serializable {
public /**final**/ class URL implements Serializable {

private static final long serialVersionUID = -1985165475234910535L;

Expand Down Expand Up @@ -499,6 +498,15 @@ public String[] getParameter(String key, String[] defaultValue) {
return Constants.COMMA_SPLIT_PATTERN.split(value);
}

public List<String> getParameter(String key, List<String> defaultValue) {
String value = getParameter(key);
if (value == null || value.length() == 0) {
return defaultValue;
}
String[] strArray = Constants.COMMA_SPLIT_PATTERN.split(value);
return Arrays.asList(strArray);
}

private Map<String, Number> getNumbers() {
if (numbers == null) { // concurrent initialization is tolerant
numbers = new ConcurrentHashMap<String, Number>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ public void run() {
// Check and connect to the registry
try {
int times = retryCounter.incrementAndGet();
logger.error("start to retry task for metadata report. retry times:" + times);
logger.info("start to retry task for metadata report. retry times:" + times);
if (retry() && times > retryTimesIfNonFail) {
cancelRetryTask();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ public String doGetContent(String path) {
try {
byte[] dataBytes = client.getData().forPath(path);
return (dataBytes == null || dataBytes.length == 0) ? null : new String(dataBytes, charset);
} catch (NodeExistsException e) {
} catch (NoNodeException e) {
// ignore NoNode Exception.
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -41,24 +41,25 @@ public abstract class AbstractZookeeperTransporter implements ZookeeperTransport

/**
* share connnect for registry, metadata, etc..
* <p>
* Make sure the connection is connected.
*
* @param url
* @return
*/
public ZookeeperClient connect(URL url) {
ZookeeperClient clientData;
ZookeeperClient zookeeperClient;
List<String> addressList = getURLBackupAddress(url);
// The field define the zookeeper server , including protocol, host, port, username, password
if ((clientData = fetchAndUpdateZookeeperClientCache(url, addressList)) != null) {
if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
logger.info("Get result from map for the first time when invoking zookeeperTransporter.connnect .");
return clientData;
return zookeeperClient;
}
ZookeeperClient zookeeperClient = null;
// avoid creating too many connections, so add lock
synchronized (zookeeperClientMap) {
if ((clientData = fetchAndUpdateZookeeperClientCache(url, addressList)) != null) {
if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
logger.info("Get result from map for the second time when invoking zookeeperTransporter.connnect .");
return clientData;
return zookeeperClient;
}

zookeeperClient = createZookeeperClient(createServerURL(url));
Expand All @@ -76,38 +77,51 @@ public ZookeeperClient connect(URL url) {
*/
protected abstract ZookeeperClient createZookeeperClient(URL url);

ZookeeperClient fetchAndUpdateZookeeperClientCache(URL url, List<String> addressList) {
/**
* get the ZookeeperClient from cache, the ZookeeperClient must be connected.
* <p>
* It is not private method for unit test.
*
* @param addressList
* @return
*/
ZookeeperClient fetchAndUpdateZookeeperClientCache(List<String> addressList) {

ZookeeperClient zookeeperClientData = null;
ZookeeperClient zookeeperClient = null;
for (String address : addressList) {
if ((zookeeperClientData = zookeeperClientMap.get(address)) != null) {
if ((zookeeperClient = zookeeperClientMap.get(address)) != null && zookeeperClient.isConnected()) {
break;
}
}
if (zookeeperClientData != null) {
writeToClientMap(addressList, zookeeperClientData);
if (zookeeperClient != null && zookeeperClient.isConnected()) {
writeToClientMap(addressList, zookeeperClient);
}
return zookeeperClientData;
return zookeeperClient;
}

/**
* get all zookeeper urls (such as :zookeeper://127.0.0.1:2181?127.0.0.1:8989,127.0.0.1:9999)
*
* @param url such as:zookeeper://127.0.0.1:2181?127.0.0.1:8989,127.0.0.1:9999
* @return such as 127.0.0.1:2181,127.0.0.1:8989,127.0.0.1:9999
*/
List<String> getURLBackupAddress(URL url) {
List<String> addressList = new ArrayList<String>();
addressList.add(url.getAddress());

String[] backups = url.getParameter(Constants.BACKUP_KEY, new String[0]);
addressList.addAll(Arrays.asList(backups));
addressList.addAll(url.getParameter(Constants.BACKUP_KEY, Collections.EMPTY_LIST));
return addressList;
}

/**
* write address-ZookeeperClient relationship to Map
*
* @param addressList
* @param ZookeeperClient
* @param zookeeperClient
*/
void writeToClientMap(List<String> addressList, ZookeeperClient ZookeeperClient) {
void writeToClientMap(List<String> addressList, ZookeeperClient zookeeperClient) {
for (String address : addressList) {
zookeeperClientMap.put(address, ZookeeperClient);
zookeeperClientMap.put(address, zookeeperClient);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.dubbo.remoting.zookeeper.zkclient;


import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ public void testFetchAndUpdateZookeeperClientCache() throws Exception {

URL url = URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort + "/org.apache.dubbo.registry.RegistryService?backup=127.0.0.1:" + zkServerPort3 + ",127.0.0.1:" + zkServerPort2 + "&application=metadatareport-local-xml-provider2&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=47418&specVersion=2.7.0-SNAPSHOT&timestamp=1547102428828");
ZookeeperClient newZookeeperClient = abstractZookeeperTransporter.connect(url);
//just for connected
newZookeeperClient.getContent("/dubbo/test");
Assert.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().size(), 3);
Assert.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().get("127.0.0.1:" + zkServerPort), newZookeeperClient);

Expand All @@ -133,7 +135,7 @@ public void testFetchAndUpdateZookeeperClientCache() throws Exception {

private void checkFetchAndUpdateCacheNotNull(URL url) {
List<String> addressList = abstractZookeeperTransporter.getURLBackupAddress(url);
ZookeeperClient zookeeperClient = abstractZookeeperTransporter.fetchAndUpdateZookeeperClientCache(url, addressList);
ZookeeperClient zookeeperClient = abstractZookeeperTransporter.fetchAndUpdateZookeeperClientCache(addressList);
Assert.assertNotNull(zookeeperClient);
}

Expand All @@ -142,10 +144,15 @@ public void testRepeatConnect() {
URL url = URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort + "/org.apache.dubbo.registry.RegistryService?application=metadatareport-local-xml-provider2&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=47418&specVersion=2.7.0-SNAPSHOT&timestamp=1547102428828");
URL url2 = URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort + "/org.apache.dubbo.metadata.store.MetadataReport?address=zookeeper://127.0.0.1:2181&application=metadatareport-local-xml-provider2&cycle-report=false&interface=org.apache.dubbo.metadata.store.MetadataReport&retry-period=4590&retry-times=23&sync-report=true");
ZookeeperClient newZookeeperClient = abstractZookeeperTransporter.connect(url);
//just for connected
newZookeeperClient.getContent("/dubbo/test");
Assert.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().size(), 1);
Assert.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().get("127.0.0.1:" + zkServerPort), newZookeeperClient);
Assert.assertTrue(newZookeeperClient.isConnected());

ZookeeperClient newZookeeperClient2 = abstractZookeeperTransporter.connect(url2);
//just for connected
newZookeeperClient2.getContent("/dubbo/test");
Assert.assertEquals(newZookeeperClient, newZookeeperClient2);
Assert.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().size(), 1);
Assert.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().get("127.0.0.1:" + zkServerPort), newZookeeperClient);
Expand All @@ -159,10 +166,14 @@ public void testNotRepeatConnect() throws Exception {
URL url = URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort + "/org.apache.dubbo.registry.RegistryService?application=metadatareport-local-xml-provider2&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=47418&specVersion=2.7.0-SNAPSHOT&timestamp=1547102428828");
URL url2 = URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort2 + "/org.apache.dubbo.metadata.store.MetadataReport?address=zookeeper://127.0.0.1:2181&application=metadatareport-local-xml-provider2&cycle-report=false&interface=org.apache.dubbo.metadata.store.MetadataReport&retry-period=4590&retry-times=23&sync-report=true");
ZookeeperClient newZookeeperClient = abstractZookeeperTransporter.connect(url);
//just for connected
newZookeeperClient.getContent("/dubbo/test");
Assert.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().size(), 1);
Assert.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().get("127.0.0.1:" + zkServerPort), newZookeeperClient);

ZookeeperClient newZookeeperClient2 = abstractZookeeperTransporter.connect(url2);
//just for connected
newZookeeperClient2.getContent("/dubbo/test");
Assert.assertNotEquals(newZookeeperClient, newZookeeperClient2);
Assert.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().size(), 2);
Assert.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().get("127.0.0.1:" + zkServerPort2), newZookeeperClient2);
Expand All @@ -181,10 +192,14 @@ public void testRepeatConnectForBackUpAdd() throws Exception {
URL url = URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort + "/org.apache.dubbo.registry.RegistryService?backup=127.0.0.1:" + zkServerPort2 + "&application=metadatareport-local-xml-provider2&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=47418&specVersion=2.7.0-SNAPSHOT&timestamp=1547102428828");
URL url2 = URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort2 + "/org.apache.dubbo.metadata.store.MetadataReport?backup=127.0.0.1:" + zkServerPort3 + "&address=zookeeper://127.0.0.1:2181&application=metadatareport-local-xml-provider2&cycle-report=false&interface=org.apache.dubbo.metadata.store.MetadataReport&retry-period=4590&retry-times=23&sync-report=true");
ZookeeperClient newZookeeperClient = abstractZookeeperTransporter.connect(url);
//just for connected
newZookeeperClient.getContent("/dubbo/test");
Assert.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().size(), 2);
Assert.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().get("127.0.0.1:" + zkServerPort), newZookeeperClient);

ZookeeperClient newZookeeperClient2 = abstractZookeeperTransporter.connect(url2);
//just for connected
newZookeeperClient2.getContent("/dubbo/test");
Assert.assertEquals(newZookeeperClient, newZookeeperClient2);
Assert.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().size(), 3);
Assert.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().get("127.0.0.1:" + zkServerPort2), newZookeeperClient2);
Expand All @@ -204,10 +219,14 @@ public void testRepeatConnectForNoMatchBackUpAdd() throws Exception {
URL url = URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort + "/org.apache.dubbo.registry.RegistryService?backup=127.0.0.1:" + zkServerPort3 + "&application=metadatareport-local-xml-provider2&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=47418&specVersion=2.7.0-SNAPSHOT&timestamp=1547102428828");
URL url2 = URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort2 + "/org.apache.dubbo.metadata.store.MetadataReport?address=zookeeper://127.0.0.1:2181&application=metadatareport-local-xml-provider2&cycle-report=false&interface=org.apache.dubbo.metadata.store.MetadataReport&retry-period=4590&retry-times=23&sync-report=true");
ZookeeperClient newZookeeperClient = abstractZookeeperTransporter.connect(url);
//just for connected
newZookeeperClient.getContent("/dubbo/test");
Assert.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().size(), 2);
Assert.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().get("127.0.0.1:" + zkServerPort), newZookeeperClient);

ZookeeperClient newZookeeperClient2 = abstractZookeeperTransporter.connect(url2);
//just for connected
newZookeeperClient2.getContent("/dubbo/test");
Assert.assertNotEquals(newZookeeperClient, newZookeeperClient2);
Assert.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().size(), 3);
Assert.assertEquals(abstractZookeeperTransporter.getZookeeperClientMap().get("127.0.0.1:" + zkServerPort2), newZookeeperClient2);
Expand Down

0 comments on commit 4a10a81

Please sign in to comment.