diff --git a/stim/envi/binary.h b/stim/envi/binary.h index 2fd0a6b..2f0d7bb 100644 --- a/stim/envi/binary.h +++ b/stim/envi/binary.h @@ -17,6 +17,96 @@ namespace stim{ +/// This class calculates the optimal setting for independent parameter b (batch size) for +/// minimizing the dependent parameter bps (bytes per second) +class stream_optimizer{ +protected: + size_t Bps; //bytes per second for the previous batch + size_t interval_B; //number of bytes processed this interval + size_t interval_ms; //number of milliseconds spent in the current interval + size_t n[2]; //current batch size (in bytes) + size_t dn; //delta value (in bytes) for setting the batch size (minimum change in batch parameter) + size_t maxn; //maximum value for the batch size + + bool sample_step; //calculating the derivative (this class alternates between calculating dBps and B) + bool forward_diff; //evaluate the derivative using forward differences + + size_t window_ms; //size of the interval (in milliseconds) integrated to get a reliable bps value + + // This function rounds x to the nearest value within dB + size_t round_limit(size_t n0){ + if(n0 > maxn) n0 = maxn; //limit the returned size of x to within the specified bounds + if(n0 < dn) n0 = dn; + + size_t lowest = n0 / dn; + size_t highest = lowest + dn; + size_t diff[2] = {n0 - lowest, highest - n0}; //calculate the two differences + if(diff[0] < diff[1]) + return lowest; + return highest; + } + +public: + + //constructor initializes a stream optimizer + stream_optimizer(size_t current_batch_size, size_t min_batch_size, size_t max_batch_size, size_t window = 1000){ + Bps = 0; //initialize to zero bytes per second processed + interval_B = 0; //zero bytes have been processed at initialization + interval_ms = 0; //no time has been spent on the batch so far + dn = min_batch_size; //set the minimum batch size as the minimum change in batch size + maxn = max_batch_size; //set the maximum batch size + n[0] = current_batch_size; //set B + window_ms = window; //minimum integration interval (for getting a reliable bps measure) + sample_step = true; //the first step is to calculate the derivative + forward_diff = true; //start with the forward difference (since we start at the maximum batch size) + } + + // this function updates the optimizer, given the number of bytes processed in an interval and time spent processing + size_t update(size_t bytes_processed, size_t ms_spent){ + interval_B += bytes_processed; //increment the number of bytes processed + interval_ms += ms_spent; //increment the number of milliseconds spent processing + + //if we have sufficient information to evaluate the optimization function at this point + if(interval_ms >= window_ms){ //if sufficient time has passed to get a reliable Bps measurement + size_t new_Bps = interval_B / interval_ms; //calculate the current Bps + + if(sample_step){ //if this is a sample step, collect the information for Bps = f(n0) + Bps = new_Bps; //set the Bps to the evaluated value + n[1] = n[0] - dn; //reduce the batch size by one delta to take a second sample + if(n[1] == 0){ //if the resulting batch size is zero + n[1] = 2*dn; //we're at the left edge: set the new sample point to 2*dn + } + + interval_B = interval_ms = 0; //start a new interval at the new sample point + sample_step = false; //next step will calculate the new batch size via optimization + return n[1]; //return the new batch size + } + else{ //if we have sufficient information to evaluate the derivative and optimize + double f = (double)new_Bps; //we have evaluated the function at this location + double fprime; + if(n[1] < n[0] ){ //if the new point is less than the previous point (usually the case) + fprime = (double)(Bps - new_Bps) / (double)dn; //calculate the forward difference + } + else{ //if the new point is larger (only happens at the minimum limit) + fprime = (double)(new_Bps - Bps) / (double)dn; //calculate the backward difference + } + size_t bestn = n[1] - (size_t)(f / fprime); //calculate the best value for B using Newton's method + n[0] = round_limit( (size_t)bestn ); //set the new dependent point + sample_step = true; //the next step will be a sample step + } + + } + if(sample_step) return n[0]; + return n[1]; //insufficient information, keep the same batch size + } + + size_t update(size_t bytes_processed, size_t ms_spent, size_t& data_rate){ + size_t time = update(bytes_processed, ms_spent); + data_rate = Bps; + return time; + } +}; + /** This class manages the streaming of large multidimensional binary files. * Generally these are hyperspectral files with 2 spatial and 1 spectral dimension. However, this class supports * other dimensions via the template parameter D. @@ -404,8 +494,8 @@ public: } /// Reads a block specified by an (x, y, z) position and size using the largest possible contiguous reads - bool read(T* dest, size_t x, size_t y, size_t z, size_t sx, size_t sy, size_t sz){ - + size_t read(T* dest, size_t x, size_t y, size_t z, size_t sx, size_t sy, size_t sz){ + auto t0 = std::chrono::high_resolution_clock::now(); size_t size_bytes = sx * sy * sz * sizeof(T); //size of the block to read in bytes size_t start = z * R[0] * R[1] + y * R[0] + x; //calculate the start postion @@ -415,10 +505,8 @@ public: if(sx == R[0] && sy == R[1]){ //if sx and sy result in a contiguous volume along z file.read((char*)dest, size_bytes); //read the block in one pass - return true; } - - if(sx == R[0]){ //if sx is contiguous, read each z-axis slice can be read in one pass + else if(sx == R[0]){ //if sx is contiguous, read each z-axis slice can be read in one pass size_t jump_bytes = (R[1] - sy) * R[0] * sizeof(T); //jump between each slice size_t slice_bytes = sx * sy * sizeof(T); //size of the slice to be read for(size_t zi = 0; zi < sz; zi++){ //for each z-axis slice @@ -426,29 +514,31 @@ public: dest += sx * sy; //move the destination pointer to the next slice file.seekg(jump_bytes, std::ios::cur); //skip to the next slice in the file } - return true; } - - //in this case, x is not contiguous so the volume must be read line-by-line - size_t jump_x_bytes = (R[0] - sx) * sizeof(T); //number of bytes skipped in the x direction - size_t jump_y_bytes = (R[1] - sy) * R[0] * sizeof(T) + jump_x_bytes; //number of bytes skipped between slices - size_t line_bytes = sx * sizeof(T); //size of the line to be read - size_t zi, yi; - for(zi = 0; zi < sz; zi++){ //for each slice - file.read((char*)dest, line_bytes); //read the first line - for(yi = 1; yi < sy; yi++){ //read each additional line - dest += sx; //move the pointer in the destination block to the next line - file.seekg(jump_x_bytes, std::ios::cur); //skip to the next line in the file - file.read((char*)dest, line_bytes); //read the line to the destination block + else{ + //in this case, x is not contiguous so the volume must be read line-by-line + size_t jump_x_bytes = (R[0] - sx) * sizeof(T); //number of bytes skipped in the x direction + size_t jump_y_bytes = (R[1] - sy) * R[0] * sizeof(T) + jump_x_bytes; //number of bytes skipped between slices + size_t line_bytes = sx * sizeof(T); //size of the line to be read + size_t zi, yi; + for(zi = 0; zi < sz; zi++){ //for each slice + file.read((char*)dest, line_bytes); //read the first line + for(yi = 1; yi < sy; yi++){ //read each additional line + dest += sx; //move the pointer in the destination block to the next line + file.seekg(jump_x_bytes, std::ios::cur); //skip to the next line in the file + file.read((char*)dest, line_bytes); //read the line to the destination block + } + file.seekg(jump_y_bytes, std::ios::cur); //skip to the beginning of the next slice } - file.seekg(jump_y_bytes, std::ios::cur); //skip to the beginning of the next slice } - return false; + auto t1 = std::chrono::high_resolution_clock::now(); + return std::chrono::duration_cast(t1-t0).count(); } // permutes a block of data from the current interleave to the interleave specified (re-arranged dimensions to the order specified by [d0, d1, d2]) - void permute(T* dest, T* src, size_t sx, size_t sy, size_t sz, size_t d0, size_t d1, size_t d2){ + size_t permute(T* dest, T* src, size_t sx, size_t sy, size_t sz, size_t d0, size_t d1, size_t d2){ + auto t0 = std::chrono::high_resolution_clock::now(); size_t d[3] = {d0, d1, d2}; size_t s[3] = {sx, sy, sz}; size_t p[3];// = {x, y, z}; @@ -467,7 +557,6 @@ public: p[1] = y; src_idx = z * sx * sy + y * sx; dest_idx = p[d[2]] * s[d[0]] * s[d[1]] + p[d[1]] * s[d[0]]; - //std::cout<(t1-t0).count(); } }; diff --git a/stim/envi/bsq.h b/stim/envi/bsq.h index c5b6d36..983bb0f 100644 --- a/stim/envi/bsq.h +++ b/stim/envi/bsq.h @@ -377,12 +377,19 @@ public: } - void readlines(T* dest, size_t start, size_t n){ - hsi::read(dest, 0, start, 0, X(), n, Z()); + size_t readlines(T* dest, size_t start, size_t n){ + return hsi::read(dest, 0, start, 0, X(), n, Z()); + } + + size_t writeblock(std::ofstream* f, T* src, size_t n){ + auto t0 = std::chrono::high_resolution_clock::now(); + f->write((char*)src, n); + auto t1 = std::chrono::high_resolution_clock::now(); + return std::chrono::duration_cast(t1-t0).count(); } /// Convert this BSQ file to a BIL - bool bil(std::string outname, bool PROGRESS = false){ + bool bil(std::string outname, bool PROGRESS = false, bool VERBOSE = false){ const size_t buffers = 4; //number of buffers required for this algorithm size_t mem_per_batch = binary::buffer_size / buffers; //calculate the maximum memory available for a batch @@ -395,6 +402,8 @@ public: } size_t max_batch_bytes = max_slices_per_batch * slice_bytes; //calculate the amount of memory that will be allocated for all four buffers + stream_optimizer O(max_slices_per_batch, 1, max_slices_per_batch); + T* src[2]; //source double-buffer for asynchronous batching src[0] = (T*) malloc(max_batch_bytes); src[1] = (T*) malloc(max_batch_bytes); @@ -409,48 +418,61 @@ public: //initialize with buffer 0 (used for double buffering) size_t y_load = 0; size_t y_proc = 0; - std::future rthread; + std::future rthread; std::future wthread; //create asynchronous threads for reading and writing readlines(src[0], 0, N[0]); //read the first batch into the 0 source buffer y_load += N[0]; //increment the loaded slice counter int b = 1; - std::chrono::high_resolution_clock::time_point t_start; //high-resolution timers - std::chrono::high_resolution_clock::time_point t_end; + std::chrono::high_resolution_clock::time_point t_start, pt_start; //high-resolution timers + std::chrono::high_resolution_clock::time_point t_end, pt_end; size_t t_batch; //number of milliseconds to process a batch - size_t t_total = 0; + size_t t_total = 0; //total time for operation + size_t pt_total = 0; //total time spent processing data + size_t rt_total = 0; //total time spent reading data + size_t wt_total = 0; + size_t data_rate; while(y_proc < Y()){ //while there are still slices to be processed t_start = std::chrono::high_resolution_clock::now(); //start the timer for this batch if(y_load < Y()){ //if there are still slices to be loaded, load them + if(y_proc > 0){ + N[b] = O.update(N[!b] * slice_bytes, t_batch, data_rate); //set the batch size based on optimization + std::cout<<"New N = "< Y()) N[b] = Y() - y_load; //if the next batch would process more than the total slices, adjust the batch size rthread = std::async(std::launch::async, &stim::bsq::readlines, this, src[b], y_load, N[b]); - //rthread.wait(); + rt_total += rthread.get(); y_load += N[b]; //increment the number of loaded slices } b = !b; //swap the double-buffer - - binary::permute(dst[b], src[b], X(), N[b], Z(), 0, 2, 1); //permute the batch to a BIL file - target.write((char*)dst[b], N[b] * slice_bytes); //write the permuted data to the output file + pt_total += binary::permute(dst[b], src[b], X(), N[b], Z(), 0, 2, 1); //permute the batch to a BIL file + //target.write((char*)dst[b], N[b] * slice_bytes); //write the permuted data to the output file + wt_total += writeblock(&target, dst[b], N[b] * slice_bytes); //write the permuted data to the output file y_proc += N[b]; //increment the counter of processed pixels if(PROGRESS) progress = (double)( y_proc + 1 ) / Y() * 100; //increment the progress counter based on the number of processed pixels t_end = std::chrono::high_resolution_clock::now(); t_batch = std::chrono::duration_cast(t_end-t_start).count(); t_total += t_batch; - if(y_load < Y()) rthread.wait(); //if a new batch was set to load, make sure it loads after calculations + //if(y_load < Y()) rt_total += rthread.get(); //if a new batch was set to load, make sure it loads after calculations } - - std::cout<<"Total time to execute: "<::buffer_size / buffers; //calculate the maximum memory available for a batch @@ -477,7 +499,7 @@ public: //initialize with buffer 0 (used for double buffering) size_t y_load = 0; size_t y_proc = 0; - std::future rthread; + std::future rthread; std::future wthread; //create asynchronous threads for reading and writing readlines(src[0], 0, N[0]); //read the first batch into the 0 source buffer @@ -488,6 +510,8 @@ public: std::chrono::high_resolution_clock::time_point t_end; size_t t_batch; //number of milliseconds to process a batch size_t t_total = 0; + size_t pt_total = 0; + size_t rt_total = 0; while(y_proc < Y()){ //while there are still slices to be processed t_start = std::chrono::high_resolution_clock::now(); //start the timer for this batch if(y_load < Y()){ //if there are still slices to be loaded, load them @@ -499,17 +523,21 @@ public: b = !b; //swap the double-buffer - binary::permute(dst[b], src[b], X(), N[b], Z(), 2, 0, 1); //permute the batch to a BIP file + pt_total += binary::permute(dst[b], src[b], X(), N[b], Z(), 2, 0, 1); //permute the batch to a BIP file target.write((char*)dst[b], N[b] * slice_bytes); //write the permuted data to the output file y_proc += N[b]; //increment the counter of processed pixels if(PROGRESS) progress = (double)( y_proc + 1 ) / Y() * 100; //increment the progress counter based on the number of processed pixels t_end = std::chrono::high_resolution_clock::now(); t_batch = std::chrono::duration_cast(t_end-t_start).count(); t_total += t_batch; - if(y_load < Y()) rthread.wait(); //if a batch was threaded to load, make sure it finishes + if(y_load < Y()) rt_total += rthread.get(); //if a batch was threaded to load, make sure it finishes } - std::cout<<"Total time to execute: "< BIL - ((bsq*)file)->bil(outfile, PROGRESS); + ((bsq*)file)->bil(outfile, PROGRESS, VERBOSE); else if(interleave == envi_header::BIP){ //ERROR //std::cout<<"ERROR: conversion from BSQ to BIP isn't practical; use BSQ->BIL->BIP instead"<*)file)->bip(outfile, PROGRESS); + ((bsq*)file)->bip(outfile, PROGRESS, VERBOSE); //exit(1); } } diff --git a/stim/envi/hsi.h b/stim/envi/hsi.h index 2226dbc..c9f3bea 100644 --- a/stim/envi/hsi.h +++ b/stim/envi/hsi.h @@ -202,7 +202,7 @@ public: } } - void read(T* dest, size_t x, size_t y, size_t z, size_t sx, size_t sy, size_t sz){ + size_t read(T* dest, size_t x, size_t y, size_t z, size_t sx, size_t sy, size_t sz){ size_t d[3]; //position in the binary coordinate system size_t sd[3]; //size in the binary coordinate system @@ -214,10 +214,7 @@ public: sd[O[1]] = sy; sd[O[2]] = sz; - if(!binary::read(dest, d[0], d[1], d[2], sd[0], sd[1], sd[2])){ - std::cout<<"error reading block in stim::hsi: ("<::read(dest, d[0], d[1], d[2], sd[0], sd[1], sd[2]); } }; -- libgit2 0.21.4