Skip to content

Commit

Permalink
[gRNAet7B] Fixes trigger procedures in clusters for neo4j 5.x (#250)
Browse files Browse the repository at this point in the history
* [gRNAet7B] Fixes trigger procedures in clusters (#3183)

* [gRNAet7B] Leaves it working for add trigger

* [gRNAet7B] Fixes trigger procedures with split LifeCycle and System utils

* Deprecated old triggers and included warning - Added check to fail if they are executed against a system = FOLLOWER

* Added cluster tests

* removed unused import

* changed msg wording

* split deprecated and new triggers

* Fixed TriggerExtendedTest

* update apoc.systemdb.export.metadata

* remove lastUpdate, rename classes

* added eventual consistency

* commonize trigger new and deprec. cache

* ignored tests

* added eventually separated tests - restored util.validateQuery - small changes

* renamed Trigger classes

* var names - test changes and additions

* nitpick and lightened test

* test flaky cluster

Co-authored-by: Giuseppe Villani <[email protected]>

* [gRNAet7B] Trigger procedures neo4j 5.x changes

* [gRNAet7B] code clean - removed unused import

* [gRNAet7B] Try fix flaky TriggerClusterRoutingTest (neo4j-contrib/neo4j-apoc-procedures#3323)

* [gRNAet7B] fix some tests

* [gRNAet7B] fix TriggerClusterRoutingTest

* [gRNAet7B] fix TriggerRouting error message

* [gRNAet7B] Added trigger show procedure (neo4j-contrib/neo4j-apoc-procedures#3335)

* Fix flaky trigger dbms availability tests
* added apoc.trigger.show proc
* retry to fix flaky test by separating setLastUpdate()
* removed updated output - some code refactoring
* split pr - changes review
* removed inner transaction
* updated documentation
* reset wrong generated docs
* Added admin in trigger.show
* small adoc changes

* [gRNAet7B] Try de-flaky some tests with new procedure triggers (neo4j-contrib/neo4j-apoc-procedures#3355)

* Try de-flaky some tests with new procedure triggers
* moved TriggerTestUtil and increased trigger.refresh value
* added waitDbsAvailable util

* [gRNAet7B] removed unused import

* [gRNAet7B] fix cluster test

* [gRNAet7B] added documentation about new trigger procedures and deprecated ones (neo4j-contrib/neo4j-apoc-procedures#3325)

* [gRNAet7B] added trigger new procs documentation
* changes review - procedure description
* changed listener wording
* added system db note
* var. small changes

* [gRNAet7B] Fixes triggers (neo4j-contrib/neo4j-apoc-procedures#3369)

* [coZSswV2] Fix typo in triggers error message

* [coZSswV2] Assert new triggers can't add TransactionEventListeners to "system"

* [coZSswV2] Assert old triggers can't add TransactionEventListeners to "system"

* [gRNAet7B] removed unused import

* [gRNAet7B] moved TriggerInfo class, removed wrong extended file

* [gRNAet7B] Add @Admin to trigger procedures (neo4j-contrib/neo4j-apoc-procedures#3357)

* [qwJFy8pp] Add @Admin to trigger procedures
* added admin in trigger.list and docs note
* changed admin doc

* [gRNAet7B] moved classes - changed TriggerInfo - err. handling changes

* [gRNAet7B] Cleanup trigger docs (neo4j-contrib/neo4j-apoc-procedures#3378)

* [gRNAet7B] added TriggerEnterpriseFeaturesTest, removed log and session db check

* [gRNAet7B] changed descriptions consistently with to Trigger.java 5.x ones

Co-authored-by: Nacho Cordón <[email protected]>
Co-authored-by: Daniel Leaver <[email protected]>
  • Loading branch information
3 people authored Jan 9, 2023
1 parent 95121ff commit c575792
Show file tree
Hide file tree
Showing 15 changed files with 1,497 additions and 85 deletions.
2 changes: 1 addition & 1 deletion common/src/main/java/apoc/ApocConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import static org.neo4j.configuration.GraphDatabaseSettings.transaction_logs_root_path;

public class ApocConfig extends LifecycleAdapter {
protected static final String SUN_JAVA_COMMAND = "sun.java.command";
public static final String SUN_JAVA_COMMAND = "sun.java.command";
public static final String APOC_IMPORT_FILE_ENABLED = "apoc.import.file.enabled";
public static final String APOC_EXPORT_FILE_ENABLED = "apoc.export.file.enabled";
public static final String APOC_IMPORT_FILE_USE_NEO4J_CONFIG = "apoc.import.file.use_neo4j_config";
Expand Down
100 changes: 50 additions & 50 deletions core/src/main/java/apoc/trigger/Trigger.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package apoc.trigger;

import apoc.util.Util;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.procedure.Admin;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Mode;
Expand All @@ -12,47 +13,43 @@
import java.util.Map;
import java.util.stream.Stream;

import static apoc.ApocConfig.apocConfig;

/**
* @author mh
* @since 20.09.16
*/

public class Trigger {
public static class TriggerInfo {
public String name;
public String query;
public Map<String,Object> selector;
public Map<String, Object> params;
public boolean installed;
public boolean paused;

public TriggerInfo(String name, String query, Map<String, Object> selector, boolean installed, boolean paused) {
this.name = name;
this.query = query;
this.selector = selector;
this.installed = installed;
this.paused = paused;
}
public static final String SYS_DB_NON_WRITER_ERROR = """
This instance is not allowed to write to the system database.
Please open a session against a system database writer when using this procedure.
""";

@Context
public GraphDatabaseAPI db;

public TriggerInfo( String name, String query, Map<String,Object> selector, Map<String,Object> params, boolean installed, boolean paused )
{
this.name = name;
this.query = query;
this.selector = selector;
this.params = params;
this.installed = installed;
this.paused = paused;
}
}
@Context public TriggerHandler triggerHandler;

@Context public GraphDatabaseService db;
private void preprocessDeprecatedProcedures() {
final String msgDeprecation = """
Please note that the current procedure is deprecated,
it is recommended to use the `apoc.trigger.install`, `apoc.trigger.drop`, `apoc.trigger.dropAll`, `apoc.trigger.stop`, and `apoc.trigger.start` procedures,
instead of, respectively, `apoc.trigger.add`, `apoc.trigger.remove`, `apoc.trigger.removeAll`, `apoc.trigger.pause`, and `apoc.trigger.resume`.""";

@Context public TriggerHandler triggerHandler;
if (!Util.isWriteableInstance((GraphDatabaseAPI) apocConfig().getSystemDb())) {
throw new RuntimeException(SYS_DB_NON_WRITER_ERROR + msgDeprecation);
}
}

@Procedure(name = "apoc.trigger.add", mode = Mode.WRITE)
@Admin
@Deprecated
@Procedure(name = "apoc.trigger.add", mode = Mode.WRITE, deprecatedBy = "apoc.trigger.install")
@Description("Adds a trigger to the given Cypher statement.\n" +
"The selector for this procedure is {phase:'before/after/rollback/afterAsync'}.")
public Stream<TriggerInfo> add(@Name("name") String name, @Name("statement") String statement, @Name(value = "selector"/*, defaultValue = "{}"*/) Map<String,Object> selector, @Name(value = "config", defaultValue = "{}") Map<String,Object> config) {
public Stream<TriggerInfo> add(@Name("name") String name, @Name("statement") String statement, @Name(value = "selector") Map<String,Object> selector, @Name(value = "config", defaultValue = "{}") Map<String,Object> config) {
preprocessDeprecatedProcedures();

Util.validateQuery(db, statement);
Map<String,Object> params = (Map)config.getOrDefault("params", Collections.emptyMap());
Map<String, Object> removed = triggerHandler.add(name, statement, selector, params);
Expand All @@ -64,41 +61,37 @@ public Stream<TriggerInfo> add(@Name("name") String name, @Name("statement") Str
return Stream.of(new TriggerInfo(name,statement,selector, params,true, false));
}

@Procedure(name = "apoc.trigger.remove", mode = Mode.WRITE)
@Admin
@Deprecated
@Procedure(name = "apoc.trigger.remove", mode = Mode.WRITE, deprecatedBy = "apoc.trigger.drop")
@Description("Removes the given trigger.")
public Stream<TriggerInfo> remove(@Name("name")String name) {
preprocessDeprecatedProcedures();

Map<String, Object> removed = triggerHandler.remove(name);
if (removed == null) {
return Stream.of(new TriggerInfo(name, null, null, false, false));
}
return Stream.of(new TriggerInfo(name,(String)removed.get("statement"), (Map<String, Object>) removed.get("selector"), (Map<String, Object>) removed.get("params"),false, false));
}

@Procedure(name = "apoc.trigger.removeAll", mode = Mode.WRITE)
@Admin
@Deprecated
@Procedure(name = "apoc.trigger.removeAll", mode = Mode.WRITE, deprecatedBy = "apoc.trigger.dropAll")
@Description("Removes all previously added triggers.")
public Stream<TriggerInfo> removeAll() {
preprocessDeprecatedProcedures();

Map<String, Object> removed = triggerHandler.removeAll();
if (removed == null) {
return Stream.of(new TriggerInfo(null, null, null, false, false));
}
return removed.entrySet().stream().map(this::toTriggerInfo);
}

public TriggerInfo toTriggerInfo(Map.Entry<String, Object> e) {
String name = e.getKey();
if (e.getValue() instanceof Map) {
try {
Map<String, Object> value = (Map<String, Object>) e.getValue();
return new TriggerInfo(name, (String) value.get("statement"), (Map<String, Object>) value.get("selector"), (Map<String, Object>) value.get("params"), false, false);
} catch(Exception ex) {
return new TriggerInfo(name, ex.getMessage(), null, false, false);
}
}
return new TriggerInfo(name, null, null, false, false);
return removed.entrySet().stream().map(TriggerInfo::entryToTriggerInfo);
}

@Admin
@Procedure(name = "apoc.trigger.list", mode = Mode.READ)
@Description("Lists all installed triggers.")
@Description("Lists all currently installed triggers for the session database.")
public Stream<TriggerInfo> list() {
return triggerHandler.list().entrySet().stream()
.map( (e) -> new TriggerInfo(e.getKey(),
Expand All @@ -110,9 +103,13 @@ public Stream<TriggerInfo> list() {
);
}

@Procedure(name = "apoc.trigger.pause", mode = Mode.WRITE)
@Admin
@Deprecated
@Procedure(name = "apoc.trigger.pause", mode = Mode.WRITE, deprecatedBy = "apoc.trigger.stop")
@Description("Pauses the given trigger.")
public Stream<TriggerInfo> pause(@Name("name")String name) {
preprocessDeprecatedProcedures();

Map<String, Object> paused = triggerHandler.updatePaused(name, true);

return Stream.of(new TriggerInfo(name,
Expand All @@ -121,15 +118,18 @@ public Stream<TriggerInfo> pause(@Name("name")String name) {
(Map<String,Object>) paused.get("params"),true, true));
}

@Procedure(name = "apoc.trigger.resume", mode = Mode.WRITE)
@Admin
@Deprecated
@Procedure(name = "apoc.trigger.resume", mode = Mode.WRITE, deprecatedBy = "apoc.trigger.start")
@Description("Resumes the given paused trigger.")
public Stream<TriggerInfo> resume(@Name("name")String name) {
preprocessDeprecatedProcedures();

Map<String, Object> resume = triggerHandler.updatePaused(name, false);

return Stream.of(new TriggerInfo(name,
(String)resume.get("statement"),
(Map<String,Object>) resume.get("selector"),
(Map<String,Object>) resume.get("params"),true, false));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,17 @@ private void updateCache() {
withSystemDb(tx -> {
tx.findNodes(SystemLabels.ApocTrigger,
SystemPropertyKeys.database.name(), db.databaseName()).forEachRemaining(
node -> activeTriggers.put(
(String) node.getProperty(SystemPropertyKeys.name.name()),
MapUtil.map(
"statement", node.getProperty(SystemPropertyKeys.statement.name()),
"selector", Util.fromJson((String) node.getProperty(SystemPropertyKeys.selector.name()), Map.class),
"params", Util.fromJson((String) node.getProperty(SystemPropertyKeys.params.name()), Map.class),
"paused", node.getProperty(SystemPropertyKeys.paused.name())
)
)
node -> {
activeTriggers.put(
(String) node.getProperty(SystemPropertyKeys.name.name()),
MapUtil.map(
"statement", node.getProperty(SystemPropertyKeys.statement.name()),
"selector", Util.fromJson((String) node.getProperty(SystemPropertyKeys.selector.name()), Map.class),
"params", Util.fromJson((String) node.getProperty(SystemPropertyKeys.params.name()), Map.class),
"paused", node.getProperty(SystemPropertyKeys.paused.name())
)
);
}
);
return null;
});
Expand Down Expand Up @@ -314,4 +316,4 @@ private void setLastUpdate(Transaction tx) {
node.setProperty(SystemPropertyKeys.lastUpdated.name(), System.currentTimeMillis());
}

}
}
148 changes: 148 additions & 0 deletions core/src/main/java/apoc/trigger/TriggerHandlerNewProcedures.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package apoc.trigger;

import apoc.SystemLabels;
import apoc.SystemPropertyKeys;
import apoc.util.Util;
import org.apache.commons.lang3.tuple.Pair;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.graphdb.Transaction;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Stream;

import static apoc.ApocConfig.APOC_TRIGGER_ENABLED;
import static apoc.ApocConfig.apocConfig;
import static apoc.trigger.TriggerInfo.fromNode;

public class TriggerHandlerNewProcedures {
public static final String NOT_ENABLED_ERROR = "Triggers have not been enabled." +
" Set 'apoc.trigger.enabled=true' in your apoc.conf file located in the $NEO4J_HOME/conf/ directory.";


private static boolean isEnabled() {
return apocConfig().getBoolean(APOC_TRIGGER_ENABLED);
}

public static void checkEnabled() {
if (!isEnabled()) {
throw new RuntimeException(NOT_ENABLED_ERROR);
}
}

public static TriggerInfo install(String databaseName, String triggerName, String statement, Map<String,Object> selector, Map<String,Object> params) {
AtomicReference<TriggerInfo> previous = new AtomicReference<>();

withSystemDb(tx -> {
Node node = Util.mergeNode(tx, SystemLabels.ApocTrigger, null,
Pair.of(SystemPropertyKeys.database.name(), databaseName),
Pair.of(SystemPropertyKeys.name.name(), triggerName));

// we'll return previous trigger info
previous.set( fromNode(node, true) );

node.setProperty(SystemPropertyKeys.statement.name(), statement);
node.setProperty(SystemPropertyKeys.selector.name(), Util.toJson(selector));
node.setProperty(SystemPropertyKeys.params.name(), Util.toJson(params));
node.setProperty(SystemPropertyKeys.paused.name(), false);

setLastUpdate(databaseName, tx);
});

return previous.get();
}

public static TriggerInfo drop(String databaseName, String triggerName) {
AtomicReference<TriggerInfo> previous = new AtomicReference<>();

withSystemDb(tx -> {
getTriggerNodes(databaseName, tx, triggerName)
.forEachRemaining(node -> {
previous.set( fromNode(node, false) );
node.delete();
});

setLastUpdate(databaseName, tx);
});

return previous.get();
}

public static TriggerInfo updatePaused(String databaseName, String name, boolean paused) {
AtomicReference<TriggerInfo> result = new AtomicReference<>();

withSystemDb(tx -> {
getTriggerNodes(databaseName, tx, name)
.forEachRemaining(node -> {
node.setProperty( SystemPropertyKeys.paused.name(), paused );

// we'll return previous trigger info
result.set( fromNode(node, true) );
});

setLastUpdate(databaseName, tx);
});

return result.get();
}

public static List<TriggerInfo> dropAll(String databaseName) {
final List<TriggerInfo> previous = new ArrayList<>();

withSystemDb(tx -> {
getTriggerNodes(databaseName, tx)
.forEachRemaining(node -> {
String triggerName = (String) node.getProperty(SystemPropertyKeys.name.name());

// we'll return previous trigger info
previous.add( fromNode(node, false) );
node.delete();
});
setLastUpdate(databaseName, tx);
});

return previous;
}

public static Stream<TriggerInfo> getTriggerNodesList(String databaseName, Transaction tx) {
return getTriggerNodes(databaseName, tx)
.stream()
.map(trigger -> TriggerInfo.fromNode(trigger, true));
}

public static ResourceIterator<Node> getTriggerNodes(String databaseName, Transaction tx) {
return getTriggerNodes(databaseName, tx, null);
}

public static ResourceIterator<Node> getTriggerNodes(String databaseName, Transaction tx, String name) {
final SystemLabels label = SystemLabels.ApocTrigger;
final String dbNameKey = SystemPropertyKeys.database.name();
if (name == null) {
return tx.findNodes(label, dbNameKey, databaseName);
}
return tx.findNodes(label, dbNameKey, databaseName,
SystemPropertyKeys.name.name(), name);
}

public static void withSystemDb(Consumer<Transaction> consumer) {
try (Transaction tx = apocConfig().getSystemDb().beginTx()) {
consumer.accept(tx);
tx.commit();
}
}

private static void setLastUpdate(String databaseName, Transaction tx) {
Node node = tx.findNode(SystemLabels.ApocTriggerMeta, SystemPropertyKeys.database.name(), databaseName);
if (node == null) {
node = tx.createNode(SystemLabels.ApocTriggerMeta);
node.setProperty(SystemPropertyKeys.database.name(), databaseName);
}
final long value = System.currentTimeMillis();
node.setProperty(SystemPropertyKeys.lastUpdated.name(), value);
}

}
Loading

0 comments on commit c575792

Please sign in to comment.