Skip to content

Commit

Permalink
[gRNAet7B] Added trigger show procedure (#3335)
Browse files Browse the repository at this point in the history
* Fix flaky trigger dbms availability tests
* added apoc.trigger.show proc
* retry to fix flaky test by separating setLastUpdate()
* removed updated output - some code refactoring
* split pr - changes review
* removed inner transaction
* updated documentation
* reset wrong generated docs
* Added admin in trigger.show
* small adoc changes
  • Loading branch information
vga91 authored Dec 12, 2022
1 parent 4e8ac25 commit f13b6c4
Show file tree
Hide file tree
Showing 19 changed files with 195 additions and 43 deletions.
2 changes: 1 addition & 1 deletion core/src/main/java/apoc/trigger/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public TriggerInfo toTriggerInfo(Map.Entry<String, Object> e) {

@Admin
@Procedure(mode = Mode.READ)
@Description("list all installed triggers")
@Description("CALL apoc.trigger.list() | list all currently working triggers for all databases for the session database")
public Stream<TriggerInfo> list() {
return triggerHandler.list().entrySet().stream()
.map( (e) -> new TriggerInfo(e.getKey(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,19 @@
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static apoc.ApocConfig.APOC_TRIGGER_ENABLED;
import static apoc.ApocConfig.apocConfig;

public class TriggerHandlerWrite {
public class TriggerHandlerNewProcedures {
public static final String NOT_ENABLED_ERROR = "Triggers have not been enabled." +
" Set 'apoc.trigger.enabled=true' in your apoc.conf file located in the $NEO4J_HOME/conf/ directory.";

public static Map<String, Object> toTriggerInfo(Node node) {
private static Map<String, Object> toTriggerInfo(Node node) {
return node.getAllProperties()
.entrySet().stream()
.filter(e -> !List.of(SystemPropertyKeys.name.name(), SystemPropertyKeys.database.name()).contains(e.getKey()))
.filter(e -> !SystemPropertyKeys.database.name().equals(e.getKey()))
.collect(HashMap::new, // workaround for https://bugs.openjdk.java.net/browse/JDK-8148463
(mapAccumulator, e) -> {
Object value = List.of(SystemPropertyKeys.selector.name(), SystemPropertyKeys.params.name()).contains(e.getKey())
Expand Down Expand Up @@ -53,7 +54,7 @@ public static Map<String, Object> install(String databaseName, String triggerNam
Pair.of(SystemPropertyKeys.name.name(), triggerName));

// we'll return previous trigger info
previous.putAll(TriggerHandlerWrite.toTriggerInfo(node));
previous.putAll(toTriggerInfo(node));

node.setProperty(SystemPropertyKeys.statement.name(), statement);
node.setProperty(SystemPropertyKeys.selector.name(), Util.toJson(selector));
Expand All @@ -73,7 +74,7 @@ public static Map<String, Object> drop(String databaseName, String triggerName)
withSystemDb(tx -> {
getTriggerNodes(databaseName, tx, triggerName)
.forEachRemaining(node -> {
previous.putAll(TriggerHandlerWrite.toTriggerInfo(node));
previous.putAll(toTriggerInfo(node));
node.delete();
});

Expand All @@ -93,7 +94,7 @@ public static Map<String, Object> updatePaused(String databaseName, String name,
node.setProperty( SystemPropertyKeys.paused.name(), paused );

// we'll return previous trigger info
result.putAll(TriggerHandlerWrite.toTriggerInfo(node));
result.putAll(toTriggerInfo(node));
});

setLastUpdate(databaseName, tx);
Expand All @@ -112,7 +113,7 @@ public static Map<String, Object> dropAll(String databaseName) {
String triggerName = (String) node.getProperty(SystemPropertyKeys.name.name());

// we'll return previous trigger info
previous.put(triggerName, TriggerHandlerWrite.toTriggerInfo(node));
previous.put(triggerName, toTriggerInfo(node));
node.delete();
});
setLastUpdate(databaseName, tx);
Expand All @@ -122,6 +123,13 @@ public static Map<String, Object> dropAll(String databaseName) {
return previous;
}

public static List<Map<String, Object>> getTriggerNodesList(String databaseName, Transaction tx) {
return getTriggerNodes(databaseName, tx)
.stream()
.map(TriggerHandlerNewProcedures::toTriggerInfo)
.collect(Collectors.toList());
}

public static ResourceIterator<Node> getTriggerNodes(String databaseName, Transaction tx) {
return getTriggerNodes(databaseName, tx, null);
}
Expand Down
66 changes: 41 additions & 25 deletions core/src/main/java/apoc/trigger/TriggerNewProcedures.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package apoc.trigger;

import apoc.ApocConfig;
import apoc.SystemPropertyKeys;
import apoc.util.Util;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.api.procedure.SystemProcedure;
import org.neo4j.logging.Log;
import org.neo4j.procedure.Admin;
Expand All @@ -22,7 +24,7 @@
public class TriggerNewProcedures {
// public for testing purpose
public static final String TRIGGER_NOT_ROUTED_ERROR = "The procedure should be routed and executed against a LEADER system database";

public static class TriggerInfo {
public String name;
public String query;
Expand All @@ -40,21 +42,28 @@ public TriggerInfo(String name, String query, Map<String, Object> selector, bool

public TriggerInfo( String name, String query, Map<String,Object> selector, Map<String,Object> params, boolean installed, boolean paused )
{
this.name = name;
this.query = query;
this.selector = selector;
this(name, query, selector, installed, paused);
this.params = params;
this.installed = installed;
this.paused = paused;
}

public static TriggerInfo from(Map<String, Object> mapInfo, boolean installed) {
return new TriggerInfo((String) mapInfo.get(SystemPropertyKeys.name.name()),
(String) mapInfo.get(SystemPropertyKeys.statement.name()),
(Map<String, Object>) mapInfo.get(SystemPropertyKeys.selector.name()),
(Map<String, Object>) mapInfo.get(SystemPropertyKeys.params.name()),
installed,
(boolean) mapInfo.getOrDefault(SystemPropertyKeys.paused.name(), true));
}
}

@Context public GraphDatabaseService db;

@Context public Log log;

@Context public Transaction tx;

private void checkInSystemLeader() {
TriggerHandlerWrite.checkEnabled();
TriggerHandlerNewProcedures.checkEnabled();
// routing check
if (!db.databaseName().equals(SYSTEM_DATABASE_NAME) || !Util.isWriteableInstance(db, SYSTEM_DATABASE_NAME)) {
throw new RuntimeException(TRIGGER_NOT_ROUTED_ERROR);
Expand All @@ -66,7 +75,7 @@ public TriggerInfo toTriggerInfo(Map.Entry<String, Object> e) {
if (e.getValue() instanceof Map) {
try {
Map<String, Object> value = (Map<String, Object>) e.getValue();
return new TriggerInfo(name, (String) value.get("statement"), (Map<String, Object>) value.get("selector"), (Map<String, Object>) value.get("params"), false, false);
return TriggerInfo.from(value, false);
} catch(Exception ex) {
return new TriggerInfo(name, ex.getMessage(), null, false, false);
}
Expand All @@ -86,10 +95,10 @@ public Stream<TriggerInfo> install(@Name("databaseName") String databaseName, @N
Util.validateQuery(ApocConfig.apocConfig().getDatabase(databaseName), statement);

Map<String,Object> params = (Map)config.getOrDefault("params", Collections.emptyMap());
Map<String, Object> removed = TriggerHandlerWrite.install(databaseName, name, statement, selector, params);
if (!removed.isEmpty()) {
Map<String, Object> removed = TriggerHandlerNewProcedures.install(databaseName, name, statement, selector, params);
if (removed.containsKey(SystemPropertyKeys.statement.name())) {
return Stream.of(
new TriggerInfo( name, (String)removed.get( "statement"), (Map<String, Object>) removed.get( "selector"), (Map<String, Object>) removed.get( "params"), false, false),
TriggerInfo.from(removed, false),
new TriggerInfo( name, statement, selector, params, true, false));
}
return Stream.of(new TriggerInfo( name, statement, selector, params, true, false));
Expand All @@ -103,11 +112,11 @@ public Stream<TriggerInfo> install(@Name("databaseName") String databaseName, @N
@Description("CALL apoc.trigger.drop(databaseName, name) | eventually removes an existing trigger, returns the trigger's information")
public Stream<TriggerInfo> drop(@Name("databaseName") String databaseName, @Name("name")String name) {
checkInSystemLeader();
Map<String, Object> removed = TriggerHandlerWrite.drop(databaseName, name);
if (removed.isEmpty()) {
Map<String, Object> removed = TriggerHandlerNewProcedures.drop(databaseName, name);
if (!removed.containsKey(SystemPropertyKeys.statement.name())) {
return Stream.of(new TriggerInfo(name, null, null, false, false));
}
return Stream.of(new TriggerInfo(name,(String)removed.get("statement"), (Map<String, Object>) removed.get("selector"), (Map<String, Object>) removed.get("params"),false, false));
return Stream.of(TriggerInfo.from(removed, false));
}


Expand All @@ -118,7 +127,7 @@ public Stream<TriggerInfo> drop(@Name("databaseName") String databaseName, @Name
@Description("CALL apoc.trigger.dropAll(databaseName) | eventually removes all previously added trigger, returns triggers' information")
public Stream<TriggerInfo> dropAll(@Name("databaseName") String databaseName) {
checkInSystemLeader();
Map<String, Object> removed = TriggerHandlerWrite.dropAll(databaseName);
Map<String, Object> removed = TriggerHandlerNewProcedures.dropAll(databaseName);
return removed.entrySet().stream().map(this::toTriggerInfo);
}

Expand All @@ -129,12 +138,9 @@ public Stream<TriggerInfo> dropAll(@Name("databaseName") String databaseName) {
@Description("CALL apoc.trigger.stop(databaseName, name) | eventually pauses the trigger")
public Stream<TriggerInfo> stop(@Name("databaseName") String databaseName, @Name("name")String name) {
checkInSystemLeader();
Map<String, Object> paused = TriggerHandlerWrite.updatePaused(databaseName, name, true);
Map<String, Object> paused = TriggerHandlerNewProcedures.updatePaused(databaseName, name, true);

return Stream.of(new TriggerInfo(name,
(String)paused.get("statement"),
(Map<String,Object>) paused.get("selector"),
(Map<String,Object>) paused.get("params"),true, true));
return Stream.of(TriggerInfo.from(paused,true));
}

// TODO - change with @SystemOnlyProcedure
Expand All @@ -144,12 +150,22 @@ public Stream<TriggerInfo> stop(@Name("databaseName") String databaseName, @Name
@Description("CALL apoc.trigger.start(databaseName, name) | eventually unpauses the paused trigger")
public Stream<TriggerInfo> start(@Name("databaseName") String databaseName, @Name("name")String name) {
checkInSystemLeader();
Map<String, Object> resume = TriggerHandlerWrite.updatePaused(databaseName, name, false);
Map<String, Object> resume = TriggerHandlerNewProcedures.updatePaused(databaseName, name, false);

return Stream.of(new TriggerInfo(name,
(String)resume.get("statement"),
(Map<String,Object>) resume.get("selector"),
(Map<String,Object>) resume.get("params"),true, false));
return Stream.of(TriggerInfo.from(resume, true));
}

// TODO - change with @SystemOnlyProcedure
@SystemProcedure
@Admin
@Procedure(mode = Mode.READ)
@Description("CALL apoc.trigger.show(databaseName) | it lists all eventually installed triggers for a database")
public Stream<TriggerInfo> show(@Name("databaseName") String databaseName) {
checkInSystemLeader();

return TriggerHandlerNewProcedures.getTriggerNodesList(databaseName, tx)
.stream()
.map(trigger -> TriggerInfo.from(trigger, true)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public void testTriggersAllowedOnlyWithAdmin() {
failsWithNonAdminUser(sysUserSession, "apoc.trigger.dropAll", "call apoc.trigger.dropAll('neo4j')");
failsWithNonAdminUser(sysUserSession, "apoc.trigger.stop", "call apoc.trigger.stop('neo4j', 'qwe')");
failsWithNonAdminUser(sysUserSession, "apoc.trigger.start", "call apoc.trigger.start('neo4j', 'qwe')");
failsWithNonAdminUser(sysUserSession, "apoc.trigger.show", "call apoc.trigger.show('neo4j')");
}

try (Session neo4jUserSession = userDriver.session(SessionConfig.forDatabase(DEFAULT_DATABASE_NAME))) {
Expand Down
30 changes: 30 additions & 0 deletions core/src/test/java/apoc/trigger/TriggerNewProceduresTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static apoc.util.TestUtil.testCallCount;
import static apoc.util.TestUtil.testCallCountEventually;
import static apoc.util.TestUtil.testCallEventually;
import static apoc.util.TestUtil.testResult;
import static apoc.util.TestUtil.waitDbsAvailable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -503,6 +504,35 @@ public void testDeleteRelationships() {
//
// new test cases
//

@Test
public void testTriggerShow() {
String name = "test-show1";
String name2 = "test-show2";
String query = "MATCH (c:TestShow) SET c.count = 1";

testCall(sysDb, "CALL apoc.trigger.install('neo4j', $name, $query,{}) YIELD name",
map("query", query, "name", name),
r -> assertEquals(name, r.get("name")));

testCall(sysDb, "CALL apoc.trigger.install('neo4j', $name, $query,{}) YIELD name",
map("query", query, "name", name2),
r -> assertEquals(name2, r.get("name")));

// not updated
testResult(sysDb, "CALL apoc.trigger.show('neo4j') " +
"YIELD name, query RETURN * ORDER BY name",
map("query", query, "name", name),
res -> {
Map<String, Object> row = res.next();
assertEquals(name, row.get("name"));
assertEquals(query, row.get("query"));
row = res.next();
assertEquals(name2, row.get("name"));
assertEquals(query, row.get("query"));
assertFalse(res.hasNext());
});
}

@Test
public void testInstallTriggerInUserDb() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
¦apoc.trigger.remove(name :: STRING?) :: (name :: STRING?, query :: STRING?, selector :: MAP?, params :: MAP?, installed :: BOOLEAN?, paused :: BOOLEAN?)
¦apoc.trigger.removeAll() :: (name :: STRING?, query :: STRING?, selector :: MAP?, params :: MAP?, installed :: BOOLEAN?, paused :: BOOLEAN?)
¦apoc.trigger.resume(name :: STRING?) :: (name :: STRING?, query :: STRING?, selector :: MAP?, params :: MAP?, installed :: BOOLEAN?, paused :: BOOLEAN?)
¦apoc.trigger.show(databaseName :: STRING?) :: (name :: STRING?, query :: STRING?, selector :: MAP?, params :: MAP?, installed :: BOOLEAN?, paused :: BOOLEAN?)
¦apoc.trigger.start(databaseName :: STRING?, name :: STRING?) :: (name :: STRING?, query :: STRING?, selector :: MAP?, params :: MAP?, installed :: BOOLEAN?, paused :: BOOLEAN?)
¦apoc.trigger.stop(databaseName :: STRING?, name :: STRING?) :: (name :: STRING?, query :: STRING?, selector :: MAP?, params :: MAP?, installed :: BOOLEAN?, paused :: BOOLEAN?)
¦apoc.trigger.nodesByLabel(labelEntries :: ANY?, label :: STRING?) :: (LIST? OF ANY?)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ add a trigger kernelTransaction under a name, in the kernelTransaction you can u
|label:apoc-core[]
|xref::overview/apoc.trigger/apoc.trigger.adoc[apoc.trigger.list icon:book[]]

list all installed triggers
CALL apoc.trigger.list() | list all currently working triggers for all databases for the session database
|label:procedure[]
|label:apoc-core[]
|xref::overview/apoc.trigger/apoc.trigger.adoc[apoc.trigger.pause icon:book[]]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
¦xref::overview/apoc.trigger/apoc.trigger.list.adoc[apoc.trigger.list icon:book[]] +

`list all installed triggers`
`CALL apoc.trigger.list() | list all currently working triggers for all databases for the session database`
¦label:procedure[]
¦label:apoc-core[]
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
¦type¦qualified name¦signature¦description
¦procedure¦apoc.trigger.list¦apoc.trigger.list() :: (name :: STRING?, query :: STRING?, selector :: MAP?, params :: MAP?, installed :: BOOLEAN?, paused :: BOOLEAN?)¦list all installed triggers
¦procedure¦apoc.trigger.list¦apoc.trigger.list() :: (name :: STRING?, query :: STRING?, selector :: MAP?, params :: MAP?, installed :: BOOLEAN?, paused :: BOOLEAN?)¦CALL apoc.trigger.list() | list all currently working triggers for all databases for the session database
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
¦xref::overview/apoc.trigger/apoc.trigger.show.adoc[apoc.trigger.show icon:book[]] +

`CALL apoc.trigger.show(databaseName) | it lists all eventually installed triggers for a database`
¦label:procedure[]
¦label:apoc-core[]
Loading

0 comments on commit f13b6c4

Please sign in to comment.