import contextlib
import os
from collections.abc import Generator
from typing import Callable, Optional
import pandas
from pyfileindex.watcher import FileSystemWatcher
class PyFileIndex:
    """
    The PyFileIndex maintains a pandas DataFrame to track changes in the file system.

    Args:
        path (str): file system path
        filter_function (Callable): function to filter for specific files (optional)
        debug (bool): enable debug print statements (optional)
        df (pandas.DataFrame): DataFrame of a previous PyFileIndex object (optional)
        watch (bool): keep the file index in sync using a background file system
            watcher instead of rescanning the file system on every update() call.
            Relies on OS-level file change notifications (via the optional
            watchfiles dependency), which are not always delivered reliably on
            network filesystems such as NFS, Lustre, or GPFS when the change is
            made by a different node -- a common setup when monitoring HPC
            simulation output from a separate process or login node. Prefer the
            default polling mode (watch=False) in that case (optional)
    """

    # Column order of a file index entry as produced by _make_entry().
    _COLUMNS = ("basename", "path", "dirname", "is_directory", "mtime", "nlink")

    # Absolute tolerance (seconds) when comparing a recorded mtime with the
    # current one. Both values normally come straight from os.stat(), so a real
    # modification differs by far more than this; the slack only absorbs float
    # round-off if a df was serialized and read back. A relative tolerance is
    # unsuitable here: 1e-10 * 1.7e9 s is ~0.17 s, enough to miss quick writes.
    _MTIME_ATOL = 1e-6

    def __init__(
        self,
        path: str = ".",
        filter_function: Optional[Callable] = None,
        debug: bool = False,
        df: Optional[pandas.DataFrame] = None,
        watch: bool = False,
    ) -> None:
        abs_path = os.path.abspath(os.path.expanduser(path))
        self._check_if_path_exists(path=abs_path)
        self._debug = debug
        self._filter_function = filter_function
        self._path = abs_path
        self._watch_enabled = watch
        self._watcher: Optional[FileSystemWatcher] = None
        if watch:
            self._watcher = FileSystemWatcher(path=self._path)
            self._watcher.start()
        if df is None or watch:
            # A df handed down from a parent index (e.g. via open()) can be
            # stale: the parent's watcher may not have drained changes made
            # to this path yet, and a freshly started watcher here only
            # reports changes from now on, so a missing entry would never be
            # backfilled. Scanning is cheap relative to that permanent loss.
            try:
                self._df = self._create_df_from_lst(
                    [self._get_lst_entry_from_path(entry=self._path)]
                    + list(self._scandir(path=self._path, df=None, recursive=True))
                )
            except BaseException:
                # Do not leave a background watcher running for an object
                # whose construction failed.
                self.close()
                raise
        else:
            self._df = df

    @property
    def df(self) -> pandas.DataFrame:
        """
        The file index as a pandas DataFrame, with one row per file or
        directory below the indexed path. Columns:

        - ``basename`` (str): file or directory name, e.g. ``"output.txt"``.
        - ``path`` (str): absolute path.
        - ``dirname`` (str): absolute path of the parent directory.
        - ``is_directory`` (bool): ``True`` for directories, ``False`` for files.
        - ``mtime`` (float): last modification time as a POSIX timestamp (the
          same value ``os.stat().st_mtime`` returns). Useful for finding which
          simulation directories have written output most recently.
        - ``nlink`` (int): hard link count (``os.stat().st_nlink``). Used
          internally to detect changes that don't update ``mtime``; rarely
          needed directly.

        Returns:
            pandas.DataFrame: the file index
        """
        return self._df

    @property
    def dataframe(self) -> pandas.DataFrame:
        """Alias for :attr:`df`."""
        return self.df

    def open(self, path: str) -> "PyFileIndex":
        """
        Open PyFileIndex in the subdirectory path

        The current index is updated first. If path lies inside the indexed
        directory, the matching rows of the current index are handed to the
        new index; otherwise the new index scans path from scratch.

        Args:
            path (str): subdirectory to open, relative to the indexed path or
                absolute; a leading "~" is expanded to the home directory

        Returns:
            PyFileIndex: PyFileIndex for subdirectory (self if path is the
            indexed path itself)

        Raises:
            FileNotFoundError: if path does not exist
        """
        # Expand "~" before joining: joining onto the absolute self._path
        # first would bury the "~" in the middle of the path.
        abs_path = os.path.abspath(
            os.path.join(self._path, os.path.expanduser(path))
        )
        self._check_if_path_exists(path=abs_path)
        self.update()
        if abs_path == self._path:
            return self
        return PyFileIndex(
            path=abs_path,
            filter_function=self._filter_function,
            debug=self._debug,
            df=self._get_subtree_df(path=abs_path),
            watch=self._watch_enabled,
        )

    def _get_subtree_df(self, path: str) -> Optional[pandas.DataFrame]:
        """
        Internal function to select the rows of the file index at or below path

        Args:
            path (str): absolute file system path

        Returns:
            pandas.DataFrame/ None: rows for path and everything below it, or
            None if path is not inside the indexed directory
        """
        try:
            is_subdirectory = os.path.commonpath([path, self._path]) == self._path
        except ValueError:
            # e.g. paths on different drives on Windows
            return None
        if not is_subdirectory:
            return None
        # Literal prefix comparison anchored on a path separator: "run1" must
        # not match "run10", and characters such as "+" or "(" in directory
        # names must be matched literally (str.contains treats them as regex).
        paths = self._df.path
        return self._df[(paths == path) | paths.str.startswith(path + os.sep)]

    def update(self, timeout: float = 0.1) -> None:
        """
        Update file index

        Args:
            timeout (float): when watch=True, a filesystem change made just
                before calling update() may not have reached the background
                watcher yet. timeout is the max time in seconds to wait for
                such a pending change to arrive before applying whatever is
                available. Ignored when watch=False (optional, default
                100ms, matching watchfiles' own minimum reporting latency).

        Raises:
            FileNotFoundError: if the indexed path no longer exists
        """
        self._check_if_path_exists(path=self._path)
        if self._watcher is not None:
            self._apply_watch_changes(
                changes=self._watcher.drain_pending_changes(timeout=timeout)
            )
            return
        df_new, files_changed_lst, path_deleted_lst = self._get_changes_quick()
        if self._debug:
            print("Changes: ", df_new.path.values, files_changed_lst, path_deleted_lst)
        if len(path_deleted_lst) != 0:
            self._df = self._df[~self._df.path.isin(path_deleted_lst)]
        if len(files_changed_lst) != 0:
            df_updated = self._create_df_from_lst(
                [self._get_lst_entry_from_path(entry=f) for f in files_changed_lst]
            )
            # The changed entries may all have vanished since they were
            # detected; skip the concat then (same guard as in watch mode).
            if len(df_updated) != 0:
                self._df = self._df[~self._df.path.isin(df_updated.path)]
                self._df = (
                    pandas.concat([self._df, df_updated])
                    .drop_duplicates()
                    .reset_index(drop=True)
                )
        if len(df_new) != 0:
            self._df = (
                pandas.concat([self._df, df_new])
                .drop_duplicates()
                .reset_index(drop=True)
            )

    def close(self) -> None:
        """
        Stop the background file system watcher started with watch=True. Safe to
        call even if no watcher is running, and safe to call repeatedly. After
        closing, update() falls back to rescanning the file system (polling
        mode) instead of draining a stopped watcher.
        """
        # getattr: __del__ may run on an object whose __init__ failed before
        # _watcher was assigned.
        watcher = getattr(self, "_watcher", None)
        if watcher is None:
            return
        try:
            watcher.stop()
        finally:
            self._watcher = None

    def __enter__(self) -> "PyFileIndex":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def __del__(self) -> None:
        with contextlib.suppress(Exception):
            self.close()

    def _init_df_lst(
        self,
        path_lst: list,
        df: Optional[pandas.DataFrame] = None,
        include_root: bool = True,
    ) -> pandas.DataFrame:
        """
        Internal function to build the pandas file index from a list of directories

        Args:
            path_lst (list): list of directories to scan
            df (pandas.DataFrame/ None): existing file index table; paths
                already listed there are skipped (and not descended into)
            include_root (bool): include the root directory in file index

        Returns:
            pandas.DataFrame: pandas file index
        """
        # Build the lookup set once for all directories instead of per entry.
        known_paths = self._known_paths(df=df)
        total_lst = []
        for p in path_lst:
            if include_root:
                total_lst.append(self._get_lst_entry_from_path(entry=p))
            total_lst.extend(
                self._scandir_unknown(path=p, known_paths=known_paths, recursive=True)
            )
        return self._create_df_from_lst(total_lst)

    def _scandir(
        self, path: str, df: Optional[pandas.DataFrame] = None, recursive: bool = True
    ) -> Generator:
        """
        Internal function to recursively scan directories

        Args:
            path (str): file system path
            df (pandas.DataFrame/ None): existing file index table; entries
                whose path is already listed there are skipped
            recursive (bool): recursively iterate over subdirectories

        Returns:
            Generator: file index entries (lists, empty for filtered entries)
        """
        yield from self._scandir_unknown(
            path=path, known_paths=self._known_paths(df=df), recursive=recursive
        )

    def _scandir_unknown(
        self, path: str, known_paths: frozenset, recursive: bool = True
    ) -> Generator:
        """
        Internal function to recursively yield entries below path whose path is
        not in known_paths; known directories are not descended into.

        Args:
            path (str): file system path
            known_paths (frozenset): paths to skip (O(1) membership test)
            recursive (bool): recursively iterate over subdirectories

        Returns:
            Generator: file index entries, children before their directory
        """
        try:
            with os.scandir(path) as it:
                for entry in it:
                    if entry.path in known_paths:
                        continue
                    if entry.is_dir(follow_symlinks=False) and recursive:
                        yield from self._scandir_unknown(
                            path=entry.path,
                            known_paths=known_paths,
                            recursive=recursive,
                        )
                    yield self._get_lst_entry(entry=entry)
        except FileNotFoundError:
            # The directory vanished while (or before) it was scanned.
            return

    @staticmethod
    def _known_paths(df: Optional[pandas.DataFrame]) -> frozenset:
        """
        Internal function to collect the paths of an existing file index

        Args:
            df (pandas.DataFrame/ None): existing file index table

        Returns:
            frozenset: paths listed in df (empty if df is None)
        """
        if df is None:
            return frozenset()
        return frozenset(df.path.values)

    def _apply_watch_changes(self, changes: set) -> None:
        """
        Internal function to apply the changes collected by the background
        watcher to the file index

        Args:
            changes (set): set of (watchfiles.Change, path) tuples
        """
        if len(changes) == 0:
            return
        import watchfiles

        deleted_lst = [
            p for change, p in changes if change == watchfiles.Change.deleted
        ]
        changed_lst = [
            p for change, p in changes if change != watchfiles.Change.deleted
        ]
        if self._debug:
            print("Changes: ", changed_lst, deleted_lst)
        if len(deleted_lst) != 0:
            # Drop the deleted paths and, for deleted directories, everything
            # below them.
            prefix_tuple = tuple(p + os.sep for p in deleted_lst)
            self._df = self._df[
                ~(
                    self._df.path.isin(deleted_lst)
                    | self._df.path.str.startswith(prefix_tuple)
                )
            ]
        if len(changed_lst) != 0:
            entry_lst = []
            for p in changed_lst:
                # Re-read the current state; the path may be gone again or be
                # rejected by the filter, in which case the entry is empty.
                entry = self._get_lst_entry_from_path(entry=p)
                if len(entry) != 0:
                    entry_lst.append(entry)
                    if entry[3]:
                        # A new or moved-in directory: index its whole subtree.
                        entry_lst.extend(
                            self._scandir(path=p, df=None, recursive=True)
                        )
            df_updated = self._create_df_from_lst(entry_lst)
            if len(df_updated) != 0:
                self._df = self._df[~self._df.path.isin(df_updated.path)]
                self._df = (
                    pandas.concat([self._df, df_updated])
                    .drop_duplicates()
                    .reset_index(drop=True)
                )

    def _get_changes_quick(self) -> tuple:
        """
        Internal function to list the changes to the file system

        Every indexed path is stat'ed exactly once. Paths that can no longer be
        stat'ed count as deleted. Paths whose mtime or link count changed count
        as changed; changed directories are rescanned for new entries.

        Returns:
            tuple: pandas.DataFrame with new entries, list of changed files, and list of deleted paths
        """
        exists_lst, mtime_lst, nlink_lst = [], [], []
        for p in self._df.path.values:
            try:
                st = os.stat(p)
            except (OSError, ValueError):
                # Same failure set os.path.exists() maps to False. Stat'ing
                # once (instead of exists() followed by stat()) also avoids a
                # crash when a file disappears between the two calls.
                exists_lst.append(False)
            else:
                exists_lst.append(True)
                mtime_lst.append(st.st_mtime)
                nlink_lst.append(st.st_nlink)
        exists = pandas.Series(exists_lst, index=self._df.index, dtype=bool)
        path_deleted_lst = self._df[~exists].path.values
        df_exists = self._df[exists]
        st_mtime = pandas.Series(mtime_lst, index=df_exists.index, dtype=float)
        st_nlink = pandas.Series(nlink_lst, index=df_exists.index, dtype="int64")
        df_modified = df_exists[
            ((df_exists.mtime - st_mtime).abs() > self._MTIME_ATOL)
            | (df_exists.nlink != st_nlink)
        ]
        if len(df_modified) > 0:
            files_changed_lst = df_modified.path.values
            dir_changed_lst = df_modified[
                df_modified.is_directory.astype(bool)
            ].path.values
        else:
            files_changed_lst, dir_changed_lst = [], []
        df_new = self._init_df_lst(
            path_lst=dir_changed_lst, df=df_exists, include_root=False
        )
        return df_new, files_changed_lst, path_deleted_lst

    def _make_entry(self, name: str, path: str, isdir: bool, stat) -> list:
        """
        Internal function to assemble a file index entry, applying the filter

        Args:
            name (str): file or directory name
            path (str): file system path
            isdir (bool): whether path is a directory
            stat (os.stat_result): stat result of path

        Returns:
            list: file index entry, or an empty list if the filter rejects a file
        """
        # The filter only applies to files; directories are always indexed.
        if not isdir and self._filter_function is not None:
            if not self._filter_function(path):
                return []
        return [
            name,
            path,
            os.path.dirname(path),
            isdir,
            stat.st_mtime,
            stat.st_nlink,
        ]

    def _get_lst_entry_from_path(self, entry: str) -> list:
        """
        Internal function to generate file index entry from file system path

        Args:
            entry (str): file system path

        Returns:
            list: file index entry (empty if missing or filtered out)
        """
        try:
            stat = os.stat(entry)
            return self._make_entry(
                name=os.path.basename(entry),
                path=entry,
                isdir=os.path.isdir(entry),
                stat=stat,
            )
        except FileNotFoundError:
            return []

    def _get_lst_entry(self, entry) -> list:
        """
        Internal function to generate file index entry from scandir DirEntry

        Args:
            entry (DirEntry): scandir DirEntry

        Returns:
            list: file index entry (empty if missing or filtered out)
        """
        try:
            stat = entry.stat()
            return self._make_entry(
                name=entry.name, path=entry.path, isdir=entry.is_dir(), stat=stat
            )
        except FileNotFoundError:
            return []

    def _repr_html_(self) -> str:
        """
        Internal visualization function for iPython notebooks

        Returns:
            str: iPython notebook representation of the pandas.DataFrame
        """
        return self._df._repr_html_()

    @staticmethod
    def _check_if_path_exists(path: str) -> None:
        """
        Internal function to check if the given path exists

        Args:
            path (str): file system path

        Raises:
            FileNotFoundError: if the path does not exist
        """
        if not os.path.exists(path):
            raise FileNotFoundError(
                "The path " + path + " does not exist on your filesystem."
            )

    @staticmethod
    def _create_df_from_lst(lst: list) -> pandas.DataFrame:
        """
        Internal function to generate file index as pandas from a list of entries

        Args:
            lst (list): list of file index entries; empty entries are ignored

        Returns:
            pandas.DataFrame: file index
        """
        lst_clean = [sub_lst for sub_lst in lst if len(sub_lst) != 0]
        if len(lst_clean) == 0:
            # Give an empty index the same column dtypes a populated one has,
            # so concatenating it never upcasts e.g. is_directory to object.
            return pandas.DataFrame(
                {
                    "basename": pandas.Series(dtype=object),
                    "path": pandas.Series(dtype=object),
                    "dirname": pandas.Series(dtype=object),
                    "is_directory": pandas.Series(dtype=bool),
                    "mtime": pandas.Series(dtype=float),
                    "nlink": pandas.Series(dtype="int64"),
                }
            )
        return pandas.DataFrame(
            dict(zip(PyFileIndex._COLUMNS, zip(*lst_clean)))
        )

    def __len__(self) -> int:
        """
        Returns the number of files in the index.

        Returns:
            int: The number of files (directories are not counted) in the index.
        """
        return int((~self._df.is_directory.astype(bool)).sum())