flask基本介绍

配置文件

配置方式

  • 1.单个配置

    1
    flask.config[‘xxx’] = xxx
  • 2.可以像django一样文件配置

    1
    2
    3
    4
    DEBUG = True
    TESTING = True
    DATABASE_URI='127.0.0.1:XXX'
    flask.config.from_pyfile(‘某个文件setting’)
  • 3.类配置 生产环境一套、debug一套。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    class Config:
    DEBUG = False
    TESTING = False
    DATABASE_URI = '127.0.0.1:XXX'

    class ProductionConfig(Config):
    DATABASE_URI = '127.0.0.1:XXX'

    class TestingConfig(Config):
    DEBUG = True
    TESTING = True
    DATABASE_URI = '127.0.0.1:XXX'

源码解析

就是一个字典结构,初始化加载了defaults,即Flask中的defaults

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class Config(dict):
def __init__(self, root_path, defaults=None):
dict.__init__(self, defaults or {})
self.root_path = root_path

def from_envvar(self, variable_name, silent=False):
rv = os.environ.get(variable_name)
if not rv:
if silent:
return False
raise RuntimeError(
"The environment variable %r is not set "
"and as such configuration could not be "
"loaded. Set this variable and make it "
"point to a configuration file" % variable_name
)
return self.from_pyfile(rv, silent=silent)

def from_pyfile(self, filename, silent=False):
filename = os.path.join(self.root_path, filename)
d = types.ModuleType("config")
d.__file__ = filename
try:
with open(filename, mode="rb") as config_file:
exec(compile(config_file.read(), filename, "exec"), d.__dict__)
except IOError as e:
if silent and e.errno in (errno.ENOENT, errno.EISDIR, errno.ENOTDIR):
return False
e.strerror = "Unable to load configuration file (%s)" % e.strerror
raise
self.from_object(d)
return True

def from_object(self, obj):
if isinstance(obj, string_types):
obj = import_string(obj)
for key in dir(obj):
if key.isupper():
self[key] = getattr(obj, key)

def from_json(self, filename, silent=False):
filename = os.path.join(self.root_path, filename)

try:
with open(filename) as json_file:
obj = json.loads(json_file.read())
except IOError as e:
if silent and e.errno in (errno.ENOENT, errno.EISDIR):
return False
e.strerror = "Unable to load configuration file (%s)" % e.strerror
raise
return self.from_mapping(obj)

def from_mapping(self, *mapping, **kwargs):
rv = {}
for k, v in iteritems(self):
if not k.startswith(namespace):
continue
if trim_namespace:
key = k[len(namespace) :]
else:
key = k
if lowercase:
key = key.lower()
rv[key] = v
return rv

def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, dict.__repr__(self))

路由route

详细配置

普通匹配模式及其传参方法

1
2
3
4
5
6
7
8
9
10
11
@app.route(‘/<username>’)  字符串
@app.route(‘/<int:post_id>’) 整数
@app.route(‘/<float:price>’)
@app.route(‘/<path:path>’) url路径

# 路径匹配其实使用了正则贪婪匹配,要注意必须在视图函数的参数中输入对应个数的参数
@app.route('/<name>|<int:id>|<path:asf>', methods=['get', 'post'])
def index(name, id, asf):
print(name, id, type(id), type(name), asf)
return "fafafasa"
# 中件用分隔符或者自定义一个隔离符号

通过?匹配

即最常见的路由匹配方式

1
2
3
http://127.0.0.1:5000/user/?a=1&b=2&c=3
参数放进了flask.request中为字典类型
request.args request.data

自定义匹配模式(正则)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from flask import url_for
from werkzeug.routing import BaseConverter

import re
class RegexConverter(BaseConverter):
def __init__(self, map, regex):
super(RegexConverter, self).__init__(map)
self.re = re.compile(regex)

# 路由匹配成功,在传入视图之前进行一次校验,改变数据类型为想要的类型
def to_python(self, value):
tmp = self.re.search(value)
if tmp:
return tmp.group()
return tmp

# 使用url反向生成url时传递参数经过该方法处理,返回值用于生成url中的参数
def to_url(self, value):
val = super(RegexConverter, self).to_url(value)
return val
app.url_map.converters['regex'] = RegexConverter

@app.route('/index/<regex("\w+"):nid>')
def index(nid):
print(nid)
url_for('index', nid=89)
return 'index'

详解各参数作用

源码分析

基于装饰器形式展现本质上还是add_url_route,和django有所区别(本质还是一样)。

装饰器的route

  • 1.route()调用
    第一步返回一个装饰器decorator

    1
    2
    3
    4
    5
    6
    def route(self, rule, **options):
    def decorator(f):
    endpoint = options.pop("endpoint", None)
    self.add_url_rule(rule, endpoint, f, **options)
    return f
    return decorator
  • 2.decorator(func)
    第二步@decorator 执行decorator函数,关键函数
    其中执行了self.add_url_rule(rule, endpoint, f, **options)
    把路由和执行函数和别名加到了一个路由关系对应表

注意

  • 1.添加路由的本质就是执行add_url_rule

    1
    2
    3
    def login():
    pass
    app.add_url_rule('/login', 'n2', login, methods=['get', 'post'])
  • 2.如果end_point是空的,就把view_func的名字赋给end_point

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @setupmethod
    def add_url_rule(
    self,
    rule,
    endpoint=None,
    view_func=None,
    provide_automatic_options=None,
    **options
    ):
    if endpoint is None:
    endpoint = _endpoint_from_view_func(view_func)
    options["endpoint"] = endpoint

生成rule

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
rule = self.url_rule_class(rule, methods=methods, **options)
rule.provide_automatic_options = provide_automatic_options

# Rule
@implements_to_string
class Rule(RuleFactory):
def __init__(
self,
string,
defaults=None,
subdomain=None,
methods=None,
build_only=False,
endpoint=None,
strict_slashes=None,
merge_slashes=None,
redirect_to=None,
alias=False,
host=None,
websocket=False,
):
if not string.startswith("/"):
raise ValueError("urls must start with a leading slash")
self.rule = string
self.is_leaf = not string.endswith("/")

self.map = None
self.strict_slashes = strict_slashes
self.merge_slashes = merge_slashes
self.subdomain = subdomain
self.host = host
self.defaults = defaults
self.build_only = build_only
self.alias = alias
self.websocket = websocket

if methods is not None:
if isinstance(methods, str):
raise TypeError("'methods' should be a list of strings.")

methods = {x.upper() for x in methods}

if "HEAD" not in methods and "GET" in methods:
methods.add("HEAD")

if websocket and methods - {"GET", "HEAD", "OPTIONS"}:
raise ValueError(
"WebSocket rules can only use 'GET', 'HEAD', and 'OPTIONS' methods."
)

self.methods = methods
self.endpoint = endpoint
self.redirect_to = redirect_to

if defaults:
self.arguments = set(map(str, defaults))
else:
self.arguments = set()
self._trace = self._converters = self._regex = self._argument_weights = None

rule参数解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
1.defaults=None 
{‘Start_Time’:’2020-10-01’}默认参数
默认传default

2.strict_slashes=None
False/True 严格斜杠
如果为True严格控制,有slash就访问不到或者访问得到

3.redirect_to=None
定义之后直接重定向,重定向指定地址
@app.route('/index2', methods=['get', 'post'], endpoint='n3', defaults={"nid": 888}, redirect_to='/index3')
def index2(nid):
print(nid)
return "公司首页"

@app.route('/index3', methods=['get', 'post'], endpoint='n4', defaults={"nid": 888})
def index2(nid):
print(nid)
return "公司新首页"

4.subdomain=None
子域名访问 需修改hosts
www.oldboyedu.com
car.oldboyedu.com
admin.oldboy.com

# 子域名模式访问(比如你的域名设置了多个解析)
app.config['SERVER_NAME'] = 'axiba.com:5000'
@app.route('/12', methods=['get', 'post'], endpoint='n5', subdomain="admin")
def static_index():
return "static.your-domain.tid"

@app.route('/dynamic', methods=['get', 'post'], endpoint='n6', subdomain="<username>")
def static_index(username):
return username + ".your-domain.tid"

url加入url_map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
self.url_map.add(rule)
# url_map
class Map(object):
default_converters = ImmutableDict(DEFAULT_CONVERTERS)
lock_class = Lock
def __init__(
self,
rules=None,
default_subdomain="",
charset="utf-8",
strict_slashes=True,
merge_slashes=True,
redirect_defaults=True,
converters=None,
sort_parameters=False,
sort_key=None,
encoding_errors="replace",
host_matching=False,
):
self._rules = []
self._rules_by_endpoint = {}
self._remap = True
self._remap_lock = self.lock_class()

self.default_subdomain = default_subdomain
self.charset = charset
self.encoding_errors = encoding_errors
self.strict_slashes = strict_slashes
self.merge_slashes = merge_slashes
self.redirect_defaults = redirect_defaults
self.host_matching = host_matching

self.converters = self.default_converters.copy()
if converters:
self.converters.update(converters)

self.sort_parameters = sort_parameters
self.sort_key = sort_key

for rulefactory in rules or ():
self.add(rulefactory)

访问url的步骤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 从上下文获得当前用户的req
req = _request_ctx_stack.top.request
if req.routing_exception is not None:
self.raise_routing_exception(req)

# 通过url_map获取当前用户访问的url的rule
rule = req.url_rule

# 确认method不为option
# if we provide automatic options for this URL and the
# request came with the OPTIONS method, reply automatically
if (
getattr(rule, "provide_automatic_options", False)
and req.method == "OPTIONS"
):
return self.make_default_options_response()

# otherwise dispatch to the handler for that endpoint
# 根据endpoint在注册的view中找到view并传参调用
return self.view_functions[rule.endpoint](**req.view_args)

template模板

自带防xss攻击

比如return给模板


<input …>会自动变为字符串不能变成input框
可以通过管道符|safe也可以通过from flask import Markup
return Markup(
<input …>)…

支持宏定义

1
2
3
4
5
6
7
8
{% macro xx(name, type=’text’, value=’’) %}
<input type=”{{type}}” name=”{{name}}1” value=”{{value}}”>
<input type=”{{type}}” name=”{{name}}1” value=”{{value}}”>
<input type=”{{type}}” name=”{{name}}1” value=”{{value}}”>
<input type=”{{type}}” name=”{{name}}1” value=”{{value}}”>
{% endmacro %}
# 导入块并传参数
{{ xx(‘n’) }}

request&response

request

1
2
3
4
5
6
7
8
9
10
from flask import request
几乎都写到了request里面
request.form.get("key", type=str, default=None) 获取表单数据
request.args.get("key") 获取get请求参数
request.values.get("key") 获取所有参数
request.get_json()
request.url
request.base_url
request.url_root
request.host_url

response

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1.headers	类似于字典,用来存放headers

2.status A string with a response status.

3.status_code The response status as integer.

4.data A descriptor that calls get_data() and set_data().

5.get_json(force=False, silent=False, cache=True) Parse data as JSON.

6.is_json Check if the mimetype indicates JSON data,
either application/json or application/*+json.

7.max_cookie_size Read-only view of the MAX_COOKIE_SIZE config key.

8.mimetype The mimetype (content type without charset etc.)

9.set_cookie(key, value=’’, max_age=None, expires=None, path=’/’,
domain=None, secure=False, httponly=False, samesite=None)

# 如果需要修改response
from flask import make_response
response = make_response(‘fsfs’)/make_response(render_template(‘index.html))
response.set_cookie(‘key’, ‘value’)
response.delete_cookie(‘key’)
response.headers(‘xxxx’) = ‘xxxx’
return response

session

基础使用

1
2
3
4
5
6
from flask import session
app.secret_key = ‘sfdaac’
session[‘k1’] = v1
会有加密签名的字符串k1需要使用secret_key
session.pop(xxx)
del session[‘xxx’]

源码解析

session其实就是一个类字典对象,sessionId一般用cookie存储,
session值存储在服务器的缓存redis、mysql、文件缓存等中。

  • ctx.push()

    1
    2
    3
    4
    5
    6
    if self.session is None:
    session_interface = self.app.session_interface
    self.session = session_interface.open_session(self.app, self.request)

    if self.session is None:
    self.session = session_interface.make_null_session(self.app)
  • session_interface

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    # 默认session_interface
    class SecureCookieSessionInterface(SessionInterface):
    serializer = session_json_serializer
    session_class = SecureCookieSession

    def get_signing_serializer(self, app):
    if not app.secret_key:
    return None
    signer_kwargs = dict(
    key_derivation=self.key_derivation, digest_method=self.digest_method
    )
    return URLSafeTimedSerializer(
    app.secret_key,
    salt=self.salt,
    serializer=self.serializer,
    signer_kwargs=signer_kwargs,
    )

    def open_session(self, app, request):
    s = self.get_signing_serializer(app)
    if s is None:
    return None
    val = request.cookies.get(app.session_cookie_name)
    if not val:
    return self.session_class()
    max_age = total_seconds(app.permanent_session_lifetime)
    try:
    data = s.loads(val, max_age=max_age)
    return self.session_class(data)
    except BadSignature:
    return self.session_class()

    def save_session(self, app, session, response):
    domain = self.get_cookie_domain(app)
    path = self.get_cookie_path(app)

    # If the session is modified to be empty, remove the cookie.
    # If the session is empty, return without setting the cookie.
    if not session:
    if session.modified:
    response.delete_cookie(
    app.session_cookie_name, domain=domain, path=path
    )

    return

    # Add a "Vary: Cookie" header if the session was accessed at all.
    if session.accessed:
    response.vary.add("Cookie")

    if not self.should_set_cookie(app, session):
    return

    httponly = self.get_cookie_httponly(app)
    secure = self.get_cookie_secure(app)
    samesite = self.get_cookie_samesite(app)
    expires = self.get_expiration_time(app, session)
    val = self.get_signing_serializer(app).dumps(dict(session))
    response.set_cookie(
    app.session_cookie_name,
    val,
    expires=expires,
    httponly=httponly,
    domain=domain,
    path=path,
    secure=secure,
    samesite=samesite,
    )
  • make_null_session(self.app)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    def make_null_session(self, app):
    return self.null_session_class()


    class NullSession(SecureCookieSession):
    def _fail(self, *args, **kwargs):
    raise RuntimeError(
    "The session is unavailable because no secret "
    "key was set. Set the secret_key on the "
    "application to something unique and secret."
    )
    __setitem__ = __delitem__ = clear = pop = popitem = update = setdefault = _fail


    class SecureCookieSession(CallbackDict, SessionMixin):
    def __init__(self, initial=None):
    def on_update(self):
    self.modified = True
    self.accessed = True
    super(SecureCookieSession, self).__init__(initial, on_update)

    def __getitem__(self, key):
    self.accessed = True
    return super(SecureCookieSession, self).__getitem__(key)

    def get(self, key, default=None):
    self.accessed = True
    return super(SecureCookieSession, self).get(key, default)

    def setdefault(self, key, default=None):
    self.accessed = True
    return super(SecureCookieSession, self).setdefault(key, default)

flask_session

  • config配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    config = app.config.copy()
    config.setdefault('SESSION_TYPE', 'null')
    config.setdefault('SESSION_PERMANENT', True)
    config.setdefault('SESSION_USE_SIGNER', False)
    config.setdefault('SESSION_KEY_PREFIX', 'session:')
    config.setdefault('SESSION_REDIS', None)
    config.setdefault('SESSION_MEMCACHED', None)
    config.setdefault('SESSION_FILE_DIR',
    os.path.join(os.getcwd(), 'flask_session'))
    config.setdefault('SESSION_FILE_THRESHOLD', 500)
    config.setdefault('SESSION_FILE_MODE', 384)
    config.setdefault('SESSION_MONGODB', None)
    config.setdefault('SESSION_MONGODB_DB', 'flask_session')
    config.setdefault('SESSION_MONGODB_COLLECT', 'sessions')
    config.setdefault('SESSION_SQLALCHEMY', None)
    config.setdefault('SESSION_SQLALCHEMY_TABLE', 'sessions')

    if config['SESSION_TYPE'] == 'redis':
    session_interface = RedisSessionInterface(
    config['SESSION_REDIS'], config['SESSION_KEY_PREFIX'],
    config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT'])
    elif config['SESSION_TYPE'] == 'memcached':
    session_interface = MemcachedSessionInterface(
    config['SESSION_MEMCACHED'], config['SESSION_KEY_PREFIX'],
    config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT'])
    elif config['SESSION_TYPE'] == 'filesystem':
    session_interface = FileSystemSessionInterface(
    config['SESSION_FILE_DIR'], config['SESSION_FILE_THRESHOLD'],
    config['SESSION_FILE_MODE'], config['SESSION_KEY_PREFIX'],
    config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT'])
    elif config['SESSION_TYPE'] == 'mongodb':
    session_interface = MongoDBSessionInterface(
    config['SESSION_MONGODB'], config['SESSION_MONGODB_DB'],
    config['SESSION_MONGODB_COLLECT'],
    config['SESSION_KEY_PREFIX'], config['SESSION_USE_SIGNER'],
    config['SESSION_PERMANENT'])
    elif config['SESSION_TYPE'] == 'sqlalchemy':
    session_interface = SqlAlchemySessionInterface(
    app, config['SESSION_SQLALCHEMY'],
    config['SESSION_SQLALCHEMY_TABLE'],
    config['SESSION_KEY_PREFIX'], config['SESSION_USE_SIGNER'],
    config['SESSION_PERMANENT'])
    else:
    session_interface = NullSessionInterface()

    return session_interface
  • 使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    from flask import Flask, session
    from flask_redis import FlaskRedis
    from flask_session import Session

    app = Flask(__name__)
    # 必须设置
    secret_key = 'asfasfs'
    redis = FlaskRedis()
    session_store = Session()
    app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=7) # 设置session 的时间为7天
    class Config():
    # DEBUG调试模式
    DEBUG = True
    # json多字节转unicode编码
    JSON_AS_ASCII = False
    # 数据库链接配置
    SECRET_KEY = "*(%#4sxcz(^(#$#8423"
    # session存储方式为redis
    SESSION_TYPE = "redis"
    # session保存数据到redis时启用的链接对象
    SESSION_REDIS = redis
    # 如果设置session的生命周期是否是会话期, 为True,则关闭浏览器session就失效
    SESSION_PERMANENT = True
    # 是否对发送到浏览器上session的cookie值进行加密
    SESSION_USE_SIGNER = True
    # 保存到redis的session数的名称前缀
    SESSION_KEY_PREFIX = "session:"
    # redis的链接配置
    REDIS_URL = "redis://localhost:6379/1"


    app.config.from_object(Config)
    # 初始化redis
    redis.init_app(app)
    # 初始化session_store
    session_store.init_app(app)

session的应用

  • 1.登录,做用户标识,存储一些用户的信息在session中。
  • 2.购物车

session的缺点

  • 1.存储在服务器,当同一时间有很多用户登录造成巨大服务器压力
  • 2.会话结束session过期
  • 3.不支持跨域

修改配置&参数解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
通过make_response.set_cookie设置
response.set_cookie(
app.session_cookie_name,
val,
expires=expires,
httponly=httponly,
domain=domain,
path=path,
secure=secure,
samesite=samesite,
)
httponly:只能http访问
domain:设置域访问
path:定制访问路径
secure:是否仅通过安全的https,值为0或1,如果值为1,
则cookie只能在https连接上有效,默认值为0,表示cookie
在http和https连接上都有效
samesite:
三种模式Strict/Lax/None
Strict:第三方的cookie不能合并到一起发送
Lax:第三方部分cookie可以一起发送,图片、script、链接
None:都可以

cookie的应用

1
2
3
4
1.一般用来做用户标识的键值对(第一次登录生成token)
2.购物车功能,(比如京东)把购物车商品放在cookie中,序列化商品的属性,键值存放
3.存储用户的一些配置
4.记录用户的浏览数据,以便后续更好的服务(广告)

cookie的缺点

1
2
3
1.大小最大为4KB,不能存很大的数据
2.http中明文传输,存储在本地,容易被获取
3.cookie如果不设置path之类的,会在每个请求中都携带,增大流量消耗

flash

原理是通过session存取,放到session的一个list里面,session基于用户已经隔离开了.
应用于对临时信息的操作
从某个地方获取设置过得所有值并清除.类似pop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from flask import flash, get_flashed_messages
app.secret_key = 'sfafa'


@app.route('/get')
def get():
data = get_flashed_messages()
print(data)
return str(data)
@app.route('/set')
def set():
flash('啊手动阀')
return 'aafafasfa'

比如:
flash(‘超时错误’, category=’x1’)
get_flashed_messages(category_filter=[‘x1’])
网页view错误后跳转到另外一个网页需要显示详细错误
通过category分类拿去正确的错误信息

请求扩展request-extension

如果一个装饰器每个视图都需要,其实就可以改为中间件实现,flask叫做请求扩展.如登录验证.

请求前before_request

1
2
3
4
5
6
7
@app.before_request
def before_req(*args, **kwargs):
print('asdffffffff')
# 等同于django的process_request
# 需要在其中设置白名单,如果视图在白名单就不执行该扩展
if request.path in 白名单:
return None
  • 例子
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    from flask import session, redirect
    @app.before_request
    def process_request(*args, **kwargs):
    if request.path == '/login':
    return None
    user = session.get('user_info')
    if user:
    return None
    return redirect("/login")
    print('asdffffffff')

    请求后after_request

    1
    2
    3
    4
    @app.after_request
    def process_response(resposne):
    return resposne
    如果request拦截,response还是全部执行,但view不执行,与django的老版中间件类似

error后errorhandler(404)

1
2
3
@app.errorhandler(404)
def error_response(*args, **kwargs):
return "404 not found"

模板中定制方法template_global()

1
2
3
4
5
@app.template_global()
def sc(a1, b1):
return a1 + b1
这样就可以在模板中直接调用函数
使用{ {sdfafa sc(1, 2) } }

第一次请求的时候做的操作before_first_request

1
2
3
4
@app.before_first_request
def axiba(*args, **kwargs):
return None
应用场景在于带动异步健康函数启动监视,如果发现异常截断往后所有的请求,停止服务

中间件middleware

请求之前定制一些操作,之后也定制一些,比如打印请求log,多实例
当请求到来之后,运行run_simple(host, port, self, **options)
执行第三个参数的__call__也就是self()

  • app.__call__()
    1
    2
    3
    4
    5
    def __call__(self, environ, start_response):
    """The WSGI server calls the Flask application object as the
    WSGI application. This calls :meth:`wsgi_app` which can be
    wrapped to applying middleware."""
    return self.wsgi_app(environ, start_response)
  • 中间件:把wsgi_app做一个封装即可在中间加入自己的一些参数变量
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    class Md:
    def __init__(self, old_wsgi_app):
    self.old_wsgi_app = old_wsgi_app

    def __call__(self, environ, start_response):
    print('开始之前')
    ret = self.old_wsgi_app(environ, start_response)
    print('结束之后')
    return ret

    if __name__ == "__main__":
    app.wsgi_app = Md(app.wsgi_app)
    app.run()

蓝图blueprint

一个程序不可能就一个py文件,flask如何实现由多个py文件来实现一个项目

普通架构思路

通过__init__把文件联系到一起

  • 架构
    1
    2
    3
    4
    5
    6
    app.py
    views
    __init__.py
    account.py
    user.py
    order.py
  • init.py
    1
    2
    3
    from flask import Flask
    app = Flask(__name__)
    from . import account, user, order
  • account.py,user.py,order.py
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    from . import app

    @app.route('account/index')
    def index():
    return 'index'

    @app.route('account/login')
    def login():
    return 'login'

    @app.route('account/logout')
    def logout():
    return 'logout'
  • app.py
    1
    2
    3
    4
    from views import app

    if __name__ == '__main__':
    app.run()

蓝图使用

蓝图相当于帮我们做好了以上架构工作,并且可以指定static、template、prefix等

  • 架构

    1
    2
    3
    4
    5
    6
    7
    8
    app.py
    pro_flask
    __init__.py
    statics
    templates
    views
    account.py
    user.py
  • account.py

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    from flask import Blueprint, render_template
    account = Blueprint('account', __name__, url_prefix='/account', template_folder='templates')
    @account.route('/index1')
    def index1():
    return render_template('login.html')

    @account.route('/login1')
    def login1():
    return 'login'

    @account.route('/logout1')
    def logout1():
    return 'logout'

  • init.py

    1
    2
    3
    4
    5
    6
    7
    8
    from flask import Flask
    from .views.account import account
    from .views.user import user

    app = Flask(__name__, template_folder='templates', static_folder='statics', static_url_path='/static')

    app.register_blueprint(account)
    app.register_blueprint(user)
  • app.py

    1
    2
    3
    from pro_flask import app
    if __name__ == '__main__':
    app.run()
  • flask专用的蓝图

    1
    2
    3
    @account.before_request
    def process_request(*args, **kwargs):
    print('come on')
分享到