Skip to content

并发编程-多线程基础

创建线程

Java中创建线程主要有以下这几种方式:

  • 定义Thread类的子类,并重写该类的run方法
  • 定义Runnable接口的实现类,并重写该接口的run()方法
  • 定义Callable接口的实现类,并重写该接口的call()方法,一般配合Future使用
  • 线程池

线程状态

线程共有6种状态,分别是NEW(初始化)、RUNNABLE(可运行)、WAITING(等待)、TIMED_WAITING(超时等待)、BLOCKED(阻塞)、TERMINATED(终止)。

  • NEW(初始化)

    表示创建线程对象之后,还没有调用start方法。

  • RUNNABLE(可运行)

    表示调用start方法之后,等待CPU调度。为了便于理解,通常又把RUNNABLE分别RUNNING(运行中)和READY(就绪)。处在RUNNING(运行中)状态的线程可以调用yield方法,让出CPU时间片,然后跟其他处于READY(就绪)一起等待被调度。

  • WAITING(等待)

    处于RUNNABLE状态的线程调用wait方法之后,就处于等待状态,需要其他线程显示地唤醒。

  • TIMED_WAITING(超时等待)

    处于RUNNABLE状态的线程调用wait(long)方法之后,就处于等待状态,需要其他线程显示地唤醒。

  • BLOCKED(阻塞)

    等待进入synchronized方法/代码块,处于阻塞状态。

  • TERMINATED(终止)

    表示线程已经执行结束。

image-20221006232449786.png

线程常用方法

方法定义含义使用方式
public synchronized void start()启动线程MyThread myThread = new MyThread(); myThread.start();
public static native Thread currentThread();获取当前线程实例对象Thread thread = Thread.currentThread();
public static native void yield();让出CPU时间片Thread.yield();
public static native void sleep(long millis);睡眠指定时间Thread.sleep(1L);
public void interrupt()中断线程MyThread myThread = new MyThread(); myThread.interrupt();
public static boolean interrupted()判断线程是否已中断MyThread myThread = new MyThread(); boolean interrupted = myThread.isInterrupted();
public final native boolean isAlive();判断线程是否是存活状态MyThread myThread = new MyThread(); boolean alive = myThread.isAlive();
public final String getName()获取线程名称MyThread myThread = new MyThread(); String name = myThread.getName();
public State getState()获取线程状态MyThread myThread = new MyThread(); Thread.State state = myThread.getState();
public long getId()获取线程IDMyThread myThread = new MyThread(); long id = myThread.getId();
public final void join()等待其他线程执行完再执行MyThread myThread = new MyThread(); myThread.join();

CAS

CAS的全称为compare and swap,对应到CPU指令为cmpxchg

CAS 有三个操作数:当前值A、内存值V、要修改的新值B

假设 当前值A 跟 内存值V 相等,那就将 内存值V 改成B

假设 当前值A 跟 内存值V 不相等,要么就重试,要么就放弃更新

ABA问题及解决方案

image-20220406101540742

线程1和线程2同时开始对a的值进行CAS修改,但是线程1的速度比较快,将a的值修改为2之后紧接着又修改回1,这时线程2才开始进行判断,发现a的值是1,所以CAS操作成功。

很明显,这里的1已经不是一开始的那个1了,而是被重新赋值的1,这也是CAS操作存在的问题(无锁虽好,但是问题多多),它只会机械地比较当前值是不是预期值,但是并不会关心当前值是否被修改过,这种问题称之为ABA问题。

采用 AtomicStampedReference 解决ABA问题

java
public static void main(String[] args) throws InterruptedException {
    String a = "Hello";
    String b = "World";
    AtomicStampedReference<String> reference = new AtomicStampedReference<>(a, 1);  //在构造时需要指定初始值和对应的版本号
    reference.attemptStamp(a, 2);   //可以中途对版本号进行修改,注意要填写当前的引用对象
    System.out.println(reference.compareAndSet(a, b, 2, 3));   //CAS操作时不仅需要提供预期值和修改值,还要提供预期版本号和新的版本号
}
public static void main(String[] args) throws InterruptedException {
    String a = "Hello";
    String b = "World";
    AtomicStampedReference<String> reference = new AtomicStampedReference<>(a, 1);  //在构造时需要指定初始值和对应的版本号
    reference.attemptStamp(a, 2);   //可以中途对版本号进行修改,注意要填写当前的引用对象
    System.out.println(reference.compareAndSet(a, b, 2, 3));   //CAS操作时不仅需要提供预期值和修改值,还要提供预期版本号和新的版本号
}

Fork/Join框架

  • 使用多线程拆分任务

  • 利用工作窃取算法,提高线程的利用率

    • **工作窃取算法:**是指某个线程从其他队列里窃取任务来执行。一个大任务分割为若干个互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务待处理。干完活的线程与其等着,不如帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。

    • image-20220406181701396

    • java
      public class Main {
          public static void main(String[] args) throws InterruptedException, ExecutionException {
              ForkJoinPool pool = new ForkJoinPool();
              System.out.println(pool.submit(new SubTask(1, 1000)).get());
          }
      
        	//继承RecursiveTask,这样才可以作为一个任务,泛型就是计算结果类型
          private static class SubTask extends RecursiveTask<Integer> {
              private final int start;   //比如我们要计算一个范围内所有数的和,那么就需要限定一下范围,这里用了两个int存放
              private final int end;
      
              public SubTask(int start, int end) {
                  this.start = start;
                  this.end = end;
              }
      
              @Override
              protected Integer compute() {
                  if(end - start > 125) {    //每个任务最多计算125个数的和,如果大于继续拆分,小于就可以开始算了
                      SubTask subTask1 = new SubTask(start, (end + start) / 2);
                      subTask1.fork();    //会继续划分子任务执行
                      SubTask subTask2 = new SubTask((end + start) / 2 + 1, end);
                      subTask2.fork();   //会继续划分子任务执行
                      return subTask1.join() + subTask2.join();   //越玩越有递归那味了
                  } else {
                      System.out.println(Thread.currentThread().getName()+" 开始计算 "+start+"-"+end+" 的值!");
                      int res = 0;
                      for (int i = start; i <= end; i++) {
                          res += i;
                      }
                      return res;   //返回的结果会作为join的结果
                  }
              }
          }
      }
      public class Main {
          public static void main(String[] args) throws InterruptedException, ExecutionException {
              ForkJoinPool pool = new ForkJoinPool();
              System.out.println(pool.submit(new SubTask(1, 1000)).get());
          }
      
        	//继承RecursiveTask,这样才可以作为一个任务,泛型就是计算结果类型
          private static class SubTask extends RecursiveTask<Integer> {
              private final int start;   //比如我们要计算一个范围内所有数的和,那么就需要限定一下范围,这里用了两个int存放
              private final int end;
      
              public SubTask(int start, int end) {
                  this.start = start;
                  this.end = end;
              }
      
              @Override
              protected Integer compute() {
                  if(end - start > 125) {    //每个任务最多计算125个数的和,如果大于继续拆分,小于就可以开始算了
                      SubTask subTask1 = new SubTask(start, (end + start) / 2);
                      subTask1.fork();    //会继续划分子任务执行
                      SubTask subTask2 = new SubTask((end + start) / 2 + 1, end);
                      subTask2.fork();   //会继续划分子任务执行
                      return subTask1.join() + subTask2.join();   //越玩越有递归那味了
                  } else {
                      System.out.println(Thread.currentThread().getName()+" 开始计算 "+start+"-"+end+" 的值!");
                      int res = 0;
                      for (int i = start; i <= end; i++) {
                          res += i;
                      }
                      return res;   //返回的结果会作为join的结果
                  }
              }
          }
      }

Arrays工具类提供的并行排序也是利用了ForkJoinPool来实现:

java
public static void parallelSort(byte[] a) {
    int n = a.length, p, g;
    if (n <= MIN_ARRAY_SORT_GRAN ||
        (p = ForkJoinPool.getCommonPoolParallelism()) == 1)
        DualPivotQuicksort.sort(a, 0, n - 1);
    else
        new ArraysParallelSortHelpers.FJByte.Sorter
            (null, a, new byte[n], 0, n, 0,
             ((g = n / (p << 2)) <= MIN_ARRAY_SORT_GRAN) ?
             MIN_ARRAY_SORT_GRAN : g).invoke();
}
public static void parallelSort(byte[] a) {
    int n = a.length, p, g;
    if (n <= MIN_ARRAY_SORT_GRAN ||
        (p = ForkJoinPool.getCommonPoolParallelism()) == 1)
        DualPivotQuicksort.sort(a, 0, n - 1);
    else
        new ArraysParallelSortHelpers.FJByte.Sorter
            (null, a, new byte[n], 0, n, 0,
             ((g = n / (p << 2)) <= MIN_ARRAY_SORT_GRAN) ?
             MIN_ARRAY_SORT_GRAN : g).invoke();
}

JMM内存模型

image-20220405200130691

JMM(Java Memory Model)内存模型规定如下:

  • 所有的变量全部存储在主内存(注意这里包括下面提到的变量,指的都是会出现竞争的变量,包括成员变量、静态变量等,而局部变量这种属于线程私有,不包括在内)
  • 每条线程有着自己的工作内存(可以类比CPU的高速缓存)线程对变量的所有操作,必须在工作内存中进行,不能直接操作主内存中的数据。
  • 不同线程之间的工作内存相互隔离,如果需要在线程之间传递内容,只能通过主内存完成,无法直接访问对方的工作内存。

也就是说,每一条线程如果要操作主内存中的数据,那么得先拷贝到自己的工作内存中,并对工作内存中数据的副本进行操作,操作完成之后,也需要从工作副本中将结果拷贝回主内存中,具体的操作就是Save(保存)和Load(加载)操作。

具体实现:

  • 主内存:对应堆中存放对象的实例的部分。
  • 工作内存:对应线程的虚拟机栈的部分区域,虚拟机可能会对这部分内存进行优化,将其放在CPU的寄存器或是高速缓存中。比如在访问数组时,由于数组是一段连续的内存空间,所以可以将一部分连续空间放入到CPU高速缓存中,那么之后如果我们顺序读取这个数组,那么大概率会直接缓存命中。

重排序原理

在编译或执行时,为了优化程序的执行效率,编译器或处理器常常会对指令进行重排序,有以下情况:

  1. 编译器重排序:Java编译器通过对Java代码语义的理解,根据优化规则对代码指令进行重排序。
  2. 机器指令级别的重排序:现代处理器很高级,能够自主判断和变更机器指令的执行顺序。

指令重排序能够在不改变结果(单线程)的情况下,优化程序的运行效率

happens-before原则

JMM提出了happens-before(先行发生)原则,定义一些禁止编译优化的场景,来向各位程序员做一些保证,只要我们是按照原则进行编程,那么就能够保持并发编程的正确性。具体如下:

  • **程序次序规则:**同一个线程中,按照程序的顺序,前面的操作happens-before后续的任何操作。
    • 同一个线程内,代码的执行结果是有序的。其实就是,可能会发生指令重排,但是保证代码的执行结果一定是和按照顺序执行得到的一致,程序前面对某一个变量的修改一定对后续操作可见的,不可能会出现前面才把a修改为1,接着读a居然是修改前的结果,这也是程序运行最基本的要求。
  • **监视器锁规则:**对一个锁的解锁操作,happens-before后续对这个锁的加锁操作。
    • 就是无论是在单线程环境还是多线程环境,对于同一个锁来说,一个线程对这个锁解锁之后,另一个线程获取了这个锁都能看到前一个线程的操作结果。比如前一个线程将变量x的值修改为了12并解锁,之后另一个线程拿到了这把锁,对之前线程的操作是可见的,可以得到x是前一个线程修改后的结果12(所以synchronized是有happens-before规则的)
  • **volatile变量规则:**对一个volatile变量的写操作happens-before后续对这个变量的读操作。
    • 就是如果一个线程先去写一个volatile变量,紧接着另一个线程去读这个变量,那么这个写操作的结果一定对读的这个变量的线程可见。
  • **线程启动规则:**主线程A启动线程B,线程B中可以看到主线程启动B之前的操作。
    • 在主线程A执行过程中,启动子线程B,那么线程A在启动子线程B之前对共享变量的修改结果对线程B可见。
  • **线程加入规则:**如果线程A执行操作join()线程B并成功返回,那么线程B中的任意操作happens-before线程Ajoin()操作成功返回。
  • **传递性规则:**如果A happens-before B,B happens-before C,那么A happens-before C。

从happens-before原则的角度,来解释一下下面的程序结果:

java
public class Main {
    private static int a = 0;
  	private static int b = 0;
    public static void main(String[] args) {
        a = 10;
        b = a + 1;
        new Thread(() -> {
          if(b > 10) System.out.println(a); 
        }).start();
    }
}
public class Main {
    private static int a = 0;
  	private static int b = 0;
    public static void main(String[] args) {
        a = 10;
        b = a + 1;
        new Thread(() -> {
          if(b > 10) System.out.println(a); 
        }).start();
    }
}

我们定义以上出现的操作:

  • **A:**将变量a的值修改为10
  • **B:**将变量b的值修改为a + 1
  • **C:**主线程启动了一个新的线程,并在新的线程中获取b,进行判断,如果大于10那么就打印a

我们来分析,由于是同一个线程,并且B是一个赋值操作且读取了A,那么按照程序次序规则,A happens-before B,接着在B之后,马上执行了C,按照线程启动规则,在新的线程启动之前,当前线程之前的所有操作对新的线程是可见的,所以 B happens-before C,最后根据传递性规则,由于A happens-before B,B happens-before C,所以A happens-before C,因此在新的线程中会输出a修改后的结果10