Skip to content

Commit

Permalink
SHS-NG M1: Separate index introspection from storage.
Browse files Browse the repository at this point in the history
The new KVTypeInfo class can help with writing different implementations
of KVStore without duplicating logic from LevelDBTypeInfo.
  • Loading branch information
Marcelo Vanzin committed May 5, 2017
1 parent b6a6b0a commit b279da3
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 100 deletions.
135 changes: 135 additions & 0 deletions common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.kvstore;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

import com.google.common.base.Preconditions;

/**
* Wrapper around types managed in a KVStore, providing easy access to their indexed fields.
*/
public class KVTypeInfo {

private final Class<?> type;
private final Collection<KVIndex> indices;
private final Map<String, Accessor> accessors;

public KVTypeInfo(Class<?> type) throws Exception {
this.type = type;
this.indices = new ArrayList<>();
this.accessors = new HashMap<>();

for (Field f : type.getFields()) {
KVIndex idx = f.getAnnotation(KVIndex.class);
if (idx != null) {
checkIndex(idx);
indices.add(idx);
accessors.put(idx.value(), new FieldAccessor(f));
}
}

for (Method m : type.getMethods()) {
KVIndex idx = m.getAnnotation(KVIndex.class);
if (idx != null) {
checkIndex(idx);
Preconditions.checkArgument(m.getParameterTypes().length == 0,
"Annotated method %s::%s should not have any parameters.", type.getName(), m.getName());
indices.add(idx);
accessors.put(idx.value(), new MethodAccessor(m));
}
}

Preconditions.checkArgument(accessors.containsKey(KVIndex.NATURAL_INDEX_NAME),
"No natural index defined for type %s.", type.getName());
}

private void checkIndex(KVIndex idx) {
Preconditions.checkArgument(idx.value() != null && !idx.value().isEmpty(),
"No name provided for index in type %s.", type.getName());
Preconditions.checkArgument(
!idx.value().startsWith("_") || idx.value().equals(KVIndex.NATURAL_INDEX_NAME),
"Index name %s (in type %s) is not allowed.", idx.value(), type.getName());
Preconditions.checkArgument(!indices.contains(idx.value()),
"Duplicate index %s for type %s.", idx.value(), type.getName());
}

public Class<?> getType() {
return type;
}

public Object getIndexValue(String indexName, Object instance) throws Exception {
return getAccessor(indexName).get(instance);
}

public Stream<KVIndex> indices() {
return indices.stream();
}

Accessor getAccessor(String indexName) {
Accessor a = accessors.get(indexName);
Preconditions.checkArgument(a != null, "No index %s.", indexName);
return a;
}

/**
* Abstracts the difference between invoking a Field and a Method.
*/
interface Accessor {

Object get(Object instance) throws Exception;

}

private class FieldAccessor implements Accessor {

private final Field field;

FieldAccessor(Field field) {
this.field = field;
}

@Override
public Object get(Object instance) throws Exception {
return field.get(instance);
}

}

private class MethodAccessor implements Accessor {

private final Method method;

MethodAccessor(Method method) {
this.method = method;
}

@Override
public Object get(Object instance) throws Exception {
return method.invoke(instance);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void write(Object value) throws Exception {

public void write(Object value, boolean sync) throws Exception {
Preconditions.checkArgument(value != null, "Null values are not allowed.");
LevelDBTypeInfo<?> ti = getTypeInfo(value.getClass());
LevelDBTypeInfo ti = getTypeInfo(value.getClass());

LevelDBWriteBatch batch = new LevelDBWriteBatch(this);
try {
Expand All @@ -160,7 +160,7 @@ public void write(Object value, boolean sync) throws Exception {
} catch (NoSuchElementException e) {
// Ignore. No previous value.
}
for (LevelDBTypeInfo<?>.Index idx : ti.indices()) {
for (LevelDBTypeInfo.Index idx : ti.indices()) {
idx.add(batch, value, data);
}
batch.write(sync);
Expand All @@ -179,7 +179,7 @@ public void delete(Class<?> type, Object naturalKey, boolean sync) throws Except
Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
LevelDBWriteBatch batch = new LevelDBWriteBatch(this);
try {
LevelDBTypeInfo<?> ti = getTypeInfo(type);
LevelDBTypeInfo ti = getTypeInfo(type);
byte[] key = ti.naturalIndex().start(naturalKey);
byte[] data = db().get(key);
if (data != null) {
Expand Down Expand Up @@ -210,7 +210,7 @@ public Iterator<T> iterator() {

@Override
public long count(Class<?> type) throws Exception {
LevelDBTypeInfo<?>.Index idx = getTypeInfo(type).naturalIndex();
LevelDBTypeInfo.Index idx = getTypeInfo(type).naturalIndex();
return idx.getCount(idx.end());
}

Expand All @@ -231,10 +231,10 @@ public void close() throws IOException {
}

/** Returns metadata about indices for the given type. */
<T> LevelDBTypeInfo<T> getTypeInfo(Class<T> type) throws Exception {
LevelDBTypeInfo<T> ti = types.get(type);
LevelDBTypeInfo getTypeInfo(Class<?> type) throws Exception {
LevelDBTypeInfo ti = types.get(type);
if (ti == null) {
LevelDBTypeInfo<T> tmp = new LevelDBTypeInfo<>(this, type, getTypeAlias(type));
LevelDBTypeInfo tmp = new LevelDBTypeInfo(this, type, getTypeAlias(type));
ti = types.putIfAbsent(type, tmp);
if (ti == null) {
ti = tmp;
Expand All @@ -256,9 +256,9 @@ DB db() {
return _db;
}

private void removeInstance(LevelDBTypeInfo<?> ti, LevelDBWriteBatch batch, Object instance)
private void removeInstance(LevelDBTypeInfo ti, LevelDBWriteBatch batch, Object instance)
throws Exception {
for (LevelDBTypeInfo<?>.Index idx : ti.indices()) {
for (LevelDBTypeInfo.Index idx : ti.indices()) {
idx.remove(batch, instance);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
private final boolean ascending;
private final DBIterator it;
private final Class<T> type;
private final LevelDBTypeInfo<T> ti;
private final LevelDBTypeInfo<T>.Index index;
private final LevelDBTypeInfo ti;
private final LevelDBTypeInfo.Index index;
private final byte[] indexKeyPrefix;
private final byte[] end;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
* Holds metadata about app-specific types stored in LevelDB. Serves as a cache for data collected
* via reflection, to make it cheaper to access it multiple times.
*/
class LevelDBTypeInfo<T> {
class LevelDBTypeInfo {

static final String ENTRY_PREFIX = "+";
static final String END_MARKER = "-";
Expand All @@ -58,52 +58,27 @@ class LevelDBTypeInfo<T> {
static final int SHORT_ENCODED_LEN = String.valueOf(Short.MAX_VALUE).length() + 1;

private final LevelDB db;
private final Class<T> type;
private final Class<?> type;
private final Map<String, Index> indices;
private final byte[] typePrefix;

LevelDBTypeInfo(LevelDB db, Class<T> type, byte[] alias) throws Exception {
LevelDBTypeInfo(LevelDB db, Class<?> type, byte[] alias) throws Exception {
this.db = db;
this.type = type;
this.indices = new HashMap<>();

for (Field f : type.getFields()) {
KVIndex idx = f.getAnnotation(KVIndex.class);
if (idx != null) {
register(idx, new FieldAccessor(f));
}
}

for (Method m : type.getMethods()) {
KVIndex idx = m.getAnnotation(KVIndex.class);
if (idx != null) {
Preconditions.checkArgument(m.getParameterTypes().length == 0,
"Annotated method %s::%s should not have any parameters.", type.getName(), m.getName());
register(idx, new MethodAccessor(m));
}
}

Preconditions.checkArgument(indices.get(KVIndex.NATURAL_INDEX_NAME) != null,
"No natural index defined for type %s.", type.getName());
KVTypeInfo ti = new KVTypeInfo(type);
ti.indices().forEach(idx -> {
indices.put(idx.value(), new Index(idx.value(), idx.copy(), ti.getAccessor(idx.value())));
});

ByteArrayOutputStream typePrefix = new ByteArrayOutputStream();
typePrefix.write(utf8(ENTRY_PREFIX));
typePrefix.write(alias);
this.typePrefix = typePrefix.toByteArray();
}

private void register(KVIndex idx, Accessor accessor) {
Preconditions.checkArgument(idx.value() != null && !idx.value().isEmpty(),
"No name provided for index in type %s.", type.getName());
Preconditions.checkArgument(
!idx.value().startsWith("_") || idx.value().equals(KVIndex.NATURAL_INDEX_NAME),
"Index name %s (in type %s) is not allowed.", idx.value(), type.getName());
Preconditions.checkArgument(indices.get(idx.value()) == null,
"Duplicate index %s for type %s.", idx.value(), type.getName());
indices.put(idx.value(), new Index(idx.value(), idx.copy(), accessor));
}

Class<T> type() {
Class<?> type() {
return type;
}

Expand Down Expand Up @@ -164,11 +139,9 @@ class Index {
private final boolean copy;
private final boolean isNatural;
private final String name;
private final KVTypeInfo.Accessor accessor;

@VisibleForTesting
final Accessor accessor;

private Index(String name, boolean copy, Accessor accessor) {
private Index(String name, boolean copy, KVTypeInfo.Accessor accessor) {
this.name = name;
this.isNatural = name.equals(KVIndex.NATURAL_INDEX_NAME);
this.copy = isNatural || copy;
Expand Down Expand Up @@ -320,44 +293,4 @@ String toKey(Object value) {

}

/**
* Abstracts the difference between invoking a Field and a Method.
*/
@VisibleForTesting
interface Accessor {

Object get(Object instance) throws Exception;

}

private class FieldAccessor implements Accessor {

private final Field field;

FieldAccessor(Field field) {
this.field = field;
}

@Override
public Object get(Object instance) throws Exception {
return field.get(instance);
}

}

private class MethodAccessor implements Accessor {

private final Method method;

MethodAccessor(Method method) {
this.method = method;
}

@Override
public Object get(Object instance) throws Exception {
return method.invoke(instance);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public void testMetadata() throws Exception {
}

private long countIndexEntries(Class<?> type, String index, Object value) throws Exception {
LevelDBTypeInfo<?>.Index idx = db.getTypeInfo(type).index(index);
LevelDBTypeInfo.Index idx = db.getTypeInfo(type).index(index);
return idx.getCount(idx.end());
}

Expand Down
Loading

0 comments on commit b279da3

Please sign in to comment.