From 21b97acbd579d7077b08f787a7380f4f02a73ed8 Mon Sep 17 00:00:00 2001 From: Jong Choi Date: Wed, 10 Aug 2022 08:54:53 -0400 Subject: [PATCH 1/2] update on pg --- .../operator/compress/CompressMGARDPlus.cpp | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/source/adios2/operator/compress/CompressMGARDPlus.cpp b/source/adios2/operator/compress/CompressMGARDPlus.cpp index af1ce47252..6c0ea77e06 100644 --- a/source/adios2/operator/compress/CompressMGARDPlus.cpp +++ b/source/adios2/operator/compress/CompressMGARDPlus.cpp @@ -408,27 +408,11 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart, co int latent_dim = 4; std::cout << "input_dim, latent_dim: " << input_dim << " " << latent_dim << std::endl; - // Pytorch DDP Training - // torch::Device device(torch::kCUDA); + // Pytorch Options options; options.device = torch::kCUDA; uint8_t train_yes = atoi(get_param(m_Parameters, "train", "1").c_str()); - std::string path = ".filestore"; - remove(path.c_str()); - auto store = c10::make_intrusive<::c10d::FileStore>(path, comm_size); - c10::intrusive_ptr opts = c10::make_intrusive(); - auto pg = std::make_shared<::c10d::ProcessGroupNCCL>(store, my_rank, comm_size, std::move(opts)); - - // check if pg is working - auto mytensor = torch::ones(1) * my_rank; - mytensor = mytensor.to(options.device); - std::vector tmp = {mytensor}; - auto work = pg->allreduce(tmp); - work->wait(kNoTimeout); - auto expected = (comm_size * (comm_size - 1)) / 2; - assert(mytensor.item() == expected); - Autoencoder model(input_dim, latent_dim); model->to(options.device); @@ -442,6 +426,21 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart, co double start = MPI_Wtime(); if (train_yes == 1) { + // Pytorch DDP + std::string path = ".filestore"; + auto store = c10::make_intrusive<::c10d::FileStore>(path, comm_size); + c10::intrusive_ptr opts = c10::make_intrusive(); + auto pg = std::make_shared<::c10d::ProcessGroupNCCL>(store, my_rank, comm_size, std::move(opts)); + + // check if pg is working + auto mytensor = torch::ones(1) * my_rank; + mytensor = mytensor.to(options.device); + std::vector tmp = {mytensor}; + auto work = pg->allreduce(tmp); + work->wait(kNoTimeout); + auto expected = (comm_size * (comm_size - 1)) / 2; + assert(mytensor.item() == expected); + for (size_t epoch = 1; epoch <= options.iterations; ++epoch) { train(pg, model, *loader, optimizer, epoch, dataset_size, options); @@ -451,7 +450,7 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart, co else { // (2022/08) jyc: We can restart from the model saved in python. - torch::load(model, "my_ae.pt"); + torch::load(model, "my_iter.pt"); } // Encode From 337153f1b3d48dd5f7beb6f8d11bed9868673811 Mon Sep 17 00:00:00 2001 From: Jong Choi Date: Wed, 10 Aug 2022 10:04:25 -0400 Subject: [PATCH 2/2] minor fix on shape --- .../operator/compress/CompressMGARDPlus.cpp | 41 ++++++++++++------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/source/adios2/operator/compress/CompressMGARDPlus.cpp b/source/adios2/operator/compress/CompressMGARDPlus.cpp index 6c0ea77e06..c4f2622e18 100644 --- a/source/adios2/operator/compress/CompressMGARDPlus.cpp +++ b/source/adios2/operator/compress/CompressMGARDPlus.cpp @@ -419,8 +419,8 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart, co auto ds = CustomDataset((double *)dataIn, {blockCount[0], blockCount[1], blockCount[2], blockCount[3]}); auto dataset = ds.map(torch::data::transforms::Stack<>()); const size_t dataset_size = dataset.size().value(); - auto loader = - torch::data::make_data_loader(std::move(dataset), options.batch_size); + auto loader = torch::data::make_data_loader(std::move(dataset), + options.batch_size); torch::optim::Adam optimizer(model->parameters(), torch::optim::AdamOptions(1e-3 /*learning rate*/)); double start = MPI_Wtime(); @@ -429,7 +429,8 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart, co // Pytorch DDP std::string path = ".filestore"; auto store = c10::make_intrusive<::c10d::FileStore>(path, comm_size); - c10::intrusive_ptr opts = c10::make_intrusive(); + c10::intrusive_ptr opts = + c10::make_intrusive(); auto pg = std::make_shared<::c10d::ProcessGroupNCCL>(store, my_rank, comm_size, std::move(opts)); // check if pg is working @@ -464,7 +465,7 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart, co _encode = _encode.to(torch::kCPU); encode_vector.push_back(_encode); } - // All of encodes: shape (nmesh, latent_dim), where nmesh = number of total mesh nodes this process has + // encodes shape is (nmesh, latent_dim), where nmesh = number of total mesh nodes this process has auto encode = torch::cat(encode_vector, 0); std::cout << "encode.sizes = " << encode.sizes() << std::endl; @@ -523,23 +524,28 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart, co decode = decode + mu.reshape({-1, 1, 1}); // std::cout << "decode.sizes = " << decode.sizes() << std::endl; + // forg and docode shape: (nmesh, nx, ny) auto diff = ds.forg - decode; - std::cout << "forg min,max = " << ds.forg.min().item() << " " << ds.forg.max().item() << std::endl; - std::cout << "decode min,max = " << decode.min().item() << " " << decode.max().item() << std::endl; + std::cout << "forg min,max = " << ds.forg.min().item() << " " << ds.forg.max().item() + << std::endl; + std::cout << "decode min,max = " << decode.min().item() << " " << decode.max().item() + << std::endl; std::cout << "diff min,max = " << diff.min().item() << " " << diff.max().item() << std::endl; // use MGARD to compress the residuals - auto perm_diff = diff.reshape({1, -1, ds.nx, ds.ny}).permute({0,2,1,3}).contiguous().cpu(); - std::vector diff_data(perm_diff.data_ptr(), perm_diff.data_ptr() + perm_diff.numel()); + auto perm_diff = diff.reshape({1, -1, ds.nx, ds.ny}).permute({0, 2, 1, 3}).contiguous().cpu(); + std::vector diff_data(perm_diff.data_ptr(), perm_diff.data_ptr() + perm_diff.numel()); // reserve space in output buffer to store MGARD buffer size size_t offsetForDecompresedData = offset; offset += sizeof(size_t); std::cout << "residual data is ready" << std::endl; - // apply MGARD operate + // apply MGARD operate. + // Make sure that the shape of the input and the output of MGARD is (1, nmesh, nx, ny) CompressMGARD mgard(m_Parameters); - size_t mgardBufferSize = mgard.Operate(reinterpret_cast(diff_data.data()), blockStart, blockCount, type, bufferOut + offset); + size_t mgardBufferSize = + mgard.Operate(reinterpret_cast(diff_data.data()), blockStart, blockCount, type, bufferOut + offset); // size_t mgardBufferSize = mgard.Operate(dataIn, blockStart, blockCount, type, bufferOut + offset); std::cout << "mgard is ready" << std::endl; @@ -552,13 +558,18 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart, co offset += mgardBufferSize; // reconstruct data from the residuals - auto decompressed_residual_data = torch::from_blob((void *)tmpDecompressBuffer.data(), {blockCount[0], blockCount[1], blockCount[2], blockCount[3]}, torch::kFloat64) - .permute({0, 2, 1, 3}); - auto recon_data = decode + decompressed_residual_data; - auto recon_data_permuted = recon_data.permute({0, 2, 1, 3}).cpu(); - std::vector recon_vec(recon_data_permuted.data_ptr(), recon_data_permuted.data_ptr() + recon_data_permuted.numel()); + auto decompressed_residual_data = + torch::from_blob((void *)tmpDecompressBuffer.data(), + {blockCount[0], blockCount[1], blockCount[2], blockCount[3]}, torch::kFloat64) + .permute({0, 2, 1, 3}); + auto recon_data = decode.reshape({1, -1, ds.nx, ds.ny}) + decompressed_residual_data; + // recon_data shape (1, nmesh, nx, ny) and make it contiguous in memory + recon_data = recon_data.contiguous().cpu(); + std::vector recon_vec(recon_data.data_ptr(), + recon_data.data_ptr() + recon_data.numel()); // apply post processing + // recon_vec shape: (1, nmesh, nx, ny) optim.computeLagrangeParameters(recon_vec.data(), pq_yes); std::cout << "Lagrange is ready" << std::endl;