UnDoLog

  • 是 Innodb 存储引擎层生成的日志,实现了事务中的原子性,主要用于事务回滚和 MVCC。

undo log 是一种用于撤销回退的日志。在事务没提交之前,MySQL 会先记录更新前的数据到 undo log 日志文件里面,当事务回滚时,可以利用 undo log 来进行回滚。

一条记录的每一次更新操作产生的 undo log 格式都有一个 roll_pointer 指针和一个 trx_id 事务id:

通过 trx_id 可以知道该记录是被哪个事务修改的;
通过 roll_pointer 指针可以将这些 undo log 串成一个链表,这个链表就被称为版本链。

undo log 还有一个作用,通过 ReadView + undo log 实现 MVCC(多版本并发控制)。

对于「读提交」和「可重复读」隔离级别的事务来说,它们的快照读(普通 select 语句)是通过 Read View + undo log 来实现的,它们的区别在于创建 Read View 的时机不同:

  • 「读提交」隔离级别是在每个 select 都会生成一个新的 Read View,也意味着,事务期间的多次读取同一条数据,前后两次读的数据可能会出现不一致,因为可能这期间另外一个事务修改了该记录,并提交了事务。
  • 「可重复读」隔离级别是启动事务时生成一个 Read View,然后整个事务期间都在用这个 Read View,这样就保证了在事务期间读到的数据都是事务启动前的记录。

这两个隔离级别实现是通过「事务的 Read View 里的字段」和「记录中的两个隐藏列(trx_id 和 roll_pointer)」的比对,如果不满足可见行,就会顺着 undo log 版本链里找到满足其可见性的记录,从而控制并发事务访问同一个记录时的行为,这就叫 MVCC(多版本并发控制)。

因此,undo log 两大作用:

  • 实现事务回滚,保障事务的原子性。 事务处理过程中,如果出现了错误或者用户执 行了 ROLLBACK 语句,MySQL 可以利用 undo log 中的历史数据将数据恢复到事务开始之前的状态。
  • 实现 MVCC(多版本并发控制)关键因素之一。 MVCC 是通过 ReadView + undo log 实现的。undo log 为每条记录保存多份历史数据,MySQL 在执行快照读(普通 select 语句)的时候,会根据事务的 Read View 里的信息,顺着 undo log 的版本链找到满足其可见性的记录。



RedoLog与BufferPool

BufferPool

MySQL 的数据都是存在磁盘中的,那么我们要更新一条记录的时候,得先要从磁盘读取该记录,然后在内存中修改这条记录。那修改完这条记录是选择直接写回到磁盘,还是选择缓存起来呢?

当然是缓存起来好,这样下次有查询语句命中了这条记录,直接读取缓存中的记录,就不需要从磁盘获取数据了。

为此,Innodb 存储引擎设计了一个缓冲池(Buffer Pool),来提高数据库的读写性能。 层级关系是:

Mysql(Server Innodb(BufferPool)) -> 操作系统 -> 磁盘(数据页(数据行))

有了 Buffer Pool 后:

  • 当读取数据时,如果数据存在于 Buffer Pool 中,客户端就会直接读取 Buffer Pool 中的数据,否则再去磁盘中读取。

  • 当修改数据时,如果数据存在于 Buffer Pool 中,那直接修改 Buffer Pool 中数据所在的页,然后将其页设置为脏页(该页的内存数据和磁盘上的数据已经不一致),为了减少磁盘I/O,不会立即将脏页写入磁盘,后续由后台线程选择一个合适的时机将脏页写入到磁盘。

InnoDB 会把存储的数据划分为若干个「页」,以页作为磁盘和内存交互的基本单位,一个页的默认大小为 16KB。因此,Buffer Pool 同样需要按「页」来划分。

在 MySQL 启动的时候,InnoDB 会为 Buffer Pool 申请一片连续的内存空间,然后按照默认的16KB的大小划分出一个个的页, Buffer Pool 中的页就叫做缓存页。此时这些缓存页都是空闲的,之后随着程序的运行,才会有磁盘上的页被缓存到 Buffer Pool 中。

Buffer Pool 除了缓存「索引页」和「数据页」,还包括了 Undo 页,插入缓存、自适应哈希索引、锁信息等等。

Undo 页是记录什么?

开启事务后,InnoDB 层更新记录前,首先要记录相应的 undo log,如果是更新操作,需要把被更新的列的旧值记下来,也就是要生成一条 undo log,undo log 会写入 Buffer Pool 中的 Undo 页面。

查询一条记录,就只需要缓冲一条记录吗?

不是的。

当我们查询一条记录时,InnoDB 是会把整个页的数据加载到 Buffer Pool 中,将页加载到 Buffer Pool 后,再通过页里的「页目录」去定位到某条具体的记录。


RedoLog

  • 是 Innodb 存储引擎层生成的日志,实现了事务中的持久性,主要用于掉电等故障恢复;

为了防止断电导致数据丢失的问题,当有一条记录需要更新的时候,InnoDB 引擎就会先更新内存(同时标记为脏页),然后将本次对这个页的修改以 redo log 的形式记录下来,这个时候更新就算完成了。

后续,InnoDB 引擎会在适当的时候,由后台线程将缓存在 Buffer Pool 的脏页刷新到磁盘里,这就是 WAL (Write-Ahead Logging)技术。
WAL 技术指的是, MySQL 的写操作并不是立刻写到磁盘上,而是先写日志,然后在合适的时间再写到磁盘上。

redo log 和 undo log 区别在哪?

这两种日志是属于 InnoDB 存储引擎的日志,它们的区别在于:

redo log 记录了此次事务「完成后」的数据状态,记录的是更新之后的值;
undo log 记录了此次事务「开始前」的数据状态,记录的是更新之前的值;

所以有了 redo log,再通过 WAL 技术,InnoDB 就可以保证即使数据库发生异常重启,之前已提交的记录都不会丢失,这个能力称为 crash-safe(崩溃恢复)。可以看出来, redo log 保证了事务四大特性中的持久性

写入 redo log 的方式使用了追加操作, 所以磁盘操作是顺序写,而写入数据需要先找到写入位置,然后才写到磁盘,所以磁盘操作是随机写。

至此, 针对为什么需要 redo log 这个问题我们有两个答案:

  • 实现事务的持久性,让 MySQL 有 crash-safe 的能力,能够保证 MySQL 在任何时间段突然崩溃,重启后之前已提交的记录都不会丢失;

  • 将写操作从「随机写」变成了「顺序写」,提升 MySQL 写入磁盘的性能。

产生的 redo log 是直接写入磁盘的吗?

实际上, 执行一个事务的过程中,产生的 redo log 也不是直接写入磁盘的,因为这样会产生大量的 I/O 操作,而且磁盘的运行速度远慢于内存。

所以,redo log 也有自己的缓存—— redo log buffer,每当产生一条 redo log 时,会先写入到 redo log buffer,后续在持久化到磁盘如下图:

redo log 什么时候刷盘?

  • MySQL 正常关闭时;
  • 当 redo log buffer 中记录的写入量大于 redo log buffer 内存空间的一半时,会触发落盘;
  • InnoDB 的后台线程每隔 1 秒,将 redo log buffer 持久化到磁盘。
  • 每次事务提交时都将缓存在 redo log buffer 里的 redo log 直接持久化到磁盘(这个策略可由 innodb_flush_log_at_trx_commit 参数控制,下面会说)。

innodb_flush_log_at_trx_commit 参数控制的是什么?

可取的值有:0、1、2,默认值为 1,这三个值分别代表的策略如下:

  • 当设置该参数为 0 时,表示每次事务提交时 ,还是将 redo log 留在 redo log buffer 中 ,该模式下在事务提交时不会主动触发写入磁盘的操作。每隔一秒,MySQL进程后台会把缓存在 redo log buffer 中的 redo log ,通过调用 write() 写到操作系统的 Page Cache,然后调用 fsync() 持久化到磁盘。所以参数为 0 的策略,MySQL 进程的崩溃会导致上一秒钟所有事务数据的丢失;

  • 当设置该参数为 1 时,表示每次事务提交时,都将缓存在 redo log buffer 里的 redo log 直接持久化到磁盘,这样可以保证 MySQL 异常重启之后数据不会丢失

  • 当设置该参数为 2 时,表示每次事务提交时,都只是缓存在 redo log buffer 里的 redo log 写到 redo log 文件,注意写入到「 redo log 文件」并不意味着写入到了磁盘,因为操作系统的文件系统中有个 Page Cache,Page Cache 是专门用来缓存文件数据的,所以写入「 redo log文件」意味着写入到了操作系统的文件缓存,此种设置下,每隔一秒MYSQL进程,调用 fsync,将缓存在操作系统中 Page Cache 里的 redo log 持久化到磁盘。所以参数为 2 的策略,较取值为 0 情况下更安全,因为 MySQL 进程的崩溃并不会丢失数据,只有在操作系统崩溃或者系统断电的情况下,上一秒钟所有事务数据才可能丢失。

参数性能与安全性的对比情况:

  • 数据安全性:参数 1 > 参数 2 > 参数 0
  • 写入性能:参数 0 > 参数 2> 参数 1

硬盘上存储的 redo log 日志文件不只一个,而是以一个日志文件组的形式出现的,每个的redo日志文件大小都是一样的。比如可以配置为一组4个文件,每个文件的大小是 1GB,整个 redo log 日志文件组可以记录4G的内容。它采用的是环形数组形式,从头开始写,写到末尾又回到头循环写,

在个日志文件组中还有两个重要的属性,分别是 write poscheckpoint

  • write pos 是当前记录的位置,一边写一边后移
  • checkpoint 是当前要擦除的位置,也是往后推移

每次刷盘 redo log 记录到日志文件组中,write pos 位置就会后移更新。每次 MySQL 加载日志文件组恢复数据时,会清空加载过的 redo log 记录,并把 checkpoint 后移更新。write poscheckpoint 之间的还空着的部分可以用来写入新的 redo log 记录。

如果 write pos 追上 checkpoint ,表示日志文件组满了,这时候不能再写入新的 redo log 记录,MySQL 得停下来,清空一些记录,把 checkpoint 推进一下。


BinLog

  • 是 Server 层生成的日志,主要用于数据备份和主从复制;
  • MySQL 在完成一条更新操作后,Server 层还会生成一条 binlog,等之后事务提交的时候,会将该事物执行过程中产生的所有 binlog 统一写 入 binlog 文件。

redo log 和 binlog 有什么区别?

这两个日志有四个区别。

1、适用对象不同:

  • binlog 是 MySQL 的 Server 层实现的日志,所有存储引擎都可以使用;
  • redo log 是 Innodb 存储引擎实现的日志;

2、文件格式不同:

  • binlog 有 3 种格式类型,分别是 STATEMENT(默认格式)、ROW、 MIXED,区别如下:
    • STATEMENT:每一条修改数据的 SQL 都会被记录到 binlog 中(相当于记录了逻辑操作,所以针对这种格式, binlog 可以称为逻辑日志),主从复制中 slave 端再根据 SQL 语句重现。但 STATEMENT 有动态函数的问题,比如你用了 uuid 或者 now 这些函数,你在主库上执行的结果并不是你在从库执行的结果,这种随时在变的函数会导致复制的数据不一致;
    • ROW:记录行数据最终被修改成什么样了(这种格式的日志,就不能称为逻辑日志了),不会出现 STATEMENT 下动态函数的问题。但 ROW 的缺点是每行数据的变化结果都会被记录,比如执行批量 update 语句,更新多少行数据就会产生多少条记录,使 binlog 文件过大,而在 STATEMENT 格式下只会记录一个 update 语句而已;
    • MIXED:包含了 STATEMENT 和 ROW 模式,它会根据不同的情况自动使用 ROW 模式和 STATEMENT 模式;
  • redo log 是物理日志,记录的是在某个数据页做了什么修改,比如对 XXX 表空间中的 YYY 数据页 ZZZ 偏移量的地方做了AAA 更新;

3、写入方式不同:

  • binlog 是追加写,写满一个文件,就创建一个新的文件继续写,不会覆盖以前的日志,保存的是全量的日志。
  • redo log 是循环写,日志空间大小是固定,全部写满就从头开始,保存未被刷入磁盘的脏页日志。

4、用途不同:

  • binlog 用于备份恢复、主从复制;
  • redo log 用于掉电等故障恢复。

如果不小心整个数据库的数据被删除了,能使用 redo log 文件恢复数据吗?

不可以使用 redo log 文件恢复,只能使用 binlog 文件恢复。

因为 redo log 文件是循环写,是会边写边擦除日志的,只记录未被刷入磁盘的数据的物理日志,已经刷入磁盘的数据都会从 redo log 文件里擦除。

binlog 文件保存的是全量的日志,也就是保存了所有数据变更的情况,理论上只要记录在 binlog 上的数据,都可以恢复,所以如果不小心整个数据库的数据被删除了,得用 binlog 文件恢复数据。

主从复制是怎么实现?

主从复制过程

MySQL 的主从复制依赖于 binlog ,也就是记录 MySQL 上的所有变化并以二进制形式保存在磁盘上。复制的过程就是将 binlog 中的数据从主库传输到从库上。

这个过程一般是异步的,也就是主库上执行事务操作的线程不会等待复制 binlog 的线程同步完成。

binlog 什么时候刷盘?

事务执行过程中,先把日志写到 binlog cache(Server 层的 cache),事务提交的时候,再把 binlog cache 写到 binlog 文件中。

一个事务的 binlog 是不能被拆开的,因此无论这个事务有多大(比如有很多条语句),也要保证一次性写入。这是因为有一个线程只能同时有一个事务在执行的设定,所以每当执行一个 begin/start transaction 的时候,就会默认提交上一个事务,这样如果一个事务的 binlog 被拆开的时候,在备库执行就会被当做多个事务分段自行,这样破坏了原子性,是有问题的。

MySQL 给每个线程分配了一片内存用于缓冲 binlog ,该内存叫 binlog cache,参数 binlog_cache_size 用于控制单个线程内 binlog cache 所占内存的大小。如果超过了这个参数规定的大小,就要暂存到磁盘。

什么时候 binlog cache 会写到 binlog 文件?

在事务提交的时候,执行器把 binlog cache 里的完整事务写入到 binlog 文件中,并清空 binlog cache。如下图:

binlogcache

虽然每个线程有自己 binlog cache,但是最终都写到同一个 binlog 文件:

  • 图中的 write,指的就是指把日志写入到 binlog 文件,但是并没有把数据持久化到磁盘,因为数据还缓存在文件系统的 page cache 里,write 的写入速度还是比较快的,因为不涉及磁盘 I/O。
  • 图中的 fsync,才是将数据持久化到磁盘的操作,这里就会涉及磁盘 I/O,所以频繁的 fsync 会导致磁盘的 I/O 升高。
    MySQL提供一个 sync_binlog 参数来控制数据库的 binlog 刷到磁盘上的频率:

  • sync_binlog = 0 的时候,表示每次提交事务都只 write,不 fsync,后续交由操作系统决定何时将数据持久化到磁盘;

  • sync_binlog = 1 的时候,表示每次提交事务都会 write,然后马上执行 fsync;
  • sync_binlog =N(N>1) 的时候,表示每次提交事务都 write,但累积 N 个事务后才 fsync。

在MySQL中系统默认的设置是 sync_binlog = 0,也就是不做任何强制性的磁盘刷新指令,这时候的性能是最好的,但是风险也是最大的。因为一旦主机发生异常重启,还没持久化到磁盘的数据就会丢失。

而当 sync_binlog 设置为 1 的时候,是最安全但是性能损耗最大的设置。因为当设置为 1 的时候,即使主机发生异常重启,最多丢失一个事务的 binlog,而已经持久化到磁盘的数据就不会有影响,不过就是对写入性能影响太大。

如果能容少量事务的 binlog 日志丢失的风险,为了提高写入的性能,一般会 sync_binlog 设置为 100~1000 中的某个数值。

Update语句的执行过程

当优化器分析出成本最小的执行计划后,执行器就按照执行计划开始进行更新操作。

具体更新一条记录 UPDATE t_user SET name = 'xl' WHERE id = 1; 的流程如下:

  • 执行器负责具体执行,会调用存储引擎的接口,通过主键索引树搜索获取 id = 1 这一行记录:
    • 如果 id=1 这一行所在的数据页本来就在 buffer pool 中,就直接返回给执行器更新;
    • 如果记录不在 buffer pool,将数据页从磁盘读入到 buffer pool,返回记录给执行器。
  • 执行器得到聚簇索引记录后,会看一下更新前的记录和更新后的记录是否一样:
    • 如果一样的话就不进行后续更新流程;
    • 如果不一样的话就把更新前的记录和更新后的记录都当作参数传给 InnoDB 层,让 InnoDB 真正的执行更新记录的操作;
  • 开启事务, InnoDB 层更新记录前,首先要记录相应的 undo log,因为这是更新操作,需要把被更新的列的旧值记下来,也就是要生成一条 undo log,undo log 会写入 Buffer Pool 中的 Undo 页面,不过在内存修改该 Undo 页面后,需要记录对应的 redo log。
  • InnoDB 层开始更新记录,会先更新内存(同时标记为脏页),然后将记录写到 redo log 里面,这个时候更新就算完成了。为了减少磁盘I/O,不会立即将脏页写入磁盘,后续由后台线程选择一个合适的时机将脏页写入到磁盘。这就是 WAL 技术,MySQL 的写操作并不是立刻写到磁盘上,而是先写 redo 日志,然后在合适的时间再将修改的行数据写到磁盘上。
  • 至此,一条记录更新完了。
  • 在一条更新语句执行完成后,然后开始记录该语句对应的 binlog,此时记录的 binlog 会被保存到 binlog cache,并没有刷新到硬盘上的 binlog 文件,在事务提交时才会统一将该事务运行过程中的所有 binlog 刷新到硬盘。
  • 事务提交


两阶段提交

如果在将 redo log 刷入到磁盘之后, MySQL 突然宕机了,而 binlog 还没有来得及写入。在主从架构中,binlog 会被复制到从库,由于 binlog 丢失了这条更新语句,从库的这一行 name 字段是旧值 jay,与主库的值不一致性;
如果在将 binlog 刷入到磁盘之后, MySQL 突然宕机了,而 redo log 还没有来得及写入。由于 redo log 还没写,崩溃恢复以后这个事务无效,在主从架构中,binlog 会被复制到从库,从库执行了这条更新语句,那么这一行 name 字段是新值 xiaolin,与主库的值不一致性;

MySQL 为了避免出现两份日志之间的逻辑不一致的问题,使用了「两阶段提交」来解决,两阶段提交把单个事务的提交拆分成了 2 个阶段,分别是「准备(Prepare)阶段」和「提交(Commit)阶段」。

事务的提交过程有两个阶段,就是将 redo log 的写入拆成了两个步骤:prepare 和 commit,中间再穿插写入binlog,具体如下:

  • prepare 阶段:将 XID(内部 XA 事务的 ID) 写入到 redo log,同时将 redo log 对应的事务状态设置为 prepare,然后将 redo log 持久化到磁盘(innodb_flush_log_at_trx_commit = 1 的作用);

  • commit 阶段:把 XID 写入到 binlog,然后将 binlog 持久化到磁盘(sync_binlog = 1 的作用),接着调用引擎的提交事务接口,将 redo log 状态设置为 commit,此时该状态并不需要持久化到磁盘,只需要 write 到文件系统的 page cache 中就够了,因为只要 binlog 写磁盘成功,就算 redo log 的状态还是 prepare 也没有关系,一样会被认为事务已经执行成功;

MySQL 异常重启会出现什么现象?

在 MySQL 重启后会按顺序扫描 redo log 文件,碰到处于 prepare 状态的 redo log,就拿着 redo log 中的 XID 去 binlog 查看是否存在此 XID:

  • 如果 binlog 中没有当前内部 XA 事务的 XID,说明 redolog 完成刷盘,但是 binlog 还没有刷盘,则回滚事务。对应时刻 A 崩溃恢复的情况。
  • 如果 binlog 中有当前内部 XA 事务的 XID,说明 redolog 和 binlog 都已经完成了刷盘,则提交事务。对应时刻 B 崩溃恢复的情况。

可以看到,对于处于 prepare 阶段的 redo log,即可以提交事务,也可以回滚事务,这取决于是否能在 binlog 中查找到与 redo log 相同的 XID,如果有就提交事务,如果没有就回滚事务。这样就可以保证 redo log 和 binlog 这两份日志的一致性了。

所以说,两阶段提交是以 binlog 写成功为事务提交成功的标识,因为 binlog 写成功了,就意味着能在 binlog 中查找到与 redo log 相同的 XID。


两阶段提交有什么问题?

  • 磁盘 I/O 次数高:对于“双1”配置,每个事务提交都会进行两次 fsync(刷盘),一次是 redo log 刷盘,另一次是 binlog 刷盘。
  • 锁竞争激烈:两阶段提交虽然能够保证「单事务」两个日志的内容一致,但在「多事务」的情况下,却不能保证两者的提交顺序一致,因此,在两阶段提交的流程基础上,还需要加一个锁来保证提交的原子性,从而保证多事务的情况下,两个日志的提交顺序一致。

MySQL 引入了 binlog 组提交(group commit)机制,当有多个事务提交的时候,会将多个 binlog 刷盘操作合并成一个,从而减少磁盘 I/O 的次数,如果说 10 个事务依次排队刷盘的时间成本是 10,那么将这 10 个事务一次性一起刷盘的时间成本则近似于 1。

引入了组提交机制后,prepare 阶段不变,只针对 commit 阶段,将 commit 阶段拆分为三个过程:

  • flush 阶段:多个事务按进入的顺序将 binlog 从 cache 写入文件(不刷盘);
  • sync 阶段:对 binlog 文件做 fsync 操作(多个事务的 binlog 合并一次刷盘);
  • commit 阶段:各个事务按顺序做 InnoDB commit 操作;

上面的每个阶段都有一个队列,每个阶段有锁进行保护,因此保证了事务写入的顺序,第一个进入队列的事务会成为 leader,leader领导所在队列的所有事务,全权负责整队的操作,完成后通知队内其他事务操作结束。

有 binlog 组提交,那有 redo log 组提交吗?

这个要看 MySQL 版本,MySQL 5.6 没有 redo log 组提交,MySQL 5.7 有 redo log 组提交。

在 MySQL 5.6 的组提交逻辑中,每个事务各自执行 prepare 阶段,也就是各自将 redo log 刷盘,这样就没办法对 redo log 进行组提交。

所以在 MySQL 5.7 版本中,做了个改进,在 prepare 阶段不再让事务各自执行 redo log 刷盘操作,而是推迟到组提交的 flush 阶段,也就是说 prepare 阶段融合在了 flush 阶段。

这个优化是将 redo log 的刷盘延迟到了 flush 阶段之中,sync 阶段之前。通过延迟写 redo log 的方式,为 redolog 做了一次组写入,这样 binlog 和 redo log 都进行了优化。

Spring事务的实现要点

事务的开始

org.springframework.transaction.support.TransactionTemplate#execute

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Override
public <T> T execute(TransactionCallback<T> action) throws TransactionException {
if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
}
else {
//关键点1:初始化事务状态对象,其中包含初始化事务的内容
TransactionStatus status = this.transactionManager.getTransaction(this);
T result;
try {
result = action.doInTransaction(status);
}
catch (RuntimeException ex) {
// Transactional code threw application exception -> rollback
rollbackOnException(status, ex);
throw ex;
}
catch (Error err) {
// Transactional code threw error -> rollback
rollbackOnException(status, err);
throw err;
}
catch (Throwable ex) {
// Transactional code threw unexpected exception -> rollback
rollbackOnException(status, ex);
throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
}
this.transactionManager.commit(status);
return result;
}
}

org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@Override
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
Object transaction = doGetTransaction();
// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) {
// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//关键点2:此处开始一个事务
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException ex) {
resume(null, suspendedResources);
throw ex;
}
catch (Error err) {
resume(null, suspendedResources);
throw err;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}

事务的初始化数据库连接

org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
//当前事务中没有创建数据库连接则新创建一个
Connection newCon = this.dataSource.getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
//否则取已经存在的那一个
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
//此处手动将自动提交设置成false用于事务,JDBC标准新连接的自动提交默认是true
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// 将当前的数据源对象和当前事务的conn链接等信息 和当前Thread绑定
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, this.dataSource);
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}

org.springframework.transaction.support.TransactionSynchronizationManager#bindResource

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* Bind the given resource for the given key to the current thread.
* @param key the key to bind the value to (usually the resource factory)
* @param value the value to bind (usually the active resource object)
* @throws IllegalStateException if there is already a value bound to the thread
* @see ResourceTransactionManager#getResourceFactory()
*/
public static void bindResource(Object key, Object value) throws IllegalStateException {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Assert.notNull(value, "Value must not be null");
Map<Object, Object> map = resources.get();
// set ThreadLocal Map if none found
if (map == null) {
map = new HashMap<Object, Object>();
resources.set(map);
}
Object oldValue = map.put(actualKey, value);
// Transparently suppress a ResourceHolder that was marked as void...
if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
oldValue = null;
}
if (oldValue != null) {
throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
}
if (logger.isTraceEnabled()) {
logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" +
Thread.currentThread().getName() + "]");
}
}

实际SQL的DAO处数据库连接获取

在事务的业务逻辑中,DAO的操作中最终会走到:Executor中,此时一般默认实现是:(mybatis)
org.apache.ibatis.executor.SimpleExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/**
* Copyright 2009-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ibatis.executor;
import org.apache.ibatis.cursor.Cursor;
import org.apache.ibatis.executor.statement.StatementHandler;
import org.apache.ibatis.logging.Log;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.apache.ibatis.transaction.Transaction;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.List;
/**
* @author Clinton Begin
*/
public class SimpleExecutor extends BaseExecutor {
public SimpleExecutor(Configuration configuration, Transaction transaction) {
super(configuration, transaction);
}
@Override
public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
Statement stmt = null;
try {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
stmt = prepareStatement(handler, ms.getStatementLog());
return handler.update(stmt);
} finally {
closeStatement(stmt);
}
}
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
Statement stmt = null;
try {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
stmt = prepareStatement(handler, ms.getStatementLog());
return handler.<E>query(stmt, resultHandler);
} finally {
closeStatement(stmt);
}
}
@Override
protected <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql) throws SQLException {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, null, boundSql);
Statement stmt = prepareStatement(handler, ms.getStatementLog());
return handler.<E>queryCursor(stmt);
}
@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
return Collections.emptyList();
}
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
Connection connection = getConnection(statementLog);
stmt = handler.prepare(connection, transaction.getTimeout());
handler.parameterize(stmt);
return stmt;
}
}

在org.apache.ibatis.executor.SimpleExecutor#prepareStatement方法中会去拿当前的数据库连接
org.apache.ibatis.executor.BaseExecutor#getConnection

1
2
3
4
5
6
7
8
protected Connection getConnection(Log statementLog) throws SQLException {
Connection connection = transaction.getConnection();
if (statementLog.isDebugEnabled()) {
return ConnectionLogger.newInstance(connection, statementLog, queryStack);
} else {
return connection;
}
}

使用Spring框架管理的情况下默认transaction的实现是:
org.mybatis.spring.transaction.SpringManagedTransaction,其getConn的方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* {@inheritDoc}
*/
@Override
public Connection getConnection() throws SQLException {
if (this.connection == null) {
openConnection();
}
return this.connection;
}
/**
* Gets a connection from Spring transaction manager and discovers if this
* {@code Transaction} should manage connection or let it to Spring.
* <p>
* It also reads autocommit setting because when using Spring Transaction MyBatis
* thinks that autocommit is always false and will always call commit/rollback
* so we need to no-op that calls.
*/
private void openConnection() throws SQLException {
this.connection = DataSourceUtils.getConnection(this.dataSource);
this.autoCommit = this.connection.getAutoCommit();
this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"JDBC Connection ["
+ this.connection
+ "] will"
+ (this.isConnectionTransactional ? " " : " not ")
+ "be managed by Spring");
}
}

最终导向了

  • org.springframework.jdbc.datasource.DataSourceUtils#doGetConnection:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    /**
    ...
    */
    public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
    try {
    return doGetConnection(dataSource);
    }
    catch (SQLException ex) {
    throw new CannotGetJdbcConnectionException("Could not get JDBC Connection", ex);
    }
    }
    /**
    * Actually obtain a JDBC Connection from the given DataSource.
    * Same as {@link #getConnection}, but throwing the original SQLException.
    * <p>Is aware of a corresponding Connection bound to the current thread, for example
    * when using {@link DataSourceTransactionManager}. Will bind a Connection to the thread
    * if transaction synchronization is active (e.g. if in a JTA transaction).
    * <p>Directly accessed by {@link TransactionAwareDataSourceProxy}.
    * @param dataSource the DataSource to obtain Connections from
    * @return a JDBC Connection from the given DataSource
    * @throws SQLException if thrown by JDBC methods
    * @see #doReleaseConnection
    */
    public static Connection doGetConnection(DataSource dataSource) throws SQLException {
    Assert.notNull(dataSource, "No DataSource specified");
    //注意此时会首先去TransactionSynchronizationManager中拿当前线程中是否已经有针对这个datasource的事务信息,如果有
    //则会返回其中的conn,而这个conn是在org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin中创建初始化的
    //所以,针对org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource实现的业务动态数据源,在事务处理时需要在事务模板的execute方法前进行
    //determineCurrentLookupKey,而不是在dao方法之前进行,因为此时如果dao在事务之中,在executor中执行sql时候拿到的数据源仍然是
    //org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin中获取的DB链接,而这两个的时刻的db路由key可能不一致导致错误
    ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
    if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
    conHolder.requested();
    if (!conHolder.hasConnection()) {
    logger.debug("Fetching resumed JDBC Connection from DataSource");
    conHolder.setConnection(dataSource.getConnection());
    }
    return conHolder.getConnection();
    }
    // Else we either got no holder or an empty thread-bound holder here.
    logger.debug("Fetching JDBC Connection from DataSource");
    Connection con = dataSource.getConnection();
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
    logger.debug("Registering transaction synchronization for JDBC Connection");
    // Use same Connection for further JDBC actions within the transaction.
    // Thread-bound object will get removed by synchronization at transaction completion.
    ConnectionHolder holderToUse = conHolder;
    if (holderToUse == null) {
    holderToUse = new ConnectionHolder(con);
    }
    else {
    holderToUse.setConnection(con);
    }
    holderToUse.requested();
    TransactionSynchronizationManager.registerSynchronization(
    new ConnectionSynchronization(holderToUse, dataSource));
    holderToUse.setSynchronizedWithTransaction(true);
    if (holderToUse != conHolder) {
    TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
    }
    }
    return con;
    }

Practice Tips

  • 程序员修炼之道Tips集锦,时时细细领会

1、关心你的技艺(Care about your craft)

2、思考你的工作(Think! About your work)

3、提供选择而不是借口(Provide options, Don’t make lame excuses)

4、不要容忍破窗户(Don’t live with broken windows):实时重构,干掉低劣设计、错误决策、糟糕代码,发现一处干掉一处

5、做变化的催化剂(Be a catalyst for change):让人们瞥见未来,人们就会聚集在你周围

6、记住大图景(Remember the big picture):时刻观察周围发生的事情而不是你自己在做的事情

7、使质量称为需求问题(Make quality a requirements issue):现实世界不会让我们制作出完美的产品尤其是不会出错的软件:时间、技术和急躁都在合谋反对我们,不要过度修饰和过度求精而损毁完好的程序,让你的代码凭借自己的质量站立一会,它也许不完美,but don’t worry, 它不可能完美~

8、定期为你的知识资产投资(Invest regularly in your knowledge portfolio)

9、批判地分析你读到的和听到的(Critically analyze what you read and hear)

10、你说什么和你怎么说同样重要(It’s both what you say and the way you say it)

  • 知道你要说什么
  • 了解你的听众
  • 选择时机
  • 选择风格
  • 让文档美观
  • 让听众参加
  • 做倾听者
  • 回复他人

11、不要重复你自己(DRY-Don’t Repeat Yourself)

12、让复用变得容易(Make It Easy to Reuse)

  • 如果不容易大家就不会去复用你的代码

13、消除无关事物之间的影响(Eliminate Effects Between Unrelated Things)

  • 接触组件之间的耦合度
  • 从设计、库、编码与测试方面提高系统组件之间的正交性

14、不存在最终决策(There Are No Final Decisions)

  • 拥抱变化,组织上的亦或系统设计上的

15、用曳光弹找到目标(Use Tracer Bullets to Find the Target)

  • 尽可能快的连接软件服务的端到端来快速发现问题与改进点
  • 敏捷开发 迭代进行

16、为了学习而制作原形(Prototype to Learn)

  • 可用来制作原形的内容:架构、新功能、外部数据的结构或内容、第三方工具或组件、性能问题、用户界面
  • 系统建模的架构图就是一种架构建模

17、靠近问题领域编程(Program Close to the Problem domain)

  • 用用户的语言去进行“设计”与“编码”(业务逻辑的伪代码描述)

18、估算,以避免发生意外(Estimate to Avoid Surprise)

19、通过代码对进度表进行迭代(Iterate the Schedule with the Code)

  • 通过代码开发的实际进度与包含的功能来对进度表重新迭代

20、用纯文本保存知识(Keep Knowledge in Plain Text)

  • 获得自描述的数据流

21、利用命令Shell的力量(Use the Power of Command Shells)

22、用好一种编辑器(Use a Single Editor Well)

23、总是使用源码控制(Always Use Source Code Control)

24、要修正问题,而不是发出指责(Fix the Problem, Not the Blame)

25、不要恐慌(Don’t Panic)

  • 人很容易恐慌,尤其是在最后期限的到来时或正在设法找到bug的原因
  • 实际的故障离你观察的地方可能还有几步远,并且涉及到许多其他的相关事物,要总是设法找到问题的根源而不是问题的特定表现

26、“Select”没有问题(“Select” Isn’t Broken)

  • 总是要先考虑自身的原因

27、不要假定,要证明(Don’t Assume it - Prove It)

28、学习一种文本操纵语言(Learn a Text Manipulation Language)

29、编写能编写代码的代码(Write Code That Writes Code)

  • IDE处理

30、你不可能写出完美的软件(You Can’t Write Perfect Software)

31、通过合约进行设计(Design with Contracts)

32、早崩溃(Crash Early)

  • fail fast

33、如果不可能发生,使用断言确保(If It Can’t Happen, Use Assertions to Ensure That It Won’t)

34、将异常用于异常的问题(Use Exceptions for Exceptional Problems)

35、要有始有终(Finish What You Start)

  • 获取资源的线程应该负责释放这个资源

36、使得模块之间的耦合减至最少(Minimize Coupling Between Modules)

37、要配置,不要集成(Configure, Don’t Integrate)

  • 元数据编程

38、抽象放入代码,细节放入元数据(Put Abstractions in Code, Details in Metadata)

39、分析工作流,以改善并发性(Analyze Workflow to Improve Concurrency)

40、用服务进行设计(Design Using Services)

41、总是为并发进行设计(Always Design for Concurrency)

  • 设计为并发模式,后续的部署等环节灵活性更大,不需要考虑单机多机对服务的影响

42、使视图与模型分离(Separate Views from Models)

43、用黑板协调工作流(Use Blackboards to Coordinate Workflow)

44、不要靠巧合编程(Don’t Program by Coincidence)

45、估算算法的阶(Estimate the Order of Your Algorithms)

46、测试你的估算(Test Your Estimates)

47、早重构,常重构(Refactor Early, Refactor Often)

48、为测试而设计(Design for Test)

49、测试你的软件,否则你的用户就得测试(Test Your Software, or Your Users Will)

50、不要使用你不理解的向导代码(Don’t Use Wizard Code You Don’t Understand)

  • 代码生成器之类

51、不要搜集需求–挖掘它们(Don’t Gather Requirements-Dig for them)

52、与用户一同工作,以像用户一样思考(Work with a User to Think Like a User)

  • 需求文档的一大风险是太具体,好的需求文档会保持抽象

53、抽象比细节活的更长久(Abstractions Live Longer than Details)

54、使用项目词汇表(Use a Project Glossary)

55、不要在盒子外面思考-要找到盒子(Don’t Think Outside the Box - Find the Box)

56、倾听反复出现的疑虑-等你准备好再开始(Listen to Nagging Doubts - Start When You’re Ready)

  • 当面对一项任务时候,如果反复感到疑虑,或是体验到某种勉强,要注意它。给它时间,疑虑可能会结晶成某种更为坚实的东西。

57、对有些事情“做”胜于“描述”(Some Things Are Better Done than Described)

58、不要做形式方法的奴隶(Don’t Be a Slave to Formal Methods)

59、昂贵的工具不一定能制作出更好的设计(Expensive Tools Do Not Produce Better Designs)

60、围绕功能组织团队(Organize Teams Around Functionality)

61、不要使用手工流程(Don’t Use Manual Procedures)

62、早测试,常测试,自动测试(Test Early. Test Often. Test Automatically)

63、要到通过全部测试,编码才算完成(Coding Ain’t Done ‘Til All the Tests Run)

64、通过“蓄意破坏”测试你的测试(Use Saboteurs to Test Your Testing)

65、测试状态覆盖,而不是代码覆盖(Test State Coverage, Not Code Coverage)

66、一个Bug只抓一次(Find Bugs Once)

67、把英语当作又一种编程语言(Treat English as Just Another Programming Language)

68、把文档建在里面,不要拴在外面(Build Documentation In, Don’t Bolt It On)

69、温和地超出用户期望(Gently Exceed Your User’s Expectations)

70、在你的作品上签名(Sign Your Work)