From f0d44a9c9851fcc54c860eaa364a39f107e99006 Mon Sep 17 00:00:00 2001 From: nojhan Date: Tue, 13 Dec 2016 00:49:29 +0100 Subject: [PATCH] Refactor curses hmi Use handlers for keys Better formatting --- ereshkigal.py | 294 +++++++++++++++++++++++++++----------------------- 1 file changed, 161 insertions(+), 133 deletions(-) diff --git a/ereshkigal.py b/ereshkigal.py index 032759e..6772e32 100755 --- a/ereshkigal.py +++ b/ereshkigal.py @@ -285,10 +285,100 @@ class CursesMonitor: # colors # FIXME different colors for different types of tunnels (auto or raw) - self.colors_tunnel = {'kind':4, 'autossh_pid':0, 'in_port':3, 'via_host':2, 'target_host':2, 'out_port':3, 'tunnels_nb':4, 'tunnels_nb_none':1} - self.colors_highlight = {'kind':9, 'autossh_pid':9, 'in_port':9, 'via_host':9, 'target_host':9, 'out_port':9, 'tunnels_nb':9, 'tunnels_nb_none':9} + self.colors_tunnel = {'kind_auto':4, 'kind_raw':5, 'ssh_pid':0, 'in_port':3, 'via_host':2, 'target_host':2, 'out_port':3, 'tunnels_nb':4, 'tunnels_nb_none':1} + self.colors_highlight = {'kind_auto':9, 'kind_raw':5, 'ssh_pid':9, 'in_port':9, 'via_host':9, 'target_host':9, 'out_port':9, 'tunnels_nb':9, 'tunnels_nb_none':9} self.colors_connection = {'ssh_pid':0, 'autossh_pid':0, 'status':4, 'status_out':1, 'local_address':2, 'in_port':3, 'foreign_address':2, 'out_port':3} + def do_Q(self): + """Quit""" + logging.debug("Waited: %s" % self.log_ticks) + self.log_ticks = "" + logging.debug("Key pushed: Q") + return False + + + def do_R(self): + """Reload autossh tunnel""" + logging.debug("Waited: %s" % self.log_ticks) + self.log_ticks = "" + logging.debug("Key pushed: R") + # if a pid is selected + if self.cur_pid != -1: + # send the SIGUSR1 signal + if type(self.tp.get_tunnel(self.cur_line)) == AutoTunnel: + # autossh performs a reload of existing tunnels that it manages + logging.debug("SIGUSR1 on PID: %i" % self.cur_pid) + os.kill( self.cur_pid, signal.SIGUSR1 ) + else: + logging.debug("Cannot reload a RAW tunnel") + return True + + + def do_C(self): + """Close tunnel""" + logging.debug("Waited: %s" % self.log_ticks) + self.log_ticks = "" + logging.debug("Key pushed: C") + if self.cur_pid != -1: + # send a SIGKILL + # the related process is stopped + # FIXME SIGTERM or SIGKILL ? + + tunnel = self.tp.get_tunnel(self.cur_line) + if type(tunnel) == AutoTunnel: + logging.debug("SIGKILL on autossh PID: %i" % self.cur_pid) + try: + os.kill( self.cur_pid, signal.SIGKILL ) + except OSError: + logging.error("No such process: %i" % self.cur_pid) + + logging.debug("SIGKILL on ssh PID: %i" % tunnel.ssh_pid) + try: + os.kill( tunnel.ssh_pid, signal.SIGKILL ) + except OSError: + logging.error("No such process: %i" % tunnel.ssh_pid) + self.cur_line = -1 + self.cur_pid = -1 + # FIXME update cur_pid or get rid of it everywhere + return True + + + def do_N(self): + """Show connections""" + logging.debug("Waited: %s" % self.log_ticks) + self.log_ticks = "" + logging.debug("Key pushed: N") + self.show_connections = not self.show_connections + return True + + + def do_258(self): + """Move down""" + logging.debug("Waited: %s" % self.log_ticks) + self.log_ticks = "" + logging.debug("Key pushed: down") + # if not the end of the list + if self.cur_line < len(self.tp.tunnels)-1: + self.cur_line += 1 + # get the pid + if type(self.tp.get_tunnel(self.cur_line)) == AutoTunnel: + self.cur_pid = self.tp.get_tunnel(self.cur_line).autossh_pid + else: + self.cur_pid = self.tp.get_tunnel(self.cur_line).ssh_pid + return True + + + def do_259(self): + """Move up""" + logging.debug("Waited: %s" % self.log_ticks) + self.log_ticks = "" + logging.debug("Key pushed: up") + if self.cur_line > -1: + self.cur_line -= 1 + if self.cur_line > 0: + self.cur_pid = self.tp.get_tunnel(self.cur_line).pid + return True + def __call__(self): """Start the interface""" @@ -300,32 +390,33 @@ class CursesMonitor: self.display() # first update counter - last_update = time.clock() - last_state = None - log_ticks = "" + self.last_update = time.clock() + self.last_state = None + self.log_ticks = "" # infinite loop - while(1): + notquit = True + while(notquit): # wait some time # necessary to not overload the system with unnecessary calls time.sleep( self.ui_delay ) # if its time to update - if time.time() > last_update + self.update_delay: + if time.time() > self.last_update + self.update_delay: self.tp.update() # reset the counter - last_update = time.time() + self.last_update = time.time() state = "%s" % self.tp - if state != last_state: - logging.debug("Waited: %s" % log_ticks) - log_ticks = "" + if state != self.last_state: + logging.debug("Waited: %s" % self.log_ticks) + self.log_ticks = "" logging.debug("----- Time of screen update: %s -----" % time.time()) logging.debug("State of tunnels:\n%s" % self.tp) - last_state = state + self.last_state = state else: - log_ticks += "." + self.log_ticks += "." kc = self.scr.getch() # keycode @@ -339,89 +430,15 @@ class CursesMonitor: # ascii character from the keycode ch = chr(kc) - # Quit - if ch in 'Qq': - logging.debug("Waited: %s" % log_ticks) - log_ticks = "" - logging.debug("Key pushed: Q") - break - - # Reload related autossh tunnels - elif ch in 'rR': - logging.debug("Waited: %s" % log_ticks) - log_ticks = "" - logging.debug("Key pushed: R") - # if a pid is selected - if self.cur_pid != -1: - # send the SIGUSR1 signal - if type(self.tp.get_tunnel(self.cur_line)) == AutoTunnel: - # autossh performs a reload of existing tunnels that it manages - logging.debug("SIGUSR1 on PID: %i" % self.cur_pid) - os.kill( self.cur_pid, signal.SIGUSR1 ) - else: - logging.debug("Cannot reload a RAW tunnel") - - # Kill autossh process - elif ch in 'kK': - logging.debug("Waited: %s" % log_ticks) - log_ticks = "" - logging.debug("Key pushed: K") - if self.cur_pid != -1: - # send a SIGKILL - # the related process is stopped - # FIXME SIGTERM or SIGKILL ? - - tunnel = self.tp.get_tunnel(self.cur_line) - if type(tunnel) == AutoTunnel: - logging.debug("SIGKILL on autossh PID: %i" % self.cur_pid) - try: - os.kill( self.cur_pid, signal.SIGKILL ) - except OSError: - logging.error("No such process: %i" % self.cur_pid) - - logging.debug("SIGKILL on ssh PID: %i" % tunnel.ssh_pid) - try: - os.kill( tunnel.ssh_pid, signal.SIGKILL ) - except OSError: - logging.error("No such process: %i" % tunnel.ssh_pid) - # FIXME update cur_pid or get rid of it everywhere - - - # Switch to show ssh connections - # only available for root - elif ch in 'tT':# and os.getuid() == 0: - logging.debug("Waited: %s" % log_ticks) - log_ticks = "" - logging.debug("Key pushed: T") - self.show_connections = not self.show_connections - - # key pushed - elif kc == curses.KEY_DOWN: - logging.debug("Waited: %s" % log_ticks) - log_ticks = "" - logging.debug("Key pushed: down") - # if not the end of the list - if self.cur_line < len(self.tp.tunnels)-1: - self.cur_line += 1 - # get the pid - if type(self.tp.get_tunnel(self.cur_line)) == AutoTunnel: - self.cur_pid = self.tp.get_tunnel(self.cur_line).autossh_pid - else: - self.cur_pid = self.tp.get_tunnel(self.cur_line).ssh_pid - - # key up - elif kc == curses.KEY_UP: - logging.debug("Waited: %s" % log_ticks) - log_ticks = "" - logging.debug("Key pushed: up") - if self.cur_line > -1: - self.cur_line -= 1 - if self.cur_line > 0: - self.cur_pid = self.tp.get_tunnel(self.cur_line).pid - - else: - # do nothing and wait until the next refresh - pass + # Call the do_* handler. + fch = "do_%s" % ch.capitalize() + fkc = "do_%i" % kc + logging.debug("key func: %s / %s" % (fch,fkc)) + if fch in dir(self): + notquit = eval("self."+fch+"()") + elif fkc in dir(self): + notquit = eval("self."+fkc+"()") + logging.debug("notquit = %s" % notquit) # update the display self.display() @@ -435,11 +452,16 @@ class CursesMonitor: def display(self): """Generate the interface screen""" - # First line: help - help_msg = "[R]:reload autossh [K]:kill tunnel [Q]:quit" - # if os.geteuid() == 0: - help_msg += " [T]:show network connections" - help_msg += '\n' + # Automagically format help line with available do_* handlers. + h = [] + for f in dir(self): + if "do_" in f: + key = f.replace("do_","") + if key.isalpha(): # We do not want arrows. + msg = "[%s] %s" % (key,eval("self.%s.__doc__" % f)) + h.append(msg) + help_msg = ", ".join(h) + help_msg += "\n" self.scr.addstr(0,0, help_msg, curses.color_pair(4) ) self.scr.clrtoeol() @@ -460,16 +482,16 @@ class CursesMonitor: self.cur_pid = -1 # header line - header_msg = "TYPE\tPID \tINPORT\tVIA \tTARGET \tOUTPORT" + header_msg = "TYPE\tINPORT\tVIA \tTARGET \tOUTPORT" # if os.geteuid() == 0: header_msg += "\tCONNECTIONS" self.scr.addstr( header_msg, curses.color_pair(color) ) self.scr.clrtoeol() - # for each autossh processes available in the monitor + # for each tunnel processes available in the monitor for l in range(len(self.tp.tunnels)): # add a line for the l-th autossh process - self.add_autossh( l ) + self.add_tunnel( l ) # if one want to show connections if self.show_connections:# and os.getuid() == 0: @@ -484,49 +506,55 @@ class CursesMonitor: colors = self.colors_connection # for each connections related to te line-th autossh process - for t in self.tp.get_tunnel(line).connections: + for t in sorted(self.tp.get_tunnel(line).connections, key=lambda c:c.status): + # FIXME fail if the screen's height is too small. self.scr.addstr( '\n\t+ ' ) + color = self.colors_connection['status'] + # if the connections is established + # TODO avoid hard-coded constants + if t.status != 'ESTABLISHED' and t.status != 'LISTEN': + color = self.colors_connection['status_out'] + + self.scr.addstr( t.status, curses.color_pair( color ) ) + + self.scr.addstr( '\t' ) + # self.scr.addstr( str( t['ssh_pid'] ), curses.color_pair(colors['ssh_pid'] ) ) # self.scr.addstr( '\t' ) self.scr.addstr( str( t.local_address ) , curses.color_pair(colors['local_address'] )) self.scr.addstr( ':' ) self.scr.addstr( str( t.in_port ) , curses.color_pair(colors['in_port'] )) - self.scr.addstr( ' -> ' ) - self.scr.addstr( str( t.foreign_address ) , curses.color_pair(colors['foreign_address'] )) - self.scr.addstr( ':' ) - self.scr.addstr( str( t.out_port ) , curses.color_pair(colors['out_port'] )) - - self.scr.addstr( '\t' ) - - color = self.colors_connection['status'] - # if the connections is established - # TODO avoid hard-coded constants - if t.status != 'ESTABLISHED': - color = self.colors_connection['status_out'] - - self.scr.addstr( t.status, curses.color_pair( color ) ) + if t.foreign_address and t.out_port: + self.scr.addstr( ' -> ' ) + self.scr.addstr( str( t.foreign_address ) , curses.color_pair(colors['foreign_address'] )) + self.scr.addstr( ':' ) + self.scr.addstr( str( t.out_port ) , curses.color_pair(colors['out_port'] )) self.scr.clrtoeol() - def add_autossh(self, line): + def add_tunnel(self, line): """Add line corresponding to the line-th autossh process""" self.scr.addstr( '\n' ) - if type(self.tp.get_tunnel(line)) == AutoTunnel: - self.scr.addstr( 'auto', curses.color_pair(self.colors_tunnel['kind']) ) - self.scr.addstr( '\t', curses.color_pair(self.colors_tunnel['kind']) ) - else: - self.scr.addstr( 'ssh', curses.color_pair(self.colors_tunnel['kind']) ) - self.scr.addstr( '\t', curses.color_pair(self.colors_tunnel['kind']) ) + colors = self.colors_tunnel + if self.cur_line == line: + colors = self.colors_highlight - self.add_autossh_info('autossh_pid', line) - self.add_autossh_info('in_port', line) - self.add_autossh_info('via_host', line) - self.add_autossh_info('target_host', line) - self.add_autossh_info('out_port', line) + if type(self.tp.get_tunnel(line)) == AutoTunnel: + self.scr.addstr( 'auto', curses.color_pair(colors['kind_auto']) ) + self.scr.addstr( '\t', curses.color_pair(colors['kind_auto']) ) + else: + self.scr.addstr( 'ssh', curses.color_pair(colors['kind_raw']) ) + self.scr.addstr( '\t', curses.color_pair(colors['kind_raw']) ) + + # self.add_tunnel_info('ssh_pid', line) + self.add_tunnel_info('in_port', line) + self.add_tunnel_info('via_host', line) + self.add_tunnel_info('target_host', line) + self.add_tunnel_info('out_port', line) nb = len(self.tp.get_tunnel(line).connections ) if nb > 0: @@ -534,7 +562,7 @@ class CursesMonitor: for i in self.tp.get_tunnel(line).connections: # add a vertical bar | # the color change according to the status of the connection - if i.status == 'ESTABLISHED': + if i.status == 'ESTABLISHED' or i.status == 'LISTEN': self.scr.addstr( '|', curses.color_pair(self.colors_connection['status']) ) else: self.scr.addstr( '|', curses.color_pair(self.colors_connection['status_out']) ) @@ -546,7 +574,7 @@ class CursesMonitor: self.scr.clrtoeol() - def add_autossh_info( self, key, line ): + def add_tunnel_info( self, key, line ): """Add an information of an autossh process, in the configured color""" colors = self.colors_tunnel