Source code for lognflow.logviewer

import pathlib
import numpy as np
from   matplotlib.pyplot import imread as mpl_imread
from   .utils            import replace_all, dummy_function, name_from_file

# Notice printed by logviewer.__init__ when the viewer is created directly
# (i.e. with not_exist_ok left False). The text is emitted at runtime, so it
# is kept exactly as it was.
deprecated_msg = (
    'logviewer is deprecated and will be'
    ' removed in a few revisions.please use lognflow only.'
    ' for example you can write logger.get_single(name)'
    ' When using lognflow for reading only, if you dont'
    ' log anything, no new directory will be made.'
)

class logviewer:
    """ log viewer

        Since lognflow makes lots of files and folders, maybe it is nice
        to have a logviewer that loads those information. In this module we
        provide a set of functions for a logged object that can load
        variables, texts, file lists and so on.
        Use it simply by::

            from lognflow import logviewer
            logged = logviewer(log_dir = 'dir_containing_files')
            var = logged.get_single('variable_name')
    """

    def __init__(self,
                 log_dir : pathlib.Path,
                 logger = print,
                 not_exist_ok = False):
        """ set up a viewer on a log directory

            Parameters
            ----------
            :param log_dir: the directory that holds the logged files.
            :param logger: a callable that takes a string; used for all
                    messages of the viewer. default: print
            :param not_exist_ok: when False (default), the deprecation notice
                    is printed and the directory must already exist.
        """
        self.log_dir = pathlib.Path(log_dir)
        self.not_exist_ok = not_exist_ok
        if not not_exist_ok:
            print(deprecated_msg)
            self.assert_log_dir()
        self.logger = logger
        # ``load`` is kept as an alias of ``get_single``
        self.load = self.get_single
        self.log_dir_str = str(self.log_dir.absolute())

    def assert_log_dir(self):
        """ make sure the log directory exists

            Prints a short diagnosis and raises AssertionError when
            ``self.log_dir`` is not a directory.

            :raises AssertionError: if the log directory does not exist.
        """
        if self.log_dir.is_dir():
            return
        print('~'*60)
        print('lognflow.logviewer| No such directory: ' + str(self.log_dir))
        if self.not_exist_ok:
            print('You probably initialized logviewer from lognflow with '
                  'not_exist_ok. You have not logged anything in this directory.'
                  'You probably have entereed a wrong directory name '
                  'for the logger.')
        print('~'*60)
        # raised explicitly (instead of ``assert``) so that the check is
        # still performed when Python runs with -O
        raise AssertionError(f'No such directory: {self.log_dir}')

    def name_from_file(self, fpath):
        """
            Given an fpath inside the logger log_dir,
            what would be its equivalent parameter_name?
        """
        self.assert_log_dir()
        # inside a method the bare name refers to the module-level helper
        return name_from_file(self.log_dir_str, fpath)

    def disable_logger(self):
        """ silence the viewer by replacing its logger with dummy_function
        """
        self.logger = dummy_function

    def get_flist(self, var_name, suffix = None):
        """ get list of files

            return the sorted list of files for a saved variable.
            ``var_name`` is first tried as a glob pattern relative to the log
            directory. If nothing matches, it is tried as an exact file name,
            then as ``var_name.suffix``, then as a glob of that form, and
            finally as a directory whose content is listed.

            Parameters
            ----------
            :param var_name:
                variable name
            :param suffix:
                If there are different suffixes availble for a variable
                this input needs to be set. npy, npz, mat, and torch are
                supported. When not given, it is taken from the part of
                var_name after its last dot, or '*' if there is no dot.

            :return: a list of pathlib paths, empty if nothing was found.
        """
        self.assert_log_dir()
        # turn control characters back into backslash sequences; presumably
        # this recovers Windows-style names typed in non-raw strings
        var_name = (var_name.replace('\t', '\\t').replace('\n', '\\n')
                            .replace('\r', '\\r').replace('\b', '\\b'))
        flist = list(self.log_dir.glob(var_name))
        if not flist:
            if suffix is None:
                name_parts = var_name.split('.')
                if len(name_parts) > 1:
                    suffix = name_parts[-1]
                    name_before_suffix = name_parts[:-1]
                    if name_before_suffix == ['']:
                        # var_name was only a suffix such as '.npy'
                        var_name = '*'
                    else:
                        var_name = '.'.join(name_before_suffix)
                else:
                    suffix = '*'
            suffix = suffix.strip('.')

            exact_path = self.log_dir / var_name
            path_with_suffix = self.log_dir / f'{var_name}.{suffix}'
            if exact_path.is_file():
                flist = [exact_path]
            elif path_with_suffix.is_file():
                flist = [path_with_suffix]
            else:
                search_patt = f'{exact_path.name}.{suffix}'
                search_patt = replace_all(search_patt, '**', '*')
                flist = list(exact_path.parent.glob(search_patt))

        if flist:
            flist.sort()
        else:
            var_dir = self.log_dir / var_name
            if var_dir.is_dir():
                flist = sorted(var_dir.glob('*'))
        return flist

    def get_namelist(self, var_name, suffix = None):
        """ get logger names of files

            return the list of names for a saved variable, that is the
            result of get_flist with every path converted by name_from_file.

            Parameters
            ----------
            :param var_name:
                variable name
            :param suffix:
                If there are different suffixes availble for a variable
                this input needs to be set. npy, npz, mat, and torch are
                supported.
        """
        self.assert_log_dir()
        nlist = self.get_flist(var_name, suffix)
        if nlist:
            nlist = [name_from_file(self.log_dir_str, fpath)
                     for fpath in nlist]
        return nlist

    def get_common_files(self, var_name_A, var_name_B, suffix = None,
                         flist_A = None, flist_B = None):
        """ get common files in two directories

            It happens often in ML that there are two directories, A and B,
            and we are interested to get the flist in both that is common
            between them. Files are matched by their stem. The returned paths
            are rebuilt from the parent directory and the suffix of the first
            file of each list, so every list is expected to hold files of one
            directory and one suffix.

            Parameters
            ----------
            :param var_name_A:
                directory A name
            :param var_name_B:
                directory B name
            :param suffix:
                passed to get_flist when a file list is not given.
            :param flist_A:
                optional list of paths to use instead of searching for A
            :param flist_B:
                optional list of paths to use instead of searching for B

            :return: a tuple of two lists of paths, both ordered by stem.
                    Both are empty when either side has no files.
        """
        self.assert_log_dir()
        if not flist_A:
            flist_A = self.get_flist(var_name_A, suffix)
        if not flist_B:
            flist_B = self.get_flist(var_name_B, suffix)
        if not flist_A or not flist_B:
            # nothing can be common; avoids indexing an empty list below
            return ([], [])

        flist_A = [pathlib.Path(fpath) for fpath in flist_A]
        flist_B = [pathlib.Path(fpath) for fpath in flist_B]
        suffix_A = flist_A[0].suffix
        suffix_B = flist_B[0].suffix
        parent_A = flist_A[0].parent
        parent_B = flist_B[0].parent

        stems_A = {fpath.stem for fpath in flist_A}
        stems_B = {fpath.stem for fpath in flist_B}
        # sorted, so that the output order does not depend on set ordering
        common_stems = sorted(stems_A & stems_B)

        flist_A_new = [parent_A / (stem + suffix_A) for stem in common_stems]
        flist_B_new = [parent_B / (stem + suffix_B) for stem in common_stems]
        return (flist_A_new, flist_B_new)

    def get_text(self, log_name='main_log', flist = None, suffix = 'txt',
                 file_index = -1):
        """ get text log files

            Given the log_name, this function returns the text therein.

            Parameters
            ----------
            :param log_name:
                the log name. If not given then it is the main log.
            :param flist:
                you can give a file list in Posix paths, for text files
            :param suffix: str
                to search for specific files
            :param file_index: int or list[int]
                a number or a list of numbers for the index of the file to
                include, default: -1

            :return: for every requested index, the list of lines of that
                    file. If exactly one file exists, the list of its lines
                    is returned directly. None if no file is found.
        """
        self.assert_log_dir()
        if isinstance(file_index, (int, np.integer)):
            file_index = [file_index]
        if not flist:
            flist = self.get_flist(log_name, suffix)
        n_files = len(flist)
        if n_files == 0:
            return None

        txt = []
        for fcnt in file_index:
            with open(flist[int(fcnt)]) as f_txt:
                txt.append(f_txt.readlines())
        if n_files == 1:
            txt = txt[0]
        return txt

    def _get_single(self, var_name, file_index = None, suffix = None,
                    read_func = None, verbose = False):
        """ get a single variable

            return the value of a saved variable together with its path.

            Parameters
            ----------
            :param var_name:
                variable name
            :param file_index:
                If there are many snapshots of a variable, this input picks
                one of them. It is not needed when only one file is found.
            :param suffix:
                If there are different suffixes availble for a variable
                this input needs to be set. npy, npz, mat, and torch are
                supported.
            :param read_func:
                a function that takes the Posix path and returns data
            :param verbose:
                if True, progress and failures are reported to the logger.

            :return: a tuple (data, path), or (None, None) when nothing
                    could be loaded.
            :raises AssertionError: if file_index is given but is not equal
                    to its integer value.

            .. note::
                when reading a MATLAB file, the output is a dictionary.
                Also when reading a npz except if it is made by log_var
        """
        self.assert_log_dir()
        if file_index is not None and file_index != int(file_index):
            raise AssertionError(
                f'file_index {file_index} must be an integer')

        flist = self.get_flist(var_name, suffix)
        var_path = None
        if flist:
            if len(flist) == 1:
                var_path = flist[0]
            elif file_index is not None:
                if verbose:
                    self.logger(
                        f'There are {len(flist)} files, logged with'
                        + f' name {var_name}.'
                        + f' The given index is {file_index}.')
                var_path = flist[int(file_index)]
            else:
                self.logger('-'*60)
                self.logger(
                    f'There are {len(flist)} files, logged with'
                    + f' name {var_name} but the index is not given.')
                self.logger('-'*60)
                # a pair, because get_single unpacks two values
                return None, None

        if var_path is not None and var_path.is_file():
            if verbose:
                self.logger(f'Loading {var_path}')
            if read_func is not None:
                return (read_func(var_path), var_path)

            file_suffix = var_path.suffix
            if file_suffix == '.npz':
                buf = np.load(var_path)
                try:    # check if it is made by log_var
                    if len(buf.files) != 2:
                        raise KeyError('not a log_var file')
                    time_array = buf['time_array']
                    data_array = buf['data_array']
                    data_array = data_array[time_array > 0]
                    time_array = time_array[time_array > 0]
                    return ((time_array, data_array), var_path)
                except Exception:
                    # any other npz is handed back as it was loaded
                    return (buf, var_path)
            if file_suffix == '.npy':
                return (np.load(var_path), var_path)
            if file_suffix == '.mat':
                from scipy.io import loadmat
                return (loadmat(var_path), var_path)
            if file_suffix == '.dm4':
                from hyperspy.api import load as hyperspy_api_load
                return (hyperspy_api_load(var_path).data, var_path)
            if file_suffix in ('.tif', '.tiff'):
                from tifffile import imread as tifffile_imread
                return (tifffile_imread(var_path), var_path)
            if file_suffix == '.torch':
                # NOTE: torch.load unpickles the file; only open files that
                # come from a trusted source.
                from torch import load as torch_load
                return (torch_load(var_path), var_path)

            # unknown suffix: try it as an image first, then as text
            try:
                img = mpl_imread(var_path)
                return (img, var_path)
            except Exception:
                pass
            try:
                txt = var_path.read_text(errors = 'ignore')
                return (txt, var_path)
            except Exception:
                var_path = None
        else:
            var_path = None

        if var_path is None and verbose:
            self.logger(f'Looking for {var_name} failed. ' + \
                        f'{var_path} is not in: {self.log_dir}')
        return None, None

    def get_single(self, var_name, file_index = -1, suffix = None,
                   read_func = None, verbose = False, return_fpath = False):
        """ get a single variable

            return the value of a saved variable.

            Parameters
            ----------
            :param var_name:
                variable name
            :param file_index:
                If there are many snapshots of a variable, this input picks
                one of them. default: -1, the last file of the sorted list.
            :param suffix:
                If there are different suffixes availble for a variable
                this input needs to be set. npy, npz, mat, and torch are
                supported.
            :param read_func:
                a function that takes the Posix path and returns data
            :param verbose:
                if True, progress and failures are reported to the logger.
            :param return_fpath:
                if True, a tuple (data, path) is returned instead of data.

            .. note::
                when reading a MATLAB file, the output is a dictionary.
                Also when reading a npz except if it is made by log_var
        """
        self.assert_log_dir()
        get_single_data, fpath = self._get_single(
            var_name = var_name, file_index = file_index, suffix = suffix,
            read_func = read_func, verbose = verbose)
        if return_fpath:
            return get_single_data, fpath
        return get_single_data

    def get_stack_from_files(self, var_name = None, flist = None,
                             suffix = None, read_func = None):
        """ Get the data of all files in a directory

            This function reads all files of a single variable and stacks
            them.

            Parameters
            ----------
            :param var_name:
                The directory or variable name to look for the files
            :type var_name: str

            :param flist:
                list of Paths. If given, only these files are read and
                var_name is not used. The first one must exist.
            :type flist: list

            :param suffix:
                the suffix of files to look for, e.g. 'txt'
            :type suffix: str

            :param read_func:
                the function that takes the posix path of a file and
                returns the data in there. If not given, np.load and then
                matplotlib's imread are tried on the first file.

            Output
            ----------
            It returns a numpy array if concatenation of all is possible,
            otherwise a list of data of all files. None if no file is found.

            :raises AssertionError: if the first path of a given flist is
                    not a file.
        """
        self.assert_log_dir()
        if flist is None or len(flist) == 0:
            flist = self.get_flist(var_name, suffix)
        else:
            flist = list(flist)
            if not pathlib.Path(flist[0]).is_file():
                raise AssertionError(
                    f'File not found: {flist[0]}. '
                    'You can use logviewer get_flist')
        if not flist:
            return None

        # find a reader that can open the first file
        if read_func is None:
            try:
                np.load(flist[0])
                read_func = np.load
            except Exception:
                pass
        if read_func is None:
            try:
                mpl_imread(flist[0])
                read_func = mpl_imread
            except Exception:
                pass

        try:
            first_data = read_func(flist[0])
        except Exception:
            self.logger(f'The data file {flist[0]} could not be opened.'
                        'Please provide a read_function in the input.')
            raise

        dataset = [first_data]
        dataset.extend(read_func(fpath) for fpath in flist[1:])
        try:
            dataset_array = np.array(dataset)
        except Exception:
            # the pieces cannot be stacked; give them back as a list
            dataset_array = dataset
        return dataset_array

    def get_stack_from_names(self, var_names = None, read_func = None,
                             return_flist = False):
        """ Get the data of all files of one or many variables

            Every name is resolved by get_flist and every file found is
            loaded by get_single. Files that could not be loaded are left
            out.

            Parameters
            ----------
            :param var_names:
                a variable name or a list of variable names
            :param read_func:
                a function that takes the Posix path and returns data
            :param return_flist:
                if True, the list of loaded paths is returned as well.

            :return: a numpy array if all data can be stacked, otherwise a
                    list; with return_flist a tuple (dataset, flist).
            :raises TypeError: if var_names is neither a string nor an
                    iterable of names.
        """
        self.assert_log_dir()
        if isinstance(var_names, str):
            # only a single name is wrapped; a list of names is kept as is
            var_names = [var_names]
        else:
            try:
                var_names = list(var_names)
            except TypeError as err:
                raise TypeError(
                    'input should be a list of variable names') from err

        dataset = []
        flist = []
        for name in var_names:
            images_flist = self.get_flist(name)
            for file_index in range(len(images_flist)):
                data, fpath = self.get_single(
                    name, file_index = file_index,
                    read_func = read_func, return_fpath = True)
                if data is not None:
                    dataset.append(data)
                    flist.append(fpath)
        try:
            dataset = np.array(dataset)
        except Exception:
            # the pieces cannot be stacked; the list is returned instead
            pass
        if return_flist:
            return dataset, flist
        return dataset

    def replace_time_with_index(self, var_name, verbose = False):
        """ index in file var_names

            lognflow uses time stamps to make new log files for a variable.
            That is done by putting time stamp after the name of the variable.
            This function changes all of the time stamps, sorted ascendingly,
            by indices. The files are renamed on disk.

            :param var_name:
                variable name: a directory, whose files with a suffix are
                renamed to the index alone, or a pattern, whose matches are
                renamed to the pattern name followed by '_' and the index.
            :param verbose:
                if True, every renaming is reported to the logger.
        """
        self.assert_log_dir()
        var_dir = self.log_dir / var_name
        if var_dir.is_dir():
            var_fname = None
            flist = list(var_dir.glob('*.*'))
        else:
            var_fname = var_dir.name
            var_dir = var_dir.parent
            flist = list(var_dir.glob(f'{var_fname}'))
            if not flist and '*' not in var_fname:
                self.logger(
                    'lognflow, replace_time_with_index:' +\
                    'the given pattern has no * and no files were found')
        if not flist:
            return

        flist.sort()
        # zero padding wide enough for the number of files
        fcnt_width = len(str(len(flist)))
        for fcnt, fpath in enumerate(flist):
            if verbose:
                self.logger(f'Changing {fpath.name}')
            fname_new = ''
            if var_fname is not None:
                fname_new = var_fname + '_'
            fname_new += f'{fcnt:0{fcnt_width}d}' + fpath.suffix
            fpath_new = fpath.parent / fname_new
            if verbose:
                self.logger(f'To {fpath_new.name}')
            fpath.rename(fpath_new)

    def __repr__(self):
        return f'{self.log_dir}'

    def __bool__(self):
        # a viewer is truthy only while its directory exists
        return self.log_dir.is_dir()