停机部署
1.挂牌停机通告
2.通过写的脚本(直接按游标取出数据进行迁移)按照规定逻辑进行分库分表(也可以使用工具)
3.重写中间件按照逻辑访问数据库
4.上线
- 优点
- 比较容易,不用考虑这时的用户操作,无脑迁移数据
- 缺点
- 系统必须停机,客户体验差
- 必须在规定时间完成迁移及数据校验
不停机部署
数据双写迁移法
1.老库和新库同时insert/update/delete,然后将老数据批量迁移到新库,最后流量切换到新库并关闭老库读写。
2.迁移完成后进行数据校验,将旧库和新库中的数据进行比对,完全一致则符合预期,如果出现步骤二中的极限不一致情况,则以旧库中的数据为准。
数据不一致分析
老库删除的时候,新库刚好在插入这条数据,导致新库有这条数据老库没有
数据迁移
- 存量数据迁移
直接通过游标加md5取模进行迁移,通过executemany插入入库,迁移2千万的数据大概开销40分钟,写好脚本可以创建test进行测试,test主库插入1百万数据,再分32或64个从库进行迁移测试,可以通过多线程进行并发执行加快速度,也可以慢慢的转移不影响线上服务(也可通过load data infile操作,大致思路为按照分表方法把不同数据导入不同的文本中,然后分别通过脚本进行load data infile也很快)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77# python数据迁移伪代码
# _*_ coding: utf-8 _*_
__author__ = '春江花月夜o'
import time
import pymysql
from hashlib import md5
from pymysql.cursors import DictCursor
# 这里使用虚拟机测试数据库进行测试
con = pymysql.connect(host='10.167.218.136', port=3306, db='db01', user='root', password='xxxx')
cursor = con.cursor(DictCursor)
# 确定游标阈值和executemany的阈值
threshold1 = 10000
threshold2 = 5000
# 申请不同的列表存放各个分表数据
for i in range(0, 32):
exec('a_' + str(i+1) + '=[]')
fetch_sql = """select * from test order by id limit {} offset {}"""
insert_sql = """insert into {} (name, addTime) values(%s, %s)"""
# 获取逻辑分表号(按32个表进行分表,用name作为md5取值)
def get_hash_val(value):
h = md5()
h.update(value.encode('utf-8'))
return int(h.hexdigest(), 16) % 32 + 1
current_time = time.time()
flag = True
skip = 0
count = 0
while flag:
count += 1
sql = fetch_sql.format(threshold1, skip)
cursor.execute(sql)
vals = cursor.fetchall()
con.commit()
for val in vals:
hash_key = str(get_hash_val(val['name']))
exec('a_' + hash_key + '.append(list(val.values())[1:])')
if len(eval('a_' + hash_key)) >= threshold2:
cursor.executemany(query=insert_sql.format('test_{}'.format(hash_key)), args=eval('a_' + hash_key))
con.commit()
exec('a_' + hash_key + '=[]')
if not count % 10:
print('已迁移%d条数据' % (count*threshold1))
if len(vals) < threshold1:
flag = False
skip += threshold1
for i in range(0, 32):
if len(eval('a_' + str(i+1))) != 0:
cursor.executemany(insert_sql.format('test_{}'.format(str(i+1))), args=eval('a_' + str(i+1)))
total_time = time.time() - current_time
print(total_time)
# 测试结果
已迁移100000条数据
已迁移200000条数据
已迁移300000条数据
已迁移400000条数据
已迁移500000条数据
已迁移600000条数据
已迁移700000条数据
已迁移800000条数据
已迁移900000条数据
已迁移1000000条数据
126.02490472793579 - 增量数据迁移
先更新旧库数据,然后更新新库数据,注意保持数据唯一性避免数据重复即可,这里即在完成插入更新旧库后加sql更新新库(或者通过异步消息队列更新新建消费者)。
数据迁移过程中的修改操作分别讨论:
- 1.假设迁移过程中进行了一个双insert操作,旧库新库都插入了数据,数据一致性没有被破坏
- 2.假设迁移过程中进行了一个双delete操作,这又分为两种情况
- 2.1假设这delete的数据属于[min,now]范围,即已经完成迁移,则旧库新库都删除了数据,数据一致性没有被破坏
- 2.2假设这delete的数据属于[now,max]范围,即未完成迁移,则旧库中删除操作的affect rows为1,新库中删除操作的affect rows为0,但是数据迁移工具在后续数据迁移中,并不会将这条旧库中被删除的数据迁移到新库中,所以数据一致性仍没有被破坏
- 3.假设迁移过程中进行了一个双update操作,可以认为update操作是一个delete加一个insert操作的复合操作,所以数据仍然是一致的
数据校验
- 如果使用load data infile的思路,当数据导入到每张新表后,直接再拉取新表数据到文本中,然后分别校验100个新老文件的MD5值,若都相同,则存量数据一致性没问题。
- 如果使用脚本更新,最好的校验还是上面那种文件md5方法,由于我的数据库的sn和时间具有唯一性,而我按照sn区别分表,所以可以通知分表sn和主表sn的个数即可确定数据是否一致。
如何确保一致性
1、对于数据迁移脚本迁移的数据一致性校验:数据根据uid取模从老表中分别存到100个文本文件中,当数据导入到每张新表后,直接再拉取新表数据到文本中,然后分别校验100个新老文件的MD5值,若都相同,则存量数据一致性没问题。
2、对于数据库双写保证数据一致性校验:在老表执行成功后才会旁路发hippo(公司内部的消息队列组件)操作新表,hippo消息队列做消息的可靠性保证,新表操作如果失败通过monitor上报告警、error日志记录来保证一致性,如果没有告警和error日志,则可以认为双写数据的一致性。
3、数据校验工具校验一致性:当存量数据导入新库之后,这是就通过数据校验工具比对新老库数据的一致性。为了不影响现网服务,数据校验比对拉的是老表的从库。当所有数据多次都比对一致后,此时新老库的数据就完全一致。
上线切换
- 当数据校验完成后,在需要修改的业务逻辑视图中进行逻辑修改,判断当前需要进入使用哪个分表,也可以通过中间件进行判断,如果访问此类接口时,需要先获得分表然后进行下一步逻辑,或者在进入sql时进行判断,最后停用旧表。
选择思路
1、引入开源的数据库中间件如mycat、sharding-sphere等。但是引入第三方组件,
增大开发成本,后续维护成本。
2、停服迁移数据方案。该方案要停机,这样会影响到用户使用。
综上,最终选择不引入第三方组件,数据双写不停服的方案。
思路分析
- 1.首先按照流量计算需要分多少个表,可以用多久
- 2.分表按照什么方式,有多种方式id、hash、时间、地域等,选择业务最常用的方式进行分表,以达到对业务最好的契合度。
- 3.进行分表的方法选择,规划时间跨度
- 4.开发相应的脚本插件
- 5.进行测试、校验
- 6.开始分表
- 7.上线切换
分表带来的问题
1 事务问题
解决事务问题目前有两种可行的方案:分布式事务和通过应用程序与数据库共同控制实现事务下面对两套方案进行一个简单的对比。
- 方案一:使用分布式事务
优点: 交由数据库管理,简单有效
缺点:性能代价高,特别是shard越来越多时
分布式事务解决方案
按照理解概述一个解决思路1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45以支付系统为例:A给B转账(A用户在A系统的数据库,B用户在B系统)
1.从数据库中扣除A的转账
2.A系统中提前建立消息表,往消息表插入这条事务信息A转账到哪个系统的哪个用户转了多少。
3.A系统异步脚本轮询消息表,如果有消息则加入到mq中,然后消息表把该消息置为已处理
4.每个系统都有消费者消费mq中消息,B系统发现mq中的该消息指向自己,取出进行消费
5.要是B系统执行失败了,这里需要做最大努力尝试,给B一个反复调用的次数,
如果调用一致失败就把失败消息发进mq指向A系统,告诉A,xxx事务执行失败请回滚。
A从消息表取出这条事务回滚,并删除消息表的这条消息。
6.如果B成功了也发送mq告诉A,A进行消息表清理,B进行短信、邮件等的实时发送和异步发送业务。(实时发送失败则异步)
这里我只是进行了分表操作,并没有跨库跨系统,分表后还是在同一个数据库,
并不属于分布式事务
调用的时候直接
在同一事务里执行两个sql,不对就rollback,pooleddb
def execute_transaction(self, operations):
"""
执行一系列sql语句组成的事务。
:param operations: list,
操作列表,如[{'sql': 'UPDATE ...', 'params':[[s1, s2], [s1, s2]]}]
"""
conn = None
cursor = None
index = None
try:
conn = self.pool.connection()
cursor = conn.cursor()
for index, d in enumerate(operations):
cursor.executemany(d.get('sql'), d.get('params'))
conn.commit()
return True
except Exception as e:
self.log('[{}] meet error'.format(
','.join([d.get('sql') for d in operations]) if index is None else operations[index].get('sql'))
)
self.log(e)
conn.rollback()
return False
finally:
if cursor:
cursor.close()
if conn:
conn.commit()
conn.close()
- 方案二:由应用程序和数据库共同控制
原理:将一个跨多个数据库的分布式事务分拆成多个仅处 于单个数据库上面的小事务,并通过应用程序来总控 各个小事务。
优点:性能上有优势
缺点:需要应用程序在事务控制上做灵活设计。如果使用 了spring的事务管理,改动起来会面临一定的困难。
2 跨节点Join的问题
只要是进行切分,跨节点Join的问题是不可避免的。但是良好的设计和切分却可以减少此类情况的发生。解决这一问题的普遍做法是分两次查询实现。在第一次查询的结果集中找出关联数据的id,根据这些id发起第二次请求得到关联数据。
3 跨节点的count,order by,group by以及聚合函数问题
这些是一类问题,因为它们都需要基于全部数据集合进行计算。多数的代理都不会自动处理合并工作。解决方案:与解决跨节点join问题的类似,分别在各个节点上得到结果后在应用程序端进行合并。和join不同的是每个结点的查询可以并行执行,因此很多时候它的速度要比单一大表快很多。但如果结果集很大,对应用程序内存的消耗是一个问题。
4 数据迁移,容量规划,扩容等问题
来自淘宝综合业务平台团队,它利用对2的倍数取余具有向前兼容的特性(如对4取余得1的数对2取余也是1)来分配数据,避免了行级别的数据迁移,但是依然需要进行表级别的迁移,同时对扩容规模和分表数量都有限制。总得来说,这些方案都不是十分的理想,多多少少都存在一些缺点,这也从一个侧面反映出了Sharding扩容的难度。
实际问题
1.id分表后的显示问题
- 把每个库的id加上库前缀显示到页面。
2.按时间查找问题
- 每个分表查询然后联合
3.join问题
- 把join切割成多个查询,然后在业务代码中拼接