From 7d90ca778f974626d67ba0e8426e83683c8c0165 Mon Sep 17 00:00:00 2001 From: Bezleputh Date: Wed, 21 May 2014 17:56:01 +0200 Subject: [PATCH] [Twitter] Add new twitter module --- modules/twitter/__init__.py | 24 ++++++++ modules/twitter/backend.py | 99 +++++++++++++++++++++++++++++++ modules/twitter/browser.py | 96 +++++++++++++++++++++++++++++++ modules/twitter/pages.py | 112 ++++++++++++++++++++++++++++++++++++ modules/twitter/test.py | 31 ++++++++++ 5 files changed, 362 insertions(+) create mode 100644 modules/twitter/__init__.py create mode 100644 modules/twitter/backend.py create mode 100644 modules/twitter/browser.py create mode 100644 modules/twitter/pages.py create mode 100644 modules/twitter/test.py diff --git a/modules/twitter/__init__.py b/modules/twitter/__init__.py new file mode 100644 index 00000000..d9beadcf --- /dev/null +++ b/modules/twitter/__init__.py @@ -0,0 +1,24 @@ +# -*- coding: utf-8 -*- + +# Copyright(C) 2014 Bezleputh +# +# This file is part of weboob. +# +# weboob is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# weboob is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with weboob. If not, see <http://www.gnu.org/licenses/>. + + +from .backend import TwitterBackend + + +__all__ = ['TwitterBackend'] diff --git a/modules/twitter/backend.py b/modules/twitter/backend.py new file mode 100644 index 00000000..34baa90e --- /dev/null +++ b/modules/twitter/backend.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- + +# Copyright(C) 2014 Bezleputh +# +# This file is part of weboob. 
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.

from datetime import time, datetime, timedelta

from weboob.tools.value import Value, ValueBackendPassword
from weboob.tools.backend import BaseBackend, BackendConfig
from weboob.capabilities.messages import ICapMessages, Thread, ICapMessagesPost
from weboob.capabilities.base import find_object
from weboob.tools.exceptions import BrowserForbidden
from .browser import TwitterBrowser


__all__ = ['TwitterBackend']


class TwitterBackend(BaseBackend, ICapMessages, ICapMessagesPost):
    """Weboob backend for Twitter implementing the message capabilities.

    Reading and posting are delegated to :class:`TwitterBrowser`; the
    backend itself only keeps track of which messages were read, using the
    ``seen``, ``date`` and ``lastpurge`` storage entries.
    """

    NAME = 'twitter'
    DESCRIPTION = u'twitter website'
    MAINTAINER = u'Bezleputh'
    EMAIL = 'carton_ben@yahoo.fr'
    LICENSE = 'AGPLv3+'
    VERSION = '0.j'
    BROWSER = TwitterBrowser
    STORAGE = {'seen': {}}

    CONFIG = BackendConfig(Value('username', label='Username', default=''),
                           ValueBackendPassword('password', label='Password', default=''))

    def create_default_browser(self):
        """Build the browser from the configured username and password."""
        return self.create_browser(self.config['username'].get(),
                                   self.config['password'].get())

    def iter_threads(self):
        """Return the threads yielded by the browser."""
        return self.browser.iter_threads()

    def get_thread(self, _id, thread=None, getseen=True):
        """Fetch the thread identified by ``_id`` through the browser.

        :param _id: thread identifier, handed to the browser unchanged
        :param thread: optional existing thread object to complete
        :param getseen: when true, the stored ``seen`` mapping is forwarded
                        to the browser; otherwise ``None`` is forwarded
        """
        seen = None
        if getseen:
            seen = self.storage.get('seen', default={})
        return self.browser.get_thread(_id, thread, seen)

    def fill_thread(self, thread, fields, getseen=True):
        """Complete ``thread`` by re-fetching it; ``fields`` is not used."""
        return self.get_thread(thread.id, thread, getseen)

    def set_message_read(self, message):
        """Record ``message`` as read, then purge outdated read markers.

        The message id is appended to the ``comments`` list stored under
        ``seen`` for the thread the message belongs to.
        """
        thread_id = message.thread.id
        already_read = self.storage.get('seen', thread_id, 'comments', default=[])
        self.storage.set('seen', thread_id, 'comments', already_read + [message.id])
        self.storage.save()
        self._purge_message_read()

    def _purge_message_read(self):
        """Drop read markers older than 60 days, at most once per day."""
        # The module-level name ``time`` is the ``datetime.time`` class, which
        # has no ``time()`` function; bind the stdlib module locally so that
        # ``time.time()`` returns the current timestamp instead of raising
        # AttributeError.
        import time

        lastpurge = self.storage.get('lastpurge', default=0)

        # 86400 seconds: run the purge no more than once every 24 hours.
        if time.time() - lastpurge > 86400:
            self.storage.set('lastpurge', time.time())
            self.storage.save()

            # we can't directly delete without a "RuntimeError: dictionary changed size during iteration"
            todelete = []

            for thread_id in self.storage.get('seen', default={}):
                date = self.storage.get('date', thread_id, default=0)
                # if no date available, create a new one (compatibility with "old" storage)
                if date == 0:
                    self.storage.set('date', thread_id, datetime.now())
                elif datetime.now() - date > timedelta(days=60):
                    todelete.append(thread_id)

            for thread_id in todelete:
                self.storage.delete('hash', thread_id)
                self.storage.delete('date', thread_id)
                self.storage.delete('seen', thread_id)
            self.storage.save()

    def post_message(self, message):
        """Post ``message`` through the browser.

        The target thread is looked up among ``iter_threads()`` using the
        part of ``message.full_id`` that precedes the first ``.``.

        :raises BrowserForbidden: when the browser has no username
        """
        if not self.browser.username:
            raise BrowserForbidden()
        target_id = message.full_id.split('.')[0]
        self.browser.post(find_object(self.iter_threads(), id=target_id),
                          message.content)

    OBJECTS = {Thread: fill_thread}

# diff --git a/modules/twitter/browser.py b/modules/twitter/browser.py
# new file mode 100644
# index 00000000..14d13d46
# --- /dev/null
# +++ b/modules/twitter/browser.py
# @@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-

# Copyright(C) 2014 Bezleputh
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.

from weboob.tools.browser2 import LoginBrowser, URL, need_login
from weboob.tools.browser import BrowserIncorrectPassword
from weboob.capabilities.messages import Message

from .pages import LoginPage, LoginErrorPage, ThreadPage, HomePage, Tweet


__all__ = ['TwitterBrowser']


class TwitterBrowser(LoginBrowser):
    """Browser for twitter.com: login, timeline, single thread and posting."""

    BASEURL = 'https://twitter.com'

    # The ``user`` group name matches the ``user=`` keyword passed to
    # ``thread_page.go()`` / ``stay_or_go()`` in get_thread().
    thread_page = URL(u'(?P<user>.+)/status/(?P<_id>.+)', ThreadPage)
    login_error = URL(u'login/error.+', LoginErrorPage)
    tweet = URL(u'i/tweet/create', Tweet)
    # NOTE(review): the original group name was lost (the pattern read
    # ``(?P.+)``, which is not a valid regex); nothing in this module refers
    # to the group, so ``path`` is a placeholder -- confirm against upstream.
    home = URL(u'(?P<path>.+)', HomePage)
    login = URL(u'', LoginPage)

    def do_login(self):
        """Log in with the browser's username and password.

        Opens the login URL and, when the page does not report a logged-in
        state, submits the credentials and keeps the returned authenticity
        token for later posts. Finally stores the profile name in ``self.me``.

        :raises BrowserIncorrectPassword: when the error page is reached or
            the page still does not report a logged-in state
        """
        self.login.go()

        if not self.page.logged:
            self.authenticity_token = self.page.login(self.username, self.password)

            # Test the error page first so that ``logged`` is only read from a
            # page that is not the login error page.
            if self.login_error.is_here() or not self.page.logged:
                raise BrowserIncorrectPassword()

        self.me = self.page.get_me()

    @need_login
    def iter_threads(self):
        """Return the threads listed on the page served at the base URL."""
        return self.login.stay_or_go().iter_threads()

    @need_login
    def post(self, thread, message):
        """Post ``message``, as a reply to ``thread`` when one is given.

        :param thread: thread to reply to, or a falsy value for a new tweet
        :param message: text of the status to publish
        """
        datas = {'place_id': '',
                 'tagged_users': '',
                 'authenticity_token': self.authenticity_token,
                 'status': message}
        if thread:
            # Thread ids look like ``user#status``; only the last part is sent.
            datas['in_reply_to_status_id'] = thread.id.split('#')[-1]

        self.tweet.open(data=datas)

    def get_thread(self, _id, thread=None, seen=None):
        """Build the thread ``_id`` with its root message and its comments.

        :param _id: identifier of the form ``user#status``
        :param thread: existing thread to complete; fetched when falsy
        :param seen: container of already-read ids, or ``None``; messages are
                     flagged unread only when it is non-empty and does not
                     contain them
        :returns: the thread, with ``root`` set and comments as its children
        """
        splitted_id = _id.split('#')
        user = splitted_id[0]
        status_id = splitted_id[1].split('.')[-1]

        if not thread:
            thread = self.thread_page.go(_id=status_id, user=user).get_thread(obj=thread)

        # Titles are formatted as '<author> \n\t <text>'; keep the text part.
        title_content = thread.title.split('\n\t')[-1]

        thread.root = Message(thread=thread,
                              id=status_id,
                              title=title_content[:50],
                              sender=user,
                              receivers=None,
                              date=thread.date,
                              parent=thread.root,
                              content=title_content,
                              signature=u'',
                              children=[])

        # NOTE(review): the backend stores read markers keyed by the full
        # thread id with a nested 'comments' list; whether these membership
        # tests match that layout should be confirmed.
        if seen and splitted_id[1] not in seen:
            thread.root.flags = Message.IS_UNREAD

        comments = self.thread_page.stay_or_go(_id=status_id, user=user).iter_comments()
        for comment in comments:
            # Every comment belongs to the thread; previously only comments
            # already present in ``seen`` were attached (and then flagged
            # unread), which dropped all the others.
            comment.thread = thread
            comment.parent = thread.root
            if seen and comment.id not in seen:
                comment.flags = Message.IS_UNREAD
            thread.root.children.append(comment)

        return thread

# diff --git a/modules/twitter/pages.py b/modules/twitter/pages.py
# new file mode 100644
# index 00000000..3478d8ef
# --- /dev/null
# +++ b/modules/twitter/pages.py
# @@ -0,0 +1,112 @@
# -*- coding: utf-8 -*-

# Copyright(C) 2014 Bezleputh
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from datetime import datetime
from weboob.tools.date import DATE_TRANSLATE_FR

from weboob.tools.browser2.page import HTMLPage, JsonPage, method, ListElement, ItemElement, FormNotFound
from weboob.tools.browser2.filters import CleanText, Format, Link, Regexp, Env, DateTime, Attr, Filter
from weboob.capabilities.messages import Thread, Message

__all__ = ['LoginPage', 'LoginErrorPage', 'ThreadPage', 'HomePage', 'Tweet']


class DatetimeFromTimestamp(Filter):
    """Filter turning a timestamp value into a ``datetime`` object."""

    def filter(self, el):
        """Return ``datetime.fromtimestamp`` of ``el`` converted to float."""
        return datetime.fromtimestamp(float(el))


class LoginPage(HTMLPage):
    """Page served at the site root: login form and tweet list."""

    # The login form is recognised by its action URL.
    def login(self, login, passwd):
        """Fill and submit the login form.

        :returns: the ``authenticity_token`` value held by the form
        """
        form = self.get_form(xpath='//form[@action="https://twitter.com/sessions"]')
        form['session[username_or_email]'] = login
        form['session[password]'] = passwd
        form.submit()
        return form['authenticity_token']

    @property
    def logged(self):
        """True when the page carries no login form."""
        try:
            self.get_form(xpath='//form[@action="https://twitter.com/sessions"]')
        except FormNotFound:
            return True
        return False

    def get_me(self):
        """Return the profile link target without its leading slash."""
        return Regexp(Link('//a[@data-nav="profile"]'), '/(.+)')(self.doc)

    @method
    class iter_threads(ListElement):
        # One Thread per tweet item of the list.
        item_xpath = '//li[@data-item-type="tweet"]/div'

        class item(ItemElement):
            klass = Thread

            # Thread ids are built as '<user>#<status id>' from the details link.
            obj_id = Regexp(Link('./div/div/a[@class="details with-icn js-details"]'), '/(.+)/status/(.+)', '\\1#\\2')
            obj_title = Format('%s \n\t %s',
                               CleanText('./div/div[@class="stream-item-header"]/a'),
                               CleanText('./div/p'))
            # NOTE(review): DATE_TRANSLATE_FR is passed as a second positional
            # argument here but not in ThreadPage.iter_comments; its effect
            # depends on Filter's constructor -- confirm it is intended.
            obj_date = DatetimeFromTimestamp(Attr('./div/div[@class="stream-item-header"]/small/a/span', 'data-time'), DATE_TRANSLATE_FR)


class ThreadPage(HTMLPage):
    """Page of a single tweet together with its replies."""

    @method
    class get_thread(ItemElement):
        klass = Thread

        def parse(self, el):
            pass

        # 'user' and '_id' come from the environment of the element.
        obj_id = Format('%s#%s', Env('user'), Env('_id'))
        obj_title = Format('%s \n\t %s',
                           CleanText('//div[@class="permalink-inner permalink-tweet-container"]/div/div/div/a'),
                           CleanText('//div[@class="permalink-inner permalink-tweet-container"]/div/p'))

        # The captured time and date are swapped into '<date> <time>' before
        # being handed to DateTime.
        obj_date = DateTime(Regexp(CleanText('//div[@class="permalink-inner permalink-tweet-container"]/div/div/div/div[@class="client-and-actions"]/span'),
                                   r'(\d+:\d+).+- (.+\d{4})',
                                   '\\2 \\1'))

    @method
    class iter_comments(ListElement):
        # One Message per entry of the replies list.
        item_xpath = '//ol[@id="stream-items-id"]/li/div'

        class item(ItemElement):
            klass = Message

            def parse(self, el):
                pass

            obj_id = Regexp(Link('./div/div/a[@class="details with-icn js-details"]'), '/.+/status/(.+)')
            # First 50 characters of the text, or the whole text when shorter.
            # The former pattern '(.{50}|.+).+' required one extra character
            # after the group, which cut the last character of texts of 50
            # characters or fewer and could not match one-character texts.
            obj_title = Regexp(CleanText('./div/p'), r'(.{50}|.+)')
            obj_content = CleanText('./div/p')
            obj_sender = Regexp(Link('./div/div/a[@class="details with-icn js-details"]'), '/(.+)/status/.+')
            obj_date = DatetimeFromTimestamp(Attr('./div/div[@class="stream-item-header"]/small/a/span', 'data-time'))


class LoginErrorPage(HTMLPage):
    """Page matched by the browser's ``login/error.+`` URL."""
    pass


class Tweet(JsonPage):
    """JSON page matched by the browser's ``i/tweet/create`` URL."""
    pass


class HomePage(HTMLPage):
    """Fallback page for URLs not matched by a more specific pattern."""
    pass

# diff --git a/modules/twitter/test.py b/modules/twitter/test.py
# new file mode 100644
# index 00000000..4a878c9b
# --- /dev/null
# +++ b/modules/twitter/test.py
# @@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-

# Copyright(C) 2014 Bezleputh
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.tools.test import BackendTest


class TwitterTest(BackendTest):
    """Smoke test of the twitter backend: list threads, then open one."""

    BACKEND = 'twitter'

    def test_twitter(self):
        """The thread list is non-empty and its first thread has content."""
        threads = list(self.backend.iter_threads())
        assert len(threads)

        first_id = threads[0].id
        thread = self.backend.get_thread(first_id)
        assert len(thread.root.content)