no more ipaddr dependancy, separation between autossh and connections parsing, two more options to get text only on CLI

This commit is contained in:
nojhan 2009-06-25 15:31:52 +00:00
commit a6db1511b7

View file

@ -26,17 +26,19 @@
# Google's ipaddr module # Google's ipaddr module
# standard in Python 3 # standard in Python 3
import ipaddr # import ipaddr
import os import os
from operator import itemgetter from operator import itemgetter
class SSHTunnel(dict): class SSHTunnel(dict):
def __init__(self, def __init__(self,
_local_address = ipaddr.IPv4('1.1.1.1'), #_local_address = ipaddr.IPv4('1.1.1.1'),
_local_address = '1.1.1.1',
_local_port = 0, _local_port = 0,
#_local_host = "Unknown", #_local_host = "Unknown",
_foreign_address = ipaddr.IPv4('1.1.1.1'), #_foreign_address = ipaddr.IPv4('1.1.1.1'),
_foreign_address = '1.1.1.1',
_foreign_port = 0, _foreign_port = 0,
_target_host = "Unknown", _target_host = "Unknown",
_status = 'UNKNOWN', _status = 'UNKNOWN',
@ -106,11 +108,16 @@ class AutoSSHTunnelMonitor(list):
def __init__(self): def __init__(self):
self.network_cmd = "netstat -ntp" self.network_cmd = "netstat -ntp"
self.ps_cmd = "ps ax" self.ps_cmd = "ps ax"
self.update() #self.update()
def update(self): def update(self):
self[:] = self.__get_tunnels()
autosshs = self.get_autossh_instances()
connections = self.get_connections()
self[:] = self.bind_tunnels(autosshs, connections)
self.sort_on( 'local_port') self.sort_on( 'local_port')
@ -126,7 +133,7 @@ class AutoSSHTunnelMonitor(list):
self[:] = sorted( self, key=itemgetter( key ) ) self[:] = sorted( self, key=itemgetter( key ) )
def __get_tunnels(self): def get_autossh_instances(self):
status = os.popen3( self.ps_cmd ) status = os.popen3( self.ps_cmd )
status_list = [ps for ps in status[1].readlines() if "autossh" in ps] status_list = [ps for ps in status[1].readlines() if "autossh" in ps]
@ -148,6 +155,10 @@ class AutoSSHTunnelMonitor(list):
autosshs += [auto] autosshs += [auto]
return autosshs
def get_connections(self):
status = os.popen3( self.network_cmd ) status = os.popen3( self.network_cmd )
@ -161,11 +172,13 @@ class AutoSSHTunnelMonitor(list):
# tcp connections # tcp connections
local = con[3].split(':') local = con[3].split(':')
local_addr = ipaddr.IPv4( local[0] ) #local_addr = ipaddr.IPv4( local[0] )
local_addr = local[0]
local_port = int(local[1]) local_port = int(local[1])
foreign = con[4].split(':') foreign = con[4].split(':')
foreign_addr = ipaddr.IPv4( foreign[0] ) #foreign_addr = ipaddr.IPv4( foreign[0] )
foreign_addr = foreign[0]
foreign_port = int(foreign[1]) foreign_port = int(foreign[1])
status = con[5] status = con[5]
@ -196,15 +209,18 @@ class AutoSSHTunnelMonitor(list):
f.close() f.close()
# instanciation # instanciation
t = SSHTunnel( local_addr, local_port, foreign_addr, foreign_port, autohost, status, sshpid, ppid ) tunnels += [ SSHTunnel( local_addr, local_port, foreign_addr, foreign_port, autohost, status, sshpid, ppid ) ]
return tunnels
def bind_tunnels(self, autosshs, tunnels):
for t in tunnels:
for i in autosshs: for i in autosshs:
if i['pid'] == ppid: if i['pid'] == ppid:
i['tunnels'] += [t] i['tunnels'] += [t]
#tunnels += [ t ]
#print autosshs
return autosshs return autosshs
@ -382,43 +398,56 @@ if __name__ == "__main__":
import sys import sys
# CURSES # CURSES
if len(sys.argv) > 1 and sys.argv[1] == "--curses": if len(sys.argv) > 1:
import curses if sys.argv[1] == "--curses":
import traceback import curses
import traceback
try: try:
scr = curses.initscr() scr = curses.initscr()
curses.start_color() curses.start_color()
# 0:black, 1:red, 2:green, 3:yellow, 4:blue, 5:magenta, 6:cyan, 7:white # 0:black, 1:red, 2:green, 3:yellow, 4:blue, 5:magenta, 6:cyan, 7:white
curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK) curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK) curses.init_pair(2, curses.COLOR_GREEN, curses.COLOR_BLACK)
curses.init_pair(3, curses.COLOR_YELLOW, curses.COLOR_BLACK) curses.init_pair(3, curses.COLOR_YELLOW, curses.COLOR_BLACK)
curses.init_pair(4, curses.COLOR_BLUE, curses.COLOR_BLACK) curses.init_pair(4, curses.COLOR_BLUE, curses.COLOR_BLACK)
curses.init_pair(5, curses.COLOR_MAGENTA, curses.COLOR_BLACK) curses.init_pair(5, curses.COLOR_MAGENTA, curses.COLOR_BLACK)
curses.init_pair(6, curses.COLOR_CYAN, curses.COLOR_BLACK) curses.init_pair(6, curses.COLOR_CYAN, curses.COLOR_BLACK)
curses.init_pair(7, curses.COLOR_WHITE, curses.COLOR_BLACK) curses.init_pair(7, curses.COLOR_WHITE, curses.COLOR_BLACK)
curses.init_pair(8, curses.COLOR_WHITE, curses.COLOR_GREEN) curses.init_pair(8, curses.COLOR_WHITE, curses.COLOR_GREEN)
curses.init_pair(9, curses.COLOR_WHITE, curses.COLOR_BLUE) curses.init_pair(9, curses.COLOR_WHITE, curses.COLOR_BLUE)
curses.noecho() curses.noecho()
curses.cbreak() curses.cbreak()
scr.keypad(1) scr.keypad(1)
mc = monitorCurses( scr ) mc = monitorCurses( scr )
mc() mc()
scr.keypad(0) scr.keypad(0)
curses.echo() curses.echo()
curses.nocbreak() curses.nocbreak()
curses.endwin() curses.endwin()
except: except:
scr.keypad(0) scr.keypad(0)
curses.echo() curses.echo()
curses.nocbreak() curses.nocbreak()
curses.endwin() curses.endwin()
traceback.print_exc() traceback.print_exc()
elif sys.argv[1] == "--connections":
tm = AutoSSHTunnelMonitor()
con = tm.get_connections()
for c in con:
print con
elif sys.argv[1] == "--autossh":
tm = AutoSSHTunnelMonitor()
auto = tm.get_autossh_instances()
for i in auto:
print auto
# CLI # CLI
else: else: