flask信号量

信号量

  • orm mysql增加之前记录一条日志怎么做?
    用装饰器的话为了保证原子性,无法在接口处装饰,必须在orm.save中去装饰,但这样就会很麻烦,需要调用很多接口
    自定义1个sql_save的信号,在save成功的源码中加上信号

部分源码

flask自己并不支持信号,需要安装插件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
pip install blinker

# -*- coding: utf-8 -*-
try:
from blinker import Namespace

signals_available = True
except ImportError:
signals_available = False

class Namespace(object):
def signal(self, name, doc=None):
return _FakeSignal(name, doc)

class _FakeSignal(object):
def __init__(self, name, doc=None):
self.name = name
self.__doc__ = doc

def send(self, *args, **kwargs):
pass

def _fail(self, *args, **kwargs):
raise RuntimeError(
"Signalling support is unavailable because the blinker"
" library is not installed."
)

connect = connect_via = connected_to = temporarily_connected_to = _fail
disconnect = _fail
has_receivers_for = receivers_for = _fail
del _fail


# The namespace for code signals. If you are not Flask code, do
# not put signals in here. Create your own namespace instead.
_signals = Namespace()


# Core signals. For usage examples grep the source code or consult
# the API documentation in docs/api.rst as well as docs/signals.rst
template_rendered = _signals.signal("template-rendered")
before_render_template = _signals.signal("before-render-template")
request_started = _signals.signal("request-started")
request_finished = _signals.signal("request-finished")
request_tearing_down = _signals.signal("request-tearing-down")
got_request_exception = _signals.signal("got-request-exception")
appcontext_tearing_down = _signals.signal("appcontext-tearing-down")
appcontext_pushed = _signals.signal("appcontext-pushed")
appcontext_popped = _signals.signal("appcontext-popped")
message_flashed = _signals.signal("message-flashed")

信号量使用

  • 1.信号怎么触发?
    触发信号调用的是send方法 singals.request_started.send()
    1
    2
    3
    def func(*args, **kwargs):
    print('触发信号', args, kwargs)
    signals.request_started.connect(func)
    调用send是在flask运行中内部帮我们调用。
  • 2.信号在哪儿触发?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    app.__call__(environ, start_response)
    ctx = self.request_context(environ)
    error = None
    try:
    try:
    ctx.push()
    response = self.full_dispatch_request()
    except Exception as e:
    error = e
    response = self.handle_exception(e)
    except: # noqa: B001
    error = sys.exc_info()[1]
    raise
    return response(environ, start_response)
    finally:
    if self.should_ignore_error(error):
    error = None
    ctx.auto_pop(error)

    def full_dispatch_request(self):
    """Dispatches the request and on top of that performs request
    pre and postprocessing as well as HTTP exception catching and
    error handling.

    .. versionadded:: 0.7
    """
    self.try_trigger_before_first_request_functions()
    try:
    request_started.send(self)
    rv = self.preprocess_request()
    if rv is None:
    rv = self.dispatch_request()
    except Exception as e:
    rv = self.handle_user_exception(e)
    return self.finalize_request(rv)


    在处理response之前
    1.首先处理before_first_request
    在app对象中存储before_first_request_funcs列表
    def try_trigger_before_first_request_functions(self):
    if self._got_first_request:
    return
    with self._before_request_lock:
    if self._got_first_request:
    return
    for func in self.before_first_request_funcs:
    func()
    self._got_first_request = True


    2.执行request_start信号量
    3.执行before_request(process_request)

    def preprocess_request(self):
    bp = _request_ctx_stack.top.request.blueprint

    funcs = self.url_value_preprocessors.get(None, ())
    if bp is not None and bp in self.url_value_preprocessors:
    funcs = chain(funcs, self.url_value_preprocessors[bp])
    for func in funcs:
    func(request.endpoint, request.view_args)

    funcs = self.before_request_funcs.get(None, ())
    if bp is not None and bp in self.before_request_funcs:
    funcs = chain(funcs, self.before_request_funcs[bp])
    for func in funcs:
    rv = func()
    if rv is not None:
    return rv
    如果执行before_request扩展有返回值,直接返回,不再寻找view函数执行。

    4.执行视图函数
    def dispatch_request(self):
    req = _request_ctx_stack.top.request
    if req.routing_exception is not None:
    self.raise_routing_exception(req)
    rule = req.url_rule
    if (
    getattr(rule, "provide_automatic_options", False)
    and req.method == "OPTIONS"
    ):
    return self.make_default_options_response()
    # otherwise dispatch to the handler for that endpoint
    return self.view_functions[rule.endpoint](**req.view_args)

    5.是否执行模板渲染
    如果在view中调用了render,比如from flask import render_template,那么进需要进一步执行render函数。
    def render_template(template_name_or_list, **context):
    ctx = _app_ctx_stack.top
    ctx.app.update_template_context(context)
    return _render(
    ctx.app.jinja_env.get_or_select_template(template_name_or_list),
    context,
    ctx.app,
    )

    def _render(template, context, app):
    """Renders the template and fires the signal"""

    before_render_template.send(app, template=template, context=context)
    rv = template.render(context)
    template_rendered.send(app, template=template, context=context)
    return rv

    在模板渲染之前,先发送信号,之后也发送信号
    6.异常处理response
    如果在获取rv时出现异常,会进入handler_error,在其中执行error的扩展

    7.最终处理response
    获得rv之后,开始最终处理
    def finalize_request(self, rv, from_error_handler=False):
    response = self.make_response(rv)
    try:
    response = self.process_response(response)
    request_finished.send(self, response=response)
    except Exception:
    if not from_error_handler:
    raise
    self.logger.exception(
    "Request finalizing failed with an error while handling an error"
    )
    return response
    首先处理rv,把rv转换为response对象,或者json对象。
    太长了就不贴出来了。
    然后开始执行after_request扩展process_response,并发送request_finished信号


    8.最后调用ctx.auto_pop(error)
    其中执行pop时会执行do_teardown_request(exc)
    def pop(self, exc=_sentinel):
    """Pops the request context and unbinds it by doing that. This will
    also trigger the execution of functions registered by the
    :meth:`~flask.Flask.teardown_request` decorator.

    .. versionchanged:: 0.9
    Added the `exc` argument.
    """
    app_ctx = self._implicit_app_ctx_stack.pop()

    try:
    clear_request = False
    if not self._implicit_app_ctx_stack:
    self.preserved = False
    self._preserved_exc = None
    if exc is _sentinel:
    exc = sys.exc_info()[1]
    self.app.do_teardown_request(exc)
    可以使用@teardown_request无论视图是否成功,都可以执行

自定义信号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from blinker import Namespace
my_signals = Namespace()
model_saved = my_signals.signal('model-saved')
# 连接函数
model_saved.connect(func)

# 发送信号 自己定义在哪儿发送
model_saved.send()
这里可以写一个数据库提交的类,在提交时发送信号量

from blinker import Namespace
from sqlalchemy.orm import scoped_session
from sqlalchemy.engine import create_engine
from sqlalchemy.orm import sessionmaker


my_singals = Namespace()
db_commit = my_singals.signal('db-commit')


def save_commit_log(message):
with open('db', 'w') as f:
f.write(message)


db_commit.connect(save_commit_log)


class DB:
def __init__(self, settings):
self.engine = create_engine(settings)
self.session = sessionmaker(self.engine)
self.con = scoped_session(self.session)

def add(self, obj):
self.con.add(obj)

def commit(self):
db_commit.send()
self.con.commit()
分享到