Commit 6db250f53e04b197a18453d95f401bfc64459928

Authored by David Mayerich
1 parent ad934572

adaptive streaming Windows fixes

stim/envi/binary.h
... ... @@ -17,6 +17,96 @@
17 17  
18 18 namespace stim{
19 19  
  20 +/// This class calculates the optimal setting for independent parameter b (batch size) for
  21 +/// minimizing the dependent parameter bps (bytes per second)
  22 +class stream_optimizer{
  23 +protected:
  24 + size_t Bps; //bytes per second for the previous batch
  25 + size_t interval_B; //number of bytes processed this interval
  26 + size_t interval_ms; //number of milliseconds spent in the current interval
  27 + size_t n[2]; //current batch size (in bytes)
  28 + size_t dn; //delta value (in bytes) for setting the batch size (minimum change in batch parameter)
  29 + size_t maxn; //maximum value for the batch size
  30 +
  31 + bool sample_step; //calculating the derivative (this class alternates between calculating dBps and B)
  32 + bool forward_diff; //evaluate the derivative using forward differences
  33 +
  34 + size_t window_ms; //size of the interval (in milliseconds) integrated to get a reliable bps value
  35 +
  36 + // This function rounds x to the nearest value within dB
  37 + size_t round_limit(size_t n0){
  38 + if(n0 > maxn) n0 = maxn; //limit the returned size of x to within the specified bounds
  39 + if(n0 < dn) n0 = dn;
  40 +
  41 + size_t lowest = n0 / dn;
  42 + size_t highest = lowest + dn;
  43 + size_t diff[2] = {n0 - lowest, highest - n0}; //calculate the two differences
  44 + if(diff[0] < diff[1])
  45 + return lowest;
  46 + return highest;
  47 + }
  48 +
  49 +public:
  50 +
  51 + //constructor initializes a stream optimizer
  52 + stream_optimizer(size_t current_batch_size, size_t min_batch_size, size_t max_batch_size, size_t window = 1000){
  53 + Bps = 0; //initialize to zero bytes per second processed
  54 + interval_B = 0; //zero bytes have been processed at initialization
  55 + interval_ms = 0; //no time has been spent on the batch so far
  56 + dn = min_batch_size; //set the minimum batch size as the minimum change in batch size
  57 + maxn = max_batch_size; //set the maximum batch size
  58 + n[0] = current_batch_size; //set B
  59 + window_ms = window; //minimum integration interval (for getting a reliable bps measure)
  60 + sample_step = true; //the first step is to calculate the derivative
  61 + forward_diff = true; //start with the forward difference (since we start at the maximum batch size)
  62 + }
  63 +
  64 + // this function updates the optimizer, given the number of bytes processed in an interval and time spent processing
  65 + size_t update(size_t bytes_processed, size_t ms_spent){
  66 + interval_B += bytes_processed; //increment the number of bytes processed
  67 + interval_ms += ms_spent; //increment the number of milliseconds spent processing
  68 +
  69 + //if we have sufficient information to evaluate the optimization function at this point
  70 + if(interval_ms >= window_ms){ //if sufficient time has passed to get a reliable Bps measurement
  71 + size_t new_Bps = interval_B / interval_ms; //calculate the current Bps
  72 +
  73 + if(sample_step){ //if this is a sample step, collect the information for Bps = f(n0)
  74 + Bps = new_Bps; //set the Bps to the evaluated value
  75 + n[1] = n[0] - dn; //reduce the batch size by one delta to take a second sample
  76 + if(n[1] == 0){ //if the resulting batch size is zero
  77 + n[1] = 2*dn; //we're at the left edge: set the new sample point to 2*dn
  78 + }
  79 +
  80 + interval_B = interval_ms = 0; //start a new interval at the new sample point
  81 + sample_step = false; //next step will calculate the new batch size via optimization
  82 + return n[1]; //return the new batch size
  83 + }
  84 + else{ //if we have sufficient information to evaluate the derivative and optimize
  85 + double f = (double)new_Bps; //we have evaluated the function at this location
  86 + double fprime;
  87 + if(n[1] < n[0] ){ //if the new point is less than the previous point (usually the case)
  88 + fprime = (double)(Bps - new_Bps) / (double)dn; //calculate the forward difference
  89 + }
  90 + else{ //if the new point is larger (only happens at the minimum limit)
  91 + fprime = (double)(new_Bps - Bps) / (double)dn; //calculate the backward difference
  92 + }
  93 + size_t bestn = n[1] - (size_t)(f / fprime); //calculate the best value for B using Newton's method
  94 + n[0] = round_limit( (size_t)bestn ); //set the new dependent point
  95 + sample_step = true; //the next step will be a sample step
  96 + }
  97 +
  98 + }
  99 + if(sample_step) return n[0];
  100 + return n[1]; //insufficient information, keep the same batch size
  101 + }
  102 +
  103 + size_t update(size_t bytes_processed, size_t ms_spent, size_t& data_rate){
  104 + size_t time = update(bytes_processed, ms_spent);
  105 + data_rate = Bps;
  106 + return time;
  107 + }
  108 +};
  109 +
20 110 /** This class manages the streaming of large multidimensional binary files.
21 111 * Generally these are hyperspectral files with 2 spatial and 1 spectral dimension. However, this class supports
22 112 * other dimensions via the template parameter D.
... ... @@ -404,8 +494,8 @@ public:
404 494 }
405 495  
406 496 /// Reads a block specified by an (x, y, z) position and size using the largest possible contiguous reads
407   - bool read(T* dest, size_t x, size_t y, size_t z, size_t sx, size_t sy, size_t sz){
408   -
  497 + size_t read(T* dest, size_t x, size_t y, size_t z, size_t sx, size_t sy, size_t sz){
  498 + auto t0 = std::chrono::high_resolution_clock::now();
409 499 size_t size_bytes = sx * sy * sz * sizeof(T); //size of the block to read in bytes
410 500  
411 501 size_t start = z * R[0] * R[1] + y * R[0] + x; //calculate the start postion
... ... @@ -415,10 +505,8 @@ public:
415 505  
416 506 if(sx == R[0] && sy == R[1]){ //if sx and sy result in a contiguous volume along z
417 507 file.read((char*)dest, size_bytes); //read the block in one pass
418   - return true;
419 508 }
420   -
421   - if(sx == R[0]){ //if sx is contiguous, read each z-axis slice can be read in one pass
  509 + else if(sx == R[0]){ //if sx is contiguous, read each z-axis slice can be read in one pass
422 510 size_t jump_bytes = (R[1] - sy) * R[0] * sizeof(T); //jump between each slice
423 511 size_t slice_bytes = sx * sy * sizeof(T); //size of the slice to be read
424 512 for(size_t zi = 0; zi < sz; zi++){ //for each z-axis slice
... ... @@ -426,29 +514,31 @@ public:
426 514 dest += sx * sy; //move the destination pointer to the next slice
427 515 file.seekg(jump_bytes, std::ios::cur); //skip to the next slice in the file
428 516 }
429   - return true;
430 517 }
431   -
432   - //in this case, x is not contiguous so the volume must be read line-by-line
433   - size_t jump_x_bytes = (R[0] - sx) * sizeof(T); //number of bytes skipped in the x direction
434   - size_t jump_y_bytes = (R[1] - sy) * R[0] * sizeof(T) + jump_x_bytes; //number of bytes skipped between slices
435   - size_t line_bytes = sx * sizeof(T); //size of the line to be read
436   - size_t zi, yi;
437   - for(zi = 0; zi < sz; zi++){ //for each slice
438   - file.read((char*)dest, line_bytes); //read the first line
439   - for(yi = 1; yi < sy; yi++){ //read each additional line
440   - dest += sx; //move the pointer in the destination block to the next line
441   - file.seekg(jump_x_bytes, std::ios::cur); //skip to the next line in the file
442   - file.read((char*)dest, line_bytes); //read the line to the destination block
  518 + else{
  519 + //in this case, x is not contiguous so the volume must be read line-by-line
  520 + size_t jump_x_bytes = (R[0] - sx) * sizeof(T); //number of bytes skipped in the x direction
  521 + size_t jump_y_bytes = (R[1] - sy) * R[0] * sizeof(T) + jump_x_bytes; //number of bytes skipped between slices
  522 + size_t line_bytes = sx * sizeof(T); //size of the line to be read
  523 + size_t zi, yi;
  524 + for(zi = 0; zi < sz; zi++){ //for each slice
  525 + file.read((char*)dest, line_bytes); //read the first line
  526 + for(yi = 1; yi < sy; yi++){ //read each additional line
  527 + dest += sx; //move the pointer in the destination block to the next line
  528 + file.seekg(jump_x_bytes, std::ios::cur); //skip to the next line in the file
  529 + file.read((char*)dest, line_bytes); //read the line to the destination block
  530 + }
  531 + file.seekg(jump_y_bytes, std::ios::cur); //skip to the beginning of the next slice
443 532 }
444   - file.seekg(jump_y_bytes, std::ios::cur); //skip to the beginning of the next slice
445 533 }
446   - return false;
  534 + auto t1 = std::chrono::high_resolution_clock::now();
  535 + return std::chrono::duration_cast<std::chrono::milliseconds>(t1-t0).count();
447 536 }
448 537  
449 538 // permutes a block of data from the current interleave to the interleave specified (re-arranged dimensions to the order specified by [d0, d1, d2])
450 539  
451   - void permute(T* dest, T* src, size_t sx, size_t sy, size_t sz, size_t d0, size_t d1, size_t d2){
  540 + size_t permute(T* dest, T* src, size_t sx, size_t sy, size_t sz, size_t d0, size_t d1, size_t d2){
  541 + auto t0 = std::chrono::high_resolution_clock::now();
452 542 size_t d[3] = {d0, d1, d2};
453 543 size_t s[3] = {sx, sy, sz};
454 544 size_t p[3];// = {x, y, z};
... ... @@ -467,7 +557,6 @@ public:
467 557 p[1] = y;
468 558 src_idx = z * sx * sy + y * sx;
469 559 dest_idx = p[d[2]] * s[d[0]] * s[d[1]] + p[d[1]] * s[d[0]];
470   - //std::cout<<z<<", "<<y<<" ------- "<<p[d[2]]<<" * "<<s[d[0]]<<" * "<<s[d[1]]<<" + "<<p[d[1]]<<" * "<<s[d[0]]<<std::endl;
471 560 memcpy(dest + dest_idx, src + src_idx, x_bytes);
472 561 }
473 562 }
... ... @@ -491,6 +580,8 @@ public:
491 580 }
492 581 }
493 582 }
  583 + auto t1 = std::chrono::high_resolution_clock::now();
  584 + return std::chrono::duration_cast<std::chrono::milliseconds>(t1-t0).count();
494 585 }
495 586  
496 587 };
... ...
stim/envi/bsq.h
... ... @@ -377,12 +377,19 @@ public:
377 377  
378 378 }
379 379  
380   - void readlines(T* dest, size_t start, size_t n){
381   - hsi<T>::read(dest, 0, start, 0, X(), n, Z());
  380 + size_t readlines(T* dest, size_t start, size_t n){
  381 + return hsi<T>::read(dest, 0, start, 0, X(), n, Z());
  382 + }
  383 +
  384 + size_t writeblock(std::ofstream* f, T* src, size_t n){
  385 + auto t0 = std::chrono::high_resolution_clock::now();
  386 + f->write((char*)src, n);
  387 + auto t1 = std::chrono::high_resolution_clock::now();
  388 + return std::chrono::duration_cast<std::chrono::milliseconds>(t1-t0).count();
382 389 }
383 390  
384 391 /// Convert this BSQ file to a BIL
385   - bool bil(std::string outname, bool PROGRESS = false){
  392 + bool bil(std::string outname, bool PROGRESS = false, bool VERBOSE = false){
386 393  
387 394 const size_t buffers = 4; //number of buffers required for this algorithm
388 395 size_t mem_per_batch = binary<T>::buffer_size / buffers; //calculate the maximum memory available for a batch
... ... @@ -395,6 +402,8 @@ public:
395 402 }
396 403 size_t max_batch_bytes = max_slices_per_batch * slice_bytes; //calculate the amount of memory that will be allocated for all four buffers
397 404  
  405 + stream_optimizer O(max_slices_per_batch, 1, max_slices_per_batch);
  406 +
398 407 T* src[2]; //source double-buffer for asynchronous batching
399 408 src[0] = (T*) malloc(max_batch_bytes);
400 409 src[1] = (T*) malloc(max_batch_bytes);
... ... @@ -409,48 +418,61 @@ public:
409 418 //initialize with buffer 0 (used for double buffering)
410 419 size_t y_load = 0;
411 420 size_t y_proc = 0;
412   - std::future<void> rthread;
  421 + std::future<size_t> rthread;
413 422 std::future<std::ostream&> wthread; //create asynchronous threads for reading and writing
414 423  
415 424 readlines(src[0], 0, N[0]); //read the first batch into the 0 source buffer
416 425 y_load += N[0]; //increment the loaded slice counter
417 426 int b = 1;
418 427  
419   - std::chrono::high_resolution_clock::time_point t_start; //high-resolution timers
420   - std::chrono::high_resolution_clock::time_point t_end;
  428 + std::chrono::high_resolution_clock::time_point t_start, pt_start; //high-resolution timers
  429 + std::chrono::high_resolution_clock::time_point t_end, pt_end;
421 430 size_t t_batch; //number of milliseconds to process a batch
422   - size_t t_total = 0;
  431 + size_t t_total = 0; //total time for operation
  432 + size_t pt_total = 0; //total time spent processing data
  433 + size_t rt_total = 0; //total time spent reading data
  434 + size_t wt_total = 0;
  435 + size_t data_rate;
423 436 while(y_proc < Y()){ //while there are still slices to be processed
424 437 t_start = std::chrono::high_resolution_clock::now(); //start the timer for this batch
425 438 if(y_load < Y()){ //if there are still slices to be loaded, load them
  439 + if(y_proc > 0){
  440 + N[b] = O.update(N[!b] * slice_bytes, t_batch, data_rate); //set the batch size based on optimization
  441 + std::cout<<"New N = "<<N[b]<<" at "<<(double)data_rate / 1000000<<" MB/s"<<std::endl;
  442 + }
426 443 if(y_load + N[b] > Y()) N[b] = Y() - y_load; //if the next batch would process more than the total slices, adjust the batch size
427 444 rthread = std::async(std::launch::async, &stim::bsq<T>::readlines, this, src[b], y_load, N[b]);
428   - //rthread.wait();
  445 + rt_total += rthread.get();
429 446 y_load += N[b]; //increment the number of loaded slices
430 447 }
431 448  
432 449 b = !b; //swap the double-buffer
433   -
434   - binary<T>::permute(dst[b], src[b], X(), N[b], Z(), 0, 2, 1); //permute the batch to a BIL file
435   - target.write((char*)dst[b], N[b] * slice_bytes); //write the permuted data to the output file
  450 + pt_total += binary<T>::permute(dst[b], src[b], X(), N[b], Z(), 0, 2, 1); //permute the batch to a BIL file
  451 + //target.write((char*)dst[b], N[b] * slice_bytes); //write the permuted data to the output file
  452 + wt_total += writeblock(&target, dst[b], N[b] * slice_bytes); //write the permuted data to the output file
436 453 y_proc += N[b]; //increment the counter of processed pixels
437 454 if(PROGRESS) progress = (double)( y_proc + 1 ) / Y() * 100; //increment the progress counter based on the number of processed pixels
438 455 t_end = std::chrono::high_resolution_clock::now();
439 456 t_batch = std::chrono::duration_cast<std::chrono::milliseconds>(t_end-t_start).count();
440 457 t_total += t_batch;
441   - if(y_load < Y()) rthread.wait(); //if a new batch was set to load, make sure it loads after calculations
  458 + //if(y_load < Y()) rt_total += rthread.get(); //if a new batch was set to load, make sure it loads after calculations
442 459 }
443   -
444   - std::cout<<"Total time to execute: "<<t_total<<" ms"<<std::endl;
  460 +
445 461 free(src[0]); //free buffer resources
446 462 free(src[1]);
447 463 free(dst[0]);
448 464 free(dst[1]);
  465 + if(VERBOSE){
  466 + std::cout<<"total time to execute bsq::bil(): "<<t_total<<" ms"<<std::endl;
  467 + std::cout<<" total time spent processing: "<<pt_total<<" ms"<<std::endl;
  468 + std::cout<<" total time spent reading: "<<rt_total<<" ms"<<std::endl;
  469 + std::cout<<" total time spent writing: "<<wt_total<<" ms"<<std::endl;
  470 + }
449 471 return true; //return true
450 472 }
451 473  
452 474 /// Convert this BSQ file to a BIP
453   - bool bip(std::string outname, bool PROGRESS = false){
  475 + bool bip(std::string outname, bool PROGRESS = false, bool VERBOSE = false){
454 476  
455 477 const size_t buffers = 4; //number of buffers required for this algorithm
456 478 size_t mem_per_batch = binary<T>::buffer_size / buffers; //calculate the maximum memory available for a batch
... ... @@ -477,7 +499,7 @@ public:
477 499 //initialize with buffer 0 (used for double buffering)
478 500 size_t y_load = 0;
479 501 size_t y_proc = 0;
480   - std::future<void> rthread;
  502 + std::future<size_t> rthread;
481 503 std::future<std::ostream&> wthread; //create asynchronous threads for reading and writing
482 504  
483 505 readlines(src[0], 0, N[0]); //read the first batch into the 0 source buffer
... ... @@ -488,6 +510,8 @@ public:
488 510 std::chrono::high_resolution_clock::time_point t_end;
489 511 size_t t_batch; //number of milliseconds to process a batch
490 512 size_t t_total = 0;
  513 + size_t pt_total = 0;
  514 + size_t rt_total = 0;
491 515 while(y_proc < Y()){ //while there are still slices to be processed
492 516 t_start = std::chrono::high_resolution_clock::now(); //start the timer for this batch
493 517 if(y_load < Y()){ //if there are still slices to be loaded, load them
... ... @@ -499,17 +523,21 @@ public:
499 523  
500 524 b = !b; //swap the double-buffer
501 525  
502   - binary<T>::permute(dst[b], src[b], X(), N[b], Z(), 2, 0, 1); //permute the batch to a BIP file
  526 + pt_total += binary<T>::permute(dst[b], src[b], X(), N[b], Z(), 2, 0, 1); //permute the batch to a BIP file
503 527 target.write((char*)dst[b], N[b] * slice_bytes); //write the permuted data to the output file
504 528 y_proc += N[b]; //increment the counter of processed pixels
505 529 if(PROGRESS) progress = (double)( y_proc + 1 ) / Y() * 100; //increment the progress counter based on the number of processed pixels
506 530 t_end = std::chrono::high_resolution_clock::now();
507 531 t_batch = std::chrono::duration_cast<std::chrono::milliseconds>(t_end-t_start).count();
508 532 t_total += t_batch;
509   - if(y_load < Y()) rthread.wait(); //if a batch was threaded to load, make sure it finishes
  533 + if(y_load < Y()) rt_total += rthread.get(); //if a batch was threaded to load, make sure it finishes
510 534 }
511 535  
512   - std::cout<<"Total time to execute: "<<t_total<<" ms"<<std::endl;
  536 + if(VERBOSE){
  537 + std::cout<<"total time to execute bsq::bil(): "<<t_total<<" ms"<<std::endl;
  538 + std::cout<<" total time spent processing: "<<pt_total<<" ms"<<std::endl;
  539 + std::cout<<" total time spent reading: "<<rt_total<<" ms"<<std::endl;
  540 + }
513 541 free(src[0]); //free buffer resources
514 542 free(src[1]);
515 543 free(dst[0]);
... ...
stim/envi/envi.h
... ... @@ -509,7 +509,7 @@ public:
509 509  
510 510 /// @param outfile is the file name for the converted output
511 511 /// @param interleave is the interleave format for the destination file
512   - bool convert(std::string outfile, stim::envi_header::interleaveType interleave, bool PROGRESS = false){
  512 + bool convert(std::string outfile, stim::envi_header::interleaveType interleave, bool PROGRESS = false, bool VERBOSE = false){
513 513  
514 514 if(header.interleave == envi_header::BSQ){ //if the infile is bsq file
515 515  
... ... @@ -519,10 +519,10 @@ public:
519 519 exit(1);
520 520 }
521 521 else if(interleave == envi_header::BIL) //convert BSQ -> BIL
522   - ((bsq<float>*)file)->bil(outfile, PROGRESS);
  522 + ((bsq<float>*)file)->bil(outfile, PROGRESS, VERBOSE);
523 523 else if(interleave == envi_header::BIP){ //ERROR
524 524 //std::cout<<"ERROR: conversion from BSQ to BIP isn't practical; use BSQ->BIL->BIP instead"<<std::endl;
525   - ((bsq<float>*)file)->bip(outfile, PROGRESS);
  525 + ((bsq<float>*)file)->bip(outfile, PROGRESS, VERBOSE);
526 526 //exit(1);
527 527 }
528 528 }
... ...
stim/envi/hsi.h
... ... @@ -202,7 +202,7 @@ public:
202 202 }
203 203 }
204 204  
205   - void read(T* dest, size_t x, size_t y, size_t z, size_t sx, size_t sy, size_t sz){
  205 + size_t read(T* dest, size_t x, size_t y, size_t z, size_t sx, size_t sy, size_t sz){
206 206 size_t d[3]; //position in the binary coordinate system
207 207 size_t sd[3]; //size in the binary coordinate system
208 208  
... ... @@ -214,10 +214,7 @@ public:
214 214 sd[O[1]] = sy;
215 215 sd[O[2]] = sz;
216 216  
217   - if(!binary<T>::read(dest, d[0], d[1], d[2], sd[0], sd[1], sd[2])){
218   - std::cout<<"error reading block in stim::hsi: ("<<d[0]<<", "<<d[1]<<", "<<d[2]<<") - ["<<sd[0]<<", "<<sd[1]<<", "<<sd[2]<<"]"<<std::endl;
219   - exit(1);
220   - }
  217 + return binary<T>::read(dest, d[0], d[1], d[2], sd[0], sd[1], sd[2]);
221 218 }
222 219  
223 220 };
... ...