sqlalchemy

 

2022-04-28


FlaskSQLAlchemy

连接

  • MySQL: mysql://username:password@host/databasename

属性

  • primary_key: 主键
  • unique: 唯一
  • index: 索引
  • nullable: 空值
  • default: 默认值,发生 ==insert== 语句时调用
  • onupdated: 发生 ==update== 语句时调用
  • comment: 注释

数据类型

SQLALchemy 类型MySQL 类型Python 类型描述
SmallIntegersmallintint数字,2字节:-32768 到 32767
Integerintint数字,4字节:-2147483648 到 2147483647
BigIntegerbigintint/long数字,8字节:9223372036854775807
Floatfloatfloat浮点数:4字节
Numericdecimaldecimal.Decimal定点数,M + 2字节:Numeric(M, D)
Stringvarcharstr字符串
Timetimedate.time时间
Datedatedatetime.date日期
DateTimedatetimedatetime.datetime日期时间
Booleantinyintbool布尔值
JSONJSONlist列表/JSON
Text长文本

过滤方法

  • filter():使用指定过滤规则,并返回查询对象
  • filter_by():使用指定过滤规则(以关键字表达式形式)
  • order_by():根据指定条件对记录进行排序
  • group_by():指定条件对记录进行分组

查询方法

  • all():返回包含所有查询记录的列表
  • first():返回查询的第一条记录,如未找到,返回 None
  • get(id):传入主键作为参数,如未找到,返回 None
  • count():返回查询结果的数量
  • first_or_404():返回查询结果的第一条记录,如果未找到,返回 404
  • get_or_404(id):传入主键作为参数,如未找到,返回 404
  • paginate():返回第一个 Pagination 对象,可对记录进行分页处理

使用

使用 UUID 作为主键

1
id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))

创建

1
# 添加数据到会话
2
db.seesion.add(User(name='immwind')) # User 为表类
3
# 提交到数据库
4
db.session.commit()

查询

1
# User 为对应模型类
2
3
# 获取 User 第一条记录
4
User.query.first()
5
User.query.first().name # 读取 name 的值
6
7
# User 所有记录
8
User.query.all() # 返回:[<User 1>, <User 2>]
9
10
# 指定数量
11
User.query.limit(42).all()
12
13
# 获取指定键值
14
User.query.get(42) # 返回 <User 1>
15
16
# 过滤
17
User.query.filter_by(name='immwind').first() # 返回 name 等于 immwind 的第一条记录
18
User.query.fileter(User.name='immwind').first() # 同上
19
20
# 获取记录数
21
User.query.count()
22
self.session.query(<Table>).count()

更新

1
# 需先读取数据
2
user = User.query.get(1)
3
user.name = 'python' # 更新数据

删除

1
# 先获取数据
2
user = User.query.get(1)
3
db.session.delete(user)

2.0

select

1
from sqlalchemy import func, select
2
from sqlalchemy import create_engine
3
from sqlalchemy.orm import Session
4
5
class DB():
6
"""数据库操作"""
7
8
def __init__(self):
9
self.engine = create_engine(f'mysql+mysqlconnector://root:{urlquote("password")}@0.0.0.0:3306/new_highway', pool_size=100)
10
f
11
self.session = Session(bind=self.engine, future=True)
12
13
def get_table_all(self, table):
14
"""获取指定数据库所有值"""
15
with self.session.begin():
16
# # result = session.query(LineNumberInfo).all()
17
stmt = select(LineNumberInfo.line_number)
18
results = self.session.execute(stmt).scalars().all()
19
20
print(len(results))
21
for result in results:
22
print(result.to_dict())
  • .scalar(): 获取单个

  • .scalars() 获取所有

  • 以字典形式返回所有数据

1
def get_all(self, table):
2
result = session.execute(User.__table__.select())
3
for row in result:
4
print(dict(row))

示例

时间自动更新

1
from sqlalchemy import func
2
3
update = Column(DateTime, default=func.now(), onupdate=func.now())

报错

Authentication plugin ‘caching_sha2_password’ is not supported

pip 安装 ==mysql-connector-python== 解决,而不是 ==mysql-connector-python==

参考