Merge pull request ornladios#6 from taniabanerjee/dev-mgardplus-torch
Update on pg
taniabanerjee authored Aug 10, 2022
2 parents dae50a1 + 337153f commit 150f626
Showing 1 changed file with 42 additions and 32 deletions.
74 changes: 42 additions & 32 deletions source/adios2/operator/compress/CompressMGARDPlus.cpp
@@ -408,40 +408,40 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart, co
int latent_dim = 4;
std::cout << "input_dim, latent_dim: " << input_dim << " " << latent_dim << std::endl;

// Pytorch DDP Training
// torch::Device device(torch::kCUDA);
// Pytorch
Options options;
options.device = torch::kCUDA;
uint8_t train_yes = atoi(get_param(m_Parameters, "train", "1").c_str());
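// For context, a minimal sketch of what a get_param-style lookup with a
// default amounts to, assuming m_Parameters is the usual ADIOS2 Params map
// (std::map<std::string, std::string>); get_param_sketch is illustrative
// only, not the helper actually used above:
auto get_param_sketch = [](const Params &params, const std::string &key,
                           const std::string &def) -> std::string {
    auto it = params.find(key);
    return (it == params.end()) ? def : it->second;
};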

std::string path = ".filestore";
remove(path.c_str());
auto store = c10::make_intrusive<::c10d::FileStore>(path, comm_size);
c10::intrusive_ptr<c10d::ProcessGroupNCCL::Options> opts = c10::make_intrusive<c10d::ProcessGroupNCCL::Options>();
auto pg = std::make_shared<::c10d::ProcessGroupNCCL>(store, my_rank, comm_size, std::move(opts));

// check if pg is working
auto mytensor = torch::ones(1) * my_rank;
mytensor = mytensor.to(options.device);
std::vector<torch::Tensor> tmp = {mytensor};
auto work = pg->allreduce(tmp);
work->wait(kNoTimeout);
auto expected = (comm_size * (comm_size - 1)) / 2;
assert(mytensor.item<int>() == expected);

Autoencoder model(input_dim, latent_dim);
model->to(options.device);

auto ds = CustomDataset((double *)dataIn, {blockCount[0], blockCount[1], blockCount[2], blockCount[3]});
auto dataset = ds.map(torch::data::transforms::Stack<>());
const size_t dataset_size = dataset.size().value();
auto loader =
torch::data::make_data_loader<torch::data::samplers::SequentialSampler>(std::move(dataset), options.batch_size);
auto loader = torch::data::make_data_loader<torch::data::samplers::SequentialSampler>(std::move(dataset),
options.batch_size);
torch::optim::Adam optimizer(model->parameters(), torch::optim::AdamOptions(1e-3 /*learning rate*/));

double start = MPI_Wtime();
if (train_yes == 1)
{
// Pytorch DDP
std::string path = ".filestore";
auto store = c10::make_intrusive<::c10d::FileStore>(path, comm_size);
c10::intrusive_ptr<c10d::ProcessGroupNCCL::Options> opts =
c10::make_intrusive<c10d::ProcessGroupNCCL::Options>();
auto pg = std::make_shared<::c10d::ProcessGroupNCCL>(store, my_rank, comm_size, std::move(opts));

// check if pg is working
auto mytensor = torch::ones(1) * my_rank;
mytensor = mytensor.to(options.device);
std::vector<torch::Tensor> tmp = {mytensor};
auto work = pg->allreduce(tmp);
work->wait(kNoTimeout);
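// Each rank contributes its rank id, so the reduced value should be
// 0 + 1 + ... + (comm_size - 1) = comm_size * (comm_size - 1) / 2.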
auto expected = (comm_size * (comm_size - 1)) / 2;
assert(mytensor.item<int>() == expected);

for (size_t epoch = 1; epoch <= options.iterations; ++epoch)
{
train(pg, model, *loader, optimizer, epoch, dataset_size, options);
@@ -451,7 +451,7 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart, co
else
{
// (2022/08) jyc: We can restart from the model saved in python.
torch::load(model, "my_ae.pt");
torch::load(model, "my_iter.pt");
}
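// For reference, a minimal sketch of one data-parallel training step with an
// explicit process group (forward pass, MSE reconstruction loss, backward,
// hand-rolled gradient allreduce, optimizer step). This is illustrative only:
// the real per-epoch work is done by the train(...) helper above, and the
// forward()/MSE-loss combination for the Autoencoder is an assumption here.
auto ddp_step_sketch = [&](const std::shared_ptr<::c10d::ProcessGroupNCCL> &group) {
    model->train();
    for (auto &batch : *loader)
    {
        auto x = batch.data.to(options.device);
        optimizer.zero_grad();
        auto recon = model->forward(x);
        auto loss = torch::mse_loss(recon, x);
        loss.backward();
        // sum each gradient across ranks, then divide to average it
        for (auto &p : model->parameters())
        {
            std::vector<torch::Tensor> grads = {p.grad()};
            group->allreduce(grads)->wait();
            p.grad().div_(static_cast<double>(comm_size));
        }
        optimizer.step();
    }
};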

// Encode
@@ -465,7 +465,7 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart, co
_encode = _encode.to(torch::kCPU);
encode_vector.push_back(_encode);
}
// All of encodes: shape (nmesh, latent_dim), where nmesh = number of total mesh nodes this process has
// encodes shape is (nmesh, latent_dim), where nmesh = number of total mesh nodes this process has
auto encode = torch::cat(encode_vector, 0);
std::cout << "encode.sizes = " << encode.sizes() << std::endl;

@@ -524,23 +524,28 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart, co
decode = decode + mu.reshape({-1, 1, 1});
// std::cout << "decode.sizes = " << decode.sizes() << std::endl;

// forg and decode shape: (nmesh, nx, ny)
auto diff = ds.forg - decode;
std::cout << "forg min,max = " << ds.forg.min().item<double>() << " " << ds.forg.max().item<double>() << std::endl;
std::cout << "decode min,max = " << decode.min().item<double>() << " " << decode.max().item<double>() << std::endl;
std::cout << "forg min,max = " << ds.forg.min().item<double>() << " " << ds.forg.max().item<double>()
<< std::endl;
std::cout << "decode min,max = " << decode.min().item<double>() << " " << decode.max().item<double>()
<< std::endl;
std::cout << "diff min,max = " << diff.min().item<double>() << " " << diff.max().item<double>() << std::endl;

// use MGARD to compress the residuals
auto perm_diff = diff.reshape({1, -1, ds.nx, ds.ny}).permute({0,2,1,3}).contiguous().cpu();
std::vector <double> diff_data(perm_diff.data_ptr<double>(), perm_diff.data_ptr<double>() + perm_diff.numel());
auto perm_diff = diff.reshape({1, -1, ds.nx, ds.ny}).permute({0, 2, 1, 3}).contiguous().cpu();
std::vector<double> diff_data(perm_diff.data_ptr<double>(), perm_diff.data_ptr<double>() + perm_diff.numel());
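// Copy the permuted residual into a contiguous host-side double buffer;
// mgard.Operate below reads it using the original blockStart/blockCount dims.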

// reserve space in output buffer to store MGARD buffer size
size_t offsetForDecompresedData = offset;
offset += sizeof(size_t);
std::cout << "residual data is ready" << std::endl;

// apply MGARD operate
// apply MGARD operate.
// Make sure that the shape of the input and the output of MGARD is (1, nmesh, nx, ny)
CompressMGARD mgard(m_Parameters);
size_t mgardBufferSize = mgard.Operate(reinterpret_cast<char*>(diff_data.data()), blockStart, blockCount, type, bufferOut + offset);
size_t mgardBufferSize =
mgard.Operate(reinterpret_cast<char *>(diff_data.data()), blockStart, blockCount, type, bufferOut + offset);
// size_t mgardBufferSize = mgard.Operate(dataIn, blockStart, blockCount, type, bufferOut + offset);
std::cout << "mgard is ready" << std::endl;

@@ -553,13 +558,18 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart, co
offset += mgardBufferSize;

// reconstruct data from the residuals
auto decompressed_residual_data = torch::from_blob((void *)tmpDecompressBuffer.data(), {blockCount[0], blockCount[1], blockCount[2], blockCount[3]}, torch::kFloat64)
.permute({0, 2, 1, 3});
auto recon_data = decode + decompressed_residual_data;
auto recon_data_permuted = recon_data.permute({0, 2, 1, 3}).cpu();
std::vector <double> recon_vec(recon_data_permuted.data_ptr<double>(), recon_data_permuted.data_ptr<double>() + recon_data_permuted.numel());
auto decompressed_residual_data =
torch::from_blob((void *)tmpDecompressBuffer.data(),
{blockCount[0], blockCount[1], blockCount[2], blockCount[3]}, torch::kFloat64)
.permute({0, 2, 1, 3});
auto recon_data = decode.reshape({1, -1, ds.nx, ds.ny}) + decompressed_residual_data;
// recon_data has shape (1, nmesh, nx, ny); make it contiguous in memory before copying out
recon_data = recon_data.contiguous().cpu();
std::vector<double> recon_vec(recon_data.data_ptr<double>(),
recon_data.data_ptr<double>() + recon_data.numel());

// apply post processing
// recon_vec shape: (1, nmesh, nx, ny)
optim.computeLagrangeParameters(recon_vec.data(), pq_yes);
std::cout << "Lagrange is ready" << std::endl;
