Source code for soco.events

# pylint: disable=not-context-manager

# NOTE: The pylint not-content-manager warning is disabled pending the fix of
# a bug in pylint. See https://github.com/PyCQA/pylint/issues/782


"""Classes to handle Sonos UPnP Events and Subscriptions.

The `Subscription` class from this module will be used in
:py:mod:`soco.services` unless `config.EVENTS_MODULE` is set to
point to :py:mod:`soco.events_twisted`, in which case
:py:mod:`soco.events_twisted.Subscription` will be used.  See the
Example in :py:mod:`soco.events_twisted`.

Example:

    Run this code, and change your volume, tracks etc::

        from queue import Empty

        import logging
        logging.basicConfig()
        import soco
        from pprint import pprint
        from soco.events import event_listener
        # pick a device at random and use it to get
        # the group coordinator
        device = soco.discover().pop().group.coordinator
        print (device.player_name)
        sub = device.renderingControl.subscribe()
        sub2 = device.avTransport.subscribe()

        while True:
            try:
                event = sub.events.get(timeout=0.5)
                pprint (event.variables)
            except Empty:
                pass
            try:
                event = sub2.events.get(timeout=0.5)
                pprint (event.variables)
            except Empty:
                pass

            except KeyboardInterrupt:
                sub.unsubscribe()
                sub2.unsubscribe()
                event_listener.stop()
                break

"""


import errno
import logging
import socketserver
import threading

from http.server import BaseHTTPRequestHandler
from urllib.error import URLError
from urllib.request import urlopen

import requests

# Event is imported so that 'from events import Events' still works
# pylint: disable=unused-import
from .events_base import Event  # noqa: F401

from .events_base import (
    EventNotifyHandlerBase,
    EventListenerBase,
    SubscriptionBase,
    SubscriptionsMap,
)

from .exceptions import SoCoException

log = logging.getLogger(__name__)  # pylint: disable=C0103


[docs]class EventServer(socketserver.ThreadingMixIn, socketserver.TCPServer): """A TCP server which handles each new request in a new thread.""" allow_reuse_address = True
[docs]class EventNotifyHandler(BaseHTTPRequestHandler, EventNotifyHandlerBase): """Handles HTTP ``NOTIFY`` Verbs sent to the listener server. Inherits from `soco.events_base.EventNotifyHandlerBase`. """ def __init__(self, *args, **kwargs): # The SubscriptionsMap instance created when this module is imported. # This is referenced by soco.events_base.EventNotifyHandlerBase. self.subscriptions_map = subscriptions_map # super appears at the end of __init__, because # BaseHTTPRequestHandler.__init__ does not return. super().__init__(*args, **kwargs)
[docs] def do_NOTIFY(self): # pylint: disable=invalid-name """Serve a ``NOTIFY`` request by calling `handle_notification` with the headers and content. """ headers = requests.structures.CaseInsensitiveDict(self.headers) content_length = int(headers["content-length"]) content = self.rfile.read(content_length) self.handle_notification(headers, content) self.send_response(200) self.end_headers()
# pylint: disable=no-self-use, missing-docstring def log_event(self, seq, service_id, timestamp): log.debug( "Event %s received for %s service on thread %s at %s", seq, service_id, threading.current_thread(), timestamp, )
[docs] def log_message(self, fmt, *args): # pylint: disable=arguments-differ # Divert standard webserver logging to the debug log log.debug(fmt, *args)
[docs]class EventServerThread(threading.Thread): """The thread in which the event listener server will run.""" def __init__(self, server): """ Args: address (tuple): The (ip, port) address on which the server should listen. """ super().__init__() #: `threading.Event`: Used to signal that the server should stop. self.stop_flag = threading.Event() #: `tuple`: The (ip, port) address on which the server is #: configured to listen. self.server = server
[docs] def run(self): """Start the server Handling of requests is delegated to an instance of the `EventNotifyHandler` class. """ log.debug("Event listener running on %s", self.server.server_address) # Listen for events until told to stop while not self.stop_flag.is_set(): self.server.handle_request()
[docs] def stop(self): """Stop the server.""" self.stop_flag.set()
[docs]class EventListener(EventListenerBase): """The Event Listener. Runs an http server in a thread which is an endpoint for ``NOTIFY`` requests from Sonos devices. Inherits from `soco.events_base.EventListenerBase`. """ def __init__(self): super().__init__() #: `EventServerThread`: thread on which to run. self._listener_thread = None
[docs] def listen(self, ip_address): """Start the event listener listening on the local machine at port 1400 (default). If this port is unavailable, the listener will attempt to listen on the next available port, within a range of 100. Make sure that your firewall allows connections to this port. This method is called by `soco.events_base.EventListenerBase.start` Args: ip_address (str): The local network interface on which the server should start listening. Returns: int: `requested_port_number`. Included for compatibility with `soco.events_twisted.EventListener.listen` Note: The port on which the event listener listens is configurable. See `config.EVENT_LISTENER_PORT` """ for port_number in range( self.requested_port_number, self.requested_port_number + 100 ): address = (ip_address, port_number) try: server = EventServer(address, EventNotifyHandler) break except OSError as oserror: if oserror.errno == errno.EADDRINUSE: log.debug("Port %s:%d is in use", ip_address, port_number) else: raise self._listener_thread = EventServerThread(server) self._listener_thread.daemon = True self._listener_thread.start() if port_number != self.requested_port_number: log.debug( "The first available port %d was used instead of %d", port_number, self.requested_port_number, ) return port_number
[docs] def stop_listening(self, address): """Stop the listener.""" # Signal the thread to stop before handling the next request self._listener_thread.stop() # Send a dummy request in case the http server is currently listening try: # pylint: disable=R1732 urlopen("http://{}:{}/".format(address[0], address[1])) except URLError: # If the server is already shut down, we receive a socket error, # which we ignore. pass # wait for the thread to finish, with a timeout of one second # to ensure the main thread does not hang self._listener_thread.join(1) # check if join timed out and issue a warning if it did if self._listener_thread.is_alive(): log.warning("Event Listener did not shutdown gracefully.")
[docs]class Subscription(SubscriptionBase): """A class representing the subscription to a UPnP event. Inherits from `soco.events_base.SubscriptionBase`. """ def __init__(self, service, event_queue=None): """ Args: service (Service): The SoCo `Service` to which the subscription should be made. event_queue (:class:`~queue.Queue`): A queue on which received events will be put. If not specified, a queue will be created and used. """ super().__init__(service, event_queue) # Used to keep track of the auto_renew thread self._auto_renew_thread = None self._auto_renew_thread_flag = threading.Event() # The SubscriptionsMap instance created when this module is imported. # This is referenced by soco.events_base.SubscriptionBase. self.subscriptions_map = subscriptions_map # The EventListener instance created when this module is imported. # This is referenced by soco.events_base.SubscriptionBase. self.event_listener = event_listener # Used to stop race conditions, as autorenewal may occur from a thread self._lock = threading.Lock() # pylint: disable=arguments-differ
[docs] def subscribe(self, requested_timeout=None, auto_renew=False, strict=True): """Subscribe to the service. If requested_timeout is provided, a subscription valid for that number of seconds will be requested, but not guaranteed. Check `timeout` on return to find out what period of validity is actually allocated. This method calls `events_base.SubscriptionBase.subscribe`. Note: SoCo will try to unsubscribe any subscriptions which are still subscribed on program termination, but it is good practice for you to clean up by making sure that you call :meth:`unsubscribe` yourself. Args: requested_timeout(int, optional): The timeout to be requested. auto_renew (bool, optional): If `True`, renew the subscription automatically shortly before timeout. Default `False`. strict (bool, optional): If True and an Exception occurs during execution, the Exception will be raised or, if False, the Exception will be logged and the Subscription instance will be returned. Default `True`. Returns: `Subscription`: The Subscription instance. """ subscribe = super().subscribe return self._wrap(subscribe, strict, requested_timeout, auto_renew)
[docs] def renew(self, requested_timeout=None, is_autorenew=False, strict=True): """renew(requested_timeout=None) Renew the event subscription. You should not try to renew a subscription which has been unsubscribed, or once it has expired. This method calls `events_base.SubscriptionBase.renew`. Args: requested_timeout (int, optional): The period for which a renewal request should be made. If None (the default), use the timeout requested on subscription. is_autorenew (bool, optional): Whether this is an autorenewal. Default 'False'. strict (bool, optional): If True and an Exception occurs during execution, the Exception will be raised or, if False, the Exception will be logged and the Subscription instance will be returned. Default `True`. Returns: `Subscription`: The Subscription instance. """ renew = super().renew return self._wrap(renew, strict, requested_timeout, is_autorenew)
[docs] def unsubscribe(self, strict=True): """unsubscribe() Unsubscribe from the service's events. Once unsubscribed, a Subscription instance should not be reused This method calls `events_base.SubscriptionBase.unsubscribe`. Args: strict (bool, optional): If True and an Exception occurs during execution, the Exception will be raised or, if False, the Exception will be logged and the Subscription instance will be returned. Default `True`. Returns: `Subscription`: The Subscription instance. """ unsubscribe = super().unsubscribe return self._wrap(unsubscribe, strict)
def _auto_renew_start(self, interval): """Starts the auto_renew thread.""" class AutoRenewThread(threading.Thread): """Used by the auto_renew code to renew a subscription from within a thread. """ def __init__(self, interval, stop_flag, sub, *args, **kwargs): super().__init__(*args, **kwargs) self.interval = interval self.subscription = sub self.stop_flag = stop_flag self.daemon = True def run(self): subscription = self.subscription stop_flag = self.stop_flag interval = self.interval while not stop_flag.wait(interval): subscription.renew(is_autorenew=True, strict=False) auto_renew_thread = AutoRenewThread( interval, self._auto_renew_thread_flag, self ) auto_renew_thread.start() def _auto_renew_cancel(self): """Cancels the auto_renew thread""" self._auto_renew_thread_flag.set() # pylint: disable=no-self-use, too-many-arguments def _request(self, method, url, headers, success, unconditional=None): """Sends an HTTP request. Args: method (str): 'SUBSCRIBE' or 'UNSUBSCRIBE'. url (str): The full endpoint to which the request is being sent. headers (dict): A dict of headers, each key and each value being of type `str`. success (function): A function to be called if the request succeeds. The function will be called with a dict of response headers as its only parameter. unconditional (function): An optional function to be called after the request is complete, regardless of its success. Takes no parameters. """ response = None try: response = requests.request(method, url, headers=headers, timeout=3) except requests.exceptions.RequestException: # Ignore timeout for unsubscribe since we are leaving anyway. if method != "UNSUBSCRIBE": raise # Ignore "412 Client Error: Precondition Failed for url:" from # rebooted speakers. The reboot will have unsubscribed us which is # what we are trying to do. if response and response.status_code != 412: response.raise_for_status() if success: success(response.headers) if unconditional: unconditional() # pylint: disable=inconsistent-return-statements def _wrap(self, method, strict, *args, **kwargs): """This is a wrapper for `Subscription.subscribe`, `Subscription.renew` and `Subscription.unsubscribe` which: * Returns the`Subscription` instance. * If an Exception has occurred: * Cancels the Subscription (unless the Exception was caused by a SoCoException upon subscribe). * On an autorenew, if the strict flag was set to False, calls the optional self.auto_renew_fail method with the Exception. This method needs to be threadsafe. * If the strict flag was set to True (the default), reraises the Exception or, if the strict flag was set to False, logs the Exception instead. * Calls the wrapped methods with a threading.Lock, to prevent race conditions (e.g. to prevent unsubscribe and autorenew being called simultaneously). """ action = method.__name__ # A lock is used, because autorenewal occurs in # a thread with self._lock: try: method(*args, **kwargs) except Exception as exc: # pylint: disable=broad-except # If an Exception occurred during execution of subscribe, # renew or unsubscribe, set the cancel flag to True unless # the Exception was a SoCoException upon subscribe cancel = action == "renew" or not isinstance(exc, SoCoException) if cancel: # If the cancel flag was set to true, cancel the # subscription with an explanation. msg = ( "An Exception occurred. Subscription to" + " {}, sid: {} has been cancelled".format( self.service.base_url + self.service.event_subscription_url, self.sid, ) ) self._cancel_subscription(msg) # If we're not being strict, log the Exception if not strict: msg = ( "Exception received in Subscription." + "{} for Subscription to:\n{}, sid: {}".format( action, self.service.base_url + self.service.event_subscription_url, self.sid, ) ) log.exception(msg) # If we're not being strict upon a renewal # (e.g. an autorenewal) call the optional # self.auto_renew_fail method, if it has been set if action == "renew" and self.auto_renew_fail is not None: if hasattr(self.auto_renew_fail, "__call__"): # pylint: disable=not-callable self.auto_renew_fail(exc) # If we're being strict, reraise the Exception else: raise # pylint: disable=raising-bad-type else: # Return the Subscription to the function that # called subscribe, renew or unsubscribe (unless an # Exception occurred and it was reraised above) return self # pylint: disable=lost-exception
subscriptions_map = SubscriptionsMap() # pylint: disable=C0103 event_listener = EventListener() # pylint: disable=C0103