Skip to content

Commit

Permalink
Process nodes using forEach()
Browse files Browse the repository at this point in the history
  • Loading branch information
labkey-nicka committed Oct 9, 2024
1 parent f75d908 commit 37d564d
Showing 1 changed file with 27 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@
import org.json.JSONObject;
import org.labkey.api.action.ApiJsonWriter;
import org.labkey.api.data.Container;
import org.labkey.api.data.SQLFragment;
import org.labkey.api.data.Selector;
import org.labkey.api.data.SimpleFilter;
import org.labkey.api.data.SqlSelector;
import org.labkey.api.data.TableInfo;
import org.labkey.api.data.TableSelector;
import org.labkey.api.exp.Identifiable;
import org.labkey.api.exp.IdentifiableBase;
Expand All @@ -25,29 +24,22 @@
import org.labkey.api.exp.api.ExpRunItem;
import org.labkey.api.exp.api.ExperimentJSONConverter;
import org.labkey.api.exp.api.ExpLineageService;
import org.labkey.api.exp.query.ExpDataTable;
import org.labkey.api.exp.query.ExpMaterialTable;
import org.labkey.api.exp.query.ExpRunTable;
import org.labkey.api.query.FieldKey;
import org.labkey.api.security.User;
import org.labkey.api.security.permissions.ReadPermission;
import org.labkey.api.util.GUID;
import org.labkey.api.util.logging.LogHelper;
import org.labkey.api.view.UnauthorizedException;
import org.labkey.experiment.api.Data;
import org.labkey.experiment.api.ExpDataImpl;
import org.labkey.experiment.api.ExpMaterialImpl;
import org.labkey.experiment.api.ExpRunImpl;
import org.labkey.experiment.api.ExperimentRun;
import org.labkey.experiment.api.ExperimentServiceImpl;
import org.labkey.experiment.api.Material;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;

Expand All @@ -64,36 +56,6 @@ public static ExpLineageServiceImpl get()
return (ExpLineageServiceImpl) ExpLineageService.get();
}

private static void forEachBatchExpData(Collection<Integer> rowIds, int batchSize, final Selector.ForEachBatchBlock<ExpDataImpl> block)
{
if (rowIds.isEmpty())
return;

SimpleFilter filter = new SimpleFilter(FieldKey.fromParts(ExpDataTable.Column.RowId.name()), rowIds, IN);
new TableSelector(ExperimentServiceImpl.get().getTinfoData(), filter, null)
.forEachBatch(Data.class, batchSize, (batch) -> block.exec(ExpDataImpl.fromDatas(batch)));
}

private static void forEachBatchExpMaterial(Collection<Integer> rowIds, int batchSize, final Selector.ForEachBatchBlock<ExpMaterialImpl> block)
{
if (rowIds.isEmpty())
return;

SimpleFilter filter = new SimpleFilter(FieldKey.fromParts(ExpMaterialTable.Column.RowId.name()), rowIds, IN);
new TableSelector(ExperimentServiceImpl.get().getTinfoMaterial(), filter, null)
.forEachBatch(Material.class, batchSize, (batch) -> block.exec(ExpMaterialImpl.fromMaterials(batch)));
}

private static void forEachBatchExpRun(Collection<Integer> rowIds, int batchSize, final Selector.ForEachBatchBlock<ExpRunImpl> block)
{
if (rowIds.isEmpty())
return;

SimpleFilter filter = new SimpleFilter().addInClause(FieldKey.fromParts(ExpRunTable.Column.RowId.name()), rowIds);
new TableSelector(ExperimentServiceImpl.get().getTinfoExperimentRun(), filter, null)
.forEachBatch(ExperimentRun.class, batchSize, (batch) -> block.exec(ExpRunImpl.fromRuns(batch)));
}

public record LineageResult(
Set<Identifiable> seeds,
Set<ExpLineage.Edge> edges,
Expand Down Expand Up @@ -288,7 +250,7 @@ private record StreamContext(
Map<GUID, Boolean> hasPermission
)
{
boolean hasPermission(Container container, User user)
boolean hasPermission(Container container)
{
return this.hasPermission.computeIfAbsent(container.getEntityId(), (id) -> container.hasPermission(user, ReadPermission.class));
}
Expand All @@ -302,8 +264,6 @@ ExpLineage.Edges popEdges(String lsid)
}
}

private static final int BATCH_SIZE = 5_000;

public void streamLineage(Container container, User user, HttpServletResponse response, Set<Identifiable> seeds, ExpLineageOptions options) throws IOException
{
var lineage = getLineageResult(container, user, seeds, options);
Expand All @@ -330,48 +290,49 @@ private static void writeNodes(LineageResult lineage, StreamContext context) thr

if (!context.nodesAndEdges.isEmpty())
{
if (!lineage.dataIds().isEmpty())
forEachBatchExpData(lineage.dataIds(), BATCH_SIZE, writeBatch(context));
if (!lineage.materialIds().isEmpty())
forEachBatchExpMaterial(lineage.materialIds(), BATCH_SIZE, writeBatch(context));
if (!lineage.runIds().isEmpty())
forEachBatchExpRun(lineage.runIds(), BATCH_SIZE, writeBatch(context));
if (!lineage.objectLsids().isEmpty())
if (!lineage.dataIds.isEmpty())
writeNodes(lineage.dataIds, ExperimentServiceImpl.get().getTinfoData(), Data.class, context);
if (!lineage.materialIds.isEmpty())
writeNodes(lineage.materialIds, ExperimentServiceImpl.get().getTinfoMaterial(), Material.class, context);
if (!lineage.runIds.isEmpty())
writeNodes(lineage.runIds, ExperimentServiceImpl.get().getTinfoExperimentRun(), ExperimentRun.class, context);
if (!lineage.objectLsids.isEmpty())
{
var lsidManager = LsidManager.get();
writeBatchList(lineage.objectLsids().stream().map(lsidManager::getObject).toList(), context);
lineage.objectLsids.stream().map(lsidManager::getObject).filter(Objects::nonNull).forEach((node) -> writeNode(node, context));
}
}

writeBatchList(lineage.seeds().stream().toList(), context);
for (var seed : lineage.seeds())
writeNode(seed, context);

// Write out the edges for any node that was not previously written
// (e.g. not resolved, user does not have permissions to read, etc.).
for (var entry : context.nodesAndEdges.entrySet())
context.writer.writeProperty(entry.getKey(), nodeToJson(null, entry.getValue(), context));

context.writer.endObject();
}

private static Selector.ForEachBatchBlock writeBatch(StreamContext context)
private static <T extends Identifiable> void writeNodes(Set<Integer> rowIds, TableInfo table, Class<T> clazz, StreamContext context)
{
return (batch) -> {
try
{
writeBatchList(batch, context);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
};
new TableSelector(table, new SimpleFilter(FieldKey.fromParts("RowId"), rowIds, IN), null).forEach(clazz, (node) -> writeNode(node.getExpObject(), context));
}

private static void writeBatchList(List<? extends Identifiable> batch, StreamContext context) throws IOException
private static void writeNode(Identifiable node, StreamContext context)
{
for (var node : batch)
if (node == null)
return;

try
{
if (context.hasPermission(node.getContainer(), context.user))
if (context.hasPermission(node.getContainer()))
context.writer.writeProperty(node.getLSID(), nodeToJson(node, context.popEdges(node.getLSID()), context));
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}

private static JSONObject nodeToJson(@Nullable Identifiable node, ExpLineage.Edges edges, StreamContext context)
Expand Down

0 comments on commit 37d564d

Please sign in to comment.