""" lognflow
lognflow makes logging easy in Python. It is so simple you can code it
yourself, so, why would you?!
lognflow logs all files into a directory by taking care of directories and
files names. This saves you a lot of coding and makes your code readable
when you say::
logger = lognflow(logs_root = 'root_for_time_tagged_log_directories')
logger.log_single('variables/variable1', variable1)
logger('I just logged a variable.')
another_logger = lognflow(log_dir = 'specific_dir')
another_logger.log_plot('final_plot', final_plot_is_a_np_1d_array)
The next syntax is an easy way of just logging a numpy array. It will make
a new directory within the log_dir, called variables and make a npy file
named variable1 and put variable1 in it. The third line of the code above
prints the given text to the __call__ routine in the main txt file made in
the log_dir.
As you can see, first you give it a root (logs_root) to make
a log directory in it or give it the directory itself (log_dir).
Then start dumping data by giving the variable name and the data with
the type and you are set.
Multiple processes in parallel can make as many instances as they want.
There is an option to keep the logged variables in memory for a long time and
then dump them when they reach a certain size. This reduces the network load.
For this, the txt logs can be buffered for a choosable amount of time and
numpy variables that don't change size can be buffered up to a certain size
before storing into the directory using log_var(name, var).
"""
import time
import atexit
import inspect
import copy
from pathlib import Path as pathlib_Path
from sys import platform as sys_platform
from sys import argv as sys_argv
from os import system as os_system
from tempfile import gettempdir
from dataclasses import dataclass
from typing import Union
import numpy as np
import matplotlib.pyplot as plt
from .utils import (repr_raw,
replace_all,
select_directory,
name_from_file,
text_to_collection,
printv,
deepstr,
prepare_for_np_savez)
[docs]
@dataclass
class varinlog:
    """In-memory buffer state of one variable accumulated via ``record``.

    One instance is stored per recorded variable; ``record`` rebuilds it
    on every call and ``record_flush`` reads it to write the data out.
    """
    # buffer of recorded snapshots, first axis indexes the snapshot
    data_array: np.ndarray
    # time stamp of every snapshot; entries that are still 0 are unused
    time_array: np.ndarray
    # index of the slot that was written last
    curr_index: int
    # logger time stamp at which the current buffer was started
    file_start_time: float
    # file extension used when the buffer is flushed ('npz' or text)
    suffix: str
    # number of snapshots the buffer holds before it is flushed
    log_counter_limit: int
    # whether a figure is produced when the buffer is flushed
    savefig: bool
    # first sample to be plotted (used to slice the arrays on flush)
    plot_start_ago: float
    # number of samples averaged per point of the smoothed curve
    plot_win_length: float
    # time-tag setting forwarded to the plot call on flush
    time_tag: bool
[docs]
@dataclass
class textinlog:
    """Buffer state of one text log file handled by the logger."""
    # pending pieces of text that are not written to the file yet
    # NOTE(review): annotated as str, but the logger initialises it with a list
    to_be_logged: str
    # file the buffered text is appended to on flush
    log_fpath: pathlib_Path
    # size after which the logger starts a new file
    log_size_limit: int
    # amount of text (in characters) accumulated for this file so far
    log_size: int
    # logger time stamp of the most recent flush
    last_log_flush_time: float
    # minimum number of seconds between two automatic flushes
    log_flush_period: int
# Template of the Python script written by ``getLogger.save_configs``.
# ``{assignments}`` is filled with one ``name = value`` statement per stored
# entry and ``{cfg_dict}`` with the ``name=name`` keyword arguments that
# collect those names into ``CFG``; ``getLogger.load_configs`` executes the
# generated file and returns its ``CFG``.
# NOTE(review): the generated script imports torch unconditionally, so
# loading it presumably requires torch to be installed -- confirm.
save_configs_script = """\
import numpy as np
import torch
# ---------------- CONFIG VARIABLES ----------------
{assignments}
# ---------------- CONFIG ACCESSOR ----------------
CFG = dict(
{cfg_dict}
)
"""
[docs]
class getLogger:
"""Initialization
lognflow.getLogger creates a directory called log_dir and puts all
logs in there.
Where?
1: if logs_root is given, it makes a log_dir in it with a time_stamp.
2: if log_dir is given, it uses it directly.
3: If you type::
logger = lognflow()
it will try to open a dialog to select a directory, if error occurs,
it will get a temp directory from the os and continues.
The lognflow allows setting global settings that can be overridden
later by calling each of its methods as follows.
:param logs_root:
This is the root directory for all logs.
We will use the time.time() to create a log directory for each
instance of the lognflow.
:type logs_root: pathlib.Path
:param log_dir:
This is the final directory path for the log files.
:type log_dir: pathlib.Path
:param log_dir_prefix:
this string will be put before the time tag for log_dir, when
only logs_root is given.
:type log_dir_prefix: str
:param log_dir_suffix:
if given, time tag will not be used and this string will be
put at the end of the log_dir name.
:type log_dir_suffix: str
:param exist_ok:
if False, if any logging directory exists, it raises an error.
:param print_text:
If True, everything that is logged as text will be printed as well
:type print_text: bool
:param main_log_name:
main log file name, by default: 'log'
:type main_log_name: str
:param log_flush_period:
The period between flushing the log files into HDD. By not
flushing, you can reduce network or HDD overhead.
:type log_flush_period: int
:param time_tag:
File names can carry time_tags in time.time() format or indices. This
is pretty much the most fundamental contribution of lognflow beside
carrying the folders and files paths around. By default it is True and
all file names will not have time tags if you set it to False. so,
you can give time_tag argument for all logger functions, whose
default is this default. It can also be a string: options
are 'index', 'time_and_index' and 'keep_initial'.
If you use 'index', instead of time stamps, it will simply put an
index that counts up after each logging.
If you use 'time_and_index', it will use both time_tag and an index.
If you use 'keep_initial', it will save the first instance with suffix
'_initial' and then it will make a new one for the next instance and
update that file onwards.
:type time_tag: bool or str
"""
def __init__(self,
logs_root : pathlib_Path = None,
log_dir : pathlib_Path = None,
log_dir_prefix : str = None,
log_dir_suffix : str = None,
exist_ok : bool = True,
time_tag : Union[bool, str] = True,
print_text : bool = True,
main_log_name : str = 'log',
log_flush_period : int = 10,
enabled : bool = True):
atexit.register(self.flush_all)
self._init_time = time.time()
self.log_dir_prefix = log_dir_prefix
self.log_dir_suffix = log_dir_suffix
self.exist_ok = exist_ok
self.time_tag = time_tag
frame = inspect.currentframe()
args, _, _, values = inspect.getargvalues(frame)
if 'time_tag' in values and values['time_tag'] is not None:
self._time_tag_provided = True
else:
self._time_tag_provided = False
if(log_dir is None):
if(logs_root is None):
logs_root = gettempdir()
try:
logs_root = select_directory(logs_root)
except:
print('no logs_root was provided.'
+ 'Could not open select_folder'
+ f'So a folder from tmp is chosen: {logs_root}')
new_log_dir_found = False
while(not new_log_dir_found):
log_dir_name = ''
if(log_dir_prefix is not None):
log_dir_name = str(log_dir_prefix)
if len(log_dir_name) > 0:
if log_dir_name[-1] != '_':
log_dir_name += '_'
if(log_dir_suffix is None):
log_dir_name_ = log_dir_name + f'{int(self._init_time)}'
merging_log_dir = pathlib_Path(logs_root) / log_dir_name_
decimal_places = 0
while merging_log_dir.is_dir():
decimal_places += 1
if decimal_places > 3:
decimal_places = 3
self._init_time = time.time()
log_dir_name_ = log_dir_name + f'{self._init_time:.{decimal_places}f}'
merging_log_dir = pathlib_Path(logs_root) / log_dir_name_
else:
log_dir_name += f'{log_dir_suffix}'
merging_log_dir = pathlib_Path(logs_root) / log_dir_name
self.log_dir = merging_log_dir
if(not self.log_dir.is_dir()):
new_log_dir_found = True
else:
self._init_time = time.time()
self.logs_root = logs_root
self.log_dir_provided = False
else:
self.log_dir_provided = True
self.log_dir = pathlib_Path(log_dir)
self.logged = self
self._print_text = print_text
self._loggers_dict = {}
self._vars_dict = {}
self._single_var_call_cnt = 0
self.log_name = main_log_name
self.log_flush_period = log_flush_period
self.log_dir_str = str(self.log_dir.absolute())
self.enabled = enabled
self.counted_vars = {}
self.param_name_set = set()
self.close = self.flush_all
self.warning_log_dir = False
[docs]
def setLevel(self, level = 'info.txt'):
    """ Select the default text log.

        The given name is stored as the main log name.
    """
    self.log_name = level
[docs]
def assert_log_dir(self):
    """ Warn once when the log directory does not exist.

        Nothing happens when log_dir exists or the warning was already
        issued. Otherwise, if log_dir was not given by the user but
        logs_root exists, logs_root is adopted as log_dir.

        :return:
            False when neither log_dir nor logs_root can be used,
            None in all other cases.
    """
    if self.log_dir.is_dir() or self.warning_log_dir:
        return
    self.warning_log_dir = True

    if self.log_dir_provided:
        print('lognflow.logdir: No such directory: ')
        print(self.log_dir)
        return

    # logs_root may have been stored as a plain string
    logs_root = pathlib_Path(self.logs_root)
    if logs_root.is_dir():
        self.log_dir = logs_root
        print('lognflow Warning: You read from the provided logs_root:')
        print(self.logs_root)
        print('to read from a log, use log_dir as the input argument:')
        print(f'logger = lognflow(log_dir = {self.log_dir})')
        print('I will assume that this logs_root is log_dir from now on')
        return

    print('Provide log_dir when initializing lognflow '
          'if you wish to read the stored data first, e.g. as follows:')
    print('logger = lognflow(log_dir = LOG_DIR_PATH)')
    return False
[docs]
def disable(self):
    """ Turn the logger off; methods guarded by ``enabled`` return early. """
    self.enabled = False
[docs]
def enable(self):
    """ Turn the logger back on. """
    self.enabled = True
[docs]
def log_code(self, code_fpath = None):
    """ Copy a source file into the log directory.

        :param code_fpath:
            path of the file to copy, typically __file__. When it is not
            given, the first entry of sys.argv is used.
    """
    script_path = pathlib_Path(
        sys_argv[0] if code_fpath is None else code_fpath)
    self.copy(script_path.name, script_path)
[docs]
def name_from_file(self, fpath):
    """ Parameter name that corresponds to a file of this logger.

        Delegates to the module-level helper of the same name, passing
        the log directory of this logger and the given fpath.
    """
    log_dir = self.log_dir
    return name_from_file(log_dir, fpath)
[docs]
def file_from_name(self, parameter_name):
    """ Path of a parameter inside the log directory.

        :param parameter_name:
            name or path-like name relative to log_dir.
        :return:
            log_dir / parameter_name
    """
    fpath = self.log_dir / parameter_name
    return fpath
[docs]
def copy(self, parameter_name = None, source = None, suffix = None,
         time_tag = False):
    """ copy into a new file
        Given a parameter_name, the second argument will be copied into
        the first. The copy is done with shutil, so paths containing
        spaces or shell characters are handled on every platform.

        :param parameter_name: str
                examples: myvar or myscript/myvar
                parameter_name can be just a name e.g. myvar, or could be a
                path like name such as myscript/myvar.
        :param source: str
                if source is a directory, it is copied recursively to
                log_dir / (parameter_name + source stem).
                if source is a file, it is copied into its new location.
                Otherwise, we use logger.get_flist(source, suffix) to
                obtain a list of files matching the source and copy them
                into their new location.
        :param suffix: str
                extension of the destination files; by default the
                extension of each source file is kept.
        :param time_tag:
                time-tag setting for the destination file names.
        :return:
                the destination path of the (last) copied item.
        :raises AssertionError:
                if parameter_name is not a string or no source is found.
    """
    if not self.enabled: return
    import shutil

    arg_err_msg = 'when using copy, the first argument is the final name '\
                  ' after copy is finished. The second argument is ' \
                  ' the absolute path of source file, str(fpath.absolute())'
    if parameter_name is None:
        parameter_name = ''
    elif parameter_name != str(parameter_name):
        raise AssertionError(arg_err_msg)

    try:
        source_as_fpath = pathlib_Path(source)
        source_is_dir = source_as_fpath.is_dir()
        source_is_file = (not source_is_dir) and source_as_fpath.is_file()
    except (TypeError, ValueError, OSError):
        # source is not a usable path, it is treated as a search pattern
        source_as_fpath = None
        source_is_dir = False
        source_is_file = False

    if source_is_dir:
        fpath_dest = self.log_dir / (parameter_name + source_as_fpath.stem)
        try:
            shutil.copytree(source_as_fpath, fpath_dest, dirs_exist_ok = True)
        except OSError as e:
            # copying stays best effort, as it was with the shell commands
            print(f'lognflow: could not copy {source_as_fpath} '
                  f'into {fpath_dest}: {e}')
        return fpath_dest

    flist = []
    if source_is_file:
        flist = [source_as_fpath]
    else:
        try:
            flist = self.get_flist(source, suffix)
        except Exception as e:
            print(str(e))
    if not flist:
        raise AssertionError(
            'source could not be found to copy. \n' + arg_err_msg)

    fpath_dest = None
    for fpath in flist:
        fpath = pathlib_Path(fpath)
        # the name and suffix are resolved per file, so that several
        # sources are not all written to the name of the first one
        name_for_file = parameter_name if parameter_name else fpath.name
        param_dir, param_name, file_suffix = self._param_dir_name_suffix(
            name_for_file, suffix)
        if len(param_name) == 0:
            param_name = fpath.stem
        if file_suffix is None:
            file_suffix = fpath.suffix or None
        fpath_dest = self._get_fpath(
            param_dir, param_name, file_suffix, time_tag)
        try:
            shutil.copy(fpath, fpath_dest)
        except OSError as e:
            print(f'lognflow: could not copy {fpath} into {fpath_dest}: {e}')
    return fpath_dest
@property
def time_stamp(self):
    """ Current time stamp
        Seconds elapsed since the initialization time of the logger.
    """
    now = time.time()
    return now - self._init_time
[docs]
def rename(self, new_name:str, append: bool = False):
""" renaming the log directory
It is possible to rename the log directory while logging is going
on. This is particulary useful when at the end of an experiment,
it is necessary to put some variables in the name of the directory,
which is very realistic in the eyes of an experimentalist.
There is only one input and that is the new name of the directory.
:param new_name: The new name of the directory (without parent path)
:type new_name: str
:param append: keep the time tag for the folder and
append it to the right side of the new name. Default: False.
:type append: bool
"""
if not self.enabled: return
self.flush_all()
if(append):
log_dir_name = ''
if(self.log_dir_prefix is not None):
log_dir_name = str(self.log_dir_prefix)
if len(log_dir_name) > 0:
if log_dir_name[-1] != '_':
log_dir_name += '_'
if(self.log_dir_suffix is None):
log_dir_name_with_suffix = log_dir_name + f'{self._init_time}'
else:
log_dir_name_with_suffix = \
log_dir_name + f'{self.log_dir_suffix}'
if self.log_dir.name == log_dir_name_with_suffix:
log_dir_name += new_name
if log_dir_name[-1] != '_':
log_dir_name += '_'
if(self.log_dir_suffix is None):
log_dir_name += f'{self._init_time}'
else:
log_dir_name += f'{self.log_dir_suffix}'
else:
log_dir_name = self.log_dir.name + '_' + new_name
else:
log_dir_name = new_name
new_dir = self.log_dir.parent / log_dir_name
try:
self.log_dir = self.log_dir.rename(new_dir)
for log_name in list(self._loggers_dict):
curr_textinlog = self._loggers_dict[log_name]
curr_textinlog.log_fpath = \
self.log_dir /curr_textinlog.log_fpath.name
except:
self.text(None, 'Could not rename the log_dir from:')
self.text(None, f'{self.log_dir.name}')
self.text(None, 'into:')
self.text(None, f'{new_name}')
self.text(None, 'Most probably a file was open.')
return self.log_dir
def _param_dir_name_suffix(self, parameter_name: str, suffix: str = None):
    """ Split a parameter name into directory, file name and suffix.

        Spaces become underscores and back slashes become slashes. A name
        ending with a slash is a directory with an empty file name. When
        no suffix is given, the part after the last dot of the file name
        is used as the suffix; when a suffix is given and the file name
        already ends with it, it is removed from the file name.

        :return: tuple (param_dir, param_name, param_suffix); the suffix
            is None when none is given and the name contains no dot.
    """
    assert isinstance(parameter_name, str), \
        f'lognflow: The parameter name for this log is not a string.' \
        + f' It is of type {type(parameter_name)}.' \
        + ' Perhaps you forgot to pass the name of the variable first.' \
        + f' Here is its content: {parameter_name}'
    parameter_name = repr(repr_raw(parameter_name)).replace('\'', '')
    parameter_name = replace_all(parameter_name, ' ', '_')
    parameter_name = replace_all(parameter_name, '\\', '/')
    parameter_name = replace_all(parameter_name, '//', '/')

    # endswith also covers the empty name, which has no last character
    if parameter_name.endswith('/'):
        param_dir = parameter_name
        param_name = ''
    else:
        param_dir, _, param_name = parameter_name.rpartition('/')

    if suffix == 'mat' and len(param_name) == 0:
        # a mat file in a directory is named after that directory; the
        # trailing slash leaves an empty last element that is skipped
        dir_parts = [part for part in param_dir.split('/') if part]
        if dir_parts:
            param_name = dir_parts[-1]

    name_parts = param_name.split('.')
    if suffix is None:
        if len(name_parts) > 1:
            param_suffix = name_parts[-1]
            param_name = '.'.join(name_parts[:-1])
        else:
            param_suffix = None
    else:
        param_suffix = suffix
        if len(name_parts) > 1 and name_parts[-1] == param_suffix:
            param_name = '.'.join(name_parts[:-1])
    return(param_dir, param_name, param_suffix)
def _get_fpath(self, param_dir: pathlib_Path, param_name: str = None,
suffix: str = None, time_tag: bool = None) -> pathlib_Path:
if time_tag is None:
if self._time_tag_provided:
time_tag = self.time_tag
elif param_name in self.param_name_set:
time_tag = True
assert isinstance(time_tag, (bool, str)), \
'Argument time_tag must be a boolean or a string.'
if time_tag == True:
index_tag = False
keep_initial_tag = False
elif time_tag == False:
index_tag = False
keep_initial_tag = False
elif (time_tag.lower() == 'keep_initial'):
time_tag = False
index_tag = False
keep_initial_tag = True
elif (time_tag.lower() == 'index'):
time_tag = False
index_tag = True
keep_initial_tag = False
elif (time_tag.lower() == 'time_and_index'):
time_tag = True
index_tag = True
keep_initial_tag = False
_param_dir = self.log_dir / param_dir
if(not _param_dir.is_dir()):
_param_dir.mkdir(parents = True, exist_ok = self.exist_ok)
if param_name is None:
return _param_dir
else:
self.param_name_set.add(param_name)
if keep_initial_tag:
var_fullname = param_dir + '/' + param_name
if self.counted_vars.get(var_fullname, 0) == 0:
index_tag_str = 'initial'
self.counted_vars[var_fullname] = 1
if(len(param_name) > 0):
param_name += '_' + index_tag_str
else:
param_name = index_tag_str
elif len(param_name) == 0:
param_name = 'latest'
if index_tag:
var_fullname = param_dir + '/' + param_name
self.counted_vars[var_fullname] = self.counted_vars.get(
var_fullname, 0) + 1
index_tag_str = str(self.counted_vars[var_fullname])
if(len(param_name) > 0):
param_name += '_' + index_tag_str
else:
param_name = index_tag_str
if time_tag:
param_name_before_time_stamp = copy.copy(param_name)
time_stamp_str = f'{int(self.time_stamp)}'
if(len(param_name) > 0):
param_name += '_' + time_stamp_str
else:
param_name += time_stamp_str
if(suffix is None):
fpath = _param_dir / param_name
else:
while suffix[0] == '.':
suffix = suffix[1:]
fpath = _param_dir / (param_name + '.' + suffix)
if time_tag:
while fpath.is_file():
param_name = copy.copy(param_name_before_time_stamp)
time_stamp_str = f'{self.time_stamp:.3f}'
if(len(param_name) > 0):
param_name += '_' + time_stamp_str
else:
param_name += time_stamp_str
if suffix is None:
fpath = _param_dir / param_name
else:
while suffix[0] == '.': suffix = suffix[1:]
fpath = _param_dir / (param_name + '.' + suffix)
return fpath
def _get_dirnamesuffix(self, param_dir, param_name, suffix):
log_dirnamesuffix = param_name
if(len(param_dir) > 0):
log_dirnamesuffix = param_dir + '/' + log_dirnamesuffix
if(len(suffix) > 0):
log_dirnamesuffix = log_dirnamesuffix + '.' + suffix
return log_dirnamesuffix
def _text_handler(self, log_name: str,
                  log_size_limit: int = int(1e+7),
                  time_tag: bool = None,
                  log_flush_period = None,
                  suffix = None):
    """ Register a fresh text buffer for a log name.

        The log name is resolved into a file path (suffix 'txt' unless
        another one is found or given) and an empty buffer for that file
        replaces whatever was registered under the same key. The flush
        period falls back to the logger-wide one when not given.
    """
    flush_period = self.log_flush_period \
        if log_flush_period is None else log_flush_period

    directory, stem, extension = self._param_dir_name_suffix(
        log_name, suffix)
    extension = 'txt' if extension is None else extension
    registry_key = self._get_dirnamesuffix(directory, stem, extension)
    target_fpath = self._get_fpath(directory, stem, extension, time_tag)

    fresh_buffer = textinlog(to_be_logged = [],
                             log_fpath = target_fpath,
                             log_size_limit = log_size_limit,
                             log_size = 0,
                             last_log_flush_time = 0,
                             log_flush_period = flush_period)
    self._loggers_dict[registry_key] = fresh_buffer
[docs]
def text_flush(self, log_name = None, flush = False, suffix = None):
    """ Flush the text logs
        The text given to the logger is kept in a buffer and appended to
        the log file only when the flush period of that log has passed,
        or when flush is True. lognflow calls this function every time
        text is logged, but the user may also need to call it once in a
        while.

        :param log_name:
            the name of the text log, by default the main log.
        :type log_name: str
        :param flush:
            force the flush regardless of when the last time was.
            default: False
        :type flush: bool
        :param suffix:
            the extension of the log file, by default 'txt'.
        :type suffix: str
    """
    if not self.enabled: return
    log_name = self.log_name if (log_name is None) else log_name
    param_dir, param_name, suffix = self._param_dir_name_suffix(
        log_name, suffix)
    if suffix is None:
        suffix = 'txt'
    log_dirnamesuffix = self._get_dirnamesuffix(
        param_dir, param_name, suffix)

    curr_textinlog = self._loggers_dict.get(log_dirnamesuffix)
    if curr_textinlog is None:
        # nothing has been logged under this name, nothing to flush
        return

    elapsed = self.time_stamp - curr_textinlog.last_log_flush_time
    if flush or (elapsed > curr_textinlog.log_flush_period):
        with open(curr_textinlog.log_fpath, 'a+') as f:
            f.writelines(curr_textinlog.to_be_logged)
            f.flush()
        curr_textinlog.to_be_logged = []
        curr_textinlog.last_log_flush_time = self.time_stamp
[docs]
def text(self,
         log_name: str = None,
         to_be_logged = '',
         log_time_stamp = True,
         print_text = None,
         log_size_limit: int = int(1e+7),
         time_tag: bool = None,
         log_flush_period: int = None,
         flush = False,
         end = '\n',
         new_file = False,
         suffix = None):
    """ log a string into a text file
        You can choose a name for the log and give the text to put in it.
        Also you can pass a small numpy array. You can ask it to put time
        stamp in the log and in the log file name, you can disable
        printing the text. You can set the log size limit to split it into
        another file.

        :param log_name: str
                examples: mylog or myscript/mylog
                log_name can be just a name e.g. mylog, or could be a
                pathlike name such as myscript/mylog.
        :param to_be_logged: str, nd.array, list, dict
                the string to be logged, could be a list
                or numpy array or even a dictionary. It uses str(...).
        :param log_time_stamp: bool
                Put time stamp for every entry of the log
        :param print_text: bool
                if False, what is logged will not be printed.
        :param log_size_limit: int
                limit of the text accumulated in one file, counted in
                characters. When it is reached, the buffer is flushed and
                a new file is started.
        :param time_tag: bool
                put time stamp in file names.
        :param log_flush_period: int
                How often flush the log in seconds, if time passes this
                given period, it will flush the first time a text is
                logged, or if the logger is finalized. It is used when
                the log file is opened.
        :param flush: bool
                force flush into the log file
        :param end: str
                The ending appended to this entry if it is not there.
        :param new_file: bool
                start a new buffer and file path for this log. What was
                buffered for the current file is flushed first.
        :param suffix: str
                suffix is the extension of the file name.
        :return: the path of the text file in use after this call.
    """
    if not self.enabled: return

    time_tag = self.time_tag if (time_tag is None) else time_tag
    log_flush_period = self.log_flush_period \
        if (log_flush_period is None) else log_flush_period
    log_name = self.log_name if (log_name is None) else log_name

    param_dir, param_name, suffix = self._param_dir_name_suffix(
        log_name, suffix)
    if suffix is None:
        suffix = 'txt'
    log_dirnamesuffix = self._get_dirnamesuffix(
        param_dir, param_name, suffix)

    already_open = log_dirnamesuffix in self._loggers_dict
    if already_open and new_file:
        # the handler is about to be replaced, keep what it has buffered
        self.text_flush(log_dirnamesuffix, True)
    if (not already_open) or new_file:
        self._text_handler(log_dirnamesuffix,
                           log_size_limit = log_size_limit,
                           time_tag = time_tag,
                           log_flush_period = log_flush_period,
                           suffix = suffix)

    _time_str = f'T:{self.time_stamp:>6.6f}| '
    if((print_text is None) | (print_text is True)):
        print_text = self._print_text
    if(print_text):
        if(log_time_stamp):
            print(_time_str, end='')
        print(to_be_logged, end = end)

    curr_textinlog = self._loggers_dict[log_dirnamesuffix]
    _logger = []
    if(log_time_stamp):
        _logger.append(_time_str)
    if(isinstance(to_be_logged, list)):
        _logger.extend(str(_) for _ in to_be_logged)
    else:
        _logger.append(str(to_be_logged))
    # terminate the entry unless it already finishes with the ending
    if (not _logger) or (not _logger[-1]) or (not _logger[-1].endswith(end)):
        _logger.append(end)

    curr_textinlog.to_be_logged.extend(_logger)
    curr_textinlog.log_size += sum(len(_logger_el) for _logger_el in _logger)

    self.text_flush(log_dirnamesuffix, flush)

    if(curr_textinlog.log_size >= curr_textinlog.log_size_limit):
        # write out the full buffer before a new file takes over
        self.text_flush(log_dirnamesuffix, True)
        self._text_handler(
            log_dirnamesuffix,
            log_size_limit = curr_textinlog.log_size_limit,
            time_tag = time_tag,
            log_flush_period = curr_textinlog.log_flush_period,
            suffix = suffix)
        curr_textinlog = self._loggers_dict[log_dirnamesuffix]
    return curr_textinlog.log_fpath
def _get_log_counter_limit(self, param, log_size_limit):
cnt_limit = int(log_size_limit/(param.size*param.itemsize))
return cnt_limit
[docs]
def record(self, parameter_name: str, parameter_value, flush = False,
           suffix = None, log_size_limit: int = int(1e+8), savefig = False,
           plot_start_ago = None, plot_win_length = 10, time_tag = False):
    """log a numpy array in buffer then dump
        It can be the case that we need to take snapshots of a numpy array
        over time. The size of the array would not change and this is going
        to happen frequently.
        This function makes a buffer in RAM and keeps many instances of
        the array along with their time stamps. When the buffer is full,
        it is flushed into a file that carries the time stamp of the
        first instance, and a new buffer is started.
        The benefit of using this function over save is that it
        does not use the connection to the directory all the time and if
        that is on a network, there will be less overhead.

        :param parameter_name: str
                examples: myvar or myscript/myvar
                parameter_name can be just a name e.g. myvar, or could be a
                path like name such as myscript/myvar.
        :param parameter_value: np.array
                An np array whose size doesn't change. Anything without
                a shape is wrapped into an array of one element.
        :param flush: bool
                write the buffer, including this value, to the file right
                after storing the value.
        :param suffix: str
                can be 'npz' or 'txt' which will save it as text.
        :param log_size_limit: int
                log_size_limit in bytes, default: 1e+8.
        :param savefig: bool
                if True, a figure is made when the buffer is flushed.
        :param plot_start_ago:
                first sample to show in that figure.
        :param plot_win_length:
                number of samples averaged per point of the smoothed curve.
        :param time_tag:
                time-tag setting used for the figure file.
    """
    if not self.enabled: return

    time_tag = self.time_tag if (time_tag is None) else time_tag

    if not hasattr(parameter_value, 'shape'):
        parameter_value = np.array([parameter_value])

    param_dir, param_name, suffix = self._param_dir_name_suffix(
        parameter_name, suffix)
    if(suffix is None):
        suffix = 'npz'
    log_dirnamesuffix = self._get_dirnamesuffix(
        param_dir, param_name, suffix)

    # a buffer needs at least one slot, even if one value exceeds the limit
    log_counter_limit = max(
        1, self._get_log_counter_limit(parameter_value, log_size_limit))

    _var = self._vars_dict.get(log_dirnamesuffix)
    if _var is None:
        file_start_time = self.time_stamp
        curr_index = 0
    else:
        data_array = _var.data_array
        time_array = _var.time_array
        file_start_time = _var.file_start_time
        suffix = _var.suffix
        log_counter_limit = _var.log_counter_limit
        curr_index = _var.curr_index + 1
        if curr_index >= log_counter_limit:
            # the buffer is full: write it out and start a new one
            self.record_flush(log_dirnamesuffix)
            file_start_time = self.time_stamp
            curr_index = 0

    if(curr_index == 0):
        data_array = np.zeros((log_counter_limit, ) + parameter_value.shape,
                              dtype = parameter_value.dtype)
        time_array = np.zeros(log_counter_limit)

    time_array[curr_index] = self.time_stamp
    if(parameter_value.shape == data_array[curr_index].shape):
        data_array[curr_index] = parameter_value
    else:
        # a value of another shape cannot be stored, mark the slot as nan
        data_array[curr_index] = data_array[curr_index - 1] * 0 + np.nan

    self._vars_dict[log_dirnamesuffix] = varinlog(data_array,
                                                  time_array,
                                                  curr_index,
                                                  file_start_time,
                                                  suffix,
                                                  log_counter_limit,
                                                  savefig,
                                                  plot_start_ago,
                                                  plot_win_length,
                                                  time_tag)
    if flush:
        # flushed after storing, so that the file includes this value
        self.record_flush(log_dirnamesuffix)
[docs]
def record_flush(self, parameter_name: str, suffix: str = None):
    """ Flush the buffered numpy arrays
        If you have been using record, this writes the buffer of the given
        variable into the log directory. Only the slots with a positive
        time stamp are written. With suffix 'npz' one file holding
        time_array and data_array is written, otherwise two text files,
        one for the time and one for the data.
        If the variable was recorded with savefig and its data is 1D, the
        samples are plotted, and when there are more than 4 samples also
        their windowed average, which is logged as a figure.

        :param parameter_name: str
                examples: myvar or myscript/myvar
                parameter_name can be just a name e.g. myvar, or could be a
                path like name such as myscript/myvar.
        :param suffix: str
                the suffix that was used when recording, default 'npz'.
        :return: the path of the last written data file.
    """
    if not self.enabled: return

    param_dir, param_name, suffix = self._param_dir_name_suffix(
        parameter_name, suffix)
    if(suffix is None):
        suffix = 'npz'
    log_dirnamesuffix = self._get_dirnamesuffix(
        param_dir, param_name, suffix)

    _param_dir = self._get_fpath(param_dir)
    _var = self._vars_dict[log_dirnamesuffix]
    savefig = _var.savefig
    time_tag = _var.time_tag
    used_slots = _var.time_array > 0
    _var_data_array = _var.data_array[used_slots]
    _var_time_array = _var.time_array[used_slots]

    if _var.suffix == 'npz':
        fpath = _param_dir / f'{param_name}_{_var.file_start_time:.6f}.npz'
        np.savez(fpath,
                 time_array = _var_time_array,
                 data_array = _var_data_array)
    else:
        fpath = _param_dir / f'{param_name}_time_{_var.file_start_time:.6f}.txt'
        np.savetxt(fpath, _var_time_array)
        fpath = _param_dir / f'{param_name}_data_{_var.file_start_time:.6f}.txt'
        np.savetxt(fpath, _var_data_array)

    _var_data_array = _var_data_array.squeeze()
    # only a non-empty 1D series can be plotted against time
    if savefig and (_var_data_array.ndim == 1) and (len(_var_time_array) > 0):
        plot_start_ago = _var.plot_start_ago
        plot_win_length = _var.plot_win_length
        # both are used for slicing and reshaping, so they must be integers
        plot_start_ago = 0 if plot_start_ago is None else int(plot_start_ago)
        if plot_start_ago < len(_var_time_array):
            _var_time_array = _var_time_array[plot_start_ago:]
            _var_data_array = _var_data_array[plot_start_ago:]
        from .plt_utils import plt_plot
        fig_ax = plt_plot(
            [_var_data_array], '.', x_values_list = [_var_time_array])
        n_stamps = len(_var_time_array)
        if (n_stamps > 4) and (plot_win_length is not None):
            plot_win_length = int(plot_win_length)
            n_wins = int(n_stamps // plot_win_length)
            ending = plot_win_length * n_wins
            v1 = _var_data_array[
                :ending].reshape(n_wins, plot_win_length).mean(1)
            t1 = _var_time_array[
                :ending].reshape(n_wins, plot_win_length).mean(1)
            fname = f'{param_dir}/{param_name}_{_var.file_start_time:.6f}'
            try:
                self.plot(fname,
                          [v1], '--',fig_ax = fig_ax, x_values_list = [t1],
                          time_tag = time_tag)
            except Exception:
                print(f'Cannot plot the average for record {parameter_name}')
    return fpath
[docs]
def get_record(self, parameter_name: str, suffix: str = None) -> tuple:
    """ Get the buffered numpy arrays
        Gives back copies of what is currently buffered for a recorded
        variable, limited to the slots with a positive time stamp.

        :param parameter_name: str
                examples: myvar or myscript/myvar
                parameter_name can be just a name e.g. myvar, or could be a
                path like name such as myscript/myvar.
        :param suffix: str
                the suffix that was used when recording, default 'npz'.
        :return:
                A tuple including two np.ndarray. The first one is the 1d
                time and the second one is the nd buffered data.
        :rtype:
                tuple of two nd.arrays
    """
    if not self.enabled: return

    directory, stem, extension = self._param_dir_name_suffix(
        parameter_name, suffix)
    extension = 'npz' if extension is None else extension
    registry_key = self._get_dirnamesuffix(directory, stem, extension)

    buffered = self._vars_dict[registry_key]
    used_slots = buffered.time_array > 0
    buffered_data = buffered.data_array[used_slots].copy()
    buffered_time = buffered.time_array[used_slots].copy()
    return (buffered_time, buffered_data)
[docs]
def save(self, parameter_name: str,
         parameter_value,
         suffix = None,
         mat_field = None,
         time_tag: bool = None,
         verify = False):
    """log a single variable
        Write one value into one file of the log directory. The writer is
        chosen by the suffix, which is either given, taken from the
        parameter name, or chosen by the type of the value: 'npy' for
        numpy arrays, int and float, 'npz' for dictionaries and 'txt' for
        anything else.

        :param parameter_name: str
                examples: myvar or myscript/myvar
                parameter_name can be just a name e.g. myvar, or could be a
                path like name such as myscript/myvar.
        :param parameter_value: np.array
                the value to store.
        :param suffix: str
                can be 'npy', 'npz', 'tif', 'tiff', 'mat', 'pth' or
                'json'. Any other suffix appends str(parameter_value)
                to a text file.
        :param mat_field: str
                when saving as 'mat' file, the field can be set.
                otherwise it will be the parameter name; a dictionary
                without mat_field is stored as it is.
        :param time_tag: bool
                Whether the time stamp is in the file name or not.
        :param verify: bool
                if True, an error while saving is raised after being
                reported, instead of returning None.
        :return: the path of the written file, or None if saving failed.
    """
    if not self.enabled: return

    if time_tag is None:
        time_tag = self.time_tag

    name_has_no_dot = not '.' in parameter_name
    if (name_has_no_dot and (suffix is None)
            and isinstance(parameter_value, np.ndarray)):
        suffix = 'npy'

    param_dir, param_name, suffix = self._param_dir_name_suffix(
        parameter_name, suffix)
    if suffix is None:
        if isinstance(parameter_value, (np.ndarray, int, float)):
            suffix = 'npy'
        elif isinstance(parameter_value, dict):
            suffix = 'npz'
        else:
            suffix = 'txt'

    fpath = self._get_fpath(param_dir, param_name, suffix, time_tag)

    try:
        if suffix == 'npy':
            np.save(fpath, parameter_value)
        elif suffix == 'npz':
            np.savez(fpath, **prepare_for_np_savez(parameter_value))
        elif suffix in ('tif', 'tiff'):
            from tifffile import imwrite
            imwrite(fpath, parameter_value)
        elif suffix == 'mat':
            from scipy.io import savemat
            if (mat_field is None) and isinstance(parameter_value, dict):
                savemat(fpath, parameter_value)
            else:
                field = param_name if mat_field is None else mat_field
                savemat(fpath, {f'{field}':parameter_value})
        elif suffix == 'pth':
            from torch import save as torch_save
            torch_save(parameter_value, fpath)
        elif suffix == 'json':
            import json
            with open(fpath,'w') as fdata:
                json.dump(deepstr(parameter_value), fdata)
        else:
            with open(fpath,'a') as fdata:
                fdata.write(str(parameter_value))
    except Exception as e:
        print(f"lognflow: An error occurred while saving {parameter_name}")
        if verify: raise e
        fpath = None
        print(e)
    return fpath
[docs]
def save_configs(self, configs_dict, max_array_size=256):
    """ Write a dictionary of configurations as a Python script.

        Every entry becomes an assignment in the script configs_gen.py
        of the log directory and the names are collected there in CFG,
        which load_configs gives back.
        Supported values are None, bool, int, float, str, numpy scalars,
        numpy arrays and torch tensors up to max_array_size elements, and
        lists, tuples and dictionaries of those. An entry is skipped, and
        a line is logged about it, if its value (or anything nested in
        it) is not supported or too large, or if its key is not a valid
        Python identifier. Entries whose value is None are not stored.

        :param configs_dict: dict
                variable names mapped to their values.
        :param max_array_size: int
                arrays and tensors with more elements are skipped.
        :return: the path of the written script.
    """
    if not self.enabled:
        return
    import keyword
    import math
    try:
        import torch
    except ImportError:
        torch = None

    param_dir, param_name, suffix = self._param_dir_name_suffix(
        'configs_gen', 'py'
    )
    self.configs_fpath = self._get_fpath(param_dir, param_name, suffix, False)

    class _Skipped(Exception):
        """Raised when a value cannot be written into the script."""

    def too_large(var_name, numel):
        self.text(None, f'[save_configs]: variable {var_name} not '
            f'saved as its size {numel} is larger than given '
            f'threshold max_array_size: {max_array_size}')
        return _Skipped(var_name)

    def serialize(obj, var_name):
        # var_name is only used for the messages about skipped variables
        if (torch is not None) and isinstance(obj, torch.Tensor):
            numel = obj.numel()
            if numel > max_array_size:
                raise too_large(var_name, numel)
            data = serialize(obj.detach().cpu().tolist(), var_name)
            dtype = str(obj.dtype).replace("torch.", "")
            device = str(obj.device)
            return (
                f"torch.tensor({data}, "
                f"dtype=getattr(torch, '{dtype}'), "
                f"device='{device}')"
            )
        if isinstance(obj, np.ndarray):
            numel = obj.size
            if numel > max_array_size:
                raise too_large(var_name, numel)
            data = serialize(obj.tolist(), var_name)
            if obj.dtype.kind in 'biuf':
                # keep the dtype, it is lost by going through a list
                return f"np.array({data}, dtype='{obj.dtype.name}')"
            return f"np.array({data})"
        if isinstance(obj, np.generic):
            return serialize(obj.item(), var_name)
        if obj is None or isinstance(obj, (bool, int, str)):
            return repr(obj)
        if isinstance(obj, float):
            if math.isfinite(obj):
                return repr(obj)
            # the repr of nan and inf is not valid Python code
            return f"float('{obj}')"
        if isinstance(obj, list):
            return "[" + ", ".join(serialize(x, var_name) for x in obj) + "]"
        if isinstance(obj, tuple):
            inner = ", ".join(serialize(x, var_name) for x in obj)
            return f"({inner}{',' if len(obj)==1 else ''})"
        if isinstance(obj, dict):
            # keys are written as strings; repr escapes quotes in them
            items = [f'{repr(str(key))}: {serialize(value, key)}'
                     for key, value in obj.items()]
            return "{" + ", ".join(items) + "}"
        self.text(None, f"[save_configs] Unsupported type for {var_name}: {type(obj)}")
        raise _Skipped(var_name)

    assignments = []
    keys = []
    for k, v in configs_dict.items():
        if v is None:
            continue
        if not (isinstance(k, str) and k.isidentifier()
                and not keyword.iskeyword(k)):
            self.text(None, f'[save_configs]: variable {k} not saved '
                'as its name is not a valid Python identifier')
            continue
        try:
            val_ = serialize(v, k)
        except _Skipped:
            continue
        assignments.append(f"{k} = {val_}")
        keys.append(k)

    script = save_configs_script.format(
        assignments="\n\n".join(assignments),
        cfg_dict=",\n".join(f" {k}={k}" for k in keys),
    )
    self.configs_fpath.write_text(script)
    return self.configs_fpath
[docs]
def load_configs(self):
    """Load the configs file referenced by ``self.configs_fpath``.

    The file is executed as a Python module and its module-level ``CFG``
    object is returned.

    :return:
        ``CFG`` from the executed configs file, or None when
        ``self.configs_fpath`` is not set or does not point to a file.

    .. note::
        The configs file is executed as ordinary Python code; only load
        files that come from a trusted source.
    """
    import importlib.util
    import sys

    # Explicit existence check. The previous ``assert`` inside a bare
    # ``except`` disappears under ``python -O``, letting a missing file
    # reach the import machinery and crash there.
    configs_fpath = getattr(self, 'configs_fpath', None)
    if configs_fpath is None:
        return None
    try:
        if not configs_fpath.is_file():
            return None
    except (AttributeError, OSError):
        # Not a path-like object, or the file system refused the query.
        return None

    module_name = f"_dynamic_configs_{configs_fpath.stem}"
    spec = importlib.util.spec_from_file_location(module_name, configs_fpath)
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Do not leave a half-initialised module registered.
        sys.modules.pop(module_name, None)
        raise
    return module.CFG
[docs]
def savefig(self,
            parameter_name: str,
            image_format='jpg', dpi=1200,
            time_tag: bool = None,
            close_plt = True):
    """log a single plt

    Save the current pyplot figure into the log directory.

    :param parameter_name: str
        examples: myvar or myscript/myvar
        parameter_name can be just a name e.g. myvar, or could be a
        path like name such as myscript/myvar.
    :param image_format: str
        format handed to plt.savefig; it is also used as the file suffix.
    :param dpi: int
        resolution handed to plt.savefig.
    :param time_tag: bool
        Whether the time stamp is in the file name or not.
        When None, self.time_tag is used.
    :param close_plt: bool
        if True, plt.close() is called after the save attempt, whether
        it succeeded or not.
    :return:
        the path of the saved file, or None if the logger is disabled or
        the figure could not be saved.
    """
    if not self.enabled: return
    time_tag = self.time_tag if (time_tag is None) else time_tag
    param_dir, param_name, image_format = \
        self._param_dir_name_suffix(parameter_name, image_format)
    fpath = self._get_fpath(param_dir, param_name, image_format, time_tag)
    try:
        plt.savefig(fpath, format=image_format, dpi=dpi,
                    bbox_inches='tight')
        saved = True
    except Exception:
        # Best effort: a failed save is reported in the log, not raised.
        # KeyboardInterrupt/SystemExit are deliberately not swallowed.
        saved = False
    finally:
        if close_plt:
            plt.close()
    if not saved:
        self.text(
            None, f'Cannot save the plt instance {parameter_name}.')
        return None
    return fpath
[docs]
def plot(self, parameter_name: str,
         parameter_value_list,
         *plt_plot_args,
         x_values_list = None,
         image_format='jpg',
         dpi=1200,
         title = None,
         labels = [],
         time_tag: bool = None,
         fig_ax = None,
         return_figure = False,
         **kwargs):
    """log a single plot

    If you have a numpy array or a list of arrays (or indexable by
    first dimension, an array of 1D arrays), use this to log a plot.
    The drawing itself is delegated to plt_utils.plt_plot.

    :param parameter_name: str
        examples: myvar or myscript/myvar
        parameter_name can be just a name e.g. myvar, or could be a
        path like name such as myscript/myvar.
    :param parameter_value_list: np.array
        An np array or a list of np arrays or indexable-by-0th-dim
        np arrays
    :param plt_plot_args:
        extra positional arguments forwarded to plt_plot.
    :param x_values_list: np.array
        if set, must be a list of one or many np.array of same size of
        all y values or a list for each vector in y values where
        every element of x-values list is the same as the
        y-values element in their list
    :param image_format: str
        image format (and file suffix) used when saving.
    :param dpi: int
        resolution used when saving.
    :param title:
        forwarded to plt_plot.
    :param labels:
        forwarded to plt_plot unchanged.
    :param time_tag: bool
        Whether the time stamp is in the file name or not.
        When None, self.time_tag is used.
    :param fig_ax:
        forwarded to plt_plot.
    :param return_figure: bool
        if True nothing is saved and the value returned by plt_plot is
        handed back (presumably a (fig, ax) pair like the sibling plt_*
        helpers -- confirm against plt_utils).
    :param kwargs:
        extra keyword arguments forwarded to plt_plot.
    :return:
        the saved file path (see savefig), or the plt_plot result when
        return_figure is True. None if the logger is disabled.
    """
    if not self.enabled: return
    time_tag = self.time_tag if (time_tag is None) else time_tag
    from .plt_utils import plt_plot
    # Keep what plt_plot returns: the return_figure branch used to
    # reference undefined names ``fig`` and ``ax`` and raised NameError.
    plot_handles = plt_plot(
        parameter_value_list, *plt_plot_args,
        x_values_list = x_values_list, fig_ax = fig_ax,
        title = title, labels = labels, **kwargs)
    if return_figure:
        return plot_handles
    return self.savefig(
        parameter_name = parameter_name,
        image_format=image_format, dpi=dpi,
        time_tag = time_tag)
[docs]
def printv(self, var, **kwargs):
    """Log a variable, trying to recover its name from the caller.

    The local variables of the calling frame are searched for objects
    that are identical (``is``) to ``var``. When a name is found, the
    module-level ``printv`` helper is called with this logger; otherwise
    the value is written to the main text log via ``self.text``.

    :param var:
        the object to be logged.
    :param kwargs:
        forwarded to the helper / ``self.text``. ``log_time_stamp`` is
        always overwritten with False. ``logger`` and ``var_name`` are
        removed before calling the helper; ``var_name`` is used as a
        prefix in the fallback text.
    """
    import inspect
    caller_frame = inspect.currentframe().f_back
    matches = [name for name, value in caller_frame.f_locals.items()
               if value is var]
    var_name = None
    if matches:
        if len(matches) == 1:
            var_name = matches[0]
        else:
            # Several names point at the same object: describe the value
            # itself, or just its type when the repr is long.
            var_name = repr(var)
            if len(var_name) > 20: var_name = type(var)
    kwargs['log_time_stamp'] = False
    if not var_name:
        prefix = kwargs['var_name'] + ': ' if 'var_name' in kwargs else ''
        self.text(None, f'{prefix}{var}', **kwargs)
        return
    for reserved in ('logger', 'var_name'):
        if reserved in kwargs:
            kwargs.pop(reserved)
    printv(var, logger = self, var_name = var_name, **kwargs)
[docs]
def hist(self, parameter_name: str,
         parameter_value_list,
         bins = 10,
         alpha = 0.5,
         labels_list = None,
         normalize = False,
         image_format='jpg',
         dpi=1200,
         title = None,
         time_tag = None,
         return_figure = False,
         **kwargs):
    """log a single histogram

    Accepts a numpy array or a list of arrays (or anything indexable by
    its first dimension). Multiple inputs are drawn on top of each other
    using the alpha opacity. Drawing is delegated to plt_utils.plt_hist.

    :param parameter_name: str
        examples: myvar or myscript/myvar
        parameter_name can be just a name e.g. myvar, or could be a
        path like name such as myscript/myvar.
    :param parameter_value_list: np.array
        An np array or a list of np arrays or indexable-by-0th-dim
        np arrays
    :param bins: number or np.array
        used to set the bins of the histogram
    :param alpha: float
        the opacity of histograms, a float between 0 and 1. If you
        have multiple histograms on top of each other,
        use 1/number_of_your_variables.
    :param labels_list:
        forwarded to plt_hist.
    :param normalize:
        forwarded to plt_hist.
    :param image_format: str
        image format (and file suffix) used when saving.
    :param dpi: int
        resolution used when saving.
    :param title:
        if not None, set as the axes title.
    :param time_tag: bool
        Whether the time stamp is in the file name or not.
        When None, self.time_tag is used.
    :param return_figure: bool
        if True, return (fig, ax) instead of saving the figure.
    :param kwargs:
        extra keyword arguments forwarded to plt_hist.
    :return:
        the saved file path, or (fig, ax) when return_figure is True.
    """
    if not self.enabled:
        return
    if time_tag is None:
        time_tag = self.time_tag
    from .plt_utils import plt_hist
    fig, ax = plt_hist(parameter_value_list,
                       bins = bins,
                       alpha = alpha,
                       normalize = normalize,
                       labels_list = labels_list,
                       **kwargs)
    if title is not None:
        ax.set_title(title)
    if return_figure:
        return fig, ax
    return self.savefig(parameter_name = parameter_name,
                        image_format = image_format,
                        dpi = dpi,
                        time_tag = time_tag)
[docs]
def scatter3(self, parameter_name: str,
             data_N_by_3,
             elev_list = None,
             azim_list = None,
             image_format='jpg',
             dpi=300,
             title = None,
             time_tag: bool = None,
             return_figure = False,
             make_animation = False,
             log_animation_kwargs = {},
             **kwargs):
    """log a single scatter in 3D

    Scatter plotting in 3D, delegated to plt_utils.plt_scatter3.

    :param parameter_name: str
        examples: myvar or myscript/myvar
        parameter_name can be just a name e.g. myvar, or could be a
        path like name such as myscript/myvar.
    :param data_N_by_3: np.array
        An np array of shape N x 3, to scatter N data points in 3D.
        An input whose first dimension is 3 and whose second dimension
        is not 3 is transposed, and a note is written to the log.
    :param elev_list: list
        Must be an iterable even if has only one number for elev
    :param azim_list: list
        Must be an iterable even if has only one number for azim
    :param image_format: str
        image format (and file suffix) used when saving a still image.
    :param dpi: int
        resolution used when saving.
    :param title:
        forwarded to plt_scatter3.
    :param time_tag: bool
        Whether the time stamp is in the file name or not.
        When None, self.time_tag is used.
    :param return_figure: bool
        if True nothing is saved and the plt_scatter3 result is returned.
    :param make_animation: bool
        forwarded to plt_scatter3; when True (and return_figure is
        False) the third element of its result is saved through
        log_animation instead of saving a still image.
    :param log_animation_kwargs: dict
        extra keyword arguments for log_animation (only read here).
    :param kwargs:
        extra keyword arguments forwarded to plt_scatter3.
    :return:
        the saved file path, or the plt_scatter3 result when
        return_figure is True.
    """
    if not self.enabled: return
    time_tag = self.time_tag if (time_tag is None) else time_tag
    if data_N_by_3.shape[0] == 3 and data_N_by_3.shape[1] != 3:
        data_N_by_3 = data_N_by_3.T
        self.text(
            None, 'lognflow.scatter3> input dataset is transposed.')
    from .plt_utils import plt_scatter3
    fig_ax_opt_stack = plt_scatter3(data_N_by_3, title = title,
        elev_list = elev_list, azim_list = azim_list,
        make_animation = make_animation, **kwargs)
    if return_figure:
        return fig_ax_opt_stack
    if make_animation:
        # Hand the animation path back to the caller; it used to be
        # dropped, so this branch always returned None.
        return self.log_animation(parameter_name, fig_ax_opt_stack[2],
                                  dpi=dpi, time_tag = time_tag,
                                  **log_animation_kwargs)
    return self.savefig(
        parameter_name = parameter_name,
        image_format = image_format, dpi=dpi,
        time_tag = time_tag)
[docs]
def surface(self, parameter_name: str,
            parameter_value, image_format='jpg',
            dpi=1200, title = None,
            time_tag: bool = None, return_figure = False, **kwargs):
    """log a surface in 3D

    Surface plotting in 3D, similar to imshow but in 3D. Drawing is
    delegated to plt_utils.plt_surface.

    :param parameter_name: str
        examples: myvar or myscript/myvar
        parameter_name can be just a name e.g. myvar, or could be a
        path like name such as myscript/myvar.
    :param parameter_value: np.array
        An np array of size n x m, to plot surface in 3D
    :param image_format: str
        image format (and file suffix) used when saving.
    :param dpi: int
        resolution used when saving.
    :param title:
        if not None, set as the axes title.
    :param time_tag: bool
        Whether the time stamp is in the file name or not.
        When None, self.time_tag is used.
    :param return_figure: bool
        if True, return (fig, ax) instead of saving the figure.
    :param kwargs:
        accepted for compatibility; NOTE(review): they are currently not
        forwarded to plt_surface.
    :return:
        the saved file path, or (fig, ax) when return_figure is True.
    """
    if not self.enabled:
        return
    if time_tag is None:
        time_tag = self.time_tag
    from .plt_utils import plt_surface
    fig, ax = plt_surface(parameter_value)
    if title is not None:
        ax.set_title(title)
    if return_figure:
        return fig, ax
    return self.savefig(parameter_name = parameter_name,
                        image_format = image_format,
                        dpi = dpi,
                        time_tag = time_tag)
[docs]
def hexbin(self, parameter_name: str, parameter_value,
           gridsize = 20, image_format='jpg', dpi=1200, title = None,
           time_tag: bool = None, return_figure = False):
    """log a 2D histogram

    The 2D histogram is made of hexagonal bins (matplotlib hexbin) on a
    newly created figure.

    :param parameter_name: str
        examples: myvar or myscript/myvar
        parameter_name can be just a name e.g. myvar, or could be a
        path like name such as myscript/myvar.
    :param parameter_value: np.array
        An np array of size 2 x n; row 0 is used as x and row 1 as y.
    :param gridsize: int
        grid size handed to hexbin.
    :param image_format: str
        image format (and file suffix) used when saving.
    :param dpi: int
        resolution used when saving.
    :param title:
        if not None, set as the axes title.
    :param time_tag: bool
        Whether the time stamp is in the file name or not.
        When None, self.time_tag is used.
    :param return_figure: bool
        if True, return (fig, ax) instead of saving the figure.
    :return:
        the saved file path, or (fig, ax) when return_figure is True.
    """
    if not self.enabled:
        return
    if time_tag is None:
        time_tag = self.time_tag
    fig = plt.figure()
    ax = fig.add_subplot(111)
    x_values, y_values = parameter_value[0], parameter_value[1]
    ax.hexbin(x_values, y_values, gridsize = gridsize)
    if title is not None:
        ax.set_title(title)
    if return_figure:
        return fig, ax
    return self.savefig(parameter_name = parameter_name,
                        image_format = image_format,
                        dpi = dpi,
                        time_tag = time_tag)
[docs]
def imshow(self,
           parameter_name: str,
           parameter_value,
           frame_shape : tuple = None,
           colorbar = True,
           remove_axis_ticks = True,
           image_format='jpg', dpi=1200, cmap = 'viridis',
           title = None, time_tag: bool = None, borders = 0,
           return_figure = False, figsize = None, **kwargs):
    """log an image

    The image is drawn by plt_utils.plt_imshow. Inputs are handled
    according to their number of dimensions:

        * 2D arrays are shown as they are.
        * 3D arrays whose last axis has length 3 are shown as they are
          (treated as RGB).
        * other 3D arrays and all 4D arrays are first tiled into a
          single frame by plt_utils.stack_to_frame.

    Anything else, or a stack that stack_to_frame could not tile, is
    reported in the log and nothing is drawn.

    :param parameter_name: str
        examples: myvar or myscript/myvar
        parameter_name can be just a name e.g. myvar, or could be a
        path like name such as myscript/myvar.
    :param parameter_value: np.array
        An np array of shape amongst the following:
        * (n, m)
        * (n, m, 3)
        * (n_im, n_r, n_c)
        * (n_im, n_r, 3, 1)
        * (n_im, n_r, n_c, 3)
    :param frame_shape: tuple
        forwarded to stack_to_frame.
    :param borders:
        forwarded to stack_to_frame.
    :param colorbar, remove_axis_ticks, title, cmap, figsize:
        forwarded to plt_imshow.
    :param image_format: str
        image format (and file suffix) used when saving.
    :param dpi: int
        resolution used when saving.
    :param time_tag: bool
        Whether the time stamp is in the file name or not.
        When None, self.time_tag is used.
    :param return_figure: bool
        if True nothing is saved and the value returned by plt_imshow is
        handed back (presumably a (fig, ax) pair -- confirm against
        plt_utils).
    :param kwargs:
        extra keyword arguments forwarded to plt_imshow.
    :return:
        the saved file path, the plt_imshow result when return_figure is
        True, or None when the input cannot be shown.
    """
    if not self.enabled: return
    time_tag = self.time_tag if (time_tag is None) else time_tag
    parameter_value_shape = parameter_value.shape
    n_dims = len(parameter_value_shape)

    FLAG_img_ready = False
    use_stack_to_frame = False
    if n_dims == 2:
        FLAG_img_ready = True
    elif n_dims == 3:
        if parameter_value_shape[2] != 3:
            use_stack_to_frame = True
        else:
            # a last axis of length 3 is interpreted as RGB
            FLAG_img_ready = True
    elif n_dims == 4:
        use_stack_to_frame = True

    if use_stack_to_frame:
        from .plt_utils import stack_to_frame
        parameter_value = stack_to_frame(
            parameter_value, frame_shape = frame_shape,
            borders = borders)
        if parameter_value is not None:
            FLAG_img_ready = True

    if not FLAG_img_ready:
        # Report the shape recorded on entry: parameter_value may be
        # None here when stack_to_frame failed, and reading ``.shape``
        # from it used to raise AttributeError.
        self.text(
            self.log_name,
            f'Cannot imshow variable {parameter_name} with shape' + \
            f'{parameter_value_shape}')
        return

    from .plt_utils import plt_imshow
    # Keep what plt_imshow returns: the return_figure branch used to
    # reference undefined names ``fig`` and ``ax`` and raised NameError.
    plot_handles = plt_imshow(parameter_value,
                              colorbar = colorbar,
                              remove_axis_ticks = remove_axis_ticks,
                              title = title,
                              cmap = cmap,
                              figsize = figsize,
                              **kwargs)
    if return_figure:
        return plot_handles
    return self.savefig(
        parameter_name = parameter_name,
        image_format=image_format, dpi=dpi,
        time_tag = time_tag)
[docs]
def imshow_subplots(self,
                    parameter_name: str,
                    images: np.ndarray,
                    frame_shape = None,
                    grid_locations = None,
                    figsize = None,
                    image_format='jpg',
                    dpi=1200,
                    time_tag: bool = None,
                    colorbar = False,
                    remove_axis_ticks = True,
                    titles = None,
                    cmaps = None,
                    return_figure = False,
                    **kwargs):
    """log multiple images in a tiled frame

    Every image gets its own subplot; drawing is delegated to
    plt_utils.plt_imshow_subplots.

    :param parameter_name: str
        examples: myvar or myscript/myvar
        parameter_name can be just a name e.g. myvar, or could be a
        path like name such as myscript/myvar.
    :param images: np.array
        An np array of size n_f x n_r x n_c, to be shown by imshow.
    :param frame_shape:
        the n_f images will be tiled according to this tuple as shape.
    :param grid_locations:
        if this is of shape n_images x 2, then each subplot will be
        located at a specific given location.
        To make it look good, you should provide figsize and im_sizes
        or im_size_factor (through kwargs) to scale them to cover a
        small region between 0 and 1.
    :param figsize, colorbar, remove_axis_ticks, titles, cmaps:
        forwarded to plt_imshow_subplots.
    :param image_format: str
        image format (and file suffix) used when saving.
    :param dpi: int
        resolution used when saving.
    :param time_tag: bool
        Whether the time stamp is in the file name or not.
        When None, self.time_tag is used.
    :param return_figure: bool
        if True, return (fig, ax) instead of saving the figure.
    :param kwargs:
        extra keyword arguments forwarded to plt_imshow_subplots.
    :return:
        the saved file path, or (fig, ax) when return_figure is True.
    """
    if not self.enabled:
        return
    if time_tag is None:
        time_tag = self.time_tag
    from .plt_utils import plt_imshow_subplots
    subplot_options = dict(images = images,
                           frame_shape = frame_shape,
                           grid_locations = grid_locations,
                           figsize = figsize,
                           colorbar = colorbar,
                           remove_axis_ticks = remove_axis_ticks,
                           titles = titles,
                           cmaps = cmaps)
    fig, ax = plt_imshow_subplots(**subplot_options, **kwargs)
    if return_figure:
        return fig, ax
    return self.savefig(parameter_name = parameter_name,
                        image_format = image_format,
                        dpi = dpi,
                        time_tag = time_tag)
[docs]
def imshow_series(self,
                  parameter_name: str,
                  list_of_stacks,
                  list_of_masks = None,
                  figsize = None,
                  text_as_colorbar = False,
                  colorbar = False,
                  cmap = 'viridis',
                  list_of_titles_columns = None,
                  list_of_titles_rows = None,
                  fontsize = None,
                  vmin = None,
                  vmax = None,
                  title = None,
                  colorbar_last_only = True,
                  colorbar_fraction = 0.046,
                  colorbar_pad = 0.04,
                  colorbar_labelsize = 1,
                  grid_width_space = 0.0,
                  remove_axis_ticks = True,
                  aspect = 'equal',
                  image_format='jpg',
                  dpi=1200,
                  time_tag: bool = None,
                  return_figure = False,
                  **kwargs):
    """log a canvas of stacks of images

    One way to show many images and how they change is to make stacks
    of images and put them in a list. Each element of the list is
    iterated by its first dimension, which should have the same size
    for all elements; the stacks are laid out as rows of one canvas.
    Drawing is delegated to plt_utils.plt_imshow_series.

    Each element of the list must appear as either:
        n_frm x n_row x n_clm      if there are n_frm images,
        n_frm x n_row x n_clm x 3  if channels are in RGB.
    If you have multiple images as channels, call
    prepare_stack_of_images first.

    :param parameter_name: str
        examples: myvar or myscript/myvar
        parameter_name can be just a name e.g. myvar, or could be a
        path like name such as myscript/myvar.
    :param list_of_stacks: list
        A list of 3D or 4D arrays, each representing a stack of images.
        All stacks must have the same number of images.
    :param list_of_masks: list, optional
        Masks corresponding to the stacks, each with the same shape as
        the images in its stack. Masked areas are ignored when
        calculating statistics.
    :param figsize: tuple, optional
        Overall figure size in inches. If None, it is determined from
        the number of stacks and images.
    :param text_as_colorbar: bool
        If True, the maximum, mean and minimum of each image are shown
        as text in place of a colorbar.
    :param colorbar: bool
        If True, a colorbar is shown for each subplot.
    :param cmap: str
        The colormap used for displaying the images.
    :param list_of_titles_columns: list, optional
        Titles for each column; length equal to the number of images in
        each stack.
    :param list_of_titles_rows: list, optional
        Titles for each row; length equal to the number of stacks.
    :param fontsize: int, optional
        Font size of the text annotations. If None, it is determined
        from the figure size.
    :param vmin: float, optional
        Minimum value for image normalization; computed from the data
        when None.
    :param vmax: float, optional
        Maximum value for image normalization; computed from the data
        when None.
    :param title: str, optional
        Title for the entire figure.
    :param colorbar_last_only: bool
        If True, a colorbar is shown only for the last column.
        Defaults to True.
    :param colorbar_fraction: float
        Fraction of the original axis allocated for the colorbar.
    :param colorbar_pad: float
        Padding between the image and the colorbar.
    :param colorbar_labelsize: int
        Label size for the colorbar.
    :param grid_width_space: float
        Horizontal spacing between grid columns.
    :param remove_axis_ticks: bool
        If True, axis ticks are removed from all subplots.
    :param aspect: str
        Aspect ratio of the displayed images.
    :param image_format: str
        image format (and file suffix) used when saving.
    :param dpi: int
        resolution used when saving.
    :param time_tag: bool
        Whether the time stamp is in the file name or not.
        When None, self.time_tag is used.
    :param return_figure: bool
        if True, return (fig, ax) instead of saving the figure.
    :param kwargs:
        Additional keyword arguments forwarded to plt_imshow_series.
    :return:
        the saved file path, or (fig, ax) when return_figure is True.
    """
    if not self.enabled:
        return
    if time_tag is None:
        time_tag = self.time_tag
    from .plt_utils import plt_imshow_series
    series_options = dict(
        list_of_stacks = list_of_stacks,
        list_of_masks = list_of_masks,
        figsize = figsize,
        text_as_colorbar = text_as_colorbar,
        colorbar = colorbar,
        cmap = cmap,
        list_of_titles_columns = list_of_titles_columns,
        list_of_titles_rows = list_of_titles_rows,
        fontsize = fontsize,
        vmin = vmin,
        vmax = vmax,
        title = title,
        colorbar_last_only = colorbar_last_only,
        colorbar_fraction = colorbar_fraction,
        colorbar_pad = colorbar_pad,
        colorbar_labelsize = colorbar_labelsize,
        grid_width_space = grid_width_space,
        remove_axis_ticks = remove_axis_ticks,
        aspect = aspect)
    fig, ax = plt_imshow_series(**series_options, **kwargs)
    if return_figure:
        return fig, ax
    return self.savefig(parameter_name = parameter_name,
                        image_format = image_format,
                        dpi = dpi,
                        time_tag = time_tag)
[docs]
def images_to_pdf(self,
                  parameter_name: str,
                  parameter_value: list,
                  time_tag: bool = None,
                  dpi=1200,
                  **kwargs):
    """log a list of images as one multi-page PDF

    Every element of parameter_value is converted with
    PIL.Image.fromarray and the images are written, in order, into a
    single PDF file in the log directory.

    :param parameter_name: str
        examples: myvar or myscript/myvar
        parameter_name can be just a name e.g. myvar, or could be a
        path like name such as myscript/myvar.
    :param parameter_value: list
        an iterable of arrays accepted by PIL.Image.fromarray.
    :param time_tag: bool
        Whether the time stamp is in the file name or not.
        When None, self.time_tag is used.
    :param dpi: int
        passed to PIL as the ``resolution`` of the PDF.
    :param kwargs:
        extra keyword arguments forwarded to PIL's Image.save.
    :return:
        the path of the written PDF, or None when the logger is disabled
        or no images were given.
    :raises:
        re-raises the import error when Pillow is not installed.
    """
    if not self.enabled: return
    time_tag = self.time_tag if (time_tag is None) else time_tag
    param_dir, param_name, suffix = self._param_dir_name_suffix(
        parameter_name, 'pdf')
    fpath = self._get_fpath(param_dir, param_name, suffix, time_tag)
    try:
        from PIL import Image
    except Exception:
        print('install PIL by: --> pip install Pillow')
        raise
    images = [Image.fromarray(_) for _ in parameter_value]
    if not images:
        # Nothing to write; previously this crashed with IndexError.
        self.text(
            None, f'images_to_pdf: no images given for {parameter_name}.')
        return None
    images[0].save(
        fpath, "PDF" ,
        resolution=dpi,
        save_all=True,
        append_images=images[1:],
        **kwargs)
    return fpath
[docs]
def variables_to_pdf(self,
                     parameter_name: str,
                     parameter_value: list,
                     time_tag: bool = None,
                     dpi = 1200,
                     **kwargs):
    """log already-logged variables as one multi-page PDF

    The names in parameter_value are resolved to their data with
    get_stack_from_names and the result is handed to images_to_pdf.

    :param parameter_name: str
        the name of the PDF, e.g. myvar or myscript/myvar.
    :param parameter_value: list
        names of logged variables, as accepted by get_stack_from_names.
    :param time_tag: bool
        passed on to images_to_pdf.
    :param dpi: int
        passed on to images_to_pdf.
    :param kwargs:
        extra keyword arguments passed on to images_to_pdf.
    """
    stacked_images = self.get_stack_from_names(parameter_value)
    self.images_to_pdf(
        parameter_name, stacked_images, time_tag, dpi, **kwargs)
[docs]
def log_confusion_matrix(self,
                         parameter_name: str,
                         cm,
                         target_names = None,
                         title='Confusion matrix',
                         cmap=None,
                         figsize = None,
                         image_format = 'jpg',
                         dpi = 1200,
                         time_tag = False,
                         close_plt = True):
    """log a confusion matrix

    Given a sklearn confusion matrix (cm), make a nice plot with
    plt_utils.plt_confusion_matrix and save it.

    :param parameter_name: str
        examples: myvar or myscript/myvar
    :param cm:
        confusion matrix from sklearn.metrics.confusion_matrix
    :param target_names:
        given classification classes such as [0, 1, 2]
        the class names, for example: ['high', 'medium', 'low']
    :param title:
        the text to display at the top of the matrix
    :param cmap:
        the gradient of the values displayed from matplotlib.pyplot.cm
        (http://matplotlib.org/examples/color/colormaps_reference.html)
        plt.get_cmap('viridis') or plt.cm.Blues
    :param figsize:
        forwarded to plt_confusion_matrix.
    :param image_format: str
        image format (and file suffix) used when saving.
    :param dpi: int
        resolution used when saving.
    :param time_tag:
        if True, the file name will be stamped with time. Unlike the
        other plotting methods the default here is False.
    :param close_plt: bool
        forwarded to savefig.
    :return:
        the path returned by savefig.

    Usage::

        from lognflow import lognflow
        logger = lognflow(log_roots or log_dir)
        logger.log_confusion_matrix(
            'confusion_matrix',
            cm = cm,                      # confusion matrix created by
                                          # sklearn.metrics.confusion_matrix
            target_names = y_labels_vals, # list of names of the classes
            title = best_estimator_name)  # title of graph

    Credit
    ------
    http://scikit-learn.org/stable/auto_examples/
        model_selection/plot_confusion_matrix.html
    """
    if not self.enabled:
        return
    from .plt_utils import plt_confusion_matrix
    plt_confusion_matrix(cm,
                         target_names = target_names,
                         title = title,
                         cmap = cmap,
                         figsize = figsize)
    return self.savefig(parameter_name = parameter_name,
                        image_format = image_format,
                        dpi = dpi,
                        time_tag = time_tag,
                        close_plt = close_plt)
[docs]
def log_animation(
        self, parameter_name: str, stack, interval=50, blit=False,
        repeat_delay = None, dpi=100, time_tag: bool = None):
    """Make an animation from a stack of images

    Every element of the stack is drawn with imshow on one axes and the
    frames are written as a gif with matplotlib's PillowWriter.

    :param parameter_name: str
        examples: myvar or myscript/myvar
        parameter_name can be just a name e.g. myvar, or could be a
        path like name such as myscript/myvar.
    :param stack: np.array of shape
        n_f x n_r x n_c or n_f x n_r x n_c x 3
        stack[cnt] needs to be plottable by plt.imshow()
    :param interval:
        delay between frames handed to ArtistAnimation; the writer uses
        fps = int(1000 / interval).
    :param blit, repeat_delay:
        forwarded to ArtistAnimation.
    :param dpi: int
        resolution used when saving.
    :param time_tag: bool
        Whether the time stamp is in the file name or not.
        When None, self.time_tag is used.
    :return:
        the path of the gif. When saving fails, the error is printed
        (not raised) and None is returned.
    """
    if not self.enabled:
        return
    if time_tag is None:
        time_tag = self.time_tag
    param_dir, param_name, suffix = self._param_dir_name_suffix(
        parameter_name, 'gif')
    fpath = self._get_fpath(param_dir, param_name, suffix, time_tag)
    fig, ax = plt.subplots()
    frames = []
    for img in stack:
        artist = ax.imshow(img, animated=True)
        ax.axis('off')
        frames.append([artist])
    from matplotlib import animation as matplotlib_animation
    ani = matplotlib_animation.ArtistAnimation(
        fig, frames,
        interval = interval, blit = blit, repeat_delay = repeat_delay)
    try:
        gif_writer = matplotlib_animation.PillowWriter(
            fps=int(1000/interval))
        ani.save(fpath, dpi = dpi, writer = gif_writer)
        plt.close()
        return fpath
    except Exception as e:
        plt.close()
        print('lognflow: cannot save the animation. Here is the unraised error:')
        print(e)
        print('-'*79)
[docs]
def flush_all(self):
    """Flush every text log and every recorded variable.

    Calls text_flush(..., flush = True) for each name in
    self._loggers_dict and record_flush for each name in
    self._vars_dict. Does nothing when the logger is disabled.
    """
    if self.enabled:
        # Iterate over snapshots of the keys so that a flush routine may
        # modify the dictionaries.
        text_log_names = list(self._loggers_dict)
        for text_log_name in text_log_names:
            self.text_flush(text_log_name, flush = True)
        recorded_names = list(self._vars_dict)
        for recorded_name in recorded_names:
            self.record_flush(recorded_name)
[docs]
def savez(self, parameter_name: str,
          parameter_value,
          time_tag: bool = None):
    """Save a variable with the npz suffix.

    A thin wrapper around self.save with suffix = 'npz'.

    :param parameter_name: str
        examples: myvar or myscript/myvar
    :param parameter_value:
        the value handed to self.save.
    :param time_tag: bool
        passed on to self.save unchanged.
    :return:
        whatever self.save returns.
    """
    save_options = dict(parameter_name = parameter_name,
                        parameter_value = parameter_value,
                        suffix = 'npz',
                        time_tag = time_tag)
    return self.save(**save_options)
# Towards supporting everything that the standard logging module supports
[docs]
def debug(self, text_to_log):
    """Write text_to_log to the text log named 'debug', without time tag.

    :param text_to_log:
        the content handed to self.text.
    """
    log_name = 'debug'
    self.text(log_name, text_to_log, time_tag = False)
[docs]
def info(self, text_to_log):
    """Write text_to_log to the text log named 'info', without time tag.

    :param text_to_log:
        the content handed to self.text. This parameter was missing from
        the signature, so every call used to fail.
    """
    self.text('info', text_to_log, time_tag = False)
[docs]
def warning(self, text_to_log):
    """Write text_to_log to the text log named 'warning', without time tag.

    :param text_to_log:
        the content handed to self.text. This parameter was missing from
        the signature, so every call used to fail.
    """
    self.text('warning', text_to_log, time_tag = False)
[docs]
def error(self, text_to_log):
    """Write text_to_log to the text log named 'error', without time tag.

    :param text_to_log:
        the content handed to self.text. This parameter was missing from
        the signature, so every call used to fail.
    """
    self.text('error', text_to_log, time_tag = False)
[docs]
def critical(self, text_to_log):
    """Write text_to_log to the text log named 'critical', without time tag.

    :param text_to_log:
        the content handed to self.text. This parameter was missing from
        the signature, so every call used to fail.
    """
    self.text('critical', text_to_log, time_tag = False)
[docs]
def exception(self, text_to_log):
    """Write text_to_log to the text log named 'exception', without time tag.

    :param text_to_log:
        the content handed to self.text. This parameter was missing from
        the signature, so every call used to fail.
    """
    self.text('exception', text_to_log, time_tag = False)
[docs]
def save_torch(self, name, x):
    """Save a torch tensor, or a (nested) dict of tensors, as numpy.

    A dict is walked recursively and every entry is saved under
    ``name/key``; anything else is detached, moved to the CPU, converted
    to a numpy array and handed to self.save.

    :param name: str
        the variable name (or the name prefix for dict entries).
    :param x:
        a tensor-like object providing detach().cpu().numpy(), or a dict
        of such objects with string keys.
    """
    if not isinstance(x, dict):
        as_numpy = x.detach().cpu().numpy()
        self.save(name, as_numpy)
        return
    for key in x.keys():
        entry_name = name+'/'+key
        self.save_torch(entry_name, x[key])
[docs]
def load_torch(self, name):
    """Load what save_torch-style logging stored under ``name``.

    The first entry returned by get_flist(name) decides the result:

    * a file is loaded with self.load and converted to a torch tensor;
      the tensor is moved to the GPU when CUDA is available.
    * a directory is loaded recursively into a dict keyed by the stem
      of every entry inside it.

    :param name: str
        the variable name, as accepted by get_flist.
    :return:
        a torch tensor, a (nested) dict of tensors, or None when nothing
        is found.
    """
    self.assert_log_dir()
    flist = self.get_flist(name)
    for fpath in flist:
        if fpath.is_file():
            # Imported here because torch is an optional dependency that
            # the module does not import at the top.
            import torch
            vname = self.name_from_file(fpath)
            out = self.load(vname)
            tensor = torch.from_numpy(out)
            # Only move to the GPU when there is one; an unconditional
            # .cuda() raises on CPU-only machines.
            if torch.cuda.is_available():
                tensor = tensor.cuda()
            return tensor
        if fpath.is_dir():
            fpath_str = str(fpath.absolute())
            # path relative to the log directory, without the leading
            # path separator
            vname = fpath_str.split(str(self.log_dir))[1][1:]
            flist_dir = self.get_flist(vname + '/*')
            output = {}
            for fpath_inner in flist_dir:
                key = fpath_inner.stem
                output[key] = self.load_torch(
                    vname + '/' + fpath_inner.name)
            return output
    return None
[docs]
def get_flist(self, var_name, suffix = None):
    """ get list of files

    Return the list of files for a saved variable, searched relative to
    self.log_dir.

    Parameters
    ----------
    :param var_name:
        variable name (a str, possibly a glob pattern). A pathlib Path
        is first converted with self.name_from_file.
    :param suffix:
        If there are different suffixes available for a variable
        this input needs to be set. npy, npz, mat, and torch are
        supported. When None, the suffix is taken from the text after
        the last dot of var_name, or '*' if there is no dot.
    :return:
        a list of paths. Files are sorted by the number after the last
        underscore of their stem when all of them have one, otherwise
        lexically. The list is empty when nothing matches.
    """
    self.assert_log_dir()
    try:
        if isinstance(var_name, pathlib_Path):
            var_name = self.name_from_file(var_name)
    except: pass
    assert isinstance(var_name, str)
    # Turn control characters back into their two-character backslash
    # spelling (presumably to undo escapes in Windows-style names such as
    # 'dir\test' -- confirm).
    var_name = var_name.replace('\t', '\\t').replace('\n', '\\n')\
        .replace('\r', '\\r').replace('\b', '\\b')
    # First attempt: use var_name as a glob pattern as it is.
    flist = list((self.log_dir).glob(var_name))
    if not flist:
        if suffix is None:
            if len(var_name.split('.')) > 1:
                # Split 'name.suffix'; a bare '.suffix' means any name.
                suffix = var_name.split('.')[-1]
                name_before_suffix = var_name.split('.')[:-1]
                if((len(name_before_suffix) == 1) &
                   (name_before_suffix[0] == '')):
                    var_name = '*'
                else:
                    var_name = ('.').join(var_name.split('.')[:-1])
            else:
                suffix = '*'
        suffix = suffix.strip('.')
        flist = []
        if((self.log_dir / var_name).is_file()):
            flist = [self.log_dir / var_name]
        elif((self.log_dir / f'{var_name}.{suffix}').is_file()):
            flist = [self.log_dir / f'{var_name}.{suffix}']
        else:
            # Last attempt: glob 'name.suffix' inside the parent directory
            # of the variable.
            _var_name = (self.log_dir / var_name).name
            _var_dir = (self.log_dir / var_name).parent
            search_patt = f'{_var_name}.{suffix}'
            search_patt = replace_all(search_patt, '**', '*')
            flist = list(_var_dir.glob(search_patt))
    if(flist):
        try:
            # Sort numerically by the tag after the last underscore.
            flist_tags = np.array([float(fpath_v.stem.split('_')[-1]) for fpath_v in flist])
            sortinds = np.argsort(flist_tags)
            flist = [flist[_] for _ in sortinds]
        except:
            # Not every stem ends with a number: fall back to path order.
            flist.sort()
    else:
        # Nothing matched as a file: var_name may name a directory.
        var_dir = self.log_dir / var_name
        if(var_dir.is_dir()):
            flist = list(var_dir.glob('*'))
            if(len(flist) > 0):
                flist.sort()
    return flist
[docs]
def get_namelist(self, var_name, suffix = None):
    """ get logger names of files

    Return the list of logger names for a saved variable: the files
    found by get_flist, each converted with name_from_file.

    Parameters
    ----------
    :param var_name:
        variable name
    :param suffix:
        If there are different suffixes available for a variable
        this input needs to be set. npy, npz, mat, and torch are
        supported.
    :return:
        a list of names; when get_flist finds nothing its (empty) result
        is returned as it is.
    """
    self.assert_log_dir()
    found = self.get_flist(var_name, suffix)
    if not found:
        return found
    return [self.name_from_file(fpath) for fpath in found]
[docs]
def get_common_files(self, var_name_A, var_name_B, suffix = None,
                     flist_A = None, flist_B = None):
    """ get common files in two directories

    It happens often in ML that there are two directories, A and B,
    and we are interested in the files whose stems appear in both.
    Returns a tuple of two lists of paths that are aligned element by
    element.

    Parameters
    ----------
    :param var_name_A:
        directory A name
    :param var_name_B:
        directory B name
    :param suffix:
        passed to get_flist when a file list has to be looked up.
    :param flist_A:
        optional list of paths to use for A instead of calling get_flist.
    :param flist_B:
        optional list of paths to use for B instead of calling get_flist.
    :return:
        (flist_A_new, flist_B_new). Paths are rebuilt from the parent and
        suffix of the first file of each list plus the common stem, in
        the order the stems appear in A. Both lists are empty when either
        side has no files.
    """
    self.assert_log_dir()
    if not flist_A:
        flist_A = self.get_flist(var_name_A, suffix)
    if not flist_B:
        flist_B = self.get_flist(var_name_B, suffix)
    # Nothing can be common with an empty side; indexing [0] below used
    # to raise IndexError in this case.
    if not flist_A or not flist_B:
        return ([], [])

    suffix_A = flist_A[0].suffix
    suffix_B = flist_B[0].suffix
    parent_A = flist_A[0].parent
    parent_B = flist_B[0].parent

    fstems_B_set = {_fst.stem for _fst in flist_B}
    # dict.fromkeys removes duplicates while keeping the order of A, so
    # the result no longer depends on set iteration order.
    common_stems = list(dict.fromkeys(
        _fst.stem for _fst in flist_A if _fst.stem in fstems_B_set))

    flist_A_new = [parent_A / (common_stem + suffix_A)
                   for common_stem in common_stems]
    flist_B_new = [parent_B / (common_stem + suffix_B)
                   for common_stem in common_stems]
    return(flist_A_new, flist_B_new)
[docs]
def get_text(self, log_name='main_log', flist = None, suffix = 'txt',
             file_index = -1):
    """ get text log files

    Given the log_name, this function returns the text therein as lists
    of lines.

    Parameters
    ----------
    :param log_name:
        the log name. If not given then it is the main log.
    :param flist:
        you can give a file list in Posix paths, for text files
    :param suffix: str
        to search for specific files
    :param file_index: int or list[int]
        a number or a list of numbers for the index of the file
        to include, default: -1
    :return:
        one list of lines per requested index. When exactly one file is
        available the single list of lines is returned directly. None is
        returned when there are no files.
    """
    self.assert_log_dir()
    wanted = [file_index] if isinstance(file_index, int) else file_index
    if not flist:
        flist = self.get_flist(log_name, suffix)
    n_files = len(flist)
    if not (n_files > 0):
        return None
    contents = []
    for fcnt in wanted:
        with open(flist[int(fcnt)]) as f_txt:
            contents.append(f_txt.readlines())
    return contents[0] if n_files == 1 else contents
[docs]
def is_file(self, var_name, file_index = None, suffix = None, verbose = False):
    """ check if a single variable file exists

    Returns True if it is available, else returns False.

    Parameters
    ----------
    :param var_name:
        variable name
    :param file_index:
        If there are many snapshots of a variable, this integer selects
        which one of them is checked.
    :param suffix:
        If there are different suffixes available for a variable
        this input needs to be set. npy, npz, mat, and torch are
        supported.
    :param verbose:
        if True, a note is logged when file_index is used to choose
        among several files.
    :return:
        True or False. None is returned (after logging a message) when
        several files match and no file_index is given.
    """
    self.assert_log_dir()
    if file_index is not None:
        assert file_index == int(file_index), \
            f'file_index {file_index} must be an integer'
    flist = self.get_flist(var_name, suffix)
    # Nothing found: answer directly instead of relying on a bare except
    # catching the unbound ``var_path`` further down.
    if not flist:
        return False
    if len(flist) == 1:
        var_path = flist[0]
    elif file_index is not None:
        if verbose:
            self.text(None,
                f'There are {len(flist)} files, logged with'
                + f' name {var_name}.'
                + f' The given index is {file_index}.')
        var_path = flist[file_index]
    else:
        self.text(None, '-'*60)
        self.text(None,
            f'There are {len(flist)} files, logged with'
            + f' name {var_name} but the index is not given.')
        self.text(None, '-'*60)
        return None
    try:
        return bool(var_path.is_file())
    except (AttributeError, OSError):
        # Not a path-like entry, or the file system refused the query.
        return False
def _load(self, var_name, file_index = None,
suffix = None, read_func = None, verbose = False,
return_collection = False):
""" get a single variable
return the value of a saved variable.
Parameters
----------
:param var_name:
variable name
:param file_index:
If there are many snapshots of a variable, this input can
limit the returned to a set of indices.
:param suffix:
If there are different suffixes availble for a variable
this input needs to be set. npy, npz, mat, and torch are
supported.
:param read_func:
a function that takes the Posix path and returns data
:param return_collection:
if True, then tries to read the text as if a list/dict/tuple had been
logged.
.. note::
when reading a MATLAB file, the output is a dictionary.
Also when reading a npz except if it is made by record
"""
self.assert_log_dir()
assert file_index == int(file_index), \
f'file_index {file_index} must be an integer'
flist = self.get_flist(var_name, suffix)
var_path = None
if flist:
if len(flist) == 1:
var_path = flist[0]
else:
if file_index is not None:
if verbose:
self.text(None,
f'There are {len(flist)} files, logged with'
+ f' name {var_name}.'
+ f' The given index is {file_index}.')
var_path = flist[file_index]
else:
self.text(None, '-'*60)
self.text(None,
f'There are {len(flist)} files, logged with'
+ f' name {var_name} but the index is not given.')
self.text(None, '-'*60)
return None
if(var_path.is_file()):
if verbose:
self.text(None, f'Loading {var_path}')
if read_func is not None:
return (read_func(var_path), var_path)
if(var_path.suffix == '.npz'):
buf = np.load(var_path)
try: #check if it is made by record
assert len(buf.files) == 2
time_array = buf['time']
data_array = buf['data']
data_array = data_array[time_array > 0]
time_array = time_array[time_array > 0]
return((time_array, data_array), var_path)
except:
if return_collection:
buf = dict(buf)
return(buf, var_path)
if(var_path.suffix == '.npy'):
try: return(np.load(var_path), var_path)
except: pass
if(var_path.suffix == '.mat'):
try:
from scipy.io import loadmat
return(loadmat(var_path), var_path)
except: pass
if(var_path.suffix == '.dm4'):
try:
from hyperspy.api import load as hyperspy_api_load
return (hyperspy_api_load(var_path).data, var_path)
except: pass
if((var_path.suffix == '.tif') | (var_path.suffix == '.tiff')):
try:
from tifffile import imread as tifffile_imread
return(tifffile_imread(var_path), var_path)
except: pass
if (var_path.suffix == '.pth'):
try:
from torch import load as torch_load
return(torch_load(var_path), var_path)
except: pass
try: #png, jpg, ...
from matplotlib.pyplot import imread
img = imread(var_path)
return(img, var_path)
except: pass
try:
txt = var_path.read_text(errors = 'ignore')
if (var_path.suffix == '.json'):
import json
txt = json.loads(txt)
return_collection = False
if return_collection:
txt = text_to_collection(txt)
return(txt, var_path)
except: pass
var_path = None
else:
var_path = None
if (var_path is None) & verbose:
self.text(None, f'Looking for {var_name} failed. ' + \
f'{var_path} is not in: {self.log_dir}')
return None, None
[docs]
def load(self, var_name, file_index = -1,
         suffix = None, read_func = None, verbose = False,
         return_fpath = False, return_collection = False):
    """ get a single variable

    Return the value of a saved variable. The work is done by _load;
    this wrapper only decides whether the file path is returned too.

    Parameters
    ----------
    :param var_name:
        variable name
    :param file_index:
        If there are many snapshots of a variable, this integer selects
        one of them; the default -1 is the last file of the list.
    :param suffix:
        If there are different suffixes available for a variable
        this input needs to be set. npy, npz, mat, and torch are
        supported.
    :param read_func:
        a function that takes the Posix path and returns data
    :param verbose:
        passed on to _load.
    :param return_fpath:
        if True, return ``(data, path)`` instead of only the data.
    :param return_collection:
        passed on to _load.

    .. note::
        when reading a MATLAB file, the output is a dictionary.
        Also when reading a npz except if it is made by record
    """
    self.assert_log_dir()
    loaded_data, fpath = self._load(var_name = var_name,
                                    file_index = file_index,
                                    suffix = suffix,
                                    read_func = read_func,
                                    verbose = verbose,
                                    return_collection = return_collection)
    if not return_fpath:
        return loaded_data
    return loaded_data, fpath
[docs]
def get_stack_from_files(self,
    var_name = None, flist = None, suffix = None, read_func = None,
    return_flist = False):
    """ Get the data of all files of a variable, stacked if possible

        Reads every file of a single variable (or every file in a given
        list) and tries to combine the results into one numpy array.

        Parameters
        ----------
        :param var_name:
                The directory or variable name to look for the files.
                Only used when flist is empty; it is passed to get_flist.
        :type var_name: str
        :param flist:
                list of Paths, if data is returned, this flist input can
                limit the data requested to this list. None or an empty
                list means "ask get_flist(var_name, suffix)".
        :type flist: list
        :param suffix:
                the suffix of files to look for, e.g. 'txt'
        :type suffix: str
        :param read_func:
                the function that takes the posix path of a file and
                returns the data in there. When None, np.load and then
                matplotlib's imread are tried on the first file and the
                first one that works is used for all files.
        :param return_flist:
                if True, the list of files is returned as well.

        Output
        ----------
            It returns a list of data in all files or a numpy array if
            concatenation of all is possible. When no file is found the
            data is None. With return_flist the output is the tuple
            (data, flist).

        Raises
        ----------
            AssertionError if the first entry of a given flist is not a
            file. Whatever read_func raises for the first file is
            re-raised after a message is written to the text log.
    """
    self.assert_log_dir()
    if flist:
        flist = list(flist)
        assert pathlib_Path(flist[0]).is_file(), \
            f'File not found: {flist[0]}. You can use get_flist'
    else:
        flist = self.get_flist(var_name, suffix)

    if not flist:
        # Nothing to read: keep the return shape tied to return_flist only.
        return (None, []) if return_flist else None

    first_path = flist[0]
    first_data = None
    first_is_loaded = False

    # Without a reader, probe the first file: numpy formats first, then
    # images. The data read while probing is kept, so the first file is
    # not read again below.
    if read_func is None:
        try:
            first_data = np.load(first_path)
            read_func = np.load
            first_is_loaded = True
        except Exception:
            pass
    if read_func is None:
        try:
            from matplotlib.pyplot import imread
            first_data = imread(first_path)
            read_func = imread
            first_is_loaded = True
        except Exception:
            pass

    if not first_is_loaded:
        # Either the caller supplied read_func or both probes failed (then
        # read_func is still None and the call below raises TypeError,
        # which is reported like any other read failure).
        try:
            first_data = read_func(first_path)
        except Exception:
            # flist may hold plain strings, so wrap before asking is_file.
            if pathlib_Path(first_path).is_file():
                self.text(None,
                    f'lognflow: The data file {first_path} could not be read.'
                    'Please provide a read_function for this file.')
            else:
                self.text(
                    None, f'File {first_path} does not exist.')
            raise

    dataset = [first_data if fcnt == 0 else read_func(fpath)
               for fcnt, fpath in enumerate(flist)]
    try:
        dataset_array = np.array(dataset, dtype=dataset[0].dtype)
    except Exception:
        # Ragged shapes or items without a dtype: hand back the plain list.
        dataset_array = dataset

    if return_flist:
        return (dataset_array, flist)
    return dataset_array
# [docs]
def get_stack_from_names(self,
    var_names = None, read_func = None, return_flist = False):
    """ Load all files of one or more variables and stack the results

        For every given name, the files are listed with get_flist and
        each of them is read through load. Items for which load gives
        None are skipped.

        Parameters
        ----------
        :param var_names:
                a single variable name or an iterable of variable names.
                A single name (str, Path or None) is treated as a list
                with one entry.
        :param read_func:
                passed to load for every file.
        :param return_flist:
                if True, the paths reported by load are returned as well.

        Output
        ----------
            A numpy array if all loaded items can be combined using the
            dtype of the first one, otherwise the list of loaded items
            (an empty list when nothing was loaded). With return_flist
            the output is the tuple (dataset, flist).
    """
    self.assert_log_dir()
    # Only a lone name is wrapped. Wrapping unconditionally would turn a
    # real list of names into [[...]] and pass the whole list to get_flist
    # as if it were one name.
    if var_names is None or isinstance(var_names, (str, pathlib_Path)):
        var_names = [var_names]
    else:
        var_names = list(var_names)

    dataset = []
    flist = []
    for name in var_names:
        images_flist = self.get_flist(name)
        if not images_flist:
            continue
        for file_index in range(len(images_flist)):
            data, fpath = self.load(
                name, file_index = file_index,
                read_func = read_func, return_fpath = True)
            if data is not None:
                dataset.append(data)
                flist.append(fpath)

    try:
        dataset = np.array(dataset, dtype=dataset[0].dtype)
    except Exception:
        # Empty result, ragged shapes or items without dtype: keep the list.
        pass

    if return_flist:
        return dataset, flist
    return dataset
# [docs]
def replace_time_with_index(self, var_name, verbose = False):
    """ index in file var_names

        lognflow uses time stamps to make new log files for a variable.
        That is done by putting time stamp after the name of the variable.
        This function renames the files so that the time stamp, taken as
        the text after the last underscore of the file stem, is replaced
        by a zero-padded running index. The files are indexed in the
        order of their sorted paths.

        :param var_name:
                variable name. If it is a directory inside the log
                directory, all of its ``*.*`` files are renamed;
                otherwise its last part is used as a glob pattern in its
                parent directory.
        :param verbose:
                if True, the old and the new name of every file are
                written to the text log.
    """
    self.assert_log_dir()
    var_dir = self.log_dir / var_name
    if var_dir.is_dir():
        flist = list(var_dir.glob('*.*'))
    else:
        var_fname = var_dir.name
        var_dir = var_dir.parent
        flist = list(var_dir.glob(var_fname))
        if not flist and '*' not in var_fname:
            self.text(None,
                'lognflow, replace_time_with_index:' +\
                'the given pattern has no * and no files were found')
    if not flist:
        return

    def _with_new_stamp(fpath, replacement):
        # Swap only the trailing stamp of the stem. Splitting the whole
        # file name on the stamp text would also cut at any earlier
        # occurrence of the same characters (e.g. 'a1_1.txt').
        stem = fpath.stem
        stamp = stem.split('_')[-1]
        prefix = stem[:len(stem) - len(stamp)]
        return fpath.parent / f'{prefix}{replacement}{fpath.suffix}'

    flist.sort()
    fcnt_width = len(str(len(flist)))

    # First pass: integer-looking stamps are rewritten as 'N.0'.
    # NOTE(review): presumably this keeps the index names written in the
    # second pass from colliding with files that still carry integer
    # stamps -- confirm with the author.
    for fcnt, fpath in enumerate(flist):
        f_time_stamp = fpath.stem.split('_')[-1]
        try:
            is_integer_stamp = int(f_time_stamp) == float(f_time_stamp)
        except ValueError:
            continue
        if not is_integer_stamp:
            continue
        fpath_new = _with_new_stamp(fpath, f'{float(f_time_stamp):0.1f}')
        try:
            fpath.rename(fpath_new)
        except OSError:
            # best effort: keep the old name and let the second pass try
            continue
        flist[fcnt] = fpath_new

    # Second pass: replace the stamp by the position in the sorted list.
    for fcnt, fpath in enumerate(flist):
        if verbose:
            self.text(None, f'Changing {fpath.name}')
        fpath_new = _with_new_stamp(fpath, f'{fcnt:0{fcnt_width}d}')
        if verbose:
            self.text(None, f'To {fpath_new.name}')
        fpath.rename(fpath_new)
def __call__(self, *args, **kwargs):
    """calling the object

        Makes the logger itself callable as a shortcut for text logging.
        In the case of the following code::

            logger = lognflow()
            logger('Hello lognflow')

        all positional and keyword arguments are forwarded to
        ``self.text`` with ``None`` as its first argument, and whatever
        ``self.text`` returns is returned to the caller.
    """
    return self.text(None, *args, **kwargs)
def __repr__(self):
    """Represent the logger by the absolute path of its log directory."""
    absolute_log_dir = self.log_dir.absolute()
    return str(absolute_log_dir)
def __bool__(self):
    """The logger is truthy exactly when its log directory exists on disk."""
    log_dir = self.log_dir
    return log_dir.is_dir()