From 846b3f1829c2a5f5f92fd4cf3ed328c01842c691 Mon Sep 17 00:00:00 2001 From: David Date: Thu, 8 Sep 2016 18:33:11 -0500 Subject: [PATCH] fixed asynchronous reading/writing bugs --- stim/envi/binary.h | 3 ++- stim/envi/bsq.h | 101 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------------- stim/envi/hsi.h | 4 ++-- 3 files changed, 71 insertions(+), 37 deletions(-) diff --git a/stim/envi/binary.h b/stim/envi/binary.h index ff8a725..f6d0dc2 100644 --- a/stim/envi/binary.h +++ b/stim/envi/binary.h @@ -404,7 +404,7 @@ public: } /// Reads a block specified by an (x, y, z) position and size using the largest possible contiguous reads - bool read_block(T* dest, size_t x, size_t y, size_t z, size_t sx, size_t sy, size_t sz){ + bool read(T* dest, size_t x, size_t y, size_t z, size_t sx, size_t sy, size_t sz){ size_t size_bytes = sx * sy * sz * sizeof(T); //size of the block to read in bytes @@ -443,6 +443,7 @@ public: } file.seekg(jump_y_bytes, std::ios::cur); //skip to the beginning of the next slice } + return false; } }; diff --git a/stim/envi/bsq.h b/stim/envi/bsq.h index f8eb890..7504518 100644 --- a/stim/envi/bsq.h +++ b/stim/envi/bsq.h @@ -9,6 +9,7 @@ #include #include #include +#include @@ -376,6 +377,10 @@ public: } + void readlines(T* dest, size_t start, size_t n){ + hsi::read(dest, 0, start, 0, X(), n, Z()); + } + bool bil(std::string outname, bool PROGRESS = false){ size_t in_time, out_time, calc_time; //initialize the timing variables @@ -385,66 +390,94 @@ public: size_t XB = X() * Z(); //number of elements in an output slice size_t XYbytes = XY * sizeof(T); //number of bytes in an input slice size_t XBbytes = XB * sizeof(T); //number of bytes in an output slice - size_t batch_slices = binary::buffer_size / (2*XBbytes); //calculate the number of slices that can fit in memory - if(Y() < batch_slices) batch_slices = Y(); //if the entire data set will fit in memory, do it - size_t batchN = XB * batch_slices; //number of elements in a batch + size_t batch_slices[2]; + batch_slices[0] = binary::buffer_size / (4*XBbytes); //calculate the number of slices that can fit in memory + batch_slices[1] = batch_slices[0]; + if(batch_slices == 0){ + std::cout<<"error, insufficient memory for stim::bsq::bil()"< rthread; + + readlines(ptrIn[0], 0, batch_slices[0]); + y += batch_slices[i]; + + std::future wthread; + + std::chrono::high_resolution_clock::time_point t_start; //high-resolution timers + std::chrono::high_resolution_clock::time_point t_end; + size_t t_batch; //number of milliseconds to process a batch + size_t t_total = 0; + for(size_t c = 0; c < batches; c++){ - if(c == (batches - 1)){ - batch_slices = Y() - (batches - 1) * batch_slices; //if this is the last batch, calculate the remaining # of bands - jump = (Y() - batch_slices) * X() * sizeof(T); - batchN = XB * batch_slices; - batch_bytes = batchN * sizeof(T); + t_start = std::chrono::high_resolution_clock::now(); //start the timer for this batch + if(c == (batches - 2)){ + batch_slices[!i] = Y() - (batches - 1) * batch_slices[!i]; //if this is the last batch, calculate the remaining # of bands } + jump = (Y() - batch_slices[i]) * X() * sizeof(T); + batchN = XB * batch_slices[i]; + batch_bytes = batchN * sizeof(T); - auto in_begin = std::chrono::high_resolution_clock::now(); - hsi::read_block(ptrIn, 0, y, 0, X(), batch_slices, Z()); //read the input block - y += batch_slices; - auto in_end = std::chrono::high_resolution_clock::now(); - in_time += std::chrono::duration_cast(in_end-in_begin).count(); - - auto calc_begin = std::chrono::high_resolution_clock::now(); - + rthread = std::async(&stim::bsq::readlines, this, ptrIn[!i], y, batch_slices[!i]); //start reading the next batch + y += batch_slices[i]; + for(size_t b = 0; b < Z(); b++){ //for each line, store an XB slice in ptrDest - ptrSrc = ptrIn + (b * X() * batch_slices); - ptrDst = ptrOut + (b * X()); //initialize ptrDst to the start of the XB output slice + ptrSrc = ptrIn[i] + (b * X() * batch_slices[i]); + ptrDst = ptrOut[i] + (b * X()); //initialize ptrDst to the start of the XB output slice - for(size_t y = 0; y < batch_slices; y++){ //for each band in the current line + for(size_t y = 0; y < batch_slices[i]; y++){ //for each band in the current line memcpy(ptrDst, ptrSrc, X() * sizeof(T)); //copy the band line from the source to the destination ptrSrc += X(); //increment the pointer within the current buffer array (batch) ptrDst += X() * Z(); //increment the pointer within the XB slice (to be output) } } - auto calc_end = std::chrono::high_resolution_clock::now(); - calc_time += std::chrono::duration_cast(calc_end-calc_begin).count(); + + wthread = std::async( &std::fstream::write, &target, (char*)ptrOut[i], batch_bytes); - auto out_begin = std::chrono::high_resolution_clock::now(); - target.write((char*)ptrOut, batch_bytes); //write the batch to disk - auto out_end = std::chrono::high_resolution_clock::now(); - out_time += std::chrono::duration_cast(out_end-out_begin).count(); if(PROGRESS) progress = (double)( c + 1 ) / (batches) * 100; + i = !i; + + rthread.wait(); + wthread.wait(); + t_end = std::chrono::high_resolution_clock::now(); + t_batch = std::chrono::duration_cast(t_end-t_start).count(); + t_total += t_batch; } + + std::cout<<"Total time to execute: "<BIL reads: "<<(double)in_time / (1000 * 60)<<" min"<BIL calculations: "<<(double)calc_time / (1000 * 60)<<" min"<BIL writes: "<<(double)out_time / (1000 * 60)<<" min"<BIL reads: "<<(double)in_time / (1000 * 60)<<" min"<BIL calculations: "<<(double)calc_time / (1000 * 60)<<" min"<BIL writes: "<<(double)out_time / (1000 * 60)<<" min"<::read_block(dest, d[0], d[1], d[2], sd[0], sd[1], sd[2])){ + if(!binary::read(dest, d[0], d[1], d[2], sd[0], sd[1], sd[2])){ std::cout<<"error reading block in stim::hsi: ("<