diff --git a/README.md b/README.md index 2ebdfd6..6b7e022 100644 --- a/README.md +++ b/README.md @@ -1,102 +1,102 @@ forthlift — post sequences of texts on social media =================================================== -Forthlift's primary use case is a command line application to post sequences of -text items on social media. You first prepare your thread in your text editor, -then call forthlift, and everything is posted at once. +Forthlift is a command line application to post sequences of text lines on social media. -As side effect, fothlift became a more generic tool, that can process text -streams, format them, assemble their atomic units in anothers, and then send -them elsewhere. It is thus designed to ease automation and integration with -existing tools, and has an Unix-like text-and-pipes philosophy. +Its main use case is to post on Twitter several status that you have prepared first in your text editor. -## Examples +It is thus designed to ease automation and integration in existing tools +and had an Unix-like text-and-pipes command line interface. -### Select in Vim, post on Mastodon - -Forthlift makes it easy to post "chained" Mastodon toots -(that answer to each others) from your text editor. -Just send the selected text to its standard input, selecting the -twitter API, and voilà. - -Using Vim, you can toot the visually-selected lines (each line making a toot) -with: -```vim -:'<,'>w !forthlift --lift mastodon -``` - -### Prepare toots in a file, post on Mastodon - -Let's say that you prepared a text file of the form: - -``` -This is a multi-lines... - -... toot! --- -And here is an answeer to the previous toot. -(With two lines as well.) --- -And a final toot. -``` - -You would want the toots to show a counter, managing the expectations of your -reader. - -You can post it right away with: -```sh -cat my_file.txt | forthlift --consume "sections:--,skip" --format counter -l mastodon -``` - -### Manage long threads from Markdown - -Let's say you want to post the sections of this README as a sequence of toots, -with a hashtag that indicates it's gona be long. -And you would want to double-check what it would do first. You can send the -items on the standard output, like: - -```sh -forthlift -s filename:README.md -c sections -f suffix:#longThread -f panel -l stdout -``` +For example, it makes it easy to post "chained" twitter status (that answer to each others) +from your text editor. Just send the selected text to its standard input, selecting the +twitter API, and voilà. Using Vim, you can tweet the visually-selected lines with: +`:'<,'>w !forthlift -a twitter` -### Full-featured file-to-Mastodon pipeline +## SYNOPSIS -```sh -# Define a shortcut -mastopost() { forthlift -s "filename:$1" -c sections:^#,skip -f skip -f counter -f suffix:#longThread -l mastodon; } -# Call it with: -mastopost my_file.txt -``` +`forthlift` [-h] + +`forthlift` [-a {stdout,twitter}] [-m MAXLEN] [-i|-t] [-c] [-q] [-d] [-s] ## DESCRIPTION ### A generic tool -Generally speaking, Forthlift is a Unix-like command that operate a sequence of -pre-programmed chained actions on its text input. +Generally speaking, it's a Unix-like command that operate a sequence of pre-programmed +chained actions on its text input. -Forthlift has four action operators classes, one for each step of the processing: - -1. `stream`, indicating from where to get the input data. -2. `consume`, telling *how* to parse the data from the input stream. -3. `format`, for manipulating the content itself. -4. `lift`, defining where to send the final items. - -Only one `consume` operator can be passed, but any number of `stream`, `format`, -and `lift` operators can be combined. -To apply several operators of the same class, the user must pass the same flag -several time. For instance: - -```sh -# Applies strip, then skip, then counter on the consumed lines. -forthlift --format strip --format skip --format counter -``` - -Most of the time, the order of the operators matters. +It comes with some existing actions: +* `stdout`: print the input text on the standard output, +* `twitter`: send the input text as status on twitter. -## SYNOPSIS +### Features + +The main feature of forthlift is its ability to *chain* actions. +Depending on the chosen API, this could means different things: + +* for twitter, this mean that the sequence of status will be posted + as a sequence of *answers* and not as a list of independent tweets. + +* for the "stdout" API, this means that each printed line will start + with its index in the input list. + + +While it is recommended to prepare the input text with other text-processing tools +(fold, fmt, tr, sed, grep, your text editor, etc.), +forthlift comes with some rough text-processing capabilities, among which: + +* ignore or trim lines that are longer than a given size (see `--trim` and `--ignore`), + +* add a counter of the form `/` at the end of the lines (see `--counter`). + + +## OPTIONS + +* -h, --help: show this help message and exit + +* -a {stdout,twitter}, --api {stdout,twitter}: Name of the API to use. + (default: stdout) + +* -m MAXLEN, --max-len MAXLEN: Maximum number of characters in the lines. + (default: 140) + +* -i, --ignore: Ignore lines that are longer than MAXLEN (default: False) + +* -t, --trim: Trim down lines that are longer than MAXLEN. (default: False) + +* -c, --counter: Add a counter of the form " x/N" at the end of the lines, + with N being the number of lines read and x the current index of the line. + NOTE: this necessitate to read all the input before processing it. + (default: False) + +* -q, --quiet: Do not print errors and warnings on the standard error output. + (default: False) + +* -s, --setup: Setup the selected API (e.g. authenticate and get authorization to post). + (default: False) + +* -d, --independent: Post each item independently from the previous one + The behaviour of this option depends on the selected API. + For example on Twitter: do not post a line as an answer to the previous one but as a new tweet. + (default: False) + +* --twitter-images FILENAME(S) [FILENAME(S) ...]: + Upload each given image files along with the corresponding tweets in the sequence. + If there are more images than tweets, they are silently ignored. + (default: None) + + +## INSTALLATION + +### Twitter + +1) Copy `twitter.conf-dist` as `twitter.conf` and indicate your developer's API keys in the corresponding fields. +2) Run `forthlift --api twitter --setup` and follow the instructions (go to the given URL, then paste the given PIN). + +Why should you get developer's API keys? Because Twitter does not like open-source desktop applications, see: +http://arstechnica.com/security/2010/09/twitter-a-case-study-on-how-to-do-oauth-wrong/ -{{FORTHLIFT_HELP}} diff --git a/forthlift.py b/forthlift.py new file mode 100755 index 0000000..b48ecaf --- /dev/null +++ b/forthlift.py @@ -0,0 +1,331 @@ +#!/usr/bin/env python + +import sys +import locale +import argparse +import datetime +import ConfigParser + +import tweepy + + +class AppKeyError(Exception): + def __init__(self, msg): + self.msg = msg + + +def write(data, stream = sys.stdout): + """ + Write "data" on the given stream, then flush, silently handling broken pipes. + """ + try: + stream.write(data.encode(locale.getpreferredencoding(True) or "utf-8")) + stream.flush() + + # Silently handle broken pipes + except IOError: + try: + stream.close() + except IOError: + pass + + +def readline( stream_in ): + while True: + try: + line = stream_in.readline().decode(stream_in.encoding or locale.getpreferredencoding(True)).strip() + except UnicodeDecodeError: + continue + except KeyboardInterrupt: + break + if not line: + break + yield line + + raise StopIteration + + +def setup_counter( data, asked ): + assert(asked.counter) + + # unroll everything + data = list(data) + nb = len(data) + # " int/int" + counter_size = len(str(nb).encode("utf-8").decode("utf-8") )*2 + 1 + 1 + + for i,line in enumerate(data): + counter = u" %i/%i" % (i+1,nb) + curmax = asked.max_len - len(counter) + if len(line) > curmax: + if asked.ignore: + data[i] = line + elif asked.trim: + data[i] = line[:curmax] + data[i] = data[i] + counter + + return data + + +def setup_hem(data, asked): + for line in data: + if len(line) > asked.max_len: + if asked.ignore: + pass + elif asked.trim: + line = line[:asked.max_len] + + yield line + + +def setup( data, asked ): + if asked.counter: + f = setup_counter + else: + f = setup_hem + + for line in f(data, asked): + yield line + + +def operate( func, *args ): + for line in func( readline(sys.stdin), *args ): + write(line) + + +# +# STDOUT API +# + +def on_stdout( data, asked, endline="\n" ): + lines = setup(data, asked) + # You can do something on the whole set of lines if you want. + + for i,line in enumerate(lines): + if endline: + if line[-1] != endline: + line += endline + + if asked.independent: + yield line + else: + l = u"%i %s" % (i,line) + yield l + + +# +# TWITTER API +# + +def on_twitter( data, api, asked, endline="\n" ): + lines = setup(data, asked) + + prev_status_id = None + + images = asked.twitter_images + if images: + images.reverse() + + for line in lines: + if images: + img = images.pop() + else: + img = None + + if img: + # API.update_with_media(filename[, status][, in_reply_to_status_id][, lat][, long][, source][, place_id][, file]) + status = api.update_with_media(img, line, prev_status_id) + else: + # API.update_status(status[, in_reply_to_status_id][, lat][, long][, source][, place_id]) + status = api.update_status(line, prev_status_id) + + if asked.independent: + prev_status_id = None + else: + prev_status_id = status.id + + yield status.text + endline + + +def setup_twitter(configfile="twitter.conf"): + + config = ConfigParser.RawConfigParser() + + # Authenticate the application. + config.read(configfile) + + if not config.has_section("App"): + raise AppKeyError("ERROR: did not found application keys, ask your distribution maintainer or get keys from Twitter first.") + + app_key = config.get("App","app_key") + app_secret = config.get("App","app_key_secret") + + auth = tweepy.OAuthHandler(app_key, app_secret) + + + # Authenticate the user. + auth_url = auth.get_authorization_url() + print "Copy and paste this URL in your browser while your are logged in the twitter account where you want to post." + print "Authorization URL: " + auth_url + + print "Then paste the Personal Identification Number given by Twitter:" + verifier = raw_input('PIN: ').strip() + token = auth.get_access_token(verifier) + + # Authenticate and get the user name. + token_key, token_secret = token + auth.set_access_token(token_key, token_secret) + api = tweepy.API(auth) + username = api.me().name + print "Authentication successful, ready to post to account: " + username + + + # Save authentication tokens. + if not config.has_section("Info"): + config.add_section('Info') + config.set('Info','account', username) + config.set('Info','auth_date', datetime.datetime.utcnow().isoformat()) + + if not config.has_section("Auth"): + config.add_section('Auth') + config.set('Auth', 'local_token', token_key) + config.set('Auth', 'local_token_secret', token_secret) + + # Writing our configuration file to 'example.cfg' + with open(configfile, 'wb') as fd: + config.write(fd) + + +# +# CLI +# + +if __name__=="__main__": + + errors = {"NO_ERROR":0, "UNKNOWN_ERROR":1, "NO_SETUP_NEEDED":2, "NO_APP_KEY":10} + + usage = "Post text read on the standard input to a website or an API." + apis =["stdout", "twitter"] # TODO "http_post", "http_get", + + parser = argparse.ArgumentParser( description=usage, + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + parser.add_argument("-a", "--api", choices=apis, default="stdout", + help="Name of the API to use.") + + # Generic options + + parser.add_argument("-m", "--max-len", metavar="MAXLEN", type=int, default=140, + help="Maximum number of characters in the lines.") + + parser.add_argument("-i", "--ignore", action="store_true", + help="Ignore lines that are longer than MAXLEN") + + parser.add_argument("-t", "--trim", action="store_true", + help="Trim down lines that are longer than MAXLEN.") + + parser.add_argument("-c", "--counter", action="store_true", + help="Add a counter of the form \" x/N\" at the end of the lines, \ + with N being the number of lines read and x the current index of the line. \ + NOTE: this necessitate to read all the input before processing it.") + + parser.add_argument("-q", "--quiet", action="store_true", + help="Do not print errors and warnings on the standard error output.") + + # TODO option: rate at which to post lines + + # API-dependent options + + parser.add_argument("-s", "--setup", action="store_true", + help="Setup the selected API (e.g. authenticate and get authorization to post).") + + parser.add_argument("-d", "--independent", action="store_true", + help="Post each item independently from the previous one. \ + The behaviour of this option depends on the selected API. \ + For example on Twitter: do not post a line as an answer to the previous one but as a new tweet.") + + # Twitter + parser.add_argument("--twitter-images", metavar="FILENAME(S)", nargs="+", type=str, + help="Upload each given image files along with the corresponding tweets in the sequence. If there are more images than tweets, they are silently ignored.") + + asked = parser.parse_args() + + # Setup + if asked.setup: + configfile = asked.api+".conf" + if asked.api == "twitter": + try: + setup_twitter(configfile) + except AppKeyError as e: + if not asked.quiet: + sys.stderr.write(e.msg) + sys.exit(errors["NO_APP_KEY"]) + except: + print "Unexpected error:", sys.exc_info()[0] + raise + else: + sys.exit(errors["NO_ERROR"]) + + else: # other API + if not asked.quiet: + sys.stderr.write("This API does not need setup.") + sys.exit(errors["NO_SETUP_NEEDED"]) + + + # Consistency checks + + if asked.ignore and asked.trim: + if not asked.quiet: + sys.stderr.write("WARNING: asking to trim AND to ignore is not logical, I will ignore.") + assert( not (asked.ignore and asked.trim) ) + + if asked.twitter_images: + if not asked.api == "twitter": + if not asked.quiet: + sys.stderr.write("WARNING: asking to upload images on twitter while not using the twitter API is not logical, I will ignore.") + assert( not (asked.twitter_images and not asked.api=="twitter") ) + + else: # Test readable images + cannot=[] + for img in asked.twitter_images: + try: + with open(img) as f: + f.read() + except OSError: + cannot.append(img) + if cannot: + print("Cannot open the following image files, I will not continue: ") + for img in cannot: + print(img) + sys.exit(5) # I/O Error + + # APIs + + if asked.api == "stdout": + + operate( on_stdout, asked ) + + + elif asked.api == "twitter": + + # Authenticate + config = ConfigParser.RawConfigParser() + config.read('twitter.conf') + + app_key = config.get("App","app_key") + app_secret = config.get("App","app_key_secret") + + try: + verifier_code = config.get("Auth","code") + except: + access_token = config.get("Auth","local_token") + access_token_secret = config.get("Auth","local_token_secret") + + auth = tweepy.OAuthHandler(app_key, app_secret, "https://api.twitter.com/1.1/") + auth.set_access_token(access_token, access_token_secret) + + api = tweepy.API(auth) + + # Post + operate( on_twitter, api, asked ) + diff --git a/pyproject.toml b/pyproject.toml deleted file mode 100644 index 4776791..0000000 --- a/pyproject.toml +++ /dev/null @@ -1,26 +0,0 @@ -[project] -name = "forthlift" -version = "0.1.0" -description = "A command line application to post sequences of text lines on social media" -readme = "README.md" -requires-python = ">=3.12" -dependencies = [ - "argparse>=1.4.0", - "configparser>=7.2.0", - "datetime>=6.0", - "mastodon-py>=2.1.4", - "rich>=14.3.3", - "toml>=0.10.2", -] - -[project.scripts] -forthlift = "forthlift.forthlift:main" - -[build-system] -requires = ["setuptools>=68", "wheel"] -build-backend = "setuptools.build_meta" - -[tool.setuptools.packages.find] -where = ["src"] -include = ["forthlift*"] - diff --git a/src/forthlift/forthlift.py b/src/forthlift/forthlift.py deleted file mode 100755 index 2b290bf..0000000 --- a/src/forthlift/forthlift.py +++ /dev/null @@ -1,655 +0,0 @@ -#!/usr/bin/env python - -import io -import os -import re -import sys -import toml -import time -import locale -import logging -import inspect -import tempfile -import argparse -import datetime -import mastodon -import webbrowser - -from configparser import ConfigParser -from rich.panel import Panel -from rich import print - -# -# LIBRARY -# - -logger = logging.getLogger("forthlift") - - -class stream: - class Stream: - def __call__(self): - raise NotImplementedError - - class stdin(Stream): - """Stream from the standard input.""" - def __call__(self): - return sys.stdin - - class filename(Stream): - """Stream from the given file.""" - def __init__(self, filename = ""): - self.filename = filename - - def __call__(self): - self.fd = open(self.filename, 'r') - return self.fd - - def __del__(self): - self.fd.close() - -class consume: - class Consume: - def __call__(self, stream): - raise NotImplementedError - - class lines(Consume): - """Consume line by line.""" - def __call__(self, stream): - for line in stream: - yield line - - class paragraphs(Consume): - """Consume paragraph by paragraph (separated by an empty line).""" - def __call__(self, stream): - current = "" - for item in stream: - # Not counting spaces as legit content. - if item.strip(): - current += item - else: - yield current - current = "" - if current.strip(): - yield current - - class sections(Consume): - """Consume section by section. A new section starts when a line matches the `mark` regexp. If `skip` is set to 'skip', the marked line is not consumed.""" - def __init__(self, mark = r"^#", skip = "noskip"): - self.mark = mark - if skip == 'skip': - self.skip = True - elif skip == 'noskip': - self.skip = False - else: - self.skip = bool(skip) - - def __call__(self, stream): - current = "" - for item in stream: - if re.match(self.mark, item.strip()): - yield current - if self.skip: - current = "" - else: - current = item - else: - current += item - yield current - - class nlines(Consume): - """Consume by groups of `nb` lines.""" - def __init__(self, nb = "10"): - self.nb = int(nb) - - def __call__(self, stream): - count = 0 - current = "" - for item in stream: - if count >= self.nb: - yield current - current = "" - count = 0 - else: - current += item - count += 1 - yield current - - -class format: - class Format: - def __call__(self, items): - raise NotImplementedError - - class asis(Format): - """Do not format anything.""" - def __call__(self, items): - for item in items: - yield item - - class trim(Format): - """Split items if their length is longer than `max`, and create new items with the remaining parts.""" - # Every argument is a string. - def __init__(self, max = "140"): - self.max = int(max) - - def trim(self, item): - if len(item) > self.max: - yield item[:self.max] - for subitem in self.trim(item[self.max:]): - yield subitem - else: - yield item - - def __call__(self, items): - for item in items: - for separated in self.trim(item): - yield separated - - class eol(Format): - """Add an end of line after the item.""" - def __call__(self, items): - for item in items: - yield item + "\n" - - class strip(Format): - """Remove any space character around the item.""" - def __call__(self, items): - for item in items: - yield item.strip() - - class skip(Format): - """Skip items containing only spaces or being empty.""" - def __call__(self, items): - for item in items: - if item.strip(): - yield item - - class glue(Format): - """Glue consecutive items together if they are not separated by an empty one. Make a new item after an empty one.""" - def __call__(self, items): - current = "" - for item in items: - if item.strip(): - current += item - else: - yield current - current = "" - if current.strip(): - yield current - - class panel(Format): - """Surround each item by an ascii-art box. NOTE: only works when lifted on stdout.""" - def __call__(self, items): - for item in items: - yield Panel.fit(item.strip()) - - class counter(Format): - """Add a counter at the end of each items, with the current index and the total. If `end` is given, it is added at the very last item. If `sep` is given, it is appended to the item before the counter itself.""" - def __init__(self, end = ' ␄', sep = '\n'): - self.sep = sep - self.end = end - - def __call__(self, counted): - items = list(counted) # Consume everything at once. - total = len(items) - res = [] - for i,item in enumerate(items): - res.append(f"{item}{self.sep}{i+1}/{total}") - if res: - res[-1] += self.end - for r in res: - yield r - - class suffix(Format): - """Add the `content` string after each item. If `sep` is given, it is appended to the item before the content and after the item.""" - def __init__(self, content = "", sep = '\n'): - self.content = content - self.sep = sep - - def __call__(self, items): - for i in items: - yield i+self.sep+self.content - - class prefix(Format): - """Add the `content` string before each item. If `sep` is given, it is prepended to the item after the content and before the item.""" - def __init__(self, content = "", sep = '\n'): - self.content = content - self.sep = sep - - def __call__(self, items): - for i in items: - yield self.content+self.sep+i - - -class lift: - class Lift: - def call(self, item): - """Interface for posting a single independant item.""" - raise NotImplementedError - - def __call__(self, items): - """Interface for posting all items.""" - count = 0 - for item in items: - self.call(item) - count += 1 - logger.debug(f"│ │ ├ {count}th item") - logger.debug(f"│ │ └OK {count} items") - - class stdout(Lift): - """Print the items on the standard output.""" - def call(self, item): - if item: - if __debug__: - print(item, end = '', file = sys.stdout, flush = True) - else: - print(item, end = '') - else: - logger.debug("Empty item") - - - class mastodon(Lift): - - def __init__(self, dryrun = 'nodry'): - self._scopes = ['read', 'write'] - - if dryrun == 'dry': - self.dry_run = True - elif dryrun == 'nodry': - self.dry_run = False - else: - msg = f"I do not understand what you mean by `{dryrun}`. Please either indicate `dry` or `nodry`." - logger.error(msg) - raise RuntimeError(msg) - - pyp = toml.load("pyproject.toml") - self.name = pyp["project"]["name"] - self.version = pyp["project"]["version"] - - self.config_path = f"{self.name}.toml" - if not os.path.isfile(self.config_path): - with open(self.config_path, 'w') as fd: - fd.write("") - self.config = self.load_config() - - if self.needs_init(): - self.init() - else: - logger.debug( - "│ │ ├ Mastodon lifter logged in: " \ - f"@{self.config['mastodon']['account']}" \ - f"@{self.config['mastodon']['instance']}" \ - ) - logger.debug("│ │ └OK,") - - self.masto = mastodon.Mastodon( - api_base_url = self.config["mastodon"]["instance"], - # client_id = self.config["mastodon"]["client_id"], - client_secret = self.config["mastodon"]["client_secret"], - access_token = self.config["mastodon"]["token"], - user_agent = f"{self.name}:{self.version}" - ) - - def needs_init(self): - if not self.config["mastodon"]["instance"] \ - or not self.config["mastodon"]["account"] \ - or not self.config["mastodon"]["client_id"] \ - or not self.config["mastodon"]["client_secret"] \ - or not self.config["mastodon"]["token"]: - return True - else: - return False - - def init(self): - if not self.needs_init(): - print(f"The {__name__} operator has already been initialized") - print("Current configuration is:") - print("\tinstance:", self.config["mastodon"]["instance"]) - print("\tuser:", self.config["mastodon"]["account"]) - - ans = input("Do you want to re-init it? (yes/no): ").strip() - if ans[0].lower() == 'n': - return - elif ans[1].lower() != 'y': - print("I did not understand your answer, " - " I'll assume you meant `no` and stop here.") - return - - print("URL of your Mastodon instance?") - print("For example: https://social.antigene.org") - # instance = input("URL: ").strip() - instance = "https://social.antigene.org" - self.config["mastodon"]["instance"] = instance - - self.register_app() - self.oauth() - - def save_config(self, config): - logger.debug(f"Save config in: {self.config_path}") - with open(self.config_path, 'w') as fd: - toml.dump(config, fd) - - def load_config(self): - logger.debug(f"│ │ ├ Load config from: {self.config_path}") - config = { - "mastodon": { - "instance": None, - "account": None, - "client_id": None, - "client_secret": None, - "token": None, - } - } - local_config = toml.load(self.config_path) - config.update(local_config) - - for k,v in config["mastodon"].items(): - logger.debug(f"│ │ │ ├ {k}: {v}") - logger.debug(f"│ │ │ └OK") - return config - - def register_app(self): - logger.debug(f"Register {self.name} on {self.config["mastodon"]['instance']}") - - client_id, client_secret = mastodon.Mastodon.create_app( - self.name, - scopes = self._scopes, - api_base_url = self.config['mastodon']["instance"], - website = "https://nojhan.net/git/nojhan/forthlift", - user_agent = f"{self.name}:{self.version}", - ) - - self.config["mastodon"]["client_id"] = client_id - logger.debug(f"ID: {client_id}") - self.config["mastodon"]["client_secret"] = client_secret - logger.debug(f"Secret: {client_secret}") - self.save_config(self.config) - - def oauth(self): - logger.debug(f"OAuth to: {self.config['mastodon']['instance']}") - oauth = mastodon.Mastodon( - client_id = self.config['mastodon']["client_id"], - client_secret = self.config['mastodon']["client_secret"], - api_base_url = self.config['mastodon']["instance"], - ) - oauth_url = oauth.auth_request_url(scopes = self._scopes) - logger.debug(f"Opening web page: {oauth_url}") - logger.debug("Please log in there.") - webbrowser.open_new(oauth_url) - time.sleep(5) - print("After logging in, paste here the code you received:") - oauth_code = input("Code: ").strip() - - token = oauth.log_in( - code = oauth_code, - scopes = self._scopes, - ) - self.config['mastodon']["token"] = token - logger.debug(f"Token: {token}") - - account = oauth.me().acct - self.config['mastodon']["account"] = account - logger.debug(f"Account: {account}") - self.save_config(self.config) - - def post(self, item, prev_status = None): - if self.dry_run: - print(item) - else: - if prev_status: - logger.debug(item) - return self.masto.status_reply(to_status = prev_status, status = item) - else: - logger.debug(item) - return self.masto.status_post(item) - - def __call__(self, items): - n = 0 - first_item = next(items, None) - if first_item == None: - logger.error("│ │ │ ├ No item to post") - return - else: - logger.debug(f"│ │ │ ├ Post #{n+1}") - n += 1 - prev_status = self.post(first_item) - for item in items: - logger.debug(f"│ │ │ ├ Post #{n+1}") - n += 1 - prev_status = self.post(item) - logger.debug(f"│ │ │ └OK, posted {n} items") - -class Forthlifter: - def __init__(self, - consumer = consume.lines(), - streamers = [stream.stdin()], - formatters = [format.asis()], - lifters = [lift.stdout()], - ): - if not isinstance(streamers, list): - streamers = [streamers] - self.streamers = streamers - logger.debug(f"├ streamers: {', '.join(type(i).__name__ for i in self.streamers)}") - - self.consumer = consumer - logger.debug(f"├ consumer: {type(consumer).__name__}") - - if not isinstance(formatters, list): - formatters = [formatters] - self.formatters = formatters - logger.debug(f"├ formatters: {', '.join(type(i).__name__ for i in self.formatters)}") - - if not isinstance(lifters, list): - lifters = [lifters] - self.lifters = lifters - logger.debug(f"└ lifters: {', '.join(type(i).__name__ for i in self.lifters)}") - - def consume(self): - logger.debug("│ ├ consume") - items = [] - for streamer in self.streamers: - logger.debug(f"│ │ ├ {type(self.consumer).__name__}({type(streamer).__name__})") - # Concatenate - items += self.consumer(streamer()) - logger.debug(f"│ │ └OK") - return items - - def format(self, items): - logger.debug(f"│ ├ format") - for formatter in self.formatters: - logger.debug(f"│ │ ├ {type(formatter).__name__}") - # Replace - items = formatter(items) - logger.debug(f"│ │ └OK") - return items - - def lift(self, items): - logger.debug(f"│ ├ lift") - for lifter in self.lifters: - logger.debug(f"│ │ ├ {type(lifter).__name__}") - # Call - lifter(items) - logger.debug(f"│ │ └OK") - - def __call__(self): - logger.debug("├ call") - self.lift(self.format(self.consume())) - logger.debug("│ └OK") - -# -# CLI -# - -def classes_of(namespace): - itf_name = namespace.__name__[0].upper()+namespace.__name__[1:] - itf = getattr(namespace, itf_name) - subs = {cls.__name__:cls for cls in itf.__subclasses__()} - return subs - - -def operator(asked_op): - logger.debug(f"├ Parsed operators:") - for op in asked_op: - # We do not use a f(a) notation, - # because the shell would try to interpret - # any unescaped/unquoted parentheses. - # It is faster to use f:a for the user. - m = re.match(r"(\w+):(.+)", op) - if m: - name = m.group(1) - args = [a.strip() for a in m.group(2).split(',')] - else: - name = op - args = [] - logger.debug(f"│ ├ {name}({','.join(args)})") - yield name,args - logger.debug("│ └OK") - - -def help_op(ops): - h = "" - itf = list(ops.values())[0].__mro__[1].__name__.lower() - h += f"\nAVAILABLE OPERATORS FOR --{itf}:\n" - for name,cls in ops.items(): - # Signature - hsig = f" -{itf[0]} {name.lower()}" - args = inspect.getfullargspec(cls)[0] - if args: - sign = inspect.signature(cls) - args = [sign.parameters[a].name for a in sign.parameters] - hsig += f"[:{','.join(args)}]" - h += hsig - - # Example (using defaults) - hex = f"-{itf[0]} " - args = inspect.getfullargspec(cls)[0] - if args: - sep = ":" - sign = inspect.signature(cls) - defs = [sign.parameters[a].default for a in sign.parameters] - if not all(defs) or any(re.match(r'\s', d) for d in defs): - hdefs = [d.replace("\n", r"\n") for d in defs] - hex += f"'{name.lower()}{sep}{','.join(hdefs)}'" - else: - hex += f"{name.lower()}{sep}{','.join(defs)}" - h += f"\n\tDefault: {hex}" - else: - hex += name.lower() - - if cls.__doc__: - h+= f"\n\t{cls.__doc__}\n" - else: - h += "\n" - return h - -def main(): - errors = {"NO_ERROR":0, "UNKNOWN_ERROR":1, "NO_SETUP_NEEDED":2, "NO_APP_KEY":10} - - usage = "Post text read on the standard input to a website or an API." - - # Dictionaries of {name: class} - streamers = classes_of(stream) - consumers = classes_of(consume) - formaters = classes_of(format) - lifters = classes_of(lift) - - # Extract docstrings - epilog = "" - epilog += help_op(streamers) - epilog += help_op(consumers) - epilog += help_op(formaters) - epilog += help_op(lifters) - - parser = argparse.ArgumentParser( - description=usage, - formatter_class = argparse.RawTextHelpFormatter, - epilog = epilog) - - parser.add_argument("-s", "--stream", - metavar = "STREAM(S)", - default = [], - action="append", - help="Where to get items (several occurences possibles, order matters).") - - parser.add_argument("-c", "--consume", - metavar = "CONSUME", - default = "lines", - help="How to extract the content from the stream (only one occurence).") - - parser.add_argument("-f", "--format", - metavar = "FORMAT(S)", - default = [], - action="append", - help="How to format items (several occurences possibles, order matters).") - - parser.add_argument("-l", "--lift", - metavar = "LIFT(S)", - default = [], - action="append", - help="How to send items somewhere (several occurences possibles, order matters).") - - initializables = [] - - for cls in \ - list(streamers.values()) \ - + list(consumers.values()) \ - + list(formaters.values()) \ - + list(lifters.values()): - if hasattr(cls, "needs_init"): - initializables.append(cls.__name__) - - if initializables: - parser.add_argument("-i", "--init", - choices = initializables, - help="Initialize the given operator.") - - asked = parser.parse_args() - - logging.basicConfig() - logger.setLevel("DEBUG") - - if asked.init: - for ops in [streamers, consumers, formaters, lifters]: - if asked.init in ops: - logger.debug(f"Initialize operator: {asked.init}") - op = ops[asked.init]() - op.init() - logger.debug(f"Done. You can use the `{asked.init}` operator.") - sys.exit(0) - - logger.debug("Available operators:") - logger.debug(f"├ streamers: {', '.join(streamers)}") - logger.debug(f"├ consumers: {', '.join(consumers)}") - logger.debug(f"├ formaters: {', '.join(formaters)}") - logger.debug(f"└ lifters: {', '.join(lifters)}") - - # Sane defaults. Cannot be a default in argparse, - # because choices would be append to it. - if not asked.stream: - asked.stream = ["stdin"] - - if not asked.format: - asked.format = ["asis"] - - if not asked.lift: - asked.lift = ["stdout"] - - logger.debug("Chosen operators:") - - cop,cargs = list(operator([asked.consume]))[0] - - forthlift = Forthlifter( - consumer = consumers[cop](*cargs), - streamers = [streamers[op](*args) for op,args in operator(asked.stream)], - formatters = [formaters[op](*args) for op,args in operator(asked.format)], - lifters = [ lifters[op](*args) for op,args in operator(asked.lift )], - ) - logger.debug("Run:") - forthlift() - logger.debug("└OK") - - -if __name__ == "__main__": - main() -