Skip to content

Commit

Permalink
About the Fault Tolerance doc of the Registry.
Browse files Browse the repository at this point in the history
  • Loading branch information
carryxyh committed Jan 8, 2019
1 parent 864f933 commit 6241f88
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public abstract class AbstractRegistry implements Registry {
private static final String URL_SPLIT = "\\s+";
// Log output
protected final Logger logger = LoggerFactory.getLogger(getClass());
// Local disk cache, where the special key value.registies records the list of registry centers, and the others are the list of notified service providers
// Local disk cache, where the special key value.registries records the list of registry centers, and the others are the list of notified service providers
private final Properties properties = new Properties();
// File cache timing writing
private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
Expand Down Expand Up @@ -90,6 +90,8 @@ public AbstractRegistry(URL url) {
}
}
this.file = file;
// When starting the subscription center,
// we need to read the local cache file for future Registry fault tolerance processing.
loadProperties();
notify(url.getBackupUrls());
}
Expand Down Expand Up @@ -244,7 +246,7 @@ public List<URL> lookup(URL url) {
}
}
} else {
final AtomicReference<List<URL>> reference = new AtomicReference<List<URL>>();
final AtomicReference<List<URL>> reference = new AtomicReference<>();
NotifyListener listener = reference::set;
subscribe(url, listener); // Subscribe logic guarantees the first notify to return
List<URL> urls = reference.get();
Expand Down Expand Up @@ -364,6 +366,13 @@ protected void notify(List<URL> urls) {
}
}

/**
* Notify changes from the Provider side.
*
* @param url consumer side url
* @param listener listener
* @param urls provider latest urls
*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
Expand All @@ -379,6 +388,7 @@ protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
// keep every provider's category.
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
Expand All @@ -395,8 +405,10 @@ protected void notify(URL url, NotifyListener listener, List<URL> urls) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
saveProperties(url);
listener.notify(categoryList);
// We will update our cache file after each notification.
// When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
saveProperties(url);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ public void unsubscribe(URL url, NotifyListener listener) {

@Override
public List<URL> lookup(URL url) {
List<URL> urls = new ArrayList<URL>();
List<URL> urls = new ArrayList<>();
Map<String, List<URL>> notifiedUrls = getNotified().get(url);
if (notifiedUrls != null && notifiedUrls.size() > 0) {
for (List<URL> values : notifiedUrls.values()) {
Expand Down

0 comments on commit 6241f88

Please sign in to comment.