feat: allow multiple targets

This commit is contained in:
Johann Dreo 2025-08-10 14:17:44 +02:00
commit b5ca18992d
2 changed files with 115 additions and 78 deletions

View file

@ -2,6 +2,7 @@
#encoding: utf-8
import os
import sys
import time
import glob
import shutil
@ -12,8 +13,8 @@ import subprocess
try:
from sdbus_block.notifications import FreedesktopNotifications
except Exception as e:
logging.error(e)
logging.error("Suitable `sdbus` module cannot be loaded, the --dbus action is disabled.")
print("WARNING:", e, file=sys.stderr)
print("WARNING: Suitable `sdbus` module cannot be loaded, the --dbus action is disabled.", file=sys.stderr)
HAS_DBUS = False
else:
HAS_DBUS = True
@ -39,27 +40,66 @@ class Flick:
class Flicker:
"""Build a new timestamped file name."""
def __init__(self, target, delay=10):
self.target = target
def __init__(self, targets, delay=10):
self.targets = targets
self.delay = delay
self.last_date = None
self.last_date = {}
def __iter__(self):
return self
def __call__(self):
for target in self.targets:
name,ext = os.path.splitext(target)
date_now = datetime.datetime.now()
def next(self):
name,ext = os.path.splitext(self.target)
date_now = datetime.datetime.now()
if target in self.last_date:
logging.debug(f"Target already actionned at {self.last_date[target]}")
logging.debug("Current date: %s", date_now.isoformat())
assert(self.last_date[target] <= date_now)
if self.last_date:
logging.debug("Current date: %s", date_now.isoformat())
assert(self.last_date <= date_now)
if date_now - self.last_date[target] < datetime.timedelta(seconds=self.delay):
logging.debug("Delay passed, current delta: %s < %s", date_now - self.last_date[target],datetime.timedelta(seconds=self.delay))
yield Flick(target, self.last_date[target], ext)
self.last_date[target] = date_now
else:
logging.debug("Delay not passed")
else:
self.last_date[target] = date_now
yield Flick(target, date_now, ext)
if date_now - self.last_date < datetime.timedelta(seconds=self.delay):
logging.debug("Current delta: %s < %s", date_now - self.last_date,datetime.timedelta(seconds=self.delay))
return Flick(self.target, self.last_date, ext)
return Flick(self.target, date_now, ext)
class Handler(FileSystemEventHandler):
"""Event handler, will call a sequence of operators at each event matching the target."""
def __init__(self, operators, flicker, watched_events = ["modified"] ):
self.flicker = flicker
self.ops = operators
self.watched_events = watched_events
def on_any_event(self, event):
logging.debug(f"##############################\n\t\t\tRECEIVED EVENT:")
logging.debug(f"{event}")
super(Handler,self).on_any_event(event)
for flick in self.flicker():
logging.debug(f"Created flick: {flick}")
# Watchdog cannot watch single files (FIXME bugreport?),
# so we filter it in this event handler.
if (not event.is_directory) and os.path.abspath(event.src_path) == os.path.abspath(flick.target) and event.event_type in self.watched_events:
logging.debug("Handle event")
logging.debug("New flick for %s: %s", event.src_path, flick)
for op in self.ops:
logging.debug("Calling %s", op)
op(os.path.abspath(event.src_path), flick, event)
else:
is_dir = ""
if event.is_directory:
is_dir = " is a directory"
is_not_target = ""
if os.path.abspath(event.src_path) != os.path.abspath(flick.target):
is_not_target = " is not a target"
is_not_watched = ""
if event.event_type not in self.watched_events:
is_not_watched = " is not a watched event"
logging.debug("Not handling event:" + ", ".join([i for i in [is_dir, is_not_target, is_not_watched] if i != ""]))
class Operator:
@ -81,6 +121,9 @@ class Save(Operator):
self.date_sep = date_sep
self.no_overwrite = no_overwrite
# Alternate last_date, xtracted from the file's timestamp.
self.last_date = None
# Make a glob search expression with the date template.
self.fields = {'Y':4,'m':2,'d':2,'H':2,'M':2,'S':2}
self.glob_template = self.date_template
@ -211,7 +254,7 @@ if HAS_DBUS:
return "Log()"
def __call__(self, target, flick, event, alt_ext = None):
logging.info("Event(s) seen for {}: {}".format(target,flick))
logging.info(f"File {target} was {event.event_type}")
notif = FreedesktopNotifications()
hints = notif.create_hint(
urgency = 0,
@ -226,45 +269,25 @@ if HAS_DBUS:
)
class Handler(FileSystemEventHandler):
"""Event handler, will call a sequence of operators at each event matching the target."""
def __init__(self, operators, flicker, watched_types = ["modified"] ):
self.flicker = flicker
self.ops = operators
self.watched_types = watched_types
def on_any_event(self, event):
logging.debug("Received event %s",event)
super(Handler,self).on_any_event(event)
# Watchdog cannot watch single files (FIXME bugreport?),
# so we filter it in this event handler.
if (not event.is_directory) and os.path.abspath(event.src_path) == os.path.abspath(self.flicker.target) and event.event_type in self.watched_types:
logging.debug("Handle event")
flick = self.flicker.next()
logging.debug("New flicker for %s: %s", event.src_path, flick)
for op in self.ops:
logging.debug("Calling %s", op)
op(os.path.abspath(event.src_path), flick, event)
else:
logging.debug("Not handling event: file={}, is_directory={}, watched_types={}".format(os.path.abspath(event.src_path),event.is_directory, self.watched_types))
def flicksave(target, operators=None, delay=10, watched=["modified"]):
def flicksave(globbed_targets, operators=None, delay=10, watched=["modified"]):
"""Start the watch thread."""
# Handle files specified without a directory.
root = os.path.dirname(target)
if not root:
root = '.'
target = os.path.join(root,target)
flicker = Flicker(target, delay)
targets = [os.path.abspath(p) for p in globbed_targets]
logging.debug(f"Watching {len(targets)} files.")
root = os.path.commonpath(targets)
if os.path.isfile(root):
root = os.path.dirname(root)
logging.debug(f"Watching at root directory: `{root}`.")
flicker = Flicker(targets, delay)
handler = Handler(operators, flicker, watched)
# Start the watch thread.
observer = Observer()
observer.schedule(handler, root)
logging.debug("Start watching...")
observer.start()
try:
while True:
@ -303,8 +326,8 @@ if __name__=="__main__":
""")
# Required argument.
parser.add_argument("target",
help="The file to save each time it's modified.")
parser.add_argument("targets", nargs="+",
help="The files to save each time it's modified.")
# Optional arguments.
parser.add_argument("-d","--directory", default='.',
@ -336,16 +359,16 @@ if __name__=="__main__":
existing = {
"save":
["Save a snapshot of the target file.",
["Save a snapshot of the target files.",
None],
"inkscape":
["Save a PNG snapshot of the file, using inkscape.",
["Save a PNG snapshot of the files, using inkscape.",
None],
"git":
["Commit the target file if it has been modified [the repository should be set-up].",
["Commit the target files if it has been modified [the repository should be set-up].",
None],
"log":
["Log when the target file is modified.",
["Log when the target files are touched.",
None],
}
if HAS_DBUS:
@ -355,8 +378,6 @@ if __name__=="__main__":
def help(name):
return existing[name][0]
def instance(name):
return existing[name][1]
for name in existing:
parser.add_argument("--"+name, help=help(name), action="store_true")
@ -365,6 +386,8 @@ if __name__=="__main__":
logging.basicConfig(level=log_as[asked.verbose], format='%(asctime)s -- %(message)s', datefmt=asked.template)
logging.debug("Load actions...")
# Add instances, now that we have all parameters.
available = existing;
available["save"][1] = Save(asked.directory, asked.separator, asked.template, asked.no_overwrite)
@ -387,6 +410,8 @@ if __name__=="__main__":
operators = []
requested = vars(asked)
def instance(name):
return existing[name][1]
for it in [iz for iz in requested if iz in available and requested[iz]==True]:
operators.append(instance(it))
@ -404,5 +429,6 @@ if __name__=="__main__":
logging.debug("\t%s", op)
# Start it.
flicksave(asked.target, operators, asked.delay, asked.events)
logging.debug(asked.targets)
flicksave(asked.targets, operators, asked.delay, asked.events)