# Source code for lognflow.printprogress

from math import ceil
from time import time

def needs_s(number):
    """Return the plural suffix for a count.

    Parameters
    ----------
    number
        The count that the suffix refers to.

    Returns
    -------
    str
        ``''`` when ``number`` equals 1 or -1, otherwise ``'s'``.
    """
    if number == 1 or number == -1:
        return ''
    return 's'
class printprogress:
    r"""Text progress bar that never relies on carriage returns.

    While there are packages that use \r to show a progress bar, there are
    cases, e.g. a print_function or an ssh terminal, that do not support \r.
    In such cases, if something is typed at the end of the line, it cannot
    be deleted. This class provides a good looking progress bar with a
    simple title and limits, and is very simple to use.

    Define the object with the number of steps of the loop and then call it
    in every iteration of the loop. If you'd like it to go faster, call it
    with the number of steps that have passed. Alternatively, give it an
    object that supports ``len()`` and integer indexing and iterate over
    the progress bar itself.
    """

    def __init__(self, int_or_iterable, numTicks=78, title=None, desc=None,
                 method='linear', print_function=print, ETA_only=False,
                 **print_function_kwargs):
        r"""
        int_or_iterable: int or iterable
            Number of iterations in the for loop or iterable
            n_steps = int(it) or n_steps = len(it)
        numTicks: int
            The number of characters in a row of the screen - 2
            default: 78 for old screens that have 80 columns.
            It is enlarged to len(title) + 2 when the title is longer.
        title : str
            The title of progress bar.
            default: f'Progress for {n_steps} step{needs_s(n_steps)}'
        desc : str
            it is the same as title, the name used by tqdm. If both are
            given they must be equal.
        method:
            options how to calculate the remaining time are
            'linear':
                    p            x
                |-----------|--------------------------|
                           ck                         n_steps
                As such x/(n-c) = p/c  =>  x = p(n/c - 1)
            'linear_robust' is accepted as well and currently uses the
            same linear estimate.
        print_function:
            print_function must be callable with a string and an ``end``
            keyword and should not add \n to its input.
            If you pass None as the print_function, nothing will be
            printed, yet the output of the __call__ will be the remaining
            time in seconds. Printing is also disabled when there are
            fewer than 2 steps.
        ETA_only: bool
            When true, the framed header and the closing row are skipped
            and every tick is printed on its own line as 'ETA: NNu'.
        print_function_kwargs:
            Extra keyword arguments forwarded to a custom print_function
            (they are not forwarded to the builtin print).
        """
        assert method in ['linear', 'linear_robust']
        if desc is not None:
            if title is not None:
                assert desc == title, 'either give me title or desc'
            title = desc

        self.yielding_data = False
        self.yielding_data_call_warning = False
        try:
            n_steps = int(int_or_iterable)
        except (TypeError, ValueError):
            # Not convertible to a number: treat it as a sized container.
            n_steps = len(int_or_iterable)
            self.iterable = int_or_iterable
            if n_steps > 0:
                self.yielding_data = True

        if n_steps < 2:
            # Nothing worth drawing for zero or one step.
            print_function = None
        self.FLAG_first_tick = True
        self.print_function_kwargs = print_function_kwargs
        self.method = method
        self.print_function = print_function
        if title is None:
            title = f'Progress for {n_steps} step{needs_s(n_steps)}'
        self.FLAG_ended = False
        self.FLAG_warning = False
        self.startTime = time()
        self.ck = 0
        self.prog = 0
        self.n_steps = n_steps
        # The bar must be wide enough to frame the title.
        if numTicks < len(title) + 2:
            self.numTicks = len(title) + 2
        else:
            self.numTicks = numTicks

        self.ETA_only = ETA_only

        # Header: a row of underscores, then the centred title between
        # '/' and '\'; in ETA_only mode just the title on its own line.
        if not self.ETA_only:
            self._print_func(' ', end='')
            self._print_func('_' * self.numTicks, end='')
            self._print_func(' ')
            self._print_func('/', end='')
            self._print_func(' ' * int((self.numTicks - len(title)) / 2),
                             end='')
        self._print_func(title, end='')
        if not self.ETA_only:
            self._print_func(
                ' ' * int(ceil((self.numTicks - len(title)) / 2) - 1),
                end='')
            self._print_func(' \\')
            self._print_func(' ', end='')
        else:
            self._print_func('')
        self.len_prog_text = 0
        self.remTimeS_perv = 0
        self.average_filter_coeff = 0

    def _print_func(self, text='', end='\n'):
        """Send ``text`` to the configured print function, if there is one."""
        if self.print_function is not None:
            if self.print_function is print:
                # Flush so that partial rows show up immediately.
                print(text, end=end, flush=True)
            else:
                self.print_function(text, end=end,
                                    **self.print_function_kwargs)

    def _calc_ETA(self):
        """Return the estimated remaining time in seconds.

        The estimate is linear: elapsed * (n_steps / ck - 1). Every accepted
        ``method`` uses it, so 'linear_robust' no longer leaves the result
        undefined. Before the first step has been counted, 1e+7 is returned.
        """
        passedTime = time() - self.startTime
        if self.ck > 0:
            remTimeS = passedTime * (self.n_steps / self.ck - 1)
            if self.average_filter_coeff:
                # Optional smoothing with the previous estimate; only
                # applied when both estimates exceed 5 seconds.
                if self.remTimeS_perv > 5 and remTimeS > 5:
                    remTimeS = (
                        (1 - self.average_filter_coeff) * remTimeS
                        + self.average_filter_coeff * self.remTimeS_perv)
                self.remTimeS_perv = remTimeS
        else:
            # 1e+7 s is above the 100 day limit used in _make_progress,
            # so no tick text is printed for it.
            remTimeS = 1e+7
        return remTimeS

    def _print_tick(self, value, unit):
        """Print one three-character tick: ``value`` rounded up, then unit."""
        progStr = "%02d" % int(ceil(value))
        if self.ETA_only:
            self._print_func('ETA: ', end='')
        self._print_func(progStr, end='')
        self._print_func(unit, end='')
        if self.ETA_only:
            self._print_func()
        self.len_prog_text += 3

    def _make_progress(self, ck=1):
        """Advance the bar by ``ck`` steps and print any ticks that are due.

        Returns the latest remaining-time estimate in seconds, or 0 when
        the bar has already ended.
        """
        remTimeS = 0
        if self.FLAG_ended:
            # Called after the bar finished: draw a dashed row once.
            if not self.FLAG_warning:
                self.FLAG_warning = True
                if not self.ETA_only:
                    self._print_func('-' * (self.numTicks + 2))
            return remTimeS

        self.ck += ck
        if self.ck <= self.n_steps:
            # useful when print_function is None
            remTimeS = self._calc_ETA()
        # Each tick uses 3 characters, hence the division by 3.
        try:
            cProg = int(self.numTicks * self.ck / (self.n_steps - 1) / 3)
        except ZeroDivisionError:
            # n_steps == 1
            cProg = int(self.numTicks / 3)
        while self.prog < cProg and not self.FLAG_ended:
            self.prog += 1
            remTimeS = self._calc_ETA()
            if remTimeS < 86400 * 100:          # below 100 days
                if remTimeS > 356400:           # more than 99 hours
                    self._print_tick(remTimeS / 86400, 'd')
                elif remTimeS > 5940:           # more than 99 minutes
                    self._print_tick(remTimeS / 3600, 'h')
                elif remTimeS > 99:             # more than 99 seconds
                    self._print_tick(remTimeS / 60, 'm')
                elif remTimeS >= 0:
                    self._print_tick(remTimeS, 's')
                else:
                    self._end()
            if (self.ck >= self.n_steps
                    or self.len_prog_text >= self.numTicks):
                self._end()
        return remTimeS

    def __call__(self, ck=1):
        """Tick the progress bar.

        Just call the object and the progress bar moves ``ck`` steps ahead
        when ready.

        Returns
        -------
        The remaining time in seconds. When the object was built from an
        iterable, a notice is printed once instead and None is returned.
        """
        if self.yielding_data:
            if not self.yielding_data_call_warning:
                print('printprogress is used as an Iterator,'
                      ' You can not call it. Perhaps you gave it a list,'
                      ' rather than the length of the list as the first arg.')
                self.yielding_data_call_warning = True
        else:
            return self._make_progress(ck)

    # Supporting iterator type usage
    def __iter__(self):
        """Start an iteration over the stored iterable from index 0."""
        self.FLAG_iter_ended = False
        self.iter_ck = 0
        return self

    def __next__(self):
        """Return the next item of the iterable and tick the bar."""
        if self.yielding_data:
            if not self.FLAG_iter_ended:
                if self.FLAG_first_tick:
                    # The very first item has not been processed yet,
                    # so it does not count as a finished step.
                    self._make_progress(0)
                    self.FLAG_first_tick = False
                else:
                    self._make_progress()
                if self.iter_ck == self.n_steps - 1:
                    self.FLAG_iter_ended = True
                toret = self.iterable[self.iter_ck]
                self.iter_ck += 1
                return toret
            self.FLAG_iter_ended = True
            # Close the bar properly so the summary and the newline are
            # printed even if the tick row did not fill the full width.
            self._end()
            raise StopIteration

        if self.n_steps:
            print('printprogress is used as a Generator, e.g. in a for loop'
                  ' but the first argument does not have length via __len__'
                  ' perhaps you gave it a number or the length of a list as'
                  ' the first argument. To use it in a for loop '
                  ' (as a generator), provide the list itself or a generator.'
                  ' If you wish to give a number of steps, define the pbar'
                  ' before the for loop via pbar = printprogress(n_steps)'
                  ' then call the pbar() inside the for loop when a step is'
                  ' finished.'
                  )
        self.FLAG_iter_ended = True
        self.FLAG_ended = True
        raise StopIteration

    def _end(self):
        """Print the elapsed-time summary once and mark the bar as ended."""
        if not self.FLAG_ended:
            # _print_func is a no-op when print_function is None.
            self._print_func(f' --> {time() - self.startTime:.2f}s')
            self.FLAG_ended = True

    def __del__(self):
        """Close the bar on garbage collection if it is still open."""
        # FLAG_ended is missing when __init__ raised before setting it.
        if not getattr(self, 'FLAG_ended', True):
            self._end()