flask的数据库操作

DButils

  • pip install DBUtils

pooledDB

提供线程间可共享的数据库连接,并自动管理连接,创建好一批连接池,供所有线程共享使用。总数就那么多个。

创建方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import pymysql
pymysql.install_as_MySQLdb()
from dbutils.pooled_db import PooledDB
pool2 = PooledDB(
creator=pymysql, # 使用连接mysql的模块客户端
maxconnections=6, # 连接池允许的最大连接数,o和None不限制连接数
mincached=2, # 初始化时链接池中最少创建的空闲的连接,0表示不创建
maxcached=5, # 链接池中最多闲置的连接数,0和None表示不限制
maxshared=3, # 连接池中最多共享的连接数,0和None表示全部共享,无用:因为pymysql和MYSQLdb等模块的threadsafety为1
# 线程安全让为每个线程创建的连接不能共享给别的线程使用
blocking=True, # 连接池没有可用连接后是否阻塞等待。True等待!False不等待直接报错
maxusage=None, # 一个连接可以被使用多少次,None表示无限制
setsession=[],
ping=0,
host='127.0.0.1',
port=3306,
user='root',
password='123456',
charset='utf8',
db='test'
)

def func():
conn = Pool1.connection(shareable=False)
cursor = conn.cursor(pymysql.cursors.DictCursor)
cursor.execute('select * from bb')
results = cursor.fetchall()
print(results)
func()

persistentDB

提供线程专用的数据库连接,并自动管理连接。:为每一个进来的线程创建一个连接,
线程即使调用close也不会关闭,只是把连接重新放到连接池,供自己线程再次使用,
当线程终止时连接自动关闭。(但限制了创建的上线,达到上线就必须等待线程关闭后
再创建新的)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import pymysql
pymysql.install_as_MySQLdb()
from dbutils.persistent_db import PersistentDB

Pool1 = PersistentDB(
creator=pymysql, # 使用连接mysql的模块客户端
maxusage=None, # 创建的一个连接最多使用多少次
setsession=[], # 执行一些命令修改session variables,在此session链接中可用的配置文件,也是sql命令
ping=0, # 为0永远不会ping,客户端在这里拿连接时,通过ping保证连接可用,为0就不能保证了,如果为7就是always只要拿链接就检查
# 如果为2 when a cursor is created, 4 when a query is executed 1.default whenever it is requested 一般4或者7
closeable=False, # 如果为False,conn.close()实际上会被忽略,如果为True就真的关闭了,会出问题,之后你的线程就没有连接了,也获取不到
threadlocal=None, # if thread_local is None, use threading.local,也可以自定义local,比如之前的
host='127.0.0.1',
port=3306,
user='root',
password='123456',
charset='utf8',
db='test'
)

基于pooledDB的实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
class MySQL(object):
def __init__(self, username, password, hostname, db_name, charset='utf8mb4'):

self.username = username
self.password = password
self.db_name = db_name
self.hostname = hostname
self.log = log
self.pool = PooledDB(
creator=MySQLdb,
mincached=0,
maxcached=6,
maxshared=3,
blocking=True,
ping=0,
maxusage=None,
host=self.hostname,
user=self.username,
passwd=self.password,
db=self.db_name,
port=3306,
charset=charset
)

def manipulate_db(self, sql, pairs=True):
db = None
cursor = None
try:
db = self.pool.connection()
if pairs:
cursor = db.cursor(cursorclass=MySQLdb.cursors.DictCursor)
else:
cursor = db.cursor()
cursor.execute(sql)
if 'select' in sql or 'SELECT' in sql:
return cursor.fetchall()
else:
# select does not need commit(), only update/insert/delete need it
db.commit()
except Exception as e:
self.log('[{}] meet error'.format(sql))
self.log(e)
if 'select' not in sql and 'SELECT' not in sql:
# select does not need rollback
db.rollback()
return ()
finally:
if cursor:
cursor.close()
if db:
db.close()

return True

def insertmany(self, sql, param):
"""
:param sql:
:param param: 必须是元组或列表[(),()]或((),())
:return:
"""
db = self.pool.connection()
cursor = db.cursor()
try:
cursor.executemany(sql, param)
db.commit()
return True
except Exception as e:
log(f"sql: {sql}, errmsg: {str(e)}")
db.rollback()
cursor.close()
db.close()
return False

def transaction(self, sqls):
db = None
cursor = None
try:
db = self.pool.connection()
cursor = db.cursor()
for sql in sqls:
try:
cursor.execute(sql)
except Exception as e:
self.log('[{}] meet error'.format(sql))
self.log(e)
db.rollback()
return ()
db.commit()
except Exception as e:
self.log('[{}] meet error'.format(e))
self.log(e)
db.rollback()
return ()

finally:
if cursor:
cursor.close()
if db:
db.close()
return True

orm

sqlAlchemy

基本使用操作

pip install sqlachemy

  • 示例一:基本数据库连接操作
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    import time
    import datetime
    import sqlalchemy
    import threading
    from _thread import get_ident
    from sqlalchemy import create_engine
    from sqlalchemy.engine.base import Engine
    from pymysql.cursors import DictCursor


    engine = create_engine(
    "mysql+pymysql://root:123456@127.0.0.1:3306/test?charset=utf8",
    max_overflow=2, # 超过连接池大小外最多创建的连接
    pool_size=5, # 连接池大小
    pool_timeout=30, # 连接池等待时间
    pool_recycle=-1 # 多久后对线程池中的线程进行一次连接回收
    )


    def task(args):
    conn = engine.raw_connection()
    cursor = conn.cursor(DictCursor)
    cursor.execute(
    # 'select * from bb'
    'select sleep(2)'
    )
    results = cursor.fetchall()
    print(args, get_ident(), results)
    cursor.close()
    conn.close()


    task(-1)

    for i in range(20):
    t = threading.Thread(target=task, args=(i, ))
    t.start()
  • 示例二:orm映射
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index


Base = declarative_base()


class Users(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False)


def init_db():
engine = create_engine(
"mysql+pymysql://root:123456@127.0.0.1:3306/test?charset=utf8",
max_overflow=2, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 连接池等待时间
pool_recycle=-1 # 多久后对线程池中的线程进行一次连接回收
)
Base.metadata.create_all(engine)

def drop_db():
engine = create_engine(
"mysql+pymysql://root:123456@127.0.0.1:3306/test?charset=utf8",
max_overflow=2, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 连接池等待时间
pool_recycle=-1 # 多久后对线程池中的线程进行一次连接回收
)
Base.metadata.drop_all(engine)


init_db()

sqlachemy的缺陷

1.不像django一样makemigrate、migrate就可以生成表,如果要生成定义类的表,需要自定义init_db()
2.不能修改表,只能自己给数据库加字段后,在类的关系映射上修改对象加上一个属性。
3.但我们可以创造类似django的功能脚本嘛(flask-migrate)

1
2
3
4
5
6
7
8
9
10
11
class Users(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False)
email = Column(String(32), unique=True)
ctime = Column(DateTime, default=datetime.datetime.now)
extra = Column(Text, nullable=True)
__table_args__ = {
UniqueConstraint('id', 'name', name='uix_id_name'),
Index('ix_name_email', 'name', 'email')
}

sessionMaker和scopeSession

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

engine = create_engine(
"mysql+pymysql://root:123456@127.0.0.1:3306/test?charset=utf8",
max_overflow=2, # 超过连接池大小外最多 创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 连接池等待时间
pool_recycle=-1 # 多久后对线程池中的线程进行一次连接回收
)

Session = sessionmaker(bind=engine)
conn = Session()
obj1 = Users(name='alex1', email='alex1@mail.com')
conn.add(obj1)
conn.commit()

conn.close()


from sqlalchemy.orm import scoped_session
Session = sessionmaker(bind=engine)
conn = scoped_session(Session)
obj1 = Users(name='alex2', email='alex1@mai2.com')
conn.add(obj1)
conn.commit()
  • scope_session和session对象的区别:
    scope使用threading.local为每个线程创建一个session,其实原session每个线程
    进去就实例化一个新的也是可以的。通过scope就只需要为每个线程实例化一个。

基本增删改查

增加&批量增加
1
2
3
4
5
6
7
8
9
10
Session = sessionmaker(bind=engine)
conn = Session()
obj1 = Users(name='alex1', email='alex1@mail.com')
obj2 = Users(name='alex3', email='alex3@mail.com')
obj3 = Users(name='alex4', email='alex4@mail.com')
conn.add(obj1)
conn.add_all([obj2, obj3])
conn.commit()

conn.close()
查&过滤查找
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
Session = sessionmaker(bind=engine)
conn = Session()
# 也可以支持原生sql
cur = conn.execute(‘select * from bb’)
ret = cur.fetchall()

user_list = conn.query(Users).all()
print(user_list)
user_search = conn.query(Users).filter(Users.id > 2)
print(user_search)
for row in user_search:
print(row.id, row.name, row.email)
conn.close()
这里filter只有在真的调用时才会执行
user_search = conn.query(Users.name).all()
# 给字段起别名
user_search2 = conn.query(Users.name.label('xx'), Users.email).all()
print(user_search)
for row in user_search2:
print(row.xx, row.email)
user_search3 = conn.query(Users).filter(Users.name == 'alex3')
print(user_search3, type(user_search3))
print(user_search3.first())
print(user_search3.all())
user_search4 = conn.query(Users).filter_by(name='alex3').all()
print(user_search4)

from sqlalchemy import text
user_search5 = conn.query(Users).filter(text("id <:value and name=:name")).\
params(value=2, name='axasfcas').order_by(Users.id).all()
print(user_search5)

user_search6 = conn.query(Users).filter(text("name like :name")).\
params(name='%axasfcas%').all()
print(user_search6)

user_search7 = conn.query(Users).from_statement(text("select * from users where name like :name")).\
params(name='%axasfcas%').all()
print(user_search7)
删除

先查询再删除

1
2
3
user_search = conn.query(Users).filter(Users.id > 2).delete()
conn.commit()
conn.close()
修改
1
2
3
4
5
6
7
8
9
10
user_search = conn.query(Users).filter(Users.id == 1).update({'name':'axasfcas'})
conn.commit()
conn.close()
修改需要传入字典
user_search = conn.query(Users).filter(Users.id == 1).update({Users.name: Users.name + '0999'},
synchronize_session=False)
user_search2 = conn.query(Users).filter(Users.id == 1).update({"age": Users.age + 1}, synchronize_session="evaluate")
conn.commit()
conn.close()

修改数字
时需要修改执行策略,synchronize_session=”evaluate”

范围查找
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# _*_ coding: utf-8 _*_
__author__ = '春江花月夜oo'

import time
import datetime
import sqlalchemy
import threading
from _thread import get_ident
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
from pymysql.cursors import DictCursor
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

engine = create_engine(
"mysql+pymysql://root:123456@127.0.0.1:3306/test?charset=utf8",
max_overflow=2, # 超过连接池大小外最多 创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 连接池等待时间
pool_recycle=-1 # 多久后对线程池中的线程进行一次连接回收
)

from sqlalchemy import text, and_, or_

user_search5 = conn.query(Users).filter(text("id <:value and name=:name")).\
params(value=2, name='axasfcas').order_by(Users.id).all()
print(user_search5)

user_search6 = conn.query(Users).filter(text("name like :name")).\
params(name='%axasfcas%').all()
print(user_search6)

user_search7 = conn.query(Users).from_statement(text("select * from users where name like :name")).\
params(name='%axasfcas%').all()
print(user_search7)
user_search8 = conn.query(Users).filter(Users.name == 'axasfcas', Users.id > 1).all()
print(user_search8)
user_search9 = conn.query(Users).filter(Users.name == 'axasfcas', Users.id.between(1, 4)).all()
user_search10 = conn.query(Users).filter(Users.name == 'axasfcas', Users.id.in_([1, 2, 3, 4])).all()
print(user_search9, user_search10)
user_search11 = conn.query(Users).filter(Users.name == 'axasfcas', Users.id.in_(conn.query(Users.id).filter(
text("name like :name")).params(name='%axasfcas%'))).all()
print(user_search11)
user_search12 = conn.query(Users).filter(and_(Users.name == 'axasfcas', Users.id.between(1, 4))).all()
user_search13 = conn.query(Users).filter(or_(Users.name == 'axasfcas', Users.id.between(1, 4))).all()
print(user_search12, user_search13)
user_search14 = conn.query(Users).filter(or_(Users.id < 3, and_(Users.name == 'axasfcas', Users.id.between(1, 4)))).all()

# like
ret = conn.query(Users).filter(Users.name.like('%a%')).all()
# not like
ret1 = conn.query(Users).filter(~Users.name.like('%a%')).all()
# 限制
ret2 = conn.query(Users)[1:2]
print(ret, ret1, ret2)
order by和group by
1
2
3
4
5
6
7
8
9
10
11
12
# 排序
ret3 = conn.query(Users).order_by(Users.name.desc()).all()
ret4 = conn.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
print(ret3, ret4, 11111)
# 分组
ret5 = conn.query(Users).group_by(Users.id).all()
from sqlalchemy.sql import func
ret6 = conn.query(func.max(Users.id)).group_by(Users.id).all()
ret7 = conn.query(Users.id, func.count()).group_by(Users.id).all()
ret8 = conn.query(Users.id, func.count()).group_by(Users.id).having(func.min(Users.id) > 1).all()
print(ret5, ret6, ret7)
print(user_search14, ret)
联表查询
1
2
3
4
5
6
7
8
9
res = conn.query(Users, Favor).filter(Users.id == Favor.uid).all()
print(res)
# inner join
res1 = conn.query(Users).join(Favor).all()
# left join(users left join favor)用顺序改变只有left join
res2 = conn.query(Users).join(Favor, and_(Users.id == Favor.uid, Users.name == Favor.hobby), isouter=True)
res3 = conn.query(Users).join(Favor, or_(Users.id == Favor.uid, Users.name == Favor.hobby), isouter=True)
print(res2)
print(res1)
组合union
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from sqlalchemy.orm import scoped_session
Session = sessionmaker(bind=engine)
conn = scoped_session(Session)
# obj1 = Users(name='alex2', email='alex1@mai2.com')
# conn.add(obj1)
# conn.commit()
q1 = conn.query(Users.name).filter(Users.id > 2)
q2 = conn.query(Favor.hobby).filter(Favor.uid < 2)
q3 = conn.query(Favor.uid).filter(Favor.uid > 2)
ret = q1.union(q2).union(q3)
res = []
for row in ret.all():
print(row.keys())
res.append(dict(zip(row.keys(), row)))
print(res)
conn.close()
如何把session.execute()中返回的元组转化为dict

观察可以发现返回的单一对象是querycollection的对象,它们有方法keys,所以可以
通过dict(zip(obj.keys(), obj))绑定连表查询手动

一对一、一对多
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index


Base = declarative_base()


class Users(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False)
email = Column(String(32), unique=True)
ctime = Column(DateTime, default=datetime.datetime.now)
extra = Column(Text, nullable=True)

__table_args__ = (
UniqueConstraint('id', 'name', name='uix_id_name'),
Index('ix_name_email', 'name', 'email'),
)

from sqlalchemy.orm import relationship



class Favor(Base):
__tablename__ = 'favor'
id = Column(Integer, primary_key=True, autoincrement=True)
hobby = Column(String(32))
uid = Column(Integer, ForeignKey("users.id", ondelete='CASCADE'))
user = relationship("Users", backref='favor')

def __repr__(self):
return "<News(name:%s, uid:%s)>" % (self.hobby, self.uid)


# 一对多之多找一
res5 = conn.query(Favor)
print(res5)
for i in res5.all():
print(i.user.name, i.user.email)

# 一对多反向查找
res6 = conn.query(Users)
for i in res6.all():
print(i.favor[0]) # 里面储存列表,有多个对象
快速的跨表查询

快速增加
# 反向增加
user01 = Users(name='xiaoyiqing', email='aaaaa@asaaa.com')
user01.favor = [Favor(hobby='basketball'), Favor(hobby='baseball')]
conn.add(user01)
conn.commit()
# 正向增加
fv = Favor(hobby='basketball', user=Users(name='zhoushuyi', email='aaa@aaaaaaaa.com'))
conn.add(fv)
conn.commit()
绑定了relationship的便利,如果你想同时新增绑定对象

flask sqlalchemy

  • db中的对象:
    • 1.db.Models  其中Models相当于from sqlalchemy.ext.declarative import declarative_base
    • 2.db.Integer/String….  字段
    • 3.db.create_all()/drop_all()  metadata
    • 4.db.session  scoped_session(其中有threading.local区别线程)
    • 5.db.init_app(app)必须读配置文件连接数据库

初始化db

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# settings.py
class SqlSetting:
SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:123456@127.0.0.1:3306/flask?charset=utf8"
SQLALCHEMY_POOL_SIZE = 5
SQLALCHEMY_POOL_TIMEOUT = 30
SQLALCHEMY_POOL_RECYCLE = -1

# 追踪对象的修改且发出信号
SQLALCHEMY_TRACK_MODIFICATIONS = True

from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
2 db加入app全局配置文件
def create_app():
app = Flask(__name__)
app.config.from_object('settings.SqlSetting')
db.init_app(app)
app.register_blueprint(userOperation)
return app

class Users(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.String(32), nullable=False)
email = db.Column(db.String(32), unique=True)
ctime = db.Column(db.DateTime, datetime.datetime.now)
extra = db.Column(db.text)
db.relationship('Hobby', secondary='hobby_user', backref='users')

def __repr__(self):
return "%s" % self.name
  • 注意:
    • 1.flask sqlalchemy必须在flask基础上做,必须加载app的config进db。
    • 2.如果db和model没有在一个py,需要model到导入db加载到内存才能生成表,这也方便了我们控制(和蓝图比较相似)

flask-script

用于实现django python manager.py runserver这种脚本

1
2
3
4
5
6
7
8
9
10
pip install flask-script
from flask_script import Manager
app = create_app()
manager = Manager(app)

if __name__ == '__main__':
manager.run()
1.运行时直接python app.py runserver
2.自定义命令执行
manager = Manager(app)
  • 创建离线脚本
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    @manager.command
    def test01(args):
    """python app.py test01 args"""
    with app.app_context():
    conn = db.session


    @manager.option('-n', '--name', dest='name')
    @manager.option('-u', '--url', dest='url')
    def test02(name, url):
    """python app.py test02 -n xxx -u xxx"""
    with app.app_context():
    conn = db.session


    if __name__ == '__main__':
    manager.run()

flask-migrate

flask不支持修改表,增加列,如果要增加需要删除掉原表重新生成,这样原数据就无了,
也可以手动增加,或者就是用migrate(设计者认为该表不是安全可行的操作)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from flask_sqlalchemy import SQLAlchemy
from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand
from userOperation.models import Users, H2U, Hobby, db


db = SQLAlchemy()


def create_app():
app = Flask(__name__)
app.config.from_object('settings.SqlSetting')
db.init_app(app)
app.register_blueprint(userOperation)
return app


app = create_app()
manager = Manager(app)
migrate = Migrate(app, db)
manager.add_command('db', MigrateCommand)

# python app.py db init
# python app.py db migrate
# python app.py db upgrade

if __name__ == '__main__':
# app.run()
manager.run()
注意不要重复导入循环导入db,所以最好把db建在models
python app.py db init
python app.py db migrate
python app.py db upgrade
分享到