본문 바로가기
Fast API/fastapi배우기

Fast API 배우기 20부 - Security, authentication 인증 시스템

by 붕어사랑 티스토리 2021. 11. 4.
반응형

들어가기에 앞서 큰그림부터 살짝 훑자.

뭔말인지 이해안되면 이런게 있다 싶다는 정도로 넘어가면 된다.

OAuth2

인증과 권한부여와 관련된 시스템이라고 생각하면 된다. 3rd party 인증시스템도 제공한다.

구글계정으로 로그인하기, 페이스북계정으로 로그인하기 이런것들 말이다.

 

 

 

OpenID Connect

OAuth2를 기반으로 하는 인증 시스템이다. OAuth2안에 애매모호한것들을 구체화 한 extends이기도 하다.

 

구글 로그인은 OpenID를 지원한다(물론 OAuth2기반 하에) 그러나 페이스북 로그인은 OpenID를 지원하지 않는다.

 

 

 

OpenID(위의 OpenID Connect와 다름)

OpenID Connect 처럼 만들어졌으나 OAuth2 기반이 아니다. 그리고 잘 안쓰인다

 

 

 

OpenAPI

누구나 사용할 수 있도록 공개된 API. fastapi는 openapi 기반이다.

 

OpenAPI는 여러가지 security 스키마를 가지고 있다.

 

사용하는 security 스키마들은 하기와 같다.

  • apiKey : 어플리케이션을 특정하게 하는 키. 쿼리, 헤더, 쿠키에서 얻어온다
  • http : http authenication 시스템이다. 하기 내용을 포함한다
      -  bearer : Bearer값과 토큰을 포함하고 있는 Autorization 헤더이다. OAuth2를 상속한다
      -  HTTP 기반 authentication이다
      -  HTTP digest 인증기법 이용한다
  • oauth2 : 모든 OAuth2 security에 관한 내용을 다룬다. 이를 flow라고 한다
  • openIdConnect : OAuth2 인증데이터를 자동으로 찾아준다.

 

 

 

 

자 그럼 이제 본격적으로 시작하자.

 

예를들자.

 

당신이 backend API 시스템을 가지고있다. 그리고 frontend 시스템도 쿠축했다고 하자.

당신은 username과 password드로 authentication시스템을 구축하고 싶어한다.

 

여기서 우리는 OAuth2 를 이용할 수 있을것이다.

 

from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


@app.get("/items/")
async def read_items(token: str = Depends(oauth2_scheme)):
    return {"token": token}

상기와 같이 코드를 작성하고 서버를 가동한뒤 docs를 실행시키자

 

 

그럼 아래와 같이 나온다

Authorize 버튼이 생겼다. 저걸 클릭하자

 

위와같은 창이 뜬다.

 

이놈이 뭐하는거냐면 리퀘스트 try it 하듯이 authenication 즉 로그인 하는거 테스트하는 툴이다.

 

 

 

 

Password flow

아까 OAuth2의 security를 다루는걸 flow라고 했다. password flow라 하면 비밀번호로 보안을 다룬다 라는 뜻이 된다.

 

 

원래 OAuth2는 백엔드나 API가 authentication 해주는 서버랑 독립되도록 디자인했지만 하기에 나오는 예제는 일단 같은 application에 때려박겠다.

 

 

 

password flow는 아래와 같은 방식으로 처리된다

 

  • 유저가 프론트엔드단에서 username 과 password를 적고 Enter를 누른다
  • 프론트엔드는 username과 password를 정해진 API의 URL에 보낸다.(tokenUrl="token" 이 선언된)
  • API는 username과 password를 확인한뒤 reponse로 token을 넘겨준다(앞선예제에선 아직 안나와있음)
    token이란 단순한 string이며 유저를 확인하기 위해 사용된다
    프론트엔드에서는 이 token을 어디다가 저장한다.
    token은 expiration이 있다. 즉 유통기한이 있고 시간지나면 쓸수가 없다.
    token이 유통기한이 지나지 않으면 유저가 웹페이지를 돌아다닐때 추가로 authentication을 할 필요가 없다
    token이 유통기한이 지나면 유저는 다시 로그인 해야된다
  • 프론트엔드에서는 로그인하면 웹페이지를 이곳저곳 막 돌아다닌다.
  • 이곳저곳 막 돌아다니다 어떤 데이터를 백엔드에 요청할 일이 생긴다.
    이때 특정 endpoints에서는 authentication이 필요하다.
    이때 프론트엔드에서는 헤더에 Authorization이라는걸 Bearer + token 값과 함께 보낸다.
  • 만약에 token이 foobar라는 값을 contain하면 Authorization 헤더는 Bearer foobar가 된다.

 

 

 

FastAPI's OAuth2PasswordBearer

이제 앞으로 나올 예제에서는 OAuth2와 Bearer token을 사용한 Password flow를 알아볼거다

 

from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()


oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


@app.get("/items/")
async def read_items(token: str = Depends(oauth2_scheme)):
    return {"token": token}

 

 

OAuth2PasswordBearer 객체를 생성할때 tokenUrl이라는 파라미터를 넘겨준다. 이 파라미터는 프론트엔드에서 유저가 token값을 얻어 올 때 사용된다.

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

그리고 이 tokenUrl은 상대경로이다. 만약 API가 https://example.com/ 에 있다면

token url은 https://example.com/token 이 된다.

같은 방식으로 https://example.com/api/v1/의 token url은 https://example.com/api/v1/token 이다.

 

 

 

여기서 oauth2_scheme variable은 객체이지만 callbale이다. 

즉 Depends에 사용이 가능하다!

async def read_items(token: str = Depends(oauth2_scheme)):

 

 

위 예제의 API는 이제 리퀘스트가 오면 Authorization header를 확인한다. 앞서 말한것 처럼 Bearer + token 값이 있는지 확인후 값이 있다면 token을 str로 리턴해준다.

 

만약에 Authorization헤더가 없거나 값이 Bearer token이 아니면, 401 에러를 일으킨다.

 

docs로 돌아가서 무지성 리퀘스트를 한번 날려주면 다음과 같이 401에러가 나는걸 볼 수 있다.

 

 

 

 

 

 

Get Current User 현재 유저정보 가져오기

 

다음 예제는 이해를 위한 중간설명이라고 생각하면 되겠다.

실제로 안돌아가는 코드이니 읽기만 하도록. 왜 이런 페이지를 만든건지 이해안감.

 

authenication을 통해 현재 유저정보를 가져올 것이다.

 

from typing import Optional

from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer

from pydantic import BaseModel


app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")



class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None



def fake_decode_token(token):
    return User(
        username=token + "fakedecoded", email="john@example.com", full_name="John Doe"
    )


async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    return user


@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user

 

 

먼저 유저 모델을 만들어준다.

class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None

 

 

 

아래와 같이 디펜던시인 get_current_user 를 만들어준다.

그리고 이 디펜던시에 앞선 예제의 oauth2_scheme를 sub 디펜던시로 걸어주자

async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    return user
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user

 

 

get_current_user는 토큰을 얻었다면 fake_decode_token 함수를 호출하여 User모델을 리턴한다.

 

def fake_decode_token(token):
    return User(
        username=token + "fakedecoded", email="john@example.com", full_name="John Doe"
    )
async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    return user

 

그럼 마지막으로 api가 User 정보를 리턴하게 된다.

@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user
반응형

 

 

 

간단한 OAuth2 password flow and Bearer 예제

위 예제는 공식홈페이지에 나온건데 그냥 대략적인 flow 설명이고 사실 안돌아가는 망할 예제이다.

 

 

이제 진짜 본격적으로 Password flow를 시작해 보겠다.

 

 

첫번째로 간단한 예제로 이해를 한뒤 두번째 예제에서 좀더 실전으로 들어가겠음.

 

 

 

 

 

Username과 password 얻기

Password flow에서 유저는 반드시 username과 password를 보내야 한다.

 

그리고 field 이름은 반드시 username과 password 이여야 한다.

 

그리고 데이터형대는 JSON데이터가 아닌 form 데이터로 보내져야 한다.

 

 

 

scope

password flow에서 scope라는 field가 있다.

이 form filed는 기다란 str이며 스페이스로 구분된다.

 

스페이스로 구분되는 문자열을 scope라고 부른다.

 

이 scope들은 permission에 관한 정보를 가진다.

 

예를들면

 

  • users:read 나 users:write 같은 common한 예시가 있고
  • instagram_basic 같은 페이스북이나 인스타그램에서 쓰이는 scope가있고
  • https://www.googleapis.com/auth/drive 같이 구글에서 쓰이는 scope가 있다

 

 

자 이제 첫번째 쉬운 예제를 보자

아래가 전체 예제이다.

from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, status

from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

from pydantic import BaseModel

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}

app = FastAPI()


def fake_hash_password(password: str):
    return "fakehashed" + password


oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def fake_decode_token(token):
    # This doesn't provide any security at all
    # Check the next version
    user = get_user(fake_users_db, token)
    return user


async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"}


@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

 

 

아래와 같이 OAuth2PasswordRequestForm 을 import하고 /token url에 Depends를 걸어준다

from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

..................................................
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):

 

 

OAuth2PasswordRequestForm 은 클래스 디펜던시이다. 아래와 같은 form body를 가진다.

 

  • username
  • password
  • scope(Optional)
  • grant_type(Optional)
  • client_id(Optional)
  • client_secret(Optional)

(OAuth2 spec은 사실 grant_type이 required이다. 허나 OAuth2PasswordRequestForm 에선 이게 required가 아니다. grant_type을 required로 쓰고 싶으면 OAuth2PasswordRequestFormStrict 을 대신 써주자)

 

 

 

 

 

Username 확인

자 이제 그럼 fake database에서 username이 있는지 확인하자. 만약 error가 날 시 HTTPException을 일으킨다.

from fastapi import Depends, FastAPI, HTTPException, status
.......
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}
.......
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"}

바로 이부분

user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

 

 

 

Password hashing

데이터베이스에 유저네임이 있다면 이번에는 프론트엔드에서 전달된 password가 데이터베이스의 패스워드와 일치하는지 확인할 차례이다.

 

자 그런데 여기서 fake_user_db의 password를 보자. 패스워드라고 하기엔 너무 뒤죽박죽한 문자열이지 않은가?

 

사실 이건 패스워드를 hashing한 값이다.

hashing이란 특정 문자열을 다른 문자열로 변환시켜주는것을 의미한다.

1:1관계가 아니라서 패스워드를 해시값으로 변환하는건 되지만 해시값을 패스워드로 복원하는건 안되는 경우가 있다.

 

왜 이런 패스워드 해싱을 할까? 만약 데이터베이스가 도난당했다고 치자.

도난당한 데이터베이스에 실제 패스워드가 있다면 해커가 그걸보고 유저 계정을 로그인하여 해킹할수 있겠지만

만약 해싱된 패스워드를 본다면 해킹이 불가능하다.

 

그래서 패스워드 해싱을 사용한다.

 

def fake_hash_password(password: str):
    return "fakehashed" + password
..................
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"}

 

 

 

 

토큰값 리턴하기

password 플로우에서 로그인에 성공하면 reponse로 token을 준다고 했다. 이 token은 반드시 JSON object 이여야 한다.

 

또한 token은 반드시 access_token 과 token_type을  가져야 한다.

 

이 예제에서는 Bearer token 타입을 사용하므로 token_type값은 bearer가 된다.

 

access_token은 이 예제는 간단한 예제이니 그냥 username으로 해주자

 

    return {"access_token": user.username, "token_type": "bearer"}

 

(spec에 따르면 위 예제처럼 access_token과 token_type을 반드시 리턴해주어야 한다고 함)

 

 

 

디펜던시 업데이트 하기

자 이제 로그인도 되고 토큰도 리턴했다. 

 

유저가 acitve인 상태에서만 current_user를 리턴하고 아니면 에러를 일으키자.

 

async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user
    
 ............
 
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

 

 

 

 

Docs에서 확인

 

자 이제 docs에서 아까 배운 툴을 확인해보자

 

 

User: johndoe

Password: secret

 

 

 

위와같이 로그인 되었다!

 

 

 

 

현재 유저데이터 얻어오기

자 이제 /users/me api를 try 하자. 그럼 아래와 같은 결과가 나온다.

 

 

 

 

만약 로그아웃을 하면 아래와 같은 response를 얻는다

{
  "detail": "Not authenticated"
}

inactive 유저인 alice도 테스트해보자!

 

User: alice

Password: secret2

 

그럼 아래와 같은 reponse를 얻는다.

{
  "detail": "Inactive user"
}

 

 

 

 

 

 

OAuth2 with Password&hashing, Bearer with JWT tokens

 

자이제 두번째 예제로 들어가자. 두번째 예제는 공식문서에 따르면 실전에도 적용가능한 내용이라고 한다.

 

 

 

 

JWT

들억기전에 JWT에 대해서 배워보자

 

JWT란 JSON Web Tokens의 약자이다.

 

JSON 객체를 기다란 공백없는 문자열로 아래처럼 만든것이다.

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

이건 암호화 하는게 아니여서 누구나 이 JWT를 다시 원복 가능하다

대신에 signed 즉 서명이 되어 있어서 토큰을 내보내면 실제로 내보내건지 확인이 가능하다.

 

이러한 방식으로, 1주일짜리 토큰을 유저한테 돌려줬다 치자. 유저가 토큰의 유통기한이 끝나기전에 토큰을 다시 들고 왔다면, 유저는 따로 로그인 다시해줄 필요없이 백엔드 시스템을 마음껏 이용 가능하다.

 

토큰이 만료되었으면 유저는 다시 로그인을 해주어야 하고, 만약 유저가 토큰을 modify, 예를들어 유통기한을 늘리려고 시도하면, signed된 데이터가 매칭되지 않아 이를 감지할 수 있다.

 

 

 

 

일단 환경세팅을 위해 하기 내용을 인스톨 하자

 

pip install "python-jose[cryptography]"
pip install "passlib[bcrypt]"

 

jose는 JWT를 위한 라이브러리고, passlib은 패스워드 해싱을 위한 라이브러리다.

 

 

 

 

 

from datetime import datetime, timedelta
from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt

from passlib.context import CryptContext

from pydantic import BaseModel

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Optional[str] = None


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str



pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()



def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)




def get_password_hash(password):
    return pwd_context.hash(password)



def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)



def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user



def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    return [{"item_id": "Foo", "owner": current_user.username}]

 

워우 코드가 길다. 아래내용을 일단 주목하자

 

from passlib.context import CryptContext

......................................................................

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

......................................................................

def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)
    
def get_password_hash(password):
    return pwd_context.hash(password)
    
......................................................................

def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

 

 

 

JWT 토큰 다루기

 

하기와 같은 커맨드로 임의의 JWT 토큰을 얻어온다

 

$ openssl rand -hex 32

09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7

 

그리고 이를 복사해서 SECRET_KEY 값에더가 저장한다.

ALGORITHM 변수를 만들고 JWT sign에 활용될 값인 HS256을 적는다

그리고 마지막으로 토큰의 유통기한을 30으로 설정한다.

 

from jose import JWTError, jwt

...............................................................

SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

...............................................................

class Token(BaseModel):
    access_token: str
    token_type: str

...............................................................

 def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

 

 

 

디펜던시 update

get_current_user 함수를 업데이트 해주자. 이번에는 JWT 토큰을 이용한다.

전달받은 토큰을 decode하고 current user를 리턴한다.

만약 토큰이 invalid하면 바로 HTTP 에러를 일으킨다.

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

 

/token path operation 업데이트 하기

timedelta를 이용하여 토큰의 만기 기한을 아까 정한 30분으로 해 준다.

그리고 JWT 토큰을 만든뒤 리턴해준다.

@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

 

 

JWT의 sub

JWT에넌 sub 이라는게 있다. 이게 뭐냐면 subject of the token 이라는 뜻이다.

즉 토큰에 주제를 정할수 있다는것. 이것은 optional이다.

 

JWT는 단순히 유저 인증말고 다양한 곳에 사용이 가능하다.

 

예를들어 "차량" 또는 "블로그 포스트" 같은걸 식별한다고 치자

그런다음 "드라이브(차량용)" 또는 "편집(블로그용)" 같은 entity에 관한 권한을 추가 할 수 있다.

그리고 이를 JWT 토큰으로 제공하면 사용자는 계정을 보유할 필요없이 JWT 토큰만으로 작업을 할 수 있게된다.

 

이러한 아이디어를 이용하면, JWT는 훨씬 더 복잡한 시나리오에 사용 가능하다

 

이때 엔티티중 몇개는 같은 ID를 가지게 된다. 가령 사용자 foo, 차량 foo, 블로그 foo 이렇게 충돌이 난다.

ID충돌을 방지하기 위해서는 하위 키값에 prefix를 붙일 수 있다. 예를들면 username:johndoe 처럼 말이다.

 

명심해야 할 점은 하위키가 전체 응용프로그램에 걸쳐 고유한 식별자를 가지고 있어야 하며 문자열 이여야 한다.

 

 

 

docs 확인

 

혹시나 코드를 실행시켰는데 syntax에러가 나면 하기작업 수행

pip uninstall jose
pip install python-jose

 

이제 docs를 실행시키고 authenication을 테스트해보자

 

username : johndoe

password : secret

 

 

 

그리고 endpoint /users/me/ 에 리퀘스트를 날리면 하기와 같은 reponse를 얻는다.

 

{
  "username": "johndoe",
  "email": "johndoe@example.com",
  "full_name": "John Doe",
  "disabled": false
}

 

 

F12를 눌러서 network 탭에 가보면 token이 어떻게 동작하는지도 볼 수 있다.

패스워드는 첫 리퀘스트에만 보내지고 이후에는 보내지지 않는다.

 

 

 

scopes

OAuth2는 scopes라는 개념이 있다. 앞에 설명한것.

 

JWT token에 이 scopes를 이용해 여러가지 권한을 추가할 수 있다.

 

그리고 이걸 유저에게 직접 혹은 3rdparty 를 통해 전달해서 유저에게 API관하여 이것저것에 제한을 할 수 있다.

 

공식 홈페이지 Advanced User Guide에 자세히 나와있으니 참고

 

반응형

댓글