다음과 같이 fastapi에서 background task를 할 수 있다.
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def write_notification(email: str, message=""):
with open("log.txt", mode="w") as email_file:
content = f"notification for {email}: {message}"
email_file.write(content)
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_notification, email, message="some notification")
return {"message": "Notification sent in the background"}
하지만 api call을 안때리고 startup에서 하고싶다면?
예를 들어
@app.on_event("startup")
async def on_app_start(background_tasks: BackgroundTasks):
background_tasks.add_task(...)
이런식으로 말이다.
하지만 startup을 훅으로 가지는 함수에는 다음과 같이 parameter를 받을 수 없고, 그렇다면
background_tasks = BackgroundTasks()
@app.on_event("startup")
async def on_app_start():
background_tasks.add_task(...)
이렇게 하면 어떨까?
실행해보면 add_task에 전달되는 task 함수가 실행이 안된다. 무슨문제인지는 모르겠지만,,,
그래서 구글링을 해보니 startup에서 background task를 하는 방법으로 asyncio.create_task()가 있었다.
그냥 단순히 아래처럼 작성하면 된다.
@app.on_event("startup")
async def on_app_start():
logger.info("App initalize")
asyncio.create_task(task)
사실상 asyncio.create_task만 실행시키면 no running loop 에러가 뜨기 마련인데 uvicorn으로 실행시키면 이벤트 루프가 도는 것 같아서 에러가 안뜨고 task를 넣어 실행시켜준다.
좋은 방법이 있다면 알려주길 바란다.
'Fastapi' 카테고리의 다른 글
내맘대로 Fastapi docs 정리(Pydantic data types, Pydantic Nested Models) (0) | 2022.06.28 |
---|---|
내맘대로 Fastapi Docs 정리(Query, Path, Body parameters) (0) | 2022.06.28 |
내맘대로 Fastapi Document summary[9] (0) | 2021.11.25 |
내맘대로 Fastapi Document summary[8] (0) | 2021.11.24 |
내맘대로 Fastapi Document summary[7] (0) | 2021.11.23 |