Skip to content

Commit

Permalink
separate-projections-poc: MigrationProjectionCache
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Lavrukov committed Jun 2, 2024
1 parent 9ef88c6 commit c9b0228
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -405,25 +405,25 @@ public T insert(T tt) {
T t = tt.preSave();
transaction.getWatcher().markRowRead(type, t.getId());
transaction.doInWriteTransaction("insert(" + t + ")", type, shard -> shard.insert(t));
transaction.getTransactionLocal().projectionCache().save(transaction, t);
transaction.getTransactionLocal().firstLevelCache().put(t);
transaction.getTransactionLocal().projectionCache().save(t);
return t;
}

@Override
public T save(T tt) {
T t = tt.preSave();
transaction.doInWriteTransaction("save(" + t + ")", type, shard -> shard.save(t));
transaction.getTransactionLocal().projectionCache().save(transaction, t);
transaction.getTransactionLocal().firstLevelCache().put(t);
transaction.getTransactionLocal().projectionCache().save(t);
return t;
}

@Override
public void delete(Entity.Id<T> id) {
transaction.doInWriteTransaction("delete(" + id + ")", type, shard -> shard.delete(id));
transaction.getTransactionLocal().projectionCache().delete(transaction, id);
transaction.getTransactionLocal().firstLevelCache().putEmpty(id);
transaction.getTransactionLocal().projectionCache().delete(id);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
import tech.ydb.yoj.repository.db.EntitySchema;
import tech.ydb.yoj.repository.db.Range;
import tech.ydb.yoj.repository.db.Table;
import tech.ydb.yoj.repository.db.Tx;
import tech.ydb.yoj.repository.db.ViewSchema;
import tech.ydb.yoj.repository.db.bulk.BulkParams;
import tech.ydb.yoj.repository.db.cache.FirstLevelCache;
import tech.ydb.yoj.repository.db.cache.TransactionLocal;
import tech.ydb.yoj.repository.db.readtable.ReadTableParams;
import tech.ydb.yoj.repository.db.statement.Changeset;
import tech.ydb.yoj.repository.ydb.YdbRepositoryTransaction;
import tech.ydb.yoj.repository.ydb.bulk.BulkMapper;
import tech.ydb.yoj.repository.ydb.bulk.BulkMapperImpl;
import tech.ydb.yoj.repository.ydb.readtable.EntityIdKeyMapper;
Expand Down Expand Up @@ -54,17 +54,17 @@
import static tech.ydb.yoj.repository.db.EntityExpressions.defaultOrder;

public class YdbTable<T extends Entity<T>> implements Table<T> {
private final QueryExecutor executor;
private final YdbRepositoryTransaction<?> executor;
@Getter
private final Class<T> type;

public YdbTable(Class<T> type, QueryExecutor executor) {
public YdbTable(Class<T> type, YdbRepositoryTransaction<?> executor) {
this.type = type;
this.executor = new CheckingQueryExecutor(executor);
this.executor = executor;
}

protected YdbTable(QueryExecutor executor) {
this.executor = new CheckingQueryExecutor(executor);
this.executor = (YdbRepositoryTransaction<?>) executor;
this.type = resolveEntityType();
}

Expand Down Expand Up @@ -421,7 +421,7 @@ public T insert(T t) {
T entityToSave = t.preSave();
executor.pendingExecute(YqlStatement.insert(type), entityToSave);
executor.getTransactionLocal().firstLevelCache().put(entityToSave);
executor.getTransactionLocal().projectionCache().save(entityToSave);
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
return t;
}

Expand All @@ -430,15 +430,15 @@ public T save(T t) {
T entityToSave = t.preSave();
executor.pendingExecute(YqlStatement.save(type), entityToSave);
executor.getTransactionLocal().firstLevelCache().put(entityToSave);
executor.getTransactionLocal().projectionCache().save(entityToSave);
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
return t;
}

@Override
public void delete(Entity.Id<T> id) {
executor.pendingExecute(YqlStatement.delete(type), id);
executor.getTransactionLocal().firstLevelCache().putEmpty(id);
executor.getTransactionLocal().projectionCache().delete(id);
executor.getTransactionLocal().projectionCache().delete(executor, id);
}

/**
Expand All @@ -458,7 +458,7 @@ public <ID extends Id<T>> void migrate(ID id) {
T rawEntity = foundRaw.get(0);
T entityToSave = rawEntity.postLoad().preSave();
executor.pendingExecute(YqlStatement.save(type), entityToSave);
executor.getTransactionLocal().projectionCache().save(entityToSave);
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
}

@Override
Expand Down Expand Up @@ -494,55 +494,6 @@ default <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams p
TransactionLocal getTransactionLocal();
}

public static class CheckingQueryExecutor implements QueryExecutor {
private final QueryExecutor delegate;
private final Tx originTx;

public CheckingQueryExecutor(QueryExecutor delegate) {
this.delegate = delegate;
this.originTx = Tx.Current.exists() ? Tx.Current.get() : null;
}

private void check() {
Tx.checkSameTx(originTx);
}

@Override
public <PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement, PARAMS params) {
check();
return delegate.execute(statement, params);
}

@Override
public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT> statement, PARAMS params) {
return delegate.executeScanQuery(statement, params);
}

@Override
public <PARAMS> void pendingExecute(Statement<PARAMS, ?> statement, PARAMS value) {
check();
delegate.pendingExecute(statement, value);
}

@Override
public <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams params) {
check();
delegate.bulkUpsert(mapper, input, params);
}

@Override
public <IN, OUT> Stream<OUT> readTable(ReadTableMapper<IN, OUT> mapper, ReadTableParams<IN> params) {
check();
return delegate.readTable(mapper, params);
}

@Override
public TransactionLocal getTransactionLocal() {
check();
return delegate.getTransactionLocal();
}
}

public <ID extends Id<T>> void updateIn(Collection<ID> ids, Changeset changeset) {
var params = new UpdateInStatement.UpdateInStatementInput<>(ids, changeset.toMap());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ public TxManager immediateWrites() {
return withOptions(this.options.withImmediateWrites(true));
}

@Override
public TxManager separateProjections() {
return withOptions(this.options.withSeparateProjections(true));
}

@Override
public TxManager noFirstLevelCache() {
return withOptions(this.options.withFirstLevelCache(false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public interface TxManager {
*/
TxManager immediateWrites();

TxManager separateProjections();

/**
* Turn off first level cache
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class TxOptions {

boolean immediateWrites;

boolean separateProjections;

public static TxOptions create(@NonNull IsolationLevel isolationLevel) {
return builder()
.isolationLevel(isolationLevel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,48 @@
import lombok.NonNull;
import tech.ydb.yoj.repository.BaseDb;
import tech.ydb.yoj.repository.db.TxOptions;
import tech.ydb.yoj.repository.db.projection.MigrationProjectionCache;
import tech.ydb.yoj.repository.db.projection.ProjectionCache;
import tech.ydb.yoj.repository.db.projection.RoProjectionCache;
import tech.ydb.yoj.repository.db.projection.RwProjectionCache;

import java.util.IdentityHashMap;
import java.util.Map;
import java.util.function.Supplier;

public class TransactionLocal {
private final Map<Supplier<?>, Object> singletons = new IdentityHashMap<>();

private final Supplier<FirstLevelCache> firstLevelCacheSupplier;
private final Supplier<ProjectionCache> projectionCacheSupplier;
private final Supplier<TransactionLog> logSupplier;
private final FirstLevelCache firstLevelCache;
private final ProjectionCache projectionCache;
private final TransactionLog log;

public TransactionLocal(@NonNull TxOptions options) {
this.firstLevelCacheSupplier = options.isFirstLevelCache() ? FirstLevelCache::create : FirstLevelCache::empty;
this.projectionCacheSupplier = options.isMutable() ? RwProjectionCache::new : RoProjectionCache::new;
this.logSupplier = () -> new TransactionLog(options.getLogLevel());
this.firstLevelCache = options.isFirstLevelCache() ? FirstLevelCache.create() : FirstLevelCache.empty();
this.projectionCache = createProjectionCache(firstLevelCache, options);
this.log = new TransactionLog(options.getLogLevel());
}

public static TransactionLocal get() {
return BaseDb.current(Holder.class).getTransactionLocal();
private static ProjectionCache createProjectionCache(FirstLevelCache firstLevelCache, TxOptions options) {
if (options.isMutable()) {
if (options.isSeparateProjections()) {
return new MigrationProjectionCache(firstLevelCache);
}

return new RwProjectionCache();
}

return new RoProjectionCache();
}

@SuppressWarnings("unchecked")
public <X> X instance(@NonNull Supplier<X> supplier) {
return (X) singletons.computeIfAbsent(supplier, Supplier::get);
public static TransactionLocal get() {
return BaseDb.current(Holder.class).getTransactionLocal();
}

public ProjectionCache projectionCache() {
return instance(projectionCacheSupplier);
return projectionCache;
}

public FirstLevelCache firstLevelCache() {
return instance(firstLevelCacheSupplier);
return firstLevelCache;
}

public TransactionLog log() {
return instance(logSupplier);
return log;
}

public interface Holder {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package tech.ydb.yoj.repository.db.projection;

import lombok.RequiredArgsConstructor;
import tech.ydb.yoj.repository.db.Entity;
import tech.ydb.yoj.repository.db.RepositoryTransaction;
import tech.ydb.yoj.repository.db.cache.FirstLevelCache;

import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

@RequiredArgsConstructor
public class MigrationProjectionCache implements ProjectionCache {
private final FirstLevelCache cache;

@Override
public void load(Entity<?> entity) {
}

@Override
public void save(RepositoryTransaction transaction, Entity<?> entity) {
delete(transaction, entity.getId());

List<Entity<?>> newProjections = entity.createProjections();
for (Entity<?> projection : newProjections) {
saveEntity(transaction, projection);
}
}

@Override
public void delete(RepositoryTransaction transaction, Entity.Id<?> id) {
Optional<? extends Entity<?>> oldEntity;
try {
oldEntity = cache.peek(id);
} catch (NoSuchElementException e) {
return;
}

if (oldEntity.isPresent()) {
List<Entity<?>> oldProjections = oldEntity.get().createProjections();
for (Entity<?> projection : oldProjections) {
deleteEntity(transaction, projection.getId());
}
}
}

@Override
public void applyProjectionChanges(RepositoryTransaction transaction) {
}

private <T extends Entity<T>> void deleteEntity(RepositoryTransaction transaction, Entity.Id<T> entityId) {
transaction.table(entityId.getType()).delete(entityId);
}

private <T extends Entity<T>> void saveEntity(RepositoryTransaction transaction, Entity<T> entity) {
@SuppressWarnings("unchecked")
T castedEntity = (T) entity;

transaction.table(entity.getId().getType()).save(castedEntity);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
public interface ProjectionCache {
void load(Entity<?> entity);

void save(Entity<?> entity);
void save(RepositoryTransaction transaction, Entity<?> entity);

void delete(Entity.Id<?> id);
void delete(RepositoryTransaction transaction, Entity.Id<?> id);

void applyProjectionChanges(RepositoryTransaction transaction);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ public void load(Entity<?> entity) {
}

@Override
public void save(Entity<?> entity) {
public void save(RepositoryTransaction transaction, Entity<?> entity) {
throw new UnsupportedOperationException("Should not be invoked in RO");
}

@Override
public void delete(Entity.Id<?> id) {
public void delete(RepositoryTransaction transaction, Entity.Id<?> id) {
throw new UnsupportedOperationException("Should not be invoked in RO");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ public void load(Entity<?> entity) {
}

@Override
public void save(Entity<?> entity) {
public void save(RepositoryTransaction transaction, Entity<?> entity) {
row(entity.getId()).save(entity);
}

@Override
public void delete(Entity.Id<?> id) {
public void delete(RepositoryTransaction transaction, Entity.Id<?> id) {
row(id).delete();
}

Expand Down

0 comments on commit c9b0228

Please sign in to comment.