diff --git a/README.md b/README.md index 6b7e022..2ebdfd6 100644 --- a/README.md +++ b/README.md @@ -1,102 +1,102 @@ forthlift — post sequences of texts on social media =================================================== -Forthlift is a command line application to post sequences of text lines on social media. +Forthlift's primary use case is a command line application to post sequences of +text items on social media. You first prepare your thread in your text editor, +then call forthlift, and everything is posted at once. -Its main use case is to post on Twitter several status that you have prepared first in your text editor. +As side effect, fothlift became a more generic tool, that can process text +streams, format them, assemble their atomic units in anothers, and then send +them elsewhere. It is thus designed to ease automation and integration with +existing tools, and has an Unix-like text-and-pipes philosophy. -It is thus designed to ease automation and integration in existing tools -and had an Unix-like text-and-pipes command line interface. +## Examples -For example, it makes it easy to post "chained" twitter status (that answer to each others) -from your text editor. Just send the selected text to its standard input, selecting the -twitter API, and voilà. Using Vim, you can tweet the visually-selected lines with: -`:'<,'>w !forthlift -a twitter` +### Select in Vim, post on Mastodon + +Forthlift makes it easy to post "chained" Mastodon toots +(that answer to each others) from your text editor. +Just send the selected text to its standard input, selecting the +twitter API, and voilà. + +Using Vim, you can toot the visually-selected lines (each line making a toot) +with: +```vim +:'<,'>w !forthlift --lift mastodon +``` + +### Prepare toots in a file, post on Mastodon + +Let's say that you prepared a text file of the form: + +``` +This is a multi-lines... + +... toot! +-- +And here is an answeer to the previous toot. +(With two lines as well.) +-- +And a final toot. +``` + +You would want the toots to show a counter, managing the expectations of your +reader. + +You can post it right away with: +```sh +cat my_file.txt | forthlift --consume "sections:--,skip" --format counter -l mastodon +``` + +### Manage long threads from Markdown + +Let's say you want to post the sections of this README as a sequence of toots, +with a hashtag that indicates it's gona be long. +And you would want to double-check what it would do first. You can send the +items on the standard output, like: + +```sh +forthlift -s filename:README.md -c sections -f suffix:#longThread -f panel -l stdout +``` -## SYNOPSIS +### Full-featured file-to-Mastodon pipeline -`forthlift` [-h] - -`forthlift` [-a {stdout,twitter}] [-m MAXLEN] [-i|-t] [-c] [-q] [-d] [-s] +```sh +# Define a shortcut +mastopost() { forthlift -s "filename:$1" -c sections:^#,skip -f skip -f counter -f suffix:#longThread -l mastodon; } +# Call it with: +mastopost my_file.txt +``` ## DESCRIPTION ### A generic tool -Generally speaking, it's a Unix-like command that operate a sequence of pre-programmed -chained actions on its text input. +Generally speaking, Forthlift is a Unix-like command that operate a sequence of +pre-programmed chained actions on its text input. -It comes with some existing actions: -* `stdout`: print the input text on the standard output, -* `twitter`: send the input text as status on twitter. +Forthlift has four action operators classes, one for each step of the processing: + +1. `stream`, indicating from where to get the input data. +2. `consume`, telling *how* to parse the data from the input stream. +3. `format`, for manipulating the content itself. +4. `lift`, defining where to send the final items. + +Only one `consume` operator can be passed, but any number of `stream`, `format`, +and `lift` operators can be combined. +To apply several operators of the same class, the user must pass the same flag +several time. For instance: + +```sh +# Applies strip, then skip, then counter on the consumed lines. +forthlift --format strip --format skip --format counter +``` + +Most of the time, the order of the operators matters. -### Features - -The main feature of forthlift is its ability to *chain* actions. -Depending on the chosen API, this could means different things: - -* for twitter, this mean that the sequence of status will be posted - as a sequence of *answers* and not as a list of independent tweets. - -* for the "stdout" API, this means that each printed line will start - with its index in the input list. - - -While it is recommended to prepare the input text with other text-processing tools -(fold, fmt, tr, sed, grep, your text editor, etc.), -forthlift comes with some rough text-processing capabilities, among which: - -* ignore or trim lines that are longer than a given size (see `--trim` and `--ignore`), - -* add a counter of the form `/` at the end of the lines (see `--counter`). - - -## OPTIONS - -* -h, --help: show this help message and exit - -* -a {stdout,twitter}, --api {stdout,twitter}: Name of the API to use. - (default: stdout) - -* -m MAXLEN, --max-len MAXLEN: Maximum number of characters in the lines. - (default: 140) - -* -i, --ignore: Ignore lines that are longer than MAXLEN (default: False) - -* -t, --trim: Trim down lines that are longer than MAXLEN. (default: False) - -* -c, --counter: Add a counter of the form " x/N" at the end of the lines, - with N being the number of lines read and x the current index of the line. - NOTE: this necessitate to read all the input before processing it. - (default: False) - -* -q, --quiet: Do not print errors and warnings on the standard error output. - (default: False) - -* -s, --setup: Setup the selected API (e.g. authenticate and get authorization to post). - (default: False) - -* -d, --independent: Post each item independently from the previous one - The behaviour of this option depends on the selected API. - For example on Twitter: do not post a line as an answer to the previous one but as a new tweet. - (default: False) - -* --twitter-images FILENAME(S) [FILENAME(S) ...]: - Upload each given image files along with the corresponding tweets in the sequence. - If there are more images than tweets, they are silently ignored. - (default: None) - - -## INSTALLATION - -### Twitter - -1) Copy `twitter.conf-dist` as `twitter.conf` and indicate your developer's API keys in the corresponding fields. -2) Run `forthlift --api twitter --setup` and follow the instructions (go to the given URL, then paste the given PIN). - -Why should you get developer's API keys? Because Twitter does not like open-source desktop applications, see: -http://arstechnica.com/security/2010/09/twitter-a-case-study-on-how-to-do-oauth-wrong/ +## SYNOPSIS +{{FORTHLIFT_HELP}} diff --git a/forthlift.py b/forthlift.py deleted file mode 100755 index b48ecaf..0000000 --- a/forthlift.py +++ /dev/null @@ -1,331 +0,0 @@ -#!/usr/bin/env python - -import sys -import locale -import argparse -import datetime -import ConfigParser - -import tweepy - - -class AppKeyError(Exception): - def __init__(self, msg): - self.msg = msg - - -def write(data, stream = sys.stdout): - """ - Write "data" on the given stream, then flush, silently handling broken pipes. - """ - try: - stream.write(data.encode(locale.getpreferredencoding(True) or "utf-8")) - stream.flush() - - # Silently handle broken pipes - except IOError: - try: - stream.close() - except IOError: - pass - - -def readline( stream_in ): - while True: - try: - line = stream_in.readline().decode(stream_in.encoding or locale.getpreferredencoding(True)).strip() - except UnicodeDecodeError: - continue - except KeyboardInterrupt: - break - if not line: - break - yield line - - raise StopIteration - - -def setup_counter( data, asked ): - assert(asked.counter) - - # unroll everything - data = list(data) - nb = len(data) - # " int/int" - counter_size = len(str(nb).encode("utf-8").decode("utf-8") )*2 + 1 + 1 - - for i,line in enumerate(data): - counter = u" %i/%i" % (i+1,nb) - curmax = asked.max_len - len(counter) - if len(line) > curmax: - if asked.ignore: - data[i] = line - elif asked.trim: - data[i] = line[:curmax] - data[i] = data[i] + counter - - return data - - -def setup_hem(data, asked): - for line in data: - if len(line) > asked.max_len: - if asked.ignore: - pass - elif asked.trim: - line = line[:asked.max_len] - - yield line - - -def setup( data, asked ): - if asked.counter: - f = setup_counter - else: - f = setup_hem - - for line in f(data, asked): - yield line - - -def operate( func, *args ): - for line in func( readline(sys.stdin), *args ): - write(line) - - -# -# STDOUT API -# - -def on_stdout( data, asked, endline="\n" ): - lines = setup(data, asked) - # You can do something on the whole set of lines if you want. - - for i,line in enumerate(lines): - if endline: - if line[-1] != endline: - line += endline - - if asked.independent: - yield line - else: - l = u"%i %s" % (i,line) - yield l - - -# -# TWITTER API -# - -def on_twitter( data, api, asked, endline="\n" ): - lines = setup(data, asked) - - prev_status_id = None - - images = asked.twitter_images - if images: - images.reverse() - - for line in lines: - if images: - img = images.pop() - else: - img = None - - if img: - # API.update_with_media(filename[, status][, in_reply_to_status_id][, lat][, long][, source][, place_id][, file]) - status = api.update_with_media(img, line, prev_status_id) - else: - # API.update_status(status[, in_reply_to_status_id][, lat][, long][, source][, place_id]) - status = api.update_status(line, prev_status_id) - - if asked.independent: - prev_status_id = None - else: - prev_status_id = status.id - - yield status.text + endline - - -def setup_twitter(configfile="twitter.conf"): - - config = ConfigParser.RawConfigParser() - - # Authenticate the application. - config.read(configfile) - - if not config.has_section("App"): - raise AppKeyError("ERROR: did not found application keys, ask your distribution maintainer or get keys from Twitter first.") - - app_key = config.get("App","app_key") - app_secret = config.get("App","app_key_secret") - - auth = tweepy.OAuthHandler(app_key, app_secret) - - - # Authenticate the user. - auth_url = auth.get_authorization_url() - print "Copy and paste this URL in your browser while your are logged in the twitter account where you want to post." - print "Authorization URL: " + auth_url - - print "Then paste the Personal Identification Number given by Twitter:" - verifier = raw_input('PIN: ').strip() - token = auth.get_access_token(verifier) - - # Authenticate and get the user name. - token_key, token_secret = token - auth.set_access_token(token_key, token_secret) - api = tweepy.API(auth) - username = api.me().name - print "Authentication successful, ready to post to account: " + username - - - # Save authentication tokens. - if not config.has_section("Info"): - config.add_section('Info') - config.set('Info','account', username) - config.set('Info','auth_date', datetime.datetime.utcnow().isoformat()) - - if not config.has_section("Auth"): - config.add_section('Auth') - config.set('Auth', 'local_token', token_key) - config.set('Auth', 'local_token_secret', token_secret) - - # Writing our configuration file to 'example.cfg' - with open(configfile, 'wb') as fd: - config.write(fd) - - -# -# CLI -# - -if __name__=="__main__": - - errors = {"NO_ERROR":0, "UNKNOWN_ERROR":1, "NO_SETUP_NEEDED":2, "NO_APP_KEY":10} - - usage = "Post text read on the standard input to a website or an API." - apis =["stdout", "twitter"] # TODO "http_post", "http_get", - - parser = argparse.ArgumentParser( description=usage, - formatter_class=argparse.ArgumentDefaultsHelpFormatter) - - parser.add_argument("-a", "--api", choices=apis, default="stdout", - help="Name of the API to use.") - - # Generic options - - parser.add_argument("-m", "--max-len", metavar="MAXLEN", type=int, default=140, - help="Maximum number of characters in the lines.") - - parser.add_argument("-i", "--ignore", action="store_true", - help="Ignore lines that are longer than MAXLEN") - - parser.add_argument("-t", "--trim", action="store_true", - help="Trim down lines that are longer than MAXLEN.") - - parser.add_argument("-c", "--counter", action="store_true", - help="Add a counter of the form \" x/N\" at the end of the lines, \ - with N being the number of lines read and x the current index of the line. \ - NOTE: this necessitate to read all the input before processing it.") - - parser.add_argument("-q", "--quiet", action="store_true", - help="Do not print errors and warnings on the standard error output.") - - # TODO option: rate at which to post lines - - # API-dependent options - - parser.add_argument("-s", "--setup", action="store_true", - help="Setup the selected API (e.g. authenticate and get authorization to post).") - - parser.add_argument("-d", "--independent", action="store_true", - help="Post each item independently from the previous one. \ - The behaviour of this option depends on the selected API. \ - For example on Twitter: do not post a line as an answer to the previous one but as a new tweet.") - - # Twitter - parser.add_argument("--twitter-images", metavar="FILENAME(S)", nargs="+", type=str, - help="Upload each given image files along with the corresponding tweets in the sequence. If there are more images than tweets, they are silently ignored.") - - asked = parser.parse_args() - - # Setup - if asked.setup: - configfile = asked.api+".conf" - if asked.api == "twitter": - try: - setup_twitter(configfile) - except AppKeyError as e: - if not asked.quiet: - sys.stderr.write(e.msg) - sys.exit(errors["NO_APP_KEY"]) - except: - print "Unexpected error:", sys.exc_info()[0] - raise - else: - sys.exit(errors["NO_ERROR"]) - - else: # other API - if not asked.quiet: - sys.stderr.write("This API does not need setup.") - sys.exit(errors["NO_SETUP_NEEDED"]) - - - # Consistency checks - - if asked.ignore and asked.trim: - if not asked.quiet: - sys.stderr.write("WARNING: asking to trim AND to ignore is not logical, I will ignore.") - assert( not (asked.ignore and asked.trim) ) - - if asked.twitter_images: - if not asked.api == "twitter": - if not asked.quiet: - sys.stderr.write("WARNING: asking to upload images on twitter while not using the twitter API is not logical, I will ignore.") - assert( not (asked.twitter_images and not asked.api=="twitter") ) - - else: # Test readable images - cannot=[] - for img in asked.twitter_images: - try: - with open(img) as f: - f.read() - except OSError: - cannot.append(img) - if cannot: - print("Cannot open the following image files, I will not continue: ") - for img in cannot: - print(img) - sys.exit(5) # I/O Error - - # APIs - - if asked.api == "stdout": - - operate( on_stdout, asked ) - - - elif asked.api == "twitter": - - # Authenticate - config = ConfigParser.RawConfigParser() - config.read('twitter.conf') - - app_key = config.get("App","app_key") - app_secret = config.get("App","app_key_secret") - - try: - verifier_code = config.get("Auth","code") - except: - access_token = config.get("Auth","local_token") - access_token_secret = config.get("Auth","local_token_secret") - - auth = tweepy.OAuthHandler(app_key, app_secret, "https://api.twitter.com/1.1/") - auth.set_access_token(access_token, access_token_secret) - - api = tweepy.API(auth) - - # Post - operate( on_twitter, api, asked ) - diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..4776791 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,26 @@ +[project] +name = "forthlift" +version = "0.1.0" +description = "A command line application to post sequences of text lines on social media" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "argparse>=1.4.0", + "configparser>=7.2.0", + "datetime>=6.0", + "mastodon-py>=2.1.4", + "rich>=14.3.3", + "toml>=0.10.2", +] + +[project.scripts] +forthlift = "forthlift.forthlift:main" + +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[tool.setuptools.packages.find] +where = ["src"] +include = ["forthlift*"] + diff --git a/src/forthlift/forthlift.py b/src/forthlift/forthlift.py new file mode 100755 index 0000000..2b290bf --- /dev/null +++ b/src/forthlift/forthlift.py @@ -0,0 +1,655 @@ +#!/usr/bin/env python + +import io +import os +import re +import sys +import toml +import time +import locale +import logging +import inspect +import tempfile +import argparse +import datetime +import mastodon +import webbrowser + +from configparser import ConfigParser +from rich.panel import Panel +from rich import print + +# +# LIBRARY +# + +logger = logging.getLogger("forthlift") + + +class stream: + class Stream: + def __call__(self): + raise NotImplementedError + + class stdin(Stream): + """Stream from the standard input.""" + def __call__(self): + return sys.stdin + + class filename(Stream): + """Stream from the given file.""" + def __init__(self, filename = ""): + self.filename = filename + + def __call__(self): + self.fd = open(self.filename, 'r') + return self.fd + + def __del__(self): + self.fd.close() + +class consume: + class Consume: + def __call__(self, stream): + raise NotImplementedError + + class lines(Consume): + """Consume line by line.""" + def __call__(self, stream): + for line in stream: + yield line + + class paragraphs(Consume): + """Consume paragraph by paragraph (separated by an empty line).""" + def __call__(self, stream): + current = "" + for item in stream: + # Not counting spaces as legit content. + if item.strip(): + current += item + else: + yield current + current = "" + if current.strip(): + yield current + + class sections(Consume): + """Consume section by section. A new section starts when a line matches the `mark` regexp. If `skip` is set to 'skip', the marked line is not consumed.""" + def __init__(self, mark = r"^#", skip = "noskip"): + self.mark = mark + if skip == 'skip': + self.skip = True + elif skip == 'noskip': + self.skip = False + else: + self.skip = bool(skip) + + def __call__(self, stream): + current = "" + for item in stream: + if re.match(self.mark, item.strip()): + yield current + if self.skip: + current = "" + else: + current = item + else: + current += item + yield current + + class nlines(Consume): + """Consume by groups of `nb` lines.""" + def __init__(self, nb = "10"): + self.nb = int(nb) + + def __call__(self, stream): + count = 0 + current = "" + for item in stream: + if count >= self.nb: + yield current + current = "" + count = 0 + else: + current += item + count += 1 + yield current + + +class format: + class Format: + def __call__(self, items): + raise NotImplementedError + + class asis(Format): + """Do not format anything.""" + def __call__(self, items): + for item in items: + yield item + + class trim(Format): + """Split items if their length is longer than `max`, and create new items with the remaining parts.""" + # Every argument is a string. + def __init__(self, max = "140"): + self.max = int(max) + + def trim(self, item): + if len(item) > self.max: + yield item[:self.max] + for subitem in self.trim(item[self.max:]): + yield subitem + else: + yield item + + def __call__(self, items): + for item in items: + for separated in self.trim(item): + yield separated + + class eol(Format): + """Add an end of line after the item.""" + def __call__(self, items): + for item in items: + yield item + "\n" + + class strip(Format): + """Remove any space character around the item.""" + def __call__(self, items): + for item in items: + yield item.strip() + + class skip(Format): + """Skip items containing only spaces or being empty.""" + def __call__(self, items): + for item in items: + if item.strip(): + yield item + + class glue(Format): + """Glue consecutive items together if they are not separated by an empty one. Make a new item after an empty one.""" + def __call__(self, items): + current = "" + for item in items: + if item.strip(): + current += item + else: + yield current + current = "" + if current.strip(): + yield current + + class panel(Format): + """Surround each item by an ascii-art box. NOTE: only works when lifted on stdout.""" + def __call__(self, items): + for item in items: + yield Panel.fit(item.strip()) + + class counter(Format): + """Add a counter at the end of each items, with the current index and the total. If `end` is given, it is added at the very last item. If `sep` is given, it is appended to the item before the counter itself.""" + def __init__(self, end = ' ␄', sep = '\n'): + self.sep = sep + self.end = end + + def __call__(self, counted): + items = list(counted) # Consume everything at once. + total = len(items) + res = [] + for i,item in enumerate(items): + res.append(f"{item}{self.sep}{i+1}/{total}") + if res: + res[-1] += self.end + for r in res: + yield r + + class suffix(Format): + """Add the `content` string after each item. If `sep` is given, it is appended to the item before the content and after the item.""" + def __init__(self, content = "", sep = '\n'): + self.content = content + self.sep = sep + + def __call__(self, items): + for i in items: + yield i+self.sep+self.content + + class prefix(Format): + """Add the `content` string before each item. If `sep` is given, it is prepended to the item after the content and before the item.""" + def __init__(self, content = "", sep = '\n'): + self.content = content + self.sep = sep + + def __call__(self, items): + for i in items: + yield self.content+self.sep+i + + +class lift: + class Lift: + def call(self, item): + """Interface for posting a single independant item.""" + raise NotImplementedError + + def __call__(self, items): + """Interface for posting all items.""" + count = 0 + for item in items: + self.call(item) + count += 1 + logger.debug(f"│ │ ├ {count}th item") + logger.debug(f"│ │ └OK {count} items") + + class stdout(Lift): + """Print the items on the standard output.""" + def call(self, item): + if item: + if __debug__: + print(item, end = '', file = sys.stdout, flush = True) + else: + print(item, end = '') + else: + logger.debug("Empty item") + + + class mastodon(Lift): + + def __init__(self, dryrun = 'nodry'): + self._scopes = ['read', 'write'] + + if dryrun == 'dry': + self.dry_run = True + elif dryrun == 'nodry': + self.dry_run = False + else: + msg = f"I do not understand what you mean by `{dryrun}`. Please either indicate `dry` or `nodry`." + logger.error(msg) + raise RuntimeError(msg) + + pyp = toml.load("pyproject.toml") + self.name = pyp["project"]["name"] + self.version = pyp["project"]["version"] + + self.config_path = f"{self.name}.toml" + if not os.path.isfile(self.config_path): + with open(self.config_path, 'w') as fd: + fd.write("") + self.config = self.load_config() + + if self.needs_init(): + self.init() + else: + logger.debug( + "│ │ ├ Mastodon lifter logged in: " \ + f"@{self.config['mastodon']['account']}" \ + f"@{self.config['mastodon']['instance']}" \ + ) + logger.debug("│ │ └OK,") + + self.masto = mastodon.Mastodon( + api_base_url = self.config["mastodon"]["instance"], + # client_id = self.config["mastodon"]["client_id"], + client_secret = self.config["mastodon"]["client_secret"], + access_token = self.config["mastodon"]["token"], + user_agent = f"{self.name}:{self.version}" + ) + + def needs_init(self): + if not self.config["mastodon"]["instance"] \ + or not self.config["mastodon"]["account"] \ + or not self.config["mastodon"]["client_id"] \ + or not self.config["mastodon"]["client_secret"] \ + or not self.config["mastodon"]["token"]: + return True + else: + return False + + def init(self): + if not self.needs_init(): + print(f"The {__name__} operator has already been initialized") + print("Current configuration is:") + print("\tinstance:", self.config["mastodon"]["instance"]) + print("\tuser:", self.config["mastodon"]["account"]) + + ans = input("Do you want to re-init it? (yes/no): ").strip() + if ans[0].lower() == 'n': + return + elif ans[1].lower() != 'y': + print("I did not understand your answer, " + " I'll assume you meant `no` and stop here.") + return + + print("URL of your Mastodon instance?") + print("For example: https://social.antigene.org") + # instance = input("URL: ").strip() + instance = "https://social.antigene.org" + self.config["mastodon"]["instance"] = instance + + self.register_app() + self.oauth() + + def save_config(self, config): + logger.debug(f"Save config in: {self.config_path}") + with open(self.config_path, 'w') as fd: + toml.dump(config, fd) + + def load_config(self): + logger.debug(f"│ │ ├ Load config from: {self.config_path}") + config = { + "mastodon": { + "instance": None, + "account": None, + "client_id": None, + "client_secret": None, + "token": None, + } + } + local_config = toml.load(self.config_path) + config.update(local_config) + + for k,v in config["mastodon"].items(): + logger.debug(f"│ │ │ ├ {k}: {v}") + logger.debug(f"│ │ │ └OK") + return config + + def register_app(self): + logger.debug(f"Register {self.name} on {self.config["mastodon"]['instance']}") + + client_id, client_secret = mastodon.Mastodon.create_app( + self.name, + scopes = self._scopes, + api_base_url = self.config['mastodon']["instance"], + website = "https://nojhan.net/git/nojhan/forthlift", + user_agent = f"{self.name}:{self.version}", + ) + + self.config["mastodon"]["client_id"] = client_id + logger.debug(f"ID: {client_id}") + self.config["mastodon"]["client_secret"] = client_secret + logger.debug(f"Secret: {client_secret}") + self.save_config(self.config) + + def oauth(self): + logger.debug(f"OAuth to: {self.config['mastodon']['instance']}") + oauth = mastodon.Mastodon( + client_id = self.config['mastodon']["client_id"], + client_secret = self.config['mastodon']["client_secret"], + api_base_url = self.config['mastodon']["instance"], + ) + oauth_url = oauth.auth_request_url(scopes = self._scopes) + logger.debug(f"Opening web page: {oauth_url}") + logger.debug("Please log in there.") + webbrowser.open_new(oauth_url) + time.sleep(5) + print("After logging in, paste here the code you received:") + oauth_code = input("Code: ").strip() + + token = oauth.log_in( + code = oauth_code, + scopes = self._scopes, + ) + self.config['mastodon']["token"] = token + logger.debug(f"Token: {token}") + + account = oauth.me().acct + self.config['mastodon']["account"] = account + logger.debug(f"Account: {account}") + self.save_config(self.config) + + def post(self, item, prev_status = None): + if self.dry_run: + print(item) + else: + if prev_status: + logger.debug(item) + return self.masto.status_reply(to_status = prev_status, status = item) + else: + logger.debug(item) + return self.masto.status_post(item) + + def __call__(self, items): + n = 0 + first_item = next(items, None) + if first_item == None: + logger.error("│ │ │ ├ No item to post") + return + else: + logger.debug(f"│ │ │ ├ Post #{n+1}") + n += 1 + prev_status = self.post(first_item) + for item in items: + logger.debug(f"│ │ │ ├ Post #{n+1}") + n += 1 + prev_status = self.post(item) + logger.debug(f"│ │ │ └OK, posted {n} items") + +class Forthlifter: + def __init__(self, + consumer = consume.lines(), + streamers = [stream.stdin()], + formatters = [format.asis()], + lifters = [lift.stdout()], + ): + if not isinstance(streamers, list): + streamers = [streamers] + self.streamers = streamers + logger.debug(f"├ streamers: {', '.join(type(i).__name__ for i in self.streamers)}") + + self.consumer = consumer + logger.debug(f"├ consumer: {type(consumer).__name__}") + + if not isinstance(formatters, list): + formatters = [formatters] + self.formatters = formatters + logger.debug(f"├ formatters: {', '.join(type(i).__name__ for i in self.formatters)}") + + if not isinstance(lifters, list): + lifters = [lifters] + self.lifters = lifters + logger.debug(f"└ lifters: {', '.join(type(i).__name__ for i in self.lifters)}") + + def consume(self): + logger.debug("│ ├ consume") + items = [] + for streamer in self.streamers: + logger.debug(f"│ │ ├ {type(self.consumer).__name__}({type(streamer).__name__})") + # Concatenate + items += self.consumer(streamer()) + logger.debug(f"│ │ └OK") + return items + + def format(self, items): + logger.debug(f"│ ├ format") + for formatter in self.formatters: + logger.debug(f"│ │ ├ {type(formatter).__name__}") + # Replace + items = formatter(items) + logger.debug(f"│ │ └OK") + return items + + def lift(self, items): + logger.debug(f"│ ├ lift") + for lifter in self.lifters: + logger.debug(f"│ │ ├ {type(lifter).__name__}") + # Call + lifter(items) + logger.debug(f"│ │ └OK") + + def __call__(self): + logger.debug("├ call") + self.lift(self.format(self.consume())) + logger.debug("│ └OK") + +# +# CLI +# + +def classes_of(namespace): + itf_name = namespace.__name__[0].upper()+namespace.__name__[1:] + itf = getattr(namespace, itf_name) + subs = {cls.__name__:cls for cls in itf.__subclasses__()} + return subs + + +def operator(asked_op): + logger.debug(f"├ Parsed operators:") + for op in asked_op: + # We do not use a f(a) notation, + # because the shell would try to interpret + # any unescaped/unquoted parentheses. + # It is faster to use f:a for the user. + m = re.match(r"(\w+):(.+)", op) + if m: + name = m.group(1) + args = [a.strip() for a in m.group(2).split(',')] + else: + name = op + args = [] + logger.debug(f"│ ├ {name}({','.join(args)})") + yield name,args + logger.debug("│ └OK") + + +def help_op(ops): + h = "" + itf = list(ops.values())[0].__mro__[1].__name__.lower() + h += f"\nAVAILABLE OPERATORS FOR --{itf}:\n" + for name,cls in ops.items(): + # Signature + hsig = f" -{itf[0]} {name.lower()}" + args = inspect.getfullargspec(cls)[0] + if args: + sign = inspect.signature(cls) + args = [sign.parameters[a].name for a in sign.parameters] + hsig += f"[:{','.join(args)}]" + h += hsig + + # Example (using defaults) + hex = f"-{itf[0]} " + args = inspect.getfullargspec(cls)[0] + if args: + sep = ":" + sign = inspect.signature(cls) + defs = [sign.parameters[a].default for a in sign.parameters] + if not all(defs) or any(re.match(r'\s', d) for d in defs): + hdefs = [d.replace("\n", r"\n") for d in defs] + hex += f"'{name.lower()}{sep}{','.join(hdefs)}'" + else: + hex += f"{name.lower()}{sep}{','.join(defs)}" + h += f"\n\tDefault: {hex}" + else: + hex += name.lower() + + if cls.__doc__: + h+= f"\n\t{cls.__doc__}\n" + else: + h += "\n" + return h + +def main(): + errors = {"NO_ERROR":0, "UNKNOWN_ERROR":1, "NO_SETUP_NEEDED":2, "NO_APP_KEY":10} + + usage = "Post text read on the standard input to a website or an API." + + # Dictionaries of {name: class} + streamers = classes_of(stream) + consumers = classes_of(consume) + formaters = classes_of(format) + lifters = classes_of(lift) + + # Extract docstrings + epilog = "" + epilog += help_op(streamers) + epilog += help_op(consumers) + epilog += help_op(formaters) + epilog += help_op(lifters) + + parser = argparse.ArgumentParser( + description=usage, + formatter_class = argparse.RawTextHelpFormatter, + epilog = epilog) + + parser.add_argument("-s", "--stream", + metavar = "STREAM(S)", + default = [], + action="append", + help="Where to get items (several occurences possibles, order matters).") + + parser.add_argument("-c", "--consume", + metavar = "CONSUME", + default = "lines", + help="How to extract the content from the stream (only one occurence).") + + parser.add_argument("-f", "--format", + metavar = "FORMAT(S)", + default = [], + action="append", + help="How to format items (several occurences possibles, order matters).") + + parser.add_argument("-l", "--lift", + metavar = "LIFT(S)", + default = [], + action="append", + help="How to send items somewhere (several occurences possibles, order matters).") + + initializables = [] + + for cls in \ + list(streamers.values()) \ + + list(consumers.values()) \ + + list(formaters.values()) \ + + list(lifters.values()): + if hasattr(cls, "needs_init"): + initializables.append(cls.__name__) + + if initializables: + parser.add_argument("-i", "--init", + choices = initializables, + help="Initialize the given operator.") + + asked = parser.parse_args() + + logging.basicConfig() + logger.setLevel("DEBUG") + + if asked.init: + for ops in [streamers, consumers, formaters, lifters]: + if asked.init in ops: + logger.debug(f"Initialize operator: {asked.init}") + op = ops[asked.init]() + op.init() + logger.debug(f"Done. You can use the `{asked.init}` operator.") + sys.exit(0) + + logger.debug("Available operators:") + logger.debug(f"├ streamers: {', '.join(streamers)}") + logger.debug(f"├ consumers: {', '.join(consumers)}") + logger.debug(f"├ formaters: {', '.join(formaters)}") + logger.debug(f"└ lifters: {', '.join(lifters)}") + + # Sane defaults. Cannot be a default in argparse, + # because choices would be append to it. + if not asked.stream: + asked.stream = ["stdin"] + + if not asked.format: + asked.format = ["asis"] + + if not asked.lift: + asked.lift = ["stdout"] + + logger.debug("Chosen operators:") + + cop,cargs = list(operator([asked.consume]))[0] + + forthlift = Forthlifter( + consumer = consumers[cop](*cargs), + streamers = [streamers[op](*args) for op,args in operator(asked.stream)], + formatters = [formaters[op](*args) for op,args in operator(asked.format)], + lifters = [ lifters[op](*args) for op,args in operator(asked.lift )], + ) + logger.debug("Run:") + forthlift() + logger.debug("└OK") + + +if __name__ == "__main__": + main() +