curds: Add a test for parallel track scanning

And add code for generating a large test library to make stress testing
possible.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
This commit is contained in:
Anna Schumaker 2019-02-06 14:35:35 -05:00
parent 9b64c590cc
commit 0705f86d81
3 changed files with 38 additions and 1 deletions

View File

@ -3,6 +3,7 @@
clean:
find . -type d -name __pycache__ -exec rm -r {} \+
find trier -type d -name "Test Album" -exec rm -r {} \+
find trier -type d -name "Test Library" -exec rm -r {} \+
.PHONY: trier
trier:

View File

@ -1,10 +1,13 @@
# Copyright 2019 (c) Anna Schumaker.
import asyncio
import concurrent.futures
import mutagen
import unittest
import os
import track
test_tracks = os.path.abspath("./trier/Test Album")
test_tracks = os.path.abspath("./trier/Test Album")
test_library = os.path.abspath("./trier/Test Library")
class TestTrackClass(unittest.TestCase):
def test_init_basic(self):
@ -50,6 +53,7 @@ class TestTrackClass(unittest.TestCase):
self.assertEqual(track.Track(join(test_tracks, "10 - Test {Disc 20}.ogg")).discnumber, 20)
def test_track_add(self):
track.tracklist.clear()
t = track.add(os.path.join(test_tracks, "01 - Test Track.ogg"))
self.assertIsNotNone(t)
self.assertEqual(len(track.tracklist), 1)
@ -65,5 +69,19 @@ class TestTrackClass(unittest.TestCase):
self.assertEqual(len(track.tracklist), 2)
self.assertIn(u, track.tracklist)
def test_parallel_add(self):
track.tracklist.clear()
count = 0
tracks = [ ]
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
for dirname, subdirs, files in os.walk(test_library):
for f in files:
tracks.append(pool.submit(track.add, os.path.join(dirname, f)))
count += 1
self.assertEqual(len(track.tracklist), count)
self.assertEqual(len(tracks), count)
for t in tracks:
self.assertIsNotNone(t)
if __name__ == '__main--':
unittest.main()

View File

@ -8,6 +8,8 @@ ffmpeg = "ffmpeg -hide_banner -nostdin -f s16le -i /dev/zero -codec libvorbis -l
def generate_track(length, filename, tags={}):
path = os.path.join(trier, filename)
if os.path.exists(path):
return
os.makedirs(os.path.dirname(path), exist_ok=True)
subprocess.run(ffmpeg + [ "-t", str(length), path ])
@ -16,6 +18,7 @@ def generate_track(length, filename, tags={}):
fileinfo[key] = value
fileinfo.save()
# Create a bunch of tracks in the Test Album directory
generate_track( 0, "Test Album/00 - Empty Track.ogg")
generate_track(10, "Test Album/01 - Test Track.ogg", { "title" : "Test Track",
"artist" : "Test Artist",
@ -43,3 +46,18 @@ generate_track(50, "Test Album/09 - Test {Disc 02}.ogg", { "Title" : "Test {Disc
"album" : "Test Album {Disc 02}" })
generate_track(55, "Test Album/10 - Test {Disc 20}.ogg", { "Title" : "Test {Disc 20}",
"album" : "Test Album {Disc 20}" })
# Create a giant library for testing
for artistno in range(1, 26):
artist = f"Test Artist {artistno:02}"
for albumno in range(1, 6):
album = f"Test Album {albumno}"
for trackno in range(1, 11):
title = f"Test Track {trackno:02}"
generate_track(trackno, f"Test Library/{artist}/{album}/{trackno:02} - {title}.ogg",
{ "title" : title,
"artist" : artist,
"album" : album,
"genre" : f"Test Genre {albumno}",
"date" : str(1970 + (albumno * 3)),
"tracknumber" : f"{trackno:02}" })