网站首页 > 文章精选 正文
简介:
本来说好写队列的,但是一看LinkedBlockingDeque有不少关于ReentrantLock的东西,所以这篇改成ReentrantLock。
ReentrantLock是一把可重入的锁,其实就是用Java代码实现的锁,而synchronized是底层实现的,说白了就是很多线程去抢锁,失败的就加入队列,排队等前面的唤醒自己。不过,ReentrantLock还提供了可中断功能,排队排的不高兴了可以不排了,而synchronized不行。
UML图:
属性:
Sync是ReentrantLock的内部抽象类,ReentrantLock的公平锁和非公平锁都是Sync的子类。
FairSync的UML图:
NonfairSync的UML图:
可以看到都是间接继承了AbstractQueuedSynchronizer,简称AQS。AQS简单来说就是维护了lock的状态和等待列表。
构造方法:
默认是非公平的。
下面看一下ReentrantLock的方法,这里只分析常用的方法。
非公平锁的lock():
compareAndSetState(0, 1):
//锁的状态
//0:表示无线程使用
//>=1:表示某个线程使用的次数
private volatile int state;
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
expect和stateOffset相等,就会把this的stateOffset更新成update,stateOffset是state的值,state表示锁的状态。
非公平的性质体现出来了,正常来说,等待线程都在一个列表里,前面的线程释放锁之后通知后面的线程来抢锁。此时可能已经有线程在排队,本来快轮到别人去抢锁了,结果非公平锁直接来插队抢锁。
acquire(1):
非公平锁的tryAcquire(arg):
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获取当前锁的状态
int c = getState();
if (c == 0) {
//无线程使用就再次尝试抢锁
if (compareAndSetState(0, acquires)) {
//成功记下当前线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
//如果当前线程已经抢到锁了,使用次数+1
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
//把值赋给state
setState(nextc);
return true;
}
return false;
}
tryAcquire()的作用就是,先判断当前是否有线程使用锁,没有就再次尝试抢锁,true是成功,如果失败,准备加入等待队列。
addWaiter(Node.EXCLUSIVE):
//EXCLUSIVE为null表示锁已经被某个线程使用了
static final Node EXCLUSIVE = null;
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
//如果已经有节点了,把新节点的前驱指向以前的尾部pred
node.prev = pred;
//用CAS的方法更新tail,把tail更新成node
if (compareAndSetTail(pred, node)) {
//替换成功再把老的tail的next指向新tail
//其实就是一个链表的新增,只不过是线程安全的,用的CAS更新tail
pred.next = node;
return node;
}
}
//直接入列失败,就死循环去插入,其实就是死循环执行上面的步骤
enq(node);
return node;
}
Node(Thread thread, Node mode) {
//注意看这里,别被坑了,nextWaiter = mode跟参数属性是反的,nextWaiter是null
this.nextWaiter = mode;
this.thread = thread;
}
private final boolean compareAndSetTail(Node expect, Node update) {
//如果tailOffset跟expect相等,就把this的tailOffset更新成update
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
第一步先是尝试直接入列,插入队尾。失败了就准备死循环入列:
enq(node):
private Node enq(final Node node) {
//保证把当前node挂到tail后面才跳出循环
for (;;) {
Node t = tail;
if (t == null) {
//tail为null就为head赋一个空节点
if (compareAndSetHead(new Node()))
//把head赋给tail,只有这种情况才会相等
tail = head;
} else {
//插入队尾,步骤前面描述过了
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
addWaiter(Node.EXCLUSIVE)的作用就是将线程加入等待队列,先尝试直接入列,插入队尾。直接入列失败就死循环入列,保证入列成功。
enq(node)是基于CLH算法的,
CLH算法:单向链表,但是是线程安全的。死循环新增节点,也叫自旋。先把新插入的节点的前驱指向原来的tail,然后以前驱为原始值,用CAS替换掉tail,失败了就不停的自旋。
为什么要为head赋一个空节点呢?因为必须要有tail,哪怕是空节点,新插入的节点的前驱不能指向null,可以指向空节点,然后以前驱的值进行CAS。
其实就是一个链表的新增:
只不过更新tail的时候用的CAS。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg):
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//取node的前驱
final Node p = node.predecessor();
//如果前驱是head,就尝试去抢锁
//因为head代表当前使用锁的线程,而自己是排在下一个,理所当然的尝试抢锁
if (p == head && tryAcquire(arg)) {
//获取锁成功就把当前node赋给head
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
//前驱不是head或者抢锁失败就去判断是否要阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//走到这里,就会中断此线程
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt():
//取消状态:当前线程不会去抢锁了
static final int CANCELLED = 1;
//会通知状态:当前线程会唤醒自己的后驱,所以此状态的线程的后驱可以休眠
static final int SIGNAL = -1;
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取当前线程状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//会通知状态就返回true,可以阻塞
return true;
if (ws > 0) {
//大于0肯定是取消状态,不会通知
//就一直往前找,找到一个能通知自己的,挂在那个节点后面,方便自己休眠有人唤醒自己
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//用CAS把waitStatus改成SIGNAL
//防止pred自己把自己的状态突然改了,这里用CAS
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
//只要前驱不是SIGNAL状态固定返回false
//这样下次进来要么前驱是SIGNAL状态
//要么是上面的CAS失败了,那就重新CAS,保证前驱是SIGNAL状态才返回true
return false;
}
private final boolean parkAndCheckInterrupt() {
//阻塞线程,interrupt()也会唤醒
LockSupport.park(this);
//如果被谁唤醒就返回线程的中断状态
//注意interrupted()是会重置线程的中断状态的
return Thread.interrupted();
}
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)的作用就是看有没有轮到自己去抢锁呢,轮到就去抢,轮不到就找个大哥跟着。这个大哥需要关心自己,大哥用完了锁能记得通知自己,因为自己准备去睡觉了。找到了就挂在大哥的后驱,找不到就反复上面的步骤,直到找到大哥或者自己抢到锁。
非公平锁的unlock():
release(1):
public final boolean release(int arg) {
//当前锁数量-1
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//如果head不为null,并且是可以唤醒别的线程的状态
//就去唤醒下一个线程来抢锁
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//state - 1
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
//把独占线程置为null,此时已无线程使用
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
//小于0说明是可通知状态
//马上head就要换人了,所以把head的waitStatus改成0,相当于重置
compareAndSetWaitStatus(node, ws, 0);
//取后驱
Node s = node.next;
//如果后驱为null或者是取消状态
if (s == null || s.waitStatus > 0) {
s = null;
//从tail往前遍历
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
//找到最前面的并且会通知的状态的节点
s = t;
}
if (s != null)
//唤醒这个节点
LockSupport.unpark(s.thread);
}
release(1)的作用就是从尾到头遍历,找到最前面的一个不是取消状态的线程,唤醒线程,使其回到acquireQueued()的死循环里面来。
公平锁的lock():
acquire(1):
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//中断当前线程
selfInterrupt();
}
tryAcquire()是跟非公平锁唯一不一样的,看下:
tryAcquire(arg):
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//即便锁当前无线程使用,也要先判断线程优先级
//也就是排在队列的head的下一个才能去抢锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//跟非公平锁一样
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
多了一步hasQueuedPredecessors():
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
详细解释下最后的判断:
1:h != t说明肯定有第二个节点,因为只有为空,头和尾才相等,具体看入列的代码
2:(s = h.next) == null,走到这一步说明有第二个节点,为啥?看下入列的步骤:
node.prev = pred;先赋前驱
if (compareAndSetTail(pred, node)) {再改tail的值
pred.next = node;//最后改后驱
此时应该是走完compareAndSetTail(pred, node)这一步了,但是next还没赋值,这个时候说明这个线程肯定不是自己,因为自己只有两种可能:一种是刚进来,一种是被唤醒的,无论哪种都不可能后驱为null,所以这后驱为null就可以返回true了。
3:s.thread != Thread.currentThread(),走到这一步说明有next,但是不确定是不是自己,所以判断线程是否相等
公平的性质体现出来了,新来的想抢锁,首先要看前面有没人排队。有人排队,即便锁无线程使用,也不能抢锁,得排队,等前面的人叫。排队的代码跟非公平锁一样,就不重复贴了。
公平锁的unlock():
跟非公平锁一样
tryLock():
调的是抽象方法nonfairTryAcquire(),非公平锁里面讲过了
tryLock(long timeout, TimeUnit unit):
tryAcquireNanos(1, unit.toNanos(timeout)):
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//尝试获取锁,失败就准备休眠
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
tryAcquire()上面都说过了,看下doAcquireNanos:
doAcquireNanos(int arg, long nanosTimeout):
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
//算出总共的休眠时间
final long deadline = System.nanoTime() + nanosTimeout;
//先将线程入列
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
//取前驱
final Node p = node.predecessor();
//前驱是head就尝试去抢锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return true;
}
//算出休眠时间还剩多少时间
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
//时间到了,还没抢到锁,就返回false
return false;
//找一个可靠的大哥,并且总共的休眠时间还没到
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
//休眠固定时间,中途也可以被唤醒
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
//为true说明一直没抢到锁
if (failed)
//最后时间到了还是没有抢到锁,就把当前线程的状态改成取消状态
cancelAcquire(node);
}
}
doAcquireNanos()的作用是先去抢下锁,失败了就判断休眠时间有没有到,没有就休眠,等待别人唤醒或者时间到了再去抢一下。
cancelAcquire(node):
private void cancelAcquire(Node node) {
if (node == null)
return;
//当前节点的线程置为null
node.thread = null;
Node pred = node.prev;
while (pred.waitStatus > 0)
//一直往前找,直到找到一个waitStatus<=0的,也就是会通知状态的节点
//跳出循环的时候,pred是node前面的第一个会通知状态的节点,node的前驱是pred
node.prev = pred = pred.prev;
Node predNext = pred.next;
//把node的状态改成取消状态
node.waitStatus = Node.CANCELLED;
//如果node是tail节点,并且成功把tail改成pred
if (node == tail && compareAndSetTail(node, pred)) {
//把pred的next置为null
compareAndSetNext(pred, predNext, null);
} else {
//如果此时的node不是最后一个
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
//next是当前抢不到锁的后驱
//把pred的后驱改成next
compareAndSetNext(pred, predNext, next);
} else {
//如果是head,就唤醒
unparkSuccessor(node);
}
node.next = node;
}
}
cancelAcquire(node),走到这一步,说明时间到了还没抢到锁,就往前找,找到一个可靠的大哥,把自己的next挂到大哥的后面,多好,自己快被删了,还有照顾自己的小弟。
lockInterruptibly():
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//注意这里
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
其他代码都一样,就一步,多了个抛中断异常的代码。给线程打上中断标识,会抛中断异常,也就是放弃抢锁了。
ReentrantLock里面还是有能学到不少关于保证线程安全的知识的。
下一篇再把Condition看一下。
猜你喜欢
- 2025-01-03 ant design Modal关闭时清除数据的解决方案
- 2025-01-03 Flink状态管理详解:Keyed State和Operator List State深度解析
- 2025-01-03 电气自动化专业词汇中英文对照表
- 2025-01-03 事务相关知识集锦
- 2025-01-03 FlexSim常用脚本语言汇总--搜集最常用
- 2025-01-03 停止javascript的ajax请求,取消axios请求,取消reactfetch请求
- 2025-01-03 MySQL进阶垫脚石:线程长时间处于killed状态怎么破?
- 2025-01-03 ReentrantLock的底层原理
- 2025-01-03 还搞不定 SQL 阻塞与超时?早晚得出事!
- 2025-01-03 细说ReactiveCocoa的冷信号与热信号(二):为什么要区分冷热信号
- 最近发表
- 标签列表
-
- newcoder (56)
- 字符串的长度是指 (45)
- drawcontours()参数说明 (60)
- unsignedshortint (59)
- postman并发请求 (47)
- python列表删除 (50)
- 左程云什么水平 (56)
- 计算机网络的拓扑结构是指() (45)
- 稳压管的稳压区是工作在什么区 (45)
- 编程题 (64)
- postgresql默认端口 (66)
- 数据库的概念模型独立于 (48)
- 产生系统死锁的原因可能是由于 (51)
- 数据库中只存放视图的 (62)
- 在vi中退出不保存的命令是 (53)
- 哪个命令可以将普通用户转换成超级用户 (49)
- noscript标签的作用 (48)
- 联合利华网申 (49)
- swagger和postman (46)
- 结构化程序设计主要强调 (53)
- 172.1 (57)
- apipostwebsocket (47)
- 唯品会后台 (61)
- 简历助手 (56)
- offshow (61)