JUC

x33g5p2x  于2021-11-21 转载在 其他  
字(15.6k)|赞(0)|评价(0)|浏览(265)

卖票复习

匿名内部类

package com.xrh.saleticket;

/** *题目:三个售票员 卖出 30张票 * 多线程编程的企业级套路+模板 * 1.在高内聚(资源类把对外的方法放在自己身上)低耦合(各自独立系统,可以互相调用)的前提下,线程 操作(对外暴露的调用方法) 资源类 */
public class SaleTicket {
    public static void main(String[] args) {
        Ticket ticket = new Ticket();

        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 1; i <= 40; i++) {
                    ticket.saleTicket();
                }
            }
        },"A").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 1; i <= 40; i++) {
                    ticket.saleTicket();
                }
            }
        },"B").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 1; i <= 40; i++) {
                    ticket.saleTicket();
                }
            }
        },"C").start();
    }
}

class Ticket {//资源类
    //票
    private int number = 30;

    public synchronized  void saleTicket(){
        if (number > 0){
            System.out.println(Thread.currentThread().getName()+"\t卖出第:"+(number--)+"\t还剩下:"+number);
        }
    }
}

LambdaExpress+lock(替换synchronized)

package com.xrh.saleticket;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class SaleTicketLambda {
    public static void main(String[] args) {
        Ticket1 ticket = new Ticket1();

        new Thread(()->{for (int i = 1; i <= 40; i++) ticket.sale();},"A").start();
        new Thread(()->{for (int i = 1; i <= 40; i++) ticket.sale();},"B").start();
        new Thread(()->{for (int i = 1; i <= 40; i++) ticket.sale();},"C").start();
    }
}

//资源类 = 实例变量 + 实例方法
class Ticket1{
    //票
    private int number = 30;
    Lock lock  = new ReentrantLock();

    public void sale(){
        lock.lock();
        try {
            if (number > 0){
                System.out.println(Thread.currentThread().getName()+"\t卖出第:"+(number--)+"\t还剩下:"+number);
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

LambdaExpress

LambdaExpress是当接口中只有一个方法时,这样的接口叫函数时接口,可以用拉姆达表达式。
可以用注解@FunctionalInterface显式声明,不写底层会默认加上

因为接口中只有一个方法,所以直接拷贝小括号,写死右箭头,落地大括号

package com.xrh.lambda;

public class LambdaExpress {
    public static void main(String[] args) {
        Foo foo=(x,y)->{
            System.out.println("come in");
            return x+y;
        };

        System.out.println(foo.add(1,2));
    }
}

@FunctionalInterface
interface Foo{
    public int add(int x,int y);
}

java8之前,接口只允许有方法声明,8之后允许有部分实现,default声明,可以有多个

package com.xrh.lambda;

public class LambdaExpressDefault {
    public static void main(String[] args) {
        Foo1 foo1=(x,y)->{
            System.out.println("come in");
            return x+y;
        };

        int add = foo1.add(1, 2);
        System.out.println(add);

        System.out.println(foo1.div(10,5));
    }
}

@FunctionalInterface
interface Foo1{
    public int add(int x,int y);

    //default
    public default int div(int x,int y){
        System.out.println("default可以实现辣");
        return x/y;
    }

    public default int div2(int x,int y){
        System.out.println("default222可以实现辣");
        return x/y;
    }
}

static声明的,也可以实现,有多个,但是调用的时候直接是接口点方法

package com.xrh.lambda;

public class LambdaExpressDefault {
    public static void main(String[] args) {
        Foo1 foo1=(x,y)->{
            System.out.println("come in");
            return x+y;
        };

        int add = foo1.add(1, 2);
        System.out.println(add);

        System.out.println(foo1.div(10,5));

        //static的方法调用
        int mv = Foo1.mv(1, 2);
        System.out.println(mv);
    }
}

@FunctionalInterface
interface Foo1{
    public int add(int x,int y);

    //default
    public default int div(int x,int y){
        System.out.println("default可以实现辣");
        return x/y;
    }

    public default int div2(int x,int y){
        System.out.println("default222可以实现辣");
        return x/y;
    }

    //static
    public static int mv(int x,int y){
        return x*y;
    }
}

消费者和生产者模式

上面线程没有横向交流,没有线程之间通信

synchronized+wait+notifyAll

package com.xrh.shengxiaofeizhe;

/** * 题目:现在两个线程,可以操作初始值为零的一个变量, * 实现一个线程对该变量加1,一个线程对该变量-1, * 实现交替,来10轮,变量初始值为0. * 1.高内聚低耦合前提下,线程操作资源类 * 2.判断/干活/通知 * 3.防止虚假唤醒(判断只能用while,不能用if) * 知识小总结:多线程编程套路+while判断+新版写法 * * wait和notify是object的方法 */
public class WaitNotify {
    public static void main(String[] args) {
        Air air=new Air();

        new Thread(()->{
            try {
                for (int i = 0; i < 10; i++) {
                    air.add();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"A1").start();

        new Thread(()->{
            try {
                for (int i = 0; i < 10; i++) {
                    air.add();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"A2").start();

        new Thread(()->{
            try {
                for (int i = 0; i < 10; i++) {
                    air.dec();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"B1").start();

        new Thread(()->{
            try {
                for (int i = 0; i < 10; i++) {
                    air.dec();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"B2").start();
    }
}

class Air{
    private int num=0;

    public synchronized void add() throws InterruptedException {
        while (num!=0){
            this.wait();
        }
        num++;
        System.out.println(Thread.currentThread().getName()+" : "+num);
        //干完活通知
        this.notifyAll();
    }

    public synchronized void dec() throws InterruptedException {
        while (num==0){
            this.wait();
        }
        num--;
        System.out.println(Thread.currentThread().getName()+" : "+num);
        //干完活通知
        this.notifyAll();
    }
}

lock+condition(await,signal)

package com.xrh.shengxiaofeizhe;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** * 新版的,lock替换synchronized,await,signal代替wait,notify * * await,signal是condition里的方法,Condition condition=lock.newCondition(); */
public class LockAwaitSignal {
    public static void main(String[] args) {

        Air air=new Air();

        new Thread(()->{
            try {
                for (int i = 0; i < 10; i++) {
                    air.add();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"A1").start();

        new Thread(()->{
            try {
                for (int i = 0; i < 10; i++) {
                    air.add();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"A2").start();

        new Thread(()->{
            try {
                for (int i = 0; i < 10; i++) {
                    air.dec();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"B1").start();

        new Thread(()->{
            try {
                for (int i = 0; i < 10; i++) {
                    air.dec();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"B2").start();
    }
}

class Air1{
    private int num=0;
    private Lock lock=new ReentrantLock();
    private Condition condition=lock.newCondition();

    public void add(){
        lock.lock();

        try {
            while (num!=0){
                condition.await();
            }
            num++;
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public void dec(){
        lock.lock();

        try {
            while (num==0){
                condition.await();
            }
            num--;
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

}

精确通知和顺序访问

精确唤醒哪个线程,增加标志位,lock+condition,condition就相当于钥匙,配多个钥匙,判断标志位,然后对应的钥匙等待和唤醒

package com.xrh.duoxiancheng.shengxiaofeizhe.jingquedaji;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** * 备注:多线程之间按顺序调用,实现A->B->C * 三个线程启动,要求如下: * AA打印5次,BB打印10次,CC打印15次 * 接着 * AA打印5次,BB打印10次,CC打印15次 * 来10轮 * 1.高内聚低耦合前提下,线程操作资源类 * 2.判断/干活/通知 * 3.多线程交互中,防止虚假唤醒(判断只能用while,不能用if) * 4.标志位 */
public class LockAwaitSignal {
    public static void main(String[] args) {
        PrintData printData=new PrintData();
        new Thread(new Runnable() {
            @Override
            public void run() {
                printData.print5();
            }
        },"A").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                printData.print10();
            }
        },"B").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                printData.print15();
            }
        },"C").start();
    }
}

class PrintData{
    private int num=1;//A:1,B:2,C:3
    private Lock lock=new ReentrantLock();
    private Condition condition1=lock.newCondition();
    private Condition condition2=lock.newCondition();
    private Condition condition3=lock.newCondition();

    public void print5(){
        lock.lock();

        try {
            //判断
            while (num!=1){
                condition1.await();
            }
            //干活
            for (int i = 0; i < 5; i++) {
                System.out.println(Thread.currentThread().getName()+" : "+i);
            }

            //标志位+唤醒(按顺序A完B)
            num=2;
            condition2.signal();

        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public void print10(){
        lock.lock();

        try {
            //判断
            while (num!=2){
                condition2.await();
            }
            //干活
            for (int i = 0; i < 10; i++) {
                System.out.println(Thread.currentThread().getName()+" : "+i);
            }

            //标志位+唤醒(按顺序A完B)
            num=3;
            condition3.signal();

        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public void print15(){
        lock.lock();

        try {
            //判断
            while (num!=3){
                condition3.await();
            }
            //干活
            for (int i = 0; i < 15; i++) {
                System.out.println(Thread.currentThread().getName()+" : "+i);
            }

            //标志位+唤醒(按顺序A完B)
            num=3;
            condition3.signal();

        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

}

八锁理论

package com.xrh.duoxiancheng.basuo;

import java.util.concurrent.TimeUnit;

/** * 所有的静态同步方法用的也是同一把锁--类对象本身, * 这两把锁(this/class)是两个不同的对象,所以静态同步方法与非静态同步方法之间是不会有静态条件的。 */
public class Lock8 {
    public static void main(String[] args) throws InterruptedException {
        Phone phone = new Phone();
        Phone phone2 = new Phone();

        new Thread(()->{
            try {
                phone.sendEmail();
            } catch (Exception e) {
                e.printStackTrace();
            }
        },"A").start();

        Thread.sleep(100);

        new Thread(()->{
            try {
                //phone.sendMs();
                phone2.sendMs();
            } catch (Exception e) {
                e.printStackTrace();
            }
        },"B").start();
    }
}

class Phone{
    public static synchronized void sendEmail() throws Exception{
        try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println("*******sendEmail");
    }
    public  void sendMs() throws Exception{

        System.out.println("*******sendMs");
    }
    public void sayHello() throws Exception{

        System.out.println("*****sayHello");
    }
}

锁静态方法和锁普通方法不冲突,这是两把锁,跟多少个对象也无关。如果静态除非都是静态,要不跟对象也没关系

集合不安全

ArrayList线程不安全

package com.xrh.jihe;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class NotSafe {
    public static void main(String[] args) {
        List<String> list = new ArrayList<>();

        for (int i = 0; i < 3; i++) {
            new Thread(()->{
                list.add(UUID.randomUUID().toString().substring(0,8));
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+"线程,读集合为"+list);
            },String.valueOf(i)).start();
        }
    }
}
  • 1.故障现象
    并发修改异常
    java.util.ConcurrentModificationException
  • 2.导致原因
  • 3.解决方法
3.1 new Vector<>();
3.2 Collections.synchronizedList(new ArrayList<String>());
3.3 new CopyOnWriteArrayList(); //写时复制
  • 4.优化建议(同样的错误不犯第二次)
  • 写时复制:
  • CopyOnWrite容器即写时复制的容器。往一个容器添加元素的时候,不直接往当前容器Object[]添加,而是现将当前容器Object[]进行Copy, 复制出一个新的容器Object[] newElements,然后新的容器Object[] newElements里添加元素,添加完元素之后,
    再将原容器的引用指向新的容器setArray(newElements);。这样做的好处是可以对CopyOnWrite容器进行并发的读, 而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器

HashSet线程不安全

CopyOnWriteArraySet()

HashSet的底层是一个HashMap,HashSet调用add方法时,调用的HashMap的put方法,add的值其实就是put的key,value是PRESENT,一个Object类型的常量

HashMap线程不安全

new ConcurrentHashMap<>();

callable接口

Callable与Runnable接口区别

1.callable有返回值
2.callable抛了异常
3.callable实现的方法是call,Runnable实现的方法是run
4.Callable有泛型(其实什么泛型就返回什么类型,跟第一条重了)

callable实现多线程

之前实现多线程的时候两种方式,实现接口Runnable和继承Thread

原始方式:

Mythread m=new Mythread();
        m.start();//继承Thread
//创建一个可运行的对象,这个类实现Runnable接口
        //Phone phone=new Phone();
        //将可运行的对象封装成一个线程对象
        //Thread t=new Thread(phone);

        //合并
        Thread t=new Thread(new Phone());
        t.start();//Phone类实现了接口

匿名内部类方式

new Thread(new Runnable() {
            @Override
            public void run() {
               
            }
        },"B").start();

lambda表达式

new Thread(()->{for (int i = 1; i <= 40; i++) ticket.sale();},"A").start();

实现callable接口的类,在new Thread时报错,没有这种构造方法,所以得找个既与callable有关的又与runable有关的类,那就是FutureTask,原理就是多态

MyThread myThread = new MyThread();
        Thread t1=new Thread(myThread);//!!!报错
        t1.start();

JUC辅助类

CountDownLatch

package com.xrh.fuzhulei;

import java.util.concurrent.CountDownLatch;

/** * 需求: * 1-6线程执行完,main线程才能执行完,就是最后执行 * CountDownLatch三行代码,倒数计时器, */
public class CountDownLatchTest {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch=new CountDownLatch(6);//一共几个线程
        for (int i = 1; i <= 6; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"同学离开教室");
                countDownLatch.countDown();//每个线程结束后,计数减一
            },String.valueOf(i)).start();
        }
        countDownLatch.await();//如果不为0,就一直等着,直到0,再往下走
        System.out.println(Thread.currentThread().getName()+"班长关门走人");
    }
}
/** * CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。 * 其他线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞), * 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行 */

CyclicBarrier

package com.xrh.fuzhulei;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/** * 这个是正着计数,到多少个线程做完,就结束(人到齐了,再开会) */
public class CyclicBarrierTest {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{
            System.out.println("人齐了,开会!");
        });//只要线程完成到7,就完成后面的线程

        for (int i = 1; i <= 7; i++) {
            final int tempInt=i;//lambda表达式不好用变量,用final修饰
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+tempInt+"这个人来了");
                try {
                    cyclicBarrier.await();//所有线程完成之后等着,等计数器到7了,在完成上面的线程
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();
        }

    }
}

Semaphore

package com.xrh.fuzhulei;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** * 信号灯,在信号量上定义两种操作: * acquire,获取,成功获取资源(信号量减一),要么一直等待下去,直到有线程释放 * release: 释放,信号量加一,唤醒等待的线程 * * 当Semaphore semaphore=new Semaphore(1);//1个资源,相当于synchronized * * 作用: 一是用于多个共享资源的互斥作用,另一个是用于控制并发线程数的控制 * * 抢车位,7个车抢3个车位,多个线程抢多个资源 */
public class SemaphoreTest {
    public static void main(String[] args) {
        Semaphore semaphore=new Semaphore(3);//3个资源

        for (int i = 1; i <= 7; i++) {
            new Thread(()->{
                try {
                    semaphore.acquire();//占住,减一
                    System.out.println(Thread.currentThread().getName()+"抢到了车位");
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName()+"离开了车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    semaphore.release();
                }
            },String.valueOf(i)).start();
        }

    }
}

java.util.concurrent.locks下的 ReadWriteLock

java.util.concurrent.locks下有Condition,Lock

读写锁跟之前学的lock差不多

package com.xrh.juclockreadwritelock;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockTest {
    public static void main(String[] args) {
        MyCache myCache = new MyCache();
        for (int i = 1; i <= 5; i++) {
            final int tempInt = i;
            new Thread(()->{
                myCache.put(tempInt+"",tempInt+"");
            },String.valueOf(i)).start();
        }
        
        for (int i = 1; i <= 5; i++) {
            final int tempInt = i;
            new Thread(()->{
                myCache.get(tempInt+"");
            },String.valueOf(i)).start();
        }
    }
}

class MyCache {
    //volatile:,保证可见性,不保证原子性,一个线程修改后,通知更新
    private volatile Map<String, Object> map = new HashMap<>();
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    public void put(String key, Object value) {
        readWriteLock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "\t--写入数据" + key);
            //暂停一会线程
            try {
                TimeUnit.MICROSECONDS.sleep(300);
            } catch (Exception e) {
                e.printStackTrace();
            }
            map.put(key, value);
            System.out.println(Thread.currentThread().getName() + "\t--写入完成");

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }

    public void get(String key) {
        readWriteLock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "\t读取数据");
            Object result = map.get(key);
            System.out.println(Thread.currentThread().getName() + "\t读取完成" + result);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.readLock().unlock();
        }
    }
}

阻塞队列

Collection接口不仅有set,list接口继承,还有一个queue接口也继承,BlockQueueing也是一个接口继承了queue,这个接口有7个实现类

每个实现类的特点:

核心方法

线程池

Executor类结构

线程池三大方法

通过工具类new池子

package com.xrh.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadPoolMethod {
    public static void main(String[] args) {
        //ExecutorService threadPool= Executors.newFixedThreadPool(5);//一池固定5个线程
        //ExecutorService threadPool= Executors.newSingleThreadExecutor();//一池一个线程
        ExecutorService threadPool= Executors.newCachedThreadPool();//一池N个线程,可扩容


        //模拟10个顾客来办理银行业务,目前池子里有5个窗口(线程)
        try {
            for (int i = 0; i < 10; i++) {

                TimeUnit.SECONDS.sleep(1);
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+"办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            threadPool.shutdown();
        }

    }
}

底层原理

线程池里7大参数

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

线程池底层工作原理

拒绝策略

相关文章