본문 바로가기
Fast API/fastapi배우기

Fast API 배우기 23부 - SQL 데이터베이스

by 붕어사랑 티스토리 2021. 11. 8.
반응형

FastAPI는 SQLAlchemy(파이썬에서 데이터베이스를 다루는 툴)을 이용해 데이터 베이스를 다를 수 있다.

 

아래가 대표적인 예시이다.

 

  • PostgreSQL
  • MySQL
  • SQLite
  • Oracle
  • Microsoft SQL Server

 

이번 예제에서는 SQLite를 이용해 설명을 해보도록 하겠다. SQLite는 singe file로 관리되고 파이썬이 내부적으로 서포트 하고 있는 데이터베이스이다.

 

 

ORM

Object Relational Mapping 객체를 데이터베이스 테이블과 매칭시켜주는 tool이다.

한마디로 정의하면 프로그래밍 코드로 데이터베이스를 다룰 수 있다는걸 의미한다.

 

앞으로 나올 내용에서는 SQLAlchemy와 Pydantic 모델 내용이 나온다.

 

SQLAlchemy : 실제 DB에 사용되는 모델

Pydantic : 앞서부터 계속 써왔던 Request Body에서 배웠던 클래스모델

 

여기서 우리는 이 Pydantic Model과 SQLAlchemy를 ORM Mode라는걸 통해 연동하는것을 배울 것이다.

 

 

 

 

파일구조

my_super_project 라는 프로젝트 폴더를 만들고 하위 디렉토리로 sql_app을 만들 것이다.

파일 구조는 다음과 같이 된다.

 

.
└── sql_app
    ├── __init__.py
    ├── crud.py
    ├── database.py
    ├── main.py
    ├── models.py
    └── schemas.py

 

 

SQLAlchemy parts

 

sql_app/database.py 를 열고 다음과 같이 작성한다.

 

from sqlalchemy import create_engine

from sqlalchemy.ext.declarative import declarative_base

from sqlalchemy.orm import sessionmaker


SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

 

 

먼저 하기와 같이 필요한 코드들을 import 한다.

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

 

그리고 데이터베이스 URL을 만든다

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

위 예제에서는 sqlite 데이터베이스에다가 connecting 했다. url 구조를 보면 현재 폴더에 있는 sql_app.db 파일을 여는거와 같다는걸 알 수 있다.

 

먼역햐 PostgreSQL 데이터베이스를 쓰고싶다면 상기 주석된 url을 해제하여 사용하면 된다.

 

 

 

Create the SQLAlchemy engine

다음으로는 engine을 create 해준다.

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)

 

여기서 주목할점은 하기 세팅은 SQLite에서만 필요하고 다른 데이터베이스는 필요하지 않다.

connect_args={"check_same_thread": False}

 

 

 

Create a SessionLocal class

각각의 SessionLocal 클래스 객체는 데이타베이스의 세션이 된다.

 

변수명은 SessionLocal로 한다. 추후 사용할 Session과 이름 충돌을 막기 위해서이다.

 

SessionLocal클래스를 생성하려면 sessionmaker를 이용한다.

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

 

 

Create a Base class

자 이제 declarative_base() 함수를 이용하 클래스 하나를 리턴받자.

추후 우리는 이 클래스를 상속바다 데이터베이스의 모델이나 ORM 클래스를 생성할 것이다.

Base = declarative_base()

 

 

 

 

 

 

데이터베이스 model 생성하기

 

자 이제 sql_app/models.py 파일로 넘어가자

 

 

먼저 database.py에서 만든 Base클래스를 import 하고 이를 상속하여 데이터베이스 모델을 만든다.

 

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship


from .database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    items = relationship("Item", back_populates="owner")



class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="items")

 

__tablename__ 속성은 SQLAlchemy의 테이블이름을 의미한다.

 

Column 함수를 이용해 각 데이터의 column들을 만들어 줄 수 있다.

데이터 type, primary_key 여부 등을 입력해준다.

id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)

여기서 unique는 유일 식별자로 대체키나 후보키 역할을 하는 걸 지정하는거고

index 변수는 쿼리의 퍼포먼스를 올려주는데 사용된다고 한다.

 

 

 

그리고 아래 relationship 함수를 이용하여 데이터베이션 간 relation으로 연결이 가능하다.

items = relationship("Item", back_populates="owner")

 

SQLAlchemy에 대한 자세한 내용은 하기 링크를 참조하자.

나중에 한번 정리 해야겠다.

https://docs.sqlalchemy.org/en/14/orm/basic_relationships.html

 

Basic Relationship Patterns — SQLAlchemy 1.4 Documentation

One To One is essentially a bidirectional relationship with a scalar attribute on both sides. Within the ORM, “one-to-one” is considered as a convention where the ORM expects that only one related row will exist for any parent row. The “one-to-one”

docs.sqlalchemy.org

 

 

 

초기 Pydantic 모델 / 스키마 생성하기

다음으로 sql_app/schemas.py로 넘어가자

 

ItemBase와 UserBase를 만들고 이를 상속받아 ItemCreate와 UserCreate를 만든다.

from typing import List, Optional
from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Optional[str] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int
    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str



class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True

 

위 코드를 보고 몇가지 의문점이 들 것이다

 

왜 SQLAlchemy 모델을 만들고, Pydantic 모델을 따로만들어요?

우린 Pydantic모델을이용해서 API에서 읽을 데이터 형식을 지정해주고 이것을 return으로 사용해 주었다.

Pydantic 모델은 즉 API의 데이터를 읽고 리턴할때 쓰는 모델이다. 원래 DB에서 사용되는 모델이 아닌 것 이다.

 

우리가 만약 SQLAlchemy의 User모델을 DB에 넣어준다고 하자. 우리가 DB에 데이터를 넣기 전 까지 SQLAlchemy의 User에 id는 모른다. id는 키값이고 생성되지 않았으니깐.

 

대충 돌아간 폼이 어떠나면

 

API에서 데이터를 받아오려면 Pydantic 모델을 사용했는데 이떄 위코드에서 Pydantic의 UserCreate를 이용하여 데이터를 받아오고, 이를통해 SQLAlchemy User모델을 만든뒤 DB에 넣어준 후, 만약 이를 리턴하고 싶으면 Pydantic의 User를 사용하여 리턴하는 것 이다.

 

(그냥 솔직히 말하면 공식문서가 헤깔리게 설명해놔씀...)

 

 

 

orm_mode

위 코드를 보면 Pydantic model에 orm_mode라는게 추가되어있는걸 볼 수 있다.

 

class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True

 

이 모드가 켜지면 dict 자료형이 아닌데도 dict처럼 값을 읽게 해 주고 SQLAlchemy의 ORM 모델처럼 사용할 수 있게 해준다.

 

만약 아래처럼 값을 읽으면

 

id = data["id"]

아래코드까지 같이 시도 한다(라고 한다...)

id = data.id

 

그리고 Pydantic 모델이 이제 SQLAlchemy로 맵핑 될 수 있기에, response_model값으로 pydantic 모델을 path operation에 선언해주면, SQLAlchemy모델을 API에서 리턴해주면 알아서 pydantic Model로 변환해준다.

 

이게 뭔말이냐?

 

이게 무슨말인지 알기 위해서는 SQLAlchemy의 lazy loading에 대해 이해 해야 한다

SQLAlchemy는 릴레이션이 걸린 attribute를 아래처럼 접근하지 않으면 릴레이션이 걸린 데이터를 로딩하지 않는다

current_user.items

(예시코드에서 user와 item은 릴레이션 관계였던걸 기억!)

그래서 orm_mode 없이 SQLAlchemy를 API에서 리턴해버리면 릴레이션 데이터를 로딩하지 않고 리턴하게 된다.

 

반대로 orm_mode를 주고 SQLAlchemy를 리턴하면, pydantic모델로 변환 과정에서 데이터를 액세스 하게 되고 릴레이션 데이터를 불러와서 같이 리턴해준다.

 

 

 

CRUD utils

자 이제 sql_app/crud.py를 보자

 

이 파일에서는 데이터베이스와 소통하는 함수를 만들것이다.

 

CRUD의 의미는 아시다 시피 다음과 같다.

 

CREATE, READ, UPDATE, DELETE

from sqlalchemy.orm import Session
from . import models, schemas


def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()


def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()


def get_users(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.User).offset(skip).limit(limit).all()


def create_user(db: Session, user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


def get_items(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.Item).offset(skip).limit(limit).all()


def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item
반응형

Read data

sqlalchemy.orm에서 Session을 import 한다. 

그리고 이 Session을 통해 db라는 파라미터를 만들고 데이터베이스와 소통할 수 있다.

 

그리고 models.py를 import 한다(SQLAlchemy model들) 그리고 schemas.py를 임포트 한다(Pydantic models / schemas)

 

그리고 아래와 같은 utility 함수들을 만든다

 

  • ID와 email로 한명의 유저 정보를 읽는 함수
  • 여러 유저 정보를 읽는 함수
  • 여러 아이템을 읽는 함수

 

def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()


def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()


def get_users(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.User).offset(skip).limit(limit).all()

 

Create Data

자 다음오로 데이터베이스에 데이터를 만드는 함수를 만들어 보자

 

다음과 같은 순서로 진행한다.

  • SQLAlchemy model 객체를 만든다
  • 만든 객체를 add 함수를 이용하여 데이터베이스에 추가한다.
  • commit 함수를 이용해 데이터베이스에 반영한다.
  • refresh 함수를 이용해 데이터베이스의 데이터를 최신으로 갱신한다.
def create_user(db: Session, user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item

 

 

 

 

Main FastAPI app

자 이제 마지막으로 여지껏 했던 작업들을 main에 작성하여 통합하자.

 

데이터베이스 테이블 만들기

sql_app/main.py를 작성한다.

 

from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

engine을 연결해준다.

 

models.Base.metadata.create_all(bind=engine)

 

 

디펜던시에서 배웠던 디펜던시 + yield 조합을 통하여 리퀘스트에서 db세션을 생성하고 리스폰스시에 db를 종료하도록 코드를 작성한다

 

# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

 

 

그리고 이를 디펜던시로 사용한다. 디펜던시로 사용할때 자료형을 db의 자료형을 Session으로 지정해준다.

 

def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
def read_user(user_id: int, db: Session = Depends(get_db)):
def create_item_for_user(user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)):
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):

 

 

그리고 마지막으로 path operation을 구현시켜주면 된다.

 

@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

 

 

DB에 Middleware 추가하기

아래와 같이 middleware를 DB에도 추가 할 수 있다.

 

# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

 

위 예제를 아래처럼 바꿔주면 미들웨어 추가가 가능하다

 

@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
    response = Response("Internal server error", status_code=500)
    try:
        request.state.db = SessionLocal()
        response = await call_next(request)
    finally:
        request.state.db.close()
    return response


# Dependency
def get_db(request: Request):
    return request.state.db

 

 

미들웨어와 디펜던시 + yield 차이

  • 미들웨어는 좀더 많은 코드와 복잡도를 요구한다
  • 미들웨어는 async funtion이여야 한다.
  • 만약 미들웨어에 wait가 있을경우 퍼포먼스에 문제가 생길수 있다.
  • 미들웨어는 매 리퀘스트마다 실행되므로 매 리퀘스트마다 DB에 접속한다

 

마지막 이유로 미들웨어에 db를 쓰지말고 Dependency + yield 조합을 사용하자

반응형

댓글