Usage and code examples of the com.gemstone.gemfire.internal.cache.locks.QueuedSynchronizer class

x33g5p2x, reposted on 2022-01-29 under Other

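This article collects code examples for the Java class com.gemstone.gemfire.internal.cache.locks.QueuedSynchronizer and shows how the class is used. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a practical reference. Details of the QueuedSynchronizer class are as follows:
Package path: com.gemstone.gemfire.internal.cache.locks.QueuedSynchronizer
Class name: QueuedSynchronizer

Introduction to QueuedSynchronizer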

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc.) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Classes must implement the TryLockObject interface (or extend an implementation like ExclusiveSharedSynchronizer) to define the methods that change this state and that define what the state means in terms of this object being acquired or released, while containing an object of this class to represent the thread waiter queue. Given these, the other methods in this class carry out all queuing and blocking mechanics. Lock classes can maintain other state fields, but only the atomically updated int value manipulated in the TryLockObject interface implementations is tracked with respect to synchronization.

Classes should be defined as non-public internal helper classes that are used to implement the synchronization properties of their enclosing class. Class QueuedSynchronizer does not implement any synchronization interface. Instead it defines methods such as #tryAcquireNanos, #tryAcquireSharedNanos that can be invoked as appropriate by concrete locks and related synchronizers to implement their public methods.

This class supports either or both a default exclusive mode and a shared mode or more modes as required depending on the optional integer argument passed to the lock acquire/release methods. When acquired in exclusive mode, attempted acquires by other threads cannot succeed. Shared mode acquires by multiple threads may (but need not) succeed. This class does not "understand" these differences except in the mechanical sense that when a shared mode acquire succeeds, the next waiting thread (if one exists) must also determine whether it can acquire as well. Threads waiting in the different modes share the same FIFO queue. Usually, implementation subclasses support only one of these modes, but both can come into play for example in a ReadWriteLock. Subclasses that support only exclusive or only shared modes need not define the methods supporting the unused mode.
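The practical difference between the two modes can be seen with the JDK's own ReentrantReadWriteLock, used here purely as a stand-in. This is a minimal sketch, not GemFire code, and the names ModeDemo and tryFromOtherThread are made up for illustration. It checks from a second thread whether a read (shared) acquire succeeds while the first thread holds either the read lock or the write lock.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch using the JDK's ReentrantReadWriteLock as a stand-in (not GemFire code).
class ModeDemo {
    // Attempts a non-blocking acquire of the given lock from a fresh thread.
    static boolean tryFromOtherThread(Lock lock) {
        AtomicBoolean acquired = new AtomicBoolean(false);
        Thread t = new Thread(() -> {
            if (lock.tryLock()) {
                acquired.set(true);
                lock.unlock();
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return acquired.get();
    }

    // Shared mode: a second reader gets in while this thread holds the read lock.
    static boolean secondReaderGetsIn() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.readLock().lock();
        try {
            return tryFromOtherThread(rw.readLock());
        } finally {
            rw.readLock().unlock();
        }
    }

    // Exclusive mode: a reader is refused while this thread holds the write lock.
    static boolean readerGetsInDuringWrite() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.writeLock().lock();
        try {
            return tryFromOtherThread(rw.readLock());
        } finally {
            rw.writeLock().unlock();
        }
    }
}
```

Both kinds of waiter would sit in the same queue inside the JDK lock, which matches the shared-queue behavior described above.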

This class provides inspection, instrumentation, and monitoring methods for the internal queue, as well as similar methods for condition objects. These can be exported as desired into classes using a QueuedSynchronizer for their synchronization mechanics.

Usage

To use this class as the basis of a synchronizer, redefine the following methods, as applicable, by inspecting and/or modifying the synchronization state in the TryLockObject interface implementation:

  • TryLockObject#tryAcquire
  • TryLockObject#tryRelease
  • TryLockObject#tryAcquireShared
  • TryLockObject#tryReleaseShared

Implementations of these methods must be internally thread-safe, and should in general be short and not block. Defining these methods is the only supported means of using this class. All methods in this class are declared final because they cannot be independently varied.

Even though this class is based on an internal FIFO queue, it does not automatically enforce FIFO acquisition policies. The core of exclusive synchronization takes the form:

Acquire:
    while (!lock.tryAcquire(arg, owner)) {
        enqueue thread if it is not already queued;
        possibly block current thread;
    }
Release:
    if (lock.tryRelease(arg, owner))
        unblock the first queued thread;

(Shared mode is similar but may involve cascading signals.)
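The acquire/release loop above can be sketched with plain JDK building blocks. The following is a toy illustration under stated assumptions, not the GemFire implementation: the class ToyQueuedMutex and its methods are hypothetical, the state is a single AtomicInteger (0 = free, 1 = held), and waiters are parked with LockSupport. Like the pseudocode, it allows barging, because tryAcquire runs before the thread is enqueued.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// Toy barging mutex illustrating the acquire/release loop (hypothetical class).
class ToyQueuedMutex {
    private final AtomicInteger state = new AtomicInteger(0); // 0 = free, 1 = held
    private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>();

    private boolean tryAcquire() {
        return state.compareAndSet(0, 1);
    }

    public void acquire() {
        Thread current = Thread.currentThread();
        boolean queued = false;
        boolean interrupted = false;
        while (!tryAcquire()) {
            if (!queued) {
                // Enqueue once, then re-check before blocking to avoid a lost wakeup.
                waiters.add(current);
                queued = true;
                continue;
            }
            LockSupport.park(this);
            if (Thread.interrupted()) {
                interrupted = true; // remember and restore after acquiring
            }
        }
        if (queued) {
            waiters.remove(current);
        }
        if (interrupted) {
            current.interrupt();
        }
    }

    public void release() {
        state.set(0);
        Thread first = waiters.peek();
        if (first != null) {
            LockSupport.unpark(first); // unblock the first queued thread
        }
    }

    // Runs `threads` workers that each add `perThread` to a plain counter under the mutex.
    static int countUnderLock(int threads, int perThread) {
        ToyQueuedMutex mutex = new ToyQueuedMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.acquire();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.release();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }
}
```

A real queue-based synchronizer handles cancellation, timeouts, and shared-mode propagation as well; this sketch keeps only the core loop.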

Because checks in acquire are invoked before enqueuing, a newly acquiring thread may barge ahead of others that are blocked and queued. However, you can, if desired, define tryAcquire and/or tryAcquireShared to disable barging by internally invoking one or more of the inspection methods. In particular, a strict FIFO lock can define tryAcquire to immediately return false if the first queued thread is not the current thread. A normally preferable non-strict fair version can immediately return false only if #hasQueuedThreads returns true and getFirstQueuedThread is not the current thread; or equivalently, that getFirstQueuedThread is both non-null and not the current thread. Further variations are possible.
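As a rough illustration of the non-strict fair variant, the sketch below uses the JDK's AbstractQueuedSynchronizer, whose documentation this text closely mirrors, rather than GemFire's QueuedSynchronizer. The class name FairMutexSync and the demo method are assumptions made for this example. tryAcquire refuses to barge whenever getFirstQueuedThread is non-null and is not the current thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Non-strict fair mutex on the JDK's AQS (stand-in for illustration, not GemFire code).
class FairMutexSync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int ignored) {
        Thread first = getFirstQueuedThread();
        if (first != null && first != Thread.currentThread()) {
            return false; // another thread is ahead in the queue: do not barge
        }
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int ignored) {
        if (getState() == 0) {
            throw new IllegalMonitorStateException();
        }
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    void lock() { acquire(1); }
    void unlock() { release(1); }
    boolean tryLock() { return tryAcquire(1); }

    // Demo: after releasing with a waiter queued, an immediate tryLock must not barge.
    static boolean bargeRefusedWhileWaiterQueued() {
        FairMutexSync sync = new FairMutexSync();
        CountDownLatch finish = new CountDownLatch(1);
        sync.lock();
        Thread waiter = new Thread(() -> {
            sync.lock(); // queues behind the main thread
            try {
                finish.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                sync.unlock();
            }
        });
        waiter.start();
        while (!sync.hasQueuedThreads()) {
            Thread.yield(); // wait until the waiter is actually enqueued
        }
        sync.unlock();
        boolean barged = sync.tryLock(); // waiter is queued or already owns the lock
        finish.countDown();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return !barged;
    }
}
```

With the default barging policy the same tryLock call could succeed if it ran before the waiter woke up; the queue check is what rules that out here.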

Throughput and scalability are generally highest for the default barging (also known as greedy, renouncement, and convoy-avoidance) strategy. While this is not guaranteed to be fair or starvation-free, earlier queued threads are allowed to recontend before later queued threads, and each recontention has an unbiased chance to succeed against incoming threads. Also, while acquires do not "spin" in the usual sense, they may perform multiple invocations of tryAcquire interspersed with other computations before blocking. This gives most of the benefits of spins when exclusive synchronization is only briefly held, without most of the liabilities when it isn't. If so desired, you can augment this by preceding calls to acquire methods with "fast-path" checks, possibly prechecking #hasContended and/or #hasQueuedThreads to only do so if the synchronizer is likely not to be contended.

This class provides an efficient and scalable basis for synchronization in part by specializing its range of use to synchronizers that can rely on int state, acquire, and release parameters, and an internal FIFO wait queue. When this does not suffice, you can build synchronizers from a lower level using java.util.concurrent.atomic atomic classes, your own custom java.util.Queue classes, and LockSupport blocking support.

Usage Examples

Here is a non-reentrant mutual exclusion lock class that uses the value zero to represent the unlocked state, and one to represent the locked state. While a non-reentrant lock does not strictly require recording of the current owner thread, this class does so anyway to make usage easier to monitor. It also supports conditions and exposes one of the instrumentation methods:

class Mutex implements Lock, java.io.Serializable {
    // Our internal helper class. The state accessors used here (getState,
    // setState, compareAndSetState, setExclusiveOwner) and ConditionObject
    // are not defined in this excerpt.
    private static class Sync implements TryLockObject {
        // Report whether in locked state
        public boolean hasExclusiveLock() {
            return getState() == 1;
        }
        // Acquire the lock if state is zero
        public boolean tryAcquire(int acquires, Object owner) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                setExclusiveOwner(Thread.currentThread());
                return true;
            }
            return false;
        }
        // Release the lock by setting state to zero
        public boolean tryRelease(int releases, Object owner) {
            assert releases == 1; // Otherwise unused
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwner(null);
            setState(0);
            return true;
        }
        // Provide a Condition
        Condition newCondition() {
            return new ConditionObject();
        }
        // Deserialize properly
        private void readObject(ObjectInputStream s) throws IOException,
                ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }
    // The sync object does all the hard work. We just forward to it.
    private final Sync sync = new Sync();
    private final QueuedSynchronizer waiterQueue = new QueuedSynchronizer();
    public void lock() {
        waiterQueue.acquire(1, Thread.currentThread(), this.sync);
    }
    public boolean tryLock() {
        return sync.tryAcquire(1, Thread.currentThread());
    }
    public void unlock() {
        waiterQueue.release(1, Thread.currentThread(), this.sync);
    }
    public Condition newCondition() {
        return sync.newCondition();
    }
    public boolean isLocked() {
        return sync.hasExclusiveLock();
    }
    public boolean hasQueuedThreads() {
        return waiterQueue.hasQueuedThreads();
    }
    public void lockInterruptibly() throws InterruptedException {
        // attemptSharedLock is not defined in this excerpt
        attemptSharedLock(1, Thread.currentThread(), this.sync);
    }
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return waiterQueue.tryAcquireNanos(1, Thread.currentThread(), this.sync,
                unit.toNanos(timeout));
    }
}
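The Mutex above relies on helpers that the excerpt does not define, so it cannot be compiled on its own. For readers who want something runnable, here is a hedged equivalent built on the JDK's AbstractQueuedSynchronizer, where the state and the waiter queue live in one object. It is a stand-in that shows the same idea, not the GemFire API; the class name JdkMutex is chosen for this sketch.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

// Non-reentrant mutex on the JDK's AQS (stand-in for illustration, not GemFire code).
class JdkMutex implements Lock {
    private static final class Sync extends AbstractQueuedSynchronizer {
        // Required by ConditionObject: is the lock held by the calling thread?
        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        // Acquire the lock if state is zero
        @Override
        protected boolean tryAcquire(int acquires) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // Release the lock by setting state to zero
        @Override
        protected boolean tryRelease(int releases) {
            if (!isHeldExclusively()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        Condition newCondition() { return new ConditionObject(); }

        boolean isLocked() { return getState() != 0; }
    }

    private final Sync sync = new Sync();

    @Override public void lock() { sync.acquire(1); }
    @Override public boolean tryLock() { return sync.tryAcquire(1); }
    @Override public void unlock() { sync.release(1); }
    @Override public Condition newCondition() { return sync.newCondition(); }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    public boolean isLocked() { return sync.isLocked(); }
    public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
}
```

Typical usage is the familiar lock/try/finally/unlock pattern. Because the lock is non-reentrant, calling tryLock again from the owning thread returns false.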

Here is a latch class that is like a CountDownLatch except that it only requires a single signal to fire. Because a latch is non-exclusive, it uses the shared acquire and release methods.

class BooleanLatch implements TryLockObject {
    private final QueuedSynchronizer sync = new QueuedSynchronizer();
    public boolean isSignalled() {
        return getState() != 0;
    }
    public int tryAcquireShared(int ignore, Object owner) {
        return isSignalled() ? 1 : -1;
    }
    public boolean tryReleaseShared(int ignore, Object owner) {
        setState(1);
        return true;
    }
    public void signal() {
        this.sync.releaseShared(1, null, this);
    }
    public void await() throws InterruptedException {
        this.sync.acquireSharedInterruptibly(1, null, this);
    }
}
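The latch above calls getState and setState without defining them. A compilable counterpart can be written against the JDK's AbstractQueuedSynchronizer, shown below as a sketch. It is a stand-in rather than the GemFire API, and the class name JdkBooleanLatch is an assumption for this example.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// One-shot latch on the JDK's AQS (stand-in for illustration, not GemFire code).
class JdkBooleanLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isSignalled() {
            return getState() != 0;
        }

        // Shared acquire succeeds (non-negative result) only once signalled.
        @Override
        protected int tryAcquireShared(int ignore) {
            return isSignalled() ? 1 : -1;
        }

        // Any release flips the state to "signalled" and wakes all waiters.
        @Override
        protected boolean tryReleaseShared(int ignore) {
            setState(1);
            return true;
        }
    }

    private final Sync sync = new Sync();

    public boolean isSignalled() {
        return sync.isSignalled();
    }

    public void signal() {
        sync.releaseShared(1);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
}
```

Any number of threads may block in await(); a single signal() releases all of them, and later await() calls return immediately.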

Code Examples

Code example source: origin: io.snappydata/gemfire-core

public ReentrantReadWriteWriteShareLock() {
 this.sync = new QueuedSynchronizer();
}

Code example source: origin: io.snappydata/gemfire-core

/**
 * Signal any waiting threads in the {@link QueuedSynchronizer}. By default
 * the {@link QueuedSynchronizer} is obtained by a call to
 * {@link #getQueuedSynchronizer}.
 */
protected void signalQueuedSynchronizer(final Object context,
  final boolean shared) {
 final QueuedSynchronizer sync = getQueuedSynchronizer(context);
 if (shared) {
  sync.signalSharedWaiters();
 }
 else {
  sync.clearOwnerThread();
  sync.signalWaiters();
 }
}

Code example source: origin: io.snappydata/gemfirexd-core

/**
 * Returns an estimate of the number of threads waiting to acquire either the
 * read or write lock. The value is only an estimate because the number of
 * threads may change dynamically while this method traverses internal data
 * structures. This method is designed for use in monitoring of the system
 * state, not for synchronization control.
 * 
 * @return the estimated number of threads waiting for this lock
 */
public final int getQueueLength() {
 return this.sync.getQueueLength();
}
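To see what such a monitoring call reports, the sketch below uses the JDK's ReentrantReadWriteLock, which exposes getQueueLength() and hasQueuedThreads() with the same documented meaning. It is an illustration with a made-up class name (QueueMonitorDemo), not GemFire code. One reader is made to queue behind a held write lock and the queue length is sampled.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Monitoring sketch on the JDK's ReentrantReadWriteLock (not GemFire code).
class QueueMonitorDemo {
    // Returns the queue length observed while one reader waits behind a writer.
    static int queueLengthWithOneWaiter() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.writeLock().lock();
        Thread reader = new Thread(() -> {
            rw.readLock().lock(); // blocks until the write lock is released
            rw.readLock().unlock();
        });
        reader.start();
        while (!rw.hasQueuedThreads()) {
            Thread.yield(); // wait until the reader is actually enqueued
        }
        int observed = rw.getQueueLength();
        rw.writeLock().unlock();
        try {
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return observed;
    }
}
```

In a live system the value can change between the call and its use, which is why the Javadoc restricts it to monitoring rather than synchronization control.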

Code example source: origin: io.snappydata/gemfire-core

/**
 * @see MapCallback#newValue
 */
@Override
public final QueuedSynchronizer newValue(final RegionEntry key,
  final AbstractRegionEntry entry, final Void ignored,
  final MapResult result) {
 final QueuedSynchronizer sync = new QueuedSynchronizer();
 // set one waiter on the queue
 sync.initNumWaiters(1);
 // set the waiters flag on the entry
 entry.setHasWaiters();
 return sync;
}

Code example source: origin: io.snappydata/gemfire-core

/**
 * @see ExclusiveSharedSynchronizer#clearOwnerId(Object)
 */
@Override
protected final void clearOwnerId(Object context) {
 this.ownerId = null;
 this.sync.clearOwnerThread();
}

Code example source: origin: io.snappydata/gemfire-core

/**
 * @see ExclusiveSharedSynchronizer#setOwnerId(Object, Object)
 */
@Override
protected final void setOwnerId(Object owner, Object context) {
 this.ownerId = owner;
 this.sync.setOwnerThread();
}

Code example source: origin: io.snappydata/gemfire-core

/**
 * @see Lock#unlock()
 */
public final void unlock() {
 tryRelease();
 this.sync.signalWaiters();
}

Code example source: origin: io.snappydata/gemfirexd-core

private StringBuilder toString(StringBuilder sb) {
 sb.append(REF_PREFIX).append(Integer.toHexString(hashCode())).append(',');
 return this.sync.toObjectString(sb).append("[name=").append(this.lockName)
   .append(']');
}

Code example source: origin: io.snappydata/gemfirexd-core

if (this.sync.tryAcquireNanos(0 /* not used */, owner, this,
  TimeUnit.MILLISECONDS.toNanos(timeoutMillis), null, null)) {
 res = true;
// ...
res = this.sync.tryAcquireNanos(0 /* not used */, owner, this,
  TimeUnit.MILLISECONDS.toNanos(msecs), null, null);

Code example source: origin: io.snappydata/gemfire-core

do {
 if (exclusive) {
  result = sync.tryAcquireNanos(0, ownerId, this, loopWaitNanos, context,
    null);
  // ...
  result = sync.tryAcquireSharedNanos(lockModeArg, ownerId, this,
    loopWaitNanos, context, null);

Code example source: origin: io.snappydata/gemfirexd-core

/**
 * Queries whether any threads are waiting to acquire the read or write lock.
 * Note that because cancellations may occur at any time, a {@code true}
 * return does not guarantee that any other thread will ever acquire a lock.
 * This method is designed primarily for use in monitoring of the system
 * state.
 * 
 * @return {@code true} if there may be other threads waiting to acquire the
 *         lock
 */
public final boolean hasQueuedThreads() {
 return this.sync.hasQueuedThreads();
}

Code example source: origin: io.snappydata/gemfire-core

/**
 * Releases in shared mode. Implemented by unblocking one or more threads if
 * {@link #tryReleaseShared(int, Object, Object)} returns true.
 * 
 * @throws IllegalMonitorStateException
 *           If releasing would place this synchronizer in an illegal state or
 *           lock is not held by the calling thread. This exception must be
 *           thrown in a consistent fashion for synchronization to work
 *           correctly.
 */
public final void releaseReadLock() {
 tryReleaseShared();
 this.sync.signalSharedWaiters();
}

Code example source: origin: io.snappydata/gemfirexd

int waitForPendingWriter = WAIT_FOR_PENDING_WRITER;
while (msecs > timeoutMillis) {
 if (this.sync.tryAcquireSharedNanos(waitForPendingWriter, owner, this,
   TimeUnit.MILLISECONDS.toNanos(timeoutMillis), null, null)) {
  res = true;
 // ...
 res = this.sync.tryAcquireSharedNanos(0, owner, this,
   TimeUnit.MILLISECONDS.toNanos(msecs), null, null);
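The fragment above is truncated, but it suggests a timed acquire that waits in slices of timeoutMillis and then spends whatever time remains in one final attempt. The sketch below shows that general pattern with the JDK's Lock.tryLock(long, TimeUnit). What the original code does between slices is not visible, so the cancellation check here is purely an assumption, as are the names SlicedTimedLock and lockInSlices.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

// Time-sliced lock acquisition on JDK locks (hypothetical helper, not GemFire code).
class SlicedTimedLock {
    // Waits up to totalMillis, in slices, polling `cancelled` between slices.
    static boolean lockInSlices(Lock lock, long totalMillis, long sliceMillis,
            BooleanSupplier cancelled) throws InterruptedException {
        long remaining = totalMillis;
        while (remaining > sliceMillis) {
            if (lock.tryLock(sliceMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
            if (cancelled.getAsBoolean()) {
                return false; // assumed behavior: give up early when cancelled
            }
            remaining -= sliceMillis;
        }
        // Final attempt with whatever time is left.
        return lock.tryLock(remaining, TimeUnit.MILLISECONDS);
    }

    // Demo: result of lockInSlices on a lock nobody holds.
    static boolean acquiredWhenFree() {
        ReentrantLock lock = new ReentrantLock();
        try {
            boolean got = lockInSlices(lock, 50, 20, () -> false);
            if (got) {
                lock.unlock();
            }
            return got;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Demo: result of lockInSlices while another thread holds the lock throughout.
    static boolean acquiredWhileHeldElsewhere() {
        ReentrantLock lock = new ReentrantLock();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                held.countDown();
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        holder.start();
        try {
            held.await();
            boolean got = lockInSlices(lock, 50, 20, () -> false);
            if (got) {
                lock.unlock();
            }
            release.countDown();
            holder.join();
            return got;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Slicing the wait lets the caller react to shutdown or cancellation without waiting for the full timeout to expire.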

Code example source: origin: io.snappydata/gemfire-core

setHead(node);
Node s = node.next;
if (s == null || s.isShared())
 signalSharedWaiters();

Code example source: origin: io.snappydata/gemfire-core

/**
 * Attempts to set the state to reflect a release in exclusive mode.
 * 
 * <p>
 * This method is always invoked by the thread performing release.
 * 
 * @throws IllegalMonitorStateException
 *           if releasing would place this synchronizer in an illegal state.
 *           This exception must be thrown in a consistent fashion for
 *           synchronization to work correctly.
 */
private final void tryRelease() {
 if (compareAndSet(WRITE_MASK, 0)) {
  this.sync.clearOwnerThread();
 }
 else {
  // if system is going down then this can happen in some rare cases
  getCancelCriterion().checkCancelInProgress(null);
  throw new IllegalMonitorStateException("write lock not held in release");
 }
}

Code example source: origin: io.snappydata/gemfire-core

/**
 * Attempts to acquire in exclusive mode. This method should query if the
 * state of the object permits it to be acquired in the exclusive mode, and if
 * so to acquire it.
 * 
 * <p>
 * This method is always invoked by the thread performing acquire. If this
 * method reports failure, the acquire method may queue the thread, if it is
 * not already queued, until it is signalled by a release from some other
 * thread.
 * 
 * @return true if the acquire succeeded and false otherwise
 */
private final boolean tryAcquire() {
 if (compareAndSet(0, 1)) {
  this.sync.setOwnerThread();
  return true;
 }
 else {
  return false;
 }
}

Code example source: origin: io.snappydata/gemfire-core

/**
 * Releases the lock previously acquired by a call to
 * {@link #attemptLock(long)}. Implemented by unblocking one or more threads.
 * 
 * @throws IllegalMonitorStateException
 *           if releasing would place this synchronizer in an illegal state.
 *           This exception must be thrown in a consistent fashion for
 *           synchronization to work correctly.
 */
public final void releaseLock() {
 tryRelease();
 this.sync.signalWaiters();
}

Code example source: origin: io.snappydata/gemfirexd

private StringBuilder toString(StringBuilder sb) {
 sb.append(REF_PREFIX).append(Integer.toHexString(hashCode())).append(',');
 return this.sync.toObjectString(sb).append("[name=").append(this.lockName)
   .append(']');
}

Code example source: origin: io.snappydata/gemfire-core

if (this.sync.tryAcquireNanos(0, null, this,
  TimeUnit.MILLISECONDS.toNanos(timeoutMillis), null, this.stopper)) {
 result = true;
// ...
result = this.sync.tryAcquireNanos(0, null, this,
  TimeUnit.MILLISECONDS.toNanos(msecs), null, this.stopper);

Code example source: origin: io.snappydata/snappydata-store-core

/**
 * Queries whether any threads are waiting to acquire the read or write lock.
 * Note that because cancellations may occur at any time, a {@code true}
 * return does not guarantee that any other thread will ever acquire a lock.
 * This method is designed primarily for use in monitoring of the system
 * state.
 * 
 * @return {@code true} if there may be other threads waiting to acquire the
 *         lock
 */
public final boolean hasQueuedThreads() {
 return this.sync.hasQueuedThreads();
}
