Timeout when use reliable qos. #491

Open
ruoruoniao opened this issue May 31, 2024 · 3 comments

@ruoruoniao
Contributor

ruoruoniao commented May 31, 2024

Question

  1. If both endpoints use reliable QoS, I would expect the DataWriter to wait until the DataReader has consumed a sample (that is, until on_data_available returns) before sending the next one, even without ResourceLimits. Is that right?
  2. Once the resource limit is reached, the DataWriter should wait until the DataReader is ready again before sending the next message. It should not send this many messages before the DataReader is ready.

Steps to reproduce:
Run the publisher and the subscriber on the same machine.

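//publisher.cpp
#include <chrono>
#include <iostream>
#include <thread>

#include "dds/dds.hpp"
#include "HelloWorldData.hpp"  // generated from the HelloWorldData IDL; header name assumed
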
int main(int argc, char **argv) {
    dds::domain::DomainParticipant participant(13);
    dds::topic::Topic<HelloWorldData::Msg> topic(participant, "topic");

    dds::pub::Publisher publisher(participant);

    auto qos = publisher.default_datawriter_qos();
    qos->policy(dds::core::policy::Reliability::Reliable(dds::core::Duration::from_secs(20)));
    qos->policy(dds::core::policy::History::KeepAll());
    qos->policy(dds::core::policy::Durability::TransientLocal());
    auto limits = dds::core::policy::ResourceLimits();
    limits.max_samples(1);
    qos->policy(limits);
    dds::pub::DataWriter<HelloWorldData::Msg> writer(publisher, topic, qos);

    std::cout << "=== [Publisher] Waiting for subscriber." << std::endl;
    while (writer.publication_matched_status().current_count() == 0) {
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(5000));

    int i = 0;
    HelloWorldData::Msg msg(0, "Hello World");
    while(true) {
        msg.userID(i++);
        writer.write(msg);
        std::cout << "=== [Publisher] Write " << i << std::endl;
    }
}
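
//subscribe.cpp
#include <chrono>
#include <iostream>
#include <thread>

#include "dds/dds.hpp"
#include "HelloWorldData.hpp"  // generated from the HelloWorldData IDL; header name assumed
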
int main(int argc, char **argv) {
    dds::domain::DomainParticipant participant(13);
    dds::topic::Topic<HelloWorldData::Msg> topic(participant, "topic");

    dds::sub::Subscriber subscriber(participant);

    auto qos = subscriber.default_datareader_qos();
    qos->policy(dds::core::policy::Reliability::Reliable());
    qos->policy(dds::core::policy::History::KeepAll());
    qos->policy(dds::core::policy::Durability::TransientLocal());
    auto limits = dds::core::policy::ResourceLimits();
    limits.max_samples(1);
    qos->policy(limits);

    // Listener that takes every available sample and simulates a slow consumer.
    class Listener : public dds::sub::NoOpDataReaderListener<HelloWorldData::Msg> {
    public:
        void on_data_available(dds::sub::DataReader<HelloWorldData::Msg> &reader) override {
            const auto samples = reader.take();
            for (auto iter = samples.begin(); iter != samples.end(); ++iter) {
                if (!iter->info().valid()) { continue; }
                std::cout << "read " << iter->data().userID() << std::endl;
                // Simulate slow processing: one second per sample.
                std::this_thread::sleep_for(std::chrono::seconds(1));
            }
        }
    };

    dds::sub::DataReader<HelloWorldData::Msg> reader(subscriber, topic, qos, new Listener(), dds::core::status::StatusMask::all());
    while(true) { std::this_thread::sleep_for(std::chrono::seconds(1)); }
}

Expected:

  • The subscriber receives all messages from the publisher.
  • The DataWriter waits until the DataReader has consumed all samples before sending the next message.

Actual:

  • The subscriber receives all messages until the publisher crashes. (√)
  • The DataWriter sends 442 messages before the DataReader reads any sample, then crashes with a write timeout once max_blocking_time expires. (×)
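
The write timeout itself follows from the reliability policy: write may block for up to max_blocking_time when the writer has no room to store the sample, and it fails with a timeout after that. The sketch below is a toy model of that rule in plain C++, not Cyclone DDS code. BoundedHistory, acknowledge_oldest and write_times_out are hypothetical names invented for illustration, and the model says nothing about why 442 samples got through before the first read.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <stdexcept>

// Toy model only: a writer-side history with a sample limit and a blocking
// write. None of these names exist in Cyclone DDS.
class BoundedHistory {
public:
    BoundedHistory(std::size_t max_samples, std::chrono::milliseconds max_blocking_time)
        : max_samples_(max_samples), max_blocking_time_(max_blocking_time) {}

    // Blocks while the history is full; throws if no slot frees up in time.
    void write(int sample) {
        std::unique_lock<std::mutex> lock(mutex_);
        const bool has_room = space_available_.wait_for(
            lock, max_blocking_time_,
            [this] { return samples_.size() < max_samples_; });
        if (!has_room) throw std::runtime_error("write timed out");
        samples_.push_back(sample);
    }

    // Stands in for the reader acknowledging the oldest stored sample.
    bool acknowledge_oldest() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (samples_.empty()) return false;
            samples_.pop_front();
        }
        space_available_.notify_one();
        return true;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return samples_.size();
    }

private:
    const std::size_t max_samples_;
    const std::chrono::milliseconds max_blocking_time_;
    mutable std::mutex mutex_;
    std::condition_variable space_available_;
    std::deque<int> samples_;
};

// Helper: reports whether a write gave up after the blocking time.
bool write_times_out(BoundedHistory &history, int sample) {
    try { history.write(sample); return false; }
    catch (const std::runtime_error &) { return true; }
}
```

With a limit of one sample, the first write succeeds at once, and a second write times out unless acknowledge_oldest is called in between. In the reproduction above the same kind of timeout surfaces as an uncaught exception, which is what terminates the publisher.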

@ruoruoniao
Contributor Author

Or is this behavior normal? I would like to confirm that before I start digging into the cause...

@ruoruoniao
Contributor Author

OK, I found that Cyclone has a C API function called dds_wait_for_acks.
Is there a plan to support wait_for_acknowledgments in org::eclipse::cyclonedds::pub::AnyDataWriterDelegate? Currently it is:

void
AnyDataWriterDelegate::wait_for_acknowledgments(
    const dds::core::Duration& timeout)
{
    ISOCPP_THROW_EXCEPTION(ISOCPP_UNSUPPORTED_ERROR, "Function not currently supported");
    (void)timeout;
}
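
For context, the DDS wait_for_acknowledgments operation blocks until every sample written so far has been acknowledged by all matched reliable readers, or until the timeout elapses. A rough way to picture the pacing it enables is the toy model below. It is plain C++ with no DDS dependency, and AckTracker with all of its members is a hypothetical name made up for this sketch.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Toy model only; these names are hypothetical and not part of Cyclone DDS.
class AckTracker {
public:
    // Called by the writer side for every sample it sends.
    void on_written() {
        std::lock_guard<std::mutex> lock(mutex_);
        ++written_;
    }

    // Called when the modeled reader acknowledges one more sample.
    void on_acknowledged() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (acknowledged_ < written_) ++acknowledged_;
        }
        all_acked_.notify_all();
    }

    // Returns true if everything written so far was acknowledged in time.
    bool wait_for_acks(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        return all_acked_.wait_for(
            lock, timeout, [this] { return acknowledged_ == written_; });
    }

private:
    std::mutex mutex_;
    std::condition_variable all_acked_;
    std::uint64_t written_ = 0;
    std::uint64_t acknowledged_ = 0;
};
```

A paced publisher would call on_written after each write and then wait_for_acks before producing the next sample, so it never runs more than one sample ahead of the acknowledgments. Note that this models acknowledgment only; whether an acknowledgment also means the application has finished on_data_available is a separate question that the sketch does not answer.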

@eboasson
Contributor

So if there is a plan to support wait_for_acknowledgments in org::eclipse::cyclonedds::pub::AnyDataWriterDelegate

Yes, it just flew under the radar (the C++ binding doesn't get as much love as the C bit, I am afraid). I imagine something as simple as:

    int ddsc_ret = dds_wait_for_acks (ddsc_entity, org::eclipse::cyclonedds::core::convertDuration(timeout));
    ISOCPP_DDSC_RESULT_CHECK_AND_THROW(ddsc_ret, "dds_wait_for_acks failed.");

should do the trick.
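
The shape of that suggestion, calling the C function and turning a non-OK return code into a C++ exception, can be tried out without Cyclone DDS using stand-ins. Everything prefixed with fake_ or Fake below, the return code values, and the TimeoutError type are hypothetical placeholders for dds_wait_for_acks, the writer entity, and ISOCPP_DDSC_RESULT_CHECK_AND_THROW. This is a sketch of the pattern, not the binding's actual implementation.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <stdexcept>
#include <string>

// Hypothetical stand-ins so the sketch compiles without Cyclone DDS.
using fake_return_t = std::int32_t;
using fake_duration_t = std::int64_t;  // nanoseconds
constexpr fake_return_t FAKE_RETCODE_OK = 0;
constexpr fake_return_t FAKE_RETCODE_TIMEOUT = -1;  // placeholder value

struct FakeWriter { int unacknowledged; };

// Mock of a C-style wait-for-acks call: it succeeds only when nothing is
// outstanding and otherwise reports a timeout without really waiting.
fake_return_t fake_wait_for_acks(const FakeWriter &writer, fake_duration_t /*timeout*/) {
    return writer.unacknowledged == 0 ? FAKE_RETCODE_OK : FAKE_RETCODE_TIMEOUT;
}

struct TimeoutError : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Plays the role of the "check and throw" step in the suggestion above.
void check_and_throw(fake_return_t ret, const std::string &what) {
    if (ret == FAKE_RETCODE_TIMEOUT) throw TimeoutError(what);
    if (ret != FAKE_RETCODE_OK) throw std::runtime_error(what);
}

// C++-side wrapper: convert the duration, call the C function, check the result.
void wait_for_acknowledgments(const FakeWriter &writer, std::chrono::nanoseconds timeout) {
    check_and_throw(fake_wait_for_acks(writer, timeout.count()),
                    "wait for acks failed.");
}

// Helper: reports whether the wrapper raised the timeout exception.
bool wait_times_out(const FakeWriter &writer, std::chrono::nanoseconds timeout) {
    try { wait_for_acknowledgments(writer, timeout); return false; }
    catch (const TimeoutError &) { return true; }
}
```

A caller that wants to keep publishing after a timeout would catch the exception instead of letting it terminate the process.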
