Skip to content

Commit

Permalink
several fixes to protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Taylor committed Jul 17, 2024
1 parent 345286c commit 50318e1
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,35 +154,45 @@ namespace hpx::parcelset::policies::openshmem {
const std::size_t rcv_numitrs_term = rcv_numitrs - 1;
std::size_t data_seg [2] = { sys_pgsz, num_bytes % sys_pgsz };

int idx = 0;
for(idx = 0; idx < page_count; ++idx) {
if(shmem_test(hpx::util::openshmem_environment::shmem_buffer + beg_rcv_signal + idx, SHMEM_CMP_EQ, 1)) {
break;
std::size_t idx = 0;
{
bool term = false;
while(!term) {
for(idx = 0; idx < page_count; ++idx) {
if(hpx::util::openshmem_environment::test(hpx::util::openshmem_environment::segments[idx].rcv, 1)) {
term = true;
break;
}
}
}
}

(*(hpx::util::openshmem_environment::segments[idx].rcv)) = 0;

std::memcpy(reinterpret_cast<std::uint8_t*>(rcv_header_[idx].data()),
std::memcpy(reinterpret_cast<std::uint8_t*>(rcv_header_.data()+(sizeof(header)*idx)),
hpx::util::openshmem_environment::segments[idx].beg_addr,
data_seg[0]
);

hpx::util::openshmem_environment::put_signal(nullptr, idx,
nullptr, 1, hpx::util::openshmem_environment::segments[self_].xmt);

auto chunk_beg = sys_pgsz;

for(std::size_t itr = 1; itr < header_numitrs; ++itr) {
while(shmem_test(hpx::util::openshmem_environment::segments[idx].rcv, SHMEM_CMP_EQ, 1)) {}
for(std::size_t itr = 1; itr < rcv_numitrs; ++itr) {
while(hpx::util::openshmem_environment::test(hpx::util::openshmem_environment::segments[idx].rcv, 1)) {}
(*(hpx::util::openshmem_environment::segments[idx].rcv)) = 0;

std::memcpy(reinterpret_cast<std::uint8_t*>(rcv_header_[idx].data())+chunk_beg,
hpx::util::openshmem_environment::segments[idx].beg_addr,
data_seg[(itr == rcv_numitrs_term)]
);

if(i != rcv_numitrs_term) {
(*(hpx::util::openshmem_environment::segments[idx].rcv)) = 0;
chunk_beg = i * sys_pgsz;
hpx::util::openshmem_environment::put_signal(nullptr, src_,
nullptr, 0, hpx::util::openshmem_environment::segments[idx].xmt);
if(itr != rcv_numitrs_term) {
chunk_beg = itr * sys_pgsz;

hpx::util::openshmem_environment::put_signal(nullptr, idx,
nullptr, 1, hpx::util::openshmem_environment::segments[self_].xmt);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ namespace hpx::parcelset::policies::openshmem {
receiver_connection(int src, header h, Parcelport& pp) noexcept
: state_(initialized)
, src_(src)
, idx(0)
, header_(h)
, request_ptr_(false)
, num_bytes(0)
Expand All @@ -57,6 +58,7 @@ namespace hpx::parcelset::policies::openshmem {
{
std::cout << "receiver_connection" << std::endl;
header_.assert_valid();
idx = hpx::util::openshmem_environment::rank();

num_bytes = header_.numbytes();

Expand Down Expand Up @@ -113,7 +115,6 @@ std::cout << "rcvd_chunks" << std::endl;
bool receive_transmission_chunks()
{
std::cout << "receive_transmission_chunks" << std::endl;
const auto idx = hpx::util::openshmem_environment::rank();

const std::size_t sys_pgsz =
sysconf(_SC_PAGESIZE);
Expand Down Expand Up @@ -142,16 +143,17 @@ std::cout << "receive_transmission_chunks" << std::endl;

auto chunk_beg = 0;
for(std::size_t i = 0; i < rcv_numitrs; ++i) {
while(shmem_test(hpx::util::openshmem_environment::segments[idx].rcv, SHMEM_CMP_EQ, 1)) {}
while(!hpx::util::openshmem_environment::test(hpx::util::openshmem_environment::segments[src_].rcv, 1)) {}
(*(hpx::util::openshmem_environment::segments[src_].rcv)) = 0;

std::memcpy(reinterpret_cast<std::uint8_t*>(buffer_.transmission_chunks_.data())+chunk_beg,
hpx::util::openshmem_environment::segments[idx].beg_addr,
hpx::util::openshmem_environment::segments[src_].beg_addr,
data_seg[(i == rcv_numitrs_term)]
);

if(i != rcv_numitrs_term) {
(*(hpx::util::openshmem_environment::segments[idx].rcv)) = 0;
chunk_beg = i * sys_pgsz;

hpx::util::openshmem_environment::put_signal(nullptr, src_,
nullptr, 0, hpx::util::openshmem_environment::segments[idx].xmt);
}
Expand Down Expand Up @@ -182,13 +184,11 @@ std::cout << "receive_data" << std::endl;
}
else
{
const auto idx = hpx::util::openshmem_environment::rank();

const std::size_t sys_pgsz =
sysconf(_SC_PAGESIZE);

const std::size_t num_bytes =
buffer_.data_.size() * sizeof(decltype(buffer_.data_);
buffer_.data_.size() * sizeof(decltype(buffer_.data_));

const std::size_t rcv_numitrs =
(num_bytes + sys_pgsz - 1) / sys_pgsz;
Expand All @@ -200,15 +200,15 @@ std::cout << "receive_data" << std::endl;
auto chunk_beg = 0;

for(std::size_t i = 0; i < rcv_numitrs; ++i) {
while(shmem_test(hpx::util::openshmem_environment::segments[idx].rcv, SHMEM_CMP_EQ, 1)) {}
while(!hpx::util::openshmem_environment::test(hpx::util::openshmem_environment::segments[src_].rcv, 1)) {}
(*(hpx::util::openshmem_environment::segments[src_].rcv)) = 0;

std::memcpy(reinterpret_cast<std::uint8_t*>(buffer_.transmission_chunks_.data())+chunk_beg,
hpx::util::openshmem_environment::segments[idx].beg_addr,
hpx::util::openshmem_environment::segments[src_].beg_addr,
data_seg[(i == rcv_numitrs_term)]
);

if(i != rcv_numitrs_term) {
(*(hpx::util::openshmem_environment::segments[idx].rcv)) = 0;
chunk_beg = i * sys_pgsz;
hpx::util::openshmem_environment::put_signal(nullptr, src_,
nullptr, 0, hpx::util::openshmem_environment::segments[idx].xmt);
Expand All @@ -228,13 +228,11 @@ std::cout << "receive_data" << std::endl;
const std::size_t sys_pgsz =
sysconf(_SC_PAGESIZE);

const auto idx = hpx::util::openshmem_environment::rank();

for(auto i = 0; i < buffer_.chunks_.size(); ++i) {
for(std::size_t i = 0; i < buffer_.chunks_.size(); ++i) {
buffer_.chunks_[i].resize(buffer_.transmission_chunks_[i].second);
}

for(auto i = 0; i < buffer_.chunks_.size(); ++i) {
for(std::size_t i = 0; i < buffer_.chunks_.size(); ++i) {
data_type& c = buffer_.chunks_[i];

const std::size_t num_bytes = c.size() * sizeof(decltype(c.data()));
Expand All @@ -248,16 +246,15 @@ std::cout << "receive_data" << std::endl;
auto chunk_beg = 0;

for(std::size_t i = 0; i < rcv_numitrs; ++i) {
while(shmem_test(hpx::util::openshmem_environment::segments[idx].rcv, SHMEM_CMP_EQ, 1)) {}
(*(hpx::util::openshmem_environment::segments[idx].rcv)) = 0;
while(!hpx::util::openshmem_environment::test(hpx::util::openshmem_environment::segments[src_].rcv, 1)) {}
(*(hpx::util::openshmem_environment::segments[src_].rcv)) = 0;

std::memcpy(reinterpret_cast<std::uint8_t*>(c.data())+chunk_beg,
hpx::util::openshmem_environment::segments[idx].beg_addr,
hpx::util::openshmem_environment::segments[src_].beg_addr,
data_seg[(i == rcv_numitrs_term)]
);

if(i != rcv_numitrs_term) {
(*(hpx::util::openshmem_environment::segments[idx].rcv)) = 0;
chunk_beg = i * sys_pgsz;
hpx::util::openshmem_environment::put_signal(nullptr, src_,
nullptr, 0, hpx::util::openshmem_environment::segments[idx].xmt);
Expand Down Expand Up @@ -295,6 +292,7 @@ std::cout << "request_done" << std::endl;
connection_state state_;

int src_;
int idx;

header header_;
buffer_type buffer_;
Expand Down
Loading

0 comments on commit 50318e1

Please sign in to comment.