Skip to content

Commit

Permalink
support multiple registry and refresh apache#3932
Browse files Browse the repository at this point in the history
  • Loading branch information
cvictory committed May 7, 2019
1 parent 064d7df commit 8e7f950
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,19 @@
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.Registry;
import org.apache.dubbo.registry.RegistryFactory;
import org.apache.dubbo.registry.support.AbstractRegistry;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* MultipleRegistry
Expand All @@ -45,31 +51,39 @@ public class MultipleRegistry implements Registry {
private RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
private Map<String, Registry> serviceRegistries = new HashMap<String, Registry>(4);
private Map<String, Registry> referenceRegistries = new HashMap<String, Registry>(4);
List<String> origServiceRegistryURLs;
List<String> origReferenceRegistryURLs;
List<String> effectServiceRegistryURLs;
List<String> effectReferenceRegistryURLs;
private URL registryUrl;
private String applicationName;


public MultipleRegistry(URL url) {
this.registryUrl = url;
this.applicationName = url.getParameter(Constants.APPLICATION_KEY);
checkApplicationName(this.applicationName);
// This urls contain parameter and it donot inherit from the parameter of url in MultipleRegistry
List<String> serviceRegistryURLs = url.getParameter(REGISTRY_FOR_SERVICE, new ArrayList<String>());
List<String> referenceRegistryURLs = url.getParameter(REGISTRY_FOR_REFERENCE, new ArrayList<String>());

origServiceRegistryURLs = url.getParameter(REGISTRY_FOR_SERVICE, new ArrayList<String>());
origReferenceRegistryURLs = url.getParameter(REGISTRY_FOR_REFERENCE, new ArrayList<String>());
effectServiceRegistryURLs = this.filterServiceRegistry(origServiceRegistryURLs);
effectReferenceRegistryURLs = this.filterReferenceRegistry(origReferenceRegistryURLs);

boolean defaultRegistry = url.getParameter(Constants.DEFAULT_KEY, true);
if (defaultRegistry && serviceRegistryURLs.isEmpty() && referenceRegistryURLs.isEmpty()) {
if (defaultRegistry && effectServiceRegistryURLs.isEmpty() && effectReferenceRegistryURLs.isEmpty()) {
throw new IllegalArgumentException("Illegal registry url. You need to configure parameter " +
REGISTRY_FOR_SERVICE + " or " + REGISTRY_FOR_REFERENCE);
}
Set<String> allURLs = new HashSet<String>(serviceRegistryURLs);
allURLs.addAll(referenceRegistryURLs);
Set<String> allURLs = new HashSet<String>(effectServiceRegistryURLs);
allURLs.addAll(effectReferenceRegistryURLs);
Map<String, Registry> tmpMap = new HashMap<String, Registry>(4);
for (String tmpUrl : allURLs) {
tmpMap.put(tmpUrl, registryFactory.getRegistry(URL.valueOf(tmpUrl)));
}
for (String serviceRegistyURL : serviceRegistryURLs) {
for (String serviceRegistyURL : effectServiceRegistryURLs) {
serviceRegistries.put(serviceRegistyURL, tmpMap.get(serviceRegistyURL));
}
for (String referenceReigstyURL : referenceRegistryURLs) {
for (String referenceReigstyURL : effectReferenceRegistryURLs) {
referenceRegistries.put(referenceReigstyURL, tmpMap.get(referenceReigstyURL));
}
}
Expand All @@ -82,7 +96,26 @@ public URL getUrl() {

@Override
public boolean isAvailable() {
return false;
boolean available = serviceRegistries.isEmpty() ? true : false;
for (Registry serviceRegistry : serviceRegistries.values()) {
if (serviceRegistry.isAvailable()) {
available = true;
}
}
if (!available) {
return false;
}

available = referenceRegistries.isEmpty() ? true : false;
for (Registry referenceRegistry : referenceRegistries.values()) {
if (referenceRegistry.isAvailable()) {
available = true;
}
}
if (!available) {
return false;
}
return true;
}

@Override
Expand Down Expand Up @@ -142,21 +175,141 @@ protected List<String> filterReferenceRegistry(List<String> referenceRegistryURL
return referenceRegistryURLs;
}

protected List<String> refreshServiceRegistry(List<String> serviceRegistryURLs) {
for (String serviceRegistryURL : serviceRegistryURLs) {
Registry registry = serviceRegistries.get(serviceRegistryURL);
if(registry == null){
Registry newRegistry = registryFactory.getRegistry(URL.valueOf(serviceRegistryURL));
//估计得需要保存下registriedURL
// newRegistry.register();
// serviceRegistries.put(serviceRegistryURL, );
protected synchronized void refreshServiceRegistry(List<String> serviceRegistryURLs) {
doRefreshRegistry(serviceRegistryURLs, serviceRegistries, () -> this.getRegisteredURLs(),
(registry, registeredURLs) -> {
for (URL url : (Set<URL>) registeredURLs) {
registry.register(url);
}
},
(registry, registeredURLs) -> {
for (URL url : (Set<URL>) registeredURLs) {
registry.unregister(url);
}
},
newRegistryMap -> this.serviceRegistries = newRegistryMap

);
}

protected synchronized void refreshReferenceRegistry(List<String> referenceRegistryURLs) {
doRefreshRegistry(referenceRegistryURLs, referenceRegistries, () -> this.getSubscribedURLMap(),
(registry, registeredURLs) -> {
for (Map.Entry<URL, Set<NotifyListener>> urlNotifyListenerMap : ((Map<URL, Set<NotifyListener>>) registeredURLs).entrySet()) {
for (NotifyListener notifyListener : urlNotifyListenerMap.getValue()) {
registry.subscribe(urlNotifyListenerMap.getKey(), notifyListener);
}
}
},
(registry, registeredURLs) -> {
for (Map.Entry<URL, Set<NotifyListener>> urlNotifyListenerMap : ((Map<URL, Set<NotifyListener>>) registeredURLs).entrySet()) {
for (NotifyListener notifyListener : urlNotifyListenerMap.getValue()) {
registry.unsubscribe(urlNotifyListenerMap.getKey(), notifyListener);
}
}
},
newRegistryMap -> this.referenceRegistries = newRegistryMap

);
}

/**
* @param newRegistryURLs
* @param oldRegistryMap
* @param getURLSupplier if result is empty, please return null
* @param joinConsumer
* @param leftConsumer
* @param setResultConsumer
*/
private synchronized void doRefreshRegistry(List<String> newRegistryURLs, Map<String, Registry> oldRegistryMap,
Supplier<Object> getURLSupplier,
BiConsumer<Registry, Object> joinConsumer, BiConsumer<Registry, Object> leftConsumer,
Consumer<Map<String, Registry>> setResultConsumer
) {
// If new registry is empty or registry running is empty , it will not be freshed.
if (newRegistryURLs == null || newRegistryURLs.isEmpty() || oldRegistryMap.isEmpty()) {
return;
}
// fetch register or subscriber
Object registeredURLs = this.getRegisteredURLs();
if (registeredURLs == null) {
logger.info("Cannot fetch registered URL.");
return;
}

Map<String, Registry> newRegistryMap = new HashMap<String, Registry>(4);
for (String serviceRegistryURL : newRegistryURLs) {
Registry registry = oldRegistryMap.get(serviceRegistryURL);
if (registry == null) {
registry = registryFactory.getRegistry(URL.valueOf(serviceRegistryURL));
newRegistryMap.put(serviceRegistryURL, registry);
// registry all
joinConsumer.accept(registry, registeredURLs);
}
}
return serviceRegistryURLs;

// get removed registry and keep the registry that is the same as new Configuration.
List<Registry> removedRegistries = new ArrayList<>();
for (Map.Entry<String, Registry> origRegistryEntry : oldRegistryMap.entrySet()) {
if (newRegistryURLs.contains(origRegistryEntry.getKey())) {
newRegistryMap.put(origRegistryEntry.getKey(), origRegistryEntry.getValue());
} else {
removedRegistries.add(origRegistryEntry.getValue());
}
}
// unregister by remove registry
for (Registry removedRegistry : removedRegistries) {
leftConsumer.accept(removedRegistry, registeredURLs);
}
setResultConsumer.accept(newRegistryMap);
}

protected List<String> refreshReferenceRegistry(List<String> referenceRegistryURLs) {
return referenceRegistryURLs;
private Set<URL> getRegisteredURLs() {
// registry all
Iterator<Registry> iterator = serviceRegistries.values().iterator();
while (iterator.hasNext()) {
Registry tmpRegistry = iterator.next();
if (tmpRegistry instanceof AbstractRegistry) {
AbstractRegistry tmpAbstractRegistry = (AbstractRegistry) tmpRegistry;
return tmpAbstractRegistry.getRegistered();
}
}
return Collections.emptySet();
}

private Map<URL, Set<NotifyListener>> getSubscribedURLMap() {
// registry all
Iterator<Registry> iterator = referenceRegistries.values().iterator();
while (iterator.hasNext()) {
Registry tmpRegistry = iterator.next();
if (tmpRegistry instanceof AbstractRegistry) {
AbstractRegistry tmpAbstractRegistry = (AbstractRegistry) tmpRegistry;
return tmpAbstractRegistry.getSubscribed();
}
}
return Collections.EMPTY_MAP;
}

protected void checkApplicationName(String applicationName) {
}

protected String getApplicationName() {
return applicationName;
}

public Map<String, Registry> getServiceRegistries() {
return serviceRegistries;
}

public Map<String, Registry> getReferenceRegistries() {
return referenceRegistries;
}

public List<String> getOrigServiceRegistryURLs() {
return origServiceRegistryURLs;
}

public List<String> getOrigReferenceRegistryURLs() {
return origReferenceRegistryURLs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
*/
public class MultipleRegistryFactory extends AbstractRegistryFactory {


@Override
protected Registry createRegistry(URL url) {
return new MultipleRegistry(url);
Expand Down

0 comments on commit 8e7f950

Please sign in to comment.