Skip to content

Commit

Permalink
feat: handle ContractNegotiationEventMessage.ACCEPTED message (eclips…
Browse files Browse the repository at this point in the history
…e-edc#3381)

* feat: handle ContractNegotiationEventMessage.ACCEPTED message

* Update dependencies
  • Loading branch information
ndr-brt authored Aug 28, 2023
1 parent a88daa4 commit 149710a
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import static java.lang.String.format;
import static org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation.Type.PROVIDER;
import static org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationStates.ACCEPTED;
import static org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationStates.AGREEING;
import static org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationStates.FINALIZING;
import static org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationStates.OFFERING;
Expand All @@ -51,6 +52,7 @@ public void start() {
stateMachineManager = StateMachineManager.Builder.newInstance("provider-contract-negotiation", monitor, executorInstrumentation, waitStrategy)
.processor(processNegotiationsInState(OFFERING, this::processOffering))
.processor(processNegotiationsInState(REQUESTED, this::processRequested))
.processor(processNegotiationsInState(ACCEPTED, this::processAccepted))
.processor(processNegotiationsInState(AGREEING, this::processAgreeing))
.processor(processNegotiationsInState(VERIFIED, this::processVerified))
.processor(processNegotiationsInState(FINALIZING, this::processFinalizing))
Expand Down Expand Up @@ -137,6 +139,17 @@ private boolean processRequested(ContractNegotiation negotiation) {
return true;
}

/**
* Processes {@link ContractNegotiation} in state ACCEPTED. It transitions to AGREEING.
*
* @return true if processed, false otherwise
*/
@WithSpan
private boolean processAccepted(ContractNegotiation negotiation) {
transitionToAgreeing(negotiation);
return true;
}

/**
* Processes {@link ContractNegotiation} in state CONFIRMING. Tries to send a contract agreement to the respective
* consumer. If this succeeds, the ContractNegotiation is transitioned to state CONFIRMED. Else, it is transitioned
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationStates.ACCEPTED;
import static org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationStates.AGREED;
import static org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationStates.AGREEING;
import static org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationStates.FINALIZED;
Expand Down Expand Up @@ -140,6 +141,20 @@ void requested_shouldTransitionToAgreeing() {
});
}

@Test
void accepted_shouldTransitionToAgreeing() {
var negotiation = contractNegotiationBuilder().state(ACCEPTED.code()).build();
when(store.nextNotLeased(anyInt(), stateIs(ACCEPTED.code()))).thenReturn(List.of(negotiation)).thenReturn(emptyList());
when(store.findById(negotiation.getId())).thenReturn(negotiation);

manager.start();

await().untilAsserted(() -> {
verify(store).save(argThat(p -> p.getState() == AGREEING.code()));
verifyNoInteractions(dispatcherRegistry);
});
}

@Test
void verified_shouldTransitionToFinalizing() {
var negotiation = contractNegotiationBuilder().state(VERIFIED.code()).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreementVerificationMessage;
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractNegotiationEventMessage;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationStates;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationTerminationMessage;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractOfferMessage;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequestMessage;
Expand All @@ -40,7 +39,6 @@
import java.util.Optional;
import java.util.UUID;

import static java.lang.String.format;
import static org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation.Type.PROVIDER;

public class ContractNegotiationProtocolServiceImpl implements ContractNegotiationProtocolService {
Expand Down Expand Up @@ -71,7 +69,6 @@ public ServiceResult<ContractNegotiation> notifyRequested(ContractRequestMessage
return transactionContext.execute(() -> validateOffer(message, claimToken)
.compose(validatedOffer -> createNegotiation(message, validatedOffer))
.onSuccess(negotiation -> {
monitor.debug(() -> "[Provider] Contract offer received.");
negotiation.transitionRequested();
update(negotiation);
observable.invokeForEach(l -> l.requested(negotiation));
Expand All @@ -85,7 +82,6 @@ public ServiceResult<ContractNegotiation> notifyOffered(ContractOfferMessage mes
return transactionContext.execute(() -> getNegotiation(message)
.compose(negotiation -> validateRequest(claimToken, negotiation))
.onSuccess(negotiation -> {
monitor.debug(() -> "[Consumer] Contract offer received.");
negotiation.addContractOffer(message.getContractOffer());
negotiation.transitionOffered();
update(negotiation);
Expand All @@ -97,7 +93,13 @@ public ServiceResult<ContractNegotiation> notifyOffered(ContractOfferMessage mes
@WithSpan
@NotNull
public ServiceResult<ContractNegotiation> notifyAccepted(ContractNegotiationEventMessage message, ClaimToken claimToken) {
throw new UnsupportedOperationException("not implemented");
return transactionContext.execute(() -> getNegotiation(message)
.compose(negotiation -> validateRequest(claimToken, negotiation))
.onSuccess(negotiation -> {
negotiation.transitionAccepted();
update(negotiation);
observable.invokeForEach(l -> l.accepted(negotiation));
}));
}

@Override
Expand All @@ -107,7 +109,6 @@ public ServiceResult<ContractNegotiation> notifyAgreed(ContractAgreementMessage
return transactionContext.execute(() -> getNegotiation(message)
.compose(negotiation -> validateAgreed(message, claimToken, negotiation))
.onSuccess(negotiation -> {
monitor.debug("[Consumer] Contract agreement received. Validation successful.");
negotiation.setContractAgreement(message.getContractAgreement());
negotiation.transitionAgreed();
update(negotiation);
Expand Down Expand Up @@ -160,7 +161,7 @@ public ServiceResult<ContractNegotiation> notifyTerminated(ContractNegotiationTe
public ServiceResult<ContractNegotiation> findById(String id, ClaimToken claimToken) {
return transactionContext.execute(() -> Optional.ofNullable(store.findById(id))
.map(negotiation -> validateRequest(claimToken, negotiation))
.orElse(ServiceResult.notFound(format("No negotiation with id %s found", id))));
.orElse(ServiceResult.notFound("No negotiation with id %s found".formatted(id))));
}

@NotNull
Expand Down Expand Up @@ -221,8 +222,8 @@ private ServiceResult<ContractNegotiation> getNegotiation(ContractRemoteMessage

private void update(ContractNegotiation negotiation) {
store.save(negotiation);
monitor.debug(String.format("[%s] ContractNegotiation %s is now in state %s.",
negotiation.getType(), negotiation.getId(), ContractNegotiationStates.from(negotiation.getState())));
monitor.debug(() -> "[%s] ContractNegotiation %s is now in state %s."
.formatted(negotiation.getType(), negotiation.getId(), negotiation.stateAsString()));
}

}
Loading

0 comments on commit 149710a

Please sign in to comment.