Skip to content

Improved testing #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Jul 11, 2020
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
*.egg-info
.tox
__pycache__
pip-wheel-metadata
pip-wheel-metadata
build
dist
1 change: 0 additions & 1 deletion mopidy_qr/frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ def stop(self):

def _loop(self):
logging.debug("Starting QR Reader")

import imutils.video

logger.debug("Import conpleted")
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ install_requires =
setuptools
pyzbar
imutils
numpy


[options.extras_require]
Expand Down
137 changes: 137 additions & 0 deletions tests/dummy_audio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
"""A dummy audio actor for use in tests.

This class implements the audio API in the simplest way possible. It is used in
tests of the core and backends.
"""


import pykka
from mopidy import audio


def create_proxy(config=None, mixer=None):
return DummyAudio.start(config, mixer).proxy()


# TODO: reset position on track change?
class DummyAudio(pykka.ThreadingActor):
def __init__(self, config=None, mixer=None):
super().__init__()
self.state = audio.PlaybackState.STOPPED
self._volume = 0
self._position = 0
self._callback = None
self._uri = None
self._stream_changed = False
self._live_stream = False
self._tags = {}
self._bad_uris = set()

def set_uri(self, uri, live_stream=False):
assert self._uri is None, "prepare change not called before set"
self._position = 0
self._uri = uri
self._stream_changed = True
self._live_stream = live_stream
self._tags = {}

def set_appsrc(self, *args, **kwargs):
pass

def emit_data(self, buffer_):
pass

def get_position(self):
return self._position

def set_position(self, position):
self._position = position
audio.AudioListener.send("position_changed", position=position)
return True

def start_playback(self):
return self._change_state(audio.PlaybackState.PLAYING)

def pause_playback(self):
return self._change_state(audio.PlaybackState.PAUSED)

def prepare_change(self):
self._uri = None
return True

def stop_playback(self):
return self._change_state(audio.PlaybackState.STOPPED)

def get_volume(self):
return self._volume

def set_volume(self, volume):
self._volume = volume
return True

def set_metadata(self, track):
pass

def get_current_tags(self):
return self._tags

def set_about_to_finish_callback(self, callback):
self._callback = callback

def enable_sync_handler(self):
pass

def wait_for_state_change(self):
pass

def _change_state(self, new_state):
if not self._uri:
return False

if new_state == audio.PlaybackState.STOPPED and self._uri:
self._stream_changed = True
self._uri = None

if self._uri is not None:
audio.AudioListener.send("position_changed", position=0)

if self._stream_changed:
self._stream_changed = False
audio.AudioListener.send("stream_changed", uri=self._uri)

old_state, self.state = self.state, new_state
audio.AudioListener.send(
"state_changed",
old_state=old_state,
new_state=new_state,
target_state=None,
)

if new_state == audio.PlaybackState.PLAYING:
self._tags["audio-codec"] = ["fake info..."]
audio.AudioListener.send("tags_changed", tags=["audio-codec"])

return self._uri not in self._bad_uris

def trigger_fake_playback_failure(self, uri):
self._bad_uris.add(uri)

def trigger_fake_tags_changed(self, tags):
self._tags.update(tags)
audio.AudioListener.send("tags_changed", tags=self._tags.keys())

def get_about_to_finish_callback(self):
# This needs to be called from outside the actor or we lock up.
def wrapper():
if self._callback:
self.prepare_change()
self._callback()

if not self._uri or not self._callback:
self._tags = {}
audio.AudioListener.send("reached_end_of_stream")
else:
audio.AudioListener.send("position_changed", position=0)
audio.AudioListener.send("stream_changed", uri=self._uri)

return wrapper
152 changes: 152 additions & 0 deletions tests/dummy_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
"""A dummy backend for use in tests.

This backend implements the backend API in the simplest way possible. It is
used in tests of the frontends.
"""


import pykka
from mopidy import backend
from mopidy.models import Playlist, Ref, SearchResult


def create_proxy(config=None, audio=None):
return DummyBackend.start(config=config, audio=audio).proxy()


class DummyBackend(pykka.ThreadingActor, backend.Backend):
def __init__(self, config, audio):
super().__init__()

self.library = DummyLibraryProvider(backend=self)
if audio:
self.playback = backend.PlaybackProvider(audio=audio, backend=self)
else:
self.playback = DummyPlaybackProvider(audio=audio, backend=self)
self.playlists = DummyPlaylistsProvider(backend=self)

self.uri_schemes = ["dummy"]


class DummyLibraryProvider(backend.LibraryProvider):
root_directory = Ref.directory(uri="dummy:/", name="dummy")

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.dummy_library = []
self.dummy_get_distinct_result = {}
self.dummy_browse_result = {}
self.dummy_find_exact_result = SearchResult()
self.dummy_search_result = SearchResult()

def browse(self, path):
return self.dummy_browse_result.get(path, [])

def get_distinct(self, field, query=None):
return self.dummy_get_distinct_result.get(field, set())

def lookup(self, uri):
uri = Ref.track(uri=uri).uri
return [t for t in self.dummy_library if uri == t.uri]

def refresh(self, uri=None):
pass

def search(self, query=None, uris=None, exact=False):
if exact: # TODO: remove uses of dummy_find_exact_result
return self.dummy_find_exact_result
return self.dummy_search_result


class DummyPlaybackProvider(backend.PlaybackProvider):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._uri = None
self._time_position = 0

def pause(self):
return True

def play(self):
return self._uri and self._uri != "dummy:error"

def change_track(self, track):
"""Pass a track with URI 'dummy:error' to force failure"""
self._uri = track.uri
self._time_position = 0
return True

def prepare_change(self):
pass

def resume(self):
return True

def seek(self, time_position):
self._time_position = time_position
return True

def stop(self):
self._uri = None
return True

def get_time_position(self):
return self._time_position


class DummyPlaylistsProvider(backend.PlaylistsProvider):
def __init__(self, backend):
super().__init__(backend)
self._playlists = []
self._allow_save = True

def set_dummy_playlists(self, playlists):
"""For tests using the dummy provider through an actor proxy."""
self._playlists = playlists

def set_allow_save(self, enabled):
self._allow_save = enabled

def as_list(self):
return [
Ref.playlist(uri=pl.uri, name=pl.name) for pl in self._playlists
]

def get_items(self, uri):
playlist = self.lookup(uri)
if playlist is None:
return
return [Ref.track(uri=t.uri, name=t.name) for t in playlist.tracks]

def lookup(self, uri):
uri = Ref.playlist(uri=uri).uri
for playlist in self._playlists:
if playlist.uri == uri:
return playlist

def refresh(self):
pass

def create(self, name):
playlist = Playlist(name=name, uri=f"dummy:{name}")
self._playlists.append(playlist)
return playlist

def delete(self, uri):
playlist = self.lookup(uri)
if playlist:
self._playlists.remove(playlist)

def save(self, playlist):
if not self._allow_save:
return None

old_playlist = self.lookup(playlist.uri)

if old_playlist is not None:
index = self._playlists.index(old_playlist)
self._playlists[index] = playlist
else:
self._playlists.append(playlist)

return playlist
30 changes: 30 additions & 0 deletions tests/dummy_mixer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import pykka
from mopidy import mixer


def create_proxy(config=None):
return DummyMixer.start(config=None).proxy()


class DummyMixer(pykka.ThreadingActor, mixer.Mixer):
def __init__(self, config):
super().__init__()
# These must be initialised to avoid none type error in tests
self._volume = 50
self._mute = False

def get_volume(self):
return self._volume

def set_volume(self, volume):
self._volume = volume
self.trigger_volume_changed(volume=volume)
return True

def get_mute(self):
return self._mute

def set_mute(self, mute):
self._mute = mute
self.trigger_mute_changed(mute=mute)
return True
Loading