from sqlalchemy import Column, Table, Integer, MetaData, String, select, create_engine from sqlalchemy.sql import func from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db" engine = create_engine( SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False} ) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) def get_db(): db = SessionLocal() try: yield db finally: db.close() Base = declarative_base() class People(Base): __tablename__ = "people" id = Column(Integer, primary_key=True, index=True) name = Column(String) age = Column(Integer) gender = Column(String) country = Column(String) def create_people_from_list(db, people_list): count = 0 for p in people_list: db_people = People(**p.dict()) db.add(db_people) count += 1 db.commit() return count def get_average_age_by_country(db) -> []: return db.query(People.country.label('country'), func.avg(People.age).label('average_age'))\ .group_by( People.country).all() def count_people_by_country(db) -> []: return db.query(People.country.label('country'), func.count(People.id).label('people_count'))\ .group_by( People.country).all() def get_gender_repartition_by_country(db, country) -> []: count_women, total = db.query( select(func.count(People.id).label('count_women')) \ .where(People.gender == 'F') \ .where(People.country == country).subquery(), select(func.count(People.id).label('total')) \ .where(People.country == country).subquery(), ).first() female_proportion = count_women / total return { "female_proportion": female_proportion, "male_proportion": 1 - female_proportion } meta = MetaData() Table( 'people', meta, Column('id', Integer, primary_key=True), Column('name', String), Column('age', Integer), Column('gender', String), Column('country', String), ) meta.create_all(engine)