feat(lift): mastodon PoC

This commit is contained in:
Johann Dreo 2026-04-12 22:33:43 +02:00
commit bb00eabd70
2 changed files with 202 additions and 1 deletions

View file

@ -8,7 +8,9 @@ dependencies = [
"argparse>=1.4.0", "argparse>=1.4.0",
"configparser>=7.2.0", "configparser>=7.2.0",
"datetime>=6.0", "datetime>=6.0",
"mastodon-py>=2.1.4",
"rich>=14.3.3", "rich>=14.3.3",
"toml>=0.10.2",
] ]
[project.scripts] [project.scripts]

View file

@ -1,12 +1,19 @@
#!/usr/bin/env python #!/usr/bin/env python
import io
import os
import re import re
import sys import sys
import toml
import time
import locale import locale
import logging import logging
import inspect import inspect
import tempfile
import argparse import argparse
import datetime import datetime
import mastodon
import webbrowser
from configparser import ConfigParser from configparser import ConfigParser
from rich.panel import Panel from rich.panel import Panel
@ -217,10 +224,12 @@ class format:
class lift: class lift:
class Lift: class Lift:
def call(self, items): def call(self, item):
"""Interface for posting a single independant item."""
raise NotImplementedError raise NotImplementedError
def __call__(self, items): def __call__(self, items):
"""Interface for posting all items."""
count = 0 count = 0
for item in items: for item in items:
self.call(item) self.call(item)
@ -240,6 +249,171 @@ class lift:
logger.debug("Empty item") logger.debug("Empty item")
class mastodon(Lift):
def __init__(self, dryrun = 'dry'):
self._scopes = ['read', 'write']
if dryrun == 'dry':
self.dry_run = True
elif dryrun == 'nodry':
self.dry_run = False
else:
msg = f"I do not understand what you mean by `{dryrun}`. Please either indicate `dry` or `nodry`."
logger.error(msg)
raise RuntimeError(msg)
pyp = toml.load("pyproject.toml")
self.name = pyp["project"]["name"]
self.version = pyp["project"]["version"]
self.config_path = f"{self.name}.toml"
if not os.path.isfile(self.config_path):
with open(self.config_path, 'w') as fd:
fd.write("")
self.config = self.load_config()
if self.needs_init():
self.init()
self.masto = mastodon.Mastodon(
api_base_url = self.config["instance"],
# client_id = self.config["client_id"],
client_secret = self.config["client_secret"],
access_token = self.config["token"],
user_agent = f"{self.name}:{self.version}"
)
logger.debug(self.config)
def needs_init(self):
if not self.config["instance"] \
or not self.config["account"] \
or not self.config["client_id"] \
or not self.config["client_secret"] \
or not self.config["token"]:
return True
else:
return False
def init(self):
if not self.needs_init():
print(f"The {__name__} operator has already been initialized")
print("Current configuration is:")
print("\tinstance:", self.config["instance"])
print("\tuser:", self.config["account"])
ans = input("Do you want to re-init it? (yes/no): ").strip()
if ans[0].lower() == 'n':
return
elif ans[1].lower() != 'y':
print("I did not understand your answer, "
" I'll assume you meant `no` and stop here.")
return
print("URL of your Mastodon instance?")
print("For example: https://social.antigene.org")
# instance = input("URL: ").strip()
instance = "https://social.antigene.org"
self.config["instance"] = instance
self.register_app()
self.oauth()
def save_config(self, config):
logger.debug(f"Save config in: {self.config_path}")
with open(self.config_path, 'w') as fd:
toml.dump(config, fd)
def load_config(self):
logger.debug(f"Load config from: {self.config_path}")
config = {
"instance": None,
"account": None,
"client_id": None,
"client_secret": None,
"token": None
}
local_config = toml.load(self.config_path)
config.update(local_config)
logger.debug(config)
return config
def register_app(self):
logger.debug(f"Register {self.name} on {self.config['instance']}")
client_id, client_secret = mastodon.Mastodon.create_app(
self.name,
scopes = self._scopes,
api_base_url = self.config["instance"],
website = "https://nojhan.net/git/nojhan/forthlift",
user_agent = f"{self.name}:{self.version}",
)
self.config["client_id"] = client_id
logger.debug(f"ID: {client_id}")
self.config["client_secret"] = client_secret
logger.debug(f"Secret: {client_secret}")
self.save_config(self.config)
def oauth(self):
logger.debug(f"OAuth to: {self.config['instance']}")
oauth = mastodon.Mastodon(
client_id = self.config["client_id"],
client_secret = self.config["client_secret"],
api_base_url = self.config["instance"],
)
oauth_url = oauth.auth_request_url(scopes = self._scopes)
logger.debug(f"Opening web page: {oauth_url}")
logger.debug("Please log in there.")
webbrowser.open_new(oauth_url)
time.sleep(5)
print("After logging in, paste here the code you received:")
oauth_code = input("Code: ").strip()
token = oauth.log_in(
code = oauth_code,
scopes = self._scopes,
)
self.config["token"] = token
logger.debug(f"Token: {token}")
account = oauth.me().acct
self.config["account"] = account
logger.debug(f"Account: {account}")
self.save_config(self.config)
def post(self, item, prev_status = None):
if self.dry_run:
print(item)
else:
print(item)
def __call__(self, items):
n = 0
first_item = next(items, None)
if first_item == None:
logger.error("No item to post")
return
else:
logger.debug(f"Post {n+1}")
n += 1
# prev_status = self.masto.status_post(
# status = first_item,
# )
# logger.debug(prev_status)
self.post(first_item)
for item in items:
logger.debug(f"Post {n+1}")
n += 1
# prev_status = self.masto.status_reply(
# to_status = prev_status,
# status = item,
# )
# logger.debug(prev_status)
self.post(item)
logger.debug(f"Posted {n} items")
class Forthlifter: class Forthlifter:
def __init__(self, def __init__(self,
consumer = consume.lines(), consumer = consume.lines(),
@ -367,11 +541,13 @@ def main():
usage = "Post text read on the standard input to a website or an API." usage = "Post text read on the standard input to a website or an API."
# Dictionaries of {name: class}
streamers = classes_of(stream) streamers = classes_of(stream)
consumers = classes_of(consume) consumers = classes_of(consume)
formaters = classes_of(format) formaters = classes_of(format)
lifters = classes_of(lift) lifters = classes_of(lift)
# Extract docstrings
epilog = "" epilog = ""
epilog += help_op(streamers) epilog += help_op(streamers)
epilog += help_op(consumers) epilog += help_op(consumers)
@ -406,12 +582,35 @@ def main():
action="append", action="append",
help="How to send items somewhere (several occurences possibles, order matters).") help="How to send items somewhere (several occurences possibles, order matters).")
initializables = []
for cls in \
list(streamers.values()) \
+ list(consumers.values()) \
+ list(formaters.values()) \
+ list(lifters.values()):
if hasattr(cls, "needs_init"):
initializables.append(cls.__name__)
if initializables:
parser.add_argument("-i", "--init",
choices = initializables,
help="Initialize the given operator.")
asked = parser.parse_args() asked = parser.parse_args()
logging.basicConfig() logging.basicConfig()
logger.setLevel("DEBUG") logger.setLevel("DEBUG")
if asked.init:
for ops in [streamers, consumers, formaters, lifters]:
if asked.init in ops:
logger.debug(f"Initialize operator: {asked.init}")
op = ops[asked.init]()
op.init()
logger.debug(f"Done. You can use the `{asked.init}` operator.")
sys.exit(0)
logger.debug("Available operators:") logger.debug("Available operators:")
logger.debug(f"├ streamers: {', '.join(streamers)}") logger.debug(f"├ streamers: {', '.join(streamers)}")
logger.debug(f"├ consumers: {', '.join(consumers)}") logger.debug(f"├ consumers: {', '.join(consumers)}")