add a message queue manager

This commit is contained in:
Johann Dreo 2024-02-11 09:16:20 +01:00
commit 524ea5d9ce

View file

@ -1,5 +1,6 @@
#!/usr/bin/env python3
import collections
# import rich
from rich.console import Console
import humanize
@ -11,15 +12,15 @@ import gi.repository.GLib
from dbus.mainloop.glib import DBusGMainLoop
class N:
def __init__(self):
def __init__(self, app = "my app", summary = "my summary", body = "my (potentially longer) body"):
self.member = "Notify"
self.interface = "org.freedesktop.Notifications"
self.args_list = [
"my app",
app,
0,
"",
"my summary",
"my (potentially longer) body",
summary,
body,
["s"],
{
"urgency": 1,
@ -55,33 +56,49 @@ class Message:
self.hdate = humanize.naturaltime(self.date)
self.urgency = {0: "low", 1: "normal", 2: "critical", None: "unknown"}
def print_on(self, console = Console()):
console.print(f"[color(33)][white on color(33)]{self.hdate}:[bold black on color(33)]{self.app}[color(33) on color(254)][not bold black on color(254)]{self.summary}[color(254) on color(239)][white on color(239)]{self.body}[reset][color(239)]", end="")
class Broker:
def __init__(self):
self.deck = collections.deque()
self.console = Console()
DBusGMainLoop(set_as_default=True)
bus = dbus.SessionBus()
bus.add_match_string_non_blocking("eavesdrop=true, interface='org.freedesktop.Notifications', member='Notify'")
bus.add_message_filter(self.receive)
def run(self):
mainloop = gi.repository.GLib.MainLoop()
mainloop.run()
def receive(self, bus, notification):
# print(notification, flush = True)
# print("---------------------------------------------", flush = True)
# print("Member:", notification.get_member(), flush = True)
# print("Interface:", notification.get_interface(), flush = True)
if notification.get_member() == "Notify" and notification.get_interface() == 'org.freedesktop.Notifications':
msg = Message(notification)
self.deck.append(msg)
self.print()
def print(self):
console = Console()
console.print(f"[color(33)][black on color(33)]{self.hdate}:[bold white on color(33)]{self.app}[color(33) on color(254)][not bold black on color(254)]{self.summary}[color(254) on color(239)][white on color(239)]{self.body}[reset][color(239)]")
self.console.print(end = "\r")
for msg in self.deck:
msg.print_on(self.console)
def receive(bus, notification):
# print(notification, flush = True)
# print("---------------------------------------------", flush = True)
# print("Member:", notification.get_member(), flush = True)
# print("Interface:", notification.get_interface(), flush = True)
if notification.get_member() == "Notify" and notification.get_interface() == 'org.freedesktop.Notifications':
msg = Message(notification)
msg.print()
if __name__ == "__main__":
import sys
if len(sys.argv) > 1 and sys.argv[1] == "--test":
receive(None, N())
broker = Broker()
broker.receive(None, N())
broker.receive(None, N("other-app", "this is a test", "no much of a body"))
else:
DBusGMainLoop(set_as_default=True)
bus = dbus.SessionBus()
bus.add_match_string_non_blocking("eavesdrop=true, interface='org.freedesktop.Notifications', member='Notify'")
bus.add_message_filter(receive)
mainloop = gi.repository.GLib.MainLoop()
mainloop.run()
broker = Broker()
broker.run()