weboob-devel/modules/velib/backend.py

159 lines
5.2 KiB
Python

# -*- coding: utf-8 -*-
# Copyright(C) 2013 dud
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.tools.backend import BaseBackend, BackendConfig
from weboob.capabilities.gauge import ICapGauge, GaugeSensor, Gauge, GaugeMeasure, SensorNotFound
from weboob.tools.value import Value
from .browser import VelibBrowser
__all__ = ['VelibBackend']
SENSOR_TYPES = {u'available_bike_stands': u'Free stands', u'available_bikes': u'Available bikes', u'bike_stands': u'Total stands'}
CITIES = ("Paris", "Rouen", "Toulouse", "Luxembourg", "Valence", "Stockholm",
"Goteborg", "Santander", "Amiens", "Lillestrom", "Mulhouse", "Lyon",
"Ljubljana", "Seville", "Namur", "Nancy", "Creteil", "Bruxelles-Capitale",
"Cergy-Pontoise", "Vilnius", "Toyama", "Kazan", "Marseille", "Nantes",
"Besancon")
class BikeMeasure(GaugeMeasure):
def __repr__(self):
return '<GaugeMeasure level=%d>' % self.level
class VelibBackend(BaseBackend, ICapGauge):
NAME = 'velib'
DESCRIPTION = u'get Vélib\' information'
MAINTAINER = u'Herve Werner'
EMAIL = 'dud225@hotmail.com'
VERSION = '0.h'
LICENSE = 'AGPLv3'
BROWSER = VelibBrowser
STORAGE = {'boards': {}}
CONFIG = BackendConfig(Value('city', label='City', default='Paris',
choices=CITIES + ("ALL",)))
def __init__(self, *a, **kw):
super(VelibBackend, self).__init__(*a, **kw)
self.cities = None
def _make_gauge(self, info):
gauge = Gauge(info['id'])
gauge.name = unicode(info['name'])
gauge.city = unicode(info['city'])
gauge.object = u'bikes'
return gauge
def _make_sensor(self, sensor_type, info, gauge):
id = '%s.%s' % (sensor_type, gauge.id)
sensor = GaugeSensor(id)
sensor.gaugeid = gauge.id
sensor.name = SENSOR_TYPES[sensor_type]
sensor.address = unicode(info['address'])
sensor.history = []
return sensor
def _make_measure(self, sensor_type, info):
measure = BikeMeasure()
measure.date = info['last_update']
measure.level = float(info[sensor_type])
return measure
def _parse_gauge(self, info):
gauge = self._make_gauge(info)
gauge.sensors = []
for type in SENSOR_TYPES:
sensor = self._make_sensor(type, info, gauge)
measure = self._make_measure(type, info)
sensor.lastvalue = measure
gauge.sensors.append(sensor)
return gauge
def _contract(self):
contract = self.config.get('city').get()
if contract.lower() == 'all':
contract = None
return contract
def iter_gauges(self, pattern=None):
if pattern is None:
for jgauge in self.browser.get_station_list(contract=self._contract()):
yield self._parse_gauge(jgauge)
else:
lowpattern = pattern.lower()
for jgauge in self.browser.get_station_list(contract=self._contract()):
gauge = self._parse_gauge(jgauge)
if lowpattern in gauge.name.lower() or lowpattern in gauge.city.lower():
yield gauge
def iter_sensors(self, gauge, pattern=None):
if not isinstance(gauge, Gauge):
gauge = self._get_gauge_by_id(gauge)
if gauge is None:
raise SensorNotFound()
if pattern is None:
for sensor in gauge.sensors:
yield sensor
else:
lowpattern = pattern.lower()
for sensor in gauge.sensors:
if lowpattern in sensor.name.lower():
yield sensor
def get_last_measure(self, sensor):
if not isinstance(sensor, GaugeSensor):
sensor = self._get_sensor_by_id(sensor)
if sensor is None:
raise SensorNotFound()
return sensor.lastvalue
def _fetch_cities(self):
if self.cities:
return
self.cities = {}
jcontract = self.browser.get_contracts_list()
for jcontract in jcontract:
for city in jcontract['cities']:
self.cities[city.lower()] = jcontract['name']
def _get_gauge_by_id(self, id):
jgauge = self.browser.get_station_infos(id)
if jgauge:
return self._parse_gauge(jgauge)
else:
return None
def _get_sensor_by_id(self, id):
_, gauge_id = id.split('.', 1)
gauge = self._get_gauge_by_id(gauge_id)
if not gauge:
raise SensorNotFound()
for sensor in gauge.sensors:
if sensor.id.lower() == id.lower():
return sensor