Commit 846b3f1829c2a5f5f92fd4cf3ed328c01842c691

Authored by David Mayerich
1 parent 8b899c24

fixed asynchronous reading/writing bugs

Showing 3 changed files with 71 additions and 37 deletions   Show diff stats
stim/envi/binary.h
@@ -404,7 +404,7 @@ public: @@ -404,7 +404,7 @@ public:
404 } 404 }
405 405
406 /// Reads a block specified by an (x, y, z) position and size using the largest possible contiguous reads 406 /// Reads a block specified by an (x, y, z) position and size using the largest possible contiguous reads
407 - bool read_block(T* dest, size_t x, size_t y, size_t z, size_t sx, size_t sy, size_t sz){ 407 + bool read(T* dest, size_t x, size_t y, size_t z, size_t sx, size_t sy, size_t sz){
408 408
409 size_t size_bytes = sx * sy * sz * sizeof(T); //size of the block to read in bytes 409 size_t size_bytes = sx * sy * sz * sizeof(T); //size of the block to read in bytes
410 410
@@ -443,6 +443,7 @@ public: @@ -443,6 +443,7 @@ public:
443 } 443 }
444 file.seekg(jump_y_bytes, std::ios::cur); //skip to the beginning of the next slice 444 file.seekg(jump_y_bytes, std::ios::cur); //skip to the beginning of the next slice
445 } 445 }
  446 + return false;
446 } 447 }
447 448
448 }; 449 };
@@ -9,6 +9,7 @@ @@ -9,6 +9,7 @@
9 #include <vector> 9 #include <vector>
10 #include <deque> 10 #include <deque>
11 #include <chrono> 11 #include <chrono>
  12 +#include <future>
12 13
13 14
14 15
@@ -376,6 +377,10 @@ public: @@ -376,6 +377,10 @@ public:
376 377
377 } 378 }
378 379
  380 + void readlines(T* dest, size_t start, size_t n){
  381 + hsi<T>::read(dest, 0, start, 0, X(), n, Z());
  382 + }
  383 +
379 bool bil(std::string outname, bool PROGRESS = false){ 384 bool bil(std::string outname, bool PROGRESS = false){
380 385
381 size_t in_time, out_time, calc_time; //initialize the timing variables 386 size_t in_time, out_time, calc_time; //initialize the timing variables
@@ -385,66 +390,94 @@ public: @@ -385,66 +390,94 @@ public:
385 size_t XB = X() * Z(); //number of elements in an output slice 390 size_t XB = X() * Z(); //number of elements in an output slice
386 size_t XYbytes = XY * sizeof(T); //number of bytes in an input slice 391 size_t XYbytes = XY * sizeof(T); //number of bytes in an input slice
387 size_t XBbytes = XB * sizeof(T); //number of bytes in an output slice 392 size_t XBbytes = XB * sizeof(T); //number of bytes in an output slice
388 - size_t batch_slices = binary<T>::buffer_size / (2*XBbytes); //calculate the number of slices that can fit in memory  
389 - if(Y() < batch_slices) batch_slices = Y(); //if the entire data set will fit in memory, do it  
390 - size_t batchN = XB * batch_slices; //number of elements in a batch 393 + size_t batch_slices[2];
  394 + batch_slices[0] = binary<T>::buffer_size / (4*XBbytes); //calculate the number of slices that can fit in memory
  395 + batch_slices[1] = batch_slices[0];
  396 + if(batch_slices == 0){
  397 + std::cout<<"error, insufficient memory for stim::bsq::bil()"<<std::endl;
  398 + exit(1);
  399 + }
  400 + if(Y() < batch_slices[0]) batch_slices[0] = Y(); //if the entire data set will fit in memory, do it
  401 + size_t batchN = XB * batch_slices[0]; //number of elements in a batch
391 size_t batch_bytes = batchN * sizeof(T); //calculate the number of bytes in a batch 402 size_t batch_bytes = batchN * sizeof(T); //calculate the number of bytes in a batch
392 403
393 - T* ptrIn = (T*) malloc(batch_bytes); //allocate a large buffer storing the read data  
394 - T* ptrOut = (T*) malloc(batch_bytes); //allocate space for storing an output buffer 404 + //T* ptrIn = (T*) malloc(batch_bytes); //allocate a large buffer storing the read data
  405 + //T* ptrOut = (T*) malloc(batch_bytes); //allocate space for storing an output buffer
  406 + T* ptrIn[2]; //input double-buffer for asynchronous batching
  407 + ptrIn[0] = (T*) malloc(batch_bytes);
  408 + ptrIn[1] = (T*) malloc(batch_bytes);
  409 + T* ptrOut[2]; //output double-buffer for asynchronous batching
  410 + ptrOut[0] = (T*) malloc(batch_bytes);
  411 + ptrOut[1] = (T*) malloc(batch_bytes);
395 412
396 - size_t jump = (Y() - batch_slices) * X() * sizeof(T); //jump between reads in the input file 413 + size_t jump = (Y() - batch_slices[0]) * X() * sizeof(T); //jump between reads in the input file
397 414
398 std::ofstream target(outname.c_str(), std::ios::binary); 415 std::ofstream target(outname.c_str(), std::ios::binary);
399 std::string headername = outname + ".hdr"; 416 std::string headername = outname + ".hdr";
400 417
401 - size_t batches = (size_t)ceil((double)(Y()) / (double)batch_slices); //calculate the number of batches 418 + size_t batches = (size_t)ceil((double)(Y()) / (double)batch_slices[0]); //calculate the number of batches
402 T* ptrDst; 419 T* ptrDst;
403 T* ptrSrc; 420 T* ptrSrc;
404 size_t y = 0; //initialize the current y-slice position 421 size_t y = 0; //initialize the current y-slice position
  422 + int i = 0;
  423 + std::future<void> rthread;
  424 +
  425 + readlines(ptrIn[0], 0, batch_slices[0]);
  426 + y += batch_slices[i];
  427 +
  428 + std::future<std::ostream&> wthread;
  429 +
  430 + std::chrono::high_resolution_clock::time_point t_start; //high-resolution timers
  431 + std::chrono::high_resolution_clock::time_point t_end;
  432 + size_t t_batch; //number of milliseconds to process a batch
  433 + size_t t_total = 0;
  434 +
405 for(size_t c = 0; c < batches; c++){ 435 for(size_t c = 0; c < batches; c++){
406 - if(c == (batches - 1)){  
407 - batch_slices = Y() - (batches - 1) * batch_slices; //if this is the last batch, calculate the remaining # of bands  
408 - jump = (Y() - batch_slices) * X() * sizeof(T);  
409 - batchN = XB * batch_slices;  
410 - batch_bytes = batchN * sizeof(T); 436 + t_start = std::chrono::high_resolution_clock::now(); //start the timer for this batch
  437 + if(c == (batches - 2)){
  438 + batch_slices[!i] = Y() - (batches - 1) * batch_slices[!i]; //if this is the last batch, calculate the remaining # of bands
411 } 439 }
  440 + jump = (Y() - batch_slices[i]) * X() * sizeof(T);
  441 + batchN = XB * batch_slices[i];
  442 + batch_bytes = batchN * sizeof(T);
412 443
413 - auto in_begin = std::chrono::high_resolution_clock::now();  
414 - hsi<T>::read_block(ptrIn, 0, y, 0, X(), batch_slices, Z()); //read the input block  
415 - y += batch_slices;  
416 - auto in_end = std::chrono::high_resolution_clock::now();  
417 - in_time += std::chrono::duration_cast<std::chrono::milliseconds>(in_end-in_begin).count();  
418 -  
419 - auto calc_begin = std::chrono::high_resolution_clock::now();  
420 - 444 + rthread = std::async(&stim::bsq<T>::readlines, this, ptrIn[!i], y, batch_slices[!i]); //start reading the next batch
  445 + y += batch_slices[i];
  446 +
421 for(size_t b = 0; b < Z(); b++){ //for each line, store an XB slice in ptrDest 447 for(size_t b = 0; b < Z(); b++){ //for each line, store an XB slice in ptrDest
422 - ptrSrc = ptrIn + (b * X() * batch_slices);  
423 - ptrDst = ptrOut + (b * X()); //initialize ptrDst to the start of the XB output slice 448 + ptrSrc = ptrIn[i] + (b * X() * batch_slices[i]);
  449 + ptrDst = ptrOut[i] + (b * X()); //initialize ptrDst to the start of the XB output slice
424 450
425 - for(size_t y = 0; y < batch_slices; y++){ //for each band in the current line 451 + for(size_t y = 0; y < batch_slices[i]; y++){ //for each band in the current line
426 memcpy(ptrDst, ptrSrc, X() * sizeof(T)); //copy the band line from the source to the destination 452 memcpy(ptrDst, ptrSrc, X() * sizeof(T)); //copy the band line from the source to the destination
427 ptrSrc += X(); //increment the pointer within the current buffer array (batch) 453 ptrSrc += X(); //increment the pointer within the current buffer array (batch)
428 ptrDst += X() * Z(); //increment the pointer within the XB slice (to be output) 454 ptrDst += X() * Z(); //increment the pointer within the XB slice (to be output)
429 } 455 }
430 } 456 }
431 - auto calc_end = std::chrono::high_resolution_clock::now();  
432 - calc_time += std::chrono::duration_cast<std::chrono::milliseconds>(calc_end-calc_begin).count(); 457 +
  458 + wthread = std::async( &std::fstream::write, &target, (char*)ptrOut[i], batch_bytes);
433 459
434 - auto out_begin = std::chrono::high_resolution_clock::now();  
435 - target.write((char*)ptrOut, batch_bytes); //write the batch to disk  
436 - auto out_end = std::chrono::high_resolution_clock::now();  
437 - out_time += std::chrono::duration_cast<std::chrono::milliseconds>(out_end-out_begin).count();  
438 if(PROGRESS) progress = (double)( c + 1 ) / (batches) * 100; 460 if(PROGRESS) progress = (double)( c + 1 ) / (batches) * 100;
  461 + i = !i;
  462 +
  463 + rthread.wait();
  464 + wthread.wait();
  465 + t_end = std::chrono::high_resolution_clock::now();
  466 + t_batch = std::chrono::duration_cast<std::chrono::milliseconds>(t_end-t_start).count();
  467 + t_total += t_batch;
439 } 468 }
  469 +
  470 + std::cout<<"Total time to execute: "<<t_total<<" ms"<<std::endl;
440 471
441 - free(ptrIn);  
442 - free(ptrOut); 472 + free(ptrIn[0]);
  473 + free(ptrIn[1]);
  474 + free(ptrOut[0]);
  475 + free(ptrOut[1]);
443 target.close(); 476 target.close();
444 477
445 - std::cout<<"BSQ->BIL reads: "<<(double)in_time / (1000 * 60)<<" min"<<std::endl;  
446 - std::cout<<"BSQ->BIL calculations: "<<(double)calc_time / (1000 * 60)<<" min"<<std::endl;  
447 - std::cout<<"BSQ->BIL writes: "<<(double)out_time / (1000 * 60)<<" min"<<std::endl; 478 + //std::cout<<"BSQ->BIL reads: "<<(double)in_time / (1000 * 60)<<" min"<<std::endl;
  479 + //std::cout<<"BSQ->BIL calculations: "<<(double)calc_time / (1000 * 60)<<" min"<<std::endl;
  480 + //std::cout<<"BSQ->BIL writes: "<<(double)out_time / (1000 * 60)<<" min"<<std::endl;
448 481
449 return true; 482 return true;
450 } 483 }
@@ -202,7 +202,7 @@ public: @@ -202,7 +202,7 @@ public:
202 } 202 }
203 } 203 }
204 204
205 - void read_block(T* dest, size_t x, size_t y, size_t z, size_t sx, size_t sy, size_t sz){ 205 + void read(T* dest, size_t x, size_t y, size_t z, size_t sx, size_t sy, size_t sz){
206 size_t d[3]; //position in the binary coordinate system 206 size_t d[3]; //position in the binary coordinate system
207 size_t sd[3]; //size in the binary coordinate system 207 size_t sd[3]; //size in the binary coordinate system
208 208
@@ -214,7 +214,7 @@ public: @@ -214,7 +214,7 @@ public:
214 sd[O[1]] = sy; 214 sd[O[1]] = sy;
215 sd[O[2]] = sz; 215 sd[O[2]] = sz;
216 216
217 - if(!binary<T>::read_block(dest, d[0], d[1], d[2], sd[0], sd[1], sd[2])){ 217 + if(!binary<T>::read(dest, d[0], d[1], d[2], sd[0], sd[1], sd[2])){
218 std::cout<<"error reading block in stim::hsi: ("<<d[0]<<", "<<d[1]<<", "<<d[2]<<") - ["<<sd[0]<<", "<<sd[1]<<", "<<sd[2]<<"]"<<std::endl; 218 std::cout<<"error reading block in stim::hsi: ("<<d[0]<<", "<<d[1]<<", "<<d[2]<<") - ["<<sd[0]<<", "<<sd[1]<<", "<<sd[2]<<"]"<<std::endl;
219 exit(1); 219 exit(1);
220 } 220 }