Skip to content

Commit

Permalink
fix(engine): avoid duplicated event triggers
Browse files Browse the repository at this point in the history
* don't create another event trigger in the correlated applier - an event trigger is already created by writing a ProcessEvent:triggering
* write the message variables as local variables of the start event, similar to other element

(cherry picked from commit c833d2f)
  • Loading branch information
saig0 committed Nov 1, 2021
1 parent e6ef430 commit a92a63c
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package io.camunda.zeebe.engine.processing.bpmn.behavior;

import io.camunda.zeebe.engine.processing.bpmn.BpmnElementContext;
import io.camunda.zeebe.engine.processing.bpmn.BpmnProcessingException;
import io.camunda.zeebe.engine.processing.common.CatchEventBehavior;
import io.camunda.zeebe.engine.processing.common.EventTriggerBehavior;
import io.camunda.zeebe.engine.processing.common.Failure;
Expand All @@ -22,8 +21,6 @@
import io.camunda.zeebe.engine.state.immutable.ZeebeState;
import io.camunda.zeebe.engine.state.instance.EventTrigger;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.util.Either;
import java.util.Optional;

Expand Down Expand Up @@ -126,38 +123,9 @@ public Optional<EventTrigger> getEventTriggerForProcessDefinition(
}

public void activateTriggeredStartEvent(
final BpmnElementContext context, final EventTrigger triggeredEvent) {
final long processDefinitionKey = context.getProcessDefinitionKey();
final long processInstanceKey = context.getProcessInstanceKey();

final var process = processState.getProcessByKey(context.getProcessDefinitionKey());
if (process == null) {
// this should never happen because processes are never deleted.
throw new BpmnProcessingException(
context, String.format(NO_PROCESS_FOUND_MESSAGE, processDefinitionKey));
}

eventTriggerBehavior.processEventTriggered(
triggeredEvent.getEventKey(),
processDefinitionKey,
processInstanceKey,
processDefinitionKey, /* the event scope for the start event is the process definition */
triggeredEvent.getElementId());

eventRecord.reset();
eventRecord.wrap(context.getRecordValue());

final var record =
eventRecord
.setElementId(triggeredEvent.getElementId())
.setBpmnElementType(BpmnElementType.START_EVENT)
.setProcessInstanceKey(processInstanceKey)
.setVersion(process.getVersion())
.setBpmnProcessId(process.getBpmnProcessId())
.setFlowScopeKey(processInstanceKey);

final var newEventInstanceKey = keyGenerator.nextKey();
commandWriter.appendFollowUpCommand(
newEventInstanceKey, ProcessInstanceIntent.ACTIVATE_ELEMENT, record);
final BpmnElementContext context, final EventTrigger eventTrigger) {

activateTriggeredEvent(
context.getProcessDefinitionKey(), context.getProcessInstanceKey(), eventTrigger, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public Either<Failure, Void> applyOutputMappings(
elementInstanceKey, processDefinitionKey, processInstanceKey, temporaryVariables);

} else if (isConnectedToEventBasedGateway(element)
|| element.getElementType() == BpmnElementType.BOUNDARY_EVENT) {
|| element.getElementType() == BpmnElementType.BOUNDARY_EVENT
|| element.getElementType() == BpmnElementType.START_EVENT) {
// event variables are set local variables instead of temporary variables
final var localVariables = variablesState.getVariablesLocalAsDocument(elementInstanceKey);
variableBehavior.mergeDocument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,7 @@ private void registerMessageStartEventSubscriptionAppliers(final MutableZeebeSta
state.getMessageStartEventSubscriptionState()));
register(
MessageStartEventSubscriptionIntent.CORRELATED,
new MessageStartEventSubscriptionCorrelatedApplier(
state.getMessageState(), state.getEventScopeInstanceState()));
new MessageStartEventSubscriptionCorrelatedApplier(state.getMessageState()));
register(
MessageStartEventSubscriptionIntent.DELETED,
new MessageStartEventSubscriptionDeletedApplier(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package io.camunda.zeebe.engine.state.appliers;

import io.camunda.zeebe.engine.state.TypedEventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableEventScopeInstanceState;
import io.camunda.zeebe.engine.state.mutable.MutableMessageState;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageStartEventSubscriptionRecord;
import io.camunda.zeebe.protocol.record.intent.MessageStartEventSubscriptionIntent;
Expand All @@ -19,13 +18,9 @@ public final class MessageStartEventSubscriptionCorrelatedApplier
MessageStartEventSubscriptionIntent, MessageStartEventSubscriptionRecord> {

private final MutableMessageState messageState;
private final MutableEventScopeInstanceState eventScopeInstanceState;

public MessageStartEventSubscriptionCorrelatedApplier(
final MutableMessageState messageState,
final MutableEventScopeInstanceState eventScopeInstanceState) {
public MessageStartEventSubscriptionCorrelatedApplier(final MutableMessageState messageState) {
this.messageState = messageState;
this.eventScopeInstanceState = eventScopeInstanceState;
}

@Override
Expand All @@ -41,12 +36,5 @@ public void applyState(final long key, final MessageStartEventSubscriptionRecord
messageState.putActiveProcessInstance(value.getBpmnProcessIdBuffer(), correlationKey);
messageState.putProcessInstanceCorrelationKey(value.getProcessInstanceKey(), correlationKey);
}

// write the event trigger for the start event
eventScopeInstanceState.triggerStartEvent(
value.getProcessDefinitionKey(),
value.getMessageKey(),
value.getStartEventIdBuffer(),
value.getVariablesBuffer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,6 @@ public void applyState(final long elementInstanceKey, final ProcessInstanceRecor
createEventScope(elementInstanceKey, value);
cleanupSequenceFlowsTaken(value);

final var processDefinitionKey = value.getProcessDefinitionKey();
final var eventTrigger = eventScopeInstanceState.peekEventTrigger(processDefinitionKey);
if (eventTrigger != null && value.getElementIdBuffer().equals(eventTrigger.getElementId())) {
variableState.setTemporaryVariables(elementInstanceKey, eventTrigger.getVariables());
eventScopeInstanceState.deleteTrigger(processDefinitionKey, eventTrigger.getEventKey());
}

final var flowScopeInstance = elementInstanceState.getInstance(value.getFlowScopeKey());
elementInstanceState.newInstance(
flowScopeInstance, elementInstanceKey, value, ProcessInstanceIntent.ELEMENT_ACTIVATING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,23 @@
import static org.assertj.core.api.Assertions.tuple;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.engine.util.RecordToWrite;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.builder.StartEventBuilder;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.camunda.zeebe.protocol.record.Assertions;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.protocol.record.intent.MessageStartEventSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessMessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.protocol.record.value.MessageStartEventSubscriptionRecordValue;
import io.camunda.zeebe.protocol.record.value.VariableRecordValue;
import io.camunda.zeebe.test.util.MsgPackUtil;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import java.time.Duration;
import java.util.Map;
Expand Down Expand Up @@ -146,7 +150,6 @@ public void shouldCreateNewInstanceWithNameLiteral() {
.containsSequence(
tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_ACTIVATING),
tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(BpmnElementType.START_EVENT, ProcessInstanceIntent.ACTIVATE_ELEMENT),
tuple(BpmnElementType.START_EVENT, ProcessInstanceIntent.ELEMENT_ACTIVATING),
tuple(BpmnElementType.START_EVENT, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(BpmnElementType.START_EVENT, ProcessInstanceIntent.COMPLETE_ELEMENT),
Expand All @@ -171,7 +174,6 @@ public void shouldCreateNewInstanceWithNameFeelExpression() {
.containsSequence(
tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_ACTIVATING),
tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(BpmnElementType.START_EVENT, ProcessInstanceIntent.ACTIVATE_ELEMENT),
tuple(BpmnElementType.START_EVENT, ProcessInstanceIntent.ELEMENT_ACTIVATING),
tuple(BpmnElementType.START_EVENT, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(BpmnElementType.START_EVENT, ProcessInstanceIntent.COMPLETE_ELEMENT),
Expand Down Expand Up @@ -274,7 +276,8 @@ public void shouldCreateNewInstanceWithMultipleStartEvents() {
.publish();

// then
assertThat(RecordingExporter.variableRecords().withName("x").limit(2))
assertThat(
RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(2))
.extracting(r -> r.getValue().getValue())
.describedAs("Expected messages [1,2] to be correlated")
.containsExactly("1", "2");
Expand Down Expand Up @@ -338,7 +341,8 @@ public void shouldNotCorrelateSameMessageToCreatedInstance() {
.publish();

// then
assertThat(RecordingExporter.variableRecords().withName("x").limit(2))
assertThat(
RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(2))
.extracting(r -> r.getValue().getValue())
.describedAs("Expected messages [1,2] to be correlated")
.containsExactly("1", "2");
Expand Down Expand Up @@ -374,7 +378,8 @@ public void shouldCreateMultipleInstancesIfCorrelationKeyIsEmpty() {
.publish();

// then
assertThat(RecordingExporter.variableRecords().withName("x").limit(2))
assertThat(
RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(2))
.extracting(r -> r.getValue().getValue())
.describedAs("Expected messages [1,2] to be correlated")
.containsExactly("1", "2");
Expand Down Expand Up @@ -410,7 +415,8 @@ public void shouldCreateOnlyOneInstancePerCorrelationKey() {
.publish();

// then
assertThat(RecordingExporter.variableRecords().withName("x").limit(2))
assertThat(
RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(2))
.extracting(r -> r.getValue().getValue())
.describedAs("Expected messages [1,3] to be correlated")
.containsExactly("1", "3");
Expand Down Expand Up @@ -454,7 +460,8 @@ public void shouldNotCreateInstanceForDifferentVersion() {
.publish();

// then
assertThat(RecordingExporter.variableRecords().withName("x").limit(2))
assertThat(
RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(2))
.extracting(r -> r.getValue().getValue())
.describedAs("Expected messages [1,3] to be correlated")
.containsExactly("1", "3");
Expand Down Expand Up @@ -489,7 +496,8 @@ public void shouldCreateNewInstanceAfterCompletion() {
.publish();

// then
assertThat(RecordingExporter.variableRecords().withName("x").limit(2))
assertThat(
RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(2))
.extracting(r -> r.getValue().getValue())
.describedAs("Expected messages [1,2] to be correlated")
.containsExactly("1", "2");
Expand Down Expand Up @@ -520,7 +528,8 @@ public void shouldCreateNewInstanceAfterTermination() {
.publish();

// then
assertThat(RecordingExporter.variableRecords().withName("x").limit(2))
assertThat(
RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(2))
.extracting(r -> r.getValue().getValue())
.describedAs("Expected messages [1,2] to be correlated")
.containsExactly("1", "2");
Expand Down Expand Up @@ -561,7 +570,8 @@ public void shouldCreateNewInstanceForBufferedMessageAfterCompletion() {
engine.job().withKey(job2.getKey()).complete();

// then
assertThat(RecordingExporter.variableRecords().withName("x").limit(3))
assertThat(
RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(3))
.extracting(r -> r.getValue().getValue())
.describedAs("Expected messages [1,2,3] to be correlated")
.containsExactly("1", "2", "3");
Expand Down Expand Up @@ -602,7 +612,8 @@ public void shouldCreateNewInstanceForBufferedMessageAfterTermination() {
engine.processInstance().withInstanceKey(job2.getValue().getProcessInstanceKey()).cancel();

// then
assertThat(RecordingExporter.variableRecords().withName("x").limit(3))
assertThat(
RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(3))
.extracting(r -> r.getValue().getValue())
.describedAs("Expected messages [1,2,3] to be correlated")
.containsExactly("1", "2", "3");
Expand Down Expand Up @@ -746,7 +757,8 @@ public void shouldNotCreateNewInstanceForBufferedMessageAfterTTL() {
engine.job().withKey(job.getKey()).complete();

// then
assertThat(RecordingExporter.variableRecords().withName("x").limit(2))
assertThat(
RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(2))
.extracting(r -> r.getValue().getValue())
.describedAs("Expected messages [1,3] to be correlated")
.containsExactly("1", "3");
Expand Down Expand Up @@ -782,7 +794,8 @@ public void shouldCreateOnlyOneInstancePerCorrelationKeyWithMultipleStartEvents(
.publish();

// then
assertThat(RecordingExporter.variableRecords().withName("x").limit(2))
assertThat(
RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(2))
.extracting(r -> r.getValue().getValue())
.describedAs("Expected messages [1,3] to be correlated")
.containsExactly("1", "3");
Expand Down Expand Up @@ -830,9 +843,55 @@ public void shouldCreateNewInstanceForBufferedMessageWithMultipleStartEvents() {
});

// then
assertThat(RecordingExporter.variableRecords().withName("x").limit(4))
assertThat(
RecordingExporter.variableRecords().withName("x").filterProcessInstanceScope().limit(4))
.extracting(r -> r.getValue().getValue())
.describedAs("Expected messages [1,2,3,4] to be correlated")
.containsExactly("1", "2", "3", "4");
}

// https://github.com/camunda-cloud/zeebe/issues/8068
@Test
public void shouldCreateProcessInstancesAndPassVariables() {
// given
engine.deployment().withXmlResource(SINGLE_START_EVENT_1).deploy();

// when - publish two messages concurrently
engine.writeRecords(
RecordToWrite.command()
.message(
MessageIntent.PUBLISH,
new MessageRecord()
.setName(MESSAGE_NAME_1)
.setTimeToLive(0L)
.setCorrelationKey(CORRELATION_KEY_1)
.setVariables(MsgPackUtil.asMsgPack("x", 1))),
RecordToWrite.command()
.message(
MessageIntent.PUBLISH,
new MessageRecord()
.setName(MESSAGE_NAME_1)
.setTimeToLive(0L)
.setCorrelationKey(CORRELATION_KEY_2)
.setVariables(MsgPackUtil.asMsgPack("x", 2))));

// then
final var processInstanceKeys =
RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withElementType(BpmnElementType.PROCESS)
.limit(2)
.map(r -> r.getValue().getProcessInstanceKey())
.collect(Collectors.toList());

assertThat(processInstanceKeys)
.describedAs("Expected two process instances to be created")
.hasSize(2);

assertThat(RecordingExporter.variableRecords().filterProcessInstanceScope().limit(2))
.extracting(Record::getValue)
.extracting(VariableRecordValue::getProcessInstanceKey, VariableRecordValue::getValue)
.hasSize(2)
.describedAs("Expected two process instances with different variables")
.contains(tuple(processInstanceKeys.get(0), "1"), tuple(processInstanceKeys.get(1), "2"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package io.camunda.zeebe.engine.processing.timer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.tuple;

import io.camunda.zeebe.engine.state.instance.TimerInstance;
import io.camunda.zeebe.engine.util.EngineRule;
Expand Down Expand Up @@ -374,12 +375,12 @@ public void shouldTriggerAndCreateProcessInstance() {
.withProcessDefinitionKey(processDefinitionKey)
.skipUntil(r -> r.getPosition() >= triggerRecordPosition)
.limit(4))
.extracting(Record::getIntent)
.extracting(r -> r.getValue().getBpmnElementType(), Record::getIntent)
.containsExactly(
ProcessInstanceIntent.ACTIVATE_ELEMENT, // causes the flow node activation
ProcessInstanceIntent.ELEMENT_ACTIVATING, // causes the flow node activation
ProcessInstanceIntent.ELEMENT_ACTIVATED, // input mappings applied
ProcessInstanceIntent.ACTIVATE_ELEMENT); // triggers the start event
tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ACTIVATE_ELEMENT),
tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_ACTIVATING),
tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(BpmnElementType.START_EVENT, ProcessInstanceIntent.ELEMENT_ACTIVATING));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,9 @@ public VariableRecordStream withValue(final String value) {
public VariableRecordStream withProcessInstanceKey(final long processInstanceKey) {
return valueFilter(v -> v.getProcessInstanceKey() == processInstanceKey);
}

/** @return only the variables that are created in the process instance scope (i.e. root scope) */
public VariableRecordStream filterProcessInstanceScope() {
return valueFilter(v -> v.getScopeKey() == v.getProcessInstanceKey());
}
}

0 comments on commit a92a63c

Please sign in to comment.