weboob-devel/contrib/boobot.py
Laurent Bachelier a488edbd69 boobot: Support ignored users
And ignore irkers by default
2013-06-23 21:17:13 +02:00

315 lines
11 KiB
Python
Executable file

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright(C) 2012 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import logging
import re
import os
import sys
from threading import Thread, Event
from math import log
import urlparse
import urllib
from irc.bot import SingleServerIRCBot
import mechanize
from mechanize import _headersutil as headersutil
from mechanize._html import EncodingFinder
from weboob.core import Weboob
from weboob.tools.browser import StandardBrowser, BrowserUnavailable, BrowserHTTPError
from weboob.tools.misc import get_backtrace
from weboob.tools.misc import to_unicode
from weboob.tools.storage import StandardStorage
def compile_ignore_patterns(spec):
    """Compile a comma-separated list of regular expressions.

    Empty entries are skipped: an empty pattern matches every string, so
    setting ``BOOBOT_IGNORE=''`` (or leaving a stray comma) would otherwise
    make the bot ignore every user.

    :param spec: comma-separated regular expressions
    :returns: list of compiled pattern objects, in the order given
    """
    return [re.compile(pattern) for pattern in spec.split(',') if pattern]


# Runtime configuration, overridable through environment variables.
# Comma-separated list of channels; the first one is the bot's main channel.
IRC_CHANNELS = os.getenv('BOOBOT_CHANNELS', '#weboob').split(',')
IRC_NICKNAME = os.getenv('BOOBOT_NICKNAME', 'boobot')
IRC_SERVER = os.getenv('BOOBOT_SERVER', 'chat.freenode.net')
# Regexes searched in the source of each public message; a hit means the
# message is ignored.  The default matches "...!irker@" and "...!~irker@".
IRC_IGNORE = compile_ignore_patterns(os.getenv('BOOBOT_IGNORE', '!~?irker@'))
STORAGE_FILE = os.getenv('BOOBOT_STORAGE', 'boobot.storage')
def fixurl(url):
    """Return *url* re-assembled from individually encoded components.

    The URL is first passed through ``to_unicode`` (presumably coercing it to
    a unicode string -- confirm against weboob.tools.misc) and the ``/#!/``
    hashbang marker is collapsed to ``/``.  It is then split, each component
    is encoded on its own (IDNA for the host, percent-quoting for the user
    and password, UTF-8 for everything else) and the pieces are joined back
    into a single URL.
    """
    text = to_unicode(url).replace('/#!/', '/')
    parts = urlparse.urlsplit(text)

    # netloc has the shape [user[:password]@]host[:port]
    credentials, at_sign, address = parts.netloc.rpartition('@')
    username, cred_sep, password = credentials.partition(':')
    hostname, port_sep, port_number = address.partition(':')

    scheme = parts.scheme.encode('utf8')
    netloc = ''.join((
        urllib.quote(username.encode('utf8')),
        cred_sep.encode('utf8'),
        urllib.quote(password.encode('utf8')),
        at_sign.encode('utf8'),
        hostname.encode('idna'),
        port_sep.encode('utf8'),
        port_number.encode('utf8'),
    ))
    # Path segments are encoded but deliberately not percent-quoted.
    path = '/'.join([segment.encode('utf8')
                     for segment in parts.path.split('/')])
    query = parts.query.encode('utf8')
    fragment = parts.fragment.encode('utf8')

    return urlparse.urlunsplit((scheme, netloc, path, query, fragment))
class HeadRequest(mechanize.Request):
    """A mechanize request whose method is reported as HEAD."""

    def get_method(self):
        """Return the HTTP method name for this request."""
        return 'HEAD'
class BoobotBrowser(StandardBrowser):
    """Browser used by the bot to describe URLs seen on a channel."""

    ENCODING = None
    DEFAULT_TIMEOUT = 3

    # Binary unit suffixes, indexed by power of 1024.
    SIZE_UNITS = ('B', 'KiB', 'MiB', 'GiB',
                  'TiB', 'PiB', 'EiB', 'ZiB', 'YiB')

    def urlinfo(self, url):
        """Fetch *url* and describe what it points to.

        A HEAD request is tried first; if it fails with HTTP 501 or 405, a
        regular request is made instead.  For HTML documents the body is
        downloaded to extract the page title (or, on twitter.com, the text of
        the permalink tweet).

        :returns: ``(content_type, human_readable_size, title)``; the size is
                  None when it is unknown, the title is None when the
                  resource is not HTML or no title was found
        :raises BrowserHTTPError: for HTTP errors other than 501/405 on the
                                  HEAD request
        """
        if urlparse.urlsplit(url).netloc == 'mobile.twitter.com':
            url = url.replace('mobile.twitter.com', 'twitter.com', 1)

        try:
            r = self.openurl(HeadRequest(url), _tries=2, _delay=0.2)
            body = False
        except BrowserHTTPError as e:
            if 'HTTP Error 501' in unicode(e) or 'HTTP Error 405' in unicode(e):
                # HEAD is refused: fall back to a regular request.
                r = self.openurl(url, _tries=2, _delay=0.2)
                body = True
            else:
                # Bare raise keeps the original traceback.
                raise

        headers = r.info()
        content_type = headers.get('Content-Type')
        try:
            hsize = self.human_size(int(headers.get('Content-Length')))
        except (TypeError, ValueError):
            # Header missing (None) or not a valid integer.
            hsize = None

        is_html = headersutil.is_html([content_type], url, True)
        title = None
        if is_html:
            if not body:
                r = self.openurl(url, _tries=2, _delay=0.2)
            # Update the size as we might not have it from the headers.
            hsize = self.human_size(len(r.read()))
            r.seek(0)

            # First pass: parse with the detected encoding only to look for a
            # <meta charset=...> that overrides it.
            encoding = EncodingFinder('windows-1252').encoding(r).lower()
            try:
                h = self.get_document(r, parser='lxml', encoding=encoding)
                for meta in h.xpath('//head/meta'):
                    encoding = meta.attrib.get('charset', encoding).lower()
            except Exception as e:
                print(e)
            finally:
                r.seek(0)
            if encoding == 'iso-8859-1':
                encoding = 'windows-1252'

            # Second pass: parse with the final encoding and read the title.
            # A separate loop variable is used so that an exception raised
            # mid-loop can never leave an lxml element in ``title``.
            try:
                h = self.get_document(r, parser='lxml', encoding=encoding)
                for node in h.xpath('//head/title'):
                    title = self._node_text(node)
                if urlparse.urlsplit(url).netloc.endswith('twitter.com'):
                    for node in h.getroot().cssselect('.permalink-tweet .tweet-text'):
                        title = self._node_text(node)
            except AssertionError as e:
                # invalid HTML
                print(e)

        return content_type, hsize, title

    @staticmethod
    def _node_text(node):
        """Return the text content of *node* stripped and joined on one line."""
        text = to_unicode(node.text_content()).strip()
        return ' '.join(text.splitlines())

    def human_size(self, size):
        """Format a byte count with a binary unit suffix, e.g. ``1.5 KiB``.

        The unit is found by repeated division rather than a floating-point
        logarithm, so exact powers of 1024 always get the right unit and
        sizes beyond the largest unit are expressed in that unit instead of
        raising IndexError.

        :param size: number of bytes; 0 or None yields ``'0 B'``
        """
        if not size:
            return '0 B'
        value = float(size)
        exponent = 0
        largest = len(self.SIZE_UNITS) - 1
        while value >= 1024 and exponent < largest:
            value /= 1024
            exponent += 1
        return "%.1f %s" % (value, self.SIZE_UNITS[exponent])
class MyThread(Thread):
    """Background thread relaying keyword mentions from the 'dlfp' backend.

    It owns the Weboob instance (also exposed as ``bot.weboob``) and, once
    the bot has joined all its channels, schedules the two check methods on
    the Weboob scheduler.
    """

    daemon = True

    # Keywords looked for (case-insensitively) in messages.
    KEYWORDS = (
        'weboob', 'videoob', 'havesex', 'havedate', 'monboob', 'boobmsg',
        'flatboob', 'boobill', 'pastoob', 'radioob', 'translaboob', 'traveloob',
        'boobathon', 'boobank', 'boobtracker', 'comparoob', 'wetboobs',
        'webcontentedit', 'weboorrents', u'sàt', u'salut à toi', 'ass2m',
        'budget insight', 'budget-insight', 'budgetinsight', 'budgea',
    )

    def __init__(self, bot):
        """Create the Weboob instance, load its backends and attach it to *bot*."""
        Thread.__init__(self)
        self.weboob = Weboob(storage=StandardStorage(STORAGE_FILE))
        self.weboob.load_backends()
        self.bot = bot
        self.bot.weboob = self.weboob

    def run(self):
        """Wait until every channel is joined, then run the scheduler loop."""
        for ev in self.bot.joined.values():
            ev.wait()

        # Presumably the first argument is an interval in seconds -- confirm
        # against the Weboob scheduler API.
        self.weboob.repeat(300, self.check_board)
        self.weboob.repeat(600, self.check_dlfp)

        self.weboob.loop()

    def find_keywords(self, text):
        """Return the first keyword contained in *text*, or None.

        The comparison is done against the lower-cased text.
        """
        lowered = text.lower()
        for word in self.KEYWORDS:
            if word in lowered:
                return word
        return None

    def check_dlfp(self):
        """Announce unread 'dlfp' messages mentioning a keyword, then mark them read."""
        for backend, msg in self.weboob.do('iter_unread_messages', backends=['dlfp']):
            word = self.find_keywords(msg.content)
            if word is not None:
                # The link is taken from the signature, starting at the first
                # linuxfr URL.  str.find returns -1 when there is none, which
                # as a slice start would yield the signature's last character,
                # so that case is mapped to an empty string.
                start = msg.signature.find('https://linuxfr')
                url = msg.signature[start:] if start >= 0 else ''
                self.bot.send_message('[DLFP] %s talks about %s: %s' % (
                    msg.sender, word, url))
            backend.set_message_read(msg)

    def check_board(self):
        """Relay new 'dlfp' board messages that mention a keyword.

        Messages whose login is 'moules' are skipped.  The keyword is wrapped
        in \\002 characters in the relayed text, whatever its case in the
        original message.
        """
        def iter_messages(backend):
            with backend.browser:
                return backend.browser.iter_new_board_messages()

        for backend, msg in self.weboob.do(iter_messages, backends=['dlfp']):
            word = self.find_keywords(msg.message)
            if word is not None and msg.login != 'moules':
                # The keyword was found in the lower-cased text, so highlight
                # it case-insensitively and keep the original spelling.
                pattern = re.compile(re.escape(word), re.IGNORECASE | re.UNICODE)
                message = pattern.sub(
                    lambda match: '\002%s\002' % match.group(0), msg.message)
                self.bot.send_message('[DLFP] <%s> %s' % (msg.login, message))

    def stop(self):
        """Ask the Weboob loop to stop and release its resources."""
        self.weboob.want_stop()
        self.weboob.deinit()
class Boobot(SingleServerIRCBot):
    """IRC bot describing weboob object ids and URLs posted on its channels."""

    # "<id>@<backend>" references to weboob objects.
    BOOBID_RE = re.compile(r'([\w\d_\-]+@\w+)')
    # http(s) URLs, ending at whitespace, a non-breaking space or '+'.
    URL_RE = re.compile(u'(https?://[^\\s\xa0+]+)')

    def __init__(self, channels, nickname, server, port=6667):
        """Prepare the connection to *server* and the per-channel join events.

        :param channels: channels to join; the first one is the default
                         target of :meth:`send_message`
        :param nickname: passed twice to the base class (presumably as both
                         nickname and real name -- confirm against irc.bot)
        """
        SingleServerIRCBot.__init__(self, [(server, port)], nickname, nickname)
        # NOTE(review): no explicit 'pubmsg' handler is registered; presumably
        # the base class dispatches to on_pubmsg by name -- confirm.
        # self.connection.add_global_handler('pubmsg', self.on_pubmsg)
        self.connection.add_global_handler('join', self.on_join)
        self.connection.add_global_handler('welcome', self.on_welcome)

        self.mainchannel = channels[0]
        # One Event per channel, set once a join on that channel is seen.
        self.joined = dict()
        for channel in channels:
            self.joined[channel] = Event()
        self.weboob = None

    def on_welcome(self, c, event):
        """Join every configured channel."""
        for channel in self.joined:
            c.join(channel)

    def on_join(self, c, event):
        """Flag the joined channel so that waiters on its Event can proceed."""
        # irclib 5.0 compatibility
        if callable(event.target):
            channel = event.target()
        else:
            channel = event.target
        # Joins on a channel that was not configured (different name than
        # requested) must not raise KeyError inside the event handler.
        joined = self.joined.get(channel)
        if joined is not None:
            joined.set()

    def send_message(self, msg, channel=None):
        """Send *msg*, one PRIVMSG per line, to *channel* or the main channel."""
        for m in msg.splitlines():
            self.connection.privmsg(channel or self.mainchannel, m)

    def on_pubmsg(self, c, event):
        """React to a public message: describe the ids and URLs it contains.

        Messages whose source matches one of IRC_IGNORE are skipped.
        """
        # irclib 5.0 compatibility
        if callable(event.arguments):
            text = ' '.join(event.arguments())
            channel = event.target()
            nick = event.source()
        else:
            text = ' '.join(event.arguments)
            channel = event.target
            nick = event.source

        if any(ignore.search(nick) for ignore in IRC_IGNORE):
            return

        for m in self.BOOBID_RE.findall(text):
            for msg in self.on_boobid(m):
                self.send_message(msg, channel)
        for m in self.URL_RE.findall(text):
            for msg in self.on_url(m):
                self.send_message(msg, channel)

    def on_boobid(self, boobid):
        """Yield description lines for an ``id@backend`` reference.

        Nothing is yielded when the backend is not loaded.  Only the first
        capability for which this class has an ``obj_info_<name>`` method is
        used.
        """
        _id, backend_name = boobid.split('@', 1)
        if backend_name in self.weboob.backend_instances:
            backend = self.weboob.backend_instances[backend_name]
            for cap in backend.iter_caps():
                # Drops the first four characters of the capability class
                # name (presumably an "ICap" prefix -- confirm).
                func = 'obj_info_%s' % cap.__name__[4:].lower()
                if hasattr(self, func):
                    try:
                        for msg in getattr(self, func)(backend, _id):
                            yield msg
                    except Exception as e:
                        print(get_backtrace())
                        yield 'Oops: [%s] %s' % (type(e).__name__, e)
                    break

    def on_url(self, url):
        """Yield a one-line description of *url* (title, or type and size)."""
        url = fixurl(url)
        try:
            content_type, hsize, title = BoobotBrowser().urlinfo(url)
            if title:
                yield u'URL: %s' % title
            elif hsize:
                yield u'URL (file): %s, %s' % (content_type, hsize)
            else:
                yield u'URL (file): %s' % content_type
        except BrowserUnavailable as e:
            yield u'URL (error): %s' % e
        except Exception as e:
            print(get_backtrace())
            yield 'Oops: [%s] %s' % (type(e).__name__, e)

    def obj_info_video(self, backend, id):
        """Yield the title and duration of video *id*, if the backend finds it."""
        v = backend.get_video(id)
        if v:
            yield u'Video: %s (%s)' % (v.title, v.duration)

    def obj_info_housing(self, backend, id):
        """Yield title, area and cost of housing *id*, if the backend finds it."""
        h = backend.get_housing(id)
        if h:
            yield u'Housing: %s (%sm² / %s%s)' % (h.title, h.area, h.cost, h.currency)
def main():
    """Start the IRC bot and its polling thread, and run until interrupted.

    The polling thread is stopped however the bot's main loop ends, so the
    Weboob instance is released even when ``bot.start()`` fails with
    something other than a keyboard interrupt.
    """
    logging.basicConfig(level=logging.DEBUG)
    bot = Boobot(IRC_CHANNELS, IRC_NICKNAME, IRC_SERVER)

    thread = MyThread(bot)
    thread.start()

    try:
        bot.start()
    except KeyboardInterrupt:
        print("Stopped.")
    finally:
        thread.stop()
if __name__ == '__main__':
    # Script entry point: the value returned by main() is used as the
    # process exit status.
    status = main()
    sys.exit(status)