Compare commits

...

9 Commits

77 changed files with 2251 additions and 20 deletions

View File

Before

Width:  |  Height:  |  Size: 6.3 KiB

After

Width:  |  Height:  |  Size: 6.3 KiB

View File

@ -2,8 +2,7 @@ import os
import time import time
import pygame as pg import pygame as pg
from pygame.key import name import pygame_menu
import pygame_menu as pgm
from .snake import Snake from .snake import Snake
from .food import Food from .food import Food
@ -28,26 +27,29 @@ class Game:
self.menu = None self.menu = None
self.snake = Snake() self.snake = Snake()
self.foods_array = [] self.foods_array = []
self.game_speed = 0
def game_init(self): def game_init(self):
pg.init() pg.init()
def get_menu(self): def get_menu(self):
self.set_bg() self.set_bg()
self.menu = pgm.Menu('Snakessss game', 400, 300, theme=pgm.pygame_menu.themes.THEME_DARK) self.menu = pygame_menu.Menu('Snakessss game', 400, 300, theme=pygame_menu.themes.THEME_BLUE)
self.menu.add.button('Start new game', self.start_new_game) self.menu.add.button('Start new game', self.start_new_game)
if self.started: if self.started:
self.menu.add.button('Restore game', self.restore_game) self.menu.add.button('Restore game', self.restore_game)
self.menu.add.selector('Speed', [('Slow', 40), ('Medium', 25), ('Fast', 10)], onchange=self.set_speed) self.menu.add.selector('Speed', [('Slow', 0), ('Medium', 1), ('Fast', 2)], onchange=self.set_speed, onreturn=self.restore_game)
self.menu.add.button('Exit', pgm.pygame_menu.events.EXIT) self.menu._widgets[-1].set_value(self.game_speed)
self.menu.add.button('Exit', pygame_menu.events.EXIT)
self.menu.mainloop(self.screen) self.menu.mainloop(self.screen)
def disable_menu(self): def disable_menu(self):
self.menu.disable() if self.menu is not None:
self.menu.disable()
def set_speed(self, value, difficulty):
self.snake.speed = difficulty
def set_speed(self, tuple_celected, difficulty):
self.game_speed = difficulty
self.snake.set_speed(speed=self.game_speed)
def set_title(self): def set_title(self):
pg.display.set_caption(self.title) pg.display.set_caption(self.title)
@ -56,14 +58,19 @@ class Game:
self.screen.blit(self.bg, (0, 0)) self.screen.blit(self.bg, (0, 0))
def start_new_game(self): def start_new_game(self):
print(self.game_speed, self.snake.speed)
self.started = True self.started = True
self.paused = False self.paused = False
self.disable_menu() self.disable_menu()
self.need_reset = True self.need_reset = True
def restore_game(self): def restore_game(self, *args, **kwargs):
self.disable_menu() if self.started:
self.paused = False self.disable_menu()
self.paused = False
else:
self.start_new_game()
def save_game(self): def save_game(self):
pass pass
@ -72,10 +79,14 @@ class Game:
pass pass
def gameover(self): def gameover(self):
surf = self.font.render('Game over', True, (20, 20, 20)) surf = self.font.render('GAME OVER', True, (20, 20, 20))
rect = surf.get_rect() rect = surf.get_rect()
score = self.font.render(f'Score: {self.score}', True, (20, 20, 20))
rect2 = score.get_rect()
rect.midtop = (390, 250) rect.midtop = (390, 250)
rect2.midtop = (390, 320)
self.screen.blit(surf, rect) self.screen.blit(surf, rect)
self.screen.blit(score, rect2)
pg.display.flip() pg.display.flip()
time.sleep(3) time.sleep(3)
@ -89,6 +100,7 @@ class Game:
for f in self.foods_array: for f in self.foods_array:
if self.snake.get_head_coords() == f.get_coords(): if self.snake.get_head_coords() == f.get_coords():
self.score += 5 * self.snake.length
self.foods_array.remove(f) self.foods_array.remove(f)
self.snake.length += 1 self.snake.length += 1
@ -99,23 +111,24 @@ class Game:
pg.display.update() pg.display.update()
if self.snake.impacted: if self.snake.impacted:
self.gameover()
self.started = False self.started = False
self.gameover()
def mainloop(self): def mainloop(self):
while not self.done: while not self.done:
if self.need_reset: if self.need_reset:
self.snake.reset() self.snake.reset(speed=self.game_speed)
self.start_new_game() self.start_new_game()
self.need_reset = False self.need_reset = False
for event in pg.event.get(): for event in pg.event.get():
if event.type == pg.QUIT: if event.type == pg.QUIT:
self.done = True self.done = True
pressed = pg.key.get_pressed() pressed = pg.key.get_pressed()
if pressed[pg.K_ESCAPE]: if pressed[pg.K_ESCAPE]:
self.paused = False if self.paused else True self.paused = False if self.paused else True
if self.paused: if self.paused:
@ -124,12 +137,15 @@ class Game:
if pressed[pg.K_UP]: if pressed[pg.K_UP]:
if self.snake.direction != 'down': if self.snake.direction != 'down':
self.snake.direction = 'up' self.snake.direction = 'up'
if pressed[pg.K_DOWN]: if pressed[pg.K_DOWN]:
if self.snake.direction != 'up': if self.snake.direction != 'up':
self.snake.direction = 'down' self.snake.direction = 'down'
if pressed[pg.K_LEFT]: if pressed[pg.K_LEFT]:
if self.snake.direction != 'right': if self.snake.direction != 'right':
self.snake.direction = 'left' self.snake.direction = 'left'
if pressed[pg.K_RIGHT]: if pressed[pg.K_RIGHT]:
if self.snake.direction != 'left': if self.snake.direction != 'left':
self.snake.direction = 'right' self.snake.direction = 'right'

View File

Before

Width:  |  Height:  |  Size: 1013 KiB

After

Width:  |  Height:  |  Size: 1013 KiB

View File

Before

Width:  |  Height:  |  Size: 5.3 KiB

After

Width:  |  Height:  |  Size: 5.3 KiB

View File

Before

Width:  |  Height:  |  Size: 5.9 KiB

After

Width:  |  Height:  |  Size: 5.9 KiB

View File

Before

Width:  |  Height:  |  Size: 5.8 KiB

After

Width:  |  Height:  |  Size: 5.8 KiB

View File

Before

Width:  |  Height:  |  Size: 5.9 KiB

After

Width:  |  Height:  |  Size: 5.9 KiB

View File

Before

Width:  |  Height:  |  Size: 13 KiB

After

Width:  |  Height:  |  Size: 13 KiB

View File

Before

Width:  |  Height:  |  Size: 5.4 KiB

After

Width:  |  Height:  |  Size: 5.4 KiB

View File

Before

Width:  |  Height:  |  Size: 15 KiB

After

Width:  |  Height:  |  Size: 15 KiB

View File

Before

Width:  |  Height:  |  Size: 15 KiB

After

Width:  |  Height:  |  Size: 15 KiB

View File

Before

Width:  |  Height:  |  Size: 3.0 KiB

After

Width:  |  Height:  |  Size: 3.0 KiB

View File

Before

Width:  |  Height:  |  Size: 2.8 KiB

After

Width:  |  Height:  |  Size: 2.8 KiB

View File

Before

Width:  |  Height:  |  Size: 2.9 KiB

After

Width:  |  Height:  |  Size: 2.9 KiB

View File

Before

Width:  |  Height:  |  Size: 2.9 KiB

After

Width:  |  Height:  |  Size: 2.9 KiB

View File

@ -24,13 +24,16 @@ class Snake:
self.direction = 'right' self.direction = 'right'
self.direction_prev = 'right' self.direction_prev = 'right'
self.length = 10 self.length = 2
self.speed = 40 # скорость движения змейки чем меньше, тем быстрее self.speed = 30 # скорость движения змейки чем меньше, тем быстрее
self.counter = 0 # Просто счетчик, для регулировки скорости нужен self.counter = 0 # Просто счетчик, для регулировки скорости нужен
self.body = kwargs.get('body', [('h', 60, 300), ('h', 30, 300)]) self.body = kwargs.get('body', [('h', 60, 300), ('h', 30, 300)])
self.impacted = False self.impacted = False
def reset(self): def set_speed(self, **kwargs):
self.speed = (30, 20, 10)[kwargs.get('speed', 30)]
def reset(self, **kwargs):
self.x = 90 self.x = 90
self.y = 300 self.y = 300
self.body = [('h', 60, 300), ('h', 30, 300)] self.body = [('h', 60, 300), ('h', 30, 300)]
@ -38,7 +41,7 @@ class Snake:
self.direction_prev = 'right' self.direction_prev = 'right'
self.impacted = False self.impacted = False
self.length = 2 self.length = 2
self.speed = 40 self.set_speed(**kwargs)
def get_head_coords(self): def get_head_coords(self):
return self.x, self.y return self.x, self.y

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,34 @@
[tool.poetry]
name = "servicemanager"
version = "0.0.1"
description = "Computer equipment prevention and maintenance management system"
authors = ["Sergey Vanyushkin <pi3c@yandex.ru>"]
license = "MIT"
readme = "README.md"
[tool.poetry.dependencies]
python = "^3.10"
fastapi = {extras = ["all"], version = "^0.108.0"}
SQLAlchemy = "^2.0.25"
asyncpg = "^0.29.0"
pydantic-settings = "^2.1.0"
uvicorn = "^0.25.0"
fastapi-users = {extras = ["sqlalchemy"], version = "^12.1.2"}
[tool.poetry.group.dev.dependencies]
pytest = "^7.4.4"
flake8 = "^7.0.0"
mypy = "^1.8.0"
pytest-cov = "^4.1.0"
faker = "^22.1.0"
[tool.pytest.ini_options]
pythonpath = ". src"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.poetry.scripts]
app = "src.app:run_app"

View File

@ -0,0 +1,9 @@
from fastapi import APIRouter
api_router = APIRouter()
@api_router.get("/ping")
async def pong():
"""Тестовый роут"""
return {"ping": "pong!"}

View File

@ -0,0 +1,50 @@
import asyncio
import sys
from fastapi import FastAPI
import uvicorn
from api.routes import api_router
from frontend.routes import site_router
async def generate_test_data():
"""
Создание БД и наполнение ее данными
"""
from backend.database import create_db_and_tables
from backend.queries import Crud
await create_db_and_tables()
await Crud.insert_demo_data()
def create_app():
"""
Создание экземпляра приложения FastAPI и врзврат его
"""
app = FastAPI()
app.include_router(api_router)
app.include_router(site_router)
return app
def run_app():
"""
Запуск локального вебсервера для тестов и проверки
"""
uvicorn.run(
app="main:create_app",
reload=True,
factory=True,
)
if __name__ == "__main__":
if "--configure" in sys.argv:
asyncio.run(generate_test_data())
if "--webserver" in sys.argv:
run_app()

View File

@ -0,0 +1,35 @@
from typing import AsyncGenerator
from fastapi import Depends
from fastapi_users.db import SQLAlchemyUserDatabase
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from config import settings
from backend.models import Base, User
async_engine = create_async_engine(settings.DATABASE_URL_asyncpg)
async_session_maker = async_sessionmaker(
async_engine,
class_=AsyncSession,
expire_on_commit=False,
)
async def create_db_and_tables():
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker() as session:
yield session
async def get_user_db(session: AsyncSession = Depends(get_async_session)):
yield SQLAlchemyUserDatabase(session, User)

View File

@ -0,0 +1,121 @@
from datetime import datetime
import enum
from typing import List, Annotated, Optional
from fastapi_users.db import SQLAlchemyBaseUserTable
from sqlalchemy import ForeignKey, text
from sqlalchemy.orm import Mapped, mapped_column, relationship, DeclarativeBase
intpk = Annotated[int, mapped_column(primary_key=True)]
str_25 = Annotated[str, 25]
created_at = Annotated[
datetime,
mapped_column(server_default=text("TIMEZONE('utc', now())")),
]
updated_at = Annotated[
datetime,
mapped_column(
server_default=text("TIMEZONE('utc', now())"),
onupdate=datetime.utcnow,
),
]
class Base(DeclarativeBase):
pass
class UserRoles(enum.Enum):
"""
Доступные роли юзеров/клиентов
"""
admin = "Администратор"
director = "Руководитель"
worker = "Работник"
client = "Клиент"
class UserRoleAssociation(Base):
"""
Модель таблицы связи Many2Many для Role & User
"""
__tablename__ = "user_role_association"
user_id: Mapped[int] = mapped_column(
ForeignKey("user.id"),
primary_key=True,
)
role_id: Mapped[int] = mapped_column(
ForeignKey("role.id"),
primary_key=True,
)
role: Mapped["Role"] = relationship()
class Role(Base):
"""
Модель таблицы role
"""
__tablename__ = "role"
id: Mapped[intpk]
role: Mapped[Optional[UserRoles]]
class User(SQLAlchemyBaseUserTable[int], Base):
"""
Модель таблицы user
"""
__tablename__ = "user"
id: Mapped[intpk]
name: Mapped[str_25]
patronim: Mapped[str_25]
last_name: Mapped[str_25]
roles: Mapped[List["Role"]] = relationship(
secondary="user_role_association",
)
equipment: Mapped[List["Equipment"]] = relationship()
eq_locations: Mapped[List["UserLocationAssociation"]] = relationship()
class UserLocationAssociation(Base):
"""таблица связи Many2Many между user & location"""
__tablename__ = "user_location_association"
client_id: Mapped[int] = mapped_column(
ForeignKey("user.id"),
primary_key=True,
)
location_id: Mapped[int] = mapped_column(
ForeignKey("location.id"), primary_key=True
)
location: Mapped["Location"] = relationship()
class Location(Base):
"""
Адреса клиентов где установленно оборудование
"""
__tablename__ = "location"
id: Mapped[intpk]
address: Mapped[str]
client: Mapped[int]
class Equipment(Base):
""" """
__tablename__ = "equipment"
id: Mapped[intpk]
name: Mapped[str]
eq_type: Mapped[int]
put_into_operation_at: Mapped[datetime]
owner_id: Mapped[int] = mapped_column(ForeignKey("user.id"))
# Определение связи many2one
location_id: Mapped[int] = mapped_column(ForeignKey("location.id"))
location: Mapped["Location"] = relationship()

View File

@ -0,0 +1,78 @@
from faker import Faker
from backend.database import async_session_maker
import backend.models as models
class Crud:
@staticmethod
async def insert_demo_data():
async with async_session_maker() as session:
fake = Faker(["ru_RU"])
all_role = [
models.Role(role="admin"),
models.Role(role="director"),
models.Role(role="worker"),
models.Role(role="client"),
]
session.add_all(all_role)
await session.flush()
admin = models.User(
email="admin@mail.com",
hashed_password="admin",
is_active=True,
is_superuser=True,
is_verified=True,
roles=all_role,
)
director = models.User(
name=fake.first_name_male(),
email="director@mail.com",
hashed_password="director",
roles=all_role[1:3],
)
worker1 = models.User(
email="worker1@mail.com",
hashed_password="worker1",
roles=all_role[2:3],
)
worker2 = models.User(
email="worker2@mail.com",
hashed_password="worker2",
roles=all_role[2:3],
)
client1 = models.User(
email="client1@mail.com",
hashed_password="client1",
roles=all_role[3:],
)
client2 = models.User(
email="client2@mail.com",
hashed_password="client2",
roles=all_role[3:],
)
client3 = models.User(
email="client3@mail.com",
hashed_password="client3",
roles=all_role[3:],
)
for _ in range(10):
print(fake.address())
session.add_all(
[
admin,
director,
worker1,
worker2,
client1,
client2,
client3,
]
)
await session.commit()

View File

@ -0,0 +1,24 @@
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
DB_HOST: str
DB_PORT: int
DB_USER: str
DB_PASS: str
DB_NAME: str
@property
def DATABASE_URL_asyncpg(self):
"""
Возвращает строку подключения к БД необходимую для SQLAlchemy
"""
return (
f"postgresql+asyncpg://{self.DB_USER}:{self.DB_PASS}"
f"@{self.DB_HOST}:{self.DB_PORT}/{self.DB_NAME}"
)
model_config = SettingsConfigDict(env_file=".env")
settings = Settings()

View File

@ -0,0 +1,20 @@
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
site_router = APIRouter()
site_router.mount(
"/static",
StaticFiles(directory="src/frontend/static"),
name="static",
)
templates = Jinja2Templates(directory="src/frontend/templates")
@site_router.get("/testpage", response_class=HTMLResponse)
async def read_item(request: Request):
return templates.TemplateResponse(request=request, name="test.html")

View File

@ -0,0 +1,11 @@
<html>
<head>
<title>test page</title>
</head>
<body>
<h1>test_data</h1>
</body>
</html>

View File

@ -0,0 +1,10 @@
from starlette.testclient import TestClient
from src.app import create_app
client = TestClient(create_app())
def test_testpage():
response = client.get("/testpage")
assert response.status_code == 200

View File

@ -0,0 +1,11 @@
from starlette.testclient import TestClient
from src.app import create_app
client = TestClient(create_app())
def test_ping():
response = client.get("/ping")
assert response.status_code == 200
assert response.json() == {"ping": "pong!"}