[Bc2lkk3N] Fixed testImportCsvTerminate and added TerminationGuard to apoc.import.csv (neo4j-contrib#343)

* [Bc2lkk3N] Fix testImportCsvTerminate and add TerminationGuard to apoc.import.csv

* [Bc2lkk3N] Fix failing tests

* [Bc2lkk3N] Fix typo
vga91 authored Apr 5, 2023
1 parent bf9988f commit e8a4a7c
Showing 12 changed files with 110 additions and 65,311 deletions.
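
For context, the core of the change is the standard Neo4j procedure pattern used throughout APOC: a TerminationGuard is injected with @Context and its check() method is called inside long-running loops, so a terminated or timed-out transaction aborts promptly instead of processing the whole input. A minimal sketch of the pattern, using only the org.neo4j.procedure API (the procedure name and output class below are illustrative, not part of this commit):

import java.util.stream.LongStream;
import java.util.stream.Stream;

import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.TerminationGuard;

public class TerminationGuardExample {

    @Context
    public TerminationGuard terminationGuard;

    @Procedure(name = "example.longRunning")
    @Description("Illustrative long-running procedure that honours transaction termination.")
    public Stream<Output> longRunning(@Name("count") long count) {
        return LongStream.range(0, count).boxed().map(i -> {
            // Throws TransactionTerminatedException once the surrounding transaction
            // has been terminated (killed or timed out), aborting the stream early.
            terminationGuard.check();
            return new Output(i);
        });
    }

    public static class Output {
        public long value;

        public Output(long value) {
            this.value = value;
        }
    }
}

That check() call is what the termination tests below exercise: without it, a procedure keeps running to completion even after its transaction has been marked as terminated.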
8 changes: 7 additions & 1 deletion core/src/main/java/apoc/export/csv/CsvEntityLoader.java
@@ -12,6 +12,7 @@
import com.opencsv.CSVReaderBuilder;
import org.neo4j.graphdb.*;
import org.neo4j.logging.Log;
import org.neo4j.procedure.TerminationGuard;

import java.io.IOException;
import java.util.*;
@@ -25,14 +26,17 @@ public class CsvEntityLoader {
private final ProgressReporter reporter;
private final Log log;

private final TerminationGuard terminationGuard;

/**
* @param clc configuration object
* @param reporter
*/
public CsvEntityLoader(CsvLoaderConfig clc, ProgressReporter reporter, Log log) {
public CsvEntityLoader(CsvLoaderConfig clc, ProgressReporter reporter, Log log, TerminationGuard terminationGuard) {
this.clc = clc;
this.reporter = reporter;
this.log = log;
this.terminationGuard = terminationGuard;
}

/**
@@ -82,6 +86,7 @@ public void loadNodes(final Object fileName, final List<String> labels, final Gr
BatchTransaction btx = new BatchTransaction(db, clc.getBatchSize(), reporter);
try {
csv.forEach(line -> {
terminationGuard.check();
lineNo.getAndIncrement();

final EnumSet<Results> results = EnumSet.of(Results.map);
@@ -196,6 +201,7 @@ public void loadRelationships(
BatchTransaction btx = new BatchTransaction(db, clc.getBatchSize(), reporter);
try {
csv.forEach(line -> {
terminationGuard.check();
lineNo.getAndIncrement();

final EnumSet<Results> results = EnumSet.of(Results.map);
5 changes: 4 additions & 1 deletion core/src/main/java/apoc/export/csv/ImportCsv.java
@@ -23,6 +23,9 @@ public class ImportCsv {
@Context
public Log log;

@Context
public TerminationGuard terminationGuard;

@Procedure(name = "apoc.import.csv", mode = Mode.SCHEMA)
@Description("Imports nodes and relationships with the given labels and types from the provided CSV file.")
public Stream<ProgressInfo> importCsv(
@@ -41,7 +44,7 @@ public Stream<ProgressInfo> importCsv(
}
final CsvLoaderConfig clc = CsvLoaderConfig.from(config);
final ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(file, source, "csv"));
final CsvEntityLoader loader = new CsvEntityLoader(clc, reporter, log);
final CsvEntityLoader loader = new CsvEntityLoader(clc, reporter, log, terminationGuard);

final Map<String, Map<String, String>> idMapping = new HashMap<>();
for (Map<String, Object> node : nodes) {
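
With the guard injected in the procedure class and checked once per CSV row in CsvEntityLoader, a long apoc.import.csv call now honours transaction termination. A hedged sketch of what that buys from an embedded caller's point of view; the file name and the one-second timeout are illustrative, and apoc.import.file.enabled=true plus a suitably large CSV in the import directory are assumed:

import java.util.concurrent.TimeUnit;

import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Transaction;

public class ImportCsvTimeoutExample {

    public static void importWithTimeout(GraphDatabaseService db) {
        // Open a transaction with a short timeout; once it expires, the per-row
        // TerminationGuard check inside the CSV loop should surface it.
        try (Transaction tx = db.beginTx(1, TimeUnit.SECONDS)) {
            tx.execute("CALL apoc.import.csv([{fileName: 'file:/largeFile.csv', labels: ['Person']}], [], {})")
              .resultAsString();
            tx.commit();
        } catch (Exception e) {
            // Expected here: a TransactionTerminatedException (possibly wrapped in a
            // QueryExecutionException); the import stops mid-file instead of running
            // the whole CSV to completion.
        }
    }
}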
6 changes: 6 additions & 0 deletions core/src/main/java/apoc/refactor/GraphRefactoring.java
@@ -41,6 +41,9 @@ public class GraphRefactoring {
@Context
public Pools pools;

@Context
public TerminationGuard terminationGuard;

private Stream<NodeRefactorResult> doCloneNodes(@Name("nodes") List<Node> nodes, @Name("withRelationships") boolean withRelationships, List<String> skipProperties) {
if (nodes == null) return Stream.empty();
return nodes.stream().map(node -> Util.rebind(tx, node)).map(node -> {
@@ -199,6 +202,7 @@ public Stream<NodeRefactorResult> cloneSubgraph(@Name("nodes") List<Node> nodes,

// clone nodes and populate copy map
for (Node node : nodes) {
terminationGuard.check();
if (node == null || standinMap.containsKey(node)) continue;
// standinNodes will NOT be cloned

@@ -223,6 +227,8 @@ public Stream<NodeRefactorResult> cloneSubgraph(@Name("nodes") List<Node> nodes,

// clone relationships, will be between cloned nodes and/or standins
for (Relationship rel : rels) {
terminationGuard.check();

if (rel == null) continue;

Node oldStart = rel.getStartNode();
2 changes: 1 addition & 1 deletion core/src/test/java/apoc/cypher/CypherTest.java
@@ -185,7 +185,7 @@ public void testRunTimeboxedWithTerminationInnerTransaction() {
TestUtil.testCall(db, query,
Map.of("innerQuery", innerLongQuery),
row -> assertEquals(Map.of("0", 0L), row.get("value")));

lastTransactionChecks(db, query, timeBefore);
}

22 changes: 17 additions & 5 deletions core/src/test/java/apoc/export/BigGraphTest.java
@@ -8,6 +8,7 @@
import apoc.export.json.ImportJson;
import apoc.graph.Graphs;
import apoc.meta.Meta;
import apoc.periodic.PeriodicTestUtils;
import apoc.refactor.GraphRefactoring;
import apoc.refactor.rename.Rename;
import apoc.util.TestUtil;
@@ -29,7 +30,8 @@
import static apoc.ApocConfig.APOC_EXPORT_FILE_ENABLED;
import static apoc.ApocConfig.APOC_IMPORT_FILE_ENABLED;
import static apoc.ApocConfig.apocConfig;
import static apoc.util.TransactionTestUtil.checkTerminationGuard;
import static apoc.util.TransactionTestUtil.*;
import static java.util.Collections.emptyMap;
import static org.neo4j.configuration.GraphDatabaseSettings.TransactionStateMemoryAllocation.OFF_HEAP;
import static org.neo4j.configuration.SettingValueParsers.BYTES;

@@ -79,7 +81,11 @@ public void testTerminateExportJson() {

@Test
public void testTerminateRenameNodeProp() {
checkTerminationGuard(db, "CALL apoc.refactor.rename.nodeProperty('name', 'nameTwo')");
// this procedure leverages apoc.periodic.iterate, so we check termination the same way
String innerLongQuery = "match (n) where n.`name` IS NOT NULL return n";
String query = "CALL apoc.refactor.rename.nodeProperty('name', 'nameTwo')";

PeriodicTestUtils.testTerminateWithCommand(db, query, innerLongQuery);
}

@Test
@@ -88,14 +94,20 @@ public void testTerminateRenameType() {
}

@Test
public void testTerminateRefactorProcs() {
public void testTerminateRefactorCloneNodes() {
List<Node> nodes = db.executeTransactionally("MATCH (n:Person) RETURN collect(n) as nodes", Collections.emptyMap(),
r -> r.<List<Node>>columnAs("nodes").next());

checkTerminationGuard(db, "CALL apoc.refactor.cloneNodes($nodes)",
checkTerminationGuard(db, "CALL apoc.refactor.cloneNodes($nodes)",
Map.of("nodes", nodes));
}

@Test
public void testTerminateRefactorCloneSubgraph() {
List<Node> nodes = db.executeTransactionally("MATCH (n:Person) RETURN collect(n) as nodes", Collections.emptyMap(),
r -> r.<List<Node>>columnAs("nodes").next());

checkTerminationGuard(db, "CALL apoc.refactor.cloneSubgraph($nodes)",
Map.of("nodes", nodes));
Map.of("nodes", Util.rebind(nodes, db.beginTx())));
}
}
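
The tests above delegate the kill-and-assert mechanics to the TransactionTestUtil and PeriodicTestUtils helpers. Outside the test suite, the same behaviour can be triggered by hand by terminating the transaction from another session. A sketch of that, assuming the dbms.listTransactions() and dbms.killTransaction() procedures available in Neo4j 4.x; the query-fragment matching is an illustrative convenience, not something these tests do:

import java.util.Map;

import org.neo4j.graphdb.GraphDatabaseService;

public class ManualTerminationExample {

    // Terminates every transaction whose currently running query contains the
    // given fragment, e.g. "apoc.import.csv" or "apoc.refactor.cloneSubgraph".
    public static String killByQueryFragment(GraphDatabaseService db, String fragment) {
        return db.executeTransactionally(
                "CALL dbms.listTransactions() YIELD transactionId, currentQuery " +
                "WITH transactionId, currentQuery WHERE currentQuery CONTAINS $fragment " +
                "CALL dbms.killTransaction(transactionId) YIELD message " +
                "RETURN message",
                Map.of("fragment", fragment),
                result -> result.resultAsString());
    }
}

Once the transaction is killed this way, the TerminationGuard checks added in this commit make the running procedure stop within roughly one loop iteration rather than running to completion.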
10 changes: 7 additions & 3 deletions core/src/test/java/apoc/export/csv/ImportCsvTest.java
@@ -191,9 +191,13 @@ public void testImportCsvLargeFile() {

@Test
public void testImportCsvTerminate() {
checkTerminationGuard(db, "CALL apoc.import.csv([{fileName: $nodeFile, labels: ['Person']}], [], $config)",
map("nodeFile", "file:/largeFile.csv",
"config", map("batchSize", 100L)));
// multiple files
String files = Stream.of( "Person", "Movie", "Foo", "Bar", "Baz", "Aaa", "Bbb", "Ccc")
.map(i -> "{fileName: $nodeFile, labels: ['" + i + "']}")
.collect(Collectors.joining(","));
checkTerminationGuard(db, "CALL apoc.import.csv([" + files +"], [], {})",
map("nodeFile", "file:/largeFile.csv",
"config", map("batchSize", 100L)));
}

@Test
11 changes: 2 additions & 9 deletions core/src/test/java/apoc/export/json/ImportJsonTest.java
@@ -52,6 +52,7 @@


public class ImportJsonTest {
public static String LARGE_REMOTE_FILE = "https://devrel-data-science.s3.us-east-2.amazonaws.com/twitch_all.json";

private static final long NODES_BIG_JSON = 16L;
private static final long RELS_BIG_JSON = 4L;
@@ -192,11 +193,9 @@ public void shouldTerminateImportWhenTransactionIsTimedOut() {

createConstraints(List.of("Stream", "User", "Game", "Team", "Language"));

String filename = "https://devrel-data-science.s3.us-east-2.amazonaws.com/twitch_all.json";

final String query = "CALL apoc.import.json($file)";

TransactionTestUtil.checkTerminationGuard(db, query, map("file", filename));
TransactionTestUtil.checkTerminationGuard(db, query, map("file", LARGE_REMOTE_FILE));
}

@Test
@@ -307,12 +306,6 @@ public void shouldImportAllJsonFromBinary() {
assertionsAllJsonDbResult();
}

@Test
public void shouldTerminateImportJson() {
createConstraints(List.of("Movie", "Other", "Person"));
checkTerminationGuard(db, "CALL apoc.import.json('testTerminate.json',{})");
}

private void assertionsAllJsonProgressInfo(Map<String, Object> r, boolean isBinary) {
// then
Assert.assertEquals(isBinary ? null : "all.json", r.get("file"));
5 changes: 3 additions & 2 deletions core/src/test/java/apoc/load/LoadJsonTest.java
@@ -28,6 +28,7 @@
import java.util.concurrent.TimeUnit;

import static apoc.ApocConfig.*;
import static apoc.export.json.ImportJsonTest.LARGE_REMOTE_FILE;
import static apoc.util.BinaryTestUtil.fileToBinary;
import static apoc.util.CompressionConfig.COMPRESSION;
import static apoc.convert.ConvertJsonTest.EXPECTED_AS_PATH_LIST;
@@ -422,8 +423,8 @@ public void testLoadJsonParams() {

@Test
public void shouldTerminateLoadJson() {
URL url = ClassLoader.getSystemResource("exportJSON/testTerminate.json");
checkTerminationGuard(db, "CALL apoc.load.json($file)",
Map.of("file", url.toString()));
Map.of("file", LARGE_REMOTE_FILE)
);
}
}
8 changes: 7 additions & 1 deletion core/src/test/java/apoc/load/XmlTest.java
@@ -14,6 +14,7 @@
import org.junit.Rule;
import org.junit.Test;
import org.neo4j.configuration.GraphDatabaseInternalSettings;
import org.neo4j.configuration.GraphDatabaseSettings;
import org.neo4j.graphdb.QueryExecutionException;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.test.rule.DbmsRule;
@@ -40,12 +41,17 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.neo4j.configuration.GraphDatabaseSettings.TransactionStateMemoryAllocation.OFF_HEAP;
import static org.neo4j.configuration.SettingValueParsers.BYTES;

public class XmlTest {
public static final String FILE_SHORTENED = "src/test/resources/xml/humboldt_soemmering01_1791.TEI-P5-shortened.xml";

@Rule
public DbmsRule db = new ImpermanentDbmsRule()
.withSetting(GraphDatabaseSettings.memory_tracking, true)
.withSetting(GraphDatabaseSettings.tx_state_memory_allocation, OFF_HEAP)
.withSetting(GraphDatabaseSettings.tx_state_max_off_heap_memory, BYTES.parse("1G"))
.withSetting(GraphDatabaseInternalSettings.cypher_ip_blocklist, List.of(new IPAddressString("127.0.0.0/8")));

@Before
@@ -560,6 +566,6 @@ public void testImportXmlPreventBillionLaughsVulnerabilityThrowsQueryExecutionEx
@Test
public void testTerminateImportXml() {
final String file = ClassLoader.getSystemResource("largeFile.graphml").toString();
TransactionTestUtil.checkTerminationGuard(db, "call apoc.import.xml($file)", Map.of("file", file));
TransactionTestUtil.checkTerminationGuard(db, 3L, "call apoc.import.xml($file)", Map.of("file", file));
}
}