sqlalchemy 基本操作总结

使用 sqlalchemy 有一段时间了,简单总结下使用方法和常用的查询操作。

初始化

主要包含数据库连接、表创建、创建会话以及通过上下文管理器来管理会话的开启与关闭。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from contextlib import contextmanager

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base

from sqlalchemy.orm import sessionmaker

# 连接数据库
engine = create_engine("sqlite:///test.db")

Base = declarative_base()


class User(Base):
__tablename__ = "user"

id = Column(Integer, primary_key=True, autoincrement=True, comment="自增主键")
username = Column(String, nullable=False, comment="用户名")
password = Column(String, nullable=False, comment="密码")
age = Column(Integer, comment="年龄")
country_id = Column(Integer, comment="国家id")


class Country(Base):
__tablename__ = "country"

id = Column(Integer, primary_key=True, autoincrement=True, comment="自增主键")
name = Column(Integer, nullable=False, comment="国家名")


# 创建数据表(所有继承了 Base 的表都会被创建)
Base.metadata.create_all(engine)

# 创建当前数据库的 Session 对象
Session = sessionmaker(engine)

# 创建会话对象,会话结束后需要关闭
session = Session()
# 关闭会话
session.close()


# 可以定义一个上下文管理器,实现会话的开启与关闭
@contextmanager
def open_session():
"""
Usage:
with open_session() as session:
# your sql code
"""
session = Session()
try:
yield session
session.commit()
except:
session.rollback()
raise
finally:
session.close()

基本查询

主要记录一下常用的几种查询:and、or、between、连表查询、分组查询还有排序以及对应的 sql 语句(还是原生 sql 写起来顺手一些 - -)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 1. and
# select * from user where age > 20 and id > 5;
session.query(User).filter(User.age > 20, User.id > 5).all()

# 2. or
# select * from user where age = 18 or id > 10;
from sqlalchemy import or_

session.query(User).filter(or_(User.age == 18, User.id > 10)).all()

# 3. between
# select * from user where age between 18 and 30;
session.query(User).filter(User.age.between(18, 30)).all()

# 3. 连表查询
# select u.username, u.age, c.name from user u join country c on u.country_id=c.id where u.age >= 18;
session.query(
User.username, User.age, Country.name).join(Country, User.country_id == Country.id).filter(User.age >= 18).all()

# 4. 分组查询
# select country_id, count(1) user_cnt from user group by country_id;
from sqlalchemy import func

session.query(User.country_id, func.count(1).label("user_cnt")).group_by(User.country_id).all()

# 5. 排序
# select * from user where country_id = 1 order by id desc;
session.query(User).filter(User.country_id == 1).order_by(User.id.desc()).all()