# Copyright 2023 (c) Anna Schumaker """Tests our database Connection object.""" import pathlib import sqlite3 import unittest import xfstestsdb.sqlite from xdg.BaseDirectory import save_data_path class TestConnection(unittest.TestCase): """Test the database connection.""" def setUp(self): """Set up common variables.""" self.sql = xfstestsdb.sqlite.Connection() def test_paths(self): """Check that path constants are pointing in the right places.""" data_dir = pathlib.Path(save_data_path("xfstestsdb")) self.assertEqual(xfstestsdb.sqlite.DATA_DIR, data_dir) self.assertEqual(xfstestsdb.sqlite.DATA_FILE, data_dir / "xfstestsdb-debug.sqlite3") self.assertEqual(xfstestsdb.sqlite.DATABASE, ":memory:") self.assertEqual(xfstestsdb.sqlite.SQL_SCRIPTS, pathlib.Path(xfstestsdb.__file__).parent / "scripts") self.assertEqual(xfstestsdb.sqlite.SQL_V1_SCRIPT, xfstestsdb.sqlite.SQL_SCRIPTS / "xfstestsdb.sql") self.assertEqual(xfstestsdb.sqlite.SQL_V2_SCRIPT, xfstestsdb.sqlite.SQL_SCRIPTS / "upgrade-v2.sql") def test_foreign_keys(self): """Test that foreign key constraints are enabled.""" cur = self.sql("PRAGMA foreign_keys") self.assertEqual(cur.fetchone()["foreign_keys"], 1) def test_version(self): """Test checking the database schema version.""" cur = self.sql("PRAGMA user_version") self.assertEqual(cur.fetchone()["user_version"], 2) def test_connection(self): """Check that the connection manager is initialized properly.""" self.assertIsInstance(self.sql.sql, sqlite3.Connection) self.assertEqual(self.sql.sql.row_factory, sqlite3.Row) self.assertTrue(self.sql.connected) def test_call(self): """Test that the connection manager can run sql statements.""" self.sql("CREATE TABLE test (a INT UNIQUE, b INT)") self.sql("INSERT INTO test VALUES (?, ?)", 1, 2) cur = self.sql("SELECT * FROM test") self.assertIsInstance(cur, sqlite3.Cursor) row = cur.fetchone() self.assertIsInstance(row, sqlite3.Row) self.assertEqual(row["a"], 1) self.assertEqual(row["b"], 2) def test_call_keyword(self): """Test running a sql statement with keyword arguments.""" self.sql("CREATE TABLE test (a INT UNIQUE, b INT)") self.sql("INSERT INTO test VALUES (:a, :b)", a=1, b=2) cur = self.sql("SELECT * FROM test") self.assertIsInstance(cur, sqlite3.Cursor) row = cur.fetchone() self.assertIsInstance(row, sqlite3.Row) self.assertEqual(row["a"], 1) self.assertEqual(row["b"], 2) def test_executemany(self): """Test that the connection manager can run several statements.""" self.sql("CREATE TABLE test (a INT, b INT)") self.sql.executemany("INSERT INTO test VALUES (?, ?)", (1, 2), (3, 4), (5, 6), (7, 8), (9, 0)) rows = self.sql("SELECT * FROM test").fetchall() self.assertListEqual([(row["a"], row["b"]) for row in rows], [(1, 2), (3, 4), (5, 6), (7, 8), (9, 0)]) def test_executescript(self): """Test running a sql script.""" script = pathlib.Path(__file__).parent / "test-script.sql" cur = self.sql.executescript(script) self.assertIsInstance(cur, sqlite3.Cursor) rows = self.sql("SELECT * FROM test").fetchall() self.assertListEqual([(row["a"], row["b"]) for row in rows], [(1, 2), (3, 4), (5, 6), (7, 8), (9, 0)]) self.assertIsNone(self.sql.executescript(script.parent / "no-script")) def test_transaction(self): """Test that we can manually start a transaction.""" self.assertFalse(self.sql.sql.in_transaction) with self.sql: self.assertTrue(self.sql.sql.in_transaction) self.sql("CREATE TABLE test_table (test TEXT)") self.sql("INSERT INTO test_table VALUES (?)", "Test") self.assertTrue(self.sql.sql.in_transaction) self.assertFalse(self.sql.sql.in_transaction) cur = self.sql("SELECT COUNT(*) FROM test_table") self.assertEqual(cur.fetchone()["COUNT(*)"], 1) def test_transaction_rollback(self): """Test that errors roll back the transaction.""" with self.assertRaises(Exception): with self.sql: self.sql("CREATE TABLE other_table (test TEXT)") self.sql("INSERT INTO other_table VALUES (?)", "Test") raise Exception("Test Exeption") self.assertFalse(self.sql.sql.in_transaction) with self.assertRaises(sqlite3.OperationalError): self.sql("SELECT COUNT(*) FROM other_table") def test_vacuum(self): """Test vacuuming the database.""" self.assertIsInstance(self.sql.vacuum(), sqlite3.Cursor) def test_close(self): """Check closing the connection.""" self.sql.close() self.assertFalse(self.sql.connected) with self.assertRaises(sqlite3.ProgrammingError): self.assertIsNone(self.sql("SELECT COUNT(*) FROM test_table")) self.sql.close()