Semaphore
信号量(Semaphore),是可以用来保证两个或多个关键代码段不被并发调用。在进入一个关键代码段之前,线程必须获取一个信号量;一旦该关键代码段完成了,那么该线程必须释放信号量。其它想进入该关键代码段的线程必须等待直到第一个线程释放信号量。
通过使用信号量,我们可以决定某个资源同一时间能够被访问的最大线程数,它相当于对某个资源的访问进行了流量控制。简单来说,它就是一个可以被N个线程占用的排它锁(因此也支持公平和非公平模式),我们可以在最开始设定Semaphore的许可证数量,每个线程都可以获得1个或n个许可证,当许可证耗尽或不足以供其他线程获取时,其他线程将被阻塞。
源码分析
Semaphore类
public class Semaphore implements java.io.Serializable {
// 只有一个Sync同步变量
private final Sync sync;
// Sync继承自AQS,主要逻辑都在这里面
abstract static class Sync extends AbstractQueuedSynchronizer {
// 只有这一个构造方法,需要指定计数器数值
Sync(int permits) {
setState(permits);
}
}
/**
* 非公平锁实现
*/
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
}
/**
* 公平锁实现
*/
static final class FairSync extends Sync {
FairSync(int permits) {
super(permits);
}
}
}
Semaphore
跟CountDownLatch
一样,也没有直接继承AQS
,也是采用组合的方式,使用Sync
同步变量实现更新许可的功能,而Sync
同步变量才是真正继承AQS的。Sync
抽象类底层有两个子类实现,分别是公平锁实现FairSync
和非公平锁实现NonfairSync
。
构造方法
// 指定许可数量(默认使用非公平锁)
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 指定许可数量和是否使用公平锁
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
acquire方法
/**
* acquire方法入口
*/
public void acquire() throws InterruptedException {
// 底层调用父类AQS中的acquireSharedInterruptibly()方法
sync.acquireSharedInterruptibly(1);
}
/**
* 父类AQS
*/
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
// tryAcquireShared()由子类实现
if (tryAcquireShared(arg) < 0) {
doAcquireSharedInterruptibly(arg);
}
}
// 定义抽象方法,由子类实现
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
}
acquire()
方法里面调用的是父类AQS
中的acquireSharedInterruptibly()
方法,
acquireSharedInterruptibly()
方法又在调用子类Sync
中tryReleaseShared()
方法。
tryReleaseShared()
方法在子类中有两种实现方式,公平锁实现和非公平锁实现。
非公平锁实现
非公平锁的实现逻辑很简单,就是更新许可数量,也就是state值。
/**
* 非公平锁实现
*/
static final class NonfairSync extends Sync {
// 重写父类AQS中的tryAcquireShared方法
@Override
protected int tryAcquireShared(int acquires) {
// 底层调用父类Sync中nonfairTryAcquireShared方法
return nonfairTryAcquireShared(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final int nonfairTryAcquireShared(int acquires) {
for (; ; ) {
int available = getState();
// 更新剩余许可数量
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) {
return remaining;
}
}
}
}
公平锁实现
公平锁实现与非公平锁实现的不同之处,就是在第一步增加了一个判断条件hasQueuedPredecessors()
方法,判断队列中是否有前置节点,如果有前置节点,直接返回不做处理。因为所有任务都需要在队列中排队,队头的节点优先处理,如果有前置节点,就需要优先处理前置节点。
/**
* 公平锁实现
*/
static final class FairSync extends Sync {
// 重写父类AQS中的tryAcquireShared方法
@Override
protected int tryAcquireShared(int acquires) {
for (; ; ) {
// 1. 判断队列中是否有前置节点
if (hasQueuedPredecessors()) {
return -1;
}
int available = getState();
// 2. 更新许可数量
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) {
return remaining;
}
}
}
}
release方法
release()
方法底层也是调用父类中releaseShared()
方法,而父类AQS
又需要调用子类Sync
中的具体实现。
/**
* release方法入口
*/
public void release() {
// 底层调用父类AQS中的releaseShared()方法
sync.releaseShared(1);
}
/**
* 父类AQS
*/
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public final boolean releaseShared(int arg) {
// tryReleaseShared()方法由子类实现
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// 定义抽象方法,由子类实现
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
}
子类Sync
只需要实现tryReleaseShared()
方法即可,而tryReleaseShared()
方法的作用就是更新许可数量,公平锁和非公平锁用的都是同一个方法实现。
/**
* 子类Sync
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
// 实现父类AQS中的tryAcquireShared()方法
@Override
protected final boolean tryReleaseShared(int releases) {
for (; ; ) {
int current = getState();
int next = current + releases;
if (next < current) {
throw new Error("Maximum permit count exceeded");
}
// 更新许可数量
if (compareAndSetState(current, next)) {
return true;
}
}
}
}
drainPermits方法
手动回收掉所有的许可证
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(3);
new Thread(semaphore::acquireUninterruptibly).start();
Thread.sleep(500);
// 直接回收掉剩余的许可证
System.out.println("收回剩余许可数量:"+semaphore.drainPermits());
}
示例
acquire()
方法用来获取许可,只有获取许可之后才能执行任务,如果剩余许可数量为零,会一直阻塞。release()
方法用来释放并归还许可。
- 往线程池中提交10个任务,如果没有使用信号量的话,这10个任务会同时执行。但是示例中的信号量只有3个许可,导致每次只能执行3个任务。从输出结果中执行时间也能看出,每3个任务为一组,这组任务执行完成之后,才能执行下一组。
在这里
Semaphore
就起到了限流的作用,限制资源的访问频率,保证系统稳定运行。
- 输出结果:
java10:10:00.000 [pool-1-thread-1] INFO com.yideng.SemaphoreTest - pool-1-thread-1 执行完成 10:10:00.000 [pool-1-thread-2] INFO com.yideng.SemaphoreTest - pool-1-thread-2 执行完成 10:10:00.000 [pool-1-thread-3] INFO com.yideng.SemaphoreTest - pool-1-thread-3 执行完成 10:10:01.000 [pool-1-thread-4] INFO com.yideng.SemaphoreTest - pool-1-thread-4 执行完成 10:10:01.000 [pool-1-thread-6] INFO com.yideng.SemaphoreTest - pool-1-thread-6 执行完成 10:10:01.000 [pool-1-thread-5] INFO com.yideng.SemaphoreTest - pool-1-thread-5 执行完成 10:00:02.000 [pool-1-thread-7] INFO com.yideng.SemaphoreTest - pool-1-thread-7 执行完成 10:00:02.000 [pool-1-thread-8] INFO com.yideng.SemaphoreTest - pool-1-thread-8 执行完成 10:00:02.000 [pool-1-thread-9] INFO com.yideng.SemaphoreTest - pool-1-thread-9 执行完成 10:00:03.000 [pool-1-thread-10] INFO com.yideng.SemaphoreTest - pool-1-thread-10 执行完成
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@Slf4j
public class SemaphoreTest {
public static void main(String[] args) throws InterruptedException {
// 1. 创建一个线程池,用来执行10个任务
ExecutorService executorService = Executors.newFixedThreadPool(10);
// 2. 创建一个信号量,许可数量是3
Semaphore semaphore = new Semaphore(3);
// 3. 启动10个任务
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
try {
// 4. 执行任务之前,获取许可
semaphore.acquire();
// 5. 睡眠1秒,模拟任务执行过程
Thread.sleep(1000);
System.out.println("剩余许可证数量:"+semaphore.availablePermits());
System.out.println("是否存在线程等待许可证:"
+(semaphore.hasQueuedThreads() ? "是" : "否"));
System.out.println("等待许可证线程数量:"+semaphore.getQueueLength());
log.info(Thread.currentThread().getName() + " 执行完成");
// 6. 执行任务完成,释放许可
semaphore.release();
} catch (InterruptedException e) {
}
});
}
// 7. 关闭线程池
executorService.shutdown();
}
}