Merge pull request ornladios#7 from taniabanerjee/dev-mgardplus-torch
Add param print
taniabanerjee authored Aug 11, 2022
2 parents 8689317 + b666145 commit ce04064
Showing 1 changed file with 70 additions and 6 deletions.
76 changes: 70 additions & 6 deletions source/adios2/operator/compress/CompressMGARDPlus.cpp
@@ -294,6 +294,12 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart, co
MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
std::cout << "rank,size:" << my_rank << " " << comm_size << std::endl;

std::cout << "Parameters:" << std::endl;
for (auto const& x : m_Parameters)
{
std::cout << " " << x.first << ": " << x.second << std::endl;
}

// Instantiate LagrangeOptimizer
LagrangeOptimizer optim;
// End of ADIOS2 file reading; use the data for the compression algorithm below
@@ -416,18 +422,29 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart, co
Autoencoder model(input_dim, latent_dim);
model->to(options.device);
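The Autoencoder type is defined elsewhere in this file and is outside the diff. A minimal sketch of a module with the surface the surrounding code relies on: the model-> arrow implies a TORCH_MODULE holder, and encode()/decode() are assumptions drawn from how the model is used further down (assumes the file's existing <torch/torch.h> include):

    struct AutoencoderImpl : torch::nn::Module
    {
        torch::nn::Linear enc{nullptr}, dec{nullptr};

        AutoencoderImpl(int64_t input_dim, int64_t latent_dim)
            : enc(register_module("enc", torch::nn::Linear(input_dim, latent_dim))),
              dec(register_module("dec", torch::nn::Linear(latent_dim, input_dim)))
        {
        }
        // encode/decode are split so compression can run the halves separately.
        torch::Tensor encode(torch::Tensor x) { return torch::relu(enc->forward(x)); }
        torch::Tensor decode(torch::Tensor z) { return dec->forward(z); }
        torch::Tensor forward(torch::Tensor x) { return decode(encode(x)); }
    };
    TORCH_MODULE(Autoencoder); // holder type: enables Autoencoder model(...) and model->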

double start = MPI_Wtime();
auto ds = CustomDataset((double *)dataIn, {blockCount[0], blockCount[1], blockCount[2], blockCount[3]});
auto dataset = ds.map(torch::data::transforms::Stack<>());
const size_t dataset_size = dataset.size().value();
auto loader = torch::data::make_data_loader<torch::data::samplers::SequentialSampler>(std::move(dataset),
options.batch_size);
torch::optim::Adam optimizer(model->parameters(), torch::optim::AdamOptions(1e-3 /*learning rate*/));
// if (my_rank == 0)
{
double end = MPI_Wtime();
printf("Time taken for Prep: %f\n", (end - start));
}
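CustomDataset is also defined outside this hunk. A hedged sketch of a dataset with the shape used here; the mu, sig, nx, and ny members are inferred from their uses below, and the real class may differ:

    struct CustomDataset : torch::data::datasets::Dataset<CustomDataset>
    {
        torch::Tensor data, mu, sig;
        int64_t nx, ny;

        CustomDataset(double *buf, const std::vector<size_t> &shape)
            : nx(static_cast<int64_t>(shape[2])), ny(static_cast<int64_t>(shape[3]))
        {
            std::vector<int64_t> dims(shape.begin(), shape.end());
            // One flattened (nx*ny) image per mesh node.
            data = torch::from_blob(buf, dims, torch::kFloat64).reshape({-1, nx * ny});
            // Per-sample statistics, kept for the un-normalize step later.
            mu = data.mean(1, /*keepdim=*/true);
            sig = data.std(1, /*unbiased=*/true, /*keepdim=*/true);
            data = ((data - mu) / sig).to(torch::kFloat32);
        }
        torch::data::Example<> get(size_t index) override
        {
            // Unsupervised autoencoder training: the input is its own target.
            return {data[index], data[index]};
        }
        torch::optional<size_t> size() const override { return data.size(0); }
    };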

start = MPI_Wtime();
if (train_yes == 1)
{
// Pytorch DDP
std::string path = ".filestore";
if (my_rank == 0)
{
remove(path.c_str());
}
MPI_Barrier(MPI_COMM_WORLD);
auto store = c10::make_intrusive<::c10d::FileStore>(path, comm_size);
c10::intrusive_ptr<c10d::ProcessGroupNCCL::Options> opts =
c10::make_intrusive<c10d::ProcessGroupNCCL::Options>();
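The hunk cuts off before the process group is actually constructed. A sketch of how the FileStore and options above are typically combined under the c10d API; the elided lines may do this differently:

    // Sketch only: one NCCL process group spanning all MPI ranks.
    auto pg = c10::make_intrusive<c10d::ProcessGroupNCCL>(store, my_rank, comm_size, opts);

    // Start every rank from identical weights by broadcasting rank 0's parameters.
    for (auto &p : model->parameters())
    {
        std::vector<torch::Tensor> v = {p};
        pg->broadcast(v)->wait();
    }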
@@ -447,15 +464,26 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart, co
train(pg, model, *loader, optimizer, epoch, dataset_size, options);
}
torch::save(model, "xgcf_ae_model.pt");

// This is just for testing purposes.
// We load a pre-trained model anyway to ensure the rest of the computation is stable.
// Hold the returned string; calling .c_str() on the temporary would dangle.
const std::string mname = get_param(m_Parameters, "ae", "");
torch::load(model, mname);
}
else
{
// (2022/08) jyc: We can restart from the model saved in python.
// Same here: keep the string alive rather than a pointer into a temporary.
const std::string mname = get_param(m_Parameters, "ae", "");
torch::load(model, mname);
}
// if (my_rank == 0)
{
double end = MPI_Wtime();
printf("Time taken for Training: %f\n", (end - start));
}

// Encode
start = MPI_Wtime();
model->eval();
std::vector<torch::Tensor> encode_vector;
for (auto &batch : *loader)
@@ -469,8 +497,14 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart, co
// encodes shape is (nmesh, latent_dim), where nmesh = number of total mesh nodes this process has
auto encode = torch::cat(encode_vector, 0);
std::cout << "encode.sizes = " << encode.sizes() << std::endl;
// if (my_rank == 0)
{
double end = MPI_Wtime();
printf("Time taken for encode: %f\n", (end - start));
}
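The body of the encode loop is elided by the diff. A minimal sketch, assuming the encode() method from the Autoencoder sketch above:

    torch::NoGradGuard no_grad; // inference only; skip autograd bookkeeping
    for (auto &batch : *loader)
    {
        auto x = batch.data.to(options.device);
        encode_vector.push_back(model->encode(x).to(torch::kCPU));
    }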

// K-means
start = MPI_Wtime();
*reinterpret_cast<int *>(bufferOut) = latent_dim;
int offset = sizeof(int);
int numObjs = encode.size(0);
@@ -500,8 +534,14 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart, co
encode[j][i] = clusters[membership[j]]; // [j][i] indexing; [j, i] would invoke the comma operator
}
std::cout << "kmean is done " << std::endl;
// if (my_rank == 0)
{
double end = MPI_Wtime();
printf("Time taken for kmean: %f\n", (end - start));
}
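The clustering routine itself is elided; the visible assignment suggests each latent column is clustered and snapped to its nearest center. A hypothetical stand-in using a tiny 1-D Lloyd iteration in place of the real (elided) k-means code; needs <algorithm> and <cmath>:

    auto kmeans1d = [](const std::vector<float> &x, int k,
                       std::vector<float> &centers, std::vector<int> &member) {
        auto [mn, mx] = std::minmax_element(x.begin(), x.end());
        centers.resize(k);
        member.assign(x.size(), 0);
        for (int c = 0; c < k; ++c) // seed centers evenly across the data range
            centers[c] = *mn + (*mx - *mn) * (c + 0.5f) / k;
        for (int iter = 0; iter < 20; ++iter)
        {
            std::vector<float> sum(k, 0.0f);
            std::vector<int> cnt(k, 0);
            for (size_t j = 0; j < x.size(); ++j)
            {
                int best = 0;
                for (int c = 1; c < k; ++c)
                    if (std::fabs(x[j] - centers[c]) < std::fabs(x[j] - centers[best]))
                        best = c;
                member[j] = best;
                sum[best] += x[j];
                ++cnt[best];
            }
            for (int c = 0; c < k; ++c)
                if (cnt[c] > 0)
                    centers[c] = sum[c] / cnt[c];
        }
    };

    // Quantize: cluster each latent column and snap every value to its center,
    // matching the visible assignment encode[j][i] = clusters[membership[j]].
    for (int i = 0; i < latent_dim; ++i)
    {
        std::vector<float> column(numObjs);
        for (int j = 0; j < numObjs; ++j)
            column[j] = encode[j][i].item<float>();

        std::vector<float> clusters;
        std::vector<int> membership;
        kmeans1d(column, /*k=*/256, clusters, membership);

        for (int j = 0; j < numObjs; ++j)
            encode[j][i] = clusters[membership[j]];
    }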

// Decode
start = MPI_Wtime();
std::vector<torch::Tensor> decode_vector;
for (int i = 0; i < numObjs; i += options.batch_size)
{
@@ -515,8 +555,14 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart, co
auto decode = torch::cat(decode_vector, 0);
decode = decode.to(torch::kFloat64).reshape({-1, ds.nx, ds.ny});
std::cout << "decode.sizes = " << decode.sizes() << std::endl;
// if (my_rank == 0)
{
double end = MPI_Wtime();
printf("Time taken for decode: %f\n", (end - start));
}
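The decode loop body is likewise elided; a sketch, assuming the decode() method from the Autoencoder sketch above:

    torch::NoGradGuard no_grad;
    for (int i = 0; i < numObjs; i += options.batch_size)
    {
        int64_t last = std::min<int64_t>(i + options.batch_size, numObjs);
        auto z = encode.slice(/*dim=*/0, i, last).to(options.device);
        decode_vector.push_back(model->decode(z).to(torch::kCPU));
    }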

// Un-normalize
start = MPI_Wtime();
auto mu = ds.mu;
auto sig = ds.sig;
std::cout << "mu.sizes = " << mu.sizes() << std::endl;
@@ -541,24 +587,42 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart, co
size_t offsetForDecompresedData = offset;
offset += sizeof(size_t);
std::cout << "residual data is ready" << std::endl;
// if (my_rank == 0)
{
double end = MPI_Wtime();
printf("Time taken for residual: %f\n", (end - start));
}
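A sketch of the elided un-normalize and residual computation, assuming mu and sig are the per-sample statistics from the CustomDataset sketch above; it produces the diff_data vector consumed by mgard.Operate below:

    // Undo the per-sample scaling, then keep what the autoencoder missed.
    auto unnorm = decode * sig.to(torch::kFloat64).reshape({-1, 1, 1}) +
                  mu.to(torch::kFloat64).reshape({-1, 1, 1});
    auto orig = torch::from_blob((void *)dataIn,
                                 {(int64_t)blockCount[0], (int64_t)blockCount[1],
                                  (int64_t)blockCount[2], (int64_t)blockCount[3]},
                                 torch::kFloat64)
                    .reshape({-1, ds.nx, ds.ny});
    auto diff = (orig - unnorm).contiguous();
    std::vector<double> diff_data(diff.data_ptr<double>(),
                                  diff.data_ptr<double>() + diff.numel());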

// Apply the MGARD compression operator.
start = MPI_Wtime();
// Make sure that the shape of the input and the output of MGARD is (1, nmesh, nx, ny)
CompressMGARD mgard(m_Parameters);
size_t mgardBufferSize =
mgard.Operate(reinterpret_cast<char *>(diff_data.data()), blockStart, blockCount, type, bufferOut + offset);
// size_t mgardBufferSize = mgard.Operate(dataIn, blockStart, blockCount, type, bufferOut + offset);
std::cout << "mgard is ready" << std::endl;
// if (my_rank == 0)
{
double end = MPI_Wtime();
printf("Time taken for mgard: %f\n", (end - start));
}
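For orientation, the output buffer assembled along this path appears to look as follows, reading only the visible offset arithmetic; writes elided by the diff may add more fields:

    bufferOut: [int latent_dim]
               [k-means payload (elided by the diff)]
               [size_t mgardBufferSize, patched in later via PutParameter]
               [MGARD-compressed residual bytes]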

start = MPI_Wtime();
PutParameter(bufferOut, offsetForDecompresedData, mgardBufferSize);

// Use MGARD to decompress the residuals
std::vector<char> tmpDecompressBuffer(helper::GetTotalSize(blockCount, helper::GetDataTypeSize(type)));
mgard.InverseOperate(bufferOut + offset, mgardBufferSize, tmpDecompressBuffer.data());
std::cout << "mgard inverse is ready" << std::endl;
offset += mgardBufferSize;
// if (my_rank == 0)
{
double end = MPI_Wtime();
printf("Time taken for mgard-decomp: %f\n", (end - start));
}

// reconstruct data from the residuals
start = MPI_Wtime();
auto decompressed_residual_data =
torch::from_blob((void *)tmpDecompressBuffer.data(),
{blockCount[0], blockCount[1], blockCount[2], blockCount[3]}, torch::kFloat64)
@@ -573,6 +637,11 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart, co
// recon_vec shape: (1, nmesh, nx, ny)
optim.computeLagrangeParameters(recon_vec.data(), pq_yes);
std::cout << "Lagrange is ready" << std::endl;
// if (my_rank == 0)
{
double end = MPI_Wtime();
printf("Time taken for Lagrange: %f\n", (end - start));
}
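A sketch of the elided reconstruction, reusing unnorm from the residual sketch above: the MGARD-decompressed residuals are added back onto the autoencoder output, giving the tensor behind recon_vec (shape details depend on the elided reshape):

    auto recon = (unnorm + decompressed_residual_data).contiguous();
    std::vector<double> recon_vec(recon.data_ptr<double>(),
                                  recon.data_ptr<double>() + recon.numel());
    // recon_vec then feeds optim.computeLagrangeParameters above.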

// for (auto &batch : *loader)
// {
@@ -588,11 +657,6 @@ size_t CompressMGARDPlus::Operate(const char *dataIn, const Dims &blockStart, co
// With this total decompressed output, compute Lagrange parameters
// Figure out PD and QoI errors

bufferOutOffset += offset;
}
else if (compression_method == 4)
