Java并发编程之 locks

x33g5p2x  于2021-10-02 转载在 Java  
字(12.0k)|赞(0)|评价(0)|浏览(338)

锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时访问共享资源(但是有些锁可以允许多个线程并发的访问共享资源,比如读写锁)。在Lock接口出现之前,Java程序是靠synchronized关键字实现锁功能的,而Java SE 5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,它提供了与synchronized关键字类似的同步功能,只是在使用时需要显式地获取和释放锁。虽然它缺少了(通过synchronized块或者方法所提供的)隐式获取释放锁的便捷性,但是却拥有了锁获取与释放的可操作性、可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性。

使用synchronized关键字将会隐式地获取锁,但是它将锁的获取和释放固化了,也就是先获取再释放。虽然这种方式简化了同步的管理,但是扩展性没有显示的锁获取和释放来的好。

锁的使用方式

lock.lock();
try{
} finally{
    lock.unlock();
}

在finally块中释放锁,目的是保证在获取到锁之后,最终能够被释放。

不要将获取锁的过程写在try块中,因为如果在获取锁(自定义锁的实现)时发生了异常,异常抛出的同时,也会导致锁无故释放。

可重入锁ReentrantLock

可重入锁,顾名思义,就是支持重进入的锁。它表示该锁能够支持一个线程对资源的重复加锁。除此之外,该锁的还支持获取锁时的公平和非公平性选择。

而synchronized关键字隐式的支持重进入,比如一个synchronized修饰的递归方法,在方法执行时,执行线程在获取了锁之后仍能连续多次地获得该锁。

ReentrantLock虽然没能像synchronized关键字一样支持隐式的重进入,但是在调用lock()方法时,已经获取到锁的线程,能够再次调用lock()方法获取锁而不被阻塞。

ReentrantLock提供了一个构造函数,能够控制锁是否是公平的:

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

如何实现可重入

ReentrantLock是通过组合自定义同步器(AQS)来实现锁的获取与释放。

已获得锁的线程重新获取锁

以非公平性(默认的)实现为例:

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。

锁的最终释放

成功获取锁的线程再次获取锁,只是增加了同步状态值,这也就要求ReentrantLock在释放同步状态时减少同步状态值:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

如果该锁被获取了n次,那么前(n-1)次tryRelease(int releases)方法必须返回false,而只有同步状态完全释放了,才能返回true。可以看到,该方法将同步状态是否为0作为最终释放的条件,当同步状态为0时,将占有线程设置为null,并返回true,表示释放成功。

公平与非公平获取锁的区别

公平方式获取锁:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}

他们获取锁的方式都是:首先判断state是否为0,如果为0,说明这个锁还没有被线程占有,那么就CAS将state的值加一(compareAndSetState(0, acquires)),然后将该线程设置为具有这把锁的访问权限的线程(setExclusiveOwnerThread(current));如果state不等于0,说明这把锁已经被某个线程对象持有了,接下来判断想要获取这把锁的线程是不是已经拥有这把锁的线程(判断重入),如果是的话,就将state的值用CAS的方式加一,并且返回true,表示重入获取锁成功。否则直接返回false,表示获取锁失败。

公平方式获取锁tryAcquire(int acquires)与非公平方式获取锁nonfairTryAcquire(int acquires)的唯一的不同点就是当state为0即这个锁还没有被线程占有时,会先调用hasQueuedPredecessors()方法判断加入了同步队列中当前节点是否有前驱节点,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。

可重入读写锁ReentrantReadWriteLock

ReentrantLock和synchronized都是排他锁,这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。

ReentrantReadWriteLock实现了ReadWriteLock接口,ReadWriteLock仅定义了获取读锁和写锁的两个方法,即readLock()方法和writeLock()方法;而ReentrantReadWriteLock除了接口方法之外,还提供了一些便于外界监控其
内部工作状态的方法:

ReentrantReadWriteLock有五个内部类,Sync继承自AQS、NonfairSync和FairSync继承自Sync类(通过构造函数传入的布尔值决定要构造哪一种Sync实例);ReadLock和WriteLock实现了Lock接口:

读写状态的设计

读锁ReadLock是共享锁:

public void lock() {
    sync.acquireShared(1);
}

写锁WriteLock是独占锁:

public void lock() {
    sync.acquire(1);
}

但是一个AQS对象只有一个state,想要表示两种不同的状态,就需要使用“按位切割”的方法,将整型的state的二进制位数切分为两部分,高16位表示读,低16位表示写(比如这张图就说明一个线程已经获取了写锁,且重进入了两次,同时也连续获取了两次读锁):

假设当前同步状态值为c

获取写状态(获取写锁数量exclusiveCount): c & ((1 << 16) - 1) (0x0000ffff/65535)(将高16位全部抹去)

static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

获取读状态(获取读锁数量sharedCount): c >>> 16(无符号右移16位)

static final int SHARED_SHIFT   = 16;
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

写状态加一: c + 1

compareAndSetState(c, c + 1)

读状态加一: c +(1<<16)

static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
compareAndSetState(c, c + SHARED_UNIT)

c不等于0时,当写状态等于0时,则读状态一定大于0,即读锁已被获取。

写锁的获取与释放

写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态

获取

写锁的获取由tryAcquire()实现:

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    //写线程数量(即获取独占锁的重入数)
    int w = exclusiveCount(c);
    //当前同步状态state != 0,说明已经有其他线程获取了读锁或写锁
    if (c != 0) {
        //总锁数量不为0,但写锁数量为0,说明读锁数量不为0,返回false
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

该方法除了重入条件(当前线程为获取了写锁的线程)之外,增加了一个读锁是否存在的判断。如果存在读锁,则写锁不能被获取。原因在于:读写锁要确保写锁的操作对读锁可见如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。因此,只有等待其他读线程都释放了读锁,写锁才能被当前线程获取。而写锁一旦被获取,则其他读写线程的后续访问均被阻塞

释放

写锁的释放与ReentrantLock的释放过程基本类似,每次释放均减少写状态,当写状态为0时表示写锁已被释放,从而等待的读写线程能够继续访问读写锁,同时前次写线程的修改对后续读写线程可见。

protected final boolean tryRelease(int releases) {
    //若锁的持有者不是当前线程,抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //写锁的新线程数
    int nextc = getState() - releases;
    //如果独占模式重入数为0了,说明独占模式被释放
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        //若写锁的新线程数为0,则将锁的持有者设置为null
        setExclusiveOwnerThread(null);
    //设置写锁的新线程数
    //不管独占模式是否被释放,更新独占重入数
    setState(nextc);
    return free;
}

读锁的获取与释放

获取

读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为0)时,读锁总会被成功地获取,而所做的也只是增加读状态如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态

protected final int tryAcquireShared (int unused){
    for (;;) {
        int c = getState();
        int nextc = c + (1 << 16);
        if (nextc < c)
            throw new Error("Maximum lock count exceeded");
        if (exclusiveCount(c) != 0 && owner != Thread.currentThread())
            return -1;
        if (compareAndSetState(c, nextc))
            return 1;
    }
}

如果其他线程已经获取了写锁(exclusiveCount(c) != 0),则当前线程获取读锁失败,进入等待状态(返回-1)。如果当前线程获取了写锁或者写锁未被获取,则当前线程(线程安全,依靠CAS保证)增加读状态,成功获取读锁(返回1)。

释放

读锁的每次释放(线程安全的,可能有多个读线程同时释放读锁)均减少读状态,减少的值是(1<<16)。

static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);

for (;;) {
    int c = getState();
    int nextc = c - SHARED_UNIT;
    if (compareAndSetState(c, nextc))
        return nextc == 0;
}

锁降级

锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指在已获取一把写锁的前提下,再获取到读锁,随后释放先前拥有的写锁的过程。

ReentrantReadWriteLock总结

  • 在线程持有读锁的情况下,该线程不能取得写锁(因为获取写锁的时候,如果发现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)
  • 在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)
  • 一个线程要想同时持有写锁和读锁,必须先获取写锁再获取读锁
  • 写锁可以“降级”为读锁。对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读锁
  • 读锁不能“升级”为写锁。当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁

LockSupport

LockSupport是一个编程工具类,主要是为了阻塞和唤醒线程用的。

它的内部其实两类主要的方法:park(停车阻塞线程)和unpark(启动唤醒线程)。他们也都是调用的UnSafe类的parkunpark方法。

特点

释放锁方法unpark()可以优先于获取锁park()方法调用:

释放锁unpark()可以多次调用

LockSupport锁是不可重入的,他不像synchronized那样可重入是通过对ObjectWaiter_count进行加减操作,也不像ReentrantLock那样可重入是通过对AQSstate进行加减操作,他只有一个“许可证”,这个许可证如果被一个线程占有了,不但其他线程拿不到,即便是已拥有“许可证”的线程想要重新获取都是不可以的,只有等该线程释放掉“许可证”后才可以重新获取“许可证”。

park/unpark与wait/notify的区别

  1. wait()和notify()都是Object中的方法,在调用这两个方法前必须先获得锁对象,但是park()不需要获取某个对象的锁就可以锁住线程。即LockSupport不需要在同步代码块里 。所以线程间也不需要维护一个共享的同步对象了,实现了线程间的解耦。
  2. notify()只能随机选择一个线程唤醒,无法唤醒指定的线程,unpark()却可以唤醒一个指定的线程。
    写一段例子代码,线程A执行一段业务逻辑后调用wait阻塞住自己。主线程调用notify方法唤醒线程A,线程A然后打印自己执行的结果。
public class Main {
    public static void main(String[] args) {
        final Object lock = new Object();
        Thread A = new Thread(() -> {
            int sum = 0;
            for (int i = 0; i < 10; i++) {
                sum += i;
            }
            try {
                lock.wait();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(sum);
        });
        A.start();
        Thread.sleep(1000);
        lock.notify();
    }
}

这段代码会报非法监视器异常IllegalMonitorStateException

因为wait()、notify()/notifyAll()方法只能在同步代码块中使用:

public class Main {
    public static void main(String[] args) {
        final Object lock = new Object();
        Thread A = new Thread(() -> {
            int sum = 0;
            for (int i = 0; i < 10; i++) {
                sum += i;
            }
            try {
                synchronized (lock) {
                    lock.wait();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(sum);
        });
        A.start();
        Thread.sleep(1000);
        synchronized (lock) {
            lock.notify();
        }
    }
}

使用synchronized给lock对象上锁后再调用wait()/notify()方法就不会报错了。

使用LockSupport也可以实现:

public class Main {
    public static void main(String[] args) {
        Thread A = new Thread(() -> {
            int sum = 0;
            for (int i = 0; i < 10; i++) {
                sum += i;
            }
            LockSupport.park();
            System.out.println(sum);
        });
        A.start();
        Thread.sleep(1000);
        LockSupport.unpark(A);
    }
}

对于wait()/notify()而言,线程A启动后需要调用Thread.sleep()方法确保线程A执行完毕进入wait状态,如果没有这句代码,很有可能导致线程A还没有执行完累加操作,就已经调用了外面的notify()方法,即在wait()方法之前调用notify()方法,导致线程永远不会被唤醒:

public class Main {
    public static void main(String[] args) {
        final Object lock = new Object();
        Thread A = new Thread(() -> {
            int sum = 0;
            for (int i = 0; i < 10; i++) {
                sum += i;
            }
            try {
                synchronized (lock) {
                    lock.wait();
                    System.out.println("wait");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(sum);
        });
        A.start();
        //Thread.sleep(1000);
        synchronized (lock) {
            lock.notify();
            System.out.println("notify");
        }
    }
}

而使用LockSupport则永远不会产生类似问题:

public class Main {
    public static void main(String[] args){
        Thread A = new Thread(() -> {
            int sum = 0;
            for (int i = 0; i < 10; i++) {
                sum += i;
            }
            LockSupport.park();
            System.out.println(sum);
        });
        A.start();
        //Thread.sleep(1000);
        LockSupport.unpark(A);
    }
}

Condition接口

任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法:

Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创建出来的,换句话说,Condition是依赖Lock对象的:

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

ConditionObject

ConditionObject是同步器AbstractQueuedSynchronizer的内部类,实现了Condition接口。每个Condition对象都包含着一个队列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键。

AQS同步队列和Condition等待队列

每个ConditionObject对象都包含一个等待队列,ConditionObject对象拥有首节点(firstWaiter)和尾节点(lastWaiter)。等待队列是一个FIFO的队列,在队列中的每个节点(AbstractQueuedSynchronizer.Node)都包含了一个线程引用,该线程就是在ConditionObject对象上等待的线程,如果一个线程调用了它的await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态(addConditionWaiter()):

ConditionObject对象拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter指向它,并且更新尾节点即可。上述节点引用更新的过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的:

private Node addConditionWaiter() {
    Node t = lastWaiter;
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock对象(更确切地说是同步器)拥有一个同步队列和多个等待队列(一个Lock对象可以调用多次newCondition()方法获得多个Condition对象):

等待

调用ConditionObject对象的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了ConditionObject对象相关联的锁。

如果从队列(同步队列和等待队列)的角度看await()方法,调用await()方法时相当于将同步队列的首节点(获取了锁的节点)移动到等待队列中(添加为等待队列的尾结点)

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //当前线程加入等待队列
    Node node = addConditionWaiter();
    //释放同步状态(释放锁)
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

调用await()的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。

当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过其他线程调用signal()方法唤醒,而是对等待线程进行中断,则会抛出InterruptedException

唤醒

调用await()方法时相当于将等待队列的首节点(获取了锁的节点)移动到同步队列中(添加为同步队列的尾结点)并唤醒:

public final void signal() {
    //当前线程必须是获取了锁的线程
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    //将等待队列的头结点添加到同步队列(作为同步队列的尾结点)
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        //唤醒刚刚从等待队列加入到同步队列的结点
        LockSupport.unpark(node.thread);
    return true;
}

signalAll()方法相当于对等待队列中的每个节点均执行一次signal()方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。

相关文章