updt
This commit is contained in:
99
tests/conftest.py
Normal file
99
tests/conftest.py
Normal file
@@ -0,0 +1,99 @@
|
||||
import logging
|
||||
import os
|
||||
|
||||
import xdist
|
||||
from pytest import Session, fixture, hookimpl
|
||||
from sqlalchemy.ext.asyncio import (
|
||||
AsyncEngine,
|
||||
AsyncSession,
|
||||
async_sessionmaker,
|
||||
create_async_engine,
|
||||
)
|
||||
|
||||
from fastfood_two.application.config import Config
|
||||
from tests.utils import db_creator, db_deleter, get_test_settings
|
||||
|
||||
|
||||
def pytest_configure(config):
|
||||
worker_id = os.environ.get("PYTEST_XDIST_WORKER")
|
||||
if worker_id is not None:
|
||||
logging.basicConfig(
|
||||
format=config.getini("log_file_format"),
|
||||
filename=f"tests_{worker_id}.log",
|
||||
level=config.getini("log_file_level"),
|
||||
)
|
||||
|
||||
|
||||
@hookimpl(trylast=True)
|
||||
def pytest_sessionstart(session: Session) -> None:
|
||||
"""Hook запуска приложения в отдельном процессе."""
|
||||
worker_id = xdist.get_xdist_worker_id(session)
|
||||
|
||||
import asyncio
|
||||
|
||||
db_name = f"fastfood_two_test_db_{worker_id}"
|
||||
loop = asyncio.get_event_loop_policy().new_event_loop()
|
||||
loop.run_until_complete(db_creator(name=db_name))
|
||||
loop.close()
|
||||
|
||||
|
||||
@hookimpl(trylast=True)
|
||||
def pytest_sessionfinish(session: Session) -> None:
|
||||
"""Hook остановки приложения."""
|
||||
worker_id = xdist.get_xdist_worker_id(session)
|
||||
import asyncio
|
||||
|
||||
db_name = f"fastfood_two_test_db_{worker_id}"
|
||||
loop = asyncio.get_event_loop_policy().new_event_loop()
|
||||
loop.run_until_complete(db_deleter(name=db_name))
|
||||
loop.close()
|
||||
|
||||
|
||||
@fixture(scope="session")
|
||||
def app_settings() -> Config:
|
||||
worker_id = os.environ.get("PYTEST_XDIST_WORKER")
|
||||
if worker_id == None:
|
||||
worker_id = "master"
|
||||
settings = get_test_settings(db_name=f"fastfood_two_test_db_{worker_id}")
|
||||
return settings
|
||||
|
||||
|
||||
@fixture
|
||||
def get_loop():
|
||||
import asyncio
|
||||
|
||||
loop = asyncio.get_event_loop_policy().new_event_loop()
|
||||
yield loop
|
||||
loop.close()
|
||||
|
||||
|
||||
@fixture
|
||||
def test_engine(app_settings):
|
||||
return create_async_engine(url=app_settings.db.url)
|
||||
|
||||
|
||||
@fixture
|
||||
def test_sessionmaker(test_engine):
|
||||
return async_sessionmaker(test_engine, expire_on_commit=False)
|
||||
|
||||
|
||||
@fixture
|
||||
async def test_session(test_sessionmaker):
|
||||
|
||||
async with test_sessionmaker() as session:
|
||||
try:
|
||||
yield session
|
||||
finally:
|
||||
pass
|
||||
|
||||
|
||||
@fixture
|
||||
def app(app_settings, test_engine, test_sessionmaker, test_session):
|
||||
from fastfood_two.presentation.fastapi_backend.main import app_factory
|
||||
|
||||
app = app_factory()
|
||||
app.dependency_overrides[Config] = lambda: app_settings
|
||||
app.dependency_overrides[AsyncEngine] = lambda: test_engine
|
||||
app.dependency_overrides[async_sessionmaker[AsyncSession]] = lambda: test_sessionmaker
|
||||
app.dependency_overrides[AsyncSession] = lambda: test_session
|
||||
return app
|
0
tests/e2e_tests/__init__.py
Normal file
0
tests/e2e_tests/__init__.py
Normal file
0
tests/e2e_tests/api/__init__.py
Normal file
0
tests/e2e_tests/api/__init__.py
Normal file
0
tests/e2e_tests/api/menu/__init__.py
Normal file
0
tests/e2e_tests/api/menu/__init__.py
Normal file
74
tests/e2e_tests/api/menu/conftest.py
Normal file
74
tests/e2e_tests/api/menu/conftest.py
Normal file
@@ -0,0 +1,74 @@
|
||||
from uuid import uuid4
|
||||
|
||||
from _pytest.fixtures import SubRequest
|
||||
from pytest import fixture
|
||||
from sqlalchemy import delete
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from fastfood_two.infrastructure.pg_storage.models import SQLAMenu
|
||||
|
||||
|
||||
def generate_str():
|
||||
return str(uuid4())
|
||||
|
||||
|
||||
@fixture
|
||||
async def menu(request: SubRequest, test_session: AsyncSession):
|
||||
if request.param is not None:
|
||||
id = uuid4()
|
||||
title = "test_menu"
|
||||
description = None
|
||||
|
||||
match len(request.param):
|
||||
case 2:
|
||||
need_create, need_delete = request.param
|
||||
if not isinstance(need_create, bool):
|
||||
raise ValueError
|
||||
if not isinstance(need_delete, bool):
|
||||
raise ValueError
|
||||
case 3:
|
||||
need_create, need_delete, title = request.param
|
||||
if not isinstance(need_create, bool):
|
||||
raise ValueError
|
||||
if not isinstance(need_delete, bool):
|
||||
raise ValueError
|
||||
if not isinstance(title, str | bool):
|
||||
raise ValueError
|
||||
|
||||
if isinstance(title, bool):
|
||||
title = "test_menu" if not title else generate_str
|
||||
case 4:
|
||||
need_create, need_delete, title, description = request.param
|
||||
if not isinstance(need_create, bool):
|
||||
raise ValueError
|
||||
if not isinstance(need_delete, bool):
|
||||
raise ValueError
|
||||
if not isinstance(title, str | bool):
|
||||
raise ValueError
|
||||
if not isinstance(description, str | bool | None):
|
||||
raise ValueError
|
||||
|
||||
if isinstance(title, bool):
|
||||
title = "test_menu" if not title else generate_str
|
||||
|
||||
if isinstance(description, bool):
|
||||
description = "test_desc" if not description else generate_str
|
||||
|
||||
case _:
|
||||
raise ValueError
|
||||
|
||||
menu = SQLAMenu(id=id, title=title, description=description)
|
||||
|
||||
if need_create is True:
|
||||
test_session.add(menu)
|
||||
await test_session.commit()
|
||||
|
||||
yield menu
|
||||
|
||||
if need_delete is True:
|
||||
stmt = delete(SQLAMenu).where(SQLAMenu.id == menu.id)
|
||||
await test_session.execute(stmt)
|
||||
await test_session.commit()
|
||||
|
||||
else:
|
||||
raise ValueError
|
65
tests/e2e_tests/api/menu/test_menu.py
Normal file
65
tests/e2e_tests/api/menu/test_menu.py
Normal file
@@ -0,0 +1,65 @@
|
||||
from pytest import mark
|
||||
|
||||
|
||||
@mark.parametrize(
|
||||
"menu, is_created",
|
||||
[((False, False), False), ((True, True), True)],
|
||||
ids=["Get empty menu list", "Get non-empty menu list"],
|
||||
indirect=["menu"],
|
||||
)
|
||||
async def test_get_menu_list(client, menu, is_created: bool):
|
||||
response = await client.get("menu/")
|
||||
assert response.status_code == 200
|
||||
if is_created:
|
||||
expected = [menu.to_json()]
|
||||
else:
|
||||
expected = []
|
||||
assert response.json() == expected
|
||||
|
||||
|
||||
@mark.parametrize(
|
||||
"menu",
|
||||
[(False, True, False, False), (False, True, False, None)],
|
||||
ids=["Post menu with description", "Post menu without description"],
|
||||
indirect=True,
|
||||
)
|
||||
async def test_post_menu_add(client, menu):
|
||||
response = await client.post("menu/", json=menu.to_json())
|
||||
assert response.status_code == 200
|
||||
assert response.json() == menu.to_json()
|
||||
|
||||
|
||||
@mark.parametrize(
|
||||
"menu, json",
|
||||
[
|
||||
((True, True), {"title": "updated_title"}),
|
||||
((True, True), {"title": "updated_title", "description": "updated_description"}),
|
||||
((True, True), {"title": "updated_title", "description": None}),
|
||||
],
|
||||
ids=[
|
||||
"Update menu title",
|
||||
"Update menu title and description",
|
||||
"Update menu title and description to None",
|
||||
],
|
||||
indirect=["menu"],
|
||||
)
|
||||
async def test_update_menu(client, menu, json):
|
||||
|
||||
response = await client.patch(f"menu/{menu.id}", json=json)
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {
|
||||
"id": str(menu.id),
|
||||
"title": json.get("title"),
|
||||
"description": json.get("description", None),
|
||||
}
|
||||
|
||||
|
||||
@mark.parametrize(
|
||||
"menu",
|
||||
[(True, False)],
|
||||
ids=["Delete menu"],
|
||||
indirect=True,
|
||||
)
|
||||
async def test_delete_menu(client, menu):
|
||||
response = await client.delete(f"menu/{menu.id}")
|
||||
assert response.status_code == 204
|
13
tests/e2e_tests/conftest.py
Normal file
13
tests/e2e_tests/conftest.py
Normal file
@@ -0,0 +1,13 @@
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
from pytest import fixture
|
||||
|
||||
|
||||
@fixture
|
||||
async def client(app):
|
||||
transport = ASGITransport(app=app)
|
||||
|
||||
async with AsyncClient(
|
||||
transport=transport,
|
||||
base_url="http://127.0.0.1:8000/api/v1/",
|
||||
) as async_client:
|
||||
yield async_client
|
0
tests/integration_tests/__init__.py
Normal file
0
tests/integration_tests/__init__.py
Normal file
0
tests/unittests/__init__.py
Normal file
0
tests/unittests/__init__.py
Normal file
Binary file not shown.
@@ -1,6 +0,0 @@
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_coreapp():
|
||||
assert 1 == 1
|
0
tests/unittests/domain_tests/__init__.py
Normal file
0
tests/unittests/domain_tests/__init__.py
Normal file
0
tests/unittests/domain_tests/menu/__init__.py
Normal file
0
tests/unittests/domain_tests/menu/__init__.py
Normal file
3
tests/unittests/domain_tests/menu/test_menu_domain.py
Normal file
3
tests/unittests/domain_tests/menu/test_menu_domain.py
Normal file
@@ -0,0 +1,3 @@
|
||||
class TestMenuDomain:
|
||||
def test_entity(self):
|
||||
assert 1 == 1
|
50
tests/utils.py
Normal file
50
tests/utils.py
Normal file
@@ -0,0 +1,50 @@
|
||||
import contextlib
|
||||
import os
|
||||
from functools import lru_cache
|
||||
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.exc import ProgrammingError
|
||||
from sqlalchemy.ext.asyncio import create_async_engine
|
||||
|
||||
from fastfood_two.application.config import Config
|
||||
from fastfood_two.infrastructure.pg_storage.config import PostgresConfig
|
||||
from fastfood_two.infrastructure.pg_storage.models import Base
|
||||
|
||||
|
||||
async def db_creator(name: str) -> None:
|
||||
sql = f"create database {name} with owner test_user;"
|
||||
async with create_async_engine(
|
||||
url="postgresql+asyncpg://pi3c:test_password@localhost:5432/test_db",
|
||||
isolation_level="AUTOCOMMIT",
|
||||
).begin() as conn:
|
||||
with contextlib.suppress(ProgrammingError):
|
||||
await conn.execute(text(sql))
|
||||
|
||||
async with create_async_engine(
|
||||
url=f"postgresql+asyncpg://test_user:test_password@localhost:5432/{name}",
|
||||
).begin() as conn:
|
||||
await conn.run_sync(Base.metadata.drop_all)
|
||||
await conn.run_sync(Base.metadata.create_all)
|
||||
|
||||
|
||||
async def db_deleter(name: str) -> None:
|
||||
sql = f"DROP DATABASE {name} WITH (FORCE);"
|
||||
async with create_async_engine(
|
||||
url="postgresql+asyncpg://pi3c:test_password@localhost:5432/test_db",
|
||||
isolation_level="AUTOCOMMIT",
|
||||
).begin() as conn:
|
||||
await conn.execute(text(sql))
|
||||
|
||||
|
||||
@lru_cache()
|
||||
def get_test_settings(db_name: str) -> Config:
|
||||
"""Возвращает настройки приложения"""
|
||||
return Config(
|
||||
db=PostgresConfig(
|
||||
user=os.getenv("DB_USER", "test_user"),
|
||||
password=os.getenv("DB_PASS", "test_password"),
|
||||
host=os.getenv("DB_HOST", "localhost"),
|
||||
port=int(os.getenv("DB_PORT", 5432)),
|
||||
dbname=db_name,
|
||||
),
|
||||
)
|
Reference in New Issue
Block a user