From bb00eabd70449ccb38b0903d536e36a50dbce9ad Mon Sep 17 00:00:00 2001 From: nojhan Date: Sun, 12 Apr 2026 22:33:43 +0200 Subject: [PATCH] feat(lift): mastodon PoC --- pyproject.toml | 2 + src/forthlift/forthlift.py | 201 ++++++++++++++++++++++++++++++++++++- 2 files changed, 202 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d7af3be..4776791 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,9 @@ dependencies = [ "argparse>=1.4.0", "configparser>=7.2.0", "datetime>=6.0", + "mastodon-py>=2.1.4", "rich>=14.3.3", + "toml>=0.10.2", ] [project.scripts] diff --git a/src/forthlift/forthlift.py b/src/forthlift/forthlift.py index 2aeccbb..fe03c55 100755 --- a/src/forthlift/forthlift.py +++ b/src/forthlift/forthlift.py @@ -1,12 +1,19 @@ #!/usr/bin/env python +import io +import os import re import sys +import toml +import time import locale import logging import inspect +import tempfile import argparse import datetime +import mastodon +import webbrowser from configparser import ConfigParser from rich.panel import Panel @@ -217,10 +224,12 @@ class format: class lift: class Lift: - def call(self, items): + def call(self, item): + """Interface for posting a single independant item.""" raise NotImplementedError def __call__(self, items): + """Interface for posting all items.""" count = 0 for item in items: self.call(item) @@ -240,6 +249,171 @@ class lift: logger.debug("Empty item") + class mastodon(Lift): + + def __init__(self, dryrun = 'dry'): + self._scopes = ['read', 'write'] + + if dryrun == 'dry': + self.dry_run = True + elif dryrun == 'nodry': + self.dry_run = False + else: + msg = f"I do not understand what you mean by `{dryrun}`. Please either indicate `dry` or `nodry`." + logger.error(msg) + raise RuntimeError(msg) + + pyp = toml.load("pyproject.toml") + self.name = pyp["project"]["name"] + self.version = pyp["project"]["version"] + + self.config_path = f"{self.name}.toml" + if not os.path.isfile(self.config_path): + with open(self.config_path, 'w') as fd: + fd.write("") + self.config = self.load_config() + + if self.needs_init(): + self.init() + + self.masto = mastodon.Mastodon( + api_base_url = self.config["instance"], + # client_id = self.config["client_id"], + client_secret = self.config["client_secret"], + access_token = self.config["token"], + user_agent = f"{self.name}:{self.version}" + ) + + logger.debug(self.config) + + def needs_init(self): + if not self.config["instance"] \ + or not self.config["account"] \ + or not self.config["client_id"] \ + or not self.config["client_secret"] \ + or not self.config["token"]: + return True + else: + return False + + def init(self): + if not self.needs_init(): + print(f"The {__name__} operator has already been initialized") + print("Current configuration is:") + print("\tinstance:", self.config["instance"]) + print("\tuser:", self.config["account"]) + + ans = input("Do you want to re-init it? (yes/no): ").strip() + if ans[0].lower() == 'n': + return + elif ans[1].lower() != 'y': + print("I did not understand your answer, " + " I'll assume you meant `no` and stop here.") + return + + print("URL of your Mastodon instance?") + print("For example: https://social.antigene.org") + # instance = input("URL: ").strip() + instance = "https://social.antigene.org" + self.config["instance"] = instance + + self.register_app() + self.oauth() + + def save_config(self, config): + logger.debug(f"Save config in: {self.config_path}") + with open(self.config_path, 'w') as fd: + toml.dump(config, fd) + + def load_config(self): + logger.debug(f"Load config from: {self.config_path}") + config = { + "instance": None, + "account": None, + "client_id": None, + "client_secret": None, + "token": None + } + local_config = toml.load(self.config_path) + config.update(local_config) + logger.debug(config) + return config + + def register_app(self): + logger.debug(f"Register {self.name} on {self.config['instance']}") + + client_id, client_secret = mastodon.Mastodon.create_app( + self.name, + scopes = self._scopes, + api_base_url = self.config["instance"], + website = "https://nojhan.net/git/nojhan/forthlift", + user_agent = f"{self.name}:{self.version}", + ) + + self.config["client_id"] = client_id + logger.debug(f"ID: {client_id}") + self.config["client_secret"] = client_secret + logger.debug(f"Secret: {client_secret}") + self.save_config(self.config) + + def oauth(self): + logger.debug(f"OAuth to: {self.config['instance']}") + oauth = mastodon.Mastodon( + client_id = self.config["client_id"], + client_secret = self.config["client_secret"], + api_base_url = self.config["instance"], + ) + oauth_url = oauth.auth_request_url(scopes = self._scopes) + logger.debug(f"Opening web page: {oauth_url}") + logger.debug("Please log in there.") + webbrowser.open_new(oauth_url) + time.sleep(5) + print("After logging in, paste here the code you received:") + oauth_code = input("Code: ").strip() + + token = oauth.log_in( + code = oauth_code, + scopes = self._scopes, + ) + self.config["token"] = token + logger.debug(f"Token: {token}") + + account = oauth.me().acct + self.config["account"] = account + logger.debug(f"Account: {account}") + self.save_config(self.config) + + def post(self, item, prev_status = None): + if self.dry_run: + print(item) + else: + print(item) + + def __call__(self, items): + n = 0 + first_item = next(items, None) + if first_item == None: + logger.error("No item to post") + return + else: + logger.debug(f"Post {n+1}") + n += 1 + # prev_status = self.masto.status_post( + # status = first_item, + # ) + # logger.debug(prev_status) + self.post(first_item) + for item in items: + logger.debug(f"Post {n+1}") + n += 1 + # prev_status = self.masto.status_reply( + # to_status = prev_status, + # status = item, + # ) + # logger.debug(prev_status) + self.post(item) + logger.debug(f"Posted {n} items") + class Forthlifter: def __init__(self, consumer = consume.lines(), @@ -367,11 +541,13 @@ def main(): usage = "Post text read on the standard input to a website or an API." + # Dictionaries of {name: class} streamers = classes_of(stream) consumers = classes_of(consume) formaters = classes_of(format) lifters = classes_of(lift) + # Extract docstrings epilog = "" epilog += help_op(streamers) epilog += help_op(consumers) @@ -406,12 +582,35 @@ def main(): action="append", help="How to send items somewhere (several occurences possibles, order matters).") + initializables = [] + + for cls in \ + list(streamers.values()) \ + + list(consumers.values()) \ + + list(formaters.values()) \ + + list(lifters.values()): + if hasattr(cls, "needs_init"): + initializables.append(cls.__name__) + + if initializables: + parser.add_argument("-i", "--init", + choices = initializables, + help="Initialize the given operator.") asked = parser.parse_args() logging.basicConfig() logger.setLevel("DEBUG") + if asked.init: + for ops in [streamers, consumers, formaters, lifters]: + if asked.init in ops: + logger.debug(f"Initialize operator: {asked.init}") + op = ops[asked.init]() + op.init() + logger.debug(f"Done. You can use the `{asked.init}` operator.") + sys.exit(0) + logger.debug("Available operators:") logger.debug(f"├ streamers: {', '.join(streamers)}") logger.debug(f"├ consumers: {', '.join(consumers)}")