分享好友 最新动态首页 最新动态分类 切换频道
AbstractQueuedSynchronizer源码解析(下)
2024-12-27 05:36

Kafka高级篇知识点

AbstractQueuedSynchronizer源码解析(下)

44个Kafka知识点(基础+进阶+高级)解析如下

由于篇幅有限,小编已将上面介绍的**《Kafka源码解析与实战》、Kafka面试专题解析、复习学习必备44个Kafka知识点(基础+进阶+高级)都整理成册,全部都是PDF文档**

本文已被CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录

需要这份系统化的资料的朋友,可以点击这里获取

  1. 共享式获取同步状态:acquireShared(), doAcquireShared(), setHeadAndPropagate()

  2. 共享式释放同步状态:releaseShared(), doReleaseShared()

这个图总结了 AQS 整体架构的组成,和部分场景的动态流向,图中两个点说明一下,方便大家观看。

  1. AQS 中队列只有两个:同步队列 + 条件队列,底层数据结构两者都是链表(在上一篇我们已经介绍过了

  2. 图中有四种颜色的线代表四种不同的场景,1、2、3 序号代表看的顺序。

AQS 本身就是一套锁的框架,它定义了获得锁和释放锁的代码结构,所以如果要新建锁,只要继承 AQS,并实现相应方法即可。

一、获取锁

========================================================================

1.1 acquire排他锁


获取锁最直观的感受就是使用 Lock.lock () 方法来获得锁,最终目的是想让线程获得对资源的访问权。

Lock 一般是 AQS 的子类,lock 方法根据情况一般会选择调用 AQS 的 acquire 或 tryAcquire 方法。

acquire 方法 AQS 已经实现了,tryAcquire 方法是等待子类去实现。

acquire 方法制定了获取锁的框架,先尝试使用 tryAcquire 方法获取锁,获取不到时,再入同步队列中等待锁。tryAcquire 方法 AQS 中直接抛出一个异常,表明需要子类去实现,子类可以根据同步器的 state 状态来决定是否能够获得锁,接下来我们详细看下 acquire 的源码解析。

acquire 也分两种,一种是排它锁,一种是共享锁,我们一一来看下

// 排他模式下,尝试获得锁

public final void acquire(int arg) {

// tryAcquire方法需要实现类去实现

// 实现思路一般都是 cas 给 state 赋值来决定是否能获得锁

if (!tryAcquire(arg) &&

// acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

// 注意:上面这行代码是两个方法addWaiter和acquireQueued,这两个方法在下面我们会细说

// addWaiter入参代表是排他模式

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

以上代码的主要步骤是(流程见整体架构图中红色场景

  1. 尝试执行一次 tryAcquire,如果成功直接返回,失败走 2

  2. 线程尝试进入同步队列,首先调用 addWaiter 方法,把当前线程放到同步队列的队尾

  3. 接着调用 acquireQueued 方法,两个作用,第一个作用是阻塞当前节点,第二个作用是节点被唤醒时,使其能够获得锁

  4. 如果 2、3 失败了,打断线程。

1.1.2 addWaiter

接下来我们先来看下 addWaiter 的源码实现

方法的主要目的:node追加到同步队列的队尾

// 方法传入的参数 mode 表示 当前线程的节点

// return node 表示新增的node

private Node addWaiter(Node mode) {

// 初始化node

Node node = new Node(Thread.currentThread(), mode);

// 这里的逻辑与 enq 方法的逻辑一样,只不过 enq 增加对队尾判空的操作

// 当前节点的前置节点为tail

Node pred = tail;

if (pred != null) {

node.prev = pred;

// 以CAS的方式去设置尾节点 compareAndSetTail

if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

// 有可能CAS会失败,因为存在竞争,所以进入 enq 环节

// enq 自旋保证node加入队尾

enq(node);

return node;

}

// 线程加入同步队列队尾的方法

// 这里注意一下,返回值是添加 node 的前一个节点

private Node enq(final Node node) {

// 自旋,直到成功为止,或者直到放弃为止

for (;😉 {

// 得到队尾节点

Node t = tail;

// 如果队尾为空,则说明当前同步队列没有进行初始化,进行初始化

if (t == null) {

if (compareAndSetHead(new Node()))

tail = head;

// 如果队尾不为空,则将当前节点追加到队尾

} else {

node.prev = t;

// node追加到队尾

if (compareAndSetTail(t, node)) {

t.next = node;

return t;

}

}

}

}

总结:就是把新的节点追加到同步队列的队尾。

1.1.3 acquireQueued

下一步就是要阻塞当前线程了,是 acquireQueued 方法来实现的,即队列中的节点什么时候阻塞,什么时候唤醒由 acquireQueued 去决定,我们来看下源码实现

这个方法主要做了两件事

  1. 通过不断的自旋尝试使自己的前一个节点的状态变成signal状态,然后阻塞自己

  2. 获得锁的线程执行完毕之后,释放锁时,会把阻塞的 node 唤醒,node 唤醒之后再次自旋,尝试获得锁

  3. 返回 false 表示获得锁成功,返回 true 则表示失败

final boolean acquireQueued(final Node node, int arg) {

boolean failed = true;

try {

boolean interrupted = false;

// 自旋

for (;😉 {

// 选择上一个节点,p 代表前置结点

final Node p = node.predecessor();

// 有两种情况会走到 p == head

// 1. node之前没有获得锁,进入 acquireQueued 方法时,才发现它的前置节点就是头节点,于是尝试获得一次锁

// 2. node之前一直在阻塞沉睡,然后被唤醒,此时唤醒 node 的节点正是其前一个节点,也能走到if

// 如果自己 tryAcquire (尝试抢锁) 成功,就立刻讲自己设置成为 head,并且把上一个节点移除

// 如果自己 tryAcquire 失败,尝试进入同步队列

if (p == head && tryAcquire(arg)) {

// 获得锁,将自己设置成 head 节点

setHead(node);

// p 被回收

p.next = null;

failed = false;

return interrupted;

}

// shouldParkAfterFailedAcquire 把 node 的前一个节点设置为signal

// 只要前一个节点的状态是 signal 了,那么自己就可以阻塞了

if (shouldParkAfterFailedAcquire(p, node) &&

// 线程是在这个方法中阻塞的,醒来的时候仍在无限 for 循环里面,就能再次自旋尝试获得锁

parkAndCheckInterrupt())

interrupted = true;

}

} finally {

// 如果获得 node 的锁失败,将 node 从队列中移除

if (failed)

cancelAcquire(node);

}

}

1.1.4 shouldParkAfterFailedAcquire

shouldParkAfterFailedAcquire,这个方法的主要目的就是把前一个节点的状态置为 SIGNAL,只要前一个节点的状态是 SIGNAL,当前节点就可以阻塞了。

// 当前线程可以安心阻塞的标准,就是前一个节点的线程状态是 signal 了

// 传入的参数 pred 表示前一个节点, node 表示当前节点

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

int ws = pred.waitStatus;

// 如果前一个节点 waitStatus 状态已经是 signal 了,直接返回,不需要再自旋了

if (ws == Node.SIGNAL)

return true;

if (ws > 0) {

// 找到前一个状态不是取消的节点,因为把当前 node 挂在有效节点的身上

// 因为节点的状态是取消的话,是无效的,是不能作为 node 的前置节点的,所以必须找到 node 的有效节点才行

do {

node.prev = pred = pred.prev;

} while (pred.waitStatus > 0);

pred.next = node;

// 否则直接把节点状态设置为 signal

} else {

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

}

return false;

}

acquire 整个过程非常长,代码也非常多,但注释很清楚,可以一行一行仔细看看代码。

1.1.5 总结

acquire 方法大致分为三步

  1. 使用 tryAcquire 方法尝试获得锁,获得锁直接返回,获取不到锁的走 2

  2. 把当前线程组装成节点(Node,追加到同步队列的尾部(addWaiter

  3. 自旋,使同步队列中当前节点的前置节点状态为 signal 后,然后阻塞自己。

1.2 acquireShared 获取共享锁


acquireShared 整体流程和 acquire 相同,代码也很相似,重复的源码就不贴了,我们就贴出来不一样的代码来,也方便大家进行比较

  1. 第一处尝试获得锁的地方,有所不同,排它锁使用的是 tryAcquire 方法,共享锁使用的是 tryAcquireShared 方法,如下图
  1. 第二处不同在于节点获得排它锁时,仅仅把自己设置为同步队列的头节点即可(setHead 方法,但如果是共享锁的话,还会去唤醒自己的后续节点,一起来获得该锁(setHeadAndPropagate 方法,不同之处如下(左边排它锁,右边共享锁

1.2.1 setHeadAndPropagate

这个方法主要做了两件事

  1. 把当前节点设置成头节点

  2. 看看后续节点有无正在等待,并且也是共享模式的,有的话唤醒这些节点

private void setHeadAndPropagate(Node node, int propagate) {

Node h = head;

// 当前节点设置成头节点

setHead(node);

// propagate > 0 表示已经有节点获得共享锁了

if (propagate > 0 || h == null || h.waitStatus < 0 ||

(h = head) == null || h.waitStatus < 0) {

Node s = node.next;

// 共享模式,还唤醒头节点的后置节点

if (s == null || s.isShared())

doReleaseShared();

}

}

// 释放后置共享节点

private void doReleaseShared() {

for (;😉 {

Node h = head;

// 还没有到队尾,此时队列中至少有两个节点

if (h != null && h != tail) {

int ws = h.waitStatus;

// 如果队列状态是 SIGNAL ,说明后续节点都需要唤醒

if (ws == Node.SIGNAL) {

// CAS 保证只有一个节点可以运行唤醒的操作

if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

continue;

// 进行唤醒操作

unparkSuccessor(h);

}

else if (ws == 0 &&

!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

continue;

}

if (h == head)

break;

}

}

这个就是共享锁独特的地方,当一个线程获得锁后,它就会去唤醒排在它后面的其它节点,让其它节点也能够获得锁。

二、释放锁

========================================================================

释放锁的触发时机就是我们常用的 Lock.unLock () 方法,目的就是让线程释放对资源的访问权(流程见整体架构图蓝色路线)。

释放锁也是分为两类,一类是排它锁的释放,一类是共享锁的释放,我们分别来看下。

2.1 释放排它锁


排它锁的释放就比较简单了从队头开始,找它的下一个节点,如果下一个节点是空的,就会从尾开始,一直找到状态不是取消的节点,然后释放该节点,源码如下

2.1.1 release

public final boolean release(int arg) {

// tryRelease 交给实现类去实现,如果返回true则说明成功释放锁

if (tryRelease(arg)) {

Node h = head;

// 头节点不为空 && 非初始化状态

if (h != null && h.waitStatus != 0)

// 从头开始唤醒等待锁的节点

unparkSuccessor(h);

return true;

}

return false;

}

2.1.2 unparkSuccessor

// 当线程释放锁成功后,从 node 开始唤醒同步队列中的节点

// 通过唤醒机制,保证线程不会一直在同步队列中阻塞等待

private void unparkSuccessor(Node node) {

// node节点为当前释放锁的节点,也是同步队列的头结点

int ws = node.waitStatus;

// 如果节点已经被取消了,把节点的状态设置为初始化

if (ws < 0)

compareAndSetWaitStatus(node, ws, 0);

// 拿出 node 节点的下一个节点

Node s = node.next;

// s 为空,表示 node 的后一个节点为空

// s.waitStatus > 0 表示s节点已经被取消了

// 遇到上面两种情况,就从队尾开始,向前遍历,找到第一个 waitStatus 字段不是被取消的节点

if (s == null || s.waitStatus > 0) {

s = null;

// 这个for循环,从尾部开始迭代

// 主要是因为节点被阻塞的时候,是在acquiredQueued方法里面被阻塞的(上面已经介绍了这个方法

// 所以唤醒的时候也一定是在acquiredQueued方法里面被唤醒

// 唤醒的条件是,判断判断当前节点的前置节点是否为头结点,这里是判断当前节点的前置节点

// 所以这里必须从尾部开始迭代,目的就是过滤无效的前置节点,不然节点被唤醒时,发现前置节点是无效节点的话,就又会陷入阻塞

for (Node t = tail; t != null && t != node; t = t.prev)

if (t.waitStatus <= 0)

s = t;

}

// 唤醒以上代码找到的线程

if (s != null)

LockSupport.unpark(s.thread);

}

2.2 释放共享锁


2.2.1 releaseShared

释放共享锁的方法是 releaseShared,主要分成两步

  1. tryReleaseShared 尝试释放当前共享锁,失败返回 false,成功走 2

  2. 唤醒当前节点的后续阻塞节点,这个方法我们之前看过了,线程在获得共享锁的时候,就会去唤醒其后面的节点,方法名称为:doReleaseShared。

我们一起来看下 releaseShared 的源码

// 共享模式下,释放当前线程的共享锁

public final boolean releaseShared(int arg) {

if (tryReleaseShared(arg)) {

// 线程在获得共享锁的时候,就会去唤醒其后面的节点

doReleaseShared();

return true;

}

return false;

}

三. 条件队列的一些重要方法

=================================================================================

3.1 为什么需要条件队列


Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对

象关联的锁。Condition对象是由Lock对象创建出来的,换句话说,Condition是依赖Lock对象的。

当调用Condition的await()方法后,当前线程会释放锁并在此等待。而其他线程调用Condition对象的

signal()方法通知当前线程后,当前线程才从await()方法返回,并且在返回前已经获取了锁。

因为并不是所有场景一个同步队列就可以搞定的。

  1. 在遇到锁 + 队列结合的场景时,就需要 Lock + Condition 配合才行,先使用 Lock 来决定哪些线程可以获得锁,哪些线程需要到同步队列里面排队阻塞

  2. 获得锁的多个线程在碰到队列满或者空的时候,可以使用 Condition 来管理这些线程,让这些线程阻塞等待,然后在合适的时机后,被正常唤醒。

同步队列 + 条件队列联手使用的场景,最多被使用到锁 + 队列的场景中。所以说条件队列也是不可或缺的一环。

接下来我们来看一下条件队列一些比较重要的方法,以下方法都在 ConditionObject 内部类中。

3.2 入队列等待 await


获得锁的线程,如果在碰到队列满或空的时候,就会阻塞住,这个阻塞就是用条件队列实现的,这个动作我们叫做入条件队列,方法名称为 await,流程见整体架构图中绿色箭头流向,我们一起来看下 await 的源码

// 线程进入条件队列

public final void await() throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

// 加入条件队列的队尾

Node node = addConditionWaiter();

// 加入队列后,会释放 lock 时所申请的资源,唤醒同步队列的头结点

// fullyRelease 释放资源,自己马上就要阻塞了,所以要马上释放之前 lock 的资源

// 不然自己不被唤醒的话,别的线程永远得不到该共享的资源

long savedState = fullyRelease(node);

int interruptMode = 0;

// 确认 node 不在同步队列上,在阻塞,如果node在同步队列上的话,是不能够上锁的

// 为什么这么做?

// node刚被加入到条件队列,立马就被其他线程唤醒转移到同步队列当中了

// 线程之前在条件队列中沉睡,被唤醒后加入到同步队列当中

while (!isOnSyncQueue(node)) {

// 阻塞在条件队列上

LockSupport.park(this);

if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

break;

}

// 其他线程通过 signal 已经把 node 从条件队列中转移到同步队列中

// 所以这里尝试acquireQueued

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

interruptMode = REINTERRUPT;

if (node.nextWaiter != null)

// 如果状态不是CONDITION,就会自动删除

// CONDITION表示线程正在等待条件

unlinkCancelledWaiters();

if (interruptMode != 0)

reportInterruptAfterWait(interruptMode);

}

await 方法有几点需要特别注意

  1. fullyRelease(node);,节点在准备进入条件队列之前,一定会先释放当前持有的锁,不然自己进去条件队列了,其余的线程都无法获得锁了

  2. (acquireQueued(node, savedState) && interruptMode != THROW_IE),此时节点是被 Condition.signal 或者 signalAll 方法唤醒的,此时节点已经成功的被转移到同步队列中去了(整体架构图中蓝色流程,所以可以直接执行 acquireQueued 方法

  3. Node 在条件队列中的命名,源码喜欢用 Waiter 来命名,所以我们在条件队列中看到 Waiter,其实就是 Node。

await 方法中有两个重要方法:addConditionWaiter 和 unlinkCancelledWaiters,我们一一看下。

3.3 addConditionWaiter


addConditionWaiter 方法主要是把节点放到条件队列中,方法源码如下

// 增加新的 waiter 到队列中,返回新添加的 waiter

// 如果尾节点的状态不是CONDITION状态,删除条件队列中所有状态不是CONDITION的节点

// 如果队列为空,新增的节点作为队列头节点,否则追加到尾节点上

private Node addConditionWaiter() {

Node t = lastWaiter;

// 如果尾节点的状态不是CONDITION状态,删除

if (t != null && t.waitStatus != Node.CONDITION) {

unlinkCancelledWaiters();

t = lastWaiter;

}

// 新建条件队列node

Node node = new Node(Thread.currentThread(), Node.CONDITION);

// 如果是空的,直接放在队列头

if (t == null)

firstWaiter = node;

互联网大厂比较喜欢的人才特点:对技术有热情,强硬的技术基础实力;主动,善于团队协作,善于总结思考。无论是哪家公司,都很重视高并发高可用技术,重视基础,所以千万别小看任何知识。面试是一个双向选择的过程,不要抱着畏惧的心态去面试,不利于自己的发挥。同时看中的应该不止薪资,还要看你是不是真的喜欢这家公司,是不是能真的得到锻炼。其实我写了这么多,只是我自己的总结,并不一定适用于所有人,相信经过一些面试,大家都会有这些感触。

**另外本人还整理收藏了2021年多家公司面试知识点以及各种技术点整理 **

下面有部分截图希望能对大家有所帮助。

本文已被CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录

需要这份系统化的资料的朋友,可以点击这里获取

中所有状态不是CONDITION的节点

// 如果队列为空,新增的节点作为队列头节点,否则追加到尾节点上

private Node addConditionWaiter() {

Node t = lastWaiter;

// 如果尾节点的状态不是CONDITION状态,删除

if (t != null && t.waitStatus != Node.CONDITION) {

unlinkCancelledWaiters();

t = lastWaiter;

}

// 新建条件队列node

Node node = new Node(Thread.currentThread(), Node.CONDITION);

// 如果是空的,直接放在队列头

if (t == null)

firstWaiter = node;

互联网大厂比较喜欢的人才特点:对技术有热情,强硬的技术基础实力;主动,善于团队协作,善于总结思考。无论是哪家公司,都很重视高并发高可用技术,重视基础,所以千万别小看任何知识。面试是一个双向选择的过程,不要抱着畏惧的心态去面试,不利于自己的发挥。同时看中的应该不止薪资,还要看你是不是真的喜欢这家公司,是不是能真的得到锻炼。其实我写了这么多,只是我自己的总结,并不一定适用于所有人,相信经过一些面试,大家都会有这些感触。

**另外本人还整理收藏了2021年多家公司面试知识点以及各种技术点整理 **

下面有部分截图希望能对大家有所帮助。

[外链图片转存中…(img-zAhrcP5B-1715804684962)]

本文已被CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录

最新文章
新奥长期免费资料大全|经典解释落实
  在当代社会,获取知识变得越来越便捷,新奥长期免费资料大全就是这样一个平台,它致力于向公众提供丰富的学习资源,包括但不限于经典文献、研究报告、学术论文等,覆盖各个学科领域。本文将以“新奥长期免费资料大全 | 经典解释落实”
论文ai写作网站有哪些 论文ai写作网站一览
论文ai写作网站有哪些,AI写作网站是一个能够帮助用户更高效地进行创作和写作的工具。用户只需提供一些关键信息,AI就可以自动生成文章的内容,从而大大提升了创作的效率。这对于有时间压力或者灵感不足的用户来说,是一个非常有用的辅助工
梦幻西游最强的宝宝没有之一,神马浮云在它面前都是垃圾
梦幻西游中召唤兽跟随号主南征北战,为自己的武神之战付出汗马功劳,也获得了丰功伟绩;今天咱们就说一说梦幻西游中最强悍而且已经是绝版的召唤兽,绝对是站在武神坛顶端的宝宝了,价值肯定超过最少两百万,神马浮云在它面前就是垃圾,咱们
谷歌adsense广告怎么赚钱?英文网站通过google广告盈利赚美金
想赚美金,又不想太累?那就来做英文内容网站去挂谷歌ADSENSE广告来赚钱吧。如何去做一个高级的英文内容网站,并把它当做一个资产去沉淀和积累,获取大流量后稳稳收益?依内容营销为指导,把网站上每一项内容都做成可以长期沉淀的资产,帮
蚂蚁庄园答案合集(蚂蚁庄园答案大全集结,轻松解锁庄园秘籍)
随着移动互联网的普及,各类手机应用层出不穷。其中,蚂蚁庄园作为一款集娱乐、知识于一体的游戏应用,深受广大用户的喜爱。蚂蚁庄园答案合集(《蚂蚁庄园答案大全集结,轻松解锁庄园秘籍》)正是为了满足用户在庄园游戏中遇到的各种问题而
骛与鹜的区别
描写黄山天都峰的诗句:1、孤峰突兀现青虚,喜若羁人望故都。神马已驰身尚远,却疑真有二文殊。——李弥逊《次韵公显宫教实见天都峰》2、奇险天都著,遥观亦有缘。大雄无与并,苍浑莫之先。倏忽阴晴异,逡巡起伏迁。云腾致雨气,水泻在山泉
最新款手机排行榜前十名(2023年畅销机型排名)
2023年最新款手机排行榜前十名在2023年的手机市场中,各种品牌的新机型层出不穷,竞争激烈,以下是基于销售数据和用户评价,我们列出的2023年最新款手机排行榜前十名:1. iPhone 14 Pro处理器:A16芯片屏幕:6.1英寸 Super Retina XDR 显示
首个机器人与AI共建平台落地亦庄,引爆智能制造新机遇
在科技界,机器人和人工智能的结合被视为未来工业的曙光,而现在,这一曙光似乎正照耀着北京亦庄。12月9日,北京京东数智工业科技有限公司(简称“京东工业”)与科大讯飞股份有限公司(简称“科大讯飞”)在亦庄签署了一项颇具里程碑意义
最官方的淘宝标题优化技巧,关键词拆分与重组
  前面小编给大家分享了一个官方标题优化的方法,今天给大家说说大家一直关心的分词、可拆分词和不可拆分词、标题空格等问题对于标题搜索有什么影响,这里开淘小编特意把最官方的信息收集过来给大家,希望能帮助到你们!  一、分词问题
远程查看对象微信聊天记录软件(如何偷偷监控老婆手机)
远程查看对象微信聊天记录软件(如何偷偷监控老婆手机)   打开微信,点击顶部的搜索框,输入关键词(如联系人名、内容等),直接查找相关聊天记录。翻阅历史聊天记录进入与某人的聊天页面,向上滑动查看历史记录,直到找到需要的信息。聊
相关文章
推荐文章
发表评论
0评