Mi aporte al reto
def delete_movie(self, id: int):
    """Delete the movie identified by ``id`` and commit the change.

    The movie is looked up through ``self.get_movie`` and removed through
    ``self.db``. When the lookup returns ``None`` nothing is deleted or
    committed, instead of handing ``None`` to ``self.db.delete``.

    Args:
        id: Identifier forwarded to ``self.get_movie``.

    Returns:
        None.
    """
    movie = self.get_movie(id)
    if movie is None:
        # Nothing to delete: do not pass None to the session.
        return
    self.db.delete(movie)
    self.db.commit()
Conexión con bases de datos
SQLAlchemy: el ORM de FastAPI
Instalación y configuración de SQLAlchemy
Creación de modelos
Registro de datos
Consulta de datos
Modificación y eliminación de datos
SQLModel: el futuro ORM de FastAPI
Modularización
Manejo de errores y middlewares
Creación de routers
Servicios para consultar datos
Servicios para registrar y modificar datos
Despliegue
Preparando el proyecto para desplegar a producción
Creando el repositorio en GitLab
Creando el Droplet en Digital Ocean
Instalación de herramientas para el servidor
Ejecutando FastAPI con NGINX
Próximos pasos
¿Quieres más cursos de FastAPI?
No tienes acceso a esta clase
¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera
Aportes 13
Preguntas 2
Mi aporte al reto
def delete_movie(self, id: int):
    """Delete the movie identified by ``id`` and commit the change.

    The movie is looked up through ``self.get_movie`` and removed through
    ``self.db``. When the lookup returns ``None`` nothing is deleted or
    committed, instead of handing ``None`` to ``self.db.delete``.

    Args:
        id: Identifier forwarded to ``self.get_movie``.

    Returns:
        None.
    """
    movie = self.get_movie(id)
    if movie is None:
        # Nothing to delete: do not pass None to the session.
        return
    self.db.delete(movie)
    self.db.commit()
Excelente Framework muy versátil !!!
Comparto mi proyecto utilizando MongoDB
https://github.com/cmtoro/Python-FastAPI-MovieAPP
from models.movie import Movie as MovieModel
from schemas.movie import MovieAPI
class FilmsService():
    """Movie persistence operations built on the session passed to ``__init__``."""

    def __init__(self, db) -> None:
        """Store the database session used by every method."""
        self.db = db

    def get_films(self):
        """Return every stored movie."""
        return self.db.query(MovieModel).all()

    def get_movie_id(self, id):
        """Return the movie whose ``id`` matches, or ``None`` when there is none."""
        return self.db.query(MovieModel).filter(MovieModel.id == id).one_or_none()

    def get_films_by_category(self, category):
        """Return the movies whose ``category`` equals ``category``."""
        return self.db.query(MovieModel).filter(MovieModel.category == category).all()

    def get_films_by_year(self, year):
        """Return the movies whose ``year`` equals ``year``."""
        return self.db.query(MovieModel).filter(MovieModel.year == year).all()

    def get_films_by_category_and_year(self, category, year):
        """Return the movies matching both ``category`` and ``year``."""
        # Python's ``and`` does not build a SQL conjunction: it evaluates the
        # truthiness of the first expression and keeps only one operand.
        # Passing both criteria to filter() combines them with SQL AND.
        return (
            self.db.query(MovieModel)
            .filter(MovieModel.category == category, MovieModel.year == year)
            .all()
        )

    def create_movie(self, movie: MovieAPI):
        """Insert a new movie built from the fields of ``movie`` and commit."""
        new_movie = MovieModel(**movie.dict())
        self.db.add(new_movie)
        self.db.commit()

    def update_movie(self, id: int, data: MovieAPI):
        """Apply the fields explicitly set in ``data`` to the movie ``id``.

        Does nothing when no movie with that ``id`` exists.
        """
        movie_update = self.get_movie_id(id)
        if movie_update is None:
            return
        # The looked-up object is a model instance, not a query, so the
        # fields are assigned one by one instead of calling ``.update()``.
        for field, value in data.dict(exclude_unset=True).items():
            setattr(movie_update, field, value)
        self.db.commit()
        self.db.refresh(movie_update)

    def delete_movie(self, id: int):
        """Delete the movie ``id`` and commit; no-op when it does not exist."""
        movie_delete = self.get_movie_id(id)
        if movie_delete is None:
            return
        self.db.delete(movie_delete)
        self.db.commit()

    def delete_all_movie(self):
        """Delete every stored movie and commit once at the end."""
        for movie in self.get_films():
            self.db.delete(movie)
        self.db.commit()
from fastapi import APIRouter
from fastapi import HTTPException, status, Path, Query, Depends
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder
from typing import Union, List
from config.database import Session
from models.movie import Movie as MovieModel
from middlewares.jwt_bearer import JWTBearer
import datetime
from fastapi import HTTPException
from schemas.movie import MovieAPI
from services.films import FilmsService
# Router on which the /films endpoints below are registered.
films_router = APIRouter()
# Films route, registered under the "films" tag.
@films_router.get('/films',
                  tags=['films'],
                  status_code=status.HTTP_200_OK,
                  summary="All films",
                  response_model=List[MovieAPI],
                  # dependencies=[Depends(JWTBearer())]
                  )
def get_films() -> List[MovieAPI]:
    """
    Return every movie wrapped in a ``message``/``details`` JSON envelope.

    Raises:
        HTTPException: 404 when there are no movies.
    """
    db = Session()
    try:
        result = FilmsService(db).get_films()
        if not result:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List films empty")
        # Encode while the session is still open.
        details = jsonable_encoder(result)
    finally:
        # The session was previously never released.
        db.close()
    return JSONResponse(status_code=status.HTTP_200_OK,
                        content={
                            "message": "Get all films successfully",
                            "details": details
                        }
                        )
@films_router.get(path="/films/read/{id}",
                  tags=["films"],
                  status_code=status.HTTP_200_OK,
                  summary="Movie by id",
                  response_model=MovieAPI,
                  dependencies=[Depends(JWTBearer())])
def get_movie_by_id(id: int = Path(ge=1, le=2000)) -> MovieAPI:
    """
    Return the movie whose id is given as a path parameter.

    Raises:
        HTTPException: 404 when no movie has that id.
    """
    db = Session()
    try:
        # FilmsService exposes the lookup as ``get_movie_id``; it has no
        # ``get_movie`` method.
        result = FilmsService(db).get_movie_id(id)
        if not result:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Id movie not found")
        details = jsonable_encoder(result)
    finally:
        # Always release the session, including on the 404 path.
        db.close()
    return JSONResponse(status_code=status.HTTP_200_OK,
                        content={
                            "message": "Get movie por id successfully",
                            "details": details
                        }
                        )
@films_router.get('/films/read/',
                  tags=['films'],
                  status_code=status.HTTP_200_OK,
                  summary="Movie by category our year",
                  response_model=List[MovieAPI],
                  dependencies=[Depends(JWTBearer())]
                  )
# Parameters declared in the function but absent from the path are read by
# FastAPI as query parameters.
def get_movie_by_category_our_year(category: Union[str, None] = Query(None, min_length=5, max_length=20),
                                   year: Union[int, None] = Query(None, ge=1900, le=datetime.date.today().year)) -> List[MovieAPI]:
    """
    Return the movies matching a category, a year, or both (query parameters).

    Note that the upper bound for ``year`` is computed once, when this
    function is defined.

    Raises:
        HTTPException: 400 when neither ``category`` nor ``year`` is given;
            404 when no movie matches the given criteria.
    """
    if category is None and year is None:
        # A request without any criterion is a client error, not a missing
        # resource; the previous message was also abusive towards the user.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
                            detail="Provide at least one of 'category' or 'year'")

    db = Session()
    try:
        service = FilmsService(db)
        # Each branch only selects the query and its two messages; the
        # response itself is built once below.
        if year is None:
            result = service.get_films_by_category(category)
            not_found = "Category of movie not found"
            message = f"Get all films for category {category} successfully"
        elif category is None:
            result = service.get_films_by_year(year)
            not_found = "Year of movie not found"
            message = f"Get all films for year {year} successfully"
        else:
            result = service.get_films_by_category_and_year(category, year)
            not_found = "Category our year of movie not found"
            message = f"Get all films for year {year} and category {category} successfully"

        if not result:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=not_found)
        details = jsonable_encoder(result)
    finally:
        # Always release the session, including on the 404 path.
        db.close()
    return JSONResponse(status_code=status.HTTP_200_OK,
                        content={
                            "message": message,
                            "details": details
                        }
                        )
@films_router.post('/films/create/',
                   tags=['films'],
                   status_code=status.HTTP_201_CREATED,
                   summary="Add Movie to films",
                   response_model=dict,
                   dependencies=[Depends(JWTBearer())]
                   )
async def create_movie(movie: MovieAPI) -> dict:
    """
    Register a movie from the fields sent in the request body.
    """
    db = Session()
    try:
        # The service returns nothing, so its result is not kept.
        FilmsService(db).create_movie(movie)
    finally:
        # The session was previously never released.
        db.close()
    return JSONResponse(status_code=status.HTTP_201_CREATED,
                        content={"message": "Se ha registrado la película"})
@films_router.put(
    "/films/update/{id}",
    tags=['films'],
    status_code=status.HTTP_200_OK,
    summary="Update movie",
    response_model=dict,
    dependencies=[Depends(JWTBearer())]
)
async def update_movie( movie: MovieAPI, id: int = Path(ge=1, le=2000)) -> dict:
    """
    Update the movie with the given id using the fields sent in the body.

    Raises:
        HTTPException: 404 when no movie has that id.
    """
    db = Session()
    try:
        service = FilmsService(db)
        movie_update = service.get_movie_id(id)
        if not movie_update:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="ID No found")
        service.update_movie(id, movie)
        # Encode while the session is still open.
        details = jsonable_encoder(movie_update)
    finally:
        # Always release the session, including on the 404 path.
        db.close()
    return JSONResponse(status_code=status.HTTP_200_OK,
                        content={"message": "Updated movie successfully",
                                 "details": details
                                 }
                        )
@films_router.delete(
    "/films/delete/{id}",
    tags=['films'],
    status_code=status.HTTP_200_OK,
    summary="Delete movie",
    response_model=dict,
    dependencies=[Depends(JWTBearer())]
)
async def delete_movie(id: int = Path(ge=1, le=2000)) -> dict:
    """
    Delete the movie with the given id and return the deleted data.

    Raises:
        HTTPException: 404 when no movie has that id.
    """
    db = Session()
    try:
        service = FilmsService(db)
        movie_delete = service.get_movie_id(id)
        if not movie_delete:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="ID No found")
        # Snapshot the movie before deleting it, so the payload does not
        # depend on the state of an already-deleted instance.
        details = jsonable_encoder(movie_delete)
        service.delete_movie(id)
    finally:
        # Always release the session, including on the 404 path.
        db.close()
    return JSONResponse(status_code=status.HTTP_200_OK,
                        content={
                            "message": "Deleted movie successfully",
                            "details": details
                        })
@films_router.delete(
    "/films/delete/",
    tags=['films'],
    status_code=status.HTTP_200_OK,
    summary="Delete all movie",
    response_model=dict,
    dependencies=[Depends(JWTBearer())]
)
async def delete_all_movie() -> dict:
    """
    Delete every movie and return the data that was deleted.

    Raises:
        HTTPException: 404 when there are no movies to delete.
    """
    db = Session()
    try:
        service = FilmsService(db)
        all_films_delete = service.get_films()
        if not all_films_delete:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="List films empty")
        # Snapshot the movies before deleting them, so the payload does not
        # depend on the state of already-deleted instances.
        details = jsonable_encoder(all_films_delete)
        service.delete_all_movie()
    finally:
        # Always release the session, including on the 404 path.
        db.close()
    return JSONResponse(status_code=status.HTTP_200_OK,
                        content={
                            "message": "Delete all films successfully",
                            "details": details
                        }
                        )
Como en los métodos PUT y POST recibo la data desestructurada, he decidido proteger el id para evitar su modificación.
from models.movie import Movie as MovieModel
from schemas.movie import Movie
from config.database import Session
class MovieService(object):
    """Movie persistence operations that refuse to write protected fields."""

    # Fields that callers may not set through create/update payloads.
    _PROTECTED_FIELDS = ("id",)

    def __init__(self, database: Session):  # type: ignore
        """Store the database session used by every method."""
        self.database = database

    def _reject_protected_fields(self, data: dict) -> None:
        """Raise ``ValueError`` when ``data`` contains any protected field."""
        # ``any`` (not ``all``): a single protected field is enough to
        # reject the payload once more than one field is protected.
        if any(field in data for field in self._PROTECTED_FIELDS):
            raise ValueError("Ha agregado campos no editables")

    def get_movies(self):
        """Return every stored movie."""
        return self.database.query(MovieModel).all()

    def get_movie(self, movie_id: int):
        """Return the first movie whose id is ``movie_id``, or ``None``."""
        return self.database.query(MovieModel).filter(MovieModel.id == movie_id).first()

    def get_movies_by_category(self, category: str):
        """Return the movies whose ``category`` equals ``category``."""
        return self.database.query(MovieModel).filter(MovieModel.category == category).all()

    def create_movie(self, movie: Movie):
        """Insert a movie built from the explicitly-set fields of ``movie``.

        Raises:
            ValueError: when the payload sets a protected field.
        """
        data = movie.dict(exclude_unset=True)
        self._reject_protected_fields(data)
        self.database.add(MovieModel(**data))
        self.database.commit()

    def update_movie(self, movie_id: int, movie: Movie):
        """Update the movie ``movie_id`` with the explicitly-set fields of ``movie``.

        Raises:
            ValueError: when the payload sets a protected field.
        """
        data = movie.dict(exclude_unset=True)
        # Validate before touching the database.
        self._reject_protected_fields(data)
        self.database.query(MovieModel).filter(MovieModel.id == movie_id).update(data)
        self.database.commit()

    def delete_movie(self, movie_id: int):
        """Delete the movie ``movie_id`` and commit; no-op when it does not exist."""
        result = self.get_movie(movie_id)
        if result is None:
            return
        self.database.delete(result)
        self.database.commit()
from models.movie import Movie as MovieModel
from schemas.movie import Movie
from config.database import Session
class MovieService(object):
    """Movie persistence operations that refuse to write protected fields."""

    # Fields that callers may not set through create/update payloads.
    _PROTECTED_FIELDS = ("id",)

    def __init__(self, database: Session):  # type: ignore
        """Store the database session used by every method."""
        self.database = database

    def _reject_protected_fields(self, data: dict) -> None:
        """Raise ``ValueError`` when ``data`` contains any protected field."""
        # ``any`` (not ``all``): a single protected field is enough to
        # reject the payload once more than one field is protected.
        if any(field in data for field in self._PROTECTED_FIELDS):
            raise ValueError("Ha agregado campos no editables")

    def get_movies(self):
        """Return every stored movie."""
        return self.database.query(MovieModel).all()

    def get_movie(self, movie_id: int):
        """Return the first movie whose id is ``movie_id``, or ``None``."""
        return self.database.query(MovieModel).filter(MovieModel.id == movie_id).first()

    def get_movies_by_category(self, category: str):
        """Return the movies whose ``category`` equals ``category``."""
        return self.database.query(MovieModel).filter(MovieModel.category == category).all()

    def create_movie(self, movie: Movie):
        """Insert a movie built from the explicitly-set fields of ``movie``.

        Raises:
            ValueError: when the payload sets a protected field.
        """
        data = movie.dict(exclude_unset=True)
        self._reject_protected_fields(data)
        self.database.add(MovieModel(**data))
        self.database.commit()

    def update_movie(self, movie_id: int, movie: Movie):
        """Update the movie ``movie_id`` with the explicitly-set fields of ``movie``.

        Raises:
            ValueError: when the payload sets a protected field.
        """
        data = movie.dict(exclude_unset=True)
        # Validate before touching the database.
        self._reject_protected_fields(data)
        self.database.query(MovieModel).filter(MovieModel.id == movie_id).update(data)
        self.database.commit()

    def delete_movie(self, movie_id: int):
        """Delete the movie ``movie_id`` and commit; no-op when it does not exist."""
        result = self.get_movie(movie_id)
        if result is None:
            return
        self.database.delete(result)
        self.database.commit()
Mi solución
router
@movie_router.delete('/movies/{id}', tags=['movies'], response_model=dict, status_code=200)
def delete_movie(id: int = Path(ge=1, le=2000)) -> dict:
    """Delete the movie with the given id.

    Responds with 404 when no movie has that id, otherwise with 200 and a
    confirmation message.
    """
    db = Session()
    try:
        service = MovieService(db)
        if not service.get_movie(id):
            # NOTE(review): ``movie_not_found`` is not defined in the visible
            # code; confirm it exists at module level.
            return JSONResponse(status_code=404, content={'message': movie_not_found})
        service.delete_movie(id)
    finally:
        # The session was previously never released.
        db.close()
    return JSONResponse(status_code=200, content={"message": "The movie has been removed"})
Mi solución
servicio
def delete_movie(self, id: int):
    """Delete the movie whose id is ``id`` and commit the change.

    When no such movie exists nothing is deleted or committed, instead of
    handing ``None`` to ``self.db.delete``.
    """
    movie = self.db.query(MovieModel).filter(MovieModel.id == id).first()
    if movie is None:
        return
    self.db.delete(movie)
    self.db.commit()
Mi aporte para el reto de borrado de películas de la BD:
-Dentro de services/movie.py en la clase MovieService:
def delete_movie(self, id):
    """Delete the movie whose id is ``id``, commit, and return it.

    Returns:
        The deleted movie object, or ``None`` when no movie has that id
        (in which case nothing is deleted or committed).
    """
    result = self.db.query(MovieModel).filter(MovieModel.id == id).first()
    if result is None:
        # Avoid handing None to the session; report "nothing deleted".
        return None
    self.db.delete(result)
    self.db.commit()
    return result
Así, con el `return result`, puedes mostrar la película que fue borrada y, con ese valor, posteriormente podrías agregar una función que recupere lo borrado.
Mi solución al reto
def delete_movie(self, id: int):
    """Delete the movie whose id is ``id`` and commit the change.

    When no such movie exists nothing is deleted or committed, instead of
    handing ``None`` to ``self.db.delete``.
    """
    result = self.db.query(MovieModel).filter(MovieModel.id == id).first()
    if result is None:
        return
    self.db.delete(result)
    self.db.commit()
def delete_movie(id: int) -> dict:
    """Delete the movie with the given id.

    Responds with 404 when no movie has that id, otherwise with 200 and a
    confirmation message.
    """
    db = Session()
    try:
        service = MovieService(db)
        if not service.get_movie(id):
            return JSONResponse(status_code=404, content={"message": "Not found "})
        service.delete_movie(id)
    finally:
        # The session was previously never released.
        db.close()
    # The accented character in this message was mis-encoded in the original.
    return JSONResponse(status_code=200, content={"message": "Se ha eliminado la película"})
¿Quieres ver más aportes, preguntas y respuestas de la comunidad?