Skip to content

Commit

Permalink
Holder: Remove 'try_reply(..)', add 'get_final_message()' instead. Tr…
Browse files Browse the repository at this point in the history
…ack whether ack was requested

Signed-off-by: Patrik Stas <[email protected]>
  • Loading branch information
Patrik-Stas committed Sep 1, 2023
1 parent 8a1f084 commit 6e48283
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 74 deletions.
16 changes: 9 additions & 7 deletions agents/rust/aries-vcx-agent/src/services/holder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,20 +119,22 @@ impl ServiceCredentialsHolder {
let connection = self.service_connections.get_by_id(&connection_id)?;
let wallet = self.profile.inject_wallet();

let send_closure: SendClosure = Box::new(|msg: AriesMessage| {
Box::pin(async move { connection.send_message(&wallet, &msg, &HttpClient).await })
});

holder
.process_credential(
&self.profile.inject_anoncreds_ledger_read(),
&self.profile.inject_anoncreds(),
msg_issue_credential.clone(),
)
.await?;
holder
.try_reply(send_closure, Some(msg_issue_credential.into()))
.await?;
match holder.get_final_message() {
None => {}
Some(msg_response) => {
let send_closure: SendClosure = Box::new(|msg: AriesMessage| {
Box::pin(async move { connection.send_message(&wallet, &msg, &HttpClient).await })
});
send_closure(msg_response).await?;
}
}
self.creds_holder
.insert(&holder.get_thread_id()?, HolderWrapper::new(holder, &connection_id))
}
Expand Down
68 changes: 12 additions & 56 deletions aries_vcx/src/handlers/issuance/holder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::errors::error::prelude::*;
use crate::handlers::connection::mediated_connection::MediatedConnection;
use crate::handlers::revocation_notification::receiver::RevocationNotificationReceiver;
use crate::protocols::issuance::holder::state_machine::{HolderFullState, HolderSM, HolderState};
use crate::protocols::SendClosure;

fn build_credential_ack(thread_id: &str) -> AckCredential {
let content = AckCredentialContent::new(AckStatus::Ok);
Expand Down Expand Up @@ -251,61 +250,18 @@ impl Holder {
Ok(())
}

// While we have removed message sending logic from state machine layer, we still want to preserve
// the logic on the upper layers (vcx, rust-agent, ...)
// Instead of having to reimplement the message sending logic on upper layers, this provide shared
// reusable logic. Yet this is just helper function, and should not be used in tests or any new code.
//
// This function is mean to be called after processing a message. Currently the state machines
// mutate themselves and after processing the message, the state machine might be in subsequent state,
// or might be in Failed state.
// Based on what state is, different reply shall be sent to counterparty. This function handles these cases.
#[deprecated]
pub async fn try_reply(&self, send_message: SendClosure, last_message: Option<AriesMessage>) -> VcxResult<()> {
trace!("Holder::try_reply >>> trying to send reply to counterparty");
match self.get_state() {
HolderState::Failed => {
let problem_report = self.get_problem_report()?;
send_message(problem_report.into()).await?;
}
HolderState::Finished => match last_message {
// todo: add please_ack flag to state machine so we don't have to provide last_message arg to this function
None => {
return Err(AriesVcxError::from_msg(
AriesVcxErrorKind::InvalidState,
"Holder::try_reply called, but expected to be provided last received message",
))
}
Some(last_message) => match last_message {
AriesMessage::CredentialIssuance(message) => match message {
CredentialIssuance::IssueCredential(message) => {
trace!("Holder::try_reply >>> checking if counterparty requested credential ack");
if message.decorators.please_ack.is_some() {
let ack_msg = build_credential_ack(&self.get_thread_id()?);
trace!("Holder::try_reply >>> sending credential ack");
send_message(ack_msg.into()).await?;
}
}
_ => {
return Err(AriesVcxError::from_msg(
AriesVcxErrorKind::InvalidState,
"Holder::try_reply called, but unexpected last message type was supplied",
))
}
},
_ => {
return Err(AriesVcxError::from_msg(
AriesVcxErrorKind::InvalidState,
"Holder::try_reply called, but unexpected last message family was supplied",
))
pub fn get_final_message(&self) -> Option<AriesMessage> {
match &self.holder_sm.state {
HolderFullState::Finished(state) => {
if let Some(ack_requested) = state.ack_requested {
if ack_requested {
let ack_msg = build_credential_ack(&self.get_thread_id()?);
return Some(ack_msg.into());
}
},
},
HolderState::Initial => {}
HolderState::ProposalSet => {}
HolderState::OfferReceived => {}
HolderState::RequestSet => {}
}
Ok(())
}
}
_ => {}
};
return None;
}
}
13 changes: 10 additions & 3 deletions aries_vcx/src/protocols/issuance/holder/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,16 @@ impl HolderSM {
)
.await
{
Ok((cred_id, rev_reg_def_json)) => {
HolderFullState::Finished((state_data, cred_id, credential, rev_reg_def_json).into())
}
Ok((cred_id, rev_reg_def_json)) => HolderFullState::Finished(
(
state_data,
cred_id,
credential,
rev_reg_def_json,
credential.decorators.please_ack.is_some(),
)
.into(),
),
Err(err) => {
let problem_report = build_problem_report_msg(Some(err.to_string()), &self.thread_id);
error!("Failed to process or save received credential: {problem_report:?}");
Expand Down
2 changes: 2 additions & 0 deletions aries_vcx/src/protocols/issuance/holder/states/finished.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub struct FinishedHolderState {
pub credential: Option<IssueCredential>,
pub status: Status,
pub rev_reg_def_json: Option<String>,
pub ack_requested: Option<bool>,
}

impl FinishedHolderState {
Expand Down Expand Up @@ -146,6 +147,7 @@ impl FinishedHolderState {
pub fn new(problem_report: ProblemReport) -> Self {
trace!("SM is now in Finished state");
FinishedHolderState {
ack_requested: None,
cred_id: None,
credential: None,
status: Status::Failed(problem_report),
Expand Down
10 changes: 8 additions & 2 deletions aries_vcx/src/protocols/issuance/holder/states/request_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,20 @@ pub struct RequestSetState {

impl From<(RequestSetState, String, IssueCredential, Option<String>)> for FinishedHolderState {
fn from(
(_, cred_id, credential, rev_reg_def_json): (RequestSetState, String, IssueCredential, Option<String>),
(_, cred_id, credential, rev_reg_def_json, ack_requested): (
RequestSetState,
String,
IssueCredential,
Option<String>,
bool,
),
) -> Self {
trace!("SM is now in Finished state");
FinishedHolderState {
cred_id: Some(cred_id),
credential: Some(credential),
status: Status::Success,
rev_reg_def_json,
ack_requested: Some(ack_requested),
}
}
}
Expand Down
10 changes: 7 additions & 3 deletions aries_vcx/tests/utils/devsetup_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,17 @@ pub async fn holder_update_with_mediator(
if sm.is_terminal_state() {
return Ok(sm.get_state());
}
let send_message = connection.send_message_closure(Arc::clone(wallet)).await?;

let messages = connection.get_messages(agency_client).await?;
if let Some((uid, msg)) = mediated_holder::holder_find_message_to_handle(sm, messages) {
sm.process_aries_msg(ledger, anoncreds, msg.clone()).await?;
connection.update_message_status(&uid, agency_client).await?;
sm.try_reply(send_message, Some(msg)).await?;
match sm.get_final_message() {
None => {}
Some(msg_response) => {
let send_message = connection.send_message_closure(Arc::clone(wallet)).await?;
send_message(msg_response).await?;
}
}
}
Ok(sm.get_state())
}
Expand Down
10 changes: 7 additions & 3 deletions libvcx_core/src/api_vcx/api_handle/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,6 @@ pub async fn update_state(credential_handle: u32, message: Option<&str>, connect
if credential.is_terminal_state() {
return Ok(credential.get_state().into());
}
let send_message = mediated_connection::send_message_closure(connection_handle).await?;

let (mediator_uid, aries_msg) = if let Some(message) = message {
let message: AriesMessage = serde_json::from_str(message).map_err(|err| {
LibvcxError::from_msg(
Expand Down Expand Up @@ -156,7 +154,13 @@ pub async fn update_state(credential_handle: u32, message: Option<&str>, connect
trace!("credential::update_state >>> updating messages status in mediator");
mediated_connection::update_message_status(connection_handle, &uid).await?;
}
credential.try_reply(send_message, Some(aries_msg)).await?;
match credential.get_final_message() {
None => {}
Some(msg_response) => {
let send_message = mediated_connection::send_message_closure(connection_handle).await?;
send_message(msg_response).await?;
}
}
}
}
let state = credential.get_state().into();
Expand Down

0 comments on commit 6e48283

Please sign in to comment.