#!/usr/bin/env python3 import os import re import sys import json import queue import pathlib import textwrap import datetime import humanize import subprocess import rich # For some reason this is not imported by the command above. from rich.console import Console from rich.columns import Columns error_codes = { "NO_DATA_FILE": 100, "CANNOT_INIT": 200, } def error(name,msg): print("ERROR:",msg) sys.exit(error_codes[name]) class Widget: def __init__(self, config, list_separator = ","): self.config = config self.list_separator = list_separator def swatch_of(self, key, val, prefix = "color."): if key: key = prefix+key value = re.sub(r"\s", "_", val) keyval = f"{key}.{value}" if key in self.config or keyval in self.config: # key and keyval in config. if key in self.config and keyval in self.config: # "on" in key and not in keyval. if "on" in self.config[key] and "on" not in self.config[keyval]: return f"{self.config[keyval]} {self.config[key]}" # "on" not in key and in keyval. elif "on" not in self.config[key] and "on" in self.config[keyval]: return f"{self.config[key]} {self.config[keyval]}" else: # "on" not in key and not in keyval or "on" in key and in keyval # Defaults to keyval having precedence if nothing is specified. swatch = self.config[keyval] korder = self.config["rule.precedence.color"].split(self.list_separator) for k in korder: # FIXME reverse korder? if k in keyval: swatch = self.config[keyval] break if k in key: swatch = self.config[key] break return swatch elif key in self.config and keyval not in self.config: return self.config[key] elif key not in self.config and keyval in self.config: return self.config[keyval] else: # key and keyval not in self.config. return "" else: # Not key. return "" def rtext(self, val, swatch, prefix = "color.", end="\n"): return rich.text.Text(val, style=self.swatch_of(swatch, val, prefix), end=end) def rdate(self, date, swatch, prefix = "color.", end="\n"): dt = datetime.datetime.strptime(date, "%Y%m%dT%H%M%SZ") hd = humanize.naturaltime(datetime.datetime.now() - dt) return self.rtext(hd, swatch, prefix, end) class Tasker(Widget): def __init__(self, config, show_only, order = None, group = None, touched = []): super().__init__(config) self.show_only = show_only self.touched = touched self.sorter = order self.grouper = group def __call__(self, task): raise NotImplementedError class Stacker(Widget): def __init__(self, config, tasker, sorter = None): super().__init__(config) self.tasker = tasker if sorter: self.sorter = sorter else: self.sorter = stack.sort.Noop() def __call__(self, tasks): raise NotImplementedError class StackSorter: def __init__(self, field, reverse = False): self.field = field self.reverse = reverse def __call__(self, tasks): raise NotImplementedError class Sectioner(Widget): def __init__(self, config, stacker, order, group): super().__init__(config) self.stacker = stacker self.sorter = order self.grouper = group def order(self, groups): if self.sorter: return self.sorter() else: return groups.keys() def group(self, tasks): if self.grouper: return self.grouper(tasks) else: return {"":tasks} def __call__(self, tasks): raise NotImplementedError class task: class Card(Tasker): def __init__(self, config, show_only, order = None, touched = [], wrap_width = 25): super().__init__(config, show_only, order, group = None, touched = touched) self.wrap_width = wrap_width self.tag_icons = [ self.config["icon.tag.before"], self.config["icon.tag.after"] ] def _make(self, task): if not self.show_only: # Show all existing fields. self.show_only = task.keys() sid = str(task["id"]) if ":" in task["description"]: short, desc = task["description"].split(":") title = self.rtext(sid, "id") + rich.text.Text(":", style="default") + self.rtext(short.strip(), "description.short") desc = rtext("\n".join(textwrap.wrap(desc.strip(), self.wrap_width)), "description.long") elif len(task["description"]) <= self.wrap_width - 8: d = task["description"].strip() title = self.rtext(sid, "id") + rich.text.Text(":", style="default") + self.rtext(d, "description.short") desc = None else: desc = task["description"] desc = self.rtext("\n".join(textwrap.wrap(desc.strip(), self.wrap_width)),"description") title = self.rtext(sid, "id") segments = [] for key in self.show_only: if key in task.keys() and key not in ["id", "description"]: val = task[key] segment = f"{key}: " if type(val) == str: if key in [ "due", "end", "entry", "modified", "scheduled", "start", "until", "wait"]: segments.append( self.rtext(segment, key) + self.rdate(val, key) ) else: segments.append( self.rtext(segment+val, key) ) elif type(val) == list: # FIXME Columns does not fit. # g = Columns([f"+{t}" for t in val], expand = False) lst = [] for t in val: lst.append( \ self.rtext(self.tag_icons[0], "tags.ends") + \ self.rtext(t, key) + \ self.rtext(self.tag_icons[1], "tags.ends") \ ) g = rich.console.Group(*lst, fit = True) # g = rich.console.Group(*[rich.text.Text(f"{self.tag_icons[0]}{t}{self.tag_icons[1]}", style=f"color.{key}") for t in val], fit = True) segments.append(g) else: segments.append(self.rtext(segment+str(val), key)) # FIXME Columns does not fit. # cols = Columns(segments) cols = rich.console.Group(*segments, fit = True) if desc: body = rich.console.Group(desc, cols, fit = True) else: body = cols return title,body def __call__(self, task): title, body = self._make(task) sid = str(task["id"]) if sid in self.touched: panel = rich.panel.Panel(body, title = title, title_align="left", expand = False, padding = (0,1), border_style = "color.touched", box = rich.box.DOUBLE_EDGE) else: panel = rich.panel.Panel(body, title = title, title_align="left", expand = False, padding = (0,1)) return panel class Sheet(Card): def __init__(self, config, show_only, order = None, touched = [], wrap_width = 25): super().__init__(config, show_only, order, touched = touched, wrap_width = wrap_width) self.title_ends = [ self.config["icon.short.before"], self.config["icon.short.after"] ] def __call__(self, task): title, body = self._make(task) t = self.rtext(self.title_ends[0], "description.short.ends") + \ title + \ self.rtext(self.title_ends[1], "description.short.ends") sid = str(task["id"]) if sid in self.touched: b = rich.panel.Panel(body, box = rich.box.SIMPLE_HEAD, style="color.touched") else: b = rich.panel.Panel(body, box = rich.box.SIMPLE_HEAD, style="color.description") sheet = rich.console.Group(t,b) return sheet class Raw(Tasker): def __init__(self, config, show_only, order = None, touched = []): super().__init__(config, show_only, order, group = None, touched = touched) def __call__(self, task): if not self.show_only: # Show all existing fields. self.show_only = task.keys() raw = {} for k in self.show_only: if k in task: raw[k] = task[k] else: raw[k] = None return raw class stack: class sort: class Noop(StackSorter): def __init__(self): super().__init__(None) def __call__(self, tasks): return tasks class Field(StackSorter): def __init__(self, field, reverse = False): super().__init__(field, reverse) def __call__(self, tasks): def field_or_not(task): if self.field in task: return task[self.field] else: return "XXX" # No field comes last. return sorted(tasks, key = field_or_not, reverse = self.reverse) class Priority(StackSorter): def __init__(self, reverse = False): super().__init__("priority", reverse) def __call__(self, tasks): def p_value(task): p_values = {"H": 0, "M": 1, "L": 2, "": 3} if self.field in task: return p_values[task[self.field]] else: return p_values[""] return sorted(tasks, key = p_value, reverse = self.reverse) class RawTable(Stacker): def __init__(self, config, tasker, sorter = None): super().__init__(config, tasker, sorter = sorter) self.tag_icons = [ self.config["icon.tag.before"], self.config["icon.tag.after"] ] def __call__(self, tasks): keys = self.tasker.show_only table = rich.table.Table(box = None, show_header = False, show_lines = True, expand = True, row_styles=["color.row.odd", "color.row.even"]) table.add_column("H") for k in keys: table.add_column(k) for task in self.sorter(tasks): taskers = self.tasker(task) if str(task["id"]) in self.tasker.touched: row = [self.rtext("▶", "r.touched")] else: row = [""] for k in keys: if k in task: val = taskers[k] ##### String keys ##### if type(val) == str: # Description is a special case. if k == "description" and ":" in val: # Split description in "short: long". vals = val.split(":") short, desc = vals[0], ":".join(vals[1:]) # FIXME groups add a newline or hide what follows, no option to avoid it. # row.append( rich.console.Group( # rich.text.Text(short+":", style="color.description.short", end="\n"), # rich.text.Text(desc, style="color.description", end="\n") # )) # FIXME style leaks on all texts: # (Note that "default" is a special color for Rich.) row.append( self.rtext(short, "description.short", end="") + \ rich.text.Text(":", style="default", end="") + \ self.rtext(desc, "description.long", end="") ) elif k in [ "due", "end", "entry", "modified", "scheduled", "start", "until", "wait"]: row.append( self.rdate(val, k) ) # Strings, but not description. else: row.append( self.rtext(val, k) ) ##### List keys. ##### elif type(val) == list: # Tags are a special case. if k == "tags": tags = rich.text.Text("") for t in val: # FIXME use Columns if/when it does not expand. tags += \ self.rtext(self.tag_icons[0], "tags.ends") + \ self.rtext(t, k) + \ self.rtext(self.tag_icons[1], "tags.ends") + \ " " row.append( tags ) # List, but not tags. else: row.append( self.rtext(" ".join(val), k) ) ##### Other type of keys. ##### else: row.append( self.rtext(str(val), k) ) else: row.append("") table.add_row(*[t for t in row]) return table class Vertical(Stacker): def __init__(self, config, tasker, sorter = None): super().__init__(config, tasker, sorter = sorter) def __call__(self, tasks): stack = rich.table.Table(box = None, show_header = False, show_lines = False, expand = True) stack.add_column("Tasks") for task in self.sorter(tasks): stack.add_row( self.tasker(task) ) return stack class Flat(Stacker): def __init__(self, config, tasker, sorter = None): super().__init__(config, tasker, sorter = sorter) def __call__(self, tasks): stack = [] for task in self.sorter(tasks): stack.append( self.tasker(task) ) cols = rich.columns.Columns(stack) return cols class sections: class Vertical(Sectioner): def __init__(self, config, stacker, order, group): super().__init__(config, stacker, order, group) def __call__(self, tasks): sections = [] groups = self.group(tasks) for key in self.order(groups): if key in groups: if self.grouper.field: swatch = f"{self.grouper.field}.{key}" else: swatch = key val = str(key).upper() sections.append( rich.panel.Panel(self.stacker(groups[key]), title = self.rtext(val, swatch), title_align = "left", expand = True, border_style = self.swatch_of(swatch, val))) return rich.console.Group(*sections) class Horizontal(Sectioner): def __init__(self, config, stacker, order, group): super().__init__(config, stacker, order, group) def __call__(self, tasks): sections = [] groups = self.group(tasks) table = rich.table.Table(box = None, show_header = False, show_lines = False) keys = [] for key in self.order(groups): if key in groups: table.add_column(key) keys.append(key) row = [] for k in keys: row.append( rich.panel.Panel(self.stacker(groups[k]), title = self.rtext(k.upper(), k), title_align = "left", expand = True, border_style="color.title")) table.add_row(*row) return table class SectionSorter: def __call__(self): raise NotImplementedError class Grouper: """Group tasks by field values.""" def __init__(self, field): self.field = field def __call__(self, tasks): groups = {} for task in tasks: if self.field in task: if task[self.field] in groups: groups[ task[self.field] ].append(task) else: groups[ task[self.field] ] = [task] else: if "" in groups: groups[ "" ].append( task ) else: groups[ "" ] = [task] return groups class group: class sort: class OnValues(SectionSorter): def __init__(self, values): if values: self.values = values else: print("WARNING: no values.") self.values = [] def __call__(self): return self.values class Field(Grouper): pass class Status(Grouper): def __init__(self): super().__init__("status") def __call__(self, tasks): groups = {} for task in tasks: if "start" in task: if "started" in groups: groups["started"].append(task) else: groups["started"] = [task] elif self.field in task: if task[self.field] in groups: groups[ task[self.field] ].append(task) else: groups[ task[self.field] ] = [task] return groups def call_taskwarrior(args:list[str] = ["export"], taskfile = ".task") -> str: # Local file. env = os.environ.copy() env["TASKDATA"] = taskfile cmd = ["task"] + args # print(cmd) try: p = subprocess.Popen( " ".join(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, env=env, ) out, err = p.communicate() except subprocess.CalledProcessError as exc: print("ERROR:", exc.returncode, exc.output, err) sys.exit(exc.returncode) else: return out.decode("utf-8") def get_data(taskfile, filter = None): if not filter: filter = [] out = call_taskwarrior(filter+["export"], taskfile) try: jdata = json.loads(out) except json.decoder.JSONDecodeError as exc: print("ERROR:", exc.returncode, exc.output) else: return jdata def parse_touched(out): return re.findall("(?:Modifying|Created|Starting|Stopping)+ task ([0-9]+)", out) def get_swatch(config): swatch = { "color.touched" : "", "color.id" : "", "color.title" : "", "color.description" : "", "color.description.short" : "", "color.description.short.ends" : "", "color.description.long" : "", "color.entry" : "", "color.end" : "", "color.modified" : "", "color.started" : "", "color.status" : "", "color.uuid" : "", "color.tags" : "", "color.tags.ends" : "", "color.urgency" : "", "color.row.odd" : "", "color.row.even" : "", "color.priority" : "", } for k in config: if k in swatch: swatch[k] = config[k] return swatch def get_layouts(kind = None, name = None): # FIXME use introspection to extract that automatically. available = { "task": { "Raw": task.Raw, "Card": task.Card, "Sheet": task.Sheet, }, "stack": { "RawTable": stack.RawTable, "Vertical": stack.Vertical, "Flat": stack.Flat, }, "sections": { "Vertical": sections.Vertical, "Horizontal": sections.Horizontal, }, } if kind and name: return available[kind][name] elif kind: return available[kind] elif not kind and not name: return available else: raise KeyError("cannot get layouts with `name` only") def tw_to_rich(color): # color123 -> color(123) color = re.sub(r"color([0-9]{0,3})", r"color(\1)", color) # rgb123abc -> #123abc (not TW but allowed in TWD) color = re.sub(r"rgb([\da-f]{6})", r"#\1", color) # rgb123 -> #336699 # Mapping from TW's own 256 colors to true 16M color space. # TW use a triplet of integers in [0-5] to give a more user-friendly # way to pick an ANSI color in the 256-colors space. # Rich allows true color, so we can just map each RGB component # of TW to its hex equivalent in the classical 16M-colors space. for col5 in re.finditer(r"rgb[0-5]{3}", color): col256 = "#" for c5 in re.finditerl(r"[0-5]", col5): i5 = int(c5) i256 = round(i5/5*256) c256 = hex(i256).replace("0x","") col256 += c256 color.replace(col5, col256) return color # We cannot use tomllib because strings are not quoted. # We cannot use configparser because there is no section and because of the "include" command. def parse_config(filename, current): config = current with open(filename, "r") as fd: for i,line in enumerate(fd.readlines()): if line.strip() and line.strip()[0] != "#": # Starting comment. if "=" in line: key,value = line.split("=") if "#" in value: # Ending comments. value = value.split("#")[0] if "color" in key: # Colors conversion. value = tw_to_rich(value.strip()) if ".uda." in key: key = re.sub(r"color\.uda\.", r"color.", key) config[key.strip()] = value.strip() elif "include" in line: _,path = line.split() # Recursively add/replace with the included config. config.update( parse_config(os.path.expanduser(path.strip()), config) ) else: print(f"Cannot parse line {i} of config file `{filename}`, I'll ignore it.") return config def as_bool(s): if s.lower() in ["true", "yes", "y", "1"]: return True elif s.lower() in ["false", "no", "nope", "n", "0"]: return False else: raise ValueError(f"Cannot interpret `{s}` as a boolean.") def upsearch(filename, at = pathlib.Path.cwd()): current = at root = pathlib.Path(current.root) while current != root: found = current / filename if found.exists(): return found current = current.parent return None def find_config(fname, current): config = current # First, system. p = pathlib.Path("/etc/taskwarrior") / pathlib.Path(fname) if p.exists(): config = parse_config(p, config) # Second, user. p = pathlib.Path(os.path.expanduser("~")) / pathlib.Path(fname) if p.exists(): config = parse_config(p, config) # Third, upper dirs. # LIFO queue allows to fill from current dir, # then read from upper dir. updirs = queue.LifoQueue() here = pathlib.Path.cwd() f = upsearch(fname, here) while f: updirs.put(f) f = upsearch(fname, f.parent.parent) while not updirs.empty(): f = updirs.get() config = parse_config(f, config) return config def find_tasks(fname, current, config): tfile = upsearch(fname, current) if tfile: return tfile elif "data.location" in config: return config["data.location"] else: return None def parse_filter(cmd): tw_commands = [ "active", "all", "annotate", "append", "blocked", "blocking", "burndown", "burndown", "burndown", "completed", "count", "delete", "denotate", "done", "duplicate", "edit", "export", "ghistory", "ghistory", "ghistory", "ghistory", "history", "history", "history", "history", "ids", "information", "list", "long", "ls", "minimal", "modify", "newest", "next", "oldest", "overdue", "prepend", "projects", "purge", "ready", "recurring", "start", "stats", "stop", "summary", "tags", "timesheet", "unblocked", "uuids", "waiting", ] for i,w in enumerate(cmd): if w in tw_commands: filter = cmd[:i] return filter return None if __name__ == "__main__": default_conf = { # taskwarrior "report.list.columns": "id,priority,description,tags", "rule.precedence.color": "", # taskwarrior-deluxe "layout.task": "Raw", "layout.stack": "RawTable", "layout.stack.sort": "urgency", "layout.stack.sort.reverse": "false", # urgency and priority are numeric. "layout.subsections": "", "layout.subsections.group": "", "layout.subsections.group.show": "", "layout.sections": "Horizontal", "layout.sections.group": "status", "layout.sections.group.show": "", "widget.card.wrap": "25", "list.filtered": "false", } # First, taskwarrior"s config... config = find_config(".taskrc", default_conf) # ... overwritten by TWD config. config = find_config(".twdrc", config) # for k in config: # print(k,"=",config[k]) taskfile = find_tasks(".task", pathlib.Path.cwd(), config) if not taskfile: error("NO_DATA_FILE", "Cannot find a data file here, in a parent directory, or configured.") cmd = sys.argv[1:] if len(cmd) == 1 and cmd[0] == "init": try: os.mkdir(".task") except Exception as err: error("CANNOT_INIT", f"Cannot init task database here: {err}") else: print("Empty taskwarrior database initialized in", pathlib.Path.cwd()) sys.exit(0) # First pass arguments to taskwarrior and let it do its magic. out = call_taskwarrior(cmd, taskfile) touched = [] if "Description" not in out: print(out.strip()) touched = parse_touched(out) # Then call again to get the resulting data. if as_bool(config["list.filtered"]): filter = parse_filter(cmd) jdata = get_data(taskfile, filter) else: jdata = get_data(taskfile, filter = None) # If no explicit touch from an editing command, # then just point out tasks matching the filter. if not touched: filter = parse_filter(cmd) filtered = get_data(taskfile, filter) if len(filtered) != len(jdata): touched = [str(t["id"]) for t in filtered] # print(json.dumps(jdata, indent=4)) list_separator = "," showed = config["report.list.columns"].split(list_separator) if not showed: show_only = None else: show_only = showed field = config["layout.sections.group"] if field and field in show_only: # Remove the grouped field from the showed ones, # as it will be displayed by panels anyway. show_only.pop(show_only.index(field)) field = config["layout.subsections.group"] if field and field in show_only: show_only.pop(show_only.index(field)) swatch = rich.theme.Theme(get_swatch(config)) layouts = get_layouts() ##### Tasks ##### if config["layout.task"] == "Card": tasker = layouts["task"]["Card"](config, show_only, touched = touched, wrap_width = int(config["widget.card.wrap"])) elif config["layout.task"] == "Sheet": tasker = layouts["task"]["Sheet"](config, show_only, touched = touched, wrap_width = int(config["widget.card.wrap"])) else: tasker = layouts["task"][config["layout.task"]](config, show_only, touched = touched) ##### Stack ##### if config["layout.stack.sort"]: if config["layout.stack.sort"] == "priority": sorter = stack.sort.Priority(as_bool(config["layout.stack.sort.reverse"])) elif config["layout.stack.sort"] == "urgency": # Natural sort is from high to low values. sorter = stack.sort.Field(config["layout.stack.sort"], not as_bool(config["layout.stack.sort.reverse"])) else: sorter = stack.sort.Field(config["layout.stack.sort"], reverse = as_bool(config["layout.stack.sort.reverse"])) else: sorter = None if config["layout.stack"] == "RawTable": stacker = layouts["stack"]["RawTable"](config, tasker, sorter = sorter) else: stacker = layouts["stack"][config["layout.stack"]](config, tasker, sorter = sorter) ##### Sections ##### if config["layout.sections.group"]: values = config["layout.sections.group.show"].split(list_separator) if values == [""]: values = [] if config["layout.sections.group"].lower() == "status": group_by = group.Status() if not values: values = ["pending","started","completed"] g_sort_on = group.sort.OnValues( values ) elif config["layout.sections.group"].lower() == "priority": group_by = group.Field("priority") if not values: values = ["H","M","L",""] g_sort_on = group.sort.OnValues( values ) else: group_by = group.Field(config["layout.sections.group"]) g_sort_on = None else: group_by = group.Status() g_sort_on = group.sort.OnValues(["pending","started","completed"]) ##### Subsections ##### if config["layout.subsections.group"]: values = config["layout.subsections.group.show"].split(list_separator) if values == [""]: values = [] if config["layout.subsections.group"].lower() == "status": subgroup_by = group.Status() if not values: values = ["pending","started","completed"] g_sort_on = group.sort.OnValues( values ) if config["layout.subsections.group"].lower() == "priority": subgroup_by = group.Field("priority") if not values: values = ["H","M","L",""] g_subsort_on = group.sort.OnValues( values ) else: subgroup_by = group.Field(config["layout.subsections.group"]) g_subsort_on = None else: subgroup_by = None g_subsort_on = None if config["layout.subsections"] and config["layout.subsections.group"]: subsectioner = layouts["sections"][config["layout.subsections"]](config, stacker, g_subsort_on, subgroup_by) sectioner = layouts["sections"][config["layout.sections"]](config, subsectioner, g_sort_on, group_by) else: sectioner = layouts["sections"][config["layout.sections"]](config, stacker, g_sort_on, group_by) console = Console(theme = swatch) # console.rule("taskwarrior-deluxe") console.print(sectioner(jdata))