Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v1.0] fix: handle BerkeleyJE DB interruption [tp-tests] #4455

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Put;
import com.sleepycat.je.ReadOptions;
import com.sleepycat.je.ThreadInterruptedException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.WriteOptions;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.PermanentBackendException;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.TemporaryBackendException;
import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction;
import org.janusgraph.diskstorage.keycolumnvalue.keyvalue.KVQuery;
import org.janusgraph.diskstorage.keycolumnvalue.keyvalue.KeySelector;
Expand Down Expand Up @@ -60,10 +63,10 @@
public static Function<Integer, Integer> ttlConverter = ttl -> (int) Math.max(1, Duration.of(ttl, ChronoUnit.SECONDS).toHours());


private final Database db;
private volatile Database db;
private final String name;
private final BerkeleyJEStoreManager manager;
private boolean isOpen;
private volatile boolean isOpen;

public BerkeleyJEKeyValueStore(String n, Database data, BerkeleyJEStoreManager m) {
db = data;
Expand All @@ -75,6 +78,11 @@
public DatabaseConfig getConfiguration() throws BackendException {
try {
return db.getConfig();
} catch (ThreadInterruptedException e) {
Thread.currentThread().interrupt();
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
} catch (EnvironmentFailureException e) {
throw new TemporaryBackendException(e);

Check warning on line 85 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java#L81-L85

Added lines #L81 - L85 were not covered by tests
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand All @@ -95,15 +103,24 @@
return ((BerkeleyJETx) txh).openCursor(db);
}

private static void closeCursor(StoreTransaction txh, Cursor cursor) {
private static void closeCursor(StoreTransaction txh, Cursor cursor) throws BackendException {
Preconditions.checkArgument(txh!=null);
((BerkeleyJETx) txh).closeCursor(cursor);
}

public void reopen(final Database db) {
this.db = db;
}

@Override
public synchronized void close() throws BackendException {
try {
if(isOpen) db.close();
} catch (ThreadInterruptedException e) {
Thread.currentThread().interrupt();
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
} catch (EnvironmentFailureException e) {
throw new TemporaryBackendException(e);

Check warning on line 123 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java#L119-L123

Added lines #L119 - L123 were not covered by tests
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand All @@ -127,6 +144,11 @@
} else {
return null;
}
} catch (ThreadInterruptedException e) {
Thread.currentThread().interrupt();
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
} catch (EnvironmentFailureException e) {
throw new TemporaryBackendException(e);

Check warning on line 151 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java#L147-L151

Added lines #L147 - L151 were not covered by tests
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand Down Expand Up @@ -161,7 +183,11 @@
@Override
public boolean hasNext() {
if (current == null) {
current = getNextEntry();
try {
current = getNextEntry();
} catch (BackendException e) {
throw new RuntimeException(e);

Check warning on line 189 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java#L189

Avoid throwing raw exception types.

Check warning on line 189 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java#L188-L189

Added lines #L188 - L189 were not covered by tests
}
}
return current != null;
}
Expand All @@ -176,16 +202,26 @@
return next;
}

private KeyValueEntry getNextEntry() {
private KeyValueEntry getNextEntry() throws BackendException {
if (status != null && status != OperationStatus.SUCCESS) {
return null;
}
while (!selector.reachedLimit()) {
if (status == null) {
status = cursor.get(foundKey, foundData, Get.SEARCH_GTE, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
} else {
status = cursor.get(foundKey, foundData, Get.NEXT, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
try {
if (status == null) {
status = cursor.get(foundKey, foundData, Get.SEARCH_GTE, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
} else {
status = cursor.get(foundKey, foundData, Get.NEXT, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
}
} catch (ThreadInterruptedException e) {
Thread.currentThread().interrupt();
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
} catch (EnvironmentFailureException e) {
throw new TemporaryBackendException(e);
} catch (DatabaseException e) {
throw new PermanentBackendException(e);

Check warning on line 222 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java#L216-L222

Added lines #L216 - L222 were not covered by tests
}

if (status != OperationStatus.SUCCESS) {
break;
}
Expand All @@ -205,7 +241,11 @@

@Override
public void close() {
closeCursor(txh, cursor);
try {
closeCursor(txh, cursor);
} catch (BackendException e) {
throw new RuntimeException(e);

Check warning on line 247 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java#L246-L247

Added lines #L246 - L247 were not covered by tests
}
}

@Override
Expand Down Expand Up @@ -237,13 +277,22 @@
int convertedTtl = ttlConverter.apply(ttl);
writeOptions.setTTL(convertedTtl, TimeUnit.HOURS);
}
if (allowOverwrite) {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.OVERWRITE, writeOptions);
EnvironmentFailureException.assertState(result != null);
status = OperationStatus.SUCCESS;
} else {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.NO_OVERWRITE, writeOptions);
status = result == null ? OperationStatus.KEYEXIST : OperationStatus.SUCCESS;
try {
if (allowOverwrite) {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.OVERWRITE, writeOptions);
EnvironmentFailureException.assertState(result != null);
status = OperationStatus.SUCCESS;
} else {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.NO_OVERWRITE, writeOptions);

Check warning on line 286 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java#L286

Added line #L286 was not covered by tests
status = result == null ? OperationStatus.KEYEXIST : OperationStatus.SUCCESS;
}
} catch (ThreadInterruptedException e) {
Thread.currentThread().interrupt();
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
} catch (EnvironmentFailureException e) {
throw new TemporaryBackendException(e);
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}

if (status != OperationStatus.SUCCESS) {
Expand All @@ -261,6 +310,11 @@
if (status != OperationStatus.SUCCESS && status != OperationStatus.NOTFOUND) {
throw new PermanentBackendException("Could not remove: " + status);
}
} catch (ThreadInterruptedException e) {
Thread.currentThread().interrupt();
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
} catch (EnvironmentFailureException e) {
throw new TemporaryBackendException(e);

Check warning on line 317 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEKeyValueStore.java#L313-L317

Added lines #L313 - L317 were not covered by tests
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.ThreadInterruptedException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.BaseTransactionConfig;
import org.janusgraph.diskstorage.PermanentBackendException;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.TemporaryBackendException;
import org.janusgraph.diskstorage.common.LocalStoreManager;
import org.janusgraph.diskstorage.configuration.ConfigNamespace;
import org.janusgraph.diskstorage.configuration.ConfigOption;
Expand All @@ -48,9 +52,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.janusgraph.diskstorage.configuration.ConfigOption.disallowEmpty;

Expand Down Expand Up @@ -88,19 +93,16 @@
ConfigOption.Type.MASKABLE, String.class,
IsolationLevel.REPEATABLE_READ.toString(), disallowEmpty(String.class));

private final Map<String, BerkeleyJEKeyValueStore> stores;
private final ConcurrentMap<String, BerkeleyJEKeyValueStore> stores;

protected Environment environment;
protected volatile Environment environment;
protected final StoreFeatures features;

public BerkeleyJEStoreManager(Configuration configuration) throws BackendException {
super(configuration);
stores = new HashMap<>();
stores = new ConcurrentHashMap<>();

int cachePercentage = configuration.get(JVM_CACHE);
boolean sharedCache = configuration.get(SHARED_CACHE);
CacheMode cacheMode = ConfigOption.getEnumValue(configuration.get(CACHE_MODE), CacheMode.class);
initialize(cachePercentage, sharedCache, cacheMode);
initialize();

features = new StandardStoreFeatures.Builder()
.orderedScan(true)
Expand All @@ -111,14 +113,24 @@
.scanTxConfig(GraphDatabaseConfiguration.buildGraphConfiguration()
.set(ISOLATION_LEVEL, IsolationLevel.READ_UNCOMMITTED.toString())
)
.supportsInterruption(false)
.supportsInterruption(true)
.cellTTL(true)
.optimisticLocking(false)
.build();
}

private void initialize(int cachePercent, final boolean sharedCache, final CacheMode cacheMode) throws BackendException {
private synchronized void initialize() throws BackendException {
try {
if (environment != null && environment.isValid()) {
return;

Check warning on line 125 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java#L125

Added line #L125 was not covered by tests
}

close(true);

int cachePercent = storageConfig.get(JVM_CACHE);
boolean sharedCache = storageConfig.get(SHARED_CACHE);
CacheMode cacheMode = ConfigOption.getEnumValue(storageConfig.get(CACHE_MODE), CacheMode.class);

EnvironmentConfig envConfig = new EnvironmentConfig();
envConfig.setAllowCreate(true);
envConfig.setTransactional(transactional);
Expand All @@ -131,15 +143,28 @@
envConfig.setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false");
}

//Open the environment
// Open the environment
environment = new Environment(directory, envConfig);

// Reopen any existing DB connections
for (String storeName : stores.keySet()) {
openDatabase(storeName, true);
}
} catch (DatabaseException e) {
throw new PermanentBackendException("Error during BerkeleyJE initialization: ", e);
}

}

private synchronized void reInitialize(DatabaseException exception) throws BackendException {
initialize();

if (exception instanceof ThreadInterruptedException) {
Thread.currentThread().interrupt();
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(exception);
}
}

Check warning on line 166 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java#L166

Added line #L166 was not covered by tests

@Override
public StoreFeatures getFeatures() {
return features;
Expand All @@ -150,8 +175,7 @@
throw new UnsupportedOperationException();
}

@Override
public BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg) throws BackendException {
private BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg, boolean retryEnvironmentFailure) throws BackendException {
try {
Transaction tx = null;

Expand Down Expand Up @@ -182,15 +206,27 @@
}

return btx;
} catch (EnvironmentFailureException e) {
reInitialize(e);

Check warning on line 210 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java#L210

Added line #L210 was not covered by tests

if (retryEnvironmentFailure) {
return beginTransaction(txCfg, false);

Check warning on line 213 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java#L213

Added line #L213 was not covered by tests
}

throw new TemporaryBackendException("Could not start BerkeleyJE transaction", e);

Check warning on line 216 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java#L216

Added line #L216 was not covered by tests
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not start BerkeleyJE transaction", e);
}
}

@Override
public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException {
public BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg) throws BackendException {
return beginTransaction(txCfg, true);
}

private BerkeleyJEKeyValueStore openDatabase(String name, boolean force, boolean retryEnvironmentFailure) throws BackendException {
Preconditions.checkNotNull(name);
if (stores.containsKey(name)) {
if (stores.containsKey(name) && !force) {
return stores.get(name);
}
try {
Expand All @@ -209,13 +245,34 @@
log.debug("Opened database {}", name);

BerkeleyJEKeyValueStore store = new BerkeleyJEKeyValueStore(name, db, this);
stores.put(name, store);
if (stores.containsKey(name)) {
stores.get(name).reopen(db);
} else {
stores.put(name, store);
}
return store;
} catch (EnvironmentFailureException e) {
reInitialize(e);

Check warning on line 255 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java#L254-L255

Added lines #L254 - L255 were not covered by tests

if (retryEnvironmentFailure) {
return openDatabase(name, force, false);

Check warning on line 258 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java#L258

Added line #L258 was not covered by tests
}

throw new TemporaryBackendException("Could not open BerkeleyJE data store", e);

Check warning on line 261 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java#L261

Added line #L261 was not covered by tests
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not open BerkeleyJE data store", e);
}
}

private BerkeleyJEKeyValueStore openDatabase(String name, boolean force) throws BackendException {
return openDatabase(name, force, true);
}

@Override
public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException {
return openDatabase(name, false, true);
}

@Override
public void mutateMany(Map<String, KVMutation> mutations, StoreTransaction txh) throws BackendException {
for (Map.Entry<String,KVMutation> mutation : mutations.entrySet()) {
Expand Down Expand Up @@ -252,18 +309,16 @@
log.debug("Removed database {}", name);
}


@Override
public void close() throws BackendException {
public void close(boolean force) throws BackendException {
if (environment != null) {
if (!stores.isEmpty())
if (!force && !stores.isEmpty())
throw new IllegalStateException("Cannot shutdown manager since some databases are still open");
try {
// TODO this looks like a race condition
//Wait just a little bit before closing so that independent transaction threads can clean up.
Thread.sleep(30);
} catch (InterruptedException e) {
//Ignore
Thread.currentThread().interrupt();

Check warning on line 321 in janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-berkeleyje/src/main/java/org/janusgraph/diskstorage/berkeleyje/BerkeleyJEStoreManager.java#L321

Added line #L321 was not covered by tests
}
try {
environment.close();
Expand All @@ -274,6 +329,11 @@

}

@Override
public void close() throws BackendException {
close(false);
}

private static final Transaction NULL_TRANSACTION = null;

@Override
Expand Down
Loading
Loading