No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

Servicios para registrar y modificar datos

11/17
Recursos

Aportes 13

Preguntas 2

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

Mi aporte al reto

def delete_movie(self, id : int):
        """Delete the movie identified by ``id`` and commit the change.

        The record is fetched through ``self.get_movie`` and whatever that
        call returns is handed to ``self.db.delete`` before committing.
        Returns ``None``.
        """
        target = self.get_movie(id)
        session = self.db
        session.delete(target)
        session.commit()
        return None

Excelente Framework muy versátil !!!

Comparto mi proyecto utilizando MongoDB
https://github.com/cmtoro/Python-FastAPI-MovieAPP

from models.movie import Movie as MovieModel
from schemas.movie import MovieAPI

class FilmsService():
    """Database operations for movies on top of an injected session.

    The session is only used through ``query``, ``add``, ``delete``,
    ``commit`` and ``refresh``; this class never opens or closes it.
    """

    def __init__(self, db) -> None:
        """Store the session every method of this service works on."""
        self.db = db

    def get_films(self):
        """Return all stored movies."""
        return self.db.query(MovieModel).all()

    def get_movie_id(self, id):
        """Return the movie whose primary key is ``id``, or ``None``."""
        return self.db.query(MovieModel).filter(MovieModel.id == id).one_or_none()

    def get_movie(self, id):
        """Alias of :meth:`get_movie_id`, for callers that use the shorter name."""
        return self.get_movie_id(id)

    def get_films_by_category(self, category):
        """Return the movies whose ``category`` equals ``category``."""
        return self.db.query(MovieModel).filter(MovieModel.category == category).all()

    def get_films_by_year(self, year):
        """Return the movies whose ``year`` equals ``year``."""
        return self.db.query(MovieModel).filter(MovieModel.year == year).all()

    def get_films_by_category_and_year(self, category, year):
        """Return the movies matching both ``category`` and ``year``."""
        # Python's ``and`` cannot combine SQL expressions: it evaluates the
        # truthiness of the first one and passes only a single criterion on.
        # Several positional criteria given to ``filter`` are joined with AND.
        return (
            self.db.query(MovieModel)
            .filter(MovieModel.category == category, MovieModel.year == year)
            .all()
        )

    def create_movie(self, movie: MovieAPI):
        """Insert a new movie built from the fields of ``movie`` and commit."""
        new_movie = MovieModel(**movie.dict())
        self.db.add(new_movie)
        self.db.commit()
        return

    def update_movie(self, id: int, data: MovieAPI):
        """Copy the explicitly set fields of ``data`` onto movie ``id`` and commit.

        Does nothing when no movie has that id.
        """
        movie_update = self.get_movie_id(id)
        if movie_update is None:
            return
        # ``update`` is a Query/dict method, not one of a model instance, so
        # the fields are assigned one by one on the loaded object instead.
        for field, value in data.dict(exclude_unset=True).items():
            setattr(movie_update, field, value)
        self.db.commit()
        self.db.refresh(movie_update)
        return

    def delete_movie(self, id : int):
        """Delete movie ``id`` and commit; does nothing when it does not exist."""
        movie_delete = self.get_movie_id(id)
        if movie_delete is None:
            # Passing None to ``Session.delete`` raises, so stop here.
            return
        self.db.delete(movie_delete)
        self.db.commit()
        return

    def delete_all_movie(self):
        """Delete every stored movie and commit once at the end."""
        for movie in self.get_films():
            self.db.delete(movie)
        self.db.commit()
        return
from fastapi import APIRouter
from fastapi import HTTPException, status, Path, Query, Depends
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder
from typing import Union, List
from config.database import Session
from models.movie import Movie as MovieModel
from middlewares.jwt_bearer import JWTBearer
import datetime
from fastapi import HTTPException
from schemas.movie import MovieAPI
from services.films import FilmsService

films_router = APIRouter()


# creacion de la ruta peliculas, y la etiqueta peliculas
@films_router.get(
    '/films',
    tags=['films'],
    status_code=status.HTTP_200_OK,
    summary="All films",
    response_model=List[MovieAPI],
    # NOTE: JWT protection is switched off for this route:
    #  dependencies=[Depends(JWTBearer())]
)
def get_films() -> List[MovieAPI]:
    """
    Get all films.

    Responds with 404 when no film is stored; otherwise returns a JSON body
    with a ``message`` and the encoded films under ``details``.
    """
    db = Session()
    try:
        result = FilmsService(db).get_films()
        if not result:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List films empty")
        # Build the payload while the session is still open.
        content = {
            "message": "Get all films successfully",
            "details": jsonable_encoder(result),
        }
    finally:
        # The session was opened here, so it is released here on every path.
        db.close()
    return JSONResponse(status_code=status.HTTP_200_OK, content=content)


@films_router.get(
    path="/films/read/{id}",
    tags=["films"],
    status_code=status.HTTP_200_OK,
    summary="Movie by id",
    response_model=MovieAPI,
    dependencies=[Depends(JWTBearer())],
)
def get_movie_by_id(id: int = Path(ge=1, le=2000)) -> MovieAPI:
    """
    Get a movie by the id given as a path parameter.

    Responds with 404 when no movie has that id; otherwise returns a JSON
    body with a ``message`` and the encoded movie under ``details``.
    """
    db = Session()
    try:
        # FilmsService exposes the lookup as ``get_movie_id``; the previous
        # ``get_movie`` call raised AttributeError on every request.
        result = FilmsService(db).get_movie_id(id)
        if not result:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Id movie not found")
        # Build the payload while the session is still open.
        content = {
            "message": "Get movie por id successfully",
            "details": jsonable_encoder(result),
        }
    finally:
        db.close()
    return JSONResponse(status_code=status.HTTP_200_OK, content=content)
    

@films_router.get(
    '/films/read/',
    tags=['films'],
    status_code=status.HTTP_200_OK,
    summary="Movie by category our year",
    response_model=List[MovieAPI],
    dependencies=[Depends(JWTBearer())],
)
# A function parameter that is not part of the path is read by FastAPI from the query string.
def get_movie_by_category_our_year(category: Union[str, None] = Query(None, min_length=5, max_length=20), 
                                   year: Union[int, None] = Query(None, ge=1900, le=datetime.date.today().year)) -> List[MovieAPI]:
    """
    Get the movies matching a category, a year, or both (query parameters).

    Responds with 404 when neither filter is given or when nothing matches;
    otherwise returns a JSON body with a ``message`` and the encoded movies
    under ``details``.

    NOTE(review): the upper bound of ``year`` is computed once, when this
    module is imported, so it does not advance while the process keeps running.
    """
    if category is None and year is None:
        # Checked before opening a session so nothing has to be released.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="At least one of 'category' or 'year' is required",
        )

    db = Session()
    try:
        service = FilmsService(db)
        if year is None:
            result = service.get_films_by_category(category)
            not_found = "Category of movie not found"
            message = f"Get all films for category {category} successfully"
        elif category is None:
            result = service.get_films_by_year(year)
            not_found = "Year of movie not found"
            message = f"Get all films for year {year} successfully"
        else:
            result = service.get_films_by_category_and_year(category, year)
            not_found = "Category our year of movie not found"
            message = f"Get all films for year {year} and category {category} successfully"

        if not result:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=not_found)
        # Build the payload while the session is still open.
        content = {"message": message, "details": jsonable_encoder(result)}
    finally:
        db.close()
    return JSONResponse(status_code=status.HTTP_200_OK, content=content)


@films_router.post(
    '/films/create/',
    tags=['films'],
    status_code=status.HTTP_201_CREATED,
    summary="Add Movie to films",
    response_model=dict,
    dependencies=[Depends(JWTBearer())],
)
async def create_movie(movie: MovieAPI) -> dict:
    """
    Add a movie from the fields sent in the request body.

    Returns a 201 JSON body with a confirmation ``message``.
    """
    db = Session()
    try:
        # The service returns nothing, so there is no result to keep.
        FilmsService(db).create_movie(movie)
    finally:
        db.close()
    return JSONResponse(status_code=status.HTTP_201_CREATED, 
                        content={"message": "Se ha registrado la película"})
    

@films_router.put(
    "/films/update/{id}",
    tags=['films'],
    status_code=status.HTTP_200_OK,
    summary="Update movie", 
    response_model=dict,
    dependencies=[Depends(JWTBearer())]
    )
async def update_movie( movie: MovieAPI, id: int = Path(ge=1, le=2000))  -> dict:
    """
    Update the movie with the given id using the fields sent in the body.

    Responds with 404 when no movie has that id; otherwise returns a JSON
    body with a ``message`` and the encoded movie under ``details``.
    """
    db = Session()
    try:
        service = FilmsService(db)
        movie_update = service.get_movie_id(id)
        if not movie_update:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="ID No found")

        service.update_movie(id, movie)
        # NOTE(review): ``details`` encodes the object fetched before the
        # update; presumably the service modifies and refreshes that same
        # session-tracked instance - confirm against the service.
        content = {
            "message": "Updated movie successfully",
            "details": jsonable_encoder(movie_update),
        }
    finally:
        db.close()
    return JSONResponse(status_code=status.HTTP_200_OK, content=content)
        



@films_router.delete(
    "/films/delete/{id}",
    tags=['films'],
    status_code=status.HTTP_200_OK,
    summary="Delete movie", 
    response_model=dict,
    dependencies=[Depends(JWTBearer())]
    )
async def delete_movie(id: int = Path(ge=1, le=2000)) -> dict:
    """
    Delete the movie with the given id.

    Responds with 404 when no movie has that id; otherwise returns a JSON
    body with a ``message`` and the encoded deleted movie under ``details``.
    """
    db = Session()
    try:
        service = FilmsService(db)
        movie_delete = service.get_movie_id(id)
        if not movie_delete:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="ID No found")

        # Encode first: once the row is deleted and committed the instance is
        # no longer attached to the session.
        details = jsonable_encoder(movie_delete)
        service.delete_movie(id)
    finally:
        db.close()
    return JSONResponse(status_code=status.HTTP_200_OK, 
                        content={
                            "message": "Deleted movie successfully",
                            "details": details
                            })
    
    
    
@films_router.delete(
    "/films/delete/",
    tags=['films'],
    status_code=status.HTTP_200_OK,
    summary="Delete all movie", 
    response_model=dict,
    dependencies=[Depends(JWTBearer())]
    )
async def delete_all_movie() -> dict:
    """
    Delete every stored movie.

    Responds with 404 when there is nothing to delete; otherwise returns a
    JSON body with a ``message`` and the encoded deleted movies under
    ``details``.
    """
    db = Session()
    try:
        service = FilmsService(db)
        all_films_delete = service.get_films()
        if not all_films_delete:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List films empty")

        # Encode first: once the rows are deleted and committed the instances
        # are no longer attached to the session.
        details = jsonable_encoder(all_films_delete)
        service.delete_all_movie()
    finally:
        db.close()
    return JSONResponse(status_code=status.HTTP_200_OK,
                        content={
                            "message": "Delete all films successfully",
                            "details": details
                            }
                        )
Algo que me llama la atención desde que vengo haciendo la carrera de Backend con Python: ¿por qué, cuando organizan el proyecto en una arquitectura con Models, Schemas, Middlewares, Routers, etc., nombran los archivos de igual manera en cada directorio? Creo que sería más fácil usar nombres como Middleware/middlewareMovie.py, Models/modelMovie.py, ...

Como en los métodos put y post recibo la data desestructurada, he decidido proteger el id para evitar su modificación.

from models.movie import Movie as MovieModel
from schemas.movie import Movie
from config.database import Session


class MovieService(object):
    """Database operations for movies on top of an injected session.

    The primary key is treated as read-only: payloads that explicitly set
    ``id`` are rejected by ``create_movie`` and ``update_movie``.
    """

    # Fields a client payload is never allowed to set.
    _PROTECTED_FIELDS = ("id",)

    def __init__(self, database: Session):  # type: ignore
        """Store the session every method of this service works on."""
        self.database = database

    def _reject_protected_fields(self, data):
        """Raise ``ValueError`` when ``data`` sets any protected field."""
        # ``any`` rather than ``all``: a single protected field is enough to
        # reject the payload, however many fields are protected.
        if any(field in data for field in self._PROTECTED_FIELDS):
            raise ValueError("Ha agregado campos no editables")

    def get_movies(self):
        """Return all stored movies."""
        return self.database.query(MovieModel).all()

    def get_movie(self, movie_id: int):
        """Return the first movie whose id is ``movie_id``, or ``None``."""
        return self.database.query(MovieModel).filter(MovieModel.id == movie_id).first()

    def get_movies_by_category(self, category: str):
        """Return the movies whose ``category`` equals ``category``."""
        return self.database.query(MovieModel).filter(MovieModel.category == category).all()

    def create_movie(self, movie: Movie):
        """Insert a movie built from the explicitly set fields of ``movie``.

        Raises:
            ValueError: if the payload sets a protected field.
        """
        data = movie.dict(exclude_unset=True)
        self._reject_protected_fields(data)

        new_movie = MovieModel(**data)
        self.database.add(new_movie)
        self.database.commit()

    def update_movie(self, movie_id: int, movie: Movie):
        """Apply the explicitly set fields of ``movie`` to movie ``movie_id``.

        Raises:
            ValueError: if the payload sets a protected field.
        """
        data = movie.dict(exclude_unset=True)
        self._reject_protected_fields(data)

        if data:
            # An empty payload has nothing to write, so no UPDATE is issued.
            self.database.query(MovieModel).filter(MovieModel.id == movie_id).update(data)
        self.database.commit()

    def delete_movie(self, movie_id: int):
        """Delete movie ``movie_id`` and commit; does nothing when it does not exist."""
        result = self.get_movie(movie_id)
        if result is None:
            # Passing None to ``Session.delete`` raises, so stop here.
            return
        self.database.delete(result)
        self.database.commit()


from models.movie import Movie as MovieModel
from schemas.movie import Movie
from config.database import Session


class MovieService(object):
    """Database operations for movies on top of an injected session.

    The primary key is treated as read-only: payloads that explicitly set
    ``id`` are rejected by ``create_movie`` and ``update_movie``.
    """

    # Fields a client payload is never allowed to set.
    _PROTECTED_FIELDS = ("id",)

    def __init__(self, database: Session):  # type: ignore
        """Store the session every method of this service works on."""
        self.database = database

    def _reject_protected_fields(self, data):
        """Raise ``ValueError`` when ``data`` sets any protected field."""
        # ``any`` rather than ``all``: a single protected field is enough to
        # reject the payload, however many fields are protected.
        if any(field in data for field in self._PROTECTED_FIELDS):
            raise ValueError("Ha agregado campos no editables")

    def get_movies(self):
        """Return all stored movies."""
        return self.database.query(MovieModel).all()

    def get_movie(self, movie_id: int):
        """Return the first movie whose id is ``movie_id``, or ``None``."""
        return self.database.query(MovieModel).filter(MovieModel.id == movie_id).first()

    def get_movies_by_category(self, category: str):
        """Return the movies whose ``category`` equals ``category``."""
        return self.database.query(MovieModel).filter(MovieModel.category == category).all()

    def create_movie(self, movie: Movie):
        """Insert a movie built from the explicitly set fields of ``movie``.

        Raises:
            ValueError: if the payload sets a protected field.
        """
        data = movie.dict(exclude_unset=True)
        self._reject_protected_fields(data)

        new_movie = MovieModel(**data)
        self.database.add(new_movie)
        self.database.commit()

    def update_movie(self, movie_id: int, movie: Movie):
        """Apply the explicitly set fields of ``movie`` to movie ``movie_id``.

        Raises:
            ValueError: if the payload sets a protected field.
        """
        data = movie.dict(exclude_unset=True)
        self._reject_protected_fields(data)

        if data:
            # An empty payload has nothing to write, so no UPDATE is issued.
            self.database.query(MovieModel).filter(MovieModel.id == movie_id).update(data)
        self.database.commit()

    def delete_movie(self, movie_id: int):
        """Delete movie ``movie_id`` and commit; does nothing when it does not exist."""
        result = self.get_movie(movie_id)
        if result is None:
            # Passing None to ``Session.delete`` raises, so stop here.
            return
        self.database.delete(result)
        self.database.commit()
me aburro
```python
    def delete_movie(self, id: int):
        result = self.get_movie(id)
        self.db.delete(result)
        self.db.commit()
        return
```
mi aporte a mi manera

```python
##service.py
from config.database import Session
from models.history import History


class HistoryService():
    def __init__(self) -> None:
        self.db = Session()
        self.model = History

    def get_histories(self):
        result = self.db.query(self.model).all()
        return result

    def get_history(self, id:int):
        result = self.db.query(self.model).filter(self.model.id==id).first()
        return result

    def create_history(self, history):
        new_history = self.model(**history.dict())
        self.db.add(new_history)
        self.db.commit()

    def put_history(self, id, history):
        result = self.db.query(self.model).filter(self.model.id==id).first()
        if not result:
            return result
        result.path_file = history.path_file
        self.db.commit()
        return self.get_history(id)

    def delete_history(self, id:int):
        result = self.db.query(self.model).filter(self.model.id==id).first()
        if not result:
            return result
        self.db.delete(result)
        self.db.commit()


##router.py
class History(BaseModel):
    path_file:str


@history_router.post('/history/', tags=['history'], response_model=dict)#, dependencies=[Depends(JWTBearer())])
def create_history(history: History) -> JSONResponse:
    HistoryService().create_history(history=history)
    return JSONResponse(status_code=201, content={'message': 'Create register'})


@history_router.get('/histories/', tags=['history'], response_model=List[History])#, dependencies=[Depends(JWTBearer())])
def get_Histories() -> List[History]:
    result = HistoryService().get_histories()
    return JSONResponse(status_code=201, content=jsonable_encoder(result))


@history_router.get('/history/{id}', tags=['history'], response_model=History)#, dependencies=[Depends(JWTBearer())])
def get_history(id: int = Path(ge=1, le=2000)) -> History:
    result = HistoryService().get_history(id=id)
    if not result:
        return JSONResponse(status_code=404, content={'message': 'No found register'})
    return JSONResponse(status_code=201, content=jsonable_encoder(result))


@history_router.put('/history/{id}', tags=['history'], response_model=dict)#, dependencies=[Depends(JWTBearer())])
def get_history(id: int, history: History) -> dict:
    result = HistoryService().put_history(id, history)
    if not result:
        return JSONResponse(status_code=404, content={'message': 'No found register'})
    return JSONResponse(status_code=201, content=jsonable_encoder(result))


@history_router.delete('/history/{id}', tags=['history'], response_model=dict)
def detele_history(id: int) -> dict:
    result = HistoryService().delete_history(id)
    if not result:
        return JSONResponse(status_code=404, content={'message': 'No found register'})
    return JSONResponse(status_code=404, content={'message': 'The register was deleted'})
```

Mi solución

router

@movie_router.delete('/movies/{id}',  tags=['movies'], response_model=dict, status_code=200)
def delete_movie(id: int= Path(ge=1, le=2000)) -> dict:
    """Delete the movie with the given id.

    Responds with 404 when no movie has that id, otherwise with 200 and a
    confirmation message.
    """
    db = Session()
    try:
        service = MovieService(db)
        if not service.get_movie(id):
            return JSONResponse(status_code=404, content={'message': movie_not_found})
        service.delete_movie(id)
    finally:
        # The session was opened here, so it is released here on every path.
        db.close()
    return JSONResponse(status_code=200, content={"message": "The movie has been removed"})

Mi solucion

servicio

    def delete_movie(self, id: int):
        """Look up the first movie whose id equals ``id``, delete it and commit."""
        by_id = self.db.query(MovieModel).filter(MovieModel.id == id)
        target = by_id.first()
        self.db.delete(target)
        self.db.commit()

Mi aporte para el reto de borrado de peliculas de la BD:

-Dentro de services/movie.py en la clase MovieService:
def delete_movie(self, id):
        """Delete the first movie whose id equals ``id``, commit, and return it.

        The object that was passed to ``self.db.delete`` is returned so the
        caller can still show what was removed.
        """
        by_id = self.db.query(MovieModel).filter(MovieModel.id == id)
        removed = by_id.first()
        self.db.delete(removed)
        self.db.commit()
        return removed

Así, con el return result, puedes usarlo para mostrar la película que fue borrada y, con esa variable local, posteriormente podrías agregar una función que recupere lo borrado.

mi solucion al reto

def delete_movie(self,id:int):
        """Delete the first movie whose id equals ``id`` and commit. Returns ``None``."""
        by_id = self.db.query(MovieModel).filter(MovieModel.id == id)
        target = by_id.first()
        self.db.delete(target)
        self.db.commit()
        return None
def delete_movie(id: int)-> dict:
    """Delete the movie with the given id.

    Responds with 404 when no movie has that id, otherwise with 200 and a
    confirmation message.
    """
    db=Session()
    try:
        service = MovieService(db)
        if not service.get_movie(id):
            return JSONResponse(status_code=404,content={"message":"Not found "})
        service.delete_movie(id)
    finally:
        # The session was opened here, so it is released here on every path.
        db.close()
    return JSONResponse(status_code=200, content={"message": "Se ha eliminado la película"})