[Broker] Fix race condition in invalidating ledger cache entries #10480

Closed
Original file line number Diff line number Diff line change
@@ -116,7 +116,7 @@ public boolean insert(EntryImpl entry) {
return true;
} else {
// entry was not inserted into cache, we need to discard it
- cacheEntry.release();
+ cacheEntry.invalidate();
return false;
}
}
Original file line number Diff line number Diff line change
@@ -27,11 +27,14 @@
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCounted;

+ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted;
+ import org.apache.bookkeeper.mledger.util.InvalidateableReferenceCounted;

- public final class EntryImpl extends AbstractCASReferenceCounted implements Entry, Comparable<EntryImpl>, ReferenceCounted {
+ public final class EntryImpl extends AbstractCASReferenceCounted implements Entry, Comparable<EntryImpl>,
+ InvalidateableReferenceCounted {

private static final Recycler<EntryImpl> RECYCLER = new Recycler<EntryImpl>() {
@Override
@@ -45,6 +48,7 @@ protected EntryImpl newObject(Handle<EntryImpl> handle) {
private long ledgerId;
private long entryId;
ByteBuf data;
+ private final AtomicBoolean invalidated = new AtomicBoolean();

public static EntryImpl create(LedgerEntry ledgerEntry) {
EntryImpl entry = RECYCLER.get();
@@ -166,7 +170,16 @@ protected void deallocate() {
timestamp = -1;
ledgerId = -1;
entryId = -1;
+ invalidated.set(false);
recyclerHandle.recycle(this);
}

+ @Override
+ public boolean invalidate() {
+ if (invalidated.compareAndSet(false, true)) {
+ release();
+ return true;
+ }
+ return false;
Contributor

I don't think this is the correct approach. If the issue is that we're using an already released buffer, we should fix that instead.

This will avoid decrementing the ref-count more than once, but it will not prevent the second thread from accessing an entry whose ref-count has already dropped to 0.

Member Author

This is a safety measure to prevent races in invalidating the entries. I agree that the issues in releasing must be fixed. A separate method for invalidation would help detect when the problem is caused by invalidating the entry twice. Some logging could be added to detect cases where a race in invalidation causes a "double release".

Currently, it seems that the problems we are seeing could occur only when there's a race in invalidation. At a quick glance, there don't seem to be other code paths where the entry is released but not retained as part of the same "flow".

Would this justify adding some extra protection against races in invalidation?

Member Author

> This will avoid decrementing the ref-count more than once, but it will not prevent the second thread from accessing an entry whose ref-count has already dropped to 0.

Yes, that's a good point. I'm thinking of a solution where invalidation would be a completely separate operation that triggers when the reference count is 1 or gets back to 1. Another protection would be to change the logic so that a release operation that would take the reference count from 1 to 0 is rejected outright. That would prevent bugs caused by release being called too many times. Such issues could then be logged and fixed if they exist.
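
The reviewer's objection in this thread can be illustrated with a minimal sketch that uses only the JDK. `GuardedEntry` is a hypothetical stand-in, not Pulsar's `EntryImpl`, and `IllegalStateException` stands in for Netty's `IllegalReferenceCountException`. The sketch shows that the `AtomicBoolean` guard stops the second release, yet the second caller still holds a reference to an entry whose count is already 0.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a reference-counted cache entry (assumption, not EntryImpl). */
final class GuardedEntry {
    private final AtomicInteger refCnt = new AtomicInteger(1); // the cache's own reference
    private final AtomicBoolean invalidated = new AtomicBoolean();

    int refCnt() {
        return refCnt.get();
    }

    /** Drops one reference; throws if the count is already 0 (a "double release"). */
    boolean release() {
        while (true) {
            int current = refCnt.get();
            if (current == 0) {
                throw new IllegalStateException("refCnt: 0, decrement: 1");
            }
            if (refCnt.compareAndSet(current, current - 1)) {
                return current == 1; // true when this call dropped the last reference
            }
        }
    }

    /** Releases the cache's reference at most once, however many callers race here. */
    boolean invalidate() {
        if (invalidated.compareAndSet(false, true)) {
            release();
            return true;
        }
        return false;
    }
}

final class InvalidateGuardDemo {
    public static void main(String[] args) {
        GuardedEntry entry = new GuardedEntry();
        System.out.println(entry.invalidate()); // true: first caller wins and releases
        System.out.println(entry.invalidate()); // false: second caller does not release again
        System.out.println(entry.refCnt());     // 0: the second caller still sees a dead entry
    }
}
```

The guard therefore addresses the double decrement only; whether a caller may still touch the entry after the count reaches 0 is a separate ownership question.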

+ }
}
Original file line number Diff line number Diff line change
@@ -154,7 +154,7 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx)
}
checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s", ledger.getId(),
lh.getId());

if (!checkAndCompleteOp(ctx)) {
// means callback might have been completed by different thread (timeout task thread).. so do nothing
return;
@@ -189,7 +189,7 @@ public void safeRun() {
// EntryCache.insert: duplicates entry by allocating new entry and data. so, recycle entry after calling
// insert
ml.entryCache.insert(entry);
- entry.release();
+ entry.invalidate();
}

PositionImpl lastEntry = PositionImpl.get(ledger.getId(), entryId);
@@ -248,7 +248,7 @@ private void updateLatency() {

/**
* Checks if add-operation is completed
*
*
* @return true if task is not already completed else returns false.
*/
private boolean checkAndCompleteOp(Object ctx) {
@@ -269,7 +269,7 @@ void handleAddTimeoutFailure(final LedgerHandle ledger, Object ctx) {

/**
* It handles add failure on the given ledger. it can be triggered when add-entry fails or times out.
*
*
* @param ledger
*/
void handleAddFailure(final LedgerHandle ledger) {
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.util;

import io.netty.util.ReferenceCounted;

public interface InvalidateableReferenceCounted extends ReferenceCounted {
/**
* Marks the entry to be removed. Internally {@link ReferenceCounted#release()} will be called unless
* the entry has already been invalidated before. No calls to {@link ReferenceCounted#release()} should be
* made separately to invalidate the entry.
*
* @return true if the value was marked to be invalidated, false if it was already invalidated before.
*/
boolean invalidate();
}
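
The contract in the Javadoc above (at most one underlying release, no matter how many callers invalidate) can be exercised with a small JDK-only race. `CountingValue` and `race` are hypothetical names introduced for this sketch; the counter stands in for `ReferenceCounted#release()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class InvalidateRace {
    /** Hypothetical value type; counts how often the underlying release would happen. */
    static final class CountingValue {
        final AtomicInteger releases = new AtomicInteger();
        private final AtomicBoolean invalidated = new AtomicBoolean();

        boolean invalidate() {
            if (invalidated.compareAndSet(false, true)) {
                releases.incrementAndGet(); // stands in for ReferenceCounted#release()
                return true;
            }
            return false;
        }
    }

    /** Races the given number of threads on one value; returns how many saw invalidate() == true. */
    static int race(CountingValue value, int threads) {
        CountDownLatch start = new CountDownLatch(1);
        AtomicInteger winners = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // line all threads up before racing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (value.invalidate()) {
                    winners.incrementAndGet();
                }
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return winners.get();
    }

    public static void main(String[] args) {
        CountingValue value = new CountingValue();
        int winners = race(value, 8);
        System.out.println(winners + " winner(s), " + value.releases.get() + " release(s)");
    }
}
```

Because `compareAndSet(false, true)` succeeds for exactly one caller, the cache can use the return value to decide whether to subtract the entry's size, which is how the `RangeCache` hunks in this PR use it.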
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.collect.Lists;
- import io.netty.util.ReferenceCounted;
+ import io.netty.util.IllegalReferenceCountException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -38,7 +38,7 @@
* @param <Value>
* Cache value
*/
- public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCounted> {
+ public class RangeCache<Key extends Comparable<Key>, Value extends InvalidateableReferenceCounted> {
// Map from key to nodes inside the linked list
private final ConcurrentNavigableMap<Key, Value> entries;
private AtomicLong size; // Total size of values stored in cache
@@ -90,7 +90,7 @@ public Value get(Key key) {
try {
value.retain();
return value;
- } catch (Throwable t) {
+ } catch (IllegalReferenceCountException e) {
Contributor

We should probably log something here. This case must not happen.

Member Author

Yes, logging would be useful to get more information. I'm just wondering whether it should be done only at debug level, since it's not a real problem: it's part of the expected behavior that this can sometimes happen.

// Value was already destroyed between get() and retain()
return null;
}
@@ -113,7 +113,7 @@ public Collection<Value> getRange(Key first, Key last) {
try {
value.retain();
values.add(value);
- } catch (Throwable t) {
+ } catch (IllegalReferenceCountException e) {
Contributor

We should probably log something here. This case must not happen.

Contributor

It's expected to happen and it's fine when it happens. It just indicates the entry is being evicted when we're trying to access it. If the retain succeeds, the operation was successful, otherwise the entry is already gone.

// Value was already destroyed between get() and retain()
}
}
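
The "retain or treat as a miss" behavior discussed in the comments above can be sketched with the JDK alone. Everything here is hypothetical: `Counted` is a simplified reference-counted value, `IllegalStateException` stands in for Netty's `IllegalReferenceCountException`, and `RetainOrMiss` is not `RangeCache`.

```java
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;

final class RetainOrMiss {
    /** Simplified reference-counted value (assumption, not Netty's implementation). */
    static final class Counted {
        private final AtomicInteger refCnt = new AtomicInteger(1);

        Counted retain() {
            while (true) {
                int current = refCnt.get();
                if (current == 0) {
                    throw new IllegalStateException("refCnt: 0, increment: 1");
                }
                if (refCnt.compareAndSet(current, current + 1)) {
                    return this;
                }
            }
        }

        void release() {
            while (true) {
                int current = refCnt.get();
                if (current == 0) {
                    throw new IllegalStateException("refCnt: 0, decrement: 1");
                }
                if (refCnt.compareAndSet(current, current - 1)) {
                    return;
                }
            }
        }
    }

    private final ConcurrentSkipListMap<Long, Counted> entries = new ConcurrentSkipListMap<>();

    void put(long key, Counted value) {
        entries.put(key, value);
    }

    /** Returns a retained value, or null when the key is absent or the value was released first. */
    Counted get(long key) {
        Counted value = entries.get(key);
        if (value == null) {
            return null;
        }
        try {
            return value.retain();
        } catch (IllegalStateException e) {
            // Value was released between the map lookup and retain(): treat it as a cache miss.
            return null;
        }
    }

    public static void main(String[] args) {
        RetainOrMiss cache = new RetainOrMiss();
        Counted value = new Counted();
        cache.put(1L, value);

        Counted hit = cache.get(1L);
        System.out.println(hit == value); // true: retain succeeded
        hit.release();                    // the reader gives its reference back

        value.release();                  // eviction drops the cache's reference first
        System.out.println(cache.get(1L)); // null: retain failed, reported as a miss
    }
}
```

Catching only the reference-count exception, rather than `Throwable`, keeps unrelated failures visible, which matches the narrowing made in the hunks above.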
@@ -140,9 +140,11 @@ public Pair<Integer, Long> removeRange(Key first, Key last, boolean lastInclusiv
continue;
}

- removedSize += weighter.getSize(value);
- value.release();
- ++removedEntries;
+ long entrySize = weighter.getSize(value);
Contributor

E.g., in the case of a concurrent invalidate, the value is already invalid here.

+ if (value.invalidate()) {
+ removedSize += entrySize;
+ ++removedEntries;
+ }
}

size.addAndGet(-removedSize);
@@ -168,9 +170,11 @@ public Pair<Integer, Long> evictLeastAccessedEntries(long minSize) {
}

Value value = entry.getValue();
- ++removedEntries;
- removedSize += weighter.getSize(value);
- value.release();
+ long entrySize = weighter.getSize(value);
+ if (value.invalidate()) {
Contributor

I am afraid we are only hiding the problem. There must be some coordination around these entries, some clear protocol about who owns the entry.

When we get to this point, we must be sure that the refcount on the value is valid; otherwise it is always a hazard.

I believe the right protocol is that, before calling weighter.getSize(value), we should try to acquire the entry, and in case of failure we can ignore the entry.

Value value = entry.getValue();
if (value.tryAcquire()) {
    ++removedEntries;
    removedSize += weighter.getSize(value);
    value.release(); // this refers to the value.tryAcquire()
}

When we remove the entry, we must hold some write lock over the entry, which prevents double releases.

+ ++removedEntries;
+ removedSize += entrySize;
+ }
}

size.addAndGet(-removedSize);
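
One possible reading of the `tryAcquire()` protocol proposed in the review comment above, combined with the `invalidate()` guard from this PR, is sketched below. This is an assumption for illustration, not what the reviewers settled on: `tryAcquire()` does not exist in the PR, and `Value`, `evict`, and the `size` field are hypothetical. The idea is to take a temporary reference before reading the entry, drop the cache's reference at most once via `invalidate()`, and then return the temporary reference.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class TryAcquireEviction {
    /** Hypothetical cache value with a fixed size in bytes. */
    static final class Value {
        final long size;
        private final AtomicInteger refCnt = new AtomicInteger(1); // the cache's reference
        private final AtomicBoolean invalidated = new AtomicBoolean();

        Value(long size) {
            this.size = size;
        }

        /** Hypothetical tryAcquire(): takes a reference only while the count is still positive. */
        boolean tryAcquire() {
            while (true) {
                int current = refCnt.get();
                if (current == 0) {
                    return false;
                }
                if (refCnt.compareAndSet(current, current + 1)) {
                    return true;
                }
            }
        }

        void release() {
            while (true) {
                int current = refCnt.get();
                if (current == 0) {
                    throw new IllegalStateException("refCnt: 0, decrement: 1");
                }
                if (refCnt.compareAndSet(current, current - 1)) {
                    return;
                }
            }
        }

        boolean invalidate() {
            if (invalidated.compareAndSet(false, true)) {
                release();
                return true;
            }
            return false;
        }
    }

    /** Evicts from the head of the map until at least minSize bytes are removed; returns bytes removed. */
    static long evict(ConcurrentSkipListMap<Long, Value> entries, long minSize) {
        long removedSize = 0;
        while (removedSize < minSize) {
            Map.Entry<Long, Value> entry = entries.pollFirstEntry();
            if (entry == null) {
                break;
            }
            Value value = entry.getValue();
            if (!value.tryAcquire()) {
                continue; // already released elsewhere: ignore the entry
            }
            try {
                // The entry is safe to read here because we hold our own reference.
                if (value.invalidate()) {
                    removedSize += value.size;
                }
            } finally {
                value.release(); // pairs with tryAcquire()
            }
        }
        return removedSize;
    }

    public static void main(String[] args) {
        ConcurrentSkipListMap<Long, Value> entries = new ConcurrentSkipListMap<>();
        entries.put(1L, new Value(10));
        Value alreadyGone = new Value(20);
        alreadyGone.invalidate(); // simulates a concurrent invalidation that won the race
        entries.put(2L, alreadyGone);
        entries.put(3L, new Value(30));
        System.out.println(evict(entries, 25)); // 40: entries 1 and 3 counted, entry 2 skipped
    }
}
```

Only entries that this thread actually invalidated contribute to `removedSize`, so the cache's size accounting stays consistent even when another thread got there first.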
@@ -197,8 +201,10 @@ public long evictLEntriesBeforeTimestamp(long maxTimestamp) {
}

Value value = entry.getValue();
- removedSize += weighter.getSize(value);
- value.release();
+ long entrySize = weighter.getSize(value);
+ if (value.invalidate()) {
+ removedSize += entrySize;
+ }
}

size.addAndGet(-removedSize);
@@ -230,8 +236,10 @@ public synchronized long clear() {
break;
}
Value value = entry.getValue();
- removedSize += weighter.getSize(value);
- value.release();
+ long entrySize = weighter.getSize(value);
+ if (value.invalidate()) {
+ removedSize += entrySize;
+ }
}

entries.clear();
Original file line number Diff line number Diff line change
@@ -27,13 +27,15 @@
import com.google.common.collect.Lists;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCounted;
+ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.testng.annotations.Test;

public class RangeCacheTest {

- class RefString extends AbstractReferenceCounted implements ReferenceCounted {
+ class RefString extends AbstractReferenceCounted implements InvalidateableReferenceCounted {
final String s;
+ private final AtomicBoolean invalidated = new AtomicBoolean();

RefString(String s) {
super();
@@ -43,7 +45,7 @@ class RefString extends AbstractReferenceCounted implements ReferenceCounted {

@Override
protected void deallocate() {
- // no-op
+ invalidated.set(false);
}

@Override
@@ -61,6 +63,15 @@ public boolean equals(Object obj) {

return false;
}
+
+ @Override
+ public boolean invalidate() {
+ if (invalidated.compareAndSet(false, true)) {
+ release();
+ return true;
+ }
+ return false;
+ }
}

@Test