1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  /*
18   * Written by Doug Lea with assistance from members of JCP JSR-166
19   * Expert Group and released to the public domain, as explained at
20   * http://creativecommons.org/licenses/publicdomain
21   */
22  
23  package org.jboss.netty.util.internal;
24  
25  import java.util.AbstractQueue;
26  import java.util.Collection;
27  import java.util.ConcurrentModificationException;
28  import java.util.Iterator;
29  import java.util.NoSuchElementException;
30  import java.util.concurrent.BlockingQueue;
31  import java.util.concurrent.TimeUnit;
32  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
33  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
34  import java.util.concurrent.locks.LockSupport;
35  /**
36   * An unbounded {@link BlockingQueue} based on linked nodes.
37   * This queue orders elements FIFO (first-in-first-out) with respect
38   * to any given producer.  The <em>head</em> of the queue is that
39   * element that has been on the queue the longest time for some
40   * producer.  The <em>tail</em> of the queue is that element that has
41   * been on the queue the shortest time for some producer.
42   *
43   * <p>Beware that, unlike in most collections, the {@code size}
44   * method is <em>NOT</em> a constant-time operation. Because of the
45   * asynchronous nature of these queues, determining the current number
46   * of elements requires a traversal of the elements.
47   *
48   * <p>This class and its iterator implement all of the
49   * <em>optional</em> methods of the {@link Collection} and {@link
50   * Iterator} interfaces.
51   *
52   * <p>Memory consistency effects: As with other concurrent
53   * collections, actions in a thread prior to placing an object into a
54   * {@code LinkedTransferQueue}
55   * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
56   * actions subsequent to the access or removal of that element from
57   * the {@code LinkedTransferQueue} in another thread.
58   *
59   * <p>This class is a member of the
60   * <a href="{@docRoot}/../technotes/guides/collections/index.html">
61   * Java Collections Framework</a>.
62   *
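     * <p>Usage sketch (illustrative only; interrupt handling elided):
     *
     * <pre>{@code
     * LinkedTransferQueue<String> q = new LinkedTransferQueue<String>();
     * q.put("event");                      // enqueue; never blocks
     * String s = q.take();                 // dequeue, waiting if empty
     * boolean direct = q.tryTransfer("x"); // true only if a consumer is waiting
     * }</pre>
     *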
63   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
64   * @author Doug Lea
65   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
66   * @version $Rev: 2373 $, $Date: 2010-10-20 20:33:23 +0900 (Wed, 20 Oct 2010) $ (Upstream: 1.79)
67   *
68   * @param <E> the type of elements held in this collection
69   */
70  public class LinkedTransferQueue<E> extends AbstractQueue<E>
71      implements BlockingQueue<E>, java.io.Serializable {
72      private static final long serialVersionUID = -3223113410248163686L;
73  
74      /*
75       * *** Overview of Dual Queues with Slack ***
76       *
77       * Dual Queues, introduced by Scherer and Scott
78       * (http://www.cs.rice.edu/~wns1/papers/2004-DISC-DDS.pdf) are
79       * (linked) queues in which nodes may represent either data or
80       * requests.  When a thread tries to enqueue a data node, but
81       * encounters a request node, it instead "matches" and removes it;
82       * and vice versa for enqueuing requests. Blocking Dual Queues
83       * arrange that threads enqueuing unmatched requests block until
84       * other threads provide the match. Dual Synchronous Queues (see
85       * Scherer, Lea, & Scott
86       * http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf)
87       * additionally arrange that threads enqueuing unmatched data also
88       * block.  Dual Transfer Queues support all of these modes, as
89       * dictated by callers.
90       *
91       * A FIFO dual queue may be implemented using a variation of the
92       * Michael & Scott (M&S) lock-free queue algorithm
93       * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf).
94       * It maintains two pointer fields, "head", pointing to a
95       * (matched) node that in turn points to the first actual
96       * (unmatched) queue node (or null if empty); and "tail" that
97       * points to the last node on the queue (or again null if
98       * empty). For example, here is a possible queue with four data
99       * elements:
100      *
101      *  head                tail
102      *    |                   |
103      *    v                   v
104      *    M -> U -> U -> U -> U
105      *
106      * The M&S queue algorithm is known to be prone to scalability and
107      * overhead limitations when maintaining (via CAS) these head and
108      * tail pointers. This has led to the development of
109      * contention-reducing variants such as elimination arrays (see
110      * Moir et al http://portal.acm.org/citation.cfm?id=1074013) and
111      * optimistic back pointers (see Ladan-Mozes & Shavit
112      * http://people.csail.mit.edu/edya/publications/OptimisticFIFOQueue-journal.pdf).
113      * However, the nature of dual queues enables a simpler tactic for
114      * improving M&S-style implementations when dual-ness is needed.
115      *
116      * In a dual queue, each node must atomically maintain its match
117      * status. While there are other possible variants, we implement
118      * this here as: for a data-mode node, matching entails CASing an
119      * "item" field from a non-null data value to null upon match, and
120      * vice-versa for request nodes, CASing from null to a data
121      * value. (Note that the linearization properties of this style of
122      * queue are easy to verify -- elements are made available by
123      * linking, and unavailable by matching.) Compared to plain M&S
124      * queues, this property of dual queues requires one additional
125      * successful atomic operation per enq/deq pair. But it also
126      * enables lower cost variants of queue maintenance mechanics. (A
127      * variation of this idea applies even for non-dual queues that
128      * support deletion of interior elements, such as
129      * j.u.c.ConcurrentLinkedQueue.)
130      *
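         * For example, under this convention a consumer takes the first
         * unmatched data node p with, in essence (a simplified sketch of
         * what method xfer below does):
         *
         *    Object item = p.item;
         *    if (p.isData && item != null && item != p &&
         *        p.casItem(item, null)) {       // match: data -> null
         *        LockSupport.unpark(p.waiter);  // wake the producer, if parked
         *    }
         *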
131      * Once a node is matched, its match status can never again
132      * change.  We may thus arrange that the linked list of them
133      * contain a prefix of zero or more matched nodes, followed by a
134      * suffix of zero or more unmatched nodes. (Note that we allow
135      * both the prefix and suffix to be zero length, which in turn
136      * means that we do not use a dummy header.)  If we were not
137      * concerned with either time or space efficiency, we could
138      * correctly perform enqueue and dequeue operations by traversing
139      * from a pointer to the initial node; CASing the item of the
140      * first unmatched node on match and CASing the next field of the
141      * trailing node on appends. (Plus some special-casing when
142      * initially empty).  While this would be a terrible idea in
143      * itself, it does have the benefit of not requiring ANY atomic
144      * updates on head/tail fields.
145      *
146      * We introduce here an approach that lies between the extremes of
147      * never versus always updating queue (head and tail) pointers.
148      * This offers a tradeoff between sometimes requiring extra
149      * traversal steps to locate the first and/or last unmatched
150      * nodes, versus the reduced overhead and contention of fewer
151      * updates to queue pointers. For example, a possible snapshot of
152      * a queue is:
153      *
154      *  head           tail
155      *    |              |
156      *    v              v
157      *    M -> M -> U -> U -> U -> U
158      *
159      * The best value for this "slack" (the targeted maximum distance
160      * between the value of "head" and the first unmatched node, and
161      * similarly for "tail") is an empirical matter. We have found
162      * that very small constants in the range of 1-3 work best
163      * over a range of platforms. Larger values introduce increasing
164      * costs of cache misses and risks of long traversal chains, while
165      * smaller values increase CAS contention and overhead.
166      *
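         * Concretely, with the threshold of two used below, a thread
         * that has just matched node p attempts a head update only when
         * head lags by at least two nodes, as in this sketch of the
         * update step in xfer:
         *
         *    if (p != h) {                  // slack >= 2; try to catch up
         *        Node n = p.next;           // skip two nodes if possible
         *        if (head == h && casHead(h, n == null ? p : n))
         *            h.forgetNext();        // self-link the old head
         *    }
         *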
167      * Dual queues with slack differ from plain M&S dual queues by
168      * virtue of only sometimes updating head or tail pointers when
169      * matching, appending, or even traversing nodes, in order to
170      * maintain a targeted slack.  The idea of "sometimes" may be
171      * operationalized in several ways. The simplest is to use a
172      * per-operation counter incremented on each traversal step, and
173      * to try (via CAS) to update the associated queue pointer
174      * whenever the count exceeds a threshold. Another, that requires
175      * more overhead, is to use random number generators to update
176      * with a given probability per traversal step.
177      *
178      * In any strategy along these lines, because CASes updating
179      * fields may fail, the actual slack may exceed targeted
180      * slack. However, they may be retried at any time to maintain
181      * targets.  Even when using very small slack values, this
182      * approach works well for dual queues because it allows all
183      * operations up to the point of matching or appending an item
184      * (hence potentially allowing progress by another thread) to be
185      * read-only, thus not introducing any further contention. As
186      * described below, we implement this by performing slack
187      * maintenance retries only after these points.
188      *
189      * As an accompaniment to such techniques, traversal overhead can
190      * be further reduced without increasing contention of head
191      * pointer updates: Threads may sometimes shortcut the "next" link
192      * path from the current "head" node to be closer to the currently
193      * known first unmatched node, and similarly for tail. Again, this
194      * may be triggered using thresholds or randomization.
195      *
196      * These ideas must be further extended to avoid unbounded amounts
197      * of costly-to-reclaim garbage caused by the sequential "next"
198      * links of nodes starting at old forgotten head nodes: As first
199      * described in detail by Boehm
200      * (http://portal.acm.org/citation.cfm?doid=503272.503282) if a GC
201      * delays noticing that any arbitrarily old node has become
202      * garbage, all newer dead nodes will also be unreclaimed.
203      * (Similar issues arise in non-GC environments.)  To cope with
204      * this in our implementation, upon CASing to advance the head
205      * pointer, we set the "next" link of the previous head to point
206      * only to itself; thus limiting the length of connected dead lists.
207      * (We also take similar care to wipe out possibly garbage
208      * retaining values held in other Node fields.)  However, doing so
209      * adds some further complexity to traversal: If any "next"
210      * pointer links to itself, it indicates that the current thread
211      * has lagged behind a head-update, and so the traversal must
212      * continue from the "head".  Traversals trying to find the
213      * current tail starting from "tail" may also encounter
214      * self-links, in which case they also continue at "head".
215      *
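         * Traversals therefore step through the list with the pattern
         * used by method succ below:
         *
         *    Node next = p.next;
         *    return p == next ? head : next; // p is offlist; restart at head
         *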
216      * It is tempting in a slack-based scheme to not even use CAS for
217      * updates (similarly to Ladan-Mozes & Shavit). However, this
218      * cannot be done for head updates under the above link-forgetting
219      * mechanics because an update may leave head at a detached node.
220      * And while direct writes are possible for tail updates, they
221      * increase the risk of long retraversals, and hence long garbage
222      * chains, which can be much more costly than is worthwhile
223      * considering that the cost difference of performing a CAS vs
224      * write is smaller when they are not triggered on each operation
225      * (especially considering that writes and CASes equally require
226      * additional GC bookkeeping ("write barriers") that are sometimes
227      * more costly than the writes themselves because of contention).
228      *
229      * *** Overview of implementation ***
230      *
231      * We use a threshold-based approach to updates, with a slack
232      * threshold of two -- that is, we update head/tail when the
233      * current pointer appears to be two or more steps away from the
234      * first/last node. The slack value is hard-wired: a path greater
235      * than one is naturally implemented by checking equality of
236      * traversal pointers except when the list has only one element,
237      * in which case we keep slack threshold at one. Avoiding tracking
238      * explicit counts across method calls slightly simplifies an
239      * already-messy implementation. Using randomization would
240      * probably work better if there were a low-quality dirt-cheap
241      * per-thread one available, but even ThreadLocalRandom is too
242      * heavy for these purposes.
243      *
244      * With such a small slack threshold value, it is not worthwhile
245      * to augment this with path short-circuiting (i.e., unsplicing
246      * interior nodes) except in the case of cancellation/removal (see
247      * below).
248      *
249      * We allow both the head and tail fields to be null before any
250      * nodes are enqueued; initializing upon first append.  This
251      * simplifies some other logic, as well as providing more
252      * efficient explicit control paths instead of letting JVMs insert
253      * implicit NullPointerExceptions when they are null.  While not
254      * currently fully implemented, we also leave open the possibility
255      * of re-nulling these fields when empty (which is complicated to
256      * arrange, for little benefit.)
257      *
258      * All enqueue/dequeue operations are handled by the single method
259      * "xfer" with parameters indicating whether to act as some form
260      * of offer, put, poll, take, or transfer (each possibly with
261      * timeout). The relative complexity of using one monolithic
262      * method is outweighed by the code bulk and maintenance problems
263      * of using separate methods for each case.
264      *
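         * For reference, the public methods below reduce to these
         * xfer(e, haveData, how, nanos) calls:
         *
         *    put(e), add(e), offer(e) -> xfer(e,    true,  ASYNC, 0)
         *    tryTransfer(e)           -> xfer(e,    true,  NOW,   0)
         *    transfer(e)              -> xfer(e,    true,  SYNC,  0)
         *    tryTransfer(e, t, u)     -> xfer(e,    true,  TIMED, nanos)
         *    poll()                   -> xfer(null, false, NOW,   0)
         *    take()                   -> xfer(null, false, SYNC,  0)
         *    poll(t, u)               -> xfer(null, false, TIMED, nanos)
         *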
265      * Operation consists of up to three phases. The first is
266      * implemented within method xfer, the second in tryAppend, and
267      * the third in method awaitMatch.
268      *
269      * 1. Try to match an existing node
270      *
271      *    Starting at head, skip already-matched nodes until finding
272      *    an unmatched node of opposite mode, if one exists; in that
273      *    case, match it and return, updating head if necessary to
274      *    one past the matched node (or to the node itself if the
275      *    list has no other unmatched nodes). If the CAS misses, then
276      *    a loop retries advancing head by two steps until either
277      *    success or the slack is at most two. By requiring that each
278      *    attempt advances head by two (if applicable), we ensure that
279      *    the slack does not grow without bound. Traversals also check
280      *    if the initial head is now off-list, in which case they
281      *    start at the new head.
282      *
283      *    If no candidates are found and the call was untimed
284      *    poll/offer, (argument "how" is NOW) return.
285      *
286      * 2. Try to append a new node (method tryAppend)
287      *
288      *    Starting at current tail pointer, find the actual last node
289      *    and try to append a new node (or if head was null, establish
290      *    the first node). Nodes can be appended only if their
291      *    predecessors are either already matched or are of the same
292      *    mode. If we detect otherwise, then a new node with opposite
293      *    mode must have been appended during traversal, so we must
294      *    restart at phase 1. The traversal and update steps are
295      *    otherwise similar to phase 1: Retrying upon CAS misses and
296      *    checking for staleness.  In particular, if a self-link is
297      *    encountered, then we can safely jump to a node on the list
298      *    by continuing the traversal at current head.
299      *
300      *    On successful append, if the call was ASYNC, return.
301      *
302      * 3. Await match or cancellation (method awaitMatch)
303      *
304      *    Wait for another thread to match node; instead cancelling if
305      *    the current thread was interrupted or the wait timed out. On
306      *    multiprocessors, we use front-of-queue spinning: If a node
307      *    appears to be the first unmatched node in the queue, it
308      *    spins a bit before blocking. In either case, before blocking
309      *    it tries to unsplice any nodes between the current "head"
310      *    and the first unmatched node.
311      *
312      *    Front-of-queue spinning vastly improves performance of
313      *    heavily contended queues. And so long as it is relatively
314      *    brief and "quiet", spinning does not much impact performance
315      *    of less-contended queues.  During spins threads check their
316      *    interrupt status and generate a thread-local random number
317      *    to decide to occasionally perform a Thread.yield. While
318      *    yield has underdefined specs, we assume that it might help,
319      *    and will not hurt, in limiting the impact of spinning on busy
320      *    systems.  We also use smaller (1/2) spins for nodes that are
321      *    not known to be front but whose predecessors have not
322      *    blocked -- these "chained" spins avoid artifacts of
323      *    front-of-queue rules which otherwise lead to alternating
324      *    nodes spinning vs blocking. Further, front threads that
325      *    represent phase changes (from data to request node or vice
326      *    versa) compared to their predecessors receive additional
327      *    chained spins, reflecting longer paths typically required to
328      *    unblock threads during phase changes.
329      *
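         *    The resulting policy, computed by method spinsFor below,
         *    is, on multiprocessors:
         *
         *       predecessor of opposite mode        -> FRONT_SPINS + CHAINED_SPINS
         *       predecessor matched (at/near front) -> FRONT_SPINS
         *       predecessor's waiter still null     -> CHAINED_SPINS
         *       otherwise (or on uniprocessors)     -> 0, park promptly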
330      *
331      * ** Unlinking removed interior nodes **
332      *
333      * In addition to minimizing garbage retention via self-linking
334      * described above, we also unlink removed interior nodes. These
335      * may arise due to timed out or interrupted waits, or calls to
336      * remove(x) or Iterator.remove.  Normally, given a node that was
337      * at one time known to be the predecessor of some node s that is
338      * to be removed, we can unsplice s by CASing the next field of
339      * its predecessor if it still points to s (otherwise s must
340      * already have been removed or is now offlist). But there are two
341      * situations in which we cannot guarantee to make node s
342      * unreachable in this way: (1) If s is the trailing node of the
343      * list (i.e., with null next), then it is pinned as the target node
344      * for appends, so can only be removed later after other nodes are
345      * appended. (2) We cannot necessarily unlink s given a
346      * predecessor node that is matched (including the case of being
347      * cancelled): the predecessor may already be unspliced, in which
348      * case some previous reachable node may still point to s.
349      * (For further explanation see Herlihy & Shavit "The Art of
350      * Multiprocessor Programming" chapter 9).  However, in both
351      * cases, we can rule out the need for further action if either s
352      * or its predecessor is (or can be made to be) at, or falls off
353      * from, the head of the list.
354      *
355      * Without taking these into account, it would be possible for an
356      * unbounded number of supposedly removed nodes to remain
357      * reachable.  Situations leading to such buildup are uncommon but
358      * can occur in practice; for example when a series of short timed
359      * calls to poll repeatedly time out but never otherwise fall off
360      * the list because of an untimed call to take at the front of the
361      * queue.
362      *
363      * When these cases arise, rather than always retraversing the
364      * entire list to find an actual predecessor to unlink (which
365      * won't help for case (1) anyway), we record a conservative
366      * estimate of possible unsplice failures (in "sweepVotes").
367      * We trigger a full sweep when the estimate exceeds a threshold
368      * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
369      * removal failures to tolerate before sweeping through, unlinking
370      * cancelled nodes that were not unlinked upon initial removal.
371      * We perform sweeps by the thread hitting threshold (rather than
372      * background threads or by spreading work to other threads)
373      * because in the main contexts in which removal occurs, the
374      * caller is already timed-out, cancelled, or performing a
375      * potentially O(n) operation (e.g. remove(x)), none of which are
376      * time-critical enough to warrant the overhead that alternatives
377      * would impose on other threads.
378      *
379      * Because the sweepVotes estimate is conservative, and because
380      * nodes become unlinked "naturally" as they fall off the head of
381      * the queue, and because we allow votes to accumulate even while
382      * sweeps are in progress, there are typically significantly fewer
383      * such nodes than estimated.  Choice of a threshold value
384      * balances the likelihood of wasted effort and contention, versus
385      * providing a worst-case bound on retention of interior nodes in
386      * quiescent queues. The value defined below was chosen
387      * empirically to balance these under various timeout scenarios.
388      *
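         * The vote/sweep handoff in method unsplice below is simply:
         *
         *    for (;;) {                   // sweep now if enough votes
         *        int v = sweepVotes;
         *        if (v < SWEEP_THRESHOLD) {
         *            if (casSweepVotes(v, v + 1))
         *                break;           // recorded one more failure
         *        } else if (casSweepVotes(v, 0)) {
         *            sweep();             // won the right to sweep
         *            break;
         *        }
         *    }
         *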
389      * Note that we cannot self-link unlinked interior nodes during
390      * sweeps. However, the associated garbage chains terminate when
391      * some successor ultimately falls off the head of the list and is
392      * self-linked.
393      */
394 
395     /** True if on multiprocessor */
396     private static final boolean MP =
397         Runtime.getRuntime().availableProcessors() > 1;
398 
399     /**
400      * The number of times to spin (with randomly interspersed calls
401      * to Thread.yield) on multiprocessor before blocking when a node
402      * is apparently the first waiter in the queue.  See above for
403      * explanation. Must be a power of two. The value is empirically
404      * derived -- it works pretty well across a variety of processors,
405      * numbers of CPUs, and OSes.
406      */
407     private static final int FRONT_SPINS   = 1 << 7;
408 
409     /**
410      * The number of times to spin before blocking when a node is
411      * preceded by another node that is apparently spinning.  Also
412      * serves as an increment to FRONT_SPINS on phase changes, and as
413      * base average frequency for yielding during spins. Must be a
414      * power of two.
415      */
416     private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
417 
418     /**
419      * The maximum number of estimated removal failures (sweepVotes)
420      * to tolerate before sweeping through the queue unlinking
421      * cancelled nodes that were not unlinked upon initial
422      * removal. See above for explanation. The value must be at least
423      * two to avoid useless sweeps when removing trailing nodes.
424      */
425     static final int SWEEP_THRESHOLD = 32;
426 
427     /**
428      * Queue nodes. Uses Object, not E, for items to allow forgetting
429      * them after use.  Relies heavily on Unsafe mechanics to minimize
430      * unnecessary ordering constraints: Writes that are intrinsically
431      * ordered wrt other accesses or CASes use simple relaxed forms.
432      */
433     static final class Node {
434         final boolean isData;   // false if this is a request node
435         volatile Object item;   // initially non-null if isData; CASed to match
436         volatile Node next;
437         volatile Thread waiter; // null until waiting
438 
439         // CAS methods for fields
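            // (AtomicFieldUpdaterUtil is a Netty-internal helper; when
            // reflective field updaters cannot be constructed -- presumably
            // in restricted environments -- isAvailable() is false and these
            // methods fall back to synchronized compare-and-set emulation.)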
440         final boolean casNext(Node cmp, Node val) {
441             if (AtomicFieldUpdaterUtil.isAvailable()) {
442                 return nextUpdater.compareAndSet(this, cmp, val);
443             } else {
444                 synchronized (this) {
445                     if (next == cmp) {
446                         next = val;
447                         return true;
448                     } else {
449                         return false;
450                     }
451                 }
452             }
453         }
454 
455         final boolean casItem(Object cmp, Object val) {
456             // assert cmp == null || cmp.getClass() != Node.class;
457             if (AtomicFieldUpdaterUtil.isAvailable()) {
458                 return itemUpdater.compareAndSet(this, cmp, val);
459             } else {
460                 synchronized (this) {
461                     if (item == cmp) {
462                         item = val;
463                         return true;
464                     } else {
465                         return false;
466                     }
467                 }
468             }
469         }
470 
471         /**
472          * Constructs a new node.  Uses relaxed write because item can
473          * only be seen after publication via casNext.
474          */
475         Node(Object item, boolean isData) {
476             this.item = item;
477             this.isData = isData;
478         }
479 
480         /**
481          * Links node to itself to avoid garbage retention.  Called
482          * only after CASing head field, so uses relaxed write.
483          */
484         final void forgetNext() {
485             this.next = this;
486         }
487 
488         /**
489          * Sets item to self and waiter to null, to avoid garbage
490          * retention after matching or cancelling. Uses relaxed writes
491          * because order is already constrained in the only calling
492          * contexts: item is forgotten only after volatile/atomic
493          * mechanics that extract items.  Similarly, clearing waiter
494          * follows either CAS or return from park (if ever parked;
495          * else we don't care).
496          */
497         final void forgetContents() {
498             this.item = this;
499             this.waiter = null;
500         }
501 
502         /**
503          * Returns true if this node has been matched, including the
504          * case of artificial matches due to cancellation.
505          */
506         final boolean isMatched() {
507             Object x = item;
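                // Matched iff the item is self-linked (forgotten after a
                // match or cancel), or its nullness no longer corresponds
                // to this node's mode: a matched data node's item becomes
                // null, a matched request node's item becomes non-null.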
508             return x == this || x == null == isData;
509         }
510 
511         /**
512          * Returns true if this is an unmatched request node.
513          */
514         final boolean isUnmatchedRequest() {
515             return !isData && item == null;
516         }
517 
518         /**
519          * Returns true if a node with the given mode cannot be
520          * appended to this node because this node is unmatched and
521          * has opposite data mode.
522          */
523         final boolean cannotPrecede(boolean haveData) {
524             boolean d = isData;
525             Object x;
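            // True iff this node is of the opposite mode and still
            // unmatched: its item is neither forgotten (self-linked) nor
            // CASed away from the value its mode implies.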
526             return d != haveData && (x = item) != this && x != null == d;
527         }
528 
529         /**
530          * Tries to artificially match a data node -- used by remove.
531          */
532         final boolean tryMatchData() {
533             // assert isData;
534             Object x = item;
535             if (x != null && x != this && casItem(x, null)) {
536                 LockSupport.unpark(waiter);
537                 return true;
538             }
539             return false;
540         }
541 
542         private static final AtomicReferenceFieldUpdater<Node, Node> nextUpdater =
543             AtomicFieldUpdaterUtil.newRefUpdater(Node.class, Node.class, "next");
544         private static final AtomicReferenceFieldUpdater<Node, Object> itemUpdater =
545             AtomicFieldUpdaterUtil.newRefUpdater(Node.class, Object.class, "item");
546 
547         private static final long serialVersionUID = -3375979862319811754L;
548     }
549 
550     /** head of the queue; null until first enqueue */
551     transient volatile Node head;
552 
553     /** tail of the queue; null until first append */
554     transient volatile Node tail;
555 
556     /** The number of apparent failures to unsplice removed nodes */
557     transient volatile int sweepVotes;
558 
559     // CAS methods for fields
560     private boolean casTail(Node cmp, Node val) {
561         if (AtomicFieldUpdaterUtil.isAvailable()) {
562             return tailUpdater.compareAndSet(this, cmp, val);
563         } else {
564             synchronized (this) {
565                 if (tail == cmp) {
566                     tail = val;
567                     return true;
568                 } else {
569                     return false;
570                 }
571             }
572         }
573     }
574 
575     private boolean casHead(Node cmp, Node val) {
576         if (AtomicFieldUpdaterUtil.isAvailable()) {
577             return headUpdater.compareAndSet(this, cmp, val);
578         } else {
579             synchronized (this) {
580                 if (head == cmp) {
581                     head = val;
582                     return true;
583                 } else {
584                     return false;
585                 }
586             }
587         }
588     }
589 
590     private boolean casSweepVotes(int cmp, int val) {
591         if (AtomicFieldUpdaterUtil.isAvailable()) {
592             return sweepVotesUpdater.compareAndSet(this, cmp, val);
593         } else {
594             synchronized (this) {
595                 if (sweepVotes == cmp) {
596                     sweepVotes = val;
597                     return true;
598                 } else {
599                     return false;
600                 }
601             }
602         }
603     }
604 
605     /*
606      * Possible values for "how" argument in xfer method.
607      */
608     private static final int NOW   = 0; // for untimed poll, tryTransfer
609     private static final int ASYNC = 1; // for offer, put, add
610     private static final int SYNC  = 2; // for transfer, take
611     private static final int TIMED = 3; // for timed poll, tryTransfer
612 
613     @SuppressWarnings("unchecked")
614     static <E> E cast(Object item) {
615         // assert item == null || item.getClass() != Node.class;
616         return (E) item;
617     }
618 
619     /**
620      * Implements all queuing methods. See above for explanation.
621      *
622      * @param e the item or null for take
623      * @param haveData true if this is a put, else a take
624      * @param how NOW, ASYNC, SYNC, or TIMED
625      * @param nanos timeout in nanosecs, used only if mode is TIMED
626      * @return an item if matched, else e
627      * @throws NullPointerException if haveData mode but e is null
628      */
629     private E xfer(E e, boolean haveData, int how, long nanos) {
630         if (haveData && e == null) {
631             throw new NullPointerException();
632         }
633         Node s = null;                        // the node to append, if needed
634 
635         retry: for (;;) {                     // restart on append race
636 
637             for (Node h = head, p = h; p != null;) { // find & match first node
638                 boolean isData = p.isData;
639                 Object item = p.item;
640                 if (item != p && item != null == isData) { // unmatched
641                     if (isData == haveData) { // can't match
642                         break;
643                     }
644                     if (p.casItem(item, e)) { // match
645                         for (Node q = p; q != h;) {
646                             Node n = q.next;  // update by 2 unless singleton
647                             if (head == h && casHead(h, n == null? q : n)) {
648                                 h.forgetNext();
649                                 break;
650                             }                 // advance and retry
651                             if ((h = head)   == null ||
652                                 (q = h.next) == null || !q.isMatched()) {
653                                 break;        // unless slack < 2
654                             }
655                         }
656                         LockSupport.unpark(p.waiter);
657                         return LinkedTransferQueue.<E>cast(item);
658                     }
659                 }
660                 Node n = p.next;
661                 p = p != n ? n : (h = head); // Use head if p offlist
662             }
663 
664             if (how != NOW) {                 // No matches available
665                 if (s == null) {
666                     s = new Node(e, haveData);
667                 }
668                 Node pred = tryAppend(s, haveData);
669                 if (pred == null) {
670                     continue retry;           // lost race vs opposite mode
671                 }
672                 if (how != ASYNC) {
673                     return awaitMatch(s, pred, e, (how == TIMED), nanos);
674                 }
675             }
676             return e; // not waiting
677         }
678     }
679 
680     /**
681      * Tries to append node s as tail.
682      *
683      * @param s the node to append
684      * @param haveData true if appending in data mode
685      * @return null on failure due to losing race with append in
686      * different mode, else s's predecessor, or s itself if no
687      * predecessor
688      */
689     private Node tryAppend(Node s, boolean haveData) {
690         for (Node t = tail, p = t;;) {        // move p to last node and append
691             Node n, u;                        // temps for reads of next & tail
692             if (p == null && (p = head) == null) {
693                 if (casHead(null, s)) {
694                     return s;                 // initialize
695                 }
696             }
697             else if (p.cannotPrecede(haveData)) {
698                 return null;                  // lost race vs opposite mode
699             } else if ((n = p.next) != null) { // not last; keep traversing
700                 p = p != t && t != (u = tail) ? (t = u) : // stale tail
701                     p != n ? n : null;      // restart if off list
702             } else if (!p.casNext(null, s)) {
703                 p = p.next;                   // re-read on CAS failure
704             } else {
705                 if (p != t) {                 // update if slack now >= 2
706                     while ((tail != t || !casTail(t, s)) &&
707                            (t = tail)   != null &&
708                            (s = t.next) != null && // advance and retry
709                            (s = s.next) != null && s != t) {
710                         continue;
711                     }
712                 }
713                 return p;
714             }
715         }
716     }
717 
718     /**
719      * Spins/yields/blocks until node s is matched or caller gives up.
720      *
721      * @param s the waiting node
722      * @param pred the predecessor of s, or s itself if it has no
723      * predecessor, or null if unknown (the null case does not occur
724      * in any current calls but may in possible future extensions)
725      * @param e the comparison value for checking match
726      * @param timed if true, wait only until timeout elapses
727      * @param nanos timeout in nanosecs, used only if timed is true
728      * @return matched item, or e if unmatched on interrupt or timeout
729      */
730     private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
731         long lastTime = timed ? System.nanoTime() : 0L;
732         Thread w = Thread.currentThread();
733         int spins = -1; // initialized after first item and cancel checks
734         ThreadLocalRandom randomYields = null; // bound if needed
735 
736         for (;;) {
737             Object item = s.item;
738             if (item != e) {                  // matched
739                 // assert item != s;
740                 s.forgetContents();           // avoid garbage
741                 return LinkedTransferQueue.<E>cast(item);
742             }
743             if ((w.isInterrupted() || timed && nanos <= 0) &&
744                     s.casItem(e, s)) {        // cancel
745                 unsplice(pred, s);
746                 return e;
747             }
748 
749             if (spins < 0) {                  // establish spins at/near front
750                 if ((spins = spinsFor(pred, s.isData)) > 0) {
751                     randomYields = ThreadLocalRandom.current();
752                 }
753             }
754             else if (spins > 0) {             // spin
755                 --spins;
756                 if (randomYields.nextInt(CHAINED_SPINS) == 0) {
757                     Thread.yield();           // occasionally yield
758                 }
759             }
760             else if (s.waiter == null) {
761                 s.waiter = w;                 // request unpark then recheck
762             }
763             else if (timed) {
764                 long now = System.nanoTime();
765                 if ((nanos -= now - lastTime) > 0) {
766                     LockSupport.parkNanos(nanos);
767                 }
768                 lastTime = now;
769             }
770             else {
771                 LockSupport.park();
772             }
773         }
774     }
775 
776     /**
777      * Returns spin/yield value for a node with given predecessor and
778      * data mode. See above for explanation.
779      */
780     private static int spinsFor(Node pred, boolean haveData) {
781         if (MP && pred != null) {
782             if (pred.isData != haveData) {    // phase change
783                 return FRONT_SPINS + CHAINED_SPINS;
784             }
785             if (pred.isMatched()) {           // probably at front
786                 return FRONT_SPINS;
787             }
788             if (pred.waiter == null) {        // pred apparently spinning
789                 return CHAINED_SPINS;
790             }
791         }
792         return 0;
793     }
794 
795     /* -------------- Traversal methods -------------- */
796 
797     /**
798      * Returns the successor of p, or the head node if p.next has been
799      * linked to self, which will only be true if traversing with a
800      * stale pointer that is now off the list.
801      */
802     final Node succ(Node p) {
803         Node next = p.next;
804         return p == next ? head : next;
805     }
806 
807     /**
808      * Returns the first unmatched node of the given mode, or null if
809      * none.  Used by methods isEmpty, hasWaitingConsumer.
810      */
811     private Node firstOfMode(boolean isData) {
812         for (Node p = head; p != null; p = succ(p)) {
813             if (!p.isMatched()) {
814                 return p.isData == isData ? p : null;
815             }
816         }
817         return null;
818     }
819 
820     /**
821      * Returns the item in the first unmatched node with isData; or
822      * null if none.  Used by peek.
823      */
824     private E firstDataItem() {
825         for (Node p = head; p != null; p = succ(p)) {
826             Object item = p.item;
827             if (p.isData) {
828                 if (item != null && item != p) {
829                     return LinkedTransferQueue.<E>cast(item);
830                 }
831             }
832             else if (item == null) {
833                 return null;
834             }
835         }
836         return null;
837     }
838 
839     /**
840      * Traverses and counts unmatched nodes of the given mode.
841      * Used by methods size and getWaitingConsumerCount.
842      */
843     private int countOfMode(boolean data) {
844         int count = 0;
845         for (Node p = head; p != null; ) {
846             if (!p.isMatched()) {
847                 if (p.isData != data) {
848                     return 0;
849                 }
850                 if (++count == Integer.MAX_VALUE) { // saturated
851                     break;
852                 }
853             }
854             Node n = p.next;
855             if (n != p) {
856                 p = n;
857             } else {
858                 count = 0;
859                 p = head;
860             }
861         }
862         return count;
863     }
864 
865     final class Itr implements Iterator<E> {
866         private Node nextNode;   // next node to return item for
867         private E nextItem;      // the corresponding item
868         private Node lastRet;    // last returned node, to support remove
869         private Node lastPred;   // predecessor to unlink lastRet
870 
871         /**
872          * Moves to next node after prev, or first node if prev null.
873          */
874         private void advance(Node prev) {
875             lastPred = lastRet;
876             lastRet = prev;
877             for (Node p = prev == null ? head : succ(prev);
878                  p != null; p = succ(p)) {
879                 Object item = p.item;
880                 if (p.isData) {
881                     if (item != null && item != p) {
882                         nextItem = LinkedTransferQueue.<E>cast(item);
883                         nextNode = p;
884                         return;
885                     }
886                 }
887                 else if (item == null) {
888                     break;
889                 }
890             }
891             nextNode = null;
892         }
893 
894         Itr() {
895             advance(null);
896         }
897 
898         public final boolean hasNext() {
899             return nextNode != null;
900         }
901 
902         public final E next() {
903             Node p = nextNode;
904             if (p == null) {
905                 throw new NoSuchElementException();
906             }
907             E e = nextItem;
908             advance(p);
909             return e;
910         }
911 
912         public final void remove() {
913             Node p = lastRet;
914             if (p == null) {
915                 throw new IllegalStateException();
916             }
917             if (p.tryMatchData()) {
918                 unsplice(lastPred, p);
919             }
920         }
921     }
922 
923     /* -------------- Removal methods -------------- */
924 
925     /**
926      * Unsplices (now or later) the given deleted/cancelled node with
927      * the given predecessor.
928      *
929      * @param pred a node that was at one time known to be the
930      * predecessor of s, or null or s itself if s is/was at head
931      * @param s the node to be unspliced
932      */
933     final void unsplice(Node pred, Node s) {
934         s.forgetContents(); // forget unneeded fields
935         /*
936          * See above for rationale. Briefly: if pred still points to
937          * s, try to unlink s.  If s cannot be unlinked, because it is
938          * the trailing node or pred might be unlinked, and neither pred
939          * nor s is head or offlist, add to sweepVotes, and if enough
940          * votes have accumulated, sweep.
941          */
942         if (pred != null && pred != s && pred.next == s) {
943             Node n = s.next;
944             if (n == null ||
945                 n != s && pred.casNext(s, n) && pred.isMatched()) {
946                 for (;;) {               // check if at, or could be, head
947                     Node h = head;
948                     if (h == pred || h == s || h == null) {
949                         return;          // at head or list empty
950                     }
951                     if (!h.isMatched()) {
952                         break;
953                     }
954                     Node hn = h.next;
955                     if (hn == null) {
956                         return;          // now empty
957                     }
958                     if (hn != h && casHead(h, hn)) {
959                         h.forgetNext();  // advance head
960                     }
961                 }
962                 if (pred.next != pred && s.next != s) { // recheck if offlist
963                     for (;;) {           // sweep now if enough votes
964                         int v = sweepVotes;
965                         if (v < SWEEP_THRESHOLD) {
966                             if (casSweepVotes(v, v + 1)) {
967                                 break;
968                             }
969                         }
970                         else if (casSweepVotes(v, 0)) {
971                             sweep();
972                             break;
973                         }
974                     }
975                 }
976             }
977         }
978     }
979 
980     /**
981      * Unlinks matched (typically cancelled) nodes encountered in a
982      * traversal from head.
983      */
984     private void sweep() {
985         for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
986             if (!s.isMatched()) {
987                 // Unmatched nodes are never self-linked
988                 p = s;
989             } else if ((n = s.next) == null) { // trailing node is pinned
990                 break;
991             } else if (s == n) { // stale
992                 // No need to also check for p == s, since that implies s == n
993                 p = head;
994             } else {
995                 p.casNext(s, n);
996             }
997         }
998     }
999 
1000     /**
1001      * Main implementation of remove(Object)
1002      */
1003     private boolean findAndRemove(Object e) {
1004         if (e != null) {
1005             for (Node pred = null, p = head; p != null; ) {
1006                 Object item = p.item;
1007                 if (p.isData) {
1008                     if (item != null && item != p && e.equals(item) &&
1009                         p.tryMatchData()) {
1010                         unsplice(pred, p);
1011                         return true;
1012                     }
1013                 }
1014                 else if (item == null) {
1015                     break;
1016                 }
1017                 pred = p;
1018                 if ((p = p.next) == pred) { // stale
1019                     pred = null;
1020                     p = head;
1021                 }
1022             }
1023         }
1024         return false;
1025     }
1026 
1027 
1028     /**
1029      * Creates an initially empty {@code LinkedTransferQueue}.
1030      */
1031     public LinkedTransferQueue() {
1032         super();
1033     }
1034 
1035     /**
1036      * Creates a {@code LinkedTransferQueue}
1037      * initially containing the elements of the given collection,
1038      * added in traversal order of the collection's iterator.
1039      *
1040      * @param c the collection of elements to initially contain
1041      * @throws NullPointerException if the specified collection or any
1042      *         of its elements are null
1043      */
1044     public LinkedTransferQueue(Collection<? extends E> c) {
1045         this();
1046         addAll(c);
1047     }
1048 
1049     /**
1050      * Inserts the specified element at the tail of this queue.
1051      * As the queue is unbounded, this method will never block.
1052      *
1053      * @throws NullPointerException if the specified element is null
1054      */
1055     public void put(E e) {
1056         xfer(e, true, ASYNC, 0);
1057     }
1058 
1059     /**
1060      * Inserts the specified element at the tail of this queue.
1061      * As the queue is unbounded, this method will never block or
1062      * return {@code false}.
1063      *
1064      * @return {@code true} (as specified by
1065      *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
1066      * @throws NullPointerException if the specified element is null
1067      */
1068     public boolean offer(E e, long timeout, TimeUnit unit) {
1069         xfer(e, true, ASYNC, 0);
1070         return true;
1071     }
1072 
1073     /**
1074      * Inserts the specified element at the tail of this queue.
1075      * As the queue is unbounded, this method will never return {@code false}.
1076      *
1077      * @return {@code true} (as specified by
1078      *         {@link BlockingQueue#offer(Object) BlockingQueue.offer})
1079      * @throws NullPointerException if the specified element is null
1080      */
1081     public boolean offer(E e) {
1082         xfer(e, true, ASYNC, 0);
1083         return true;
1084     }
1085 
1086     /**
1087      * Inserts the specified element at the tail of this queue.
1088      * As the queue is unbounded, this method will never throw
1089      * {@link IllegalStateException} or return {@code false}.
1090      *
1091      * @return {@code true} (as specified by {@link Collection#add})
1092      * @throws NullPointerException if the specified element is null
1093      */
1094     @Override
1095     public boolean add(E e) {
1096         xfer(e, true, ASYNC, 0);
1097         return true;
1098     }
1099 
1100     /**
1101      * Transfers the element to a waiting consumer immediately, if possible.
1102      *
1103      * <p>More precisely, transfers the specified element immediately
1104      * if there exists a consumer already waiting to receive it (in
1105      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1106      * otherwise returning {@code false} without enqueuing the element.
1107      *
1108      * @throws NullPointerException if the specified element is null
1109      */
1110     public boolean tryTransfer(E e) {
1111         return xfer(e, true, NOW, 0) == null;
1112     }
1113 
1114     /**
1115      * Transfers the element to a consumer, waiting if necessary to do so.
1116      *
1117      * <p>More precisely, transfers the specified element immediately
1118      * if there exists a consumer already waiting to receive it (in
1119      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1120      * else inserts the specified element at the tail of this queue
1121      * and waits until the element is received by a consumer.
1122      *
1123      * @throws NullPointerException if the specified element is null
1124      */
1125     public void transfer(E e) throws InterruptedException {
1126         if (xfer(e, true, SYNC, 0) != null) {
1127             Thread.interrupted(); // failure possible only due to interrupt
1128             throw new InterruptedException();
1129         }
1130     }
1131 
1132     /**
1133      * Transfers the element to a consumer if it is possible to do so
1134      * before the timeout elapses.
1135      *
1136      * <p>More precisely, transfers the specified element immediately
1137      * if there exists a consumer already waiting to receive it (in
1138      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1139      * else inserts the specified element at the tail of this queue
1140      * and waits until the element is received by a consumer,
1141      * returning {@code false} if the specified wait time elapses
1142      * before the element can be transferred.
1143      *
1144      * @throws NullPointerException if the specified element is null
1145      */
1146     public boolean tryTransfer(E e, long timeout, TimeUnit unit)
1147         throws InterruptedException {
1148         if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) {
1149             return true;
1150         }
1151         if (!Thread.interrupted()) {
1152             return false;
1153         }
1154         throw new InterruptedException();
1155     }
1156 
1157     public E take() throws InterruptedException {
1158         E e = xfer(null, false, SYNC, 0);
1159         if (e != null) {
1160             return e;
1161         }
1162         Thread.interrupted();
1163         throw new InterruptedException();
1164     }
1165 
1166     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1167         E e = xfer(null, false, TIMED, unit.toNanos(timeout));
1168         if (e != null || !Thread.interrupted()) {
1169             return e;
1170         }
1171         throw new InterruptedException();
1172     }
1173 
1174     public E poll() {
1175         return xfer(null, false, NOW, 0);
1176     }
1177 
1178     /**
1179      * @throws NullPointerException     {@inheritDoc}
1180      * @throws IllegalArgumentException {@inheritDoc}
1181      */
1182     public int drainTo(Collection<? super E> c) {
1183         if (c == null) {
1184             throw new NullPointerException();
1185         }
1186         if (c == this) {
1187             throw new IllegalArgumentException();
1188         }
1189         int n = 0;
1190         E e;
1191         while ( (e = poll()) != null) {
1192             c.add(e);
1193             ++n;
1194         }
1195         return n;
1196     }
1197 
1198     /**
1199      * @throws NullPointerException     {@inheritDoc}
1200      * @throws IllegalArgumentException {@inheritDoc}
1201      */
1202     public int drainTo(Collection<? super E> c, int maxElements) {
1203         if (c == null) {
1204             throw new NullPointerException();
1205         }
1206         if (c == this) {
1207             throw new IllegalArgumentException();
1208         }
1209         int n = 0;
1210         E e;
1211         while (n < maxElements && (e = poll()) != null) {
1212             c.add(e);
1213             ++n;
1214         }
1215         return n;
1216     }
1217 
1218     /**
1219      * Returns an iterator over the elements in this queue in proper
1220      * sequence, from head to tail.
1221      *
1222      * <p>The returned iterator is a "weakly consistent" iterator that
1223      * will never throw
1224      * {@link ConcurrentModificationException ConcurrentModificationException},
1225      * and guarantees to traverse elements as they existed upon
1226      * construction of the iterator, and may (but is not guaranteed
1227      * to) reflect any modifications subsequent to construction.
1228      *
1229      * @return an iterator over the elements in this queue in proper sequence
1230      */
1231     @Override
1232     public Iterator<E> iterator() {
1233         return new Itr();
1234     }
1235 
1236     public E peek() {
1237         return firstDataItem();
1238     }
1239 
1240     /**
1241      * Returns {@code true} if this queue contains no elements.
1242      *
1243      * @return {@code true} if this queue contains no elements
1244      */
1245     @Override
1246     public boolean isEmpty() {
1247         for (Node p = head; p != null; p = succ(p)) {
1248             if (!p.isMatched()) {
1249                 return !p.isData;
1250             }
1251         }
1252         return true;
1253     }
1254 
1255     public boolean hasWaitingConsumer() {
1256         return firstOfMode(false) != null;
1257     }
1258 
1259     /**
1260      * Returns the number of elements in this queue.  If this queue
1261      * contains more than {@code Integer.MAX_VALUE} elements, returns
1262      * {@code Integer.MAX_VALUE}.
1263      *
1264      * <p>Beware that, unlike in most collections, this method is
1265      * <em>NOT</em> a constant-time operation. Because of the
1266      * asynchronous nature of these queues, determining the current
1267      * number of elements requires an O(n) traversal.
1268      *
1269      * @return the number of elements in this queue
1270      */
1271     @Override
1272     public int size() {
1273         return countOfMode(true);
1274     }
1275 
1276     public int getWaitingConsumerCount() {
1277         return countOfMode(false);
1278     }
1279 
1280     /**
1281      * Removes a single instance of the specified element from this queue,
1282      * if it is present.  More formally, removes an element {@code e} such
1283      * that {@code o.equals(e)}, if this queue contains one or more such
1284      * elements.
1285      * Returns {@code true} if this queue contained the specified element
1286      * (or equivalently, if this queue changed as a result of the call).
1287      *
1288      * @param o element to be removed from this queue, if present
1289      * @return {@code true} if this queue changed as a result of the call
1290      */
1291     @Override
1292     public boolean remove(Object o) {
1293         return findAndRemove(o);
1294     }
1295 
1296     /**
1297      * Always returns {@code Integer.MAX_VALUE} because a
1298      * {@code LinkedTransferQueue} is not capacity constrained.
1299      *
1300      * @return {@code Integer.MAX_VALUE} (as specified by
1301      *         {@link BlockingQueue#remainingCapacity()})
1302      */
1303     public int remainingCapacity() {
1304         return Integer.MAX_VALUE;
1305     }
1306 
1307     /**
1308      * Saves the state to a stream (that is, serializes it).
1309      *
1310      * @serialData All of the elements (each an {@code E}) in
1311      * the proper order, followed by a null
1312      * @param s the stream
1313      */
1314     private void writeObject(java.io.ObjectOutputStream s)
1315         throws java.io.IOException {
1316         s.defaultWriteObject();
1317         for (E e : this) {
1318             s.writeObject(e);
1319         }
1320         // Use trailing null as sentinel
1321         s.writeObject(null);
1322     }
1323 
1324     /**
1325      * Reconstitutes the Queue instance from a stream (that is,
1326      * deserializes it).
1327      *
1328      * @param s the stream
1329      */
1330     private void readObject(java.io.ObjectInputStream s)
1331         throws java.io.IOException, ClassNotFoundException {
1332         s.defaultReadObject();
1333         for (;;) {
1334             @SuppressWarnings("unchecked") E item = (E) s.readObject();
1335             if (item == null) {
1336                 break;
1337             } else {
1338                 offer(item);
1339             }
1340         }
1341     }
1342 
1343     @SuppressWarnings("rawtypes")
1344     private static final AtomicReferenceFieldUpdater<LinkedTransferQueue, Node> headUpdater =
1345         AtomicFieldUpdaterUtil.newRefUpdater(LinkedTransferQueue.class, Node.class, "head");
1346     @SuppressWarnings("rawtypes")
1347     private static final AtomicReferenceFieldUpdater<LinkedTransferQueue, Node> tailUpdater =
1348         AtomicFieldUpdaterUtil.newRefUpdater(LinkedTransferQueue.class, Node.class, "tail");
1349     @SuppressWarnings("rawtypes")
1350     private static final AtomicIntegerFieldUpdater<LinkedTransferQueue> sweepVotesUpdater =
1351         AtomicFieldUpdaterUtil.newIntUpdater(LinkedTransferQueue.class, "sweepVotes");
1352 }
1353