Compare commits

...

13 commits

Author SHA1 Message Date
c764b7b0ba feat(mastodon): firs working version 2026-04-13 08:04:39 +02:00
bb00eabd70 feat(lift): mastodon PoC 2026-04-12 22:33:43 +02:00
b50f5fa79c update README 2026-04-11 17:11:01 +02:00
1418055706 refactor: make all operators generators
Except `format.counter`, of course.
2026-04-06 16:10:05 +02:00
8179e121f5 feat(stream): adds filename 2026-04-06 16:09:49 +02:00
e0adccda32 feat(doc): autoextract operators' help 2026-04-06 14:13:38 +02:00
77bd035f72 feat: adds consume.nlines 2026-04-05 19:10:36 +02:00
70661d39b8 feat: adds consume.sections 2026-04-05 19:00:18 +02:00
d35d0df021 feat: more operators 2026-04-05 17:02:26 +02:00
20d0424930 feat: operator arguments 2026-04-05 12:04:58 +02:00
b8451b3171 basic features 2026-04-05 11:38:17 +02:00
844209e7b6 modular reimplementation from scratch 2026-04-04 21:29:44 +02:00
cee043b6d2 refactor the project up to modern standards 2026-04-04 14:02:06 +02:00
4 changed files with 751 additions and 401 deletions

166
README.md
View file

@ -1,102 +1,102 @@
forthlift — post sequences of texts on social media
===================================================
Forthlift is a command line application to post sequences of text lines on social media.
Forthlift's primary use case is a command line application to post sequences of
text items on social media. You first prepare your thread in your text editor,
then call forthlift, and everything is posted at once.
Its main use case is to post on Twitter several status that you have prepared first in your text editor.
As side effect, fothlift became a more generic tool, that can process text
streams, format them, assemble their atomic units in anothers, and then send
them elsewhere. It is thus designed to ease automation and integration with
existing tools, and has an Unix-like text-and-pipes philosophy.
It is thus designed to ease automation and integration in existing tools
and had an Unix-like text-and-pipes command line interface.
## Examples
For example, it makes it easy to post "chained" twitter status (that answer to each others)
from your text editor. Just send the selected text to its standard input, selecting the
twitter API, and voilà. Using Vim, you can tweet the visually-selected lines with:
`:'<,'>w !forthlift -a twitter`
### Select in Vim, post on Mastodon
Forthlift makes it easy to post "chained" Mastodon toots
(that answer to each others) from your text editor.
Just send the selected text to its standard input, selecting the
twitter API, and voilà.
Using Vim, you can toot the visually-selected lines (each line making a toot)
with:
```vim
:'<,'>w !forthlift --lift mastodon
```
### Prepare toots in a file, post on Mastodon
Let's say that you prepared a text file of the form:
```
This is a multi-lines...
... toot!
--
And here is an answeer to the previous toot.
(With two lines as well.)
--
And a final toot.
```
You would want the toots to show a counter, managing the expectations of your
reader.
You can post it right away with:
```sh
cat my_file.txt | forthlift --consume "sections:--,skip" --format counter -l mastodon
```
### Manage long threads from Markdown
Let's say you want to post the sections of this README as a sequence of toots,
with a hashtag that indicates it's gona be long.
And you would want to double-check what it would do first. You can send the
items on the standard output, like:
```sh
forthlift -s filename:README.md -c sections -f suffix:#longThread -f panel -l stdout
```
## SYNOPSIS
### Full-featured file-to-Mastodon pipeline
`forthlift` [-h]
`forthlift` [-a {stdout,twitter}] [-m MAXLEN] [-i|-t] [-c] [-q] [-d] [-s]
```sh
# Define a shortcut
mastopost() { forthlift -s "filename:$1" -c sections:^#,skip -f skip -f counter -f suffix:#longThread -l mastodon; }
# Call it with:
mastopost my_file.txt
```
## DESCRIPTION
### A generic tool
Generally speaking, it's a Unix-like command that operate a sequence of pre-programmed
chained actions on its text input.
Generally speaking, Forthlift is a Unix-like command that operate a sequence of
pre-programmed chained actions on its text input.
It comes with some existing actions:
* `stdout`: print the input text on the standard output,
* `twitter`: send the input text as status on twitter.
Forthlift has four action operators classes, one for each step of the processing:
1. `stream`, indicating from where to get the input data.
2. `consume`, telling *how* to parse the data from the input stream.
3. `format`, for manipulating the content itself.
4. `lift`, defining where to send the final items.
Only one `consume` operator can be passed, but any number of `stream`, `format`,
and `lift` operators can be combined.
To apply several operators of the same class, the user must pass the same flag
several time. For instance:
```sh
# Applies strip, then skip, then counter on the consumed lines.
forthlift --format strip --format skip --format counter
```
Most of the time, the order of the operators matters.
### Features
The main feature of forthlift is its ability to *chain* actions.
Depending on the chosen API, this could means different things:
* for twitter, this mean that the sequence of status will be posted
as a sequence of *answers* and not as a list of independent tweets.
* for the "stdout" API, this means that each printed line will start
with its index in the input list.
While it is recommended to prepare the input text with other text-processing tools
(fold, fmt, tr, sed, grep, your text editor, etc.),
forthlift comes with some rough text-processing capabilities, among which:
* ignore or trim lines that are longer than a given size (see `--trim` and `--ignore`),
* add a counter of the form `<current index>/<total lines>` at the end of the lines (see `--counter`).
## OPTIONS
* -h, --help: show this help message and exit
* -a {stdout,twitter}, --api {stdout,twitter}: Name of the API to use.
(default: stdout)
* -m MAXLEN, --max-len MAXLEN: Maximum number of characters in the lines.
(default: 140)
* -i, --ignore: Ignore lines that are longer than MAXLEN (default: False)
* -t, --trim: Trim down lines that are longer than MAXLEN. (default: False)
* -c, --counter: Add a counter of the form " x/N" at the end of the lines,
with N being the number of lines read and x the current index of the line.
NOTE: this necessitate to read all the input before processing it.
(default: False)
* -q, --quiet: Do not print errors and warnings on the standard error output.
(default: False)
* -s, --setup: Setup the selected API (e.g. authenticate and get authorization to post).
(default: False)
* -d, --independent: Post each item independently from the previous one
The behaviour of this option depends on the selected API.
For example on Twitter: do not post a line as an answer to the previous one but as a new tweet.
(default: False)
* --twitter-images FILENAME(S) [FILENAME(S) ...]:
Upload each given image files along with the corresponding tweets in the sequence.
If there are more images than tweets, they are silently ignored.
(default: None)
## INSTALLATION
### Twitter
1) Copy `twitter.conf-dist` as `twitter.conf` and indicate your developer's API keys in the corresponding fields.
2) Run `forthlift --api twitter --setup` and follow the instructions (go to the given URL, then paste the given PIN).
Why should you get developer's API keys? Because Twitter does not like open-source desktop applications, see:
http://arstechnica.com/security/2010/09/twitter-a-case-study-on-how-to-do-oauth-wrong/
## SYNOPSIS
{{FORTHLIFT_HELP}}

View file

@ -1,331 +0,0 @@
#!/usr/bin/env python
import sys
import locale
import argparse
import datetime
import ConfigParser
import tweepy
class AppKeyError(Exception):
def __init__(self, msg):
self.msg = msg
def write(data, stream = sys.stdout):
"""
Write "data" on the given stream, then flush, silently handling broken pipes.
"""
try:
stream.write(data.encode(locale.getpreferredencoding(True) or "utf-8"))
stream.flush()
# Silently handle broken pipes
except IOError:
try:
stream.close()
except IOError:
pass
def readline( stream_in ):
while True:
try:
line = stream_in.readline().decode(stream_in.encoding or locale.getpreferredencoding(True)).strip()
except UnicodeDecodeError:
continue
except KeyboardInterrupt:
break
if not line:
break
yield line
raise StopIteration
def setup_counter( data, asked ):
assert(asked.counter)
# unroll everything
data = list(data)
nb = len(data)
# " int/int"
counter_size = len(str(nb).encode("utf-8").decode("utf-8") )*2 + 1 + 1
for i,line in enumerate(data):
counter = u" %i/%i" % (i+1,nb)
curmax = asked.max_len - len(counter)
if len(line) > curmax:
if asked.ignore:
data[i] = line
elif asked.trim:
data[i] = line[:curmax]
data[i] = data[i] + counter
return data
def setup_hem(data, asked):
for line in data:
if len(line) > asked.max_len:
if asked.ignore:
pass
elif asked.trim:
line = line[:asked.max_len]
yield line
def setup( data, asked ):
if asked.counter:
f = setup_counter
else:
f = setup_hem
for line in f(data, asked):
yield line
def operate( func, *args ):
for line in func( readline(sys.stdin), *args ):
write(line)
#
# STDOUT API
#
def on_stdout( data, asked, endline="\n" ):
lines = setup(data, asked)
# You can do something on the whole set of lines if you want.
for i,line in enumerate(lines):
if endline:
if line[-1] != endline:
line += endline
if asked.independent:
yield line
else:
l = u"%i %s" % (i,line)
yield l
#
# TWITTER API
#
def on_twitter( data, api, asked, endline="\n" ):
lines = setup(data, asked)
prev_status_id = None
images = asked.twitter_images
if images:
images.reverse()
for line in lines:
if images:
img = images.pop()
else:
img = None
if img:
# API.update_with_media(filename[, status][, in_reply_to_status_id][, lat][, long][, source][, place_id][, file])
status = api.update_with_media(img, line, prev_status_id)
else:
# API.update_status(status[, in_reply_to_status_id][, lat][, long][, source][, place_id])
status = api.update_status(line, prev_status_id)
if asked.independent:
prev_status_id = None
else:
prev_status_id = status.id
yield status.text + endline
def setup_twitter(configfile="twitter.conf"):
config = ConfigParser.RawConfigParser()
# Authenticate the application.
config.read(configfile)
if not config.has_section("App"):
raise AppKeyError("ERROR: did not found application keys, ask your distribution maintainer or get keys from Twitter first.")
app_key = config.get("App","app_key")
app_secret = config.get("App","app_key_secret")
auth = tweepy.OAuthHandler(app_key, app_secret)
# Authenticate the user.
auth_url = auth.get_authorization_url()
print "Copy and paste this URL in your browser while your are logged in the twitter account where you want to post."
print "Authorization URL: " + auth_url
print "Then paste the Personal Identification Number given by Twitter:"
verifier = raw_input('PIN: ').strip()
token = auth.get_access_token(verifier)
# Authenticate and get the user name.
token_key, token_secret = token
auth.set_access_token(token_key, token_secret)
api = tweepy.API(auth)
username = api.me().name
print "Authentication successful, ready to post to account: " + username
# Save authentication tokens.
if not config.has_section("Info"):
config.add_section('Info')
config.set('Info','account', username)
config.set('Info','auth_date', datetime.datetime.utcnow().isoformat())
if not config.has_section("Auth"):
config.add_section('Auth')
config.set('Auth', 'local_token', token_key)
config.set('Auth', 'local_token_secret', token_secret)
# Writing our configuration file to 'example.cfg'
with open(configfile, 'wb') as fd:
config.write(fd)
#
# CLI
#
if __name__=="__main__":
errors = {"NO_ERROR":0, "UNKNOWN_ERROR":1, "NO_SETUP_NEEDED":2, "NO_APP_KEY":10}
usage = "Post text read on the standard input to a website or an API."
apis =["stdout", "twitter"] # TODO "http_post", "http_get",
parser = argparse.ArgumentParser( description=usage,
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("-a", "--api", choices=apis, default="stdout",
help="Name of the API to use.")
# Generic options
parser.add_argument("-m", "--max-len", metavar="MAXLEN", type=int, default=140,
help="Maximum number of characters in the lines.")
parser.add_argument("-i", "--ignore", action="store_true",
help="Ignore lines that are longer than MAXLEN")
parser.add_argument("-t", "--trim", action="store_true",
help="Trim down lines that are longer than MAXLEN.")
parser.add_argument("-c", "--counter", action="store_true",
help="Add a counter of the form \" x/N\" at the end of the lines, \
with N being the number of lines read and x the current index of the line. \
NOTE: this necessitate to read all the input before processing it.")
parser.add_argument("-q", "--quiet", action="store_true",
help="Do not print errors and warnings on the standard error output.")
# TODO option: rate at which to post lines
# API-dependent options
parser.add_argument("-s", "--setup", action="store_true",
help="Setup the selected API (e.g. authenticate and get authorization to post).")
parser.add_argument("-d", "--independent", action="store_true",
help="Post each item independently from the previous one. \
The behaviour of this option depends on the selected API. \
For example on Twitter: do not post a line as an answer to the previous one but as a new tweet.")
# Twitter
parser.add_argument("--twitter-images", metavar="FILENAME(S)", nargs="+", type=str,
help="Upload each given image files along with the corresponding tweets in the sequence. If there are more images than tweets, they are silently ignored.")
asked = parser.parse_args()
# Setup
if asked.setup:
configfile = asked.api+".conf"
if asked.api == "twitter":
try:
setup_twitter(configfile)
except AppKeyError as e:
if not asked.quiet:
sys.stderr.write(e.msg)
sys.exit(errors["NO_APP_KEY"])
except:
print "Unexpected error:", sys.exc_info()[0]
raise
else:
sys.exit(errors["NO_ERROR"])
else: # other API
if not asked.quiet:
sys.stderr.write("This API does not need setup.")
sys.exit(errors["NO_SETUP_NEEDED"])
# Consistency checks
if asked.ignore and asked.trim:
if not asked.quiet:
sys.stderr.write("WARNING: asking to trim AND to ignore is not logical, I will ignore.")
assert( not (asked.ignore and asked.trim) )
if asked.twitter_images:
if not asked.api == "twitter":
if not asked.quiet:
sys.stderr.write("WARNING: asking to upload images on twitter while not using the twitter API is not logical, I will ignore.")
assert( not (asked.twitter_images and not asked.api=="twitter") )
else: # Test readable images
cannot=[]
for img in asked.twitter_images:
try:
with open(img) as f:
f.read()
except OSError:
cannot.append(img)
if cannot:
print("Cannot open the following image files, I will not continue: ")
for img in cannot:
print(img)
sys.exit(5) # I/O Error
# APIs
if asked.api == "stdout":
operate( on_stdout, asked )
elif asked.api == "twitter":
# Authenticate
config = ConfigParser.RawConfigParser()
config.read('twitter.conf')
app_key = config.get("App","app_key")
app_secret = config.get("App","app_key_secret")
try:
verifier_code = config.get("Auth","code")
except:
access_token = config.get("Auth","local_token")
access_token_secret = config.get("Auth","local_token_secret")
auth = tweepy.OAuthHandler(app_key, app_secret, "https://api.twitter.com/1.1/")
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)
# Post
operate( on_twitter, api, asked )

26
pyproject.toml Normal file
View file

@ -0,0 +1,26 @@
[project]
name = "forthlift"
version = "0.1.0"
description = "A command line application to post sequences of text lines on social media"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"argparse>=1.4.0",
"configparser>=7.2.0",
"datetime>=6.0",
"mastodon-py>=2.1.4",
"rich>=14.3.3",
"toml>=0.10.2",
]
[project.scripts]
forthlift = "forthlift.forthlift:main"
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find]
where = ["src"]
include = ["forthlift*"]

655
src/forthlift/forthlift.py Executable file
View file

@ -0,0 +1,655 @@
#!/usr/bin/env python
import io
import os
import re
import sys
import toml
import time
import locale
import logging
import inspect
import tempfile
import argparse
import datetime
import mastodon
import webbrowser
from configparser import ConfigParser
from rich.panel import Panel
from rich import print
#
# LIBRARY
#
logger = logging.getLogger("forthlift")
class stream:
class Stream:
def __call__(self):
raise NotImplementedError
class stdin(Stream):
"""Stream from the standard input."""
def __call__(self):
return sys.stdin
class filename(Stream):
"""Stream from the given file."""
def __init__(self, filename = ""):
self.filename = filename
def __call__(self):
self.fd = open(self.filename, 'r')
return self.fd
def __del__(self):
self.fd.close()
class consume:
class Consume:
def __call__(self, stream):
raise NotImplementedError
class lines(Consume):
"""Consume line by line."""
def __call__(self, stream):
for line in stream:
yield line
class paragraphs(Consume):
"""Consume paragraph by paragraph (separated by an empty line)."""
def __call__(self, stream):
current = ""
for item in stream:
# Not counting spaces as legit content.
if item.strip():
current += item
else:
yield current
current = ""
if current.strip():
yield current
class sections(Consume):
"""Consume section by section. A new section starts when a line matches the `mark` regexp. If `skip` is set to 'skip', the marked line is not consumed."""
def __init__(self, mark = r"^#", skip = "noskip"):
self.mark = mark
if skip == 'skip':
self.skip = True
elif skip == 'noskip':
self.skip = False
else:
self.skip = bool(skip)
def __call__(self, stream):
current = ""
for item in stream:
if re.match(self.mark, item.strip()):
yield current
if self.skip:
current = ""
else:
current = item
else:
current += item
yield current
class nlines(Consume):
"""Consume by groups of `nb` lines."""
def __init__(self, nb = "10"):
self.nb = int(nb)
def __call__(self, stream):
count = 0
current = ""
for item in stream:
if count >= self.nb:
yield current
current = ""
count = 0
else:
current += item
count += 1
yield current
class format:
class Format:
def __call__(self, items):
raise NotImplementedError
class asis(Format):
"""Do not format anything."""
def __call__(self, items):
for item in items:
yield item
class trim(Format):
"""Split items if their length is longer than `max`, and create new items with the remaining parts."""
# Every argument is a string.
def __init__(self, max = "140"):
self.max = int(max)
def trim(self, item):
if len(item) > self.max:
yield item[:self.max]
for subitem in self.trim(item[self.max:]):
yield subitem
else:
yield item
def __call__(self, items):
for item in items:
for separated in self.trim(item):
yield separated
class eol(Format):
"""Add an end of line after the item."""
def __call__(self, items):
for item in items:
yield item + "\n"
class strip(Format):
"""Remove any space character around the item."""
def __call__(self, items):
for item in items:
yield item.strip()
class skip(Format):
"""Skip items containing only spaces or being empty."""
def __call__(self, items):
for item in items:
if item.strip():
yield item
class glue(Format):
"""Glue consecutive items together if they are not separated by an empty one. Make a new item after an empty one."""
def __call__(self, items):
current = ""
for item in items:
if item.strip():
current += item
else:
yield current
current = ""
if current.strip():
yield current
class panel(Format):
"""Surround each item by an ascii-art box. NOTE: only works when lifted on stdout."""
def __call__(self, items):
for item in items:
yield Panel.fit(item.strip())
class counter(Format):
"""Add a counter at the end of each items, with the current index and the total. If `end` is given, it is added at the very last item. If `sep` is given, it is appended to the item before the counter itself."""
def __init__(self, end = '', sep = '\n'):
self.sep = sep
self.end = end
def __call__(self, counted):
items = list(counted) # Consume everything at once.
total = len(items)
res = []
for i,item in enumerate(items):
res.append(f"{item}{self.sep}{i+1}/{total}")
if res:
res[-1] += self.end
for r in res:
yield r
class suffix(Format):
"""Add the `content` string after each item. If `sep` is given, it is appended to the item before the content and after the item."""
def __init__(self, content = "", sep = '\n'):
self.content = content
self.sep = sep
def __call__(self, items):
for i in items:
yield i+self.sep+self.content
class prefix(Format):
"""Add the `content` string before each item. If `sep` is given, it is prepended to the item after the content and before the item."""
def __init__(self, content = "", sep = '\n'):
self.content = content
self.sep = sep
def __call__(self, items):
for i in items:
yield self.content+self.sep+i
class lift:
class Lift:
def call(self, item):
"""Interface for posting a single independant item."""
raise NotImplementedError
def __call__(self, items):
"""Interface for posting all items."""
count = 0
for item in items:
self.call(item)
count += 1
logger.debug(f"│ │ ├ {count}th item")
logger.debug(f"│ │ └OK {count} items")
class stdout(Lift):
"""Print the items on the standard output."""
def call(self, item):
if item:
if __debug__:
print(item, end = '', file = sys.stdout, flush = True)
else:
print(item, end = '')
else:
logger.debug("Empty item")
class mastodon(Lift):
def __init__(self, dryrun = 'nodry'):
self._scopes = ['read', 'write']
if dryrun == 'dry':
self.dry_run = True
elif dryrun == 'nodry':
self.dry_run = False
else:
msg = f"I do not understand what you mean by `{dryrun}`. Please either indicate `dry` or `nodry`."
logger.error(msg)
raise RuntimeError(msg)
pyp = toml.load("pyproject.toml")
self.name = pyp["project"]["name"]
self.version = pyp["project"]["version"]
self.config_path = f"{self.name}.toml"
if not os.path.isfile(self.config_path):
with open(self.config_path, 'w') as fd:
fd.write("")
self.config = self.load_config()
if self.needs_init():
self.init()
else:
logger.debug(
"│ │ ├ Mastodon lifter logged in: " \
f"@{self.config['mastodon']['account']}" \
f"@{self.config['mastodon']['instance']}" \
)
logger.debug("│ │ └OK,")
self.masto = mastodon.Mastodon(
api_base_url = self.config["mastodon"]["instance"],
# client_id = self.config["mastodon"]["client_id"],
client_secret = self.config["mastodon"]["client_secret"],
access_token = self.config["mastodon"]["token"],
user_agent = f"{self.name}:{self.version}"
)
def needs_init(self):
if not self.config["mastodon"]["instance"] \
or not self.config["mastodon"]["account"] \
or not self.config["mastodon"]["client_id"] \
or not self.config["mastodon"]["client_secret"] \
or not self.config["mastodon"]["token"]:
return True
else:
return False
def init(self):
if not self.needs_init():
print(f"The {__name__} operator has already been initialized")
print("Current configuration is:")
print("\tinstance:", self.config["mastodon"]["instance"])
print("\tuser:", self.config["mastodon"]["account"])
ans = input("Do you want to re-init it? (yes/no): ").strip()
if ans[0].lower() == 'n':
return
elif ans[1].lower() != 'y':
print("I did not understand your answer, "
" I'll assume you meant `no` and stop here.")
return
print("URL of your Mastodon instance?")
print("For example: https://social.antigene.org")
# instance = input("URL: ").strip()
instance = "https://social.antigene.org"
self.config["mastodon"]["instance"] = instance
self.register_app()
self.oauth()
def save_config(self, config):
logger.debug(f"Save config in: {self.config_path}")
with open(self.config_path, 'w') as fd:
toml.dump(config, fd)
def load_config(self):
logger.debug(f"│ │ ├ Load config from: {self.config_path}")
config = {
"mastodon": {
"instance": None,
"account": None,
"client_id": None,
"client_secret": None,
"token": None,
}
}
local_config = toml.load(self.config_path)
config.update(local_config)
for k,v in config["mastodon"].items():
logger.debug(f"│ │ │ ├ {k}: {v}")
logger.debug(f"│ │ │ └OK")
return config
def register_app(self):
logger.debug(f"Register {self.name} on {self.config["mastodon"]['instance']}")
client_id, client_secret = mastodon.Mastodon.create_app(
self.name,
scopes = self._scopes,
api_base_url = self.config['mastodon']["instance"],
website = "https://nojhan.net/git/nojhan/forthlift",
user_agent = f"{self.name}:{self.version}",
)
self.config["mastodon"]["client_id"] = client_id
logger.debug(f"ID: {client_id}")
self.config["mastodon"]["client_secret"] = client_secret
logger.debug(f"Secret: {client_secret}")
self.save_config(self.config)
def oauth(self):
logger.debug(f"OAuth to: {self.config['mastodon']['instance']}")
oauth = mastodon.Mastodon(
client_id = self.config['mastodon']["client_id"],
client_secret = self.config['mastodon']["client_secret"],
api_base_url = self.config['mastodon']["instance"],
)
oauth_url = oauth.auth_request_url(scopes = self._scopes)
logger.debug(f"Opening web page: {oauth_url}")
logger.debug("Please log in there.")
webbrowser.open_new(oauth_url)
time.sleep(5)
print("After logging in, paste here the code you received:")
oauth_code = input("Code: ").strip()
token = oauth.log_in(
code = oauth_code,
scopes = self._scopes,
)
self.config['mastodon']["token"] = token
logger.debug(f"Token: {token}")
account = oauth.me().acct
self.config['mastodon']["account"] = account
logger.debug(f"Account: {account}")
self.save_config(self.config)
def post(self, item, prev_status = None):
if self.dry_run:
print(item)
else:
if prev_status:
logger.debug(item)
return self.masto.status_reply(to_status = prev_status, status = item)
else:
logger.debug(item)
return self.masto.status_post(item)
def __call__(self, items):
n = 0
first_item = next(items, None)
if first_item == None:
logger.error("│ │ │ ├ No item to post")
return
else:
logger.debug(f"│ │ │ ├ Post #{n+1}")
n += 1
prev_status = self.post(first_item)
for item in items:
logger.debug(f"│ │ │ ├ Post #{n+1}")
n += 1
prev_status = self.post(item)
logger.debug(f"│ │ │ └OK, posted {n} items")
class Forthlifter:
def __init__(self,
consumer = consume.lines(),
streamers = [stream.stdin()],
formatters = [format.asis()],
lifters = [lift.stdout()],
):
if not isinstance(streamers, list):
streamers = [streamers]
self.streamers = streamers
logger.debug(f"├ streamers: {', '.join(type(i).__name__ for i in self.streamers)}")
self.consumer = consumer
logger.debug(f"├ consumer: {type(consumer).__name__}")
if not isinstance(formatters, list):
formatters = [formatters]
self.formatters = formatters
logger.debug(f"├ formatters: {', '.join(type(i).__name__ for i in self.formatters)}")
if not isinstance(lifters, list):
lifters = [lifters]
self.lifters = lifters
logger.debug(f"└ lifters: {', '.join(type(i).__name__ for i in self.lifters)}")
def consume(self):
logger.debug("│ ├ consume")
items = []
for streamer in self.streamers:
logger.debug(f"│ │ ├ {type(self.consumer).__name__}({type(streamer).__name__})")
# Concatenate
items += self.consumer(streamer())
logger.debug(f"│ │ └OK")
return items
def format(self, items):
logger.debug(f"│ ├ format")
for formatter in self.formatters:
logger.debug(f"│ │ ├ {type(formatter).__name__}")
# Replace
items = formatter(items)
logger.debug(f"│ │ └OK")
return items
def lift(self, items):
logger.debug(f"│ ├ lift")
for lifter in self.lifters:
logger.debug(f"│ │ ├ {type(lifter).__name__}")
# Call
lifter(items)
logger.debug(f"│ │ └OK")
def __call__(self):
logger.debug("├ call")
self.lift(self.format(self.consume()))
logger.debug("│ └OK")
#
# CLI
#
def classes_of(namespace):
itf_name = namespace.__name__[0].upper()+namespace.__name__[1:]
itf = getattr(namespace, itf_name)
subs = {cls.__name__:cls for cls in itf.__subclasses__()}
return subs
def operator(asked_op):
logger.debug(f"├ Parsed operators:")
for op in asked_op:
# We do not use a f(a) notation,
# because the shell would try to interpret
# any unescaped/unquoted parentheses.
# It is faster to use f:a for the user.
m = re.match(r"(\w+):(.+)", op)
if m:
name = m.group(1)
args = [a.strip() for a in m.group(2).split(',')]
else:
name = op
args = []
logger.debug(f"│ ├ {name}({','.join(args)})")
yield name,args
logger.debug("│ └OK")
def help_op(ops):
h = ""
itf = list(ops.values())[0].__mro__[1].__name__.lower()
h += f"\nAVAILABLE OPERATORS FOR --{itf}:\n"
for name,cls in ops.items():
# Signature
hsig = f" -{itf[0]} {name.lower()}"
args = inspect.getfullargspec(cls)[0]
if args:
sign = inspect.signature(cls)
args = [sign.parameters[a].name for a in sign.parameters]
hsig += f"[:{','.join(args)}]"
h += hsig
# Example (using defaults)
hex = f"-{itf[0]} "
args = inspect.getfullargspec(cls)[0]
if args:
sep = ":"
sign = inspect.signature(cls)
defs = [sign.parameters[a].default for a in sign.parameters]
if not all(defs) or any(re.match(r'\s', d) for d in defs):
hdefs = [d.replace("\n", r"\n") for d in defs]
hex += f"'{name.lower()}{sep}{','.join(hdefs)}'"
else:
hex += f"{name.lower()}{sep}{','.join(defs)}"
h += f"\n\tDefault: {hex}"
else:
hex += name.lower()
if cls.__doc__:
h+= f"\n\t{cls.__doc__}\n"
else:
h += "\n"
return h
def main():
errors = {"NO_ERROR":0, "UNKNOWN_ERROR":1, "NO_SETUP_NEEDED":2, "NO_APP_KEY":10}
usage = "Post text read on the standard input to a website or an API."
# Dictionaries of {name: class}
streamers = classes_of(stream)
consumers = classes_of(consume)
formaters = classes_of(format)
lifters = classes_of(lift)
# Extract docstrings
epilog = ""
epilog += help_op(streamers)
epilog += help_op(consumers)
epilog += help_op(formaters)
epilog += help_op(lifters)
parser = argparse.ArgumentParser(
description=usage,
formatter_class = argparse.RawTextHelpFormatter,
epilog = epilog)
parser.add_argument("-s", "--stream",
metavar = "STREAM(S)",
default = [],
action="append",
help="Where to get items (several occurences possibles, order matters).")
parser.add_argument("-c", "--consume",
metavar = "CONSUME",
default = "lines",
help="How to extract the content from the stream (only one occurence).")
parser.add_argument("-f", "--format",
metavar = "FORMAT(S)",
default = [],
action="append",
help="How to format items (several occurences possibles, order matters).")
parser.add_argument("-l", "--lift",
metavar = "LIFT(S)",
default = [],
action="append",
help="How to send items somewhere (several occurences possibles, order matters).")
initializables = []
for cls in \
list(streamers.values()) \
+ list(consumers.values()) \
+ list(formaters.values()) \
+ list(lifters.values()):
if hasattr(cls, "needs_init"):
initializables.append(cls.__name__)
if initializables:
parser.add_argument("-i", "--init",
choices = initializables,
help="Initialize the given operator.")
asked = parser.parse_args()
logging.basicConfig()
logger.setLevel("DEBUG")
if asked.init:
for ops in [streamers, consumers, formaters, lifters]:
if asked.init in ops:
logger.debug(f"Initialize operator: {asked.init}")
op = ops[asked.init]()
op.init()
logger.debug(f"Done. You can use the `{asked.init}` operator.")
sys.exit(0)
logger.debug("Available operators:")
logger.debug(f"├ streamers: {', '.join(streamers)}")
logger.debug(f"├ consumers: {', '.join(consumers)}")
logger.debug(f"├ formaters: {', '.join(formaters)}")
logger.debug(f"└ lifters: {', '.join(lifters)}")
# Sane defaults. Cannot be a default in argparse,
# because choices would be append to it.
if not asked.stream:
asked.stream = ["stdin"]
if not asked.format:
asked.format = ["asis"]
if not asked.lift:
asked.lift = ["stdout"]
logger.debug("Chosen operators:")
cop,cargs = list(operator([asked.consume]))[0]
forthlift = Forthlifter(
consumer = consumers[cop](*cargs),
streamers = [streamers[op](*args) for op,args in operator(asked.stream)],
formatters = [formaters[op](*args) for op,args in operator(asked.format)],
lifters = [ lifters[op](*args) for op,args in operator(asked.lift )],
)
logger.debug("Run:")
forthlift()
logger.debug("└OK")
if __name__ == "__main__":
main()