Skip to content

Feature branch #31

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import asyncio
import json


class Todo(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
content: str = Field(index=True)
Expand All @@ -27,14 +28,15 @@ class Todo(SQLModel, table=True):
connection_string, connect_args={}, pool_recycle=300
)

#engine = create_engine(
# engine = create_engine(
# connection_string, connect_args={"sslmode": "require"}, pool_recycle=300
#)
# )


def create_db_and_tables() -> None:
    """Create every table declared on SQLModel's metadata using the module engine.

    Safe to call at startup: ``create_all`` only creates missing tables.
    """
    SQLModel.metadata.create_all(engine)


async def consume_messages(topic, bootstrap_servers):
# Create a consumer instance.
consumer = AIOKafkaConsumer(
Expand All @@ -49,7 +51,8 @@ async def consume_messages(topic, bootstrap_servers):
try:
# Continuously listen for messages.
async for message in consumer:
print(f"Received message: {message.value.decode()} on topic {message.topic}")
print(f"Received message: {
message.value.decode()} on topic {message.topic}")
# Here you can add code to process each message.
# Example: parse the message, store it in a database, etc.
finally:
Expand All @@ -62,36 +65,41 @@ async def consume_messages(topic, bootstrap_servers):
# https://fastapi.tiangolo.com/advanced/events/#lifespan-function
# loop = asyncio.get_event_loop()
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    """FastAPI lifespan hook: start the Kafka consumer and create DB tables.

    Runs once at startup (everything before ``yield``) and once at shutdown
    (everything after). The consumer runs as a background task for the whole
    life of the application.
    """
    print("Creating tables..")
    # Run the Kafka consumer concurrently with the web app instead of
    # blocking startup on it.
    task = asyncio.create_task(consume_messages('todos', 'broker:19092'))
    create_db_and_tables()
    yield
    # Shutdown: cancel the background consumer and wait for it so the task
    # is not abandoned (previously it was never cancelled or awaited).
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


# Application object. root_path lets the service run behind an API gateway
# (e.g. Kong) that routes /service1 to this app and strips the prefix.
app = FastAPI(
    root_path='/service1',
    docs_url="/docs",
    redoc_url=None,
    openapi_url="/openapi.json",
    lifespan=lifespan,
    title="Hello World API with DB",
    version="0.0.1",
    servers=[
        {
            # ADD NGROK URL Here Before Creating GPT Action
            "url": "http://host.docker.internal:8085",
            "description": "Development Server",
        },
        {
            "url": "http://localhost:8085",
            "description": "Development Server",
        },
    ],
)


def get_session():
    """Dependency that yields a DB session, closed automatically afterwards."""
    with Session(engine) as db:
        yield db


@app.get("/")
def read_root():
    """Root endpoint; returns a static greeting (useful as a liveness probe)."""
    return {"Hello": "ALiii"}

# Kafka Producer as a dependency


async def get_kafka_producer():
producer = AIOKafkaProducer(bootstrap_servers='broker:19092')
await producer.start()
Expand All @@ -100,20 +108,21 @@ async def get_kafka_producer():
finally:
await producer.stop()

@app.post("/todos/", response_model=Todo)
async def create_todo(todo: Todo, session: Annotated[Session, Depends(get_session)], producer: Annotated[AIOKafkaProducer, Depends(get_kafka_producer)]) -> Todo:
    """Persist a todo and publish it as JSON to the 'todos' Kafka topic.

    Publishes to Kafka before committing to the DB, so a failed commit can
    leave a message with no matching row — NOTE(review): confirm whether an
    outbox pattern is wanted here.
    """
    # NOTE(review): .dict() is the Pydantic v1 API; on Pydantic v2 this
    # should be .model_dump() — confirm the project's pinned version.
    payload = {field: getattr(todo, field) for field in todo.dict()}
    encoded = json.dumps(payload).encode("utf-8")
    print("todoJSON:", encoded)
    # Wait for broker acknowledgement before touching the database.
    await producer.send_and_wait("todos", encoded)
    session.add(todo)
    session.commit()
    session.refresh(todo)  # re-read so the generated primary key is populated
    return todo


@app.get("/todos/", response_model=list[Todo])
def read_todos(session: Annotated[Session, Depends(get_session)]):
    """Return every todo currently stored in the database."""
    return session.exec(select(Todo)).all()
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
version: '3.9'
version: "3.9"

x-kong-config:
&kong-env
x-kong-config: &kong-env
KONG_DATABASE: ${KONG_DATABASE:-postgres}
KONG_PG_DATABASE: ${KONG_PG_DATABASE:-kong}
KONG_PG_HOST: db
Expand All @@ -27,7 +26,7 @@ services:
kong-migrations:
image: "${KONG_DOCKER_TAG:-kong:latest}"
command: kong migrations bootstrap
profiles: [ "database" ]
profiles: ["database"]
depends_on:
- db
environment:
Expand All @@ -39,7 +38,7 @@ services:
kong-migrations-up:
image: "${KONG_DOCKER_TAG:-kong:latest}"
command: kong migrations up && kong migrations finish
profiles: [ "database" ]
profiles: ["database"]
depends_on:
- db
environment:
Expand Down Expand Up @@ -78,7 +77,7 @@ services:
- "127.0.0.1:8444:8444/tcp"
- "127.0.0.1:8002:8002/tcp"
healthcheck:
test: [ "CMD", "kong", "health" ]
test: ["CMD", "kong", "health"]
interval: 10s
timeout: 10s
retries: 10
Expand All @@ -93,7 +92,7 @@ services:

db:
image: postgres:9.5
profiles: [ "database" ]
profiles: ["database"]
environment:
POSTGRES_DB: ${KONG_PG_DATABASE:-kong}
POSTGRES_USER: ${KONG_PG_USER:-kong}
Expand All @@ -108,7 +107,7 @@ services:
"-d",
"${KONG_PG_DATABASE:-kong}",
"-U",
"${KONG_PG_USER:-kong}"
"${KONG_PG_USER:-kong}",
]
interval: 30s
timeout: 30s
Expand All @@ -121,4 +120,4 @@ services:

secrets:
kong_postgres_password:
file: ./POSTGRES_PASSWORD
file: ./POSTGRES_PASSWORD