目录
一、什么是JUC二、基本知识2.1、进程和线程2.2、Java默认有两个进程2.3、Java能够开启线程吗?2.4、并发和并行2.5、线程的状态2.6、wait和sleep的区别2.7、什么是可重入锁2.8、synchronized买票案例回顾
三、Lock锁3.1、简介3.2、买票问题重现3.3、和Synchronized的区别
四、生产者消费者问题(Lock版)4.1、Synchronized实现虚假唤醒问题
4.2、Lock实现Condition精准通知唤醒
五、8个案例彻底理解锁的对象六、安全集合类6.1、CopyOnWriteArrayListArrayList不安全引入CopyOnWriteArrayList
6.2、CopyOnWriteArraySetHashSet不安全引入CopyOnWriteArraySet
6.3、ConcurrentHashMapHashMap不安全引入ConcurrentHashMap
七、Callable7.1、同Runnable的区别7.2、怎么实现
八、三大常用辅助类8.1、CountDownLatch8.2、CyclicBarrier8.3、Semaphore
九、读写锁十、阻塞队列10.1、关系图10.2、ArrayBlockingQueue四组API抛出异常(add、remove、element)不跑出异常(offer、poll、pick)等待阻塞(put、take)限时等待()
10.3、SynchronousQueue
十一、线程池11.1、创建线程池三大方法newSingleThreadExecutornewFixedThreadPoolnewCachedThreadPool
11.2、七大参数11.3、四种拒绝策略11.4、最大线程数怎么定义?CPU密集型IO密集型
十二、四大函数式接口什么是函数式接口12.1、Function12.2、Predicate12.3、Consumer12.4、Supplier
十三、Stream流式计算13.1、什么是Stream流式计算13.2、案例测试
十四、ForkJoin14.1、什么是ForkJoin14.2、ForkJoin的特点14.3、案例:计算求和任务
十五、异步回调十六、理解JMM&Volatile16.1、请你谈谈对Volatile的理解16.2、什么是JMM16.3、Volatile特点1. 保证可见性2. 不保证原子性3. 禁止指令重排
十七、彻底玩转单例模式17.1、饿汉式17.2、懒汉式17.3、双重检测锁式17.4、静态内部类式反射破坏单例模式17.5、枚举单例
十八、深入理解CAS18.1、什么是CAS18.2、ABA问题
十九、各种锁的理解19.1、乐观锁/悲观锁悲观锁(Pessimistic Lock)乐观锁(Optimistic Locking)
19.2、公平/非公平锁19.3、可重入锁19.4、自旋锁19.5、死锁什么是死锁?死锁问题排查
参考:
狂神说Java_JUC并发编程最新版通俗易懂什么是乐观锁什么是悲观锁
一、什么是JUC
JUC就是java.util.concurrent工具包的简称。这是一个处理线程的工具包,JDK 1.5开始出现的。
二、基本知识
2.1、进程和线程
进程是程序的一次执行过程,包含进程控制块、程序段、数据三部分
1️⃣ 动态性
动态性是进程最基本的特征,表现为:由创建而产生,由调度而执行,得不到资源而暂停执行,由撤销而消亡;有一定的生命周期程序只是一组有序的指令集合
2️⃣ 并发性
引入进程的目的就是和其他进程能并发执行程序不能并发执行
3️⃣ 独立性
进程实体是一个能独立运行的基本单位,是系统中独立获得资源和独立调度的基本单位程序不能作为一个独立的单位进行运行
2.2、Java默认有两个进程
Main和GC
2.3、Java能够开启线程吗?
不行,Java是通过native本地方法调底层C++写的方法,Java无法直接操作硬件
2.4、并发和并行
并发:多个事件在同一时间间隔内发生(cpu一核,模拟出来多条线程快速交替运行)并行:多个事件在同一时刻发生(cpu多核,多个线程可以同时执行)
查看cpu的核数
2.5、线程的状态
NEW尚未启动的线程处于此状态RUNNABLE在Java虚拟机中执行的线程处于此状态BLOCKED被阻塞等待监视器锁定的线程处于此状态(IO操作,wait,juc锁定)WAITING正在等待另一个线程执行特定动作的线程处于此状态(sleep,join)TIMED_WAITING正在等待另一个线程执行动作达到指定等待时间的线程处于此状态(sleep,join)TERMINATED已退出的线程处于此状态。
2.6、wait和sleep的区别
sleep不释放锁,wait释放锁来自不同的类:sleep()函数在Thread类中,wait()函数属于Object类使用范围不同:sleep可以在任何地方使用,wait只能使用在同步代码块中
2.7、什么是可重入锁
可重入,就是可以重复获取相同的锁而不会出现死锁;synchronized和ReentrantLock都是可重入的
// 演示可重入锁是什么意思,可重入,就是可以重复获取相同的锁
// synchronized和ReentrantLock都是可重入的
// 可重入降低了编程复杂性
public class WhatReentrant {
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
synchronized (this) {
System.out.println("第1次获取锁,这个锁是:" + this);
int index = 1;
while (true) {
synchronized (this) {
System.out.println("第" + (++index) + "次获取锁,这个锁是:" + this);
}
if (index == 10) {
break;
}
}
}
}
}).start();
}
}
import java.util.Random;
import java.util.concurrent.locks.ReentrantLock;
// 演示可重入锁是什么意思
public class WhatReentrant2 {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock.lock();
System.out.println("第1次获取锁,这个锁是:" + lock);
int index = 1;
while (true) {
try {
lock.lock();
System.out.println("第" + (++index) + "次获取锁,这个锁是:" + lock);
try {
Thread.sleep(new Random().nextInt(200));
} catch (InterruptedException e) {
e.printStackTrace();
}
if (index == 10) {
break;
}
} finally {
lock.unlock();
}
}
} finally {
lock.unlock();
}
}
}).start();
}
}
2.8、synchronized买票案例回顾
真正的开发中,线程只是一个资源类(包含属性),没有任何附属的操作
模拟卖票:
以前我们会将Ticket类继承Runnable接口现在会将Ticket作为一个资源类,里面不添加关于线程的操作
package com.zsr;
public class saleTicket {
public static void main(String[] args) {
//一份资源
Ticket ticket = new Ticket();
//多个线程
new Thread(() -> {
for (int i = 0; i < 30; i++) {
ticket.sale();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
ticket.sale();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 80; i++) {
ticket.sale();
}
}, "C").start();
}
}
//资源类
class Ticket {
//票数
private int number = 100;
public synchronized void sale() {
if (number > 0)
System.out.println(Thread.currentThread().getName() + "卖出了第" + (number--) + "张票,剩余" + number);
}
}
三、Lock锁
3.1、简介
官方文档地址:https://tool.oschina.net/apidocs/apidoc?api=jdk-zh
使用方法:创建锁、加锁、业务代码、解锁
3.2、买票问题重现
我们接下来用Lock锁的方式来解决上述买票问题,Lock接口最常用的实现类就是ReentrantLock可重入锁
可以看到看到ReentrantLock有两个构造方法,可以指定使用 公平锁/非公平锁公平锁:十分公平,先来后到非公平锁:不公平,可以插队
修改Ticket类
//资源类
class Ticket2 {
//票数
private int number = 100;
//Lock锁
Lock lock = new ReentrantLock();
public void sale() {
lock.lock();//加锁
try {
if (number > 0)
System.out.println(Thread.currentThread().getName() + "卖出了第" + (number--) + "张票,剩余" + number);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();//解锁
}
}
}
3.3、和Synchronized的区别
Synchronized是内置的关键字,Lock是一个Java类
Synchronized无法判断锁的状态,Lock可以判断是否获取到了锁
Synchronized会自动释放锁,Lock需要手动释放锁(如果不释放锁,会造成死锁)
假如有两个线程:线程1、线程2;线程1获得了锁
Synchronized:如果线程1阻塞了,线程2就会一直等待,造成死锁
Lock:如果线程1阻塞了,线程2不会一直等待,可以通过trylock()方法尝试获取锁
两者都是可重入锁,但是Synchronized不可中断,为非公平锁;而Lock锁可判断锁状态,并且可以设置为公平锁/非公平锁
Synchronized适合锁少量代码同步代码,Lock适合锁大量代码同步代码
四、生产者消费者问题(Lock版)
生产者消费者——线程之间的通信问题
通过Synchronized实现,我们常用 object.wait() + Object.notify()通过Lock怎么实现呢,用condition.await() + condition.signal()
4.1、Synchronized实现
两个线程A、B实现:
如果number不等于0,则number-1;如果number等于0,则number+1
package com.zsr;
public class communicate {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
data.plus();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
data.minor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
}
}
class Data {
private int number = 0;
//如果number=0,则number+1
public synchronized void plus() throws InterruptedException {
if (number != 0)
this.wait();//等待
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
this.notifyAll();//唤醒其他线程
}
//如果number!=0,则number-1
public synchronized void minor() throws InterruptedException {
if (number == 0)
this.wait();
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
this.notifyAll();//唤醒其他线程
}
}
结果:
虚假唤醒问题
如果再加两个线程呢?
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
data.plus();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
data.minor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
data.plus();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
data.minor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
看结果,会出现2、3的情况;这就是因为if判断只判断一次,两个线程可能同时+1;造成了虚假唤醒的问题 修改代码:if判断改成while判断
class Data {
private int number = 0;
//如果number=0,则number+1
public synchronized void plus() throws InterruptedException {
while (number != 0)
this.wait();//等待
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
this.notifyAll();//唤醒其他线程
}
//如果number!=0,则number-1
public synchronized void minor() throws InterruptedException {
while (number == 0)
this.wait();
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
this.notifyAll();//唤醒其他线程
}
}
4.2、Lock实现
修改代码:
package com.zsr;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class communicate2 {
public static void main(String[] args) {
Data2 data = new Data2();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.plus();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.minor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.plus();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.minor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
class Data2 {
private int number = 0;
//创建Lock锁
private Lock lock = new ReentrantLock();
//获得condition
Condition condition = lock.newCondition();
//如果number=0,则number+1
public void plus() throws InterruptedException {
lock.lock();//加锁
try {
while (number != 0)
condition.await();//等待
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
condition.signalAll();//唤醒其他线程
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();//解锁
}
}
//如果number!=0,则number-1
public void minor() throws InterruptedException {
lock.lock();
try {
while (number == 0)
condition.await();
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
condition.signalAll();//唤醒其他线程
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
根据结果,成功实现!但是发现一个问题,线程的执行都是随机的,怎么进行有序的实现呢?A=>B=>C=>D
Condition精准通知唤醒
可以设置多个condition监视器,每个监视器监视一个线程,精确等待唤醒某个线程
package com.zsr;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class communicate3 {
public static void main(String[] args) {
Data3 data = new Data3();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.plus();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.minor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.plus();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.minor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
class Data3 {
private int number = 0;
//创建Lock锁
private Lock lock = new ReentrantLock();
//获得condition
Condition conditionA = lock.newCondition();
Condition conditionB = lock.newCondition();
Condition conditionC = lock.newCondition();
Condition conditionD = lock.newCondition();
//如果number=0,则number+1
public void plus() throws InterruptedException {
lock.lock();//加锁
try {
if (Thread.currentThread().getName()=="A") {
while (number != 0)
conditionA.await();//A等待
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
conditionB.signal();//唤醒B线程
}
if (Thread.currentThread().getName()=="C") {
while (number != 0)
conditionC.await();//C等待
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
conditionD.signal();//唤醒D线程
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();//解锁
}
}
//如果number!=0,则number-1
public void minor() throws InterruptedException {
lock.lock();
try {
if (Thread.currentThread().getName()=="B") {
while (number == 0)
conditionB.await();//B等待
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
conditionC.signal();//唤醒C线程
}
if (Thread.currentThread().getName()=="D") {
while (number == 0)
conditionD.await();//D等待
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
conditionA.signal();//唤醒A线程
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
结果:
五、8个案例彻底理解锁的对象
如何判断锁的是谁!永远的知道什么锁,锁到底锁的是谁(对象、 Class)
创建两个线程A、B,A线程执行发短信方法,B线程执行打电话方法,谁先执行?
package com.zsr.lock8;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
//线程A发消息
new Thread(() -> {
phone.send_message();
}, "A").start();
//休眠1s
TimeUnit.SECONDS.sleep(1);
//线程B打电话
new Thread(() -> {
phone.call();
}, "B").start();
}
}
class Phone {
public synchronized void send_message() {
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
}
结果:先发短信再打电话
那如果让发短信休眠2s呢?
package com.zsr.lock8;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
new Thread(() -> {
try {
phone.send_message();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
//休眠1s
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone.call();
}, "B").start();
}
}
class Phone {
public synchronized void send_message() throws InterruptedException {
//休眠2s
TimeUnit.SECONDS.sleep(2);
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
}
结果:还是先发短信再打电话 这是为什么呢?并不是因为A先执行,而是有锁的存在
synchronized锁的对象是方法的调用者,也就是phone对象,因此打电话和发短信方法锁的是同一个对象,谁先拿到锁谁就先执行
如果新增一个普通方法hello,那先是hello还是发短信呢?
package com.zsr.lock8;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
new Thread(() -> {
try {
phone.send_message();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
//休眠1s
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone.hello();
}, "B").start();
}
}
class Phone {
public synchronized void send_message() throws InterruptedException {
//休眠2s
TimeUnit.SECONDS.sleep(2);
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
public void hello() {
System.out.println("hello");
}
}
根据结果,先执行hello,这是因为hello是一个普通方法,没有锁,不受锁的影响
如果有两个phone对象,是先发短信还是先打电话
package com.zsr.lock8;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
Phone phone1 = new Phone();
Phone phone2 = new Phone();
new Thread(() -> {
try {
phone1.send_message();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
//休眠1s
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone2.call();
}, "B").start();
}
}
class Phone {
public synchronized void send_message() throws InterruptedException {
//休眠2s
TimeUnit.SECONDS.sleep(2);
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
}
两个对象,两个调用者,所以有两把锁,所以按时间顺序执行
修改两个方法为静态方法,只有一个对象,是先打电话还是发短信
package com.zsr.lock8;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
new Thread(() -> {
try {
phone.send_message();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
//休眠1s
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone.call();
}, "B").start();
}
}
class Phone {
public static synchronized void send_message() throws InterruptedException {
//休眠2s
TimeUnit.SECONDS.sleep(2);
System.out.println("发短信");
}
public static synchronized void call() {
System.out.println("打电话");
}
}
根据结果,先发短信 因为static静态方法在类加载的时候就有了,因此这里synchronized锁的是Class模板,也就是Phone.class,全局唯一;也就是两个方法拿的仍是同一把锁
那如果两个方法为静态方法,有两个对象,是先打电话还是发短信?
package com.zsr.lock8;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
Phone phone1 = new Phone();
Phone phone2 = new Phone();
new Thread(() -> {
try {
phone1.send_message();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
//休眠1s
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone2.call();
}, "B").start();
}
}
class Phone {
public static synchronized void send_message() throws InterruptedException {
//休眠2s
TimeUnit.SECONDS.sleep(2);
System.out.println("发短信");
}
public static synchronized void call() {
System.out.println("打电话");
}
}
根据结果,仍是先发短信 因为锁的是Phone.class,也就是说两个方法仍是同一把锁
如果是一个普通的同步方法和一个静态的同步方法,只有一个对象
package com.zsr.lock8;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
new Thread(() -> {
try {
phone.send_message();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
//休眠1s
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone.call();
}, "B").start();
}
}
class Phone {
public static synchronized void send_message() throws InterruptedException {
//休眠2s
TimeUnit.SECONDS.sleep(2);
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
}
结果: 一个锁的是Phone.class模板,一个锁的是phone对象,因此两个方法不是一把锁,因此按时间顺序运行
如果是一个普通的同步方法和一个静态的同步方法,有两个对象
package com.zsr.lock8;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
Phone phone1 = new Phone();
Phone phone2 = new Phone();
new Thread(() -> {
try {
phone1.send_message();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
//休眠1s
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone2.call();
}, "B").start();
}
}
class Phone {
public static synchronized void send_message() throws InterruptedException {
//休眠2s
TimeUnit.SECONDS.sleep(2);
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
}
同样两个方法不是一把锁,因此按时间顺序运行
六、安全集合类
6.1、CopyOnWriteArrayList
ArrayList不安全
并发情况下,ArrayList不安全,我们通过一个简单的案例来测试:
package com.zsr.collection;
import java.util.ArrayList;
import java.util.UUID;
public class ListTest {
public static void main(String[] args) {
//并发情况下
ArrayList
for (int i = 0; i < 10; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, String.valueOf(i)).start();
}
}
}
根据结果,发现报错了ConcurrentModificationException(并发修改异常)
那么怎么解决呢?
方案一:换成线程安全的vector
Vector
方案二:利用Collections工具类
List
方案三:利用JUC包中的类CopyOnWriteArrayList
CopyOnWriteArrayList
引入CopyOnWriteArrayList
CopyOnWrite简称COW,是计算机程序设计领域的一种优化策略多个线程调用的时候,读取的时候固定,但写入时会复制,避免写入造成的数据覆盖问题
其效率比Vector更高,因为Vector在方法上都用了Synchronized关键字,会降低效率
而CopyOnWriteArratList是用Lock锁实现的,底层也是数组实现,不过添加的时候会先拷贝一份新的数组,最后再拷贝回去
6.2、CopyOnWriteArraySet
HashSet不安全
并发情况下,HashSet不安全,我们通过一个简单的案例来测试:
package com.zsr.collection;
import java.util.HashSet;
import java.util.UUID;
public class SetTest {
public static void main(String[] args) {
//并发情况下
HashSet
for (int i = 0; i < 30; i++) {
new Thread(() -> {
set.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(set);
}, String.valueOf(i)).start();
}
}
}
同样,ConcurrentModificationException:并发修改异常 那么怎么解决呢?
方案一:利用Collections工具类
Set
方案三:利用JUC包中的类CopyOnWriteArrayList
CopyOnWriteArraySet
引入CopyOnWriteArraySet
CopyOnWrite简称COW,是计算机程序设计领域的一种优化策略多个线程调用的时候,读取的时候固定,但写入时会复制,避免写入造成的数据覆盖问题
CopyOnWriteArratSet同样是用Lock锁实现的,底层也是数组实现,不过添加的时候会先拷贝一份新的数组,最后再拷贝回去
6.3、ConcurrentHashMap
HashMap不安全
并发情况下,HashMap不安全,我们通过一个简单的案例来测试:
package com.zsr.collection;
import java.util.HashMap;
import java.util.UUID;
public class MapTest {
public static void main(String[] args) {
//并发情况下
HashMap
for (int i = 0; i < 30; i++) {
new Thread(() -> {
map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 8));
System.out.println(map);
}, String.valueOf(i)).start();
}
}
}
同样,ConcurrentModificationException:并发修改异常 那么怎么解决呢?
方案一:利用Collections工具类
Map
方案三:利用JUC包中的类ConcurrentHashMap
ConcurrentHashMap
引入ConcurrentHashMap
ConcurrentHashMap融合了hashtable和hashmap二者的优势
但是hashtable每次同步执行的时候都要锁住整个结构。看下图:
ConcurrentHashMap正是为了解决这个问题而诞生的,其锁的方式是稍微细粒度的,引入了分段锁的概念;
可以理解为把一个大的Map拆分成N(默认为16)个小的HashTable,根据key.hashCode()来决定把key放到哪个HashTable中。
在ConcurrentHashMap中,就是把Map分成了N个Segment,put和get的时候,都是现根据key.hashCode()算出放到哪个Segment中:
通过把整个Map分为N个Segment(类似HashTable),可以提供相同的线程安全;原来只能一个线程进入,现在却能同时16个写线程进入(写线程才需要锁定,而读线程几乎不受限制),并发性的提升是显而易见的
七、Callable
7.1、同Runnable的区别
可以有返回值可以抛出异常重写call()方法而不是run()
7.2、怎么实现
实现Runnable接口时,我们通过Thread.start()进行启动,因为Thread的构造方法可以传入Runnable对象 那么怎么实现Callable呢?我们无法直接通过Thread进行启动,但是我们可以通过Runnable间接的启动 查看帮助文档,可以看到Runnable接口有一个FutureTask启动类,我们点进去看看 可以看到,它有两个构造方法,分别可以传入Callable和Runnable对象,这就将两者联系了起来 于事我们就可以通过Thread来启动Callable接口的实现类
package com.zsr;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class dome1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
myThread myThread = new myThread();
FutureTask
new Thread(futureTask).start();
System.out.println(futureTask.get());//获取call方法返回值
}
}
class myThread implements Callable
@Override
public String call() throws Exception {
System.out.println("调用call方法");
return "call方法执行完成";
}
}
注意:
通过FutureTask的get()获取call方法的返回值,该方法可能会产生阻塞(可能返回结果需要大量的计算,很耗时),一般情况下将其放在最后一行或者使用异步通信来处理
FutureTask任务多线程并发访问时为啥只会被执行一次
八、三大常用辅助类
8.1、CountDownLatch
就是一个减法计数器
package com.zsr.countDown;
import java.util.concurrent.CountDownLatch;
public class Test {
public static void main(String[] args) throws InterruptedException {
//创建一个计数器,初始化为6
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "线程执行结束");
countDownLatch.countDown();//计数器-1
}, String.valueOf(i)).start();
}
countDownLatch.await();//等待计数器归0再往下执行
System.out.println("所有线程执行完毕");
}
}
如果不加countDownLatch.await() 加了之后
8.2、CyclicBarrier
相当于加法计数器
package com.zsr.Cyclic;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Test {
//模拟集齐7课龙珠召唤神龙
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("召唤神龙!");
});
for (int i = 1; i <= 7; i++) {
final int temp = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "收集了第" + temp + "颗龙珠");
try {
cyclicBarrier.await();//计数不断+1,直到为7
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
8.3、Semaphore
计数信号量
semaphore.acquire(),获得许可正,如果许可证已经满了,等待其他线程释放许可证semaphore.release(),释放许可证
作用:多个共享资源互斥使用,并发限流,控制最大的线程数
package com.zsr.Sema;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) {
//模拟3个车位,6辆车要停车
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
semaphore.acquire();//获取许可
System.out.println(Thread.currentThread().getName() + "抢到车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();//释放许可
}
}, String.valueOf(i)).start();
}
}
}
九、读写锁
代码测试:定义一个缓存区用于读写操作,然后启动5个线程分别进行读和写,测试
首先测试不加锁的情况下
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockDemo {
public static void main(String[] args) {
UnlockCache unlockCache = new UnlockCache();
//5个线程写入
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(() -> {
unlockCache.put(temp, temp);
}, String.valueOf(i)).start();
}
//5个线程读出
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(() -> {
unlockCache.get(temp);
}, String.valueOf(i)).start();
}
}
}
//不加锁的
class UnlockCache {
private volatile Map
//写入
public void put(int key, Object value) {
System.out.println("开始写入" + key);
map.put(key, value);
System.out.println(key + "写入完成");
}
//读出
public void get(int key) {
System.out.println("开始读取" + key);
map.get(key);
System.out.println(key + "读取完毕");
}
}
根据结果,可以看到写入时被插队,这是不允许的!
加读写锁,实现只能同时有一个线程写,多个线程读
也就是写锁为独占锁(一次只能被一个线程占有),读锁为共享锁(多个线程可以同时占有)
读-读:可共存读-写:不可共存写-写:不可共存 import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockDemo {
public static void main(String[] args) {
LockCache lockCache = new LockCache();
//5个线程写入
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(() -> {
lockCache.put(temp, temp);
}, String.valueOf(i)).start();
}
//5个线程读出
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(() -> {
lockCache.get(temp);
}, String.valueOf(i)).start();
}
}
}
//加锁:实现同时只能有一个线程写,多个线程读
class LockCache {
private volatile Map
//读写锁:实现只能有一个线程写,多个线程写,更细粒度的控制
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//写入:只能有一个线程写
public void put(int key, Object value) {
readWriteLock.writeLock().lock();
try {
System.out.println("开始写入" + key);
map.put(key, value);
System.out.println(key + "写入完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
//读出:可以多个线程读
public void get(int key) {
readWriteLock.readLock().lock();
try {
System.out.println("开始读取" + key);
map.get(key);
System.out.println(key + "读取完毕");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}
根据结果,我们实现了写入时不能被插队,但是读取可以多个线程读取
十、阻塞队列
10.1、关系图
BlockingQueue关系图:
队列阻塞:
10.2、ArrayBlockingQueue四组API
方式抛出异常有返回值、不抛出异常阻塞等待限时等待添加add()offer()put()offer( , )移除remove()poll()take()poll( , )判断队列首element()peek()\\
抛出异常(add、remove、element)
//抛出异常
public static void test1() {
//初始化阻塞队列大小为3
ArrayBlockingQueue
System.out.println(blockingQueue.add(1));
System.out.println(blockingQueue.add(2));
System.out.println(blockingQueue.add(3));
//如果添加第四个元素,则发生异常java.lang.IllegalStateException: Queue full
System.out.println(blockingQueue.add(4));
}
//抛出异常
public static void test1() {
//初始化阻塞队列大小为3
ArrayBlockingQueue
System.out.println(blockingQueue.add(1));
System.out.println(blockingQueue.add(2));
System.out.println(blockingQueue.add(3));
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
//如果取出第四个元素,则发生异常java.util.NoSuchElementException
System.out.println(blockingQueue.remove());
}
//抛出异常
public static void test1() {
//初始化阻塞队列大小为3
ArrayBlockingQueue
System.out.println(blockingQueue.add(1));
System.out.println(blockingQueue.add(2));
System.out.println(blockingQueue.add(3));
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
//如果取出队首元素,则发生异常java.util.NoSuchElementException
System.out.println(blockingQueue.element());
}
不跑出异常(offer、poll、pick)
//不抛出异常
public static void test1() {
//初始化阻塞队列大小为3
ArrayBlockingQueue
System.out.println(blockingQueue.offer(1));
System.out.println(blockingQueue.offer(2));
System.out.println(blockingQueue.offer(3));
//如果添加第四个元素,返回false
System.out.println(blockingQueue.offer(4));
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//如果取出第四个元素,返回null
System.out.println(blockingQueue.poll());
//如果取出队首元素,返回null
System.out.println(blockingQueue.peek());
}
等待阻塞(put、take)
//等待阻塞
public static void test1() throws InterruptedException {
//初始化阻塞队列大小为3
ArrayBlockingQueue
blockingQueue.put(1);
blockingQueue.put(2);
blockingQueue.put(3);
//如果添加第四个元素,程序阻塞
blockingQueue.put(4);
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
//如果取出第四个元素,程序阻塞
blockingQueue.take();
}
限时等待()
//限时等待
public static void test1() throws InterruptedException {
//初始化阻塞队列大小为3
ArrayBlockingQueue
System.out.println(blockingQueue.offer(1));
System.out.println(blockingQueue.offer(2));
System.out.println(blockingQueue.offer(3));
//如果添加第四个元素,等待2s后程序结束
System.out.println(blockingQueue.offer(4, 2, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//如果取出第四个元素,等待2s后程序结束
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
}
10.3、SynchronousQueue
一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行 peek,因为仅在试图要移除元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的头 是尝试添加到队列中的首个已排队插入线程的元素;如果没有这样的已排队线程,则没有可用于移除的元素并且 poll() 将会返回 null。对于其他 Collection 方法(例如 contains),SynchronousQueue 作为一个空 collection。此队列不允许 null 元素。
public static void main(String[] args) throws InterruptedException {
SynchronousQueue
//线程T1,存元素
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " put 1");
synchronousQueue.put("1");
System.out.println(Thread.currentThread().getName() + " put 2");
synchronousQueue.put("2");
System.out.println(Thread.currentThread().getName() + " put 3");
synchronousQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T1").start();
//线程T2,取元素
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T2").start();
}
十一、线程池
池化技术:程序的运行就会占用系统资源就会占用系统资源,为了优化资源的使用,就引入了池化技术,事先准备好一些资源,需要使用就来取,用完即放回;例如 线程池、连接池、内存池、对象池
线程池的好处:线程复用、可以控制最大并发数、管理线程
降低资源消耗提高响应速度方便管理
11.1、创建线程池三大方法
如何创建线程池呢?java.util.concurrent中提供了Executors类
其中有一些静态方法用于创建线程池
newSingleThreadExecutor
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPool {
public static void main(String[] args) {
//创建单一线程池,只有一个线程执行
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
//创建线程池,可指定固定线程数量同时执行
for (int i = 0; i < 20; i++) {
singleThreadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ok");
});
}
//线程池用完,关闭线程池
singleThreadPool.shutdown();
}
}
newFixedThreadPool
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPool {
public static void main(String[] args) {
//创建线程池,可指定固定线程数量同时执行
ExecutorService fixThreadPool = Executors.newFixedThreadPool(5);
//使用线程池创建线程
for (int i = 0; i < 20; i++) {
fixThreadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ok");
});
}
//线程池用完,关闭线程池
fixThreadPool.shutdown();
}
}
newCachedThreadPool
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程, 那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。 此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPool {
public static void main(String[] args) {
//创建可伸缩的线程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
//使用线程池创建线程
for (int i = 0; i < 20; i++) {
cachedThreadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ok");
});
}
//线程池用完,关闭线程池
cachedThreadPool.shutdown();
}
}
11.2、七大参数
查看newSingleThreadExecutor、newFixedThreadPool、newCachedThreadPool的源码,可以发现本质上就是创建了一个ThreadPoolExcutor对象
再查看ThreadPoolExecutor的源码,可以看到七大参数
public ThreadPoolExecutor(int corePoolSize, //核心线程池大小
int maximumPoolSize, //最大线程池大小
long keepAliveTime, //存活时间(超时未调用则释放)
TimeUnit unit, //超时单位
BlockingQueue
ThreadFactory threadFactory, //线程工程:创建线程,一般默认不改动
RejectedExecutionHandler handler) //拒绝策略
{
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
各种参数的含义好比银行办理业务,corePoolSize是已经开放的服务窗口,BlockingQueue是候客区,假设人流量非常大,就需要多开放几个服务窗口,maximumPoolSize就是最大开放的服务窗口数;再假如很少有人办理业务,过了一定的时间就关闭窗口,keepAliveTime就是要关闭窗口的事件;RejectedExecutionHandler拒绝策略就好比银行满了,再来人就不让进了 可以看到newCachedThreadPool的核心线程池大小设置未0,最大线程池大小设置为Integer.MAX_VALUE,约等于21亿;也就是说通过Executors.newCachedThreadPool()创建的线程池可以支持并发的线程数介于0~21亿之间,这是十分耗费资源的
因此,阿里巴巴官方手册有以下规定:
11.3、四种拒绝策略
我们来自定义一个线程池,拒绝策略为AbortPolicy
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPool {
public static void main(String[] args) {
//自定义线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
//最大承载=最大线程池大小+阻塞队列长度=5+3=8 因此如果i最大值设为9则会抛出异常
for (int i = 1; i <= 8; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ok");
}
);
}
}
}
如果i<=9,超过了最大承载,则会抛出异常 修改拒绝策略为CallerRunsPolicy:可以看到没有抛出异常,而是由main线程处理
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPool {
public static void main(String[] args) {
//自定义线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
//最大承载=最大线程池大小+阻塞队列长度=5+3=8 因此如果i最大值设为9则会抛出异常
for (int i = 1; i <= 9; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ok");
}
);
}
}
}
修改拒绝策略为DiscardPolicy:可以看到不抛出异常不执行
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPool {
public static void main(String[] args) {
//自定义线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy()
);
//最大承载=最大线程池大小+阻塞队列长度=5+3=8 因此如果i最大值设为9则会抛出异常
for (int i = 1; i <= 9; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ok");
}
);
}
}
}
策略DiscardOldestPolicy同DiscardPolicy类似:不抛出异常不执行,但是会尝试竞争
11.4、最大线程数怎么定义?
CPU密集型
电脑的cpu是几核就定义为几,定义为常数换台电脑就不行了
//自定义线程池:CPU密集型
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2,
Runtime.getRuntime().availableProcessors(),
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
IO密集型
大于 判断程序中耗费IO的线程 即可
十二、四大函数式接口
新时代的程序员:lambda表达式、链式编程、函数式接口、Stream流式计算
什么是函数式接口
函数式接口(Functional Interface)是jdk8引入的,有且仅有一个抽象方法,但是可以有多个非抽象方法的接口。并且这类接口使用了@FunctionalInterface进行注解,函数式接口可以被隐式转换为 lambda 表达式
JDK 1.8 之前已有的函数式接口:
java.lang.Runnable
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
java.util.concurrent.Callable
java.security.PrivilegedAction
java.util.Comparator
java.io.FileFilter
java.nio.file.PathMatcher
java.lang.reflect.InvocationHandler
java.beans.PropertyChangeListener
java.awt.event.ActionListener
javax.swing.event.ChangeListener
JDK 1.8 新增加的函数接口:
java.util.function
这个package中的接口大致分为了以下四类:
Function:接收参数,并返回结果,主要方法 R apply(T t)
Consumer:接收参数,无返回结果, 主要方法为 void accept(T t)
// forEach的参数就是消费者类型函数式接口Consumer
@Override
public void forEach(Consumer super E> action) {
Objects.requireNonNull(action);
final int expectedModCount = modCount;
final Object[] es = elementData;
final int size = this.size;
for (int i = 0; modCount == expectedModCount && i < size; i++)
action.accept(elementAt(es, i));
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
}
Supplier:不接收参数,但返回结构,主要方法为 T get()
Predicate:接收参数,x返回boolean值,主要方法为 boolean test(T t)
12.1、Function
函数式接口:有参数且需要返回值
@FunctionalInterface
public interface Function
//有参数且有返回值
R apply(T t);
default
Objects.requireNonNull(before);
return (V v) -> apply(before.apply(v));
}
default
Objects.requireNonNull(after);
return (T t) -> after.apply(apply(t));
}
static
return t -> t;
}
}
实现一个Function接口
package 函数式接口;
import java.util.function.Function;
public class Demo {
public static void main(String[] args) {
//匿名内部类,没有类的名称
Function function1 = new Function
@Override
public String apply(String str) {
return str;
}
};
//修改为lambda表达式
Function function2 = (str) -> {
return str;
};
System.out.println(function1.apply("hello"));
System.out.println(function2.apply("hello lambda"));
}
}
12.2、Predicate
判断型接口:有参数,返回值为布尔型
@FunctionalInterface
public interface Predicate
//有参数,返回值为布尔型
boolean test(T t);
default Predicate
Objects.requireNonNull(other);
return (t) -> test(t) && other.test(t);
}
default Predicate
return (t) -> !test(t);
}
default Predicate
Objects.requireNonNull(other);
return (t) -> test(t) || other.test(t);
}
static
return (null == targetRef)
? Objects::isNull
: object -> targetRef.equals(object);
}
@SuppressWarnings("unchecked")
static
Objects.requireNonNull(target);
return (Predicate
}
}
实现一个Predicate接口
package 函数式接口;
import java.util.function.Predicate;
public class Demo1 {
public static void main(String[] args) {
//匿名内部类,没有类的名称
Predicate
@Override
public boolean test(String s) {
return s.isEmpty();//判断字符串是否为空
}
};
//修改为lambda
Predicate
return s.isEmpty();
};
System.out.println(predicate1.test("hello"));
System.out.println(predicate2.test(""));
}
}
12.3、Consumer
消费性接口:没有返回值,有参数
@FunctionalInterface
public interface Consumer
//没有返回值,有参数
void accept(T t);
default Consumer
Objects.requireNonNull(after);
return (T t) -> { accept(t); after.accept(t); };
}
}
实现Consumer接口
package 函数式接口;
import java.util.function.Consumer;
public class Demo2 {
public static void main(String[] args) {
//匿名内部类,没有类的名称
Consumer
@Override
public void accept(String s) {
System.out.println(s);
}
};
//修改为lambda
Consumer consumer2 = (s) -> {
System.out.println(s);
};
consumer1.accept("hello1");
consumer1.accept("hello2");
}
}
12.4、Supplier
供给型接口:无参数,指定返回值类型
@FunctionalInterface
public interface Supplier
//无参数,指定返回值类型
T get();
}
实现Supplier接口
package 函数式接口;
import java.util.function.Supplier;
public class Demo3 {
public static void main(String[] args) {
//匿名内部类,没有类的名称
Supplier
@Override
public String get() {
return "hello1";
}
};
//修改为lambda
Supplier supplier2 = () -> {
return "hello2";
};
System.out.println(supplier1.get());
System.out.println(supplier2.get());
}
}
十三、Stream流式计算
13.1、什么是Stream流式计算
大数据时代分为存储+计算,存储交给数据库、集合等来处理,计算就交给流来做
Java中就提供了java.util.stream用于流式计算
13.2、案例测试
package 流式计算;
import java.util.Arrays;
import java.util.List;
public class Test {
public static void main(String[] args) {
User user1 = new User("a", 1, 18);
User user2 = new User("b", 2, 19);
User user3 = new User("c", 3, 21);
User user4 = new User("d", 4, 30);
User user5 = new User("e", 5, 28);
User user6 = new User("f", 6, 27);
//list来存储数据
List
//stream来计算
//筛选出以下条件的用户:
//1.id为偶数
//2.age>23
//3.name转换为大写字母
//4.name字母倒序排序
//5.只输出一个用户
users.stream()
.filter(user -> {//判断型接口Predicate
return user.getId() % 2 == 0;
})
.filter(user -> {//判断型接口Predicate
return user.getAge() > 23;
})
.map(user -> {//函数式接口Function
return user.getName().toUpperCase();
})
.sorted((u1, u2) -> {//Comparator函数式接口
return u2.compareTo(u1);
})
.limit(1)
.forEach(System.out::println);
}
}
十四、ForkJoin
14.1、什么是ForkJoin
ForkJoin出现于jdk1.7,用于并行执行任务 ,提高效率,用于大数据量的情况(分支合并)
大数据:Map Reduce——将大任务拆分成小任务
14.2、ForkJoin的特点
工作窃取:里面维护的都是双端队列
14.3、案例:计算求和任务
我们可以在JUC中找到ForkJoinPool类 其中有一个方法,可用于执行一个ForkJoinTask 我们需要返回值,查看RecursiveTask,可以找到compute方法进行计算
1️⃣ 编写任务
package forkjoin;
import java.util.concurrent.RecursiveTask;
//计算任务
public class ForkJoinTask extends RecursiveTask
private long begin;
private long end;
public ForkJoinTask(long begin, long end) {
this.begin = begin;
this.end = end;
}
//计算方法:计算1加到1_0000_0000
@Override
protected Long compute() {
long sum = 0;
if ((end - begin) < 10000) {//如果差值小于10000则暴力加
for (long i = begin; i <= end; i++)
sum += i;
return sum;
} else {//数据量大于10000采用forkjoin来计算
long mid = (begin + end) / 2;
//任务一
ForkJoinTask task1 = new ForkJoinTask(begin, mid);
task1.fork();//拆分任务,将线程压入队列
//任务二
ForkJoinTask task2 = new ForkJoinTask(mid + 1, end);
task2.fork();//拆分任务,将线程压入队列
return task1.join() + task2.join();
}
}
}
2️⃣ 测试比较
package forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
test1();
test2();
test3();
}
//普通遍历方式
public static void test1() {
long sum = 0;
long start = System.currentTimeMillis();
for (long i = 0; i <= 10_0000_0000; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("普通遍历法耗时" + (end - start) + "结果为:" + sum);
}
//ForkJoin方式
public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask task = new ForkJoinTask(0, 10_0000_0000);
java.util.concurrent.ForkJoinTask
Long sum = submit.get();
long end = System.currentTimeMillis();
System.out.println("通过ForkJoin方式耗时" + (end - start) + "结果为:" + sum);
}
//Stream并行流方式
public static void test3() {
long start = System.currentTimeMillis();
long sum = LongStream.rangeClosed(0, 10_0000_0000).parallel().reduce(0, Long::sum);//rangeClosed(]
long end = System.currentTimeMillis();
System.out.println("通过Stream并行流方式耗时" + (end - start) + "结果为:" + sum);
}
}
十五、异步回调
异步回调通常用CompletableFuture 发起两个异步请求,一个有返回结果,一个没有返回结果
package future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
//异步调用异步执行:成功回调/失败回调
public class Demo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//发起一个请求(没有返回值的异步回调)
CompletableFuture
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "runAsync=>Void");
});
completableFuture1.get();//阻塞获取执行结果
//有返回值的异步回调
CompletableFuture
System.out.println(Thread.currentThread().getName() + "supplyAsync=>Integer");
return 1024;
});
System.out.println(completableFuture2
.whenComplete((t, u) -> {//成功回调:正确的返回结果
System.out.println("t=" + t);
System.out.println("u=" + u);
}).exceptionally((e) -> {//失败回调:错误的返回结果
System.out.println(e.getMessage());
return 233;
}).get());
}
}
如果有返回结果的异步回调报错,就会走失败回调的方法,返回233
十六、理解JMM&Volatile
16.1、请你谈谈对Volatile的理解
Volatile是 JVM 提供的轻量级的同步机制
保证可见性不保证原子性禁止指令重排
怎么保证可见性?就需要和JMM挂钩
16.2、什么是JMM
Java内存模型,不存在的东西,是一种概念一种约定
关于JMM同步的约定:
线程解锁前,必须立刻把自己的共享变量刷回主存线程加锁前,必须读取主存中的最新值到工作内存中加锁和解锁是同一把锁
线程分为:工作内存、主内存
16.3、Volatile特点
Volatile可以保证可见性,不能保证原子性,可以避免指令重排的现象
1. 保证可见性
package jmm;
import java.util.concurrent.TimeUnit;
public class Demo {
private static int num;
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
while (num == 0) ;
}, "t").start();
//main线程休眠1s
TimeUnit.SECONDS.sleep(1);
num = 1;
System.out.println(num);
}
}
开启一个线程,当num=0时不停的死循环;然后让主线程休眠1s后修改num=1,也就是将主内存中的num修改为1;看到结果程序并没有停止,这是因为t线程并没有拿到主内存中num最新的值,不知套其发生了变化,也就是t线程对main线程的变化不可见
如何解决呢?只需要通过volatile关键字修饰num即可保证其的可见性
可以看到,程序立马停止了
2. 不保证原子性
什么是原子性?也就是一个线程执行的时候不能被打扰分割,要么同时成功要么同时失败
package jmm;
public class Demo2 {
//通过volatile不能保证原子性
private volatile static int num;
public static void add() {
num++;
}
public static void main(String[] args) throws InterruptedException {
//理论上num=20000
for (int i = 0; i < 20; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
//为了让20000条线程跑完,让main线程进行礼让(这里的2表示java固有的两个线程main和gc)
while (Thread.activeCount() > 2) {
Thread.yield();
}
System.out.println(Thread.currentThread().getName() + ":" + num);
}
}
根据结果,发现volatile并不保证原子性的操作,为什么不安全呢?
我们反编译看看,可以看到num++一行代码在底层是多行操作,因此不能保证原子性,所以是不安全的
D:\学习\IDEA project\Test\out\production\Test\jmm>javap -c Demo2.class
Compiled from "Demo2.java"
public class jmm.Demo2 {
public jmm.Demo2();
Code:
0: aload_0
1: invokespecial #1 // Method java/lang/Object."
4: return
public static synchronized void add();
Code:
0: getstatic #7 // Field num:I
3: iconst_1
4: iadd
5: putstatic #7 // Field num:I
8: return
public static void main(java.lang.String[]) throws java.lang.InterruptedException;
Code:
0: iconst_0
1: istore_1
2: iload_1
3: bipush 20
5: if_icmpge 29
8: new #13 // class java/lang/Thread
11: dup
12: invokedynamic #15, 0 // InvokeDynamic #0:run:()Ljava/lang/Runnable;
17: invokespecial #19 // Method java/lang/Thread."
20: invokevirtual #22 // Method java/lang/Thread.start:()V
23: iinc 1, 1
26: goto 2
29: invokestatic #25 // Method java/lang/Thread.activeCount:()I
32: iconst_2
33: if_icmple 42
36: invokestatic #29 // Method java/lang/Thread.yield:()V
那如果不通过Lock和Synchronized 怎么保证原子性呢?
可以通过java.util.concurrent.atomic包中的原子类解决原子问题 ,这些类的底层都直接和操作系统挂钩,在内存中修改值,这些类是特殊的存在
package jmm;
import java.util.concurrent.atomic.AtomicInteger;
public class Demo {
//通过volatile不能保证原子性
private volatile static AtomicInteger num = new AtomicInteger();
public static void add() {
num.getAndIncrement();//并不是简单的+1,而是利用了底层的CAS
}
public static void main(String[] args) throws InterruptedException {
//理论上num=20000
for (int i = 0; i < 20; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
//为了让20000条线程跑完,让main线程进行礼让(这里的2表示java固有的两个线程main和gc)
while (Thread.activeCount() > 2) {
Thread.yield();
}
System.out.println(Thread.currentThread().getName() + ":" + num);
}
}
3. 禁止指令重排
指令重排:计算机并不是按照我们编写的程序去执行
源代码–》编译器优化代码重排–》指令并行重排–》内存系统重排–》执行
处理器在指令重排的时候,会考虑数据之间的依赖性
int x = 1; //1
int y = 2; //2
x = x + 5; //3
y = x * x; //4
按我们所期望的执行顺序是1->2->3->4
但实际上可能是2143或者1324,但不可能是4123
指令重排可能会导致一些错误的结果,如下图所示:
使用volatile可以避免指令重排,底层实现是通过 内存屏障 实现的,可以保证特定的操作执行顺序,也可以保证某些变量的内存可见性
十七、彻底玩转单例模式
volatile在单例模式中使用的最多
17.1、饿汉式
package 单例模式;
//饿汉式
public class Hungry {
private static Hungry hungry = new Hungry();
//构造器私有
private Hungry() {
}
public static Hungry getInstance() {
return hungry;
}
}
优点: static变量会在类装载时初始化,不涉及多个线程访问该对象的问题,可以省略synchronized关键字
缺点:类初始化时就创建了对象,如果只是加载本类,而不是要调用 getinstance(),甚至永远没有调用,则会造成资源浪费!
17.2、懒汉式
package 单例模式;
//懒汉式
public class Lazy {
private static Lazy lazy;
private Lazy() {
}
public static Lazy getInstance() {
if (lazy == null)
lazy = new Lazy();
return lazy;
}
}
优点: 延迟加载,真正用的时候才实例化对象,提高了资源的利用率
缺点:存在并发访问的问题,以下测试并发访问情况
package 单例模式;
//懒汉式
public class Lazy {
private static Lazy lazy;
private Lazy() {
System.out.println("创建示例");
}
public static Lazy getInstance() {
if (lazy == null)
lazy = new Lazy();
return lazy;
}
public static void main(String[] args) {
//10条线程并发访问下
for (int i = 0; i < 10; i++) {
new Thread(() -> {
Lazy.getInstance();
}).start();
}
}
}
根据结果,可以看到有5个线程打印了结果,也就说进行了5次初始化,这是非常大的漏洞,出现了并发访问的问题
17.3、双重检测锁式
为了解决懒汉式并发访问的问题,加入了sychronized关键字
package 单例模式;
//双重检测锁式
public class DoubleLock {
private static DoubleLock doubleLock;
private DoubleLock() {
System.out.println("创建示例");
}
public static DoubleLock getInstance() {
if (doubleLock == null) {
synchronized (Lazy.class) {
if (doubleLock == null)
doubleLock = new DoubleLock();
}
}
return doubleLock;
}
public static void main(String[] args) {
//10条线程并发访问下
for (int i = 0; i < 10; i++) {
new Thread(() -> {
DoubleLock.getInstance();
}).start();
}
}
}
根据打印结果,解决了并发访问的问题;但是这样仍然会存在问题,因为我们new对象时并不是一个完整的原子性操作,而是分为以下三部:
分配内存空间执行构造方法,初始化对象把这个对象指向这个空间
单个线程A执行的情况下可以123按顺序执行,也可能由于指令重排按132执行;但是如果线程A按132顺序执行到3时来了一个线程B,此时该对象已经指向了分配的空间,因此B判断对象不是null,就会直接返回对象,但其实对象并没有进行初始化,就造成了错误
因此指令重排也会导致错误,因此完整的双重检测锁式还加入了Volatile关键字来避免指令重排,完整代码如下:
package 单例模式;
//双重检测锁式
public class DoubleLock {
private volatile static DoubleLock doubleLock;
private DoubleLock() {
System.out.println("创建示例");
}
public static DoubleLock getInstance() {
if (doubleLock == null) {
synchronized (Lazy.class) {
if (doubleLock == null)
doubleLock = new DoubleLock();
}
}
return doubleLock;
}
}
17.4、静态内部类式
package 单例模式;
public class InnerClass {
private InnerClass() {
}
//静态内部类里面创建对象
public static class inner {
private static final InnerClass innerClass = new InnerClass();
}
public static InnerClass getInstance() {
return inner.innerClass;
}
}
反射破坏单例模式
package 单例模式;
import java.lang.reflect.Constructor;
//双重检测锁式
public class DoubleLock {
private volatile static DoubleLock doubleLock;
private DoubleLock() {
System.out.println("创建示例");
}
public static DoubleLock getInstance() {
if (doubleLock == null) {
synchronized (Lazy.class) {
if (doubleLock == null)
doubleLock = new DoubleLock();
}
}
return doubleLock;
}
public static void main(String[] args) throws Exception {
DoubleLock instance1 = doubleLock.getInstance();
Constructor
constructor.setAccessible(true);
DoubleLock instance2 = constructor.newInstance();
System.out.println(instance1);
System.out.println(instance2);
}
}
根据结果,看到创建了两个实例,也就是单例模式被破坏,那么怎么解决呢?
可以在私有构造中加锁
package 单例模式;
import java.lang.reflect.Constructor;
//双重检测锁式
public class DoubleLock {
private volatile static DoubleLock doubleLock;
private DoubleLock() {
synchronized (DoubleLock.class){
if(doubleLock!=null){
throw new RuntimeException("不要试图使用反射破坏异常");
}
}
System.out.println("创建示例");
}
public static DoubleLock getInstance() {
if (doubleLock == null) {
synchronized (Lazy.class) {
if (doubleLock == null)
doubleLock = new DoubleLock();
}
}
return doubleLock;
}
public static void main(String[] args) throws Exception {
DoubleLock instance1 = doubleLock.getInstance();
Constructor
constructor.setAccessible(true);
DoubleLock instance2 = constructor.newInstance();
System.out.println(instance1);
System.out.println(instance2);
}
}
根据结果,可以看到避免了单例模式的破坏?可是上述两个对象一个是通过单例获取,一个通过反射获取;
那如果两个对象都是通过反射获取呢?
public static void main(String[] args) throws Exception {
Constructor
constructor.setAccessible(true);
DoubleLock instance1= constructor.newInstance();
DoubleLock instance2 = constructor.newInstance();
System.out.println(instance1);
System.out.println(instance2);
}
根据结果,可以看到单例模式又被破坏了,创建了两个对象!这种情况如何解决呢?
可以通过红绿灯方法实现,定义一个标志位记录对象是否创建
package 单例模式;
import java.lang.reflect.Constructor;
//双重检测锁式
public class DoubleLock {
private volatile static DoubleLock doubleLock;
//标志位
private static boolean flag = false;
private DoubleLock() {
synchronized (DoubleLock.class) {
if (flag == false)
flag = true;
else
throw new RuntimeException("不要试图使用反射破坏异常");
}
System.out.println("创建示例");
}
public static DoubleLock getInstance() {
if (doubleLock == null) {
synchronized (Lazy.class) {
if (doubleLock == null)
doubleLock = new DoubleLock();
}
}
return doubleLock;
}
public static void main(String[] args) throws Exception {
Constructor
constructor.setAccessible(true);
DoubleLock instance1 = constructor.newInstance();
DoubleLock instance2 = constructor.newInstance();
System.out.println(instance1);
System.out.println(instance2);
}
}
可以看到我们通过设置标志位flag再次解决了这个问题,但是一旦被获取了这个关键字,单例模式仍然可以通过反射被破解,如下所示
public static void main(String[] args) throws Exception {
Constructor
Field declaredField = DoubleLock.class.getDeclaredField("flag");
constructor.setAccessible(true);
declaredField.setAccessible(true);
DoubleLock instance1 = constructor.newInstance();
declaredField.set(instance1, false);//第一个对象创建完毕后将flag改为false
DoubleLock instance2 = constructor.newInstance();
System.out.println(instance1);
System.out.println(instance2);
}
可以看到单例模式再次被破坏;因此为了让程序更加安全,通常对flag关键字进行加密处理
那么到底如何完全的避免反射破坏单例模式呢?我们查看newInstance的源码 可以看到,如果是枚举类型的话,就不能通过反射获取枚举;
因此引入了第5种单例模式
17.5、枚举单例
package 单例模式;
import java.lang.reflect.Constructor;
//enum本质上就是一个Class类
public enum EnumSingle {
INSTANCE;
public static void main(String[] args) throws Exception {
EnumSingle instance1 = EnumSingle.INSTANCE;
Constructor
declaredConstructor.setAccessible(true);
EnumSingle instance2 = declaredConstructor.newInstance();
System.out.println(instance1);
System.out.println(instance2);
}
}
我们再次通过反射创建对象,根据结果报错没有EnumSingle的空构造方法,这不是我们希望看到的 我们对EnumSingle的class文件进行反编译,可以看到明明有空构造方法 但是执行明明报错没有无参构造,我们使用更专业的反编译工具jad对class文件再进行反编译 可以看到枚举类本质上就是继承了Enum类,本身就是一个Class,而且没有无参构造,而是含两个参数的有参构造,我们修改代码在测试
public static void main(String[] args) throws Exception {
EnumSingle instance1 = EnumSingle.INSTANCE;
Constructor
declaredConstructor.setAccessible(true);
EnumSingle instance2 = declaredConstructor.newInstance();
System.out.println(instance1);
System.out.println(instance2);
}
这才正确显示了报错的信息:无法反射地创建枚举对象
十八、深入理解CAS
18.1、什么是CAS
CAS 是 compareAndSet 的缩写:比较并交换,是CPU的并发原语
package CAS探究;
import java.util.concurrent.atomic.AtomicInteger;
public class CASDemo {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);//底层是CAS
/**
* public final boolean compareAndSet(int expectedValue, int newValue)
* 期望、更新
* 如果期望的值达到了就更新,否则不更新
*/
System.out.println(atomicInteger.compareAndSet(2020, 2021));//返回是否修改成功
System.out.println(atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(2020, 2022));
System.out.println(atomicInteger.get());
}
}
我们再来看看 atomicInteger.getAndIncrement() 方法是怎么实现的?我们该方法的源码 可以看到是由U调用了getAndAddInt()方法,而U就是Unsafe类的一个实例
什么是Unsafe类
Java无法操作内存,只能通过调用C++来操作内存,Unsafe就是Java通过C++操作内存的接口就类似于Java通过native关键字来调用C++本地方法来和操作系统交互
可以看到,底层是一个do while循环,也就是一个自旋锁
因此:CAS就是比较当前工作内存中的值和主内存中的值,如果这个值是期望的,就执行操作;如果不是,就一直循环,因为底层是一个do while循环(自旋锁)
CAS有三个操作数:
期望的值比较的值更新的值
缺点:
底层是自旋锁,循环耗时一次性只能保证一个共享变量的原子性会存在ABA问题
18.2、ABA问题
比如有两个线程A,B同时向修改A的内容,但是B线程执行速度快,首先cas(1,3)将A修改为3,然后又执行cas(3,1)将A修改为1,这之后线程A再cas(1,2)将A修改为2,但此时A=1已经不是原来的1了;
这就是ABA问题
我们来个代码模拟以下
package CAS探究;
import java.util.concurrent.atomic.AtomicInteger;
public class ABADemo {
//CAS是compareAndSet的缩写:比较并交换,是CPU的并发原语
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(1);//底层是CAS
//线程B
System.out.println(atomicInteger.compareAndSet(1, 3));
System.out.println(atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(3, 1));
System.out.println(atomicInteger.get());
//线程A
System.out.println(atomicInteger.compareAndSet(1, 2));
System.out.println(atomicInteger.get());
}
}
可以看到三个结果都为true,但不是我们期望的,我们希望知道谁动过A的值
可以通过类似乐观锁的方案来解决,使用 原子引用类AtomicReference/AtomicStampedReference(带时间) 我们使用AtomicStampedReference测试以下
package CAS探究;
import java.util.concurrent.atomic.AtomicStampedReference;
public class ABADemo {
//CAS是compareAndSet的缩写:比较并交换,是CPU的并发原语
public static void main(String[] args) {
//public AtomicStampedReference(V initialRef, int initialStamp):这里的第二个参数等同于乐观锁的version,初始值设为1
AtomicStampedReference
//线程B,多了连个参数:期望的版本号,更新的版本号
System.out.println(atomicStampedReference.compareAndSet(1, 3,
atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));
System.out.println(atomicStampedReference.compareAndSet(3, 1,
atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));
//线程A
System.out.println(atomicStampedReference.compareAndSet(1, 2, 1, 2));
}
}
可以看到A成功察觉到了B修改过数据,所以执行失败;和乐观锁原理相同
注意:如果泛型是包装类,注意对象引用问题(正常业务都是对象,这里是使用包装类Integer进行测试)
如果我们的范围不再-128~127,则会失败
十九、各种锁的理解
19.1、乐观锁/悲观锁
悲观锁(Pessimistic Lock)
1️⃣ 简介
当要对数据库中的一条数据进行修改的时候,为了避免同时被其他人修改,最好的办法就是直接对该数据进行加锁以防止并发。这种借助数据库锁机制,在修改数据之前先锁定,再修改的方式被称之为悲观并发控制【Pessimistic Concurrency Control,缩写“PCC”,又名“悲观锁”】
悲观锁,正如其名,具有强烈的独占和排他特性。它指的是对数据被外界(包括本系统当前的其他事务,以及来自外部系统的事务处理)修改持保守态度。因此,在整个数据处理过程中,将数据处于锁定状态。悲观锁的实现,往往依靠数据库提供的锁机制(也只有数据库层提供的锁机制才能真正保证数据访问的排他性,否则,即使在本系统中实现了加锁机制,也无法保证外部系统不会修改数据)。
之所以叫做悲观锁,是因为这是一种对数据的修改持有悲观态度的并发控制方式。总是假设最坏的情况,每次读取数据的时候都默认其他线程会更改数据,因此需要进行加锁操作,当其他线程想要访问数据时,都需要阻塞挂起。悲观锁的实现:
传统的关系型数据库使用这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。Java 里面的同步 synchronized 关键字的实现。
2️⃣ 分类
悲观锁主要分为 共享锁 和 排他锁
共享锁【shared locks】又称为读锁,简称S锁。顾名思义,共享锁就是多个事务对于同一数据可以共享一把锁,都能访问到数据,但是只能读不能修改。排他锁【exclusive locks】又称为写锁,简称X锁。顾名思义,排他锁就是不能与其他锁并存,如果一个事务获取了一个数据行的排他锁,其他事务就不能再获取该行的其他锁,包括共享锁和排他锁,但是获取排他锁的事务是可以对数据行读取和修改。
3️⃣ 说明
悲观并发控制实际上是“先取锁再访问”的保守策略,为数据处理的安全提供了保证。但是在效率方面,处理加锁的机制会让数据库产生额外的开销,还有增加产生死锁的机会。另外还会降低并行性,一个事务如果锁定了某行数据,其他事务就必须等待该事务处理完才可以处理那行数据。
乐观锁(Optimistic Locking)
1️⃣ 简介
乐观锁是相对悲观锁而言的,乐观锁假设数据一般情况下不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则返回给用户错误的信息,让用户决定如何去做。乐观锁适用于读操作多的场景,这样可以提高程序的吞吐量。 乐观锁机制采取了更加宽松的加锁机制。乐观锁是相对悲观锁而言,也是为了避免数据库幻读、业务处理时间过长等原因引起数据处理错误的一种机制,但乐观锁不会刻意使用数据库本身的锁机制,而是依据数据本身来保证数据的正确性。
2️⃣ 实现
CAS实现:Java中java.util.concurrent.atomic包下面的原子变量使用了乐观锁的一种 CAS 实现方式版本号控制:一般是在数据表中加上一个数据版本号 version 字段,表示数据被修改的次数。当数据被修改时,version 值会+1。当线程A要更新数据值时,在读取数据的同时也会读取 version 值,在提交更新时,若刚才读取到的 version 值与当前数据库中的 version 值相等时才更新,否则重试更新操作,直到更新成功
3️⃣ 说明
乐观并发控制相信事务之间的数据竞争(data race)的概率是比较小的,因此尽可能直接做下去,直到提交的时候才去锁定,所以不会产生任何锁和死锁
19.2、公平/非公平锁
公平锁:非常公平,不能插队,线程的执行必须先来后到非公平锁:非常不公平,可以插队,默认都为非公平锁!(比如一个线程3s执行完,一个线程1min执行完,如果使用公平锁严重影响某个线程的效率)
19.3、可重入锁
可重入锁(递归锁)
代码示例:synchronized版
执行结果:
代码示例:Lock版
19.4、自旋锁
不断的尝试,直到成功为止!
我们来编写一个自旋锁
package 自旋锁;
import java.util.concurrent.atomic.AtomicReference;
//自定义自旋锁
public class SpinLock {
//锁线程
AtomicReference
//加锁
public void myLock() {
Thread thread = Thread.currentThread();
System.out.println(thread.getName() + "==>myLock");
//自旋锁
while (!atomicReference.compareAndSet(null, thread)) ;
}
//解锁
public void myUnlock() {
Thread thread = Thread.currentThread();
System.out.println(thread.getName() + "==>myUnLock");
atomicReference.compareAndSet(thread, null);
}
}
然后编写一段测试代码
package 自旋锁;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
SpinLock spinLock = new SpinLock();
//线程T1
new Thread(() -> {
spinLock.myLock();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
spinLock.myUnlock();
}
}, "T1").start();
TimeUnit.SECONDS.sleep(1);
//线程T2
new Thread(() -> {
spinLock.myLock();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
spinLock.myUnlock();
}
}, "T2").start();
}
}
根据结果,总是T1线程解锁后,T2线程才能解锁;因为如果T1线程不解锁,T2就会卡住在while循环不停的尝试cas直到thread=null为止
19.5、死锁
什么是死锁?
是指两个或两个以上的进程在执行过程中,因争夺资源而造成的一种互相等待的现象
简单的死锁案例
package 死锁;
import java.util.concurrent.TimeUnit;
public class DeadLockDemo {
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
new Thread(new MyThread(lockA, lockB), "T1").start();
new Thread(new MyThread(lockB, lockA), "T2").start();
}
}
class MyThread implements Runnable {
private String lockA;
private String lockB;
public MyThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA) {
System.out.println(Thread.currentThread().getName() + "持有锁" + lockA + "尝试获取" + lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB) {
System.out.println(Thread.currentThread().getName() + "持有锁" + lockB + "尝试获取" + lockA);
}
}
}
}
根据运行结果,可以看到程序卡死,因为发生了死锁,因为T1和T2分别持有lockA和lockB,但又都试图获取对方的锁!
死锁问题排查
使用jps -l命令定位进程号
使用jstack 进程号查看指定进程的堆栈信息,找到死锁问题
可以看到,控制台清晰的打印了找到死锁,并可以看到产生的原因就是T1和T2互相尝试获取对方的锁