Learning JDK Code – concurrent

Learning JDK Code – concurrent

CopyOnWriteArrayList
 package java.util.concurrent;  
 public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable {  
   /** The lock protecting all mutators */  
   transient final ReentrantLock lock = new ReentrantLock();  
   private volatile transient Object[] array;  
   /**
    * <p>The returned iterator provides a snapshot of the state of the list
    * when the iterator was constructed. No synchronization is needed while
    * traversing the iterator. The iterator does <em>NOT</em> support the
    * <tt>remove</tt> method.
    */
   public Iterator<E> iterator() {  
     // The iterator is handed the array returned by getArray() and starts at index 0.
     return new COWIterator<E>(getArray(), 0);  
   }    
   /**
    * Stores {@code element} at position {@code index} and returns the value
    * that was there before. The whole operation runs while holding
    * {@code lock}; a changed value is written into a copy of the current
    * array, which is then installed via {@code setArray}.
    */
   public E set(int index, E element) {
     final ReentrantLock mutex = this.lock;
     mutex.lock();
     try {
       Object[] current = getArray();
       Object previous = current[index];
       if (previous == element) {
         // Not quite a no-op; ensures volatile write semantics
         setArray(current);
       } else {
         // Different reference: create a new array holding the replacement
         Object[] copy = Arrays.copyOf(current, current.length);
         copy[index] = element;
         setArray(copy);
       }
       return (E) previous;
     } finally {
       mutex.unlock();
     }
   }
   /**
    * Appends {@code e} by installing a copy of the current array that is one
    * slot longer, while holding {@code lock}. Always returns {@code true}.
    */
   public boolean add(E e) {
     final ReentrantLock mutex = this.lock;
     mutex.lock();
     try {
       Object[] current = getArray();
       final int oldLength = current.length;
       // The copy has one extra trailing slot for the new element
       Object[] grown = Arrays.copyOf(current, oldLength + 1);
       grown[oldLength] = e;
       setArray(grown);
       return true;
     } finally {
       mutex.unlock();
     }
   }
   // Removes the first element for which eq(o, element) is true, if any.
   // Returns true when an element was removed; otherwise the current array
   // is left installed and false is returned.
   public boolean remove(Object o) {  
      final ReentrantLock lock = this.lock;  
      lock.lock();  
      try {  
        Object[] elements = getArray();  
        int len = elements.length;  
        if (len != 0) {  
           // Copy while searching for element to remove  
           // This wins in the normal case of element being present  
           int newlen = len - 1;  
           Object[] newElements = new Object[newlen];  
           // Only the first len-1 cells are scanned here; the last cell is
           // checked separately below because newElements has no slot for it.
           for (int i = 0; i < newlen; ++i) {  
             if (eq(o, elements[i])) {  
                // found one; copy remaining and exit  
                for (int k = i + 1; k < len; ++k)  
                  newElements[k-1] = elements[k];  
                setArray(newElements);  
                return true;  
             } else  
                newElements[i] = elements[i];  
           }  
           // special handling for last cell  
           // (at this point newElements already holds elements[0..len-2])
           if (eq(o, elements[newlen])) {  
             setArray(newElements);  
             return true;  
           }  
        }  
        // No match: newElements (if allocated) is discarded without being installed.
        return false;  
      } finally {  
        lock.unlock();  
      }  
   }  
   /**
    * Removes the element at {@code index} and returns it. A new array of
    * length {@code len - 1} is built and installed while holding
    * {@code lock}.
    */
   public E remove(int index) {
     final ReentrantLock mutex = this.lock;
     mutex.lock();
     try {
       Object[] current = getArray();
       final int size = current.length;
       Object removed = current[index];
       // Number of elements located after the removed position
       final int tail = size - index - 1;
       Object[] shrunk;
       if (tail == 0) {
         // Removing the last element: a truncated copy is enough
         shrunk = Arrays.copyOf(current, size - 1);
       } else {
         shrunk = new Object[size - 1];
         System.arraycopy(current, 0, shrunk, 0, index);
         System.arraycopy(current, index + 1, shrunk, index, tail);
       }
       setArray(shrunk);
       return (E) removed;
     } finally {
       mutex.unlock();
     }
   }
   private static class COWIterator<E> implements ListIterator<E> {  
     /** Snapshot of the array **/  
     private final Object[] snapshot;  
     /** Index of element to be returned by subsequent call to next. */  
     private int cursor;  
     private COWIterator(Object[] elements, int initialCursor) {  
       cursor = initialCursor;  
       snapshot = elements;  
     }  
     public boolean hasNext() {  
       return cursor < snapshot.length;  
     }  
     public boolean hasPrevious() {  
       return cursor > 0;  
     }  
     public E next() {  
        if (! hasNext())  
         throw new NoSuchElementException();  
        return (E) snapshot[cursor++];  
     }  
     public E previous() {  
        if (! hasPrevious())  
         throw new NoSuchElementException();  
        return (E) snapshot[--cursor];  
     }  
     public int nextIndex() {  
       return cursor;  
     }  
     public int previousIndex() {  
       return cursor-1;  
     }  
     public void remove()|set(E e)|add(E e) {  
       throw new UnsupportedOperationException();  
     }  
   }  
 }    
ArrayBlockingQueue
 public class ArrayBlockingQueue<E> extends AbstractQueue<E>  
   implements BlockingQueue<E>, java.io.Serializable {  
 {  
   private final E[] items;  
   private int takeIndex;  
   private int putIndex;      
   /** Main lock guarding all access */  
   private final ReentrantLock lock;  
   /** Condition for waiting takes */  
   private final Condition notEmpty;  
   /** Condition for waiting puts */  
   private final Condition notFull;  
   // Blocked operations  
   /**
    * Inserts the specified element at the tail of this queue, waiting
    * for space to become available if the queue is full.
    */
   public void put(E e) throws InterruptedException {  
     // Null elements are rejected before the lock is taken.
     if (e == null) throw new NullPointerException();  
     final E[] items = this.items;  
     final ReentrantLock lock = this.lock;  
     lock.lockInterruptibly();  
     try {  
       try {  
         // Wait on notFull for as long as count equals the array length.
         while (count == items.length)  
           notFull.await();  
       } catch (InterruptedException ie) {  
         notFull.signal(); // propagate to non-interrupted thread  
         throw ie;  
       }  
       // insert(e) stores the element and signals notEmpty.
       insert(e);  
     } finally {  
       lock.unlock();  
     }  
   }    
   /**
    * Retrieves and removes the head of this queue, waiting if necessary
    * until an element becomes available.
    */
   public E take() throws InterruptedException {
     final ReentrantLock mainLock = this.lock;
     mainLock.lockInterruptibly();
     try {
       try {
         // Block on notEmpty for as long as the queue holds nothing
         while (count == 0) {
           notEmpty.await();
         }
       } catch (InterruptedException interrupted) {
         notEmpty.signal(); // propagate to non-interrupted thread
         throw interrupted;
       }
       return extract();
     } finally {
       mainLock.unlock();
     }
   }
   /**
    * Removes and returns the head element, waiting up to the given timeout
    * for one to appear. Returns {@code null} if the queue is still empty
    * once the remaining wait time is no longer positive.
    */
   public E poll(long timeout, TimeUnit unit) throws InterruptedException {
     long remaining = unit.toNanos(timeout);
     final ReentrantLock mainLock = this.lock;
     mainLock.lockInterruptibly();
     try {
       while (count == 0) {
         if (remaining <= 0) {
           return null;
         }
         try {
           // awaitNanos yields the time still left to wait
           remaining = notEmpty.awaitNanos(remaining);
         } catch (InterruptedException interrupted) {
           notEmpty.signal(); // propagate to non-interrupted thread
           throw interrupted;
         }
       }
       return extract();
     } finally {
       mainLock.unlock();
     }
   }
   /**
    * Inserts {@code e}, waiting up to the given timeout for free space.
    * Returns {@code true} on success and {@code false} if the queue is
    * still full once the remaining wait time is no longer positive.
    *
    * @throws NullPointerException if {@code e} is null
    */
   public boolean offer(E e, long timeout, TimeUnit unit)
     throws InterruptedException {
     if (e == null) throw new NullPointerException();
     long remaining = unit.toNanos(timeout);
     final ReentrantLock mainLock = this.lock;
     mainLock.lockInterruptibly();
     try {
       while (count == items.length) {
         if (remaining <= 0) {
           return false;
         }
         try {
           // awaitNanos yields the time still left to wait
           remaining = notFull.awaitNanos(remaining);
         } catch (InterruptedException interrupted) {
           notFull.signal(); // propagate to non-interrupted thread
           throw interrupted;
         }
       }
       insert(e);
       return true;
     } finally {
       mainLock.unlock();
     }
   }
   /**
    * Returns {@code true} if some element {@code x} currently in the queue
    * satisfies {@code o.equals(x)}. A {@code null} argument is never found.
    * The scan runs under {@code lock}, starting at {@code takeIndex} and
    * visiting {@code count} slots.
    */
   public boolean contains(Object o) {
     if (o == null) return false;
     final E[] buffer = this.items;
     final ReentrantLock mainLock = this.lock;
     mainLock.lock();
     try {
       int slot = takeIndex;
       for (int visited = 0; visited < count; visited++) {
         if (o.equals(buffer[slot])) {
           return true;
         }
         slot = inc(slot);
       }
       return false;
     } finally {
       mainLock.unlock();
     }
   }
   /**
    * Inserts element at current put position, advances, and signals.
    * Call only when holding lock.
    */
   private void insert(E x) {
     final int slot = putIndex;
     items[slot] = x;
     // Advance the put position, wrapping via inc()
     putIndex = inc(slot);
     count++;
     notEmpty.signal();
   }
   /**
    * Extracts element at current take position, advances, and signals.
    * Call only when holding lock.
    */
   private E extract() {
     final E[] buffer = this.items;
     final int slot = takeIndex;
     E taken = buffer[slot];
     // Clear the vacated slot before moving the take position on
     buffer[slot] = null;
     takeIndex = inc(slot);
     count--;
     notFull.signal();
     return taken;
   }
   /** Returns {@code i + 1}, or 0 when that would equal {@code items.length}. */
   final int inc(int i) {
     int next = i + 1;
     if (next == items.length) {
       return 0;
     }
     return next;
   }
   /**
    * Returns an iterator over the elements in this queue in proper sequence.
    * The returned Iterator is a "weakly consistent" iterator that
    * will never throw {@link ConcurrentModificationException},
    * and guarantees to traverse elements as they existed upon
    * construction of the iterator, and may (but is not guaranteed to)
    * reflect any modifications subsequent to construction.
    */
   public Iterator<E> iterator() {
     final ReentrantLock mainLock = this.lock;
     mainLock.lock();
     try {
       // Itr's constructor reads count/takeIndex/items, so build it under the lock
       Iterator<E> it = new Itr();
       return it;
     } finally {
       mainLock.unlock();
     }
   }
   private class Itr implements Iterator<E> {  
      * Index of element to be returned by next,  
      * or a negative number if no such.  
     private int nextIndex;  
      * nextItem holds on to item fields because once we claim  
      * that an element exists in hasNext(), we must return it in  
      * the following next() call even if it was in the process of  
      * being removed when hasNext() was called.  
     private E nextItem;  
      * Index of element returned by most recent call to next.  
      * Reset to -1 if this element is deleted by a call to remove.  
     private int lastRet;  
     Itr() {  
       lastRet = -1;  
       if (count == 0)  
         nextIndex = -1;  
       else {  
         nextIndex = takeIndex;  
         nextItem = items[takeIndex];  
       }  
     }  
     public boolean hasNext() {  
        * No sync. We can return true by mistake here  
        * only if this iterator passed across threads,  
        * which we don't support anyway.  
       return nextIndex >= 0;  
     }  
      * Checks whether nextIndex is valid; if so setting nextItem.  
      * Stops iterator when either hits putIndex or sees null item.  
     private void checkNext() {  
       if (nextIndex == putIndex) {  
         nextIndex = -1;  
         nextItem = null;  
       } else {  
         nextItem = items[nextIndex];  
         if (nextItem == null)  
           nextIndex = -1;  
       }  
     }  
     public E next() {  
       final ReentrantLock lock = ArrayBlockingQueue.this.lock;  
       lock.lock();  
       try {  
         if (nextIndex < 0)  
           throw new NoSuchElementException();  
         lastRet = nextIndex;  
         E x = nextItem;  
         nextIndex = inc(nextIndex);  
         checkNext();  
         return x;  
       } finally {  
         lock.unlock();  
       }  
     }  
     public void remove() {  
       final ReentrantLock lock = ArrayBlockingQueue.this.lock;  
       lock.lock();  
       try {  
         int i = lastRet;  
         if (i == -1)  
           throw new IllegalStateException();  
         lastRet = -1;  
         int ti = takeIndex;  
         removeAt(i);  
         // back up cursor (reset to front if was first element)  
         nextIndex = (i == ti) ? takeIndex : i;  
         checkNext();  
       } finally {  
         lock.unlock();  
       }  
     }  
   }  
 }  
LinkedBlockingQueue
 public class LinkedBlockingQueue<E> extends AbstractQueue<E>  
     implements BlockingQueue<E>, java.io.Serializable {  
   static class Node<E> {  
     E item;  
      * One of:  
      * - the real successor Node  
      * - this Node, meaning the successor is head.next  
      * - null, meaning there is no successor (this is the last node)  
     Node<E> next;  
     Node(E x) { item = x; }  
   }  
   /** Current number of elements */  
   private final AtomicInteger count = new AtomicInteger(0);  
   private transient Node<E> head;  
   private transient Node<E> last;  
   /** Lock held by take, poll, etc */  
   private final ReentrantLock takeLock = new ReentrantLock();  
   /** Wait queue for waiting takes */  
   private final Condition notEmpty = takeLock.newCondition();  
   /** Lock held by put, offer, etc */  
   private final ReentrantLock putLock = new ReentrantLock();  
   /** Wait queue for waiting puts */  
   private final Condition notFull = putLock.newCondition();   
   public LinkedBlockingQueue(int capacity) {  
     if (capacity <= 0) throw new IllegalArgumentException();  
     this.capacity = capacity;  
     last = head = new Node<E>(null);  
   }    
   // Creates a queue of capacity Integer.MAX_VALUE holding the elements of c,
   // enqueued in the order c's iterator returns them.
   // Throws NullPointerException if c contains a null element.
   public LinkedBlockingQueue(Collection<? extends E> c) {  
     this(Integer.MAX_VALUE);  
     final ReentrantLock putLock = this.putLock;  
     putLock.lock(); // Never contended, but necessary for visibility  
     try {  
       // n counts the elements enqueued so far; count is set once at the end.
       int n = 0;  
       for (E e : c) {  
         if (e == null)  
           throw new NullPointerException();  
         if (n == capacity)  
           throw new IllegalStateException("Queue full");  
         enqueue(e);  
         ++n;  
       }  
       count.set(n);  
     } finally {  
       putLock.unlock();  
     }  
   }    
   // Removes and returns the head element, waiting on notEmpty while the
   // count is zero. Only takeLock is held while dequeuing.
   public E take() throws InterruptedException {  
     E x;  
     int c = -1;  
     final AtomicInteger count = this.count;  
     final ReentrantLock takeLock = this.takeLock;  
     takeLock.lockInterruptibly();  
     try {  
         while (count.get() == 0) {  
           notEmpty.await();  
         }  
       x = dequeue();  
       // c is the count BEFORE this removal.
       c = count.getAndDecrement();  
       // More than one element was present, so at least one remains:
       // signal notEmpty again for another waiter.
       if (c > 1)  
         notEmpty.signal();  
     } finally {  
       takeLock.unlock();  
     }  
     // The queue was at capacity before this removal, so signal notFull
     // (done after takeLock has been released).
     if (c == capacity)  
       signalNotFull();  
     return x;  
   }  
   /**
    * Inserts the specified element at the tail of this queue, waiting if
    * necessary for space to become available.
    */
   public void put(E e) throws InterruptedException {  
     if (e == null) throw new NullPointerException();  
     // Note: convention in all put/take/etc is to preset local var  
     // holding count negative to indicate failure unless set.  
     int c = -1;  
     final ReentrantLock putLock = this.putLock;  
     final AtomicInteger count = this.count;  
     putLock.lockInterruptibly();  
     try {  
       /*  
        * Note that count is used in wait guard even though it is  
        * not protected by lock. This works because count can  
        * only decrease at this point (all other puts are shut  
        * out by lock), and we (or some other waiting put) are  
        * signalled if it ever changes from  
        * capacity. Similarly for all other uses of count in  
        * other wait guards.  
        */  
       while (count.get() == capacity) {   
           notFull.await();  
       }  
       enqueue(e);  
       // c is the count BEFORE this insertion.
       c = count.getAndIncrement();  
       // Still room after this insertion: signal notFull for another waiter.
       if (c + 1 < capacity)  
         notFull.signal();  
     } finally {  
       putLock.unlock();  
     }  
     // The queue was empty before this insertion, so signal notEmpty
     // (done after putLock has been released).
     if (c == 0)  
       signalNotEmpty();  
   }  
   /**
    * Inserts {@code e} at the tail, waiting up to the given timeout while
    * the queue is at capacity. Returns {@code false} if the remaining wait
    * time is no longer positive while the queue is still full, otherwise
    * {@code true}.
    *
    * @throws NullPointerException if {@code e} is null
    */
   public boolean offer(E e, long timeout, TimeUnit unit)
     throws InterruptedException {
     if (e == null) throw new NullPointerException();
     long remaining = unit.toNanos(timeout);
     // Count observed before our insertion; stays negative if nothing was inserted
     int before = -1;
     final ReentrantLock producerLock = this.putLock;
     final AtomicInteger size = this.count;
     producerLock.lockInterruptibly();
     try {
       while (size.get() == capacity) {
         if (remaining <= 0) {
           return false;
         }
         remaining = notFull.awaitNanos(remaining);
       }
       enqueue(e);
       before = size.getAndIncrement();
       if (before + 1 < capacity) {
         notFull.signal();
       }
     } finally {
       producerLock.unlock();
     }
     if (before == 0) {
       signalNotEmpty();
     }
     return true;
   }
   // Removes and returns the head element, waiting up to the given timeout
   // while the count is zero; returns null if the remaining time is no
   // longer positive while the queue is still empty.
   public E poll(long timeout, TimeUnit unit) throws InterruptedException {  
     E x = null;  
     int c = -1;  
     long nanos = unit.toNanos(timeout);  
     final AtomicInteger count = this.count;  
     final ReentrantLock takeLock = this.takeLock;  
     takeLock.lockInterruptibly();  
     try {  
         while (count.get() == 0) {   
          if (nanos <= 0)  
           return null;  
          // awaitNanos returns the time still left to wait.
          nanos = notEmpty.awaitNanos(nanos);  
         }  
         x = dequeue();  
         // c is the count BEFORE this removal.
         c = count.getAndDecrement();  
         if (c > 1)  
           notEmpty.signal();  
     } finally {  
       takeLock.unlock();  
     }  
     // The queue was at capacity before this removal, so signal notFull.
     if (c == capacity)  
       signalNotFull();  
     return x;  
   }  
   /** Links a new node holding {@code x} after {@code last} and makes it the new {@code last}. */
   private void enqueue(E x) {
     // assert putLock.isHeldByCurrentThread();
     Node<E> node = new Node<E>(x);
     last.next = node;
     last = node;
   }
   // Advances head to its successor and returns that successor's item.
   // The successor becomes the new head and has its item cleared.
   private E dequeue() {  
     // assert takeLock.isHeldByCurrentThread();  
     Node<E> h = head;  
     Node<E> first = h.next;  
     // Self-link the old head (a node whose next is itself means
     // "successor is head.next", per the Node.next description).
     h.next = h; // help GC  
     head = first;  
     E x = first.item;  
     first.item = null;  
     return x;  
   }    
   // Returns a new Itr; the Itr constructor does its own fullyLock()/fullyUnlock().
   public Iterator<E> iterator() {  
    return new Itr();  
   }  
   private class Itr implements Iterator<E> {  
     /*  
      * Basic weakly-consistent iterator. At all times hold the next  
      * item to hand out so that if hasNext() reports true, we will  
      * still have it to return even if lost race with a take etc.  
      */  
     // Node whose item the next call to next() will return, or null at the end.
     private Node<E> current;  
     // Node returned by the most recent next(); null after remove().
     private Node<E> lastRet;  
     // Item captured from 'current' while the locks were held.
     private E currentElement;  
     Itr() {  
       fullyLock();  
       try {  
         // head itself carries no element; the first element is head.next.
         current = head.next;  
         if (current != null)  
           currentElement = current.item;  
       } finally {  
         fullyUnlock();  
       }  
     }  
     // Reads 'current' without taking any lock.
     public boolean hasNext() {  
       return current != null;  
     }  
     // Returns the captured element and advances; throws
     // NoSuchElementException when 'current' is null.
     public E next() {  
       fullyLock();  
       try {  
         if (current == null)  
           throw new NoSuchElementException();  
         E x = currentElement;  
         lastRet = current;  
         current = nextNode(current);  
         currentElement = (current == null) ? null : current.item;  
         return x;  
       } finally {  
         fullyUnlock();  
       }  
     }  
     // Unlinks the node returned by the last next(), if it is still reachable
     // from head; throws IllegalStateException when lastRet is null.
     public void remove() {  
       if (lastRet == null)  
         throw new IllegalStateException();  
       fullyLock();  
       try {  
         Node<E> node = lastRet;  
         lastRet = null;  
         // Walk from head keeping the predecessor ('trail') for unlink().
         for (Node<E> trail = head, p = trail.next;  
            p != null;  
            trail = p, p = p.next) {  
            if (p == node) {  
              unlink(p, trail);  
              break;  
            }  
         }  
       } finally {  
         fullyUnlock();  
       }  
     }  
   }  
   /**
    * Unlinks the first node whose item satisfies {@code o.equals(item)}.
    * Returns {@code true} if such a node was found, {@code false} otherwise
    * (including when {@code o} is null). Runs between {@code fullyLock()}
    * and {@code fullyUnlock()}.
    */
   public boolean remove(Object o) {
     if (o == null) return false;
     fullyLock();
     try {
       Node<E> prev = head;
       Node<E> cur = prev.next;
       while (cur != null) {
         if (o.equals(cur.item)) {
           unlink(cur, prev);
           return true;
         }
         prev = cur;
         cur = cur.next;
       }
       return false;
     } finally {
       fullyUnlock();
     }
   }
   // Removes all elements: every node up to (not including) the last is
   // self-linked, every item is nulled, and the last node becomes the new head.
   public void clear() {  
     fullyLock();  
     try {  
       for (Node<E> p, h = head; (p = h.next) != null; h = p) {  
         h.next = h;  
         p.item = null;  
       }  
       head = last;  
       // assert head.item == null && head.next == null;  
       // If the queue was at capacity before being reset, signal notFull.
       if (count.getAndSet(0) == capacity)  
         notFull.signal();  
     } finally {  
       fullyUnlock();  
     }  
   }  
   /** Returns true only if the current thread holds both putLock and takeLock. */
   boolean isFullyLocked() {
     if (!putLock.isHeldByCurrentThread()) {
       return false;
     }
     return takeLock.isHeldByCurrentThread();
   }
 }  
PriorityQueue
Key points – it builds a min-heap using heapify and siftDown, and uses bitwise operators to improve performance.
 public class PriorityQueue<E> extends AbstractQueue<E>  
 implements java.io.Serializable {  
   /**
    * Priority queue represented as a balanced binary heap: the two
    * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
    * priority queue is ordered by comparator, or by the elements'
    * natural ordering, if comparator is null: For each node n in the
    * heap and each descendant d of n, n <= d. The element with the
    * lowest value is in queue[0], assuming the queue is nonempty.
    */
 private transient Object[] queue;  
   /** The number of elements in the priority queue. */
   private int size = 0;  
   /**
    * The comparator, or null if priority queue uses elements'
    * natural ordering.
    */
   private final Comparator<? super E> comparator;  
   /**
    * The number of times this priority queue has been
    * <i>structurally modified</i>. See AbstractList for gory details.
    */
  private transient int modCount = 0;  
 /**
  * Sets {@code queue} and {@code size} from the array form of {@code c}.
  * No heap ordering is established here.
  */
 private void initFromCollection(Collection<? extends E> c) {
   Object[] elements = c.toArray();
   // If c.toArray incorrectly doesn't return Object[], copy it.
   if (elements.getClass() != Object[].class) {
     elements = Arrays.copyOf(elements, elements.length, Object[].class);
   }
   this.queue = elements;
   this.size = elements.length;
 }
 // Dedicated overload for PriorityQueue sources: gives clients a more convenient, type-specific API.
   // Copies the comparator and the element array of c. heapify() is not
   // called here: the array taken from c is used as-is.
   public PriorityQueue(PriorityQueue<? extends E> c) {  
     comparator = (Comparator<? super E>)c.comparator();  
     initFromCollection(c);  
   }  
   // Builds a queue from an arbitrary collection. For a SortedSet or a
   // PriorityQueue source, its comparator is adopted and the copied array is
   // used as-is; for any other collection the comparator is null and the
   // array is turned into a heap with heapify().
   public PriorityQueue(Collection<? extends E> c) {  
     initFromCollection(c);  
     if (c instanceof SortedSet)  
       comparator = (Comparator<? super E>)  
         ((SortedSet<? extends E>)c).comparator();  
     else if (c instanceof PriorityQueue)  
       comparator = (Comparator<? super E>)  
         ((PriorityQueue<? extends E>)c).comparator();  
     else {  
       comparator = null;  
       heapify();  
     }  
 }  
   /**
    * Sifts down every position from {@code (size >>> 1) - 1} back to 0,
    * i.e. every index for which {@code k < size >>> 1} (the non-leaf test
    * used by the sift-down loops).
    */
   private void heapify() {
     int pos = (size >>> 1) - 1;
     while (pos >= 0) {
       siftDown(pos, (E) queue[pos]);
       pos--;
     }
   }
   /**
    * Inserts item x at position k, maintaining heap invariant by
    * demoting x down the tree repeatedly until it is less than or
    * equal to its children or is a leaf.
    */
   private void siftDown(int k, E x) {
     // No comparator: fall back to the elements' Comparable implementation
     if (comparator == null) {
       siftDownComparable(k, x);
       return;
     }
     siftDownUsingComparator(k, x);
   }
   // Sift-down using the elements' own compareTo. Moves the smaller child up
   // into position k until x is <= both children or k is a leaf, then stores x.
   private void siftDownComparable(int k, E x) {  
     Comparable<? super E> key = (Comparable<? super E>)x;  
     int half = size >>> 1;    // loop while a non-leaf  
     while (k < half) {  
       int child = (k << 1) + 1; // assume left child is least  
       Object c = queue[child];  
       int right = child + 1;  
       // Switch to the right child if it exists and is smaller than the left.
       if (right < size &&  
         ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)  
         c = queue[child = right];  
       // x is no larger than the smaller child: its final position is k.
       if (key.compareTo((E) c) <= 0)  
         break;  
       queue[k] = c;  
       k = child;  
     }  
     queue[k] = key;  
   }  
   // Same loop as siftDownComparable, but every comparison goes through
   // the 'comparator' field.
   private void siftDownUsingComparator(int k, E x) {  
     int half = size >>> 1;  
     while (k < half) {  
       // Left child index; replaced by the right child below if that is smaller.
       int child = (k << 1) + 1;  
       Object c = queue[child];  
       int right = child + 1;  
       if (right < size &&  
         comparator.compare((E) c, (E) queue[right]) > 0)  
         c = queue[child = right];  
       if (comparator.compare(x, (E) c) <= 0)  
         break;  
       queue[k] = c;  
       k = child;  
     }  
     queue[k] = x;  
 }  
 // Be careful to check for integer overflow when performing arithmetic on capacities.
   /**
    * Replaces {@code queue} with a larger copy. The new length is
    * {@code (old + 1) * 2} when the old length is below 64 and
    * {@code (old / 2) * 3} otherwise, clamped to {@code Integer.MAX_VALUE}
    * on overflow and raised to {@code minCapacity} if still smaller.
    *
    * @throws OutOfMemoryError if {@code minCapacity} is negative (overflow)
    */
   private void grow(int minCapacity) {
     if (minCapacity < 0) { // overflow
       throw new OutOfMemoryError();
     }
     final int current = queue.length;
     int target;
     if (current < 64) {
       target = (current + 1) * 2;
     } else {
       target = (current / 2) * 3;
     }
     if (target < 0) { // overflow
       target = Integer.MAX_VALUE;
     }
     if (target < minCapacity) {
       target = minCapacity;
     }
     queue = Arrays.copyOf(queue, target);
   }
 // Throws NullPointerException explicitly, and documents it in the Javadoc.
   /**
    * Inserts the specified element into this priority queue.
    * @throws ClassCastException if the specified element cannot be
    *     compared with elements currently in this priority queue
    *     according to the priority queue's ordering
    * @throws NullPointerException if the specified element is null
    */
   public boolean offer(E e) {
     if (e == null) {
       throw new NullPointerException();
     }
     modCount++;
     final int oldSize = size;
     // Backing array is full: enlarge it before storing
     if (oldSize >= queue.length) {
       grow(oldSize + 1);
     }
     size = oldSize + 1;
     if (oldSize == 0) {
       queue[0] = e;
     } else {
       siftUp(oldSize, e);
     }
     return true;
   }
   /**
    * To simplify and speed up coercions and comparisons, the
    * Comparable and Comparator versions are separated into different
    * methods that are otherwise identical. (Similarly for siftDown.)
    */
   private void siftUp(int k, E x) {
     // No comparator: fall back to the elements' Comparable implementation
     if (comparator == null) {
       siftUpComparable(k, x);
       return;
     }
     siftUpUsingComparator(k, x);
   }
   /**
    * Sift-up using the elements' own {@code compareTo}: parents larger than
    * {@code x} are moved down until {@code x} is >= its parent or reaches
    * the root, then {@code x} is stored.
    */
   private void siftUpComparable(int k, E x) {
     Comparable<? super E> key = (Comparable<? super E>) x;
     int pos = k;
     while (pos > 0) {
       int parentPos = (pos - 1) >>> 1;
       Object parentVal = queue[parentPos];
       if (key.compareTo((E) parentVal) >= 0) {
         break;
       }
       queue[pos] = parentVal;
       pos = parentPos;
     }
     queue[pos] = key;
   }
   /**
    * Sift-up that compares through the {@code comparator} field: parents
    * larger than {@code x} are moved down until {@code x} is >= its parent
    * or reaches the root, then {@code x} is stored.
    */
   private void siftUpUsingComparator(int k, E x) {
     int pos = k;
     while (pos > 0) {
       int parentPos = (pos - 1) >>> 1;
       Object parentVal = queue[parentPos];
       if (comparator.compare(x, (E) parentVal) >= 0) {
         break;
       }
       queue[pos] = parentVal;
       pos = parentPos;
     }
     queue[pos] = x;
   }
   /**
    * Removes and returns the element at {@code queue[0]}, or returns
    * {@code null} when the queue is empty. The last array element is
    * re-inserted from the root via {@code siftDown}.
    */
   public E poll() {
     if (size == 0) {
       return null;
     }
     final int lastIdx = --size;
     modCount++;
     E head = (E) queue[0];
     E tail = (E) queue[lastIdx];
     queue[lastIdx] = null;
     // With lastIdx == 0 the removed head was the only element
     if (lastIdx != 0) {
       siftDown(0, tail);
     }
     return head;
   }
   /**
    * Removes the element located by {@code indexOf(o)}. Returns
    * {@code false} when {@code indexOf} reports -1, {@code true} otherwise.
    */
   public boolean remove(Object o) {
     final int pos = indexOf(o);
     if (pos == -1) {
       return false;
     }
     removeAt(pos);
     return true;
   }
   /**
    * Normally this method leaves the elements at up to i-1,
    * inclusive, untouched. Under these circumstances, it returns
    * null. Occasionally, in order to maintain the heap invariant,
    * it must swap a later element of the list with one earlier than
    * i. Under these circumstances, this method returns the element
    * that was previously at the end of the list and is now at some
    * position before i. This fact is used by iterator.remove so as to
    * avoid missing traversing elements.
    */
   private E removeAt(int i) {  
     assert i >= 0 && i < size;  
     modCount++;  
     int s = --size;  
     if (s == i) // removed last element  
       queue[i] = null;  
     else {  
       // Take the former last element and re-insert it at the vacated slot i.
       E moved = (E) queue[s];  
       queue[s] = null;  
       siftDown(i, moved);  
       // siftDown left it at i, so it may still need to move up instead.
       if (queue[i] == moved) {  
         siftUp(i, moved);  
         // It moved up to a position before i: report it to the caller.
         if (queue[i] != moved)  
           return moved;  
       }  
     }  
     return null;  
   }  
   /**
    * Example usage:
    *   String[] y = x.toArray(new String[0]);
    * @throws NullPointerException if the specified array is null
    */
   public <T> T[] toArray(T[] a) {  
     // a is too short: return a fresh array of a's runtime type instead.
     if (a.length < size)  
       // Make a new array of a's runtime type, but my contents:  
       return (T[]) Arrays.copyOf(queue, size, a.getClass());  
      System.arraycopy(queue, 0, a, 0, size);  
     // a has spare room: write a null right after the last copied element.
     if (a.length > size)  
       a[size] = null;  
     return a;  
   }  
   // Returns a new Itr, which walks the backing array by index starting at 0.
   public Iterator<E> iterator() {  
     return new Itr();  
   }  
   private final class Itr implements Iterator<E> {  
      * Index (into queue array) of element to be returned by  
      * subsequent call to next.  
     private int cursor = 0;  
      * Index of element returned by most recent call to next,  
      * unless that element came from the forgetMeNot list.  
      * Set to -1 if element is deleted by a call to remove.  
     private int lastRet = -1;  
      * A queue of elements that were moved from the unvisited portion of  
      * the heap into the visited portion as a result of "unlucky" element  
      * removals during the iteration. (Unlucky element removals are those  
      * that require a siftup instead of a siftdown.) We must visit all of  
      * the elements in this list to complete the iteration. We do this  
      * after we've completed the "normal" iteration.  
      *  
      * We expect that most iterations, even those involving removals,  
      * will not need to store elements in this field.  
     private ArrayDeque<E> forgetMeNot = null;  
      * Element returned by the most recent call to next iff that  
      * element was drawn from the forgetMeNot list.  
     private E lastRetElt = null;  
      * The modCount value that the iterator believes that the backing  
      * Queue should have. If this expectation is violated, the iterator  
      * has detected concurrent modification.  
     private int expectedModCount = modCount;  
     public boolean hasNext() {  
       return cursor < size ||  
         (forgetMeNot != null && !forgetMeNot.isEmpty());  
     }  
     public E next() {  
       if (expectedModCount != modCount)  
         throw new ConcurrentModificationException();  
       if (cursor < size)  
         return (E) queue[lastRet = cursor++];  
       if (forgetMeNot != null) {  
         lastRet = -1;  
         lastRetElt = forgetMeNot.poll();  
         if (lastRetElt != null)  
           return lastRetElt;  
       }  
       throw new NoSuchElementException();  
     }  
     public void remove() {  
       if (expectedModCount != modCount)  
         throw new ConcurrentModificationException();  
       if (lastRet != -1) {  
         E moved = PriorityQueue.this.removeAt(lastRet);  
         lastRet = -1;  
         if (moved == null)  
           cursor--;  
         else {  
           if (forgetMeNot == null)  
             forgetMeNot = new ArrayDeque<E>();  
           forgetMeNot.add(moved);  
         }  
       } else if (lastRetElt != null) {  
         PriorityQueue.this.removeEq(lastRetElt);  
         lastRetElt = null;  
       } else {  
         throw new IllegalStateException();  
        }  
       expectedModCount = modCount;  
     }  
 }  
   // Serialization hook: writes the default fields, then an int
   // (max(2, size + 1)), then the first 'size' entries of queue in array order.
   private void writeObject(java.io.ObjectOutputStream s)  
     throws java.io.IOException{  
     // Write out element count, and any hidden stuff  
     s.defaultWriteObject();  
     // Write out array length, for compatibility with 1.5 version  
     s.writeInt(Math.max(2, size + 1));  
     // Write out all elements in the "proper order".  
     for (int i = 0; i < size; i++)  
       s.writeObject(queue[i]);  
 }  
   // Deserialization hook mirroring writeObject: reads the default fields,
   // discards the stored int, reads 'size' elements into a new array of
   // exactly that length, and re-establishes heap order with heapify().
   private void readObject(java.io.ObjectInputStream s)  
     throws java.io.IOException, ClassNotFoundException {  
     // Read in size, and any hidden stuff  
     s.defaultReadObject();  
     // Read in (and discard) array length  
     s.readInt();  
      queue = new Object[size];  
     // Read in all elements.  
     for (int i = 0; i < size; i++)  
       queue[i] = s.readObject();  
      // Elements are guaranteed to be in "proper order", but the  
      // spec has never explained what that might be.  
      heapify();  
 }  
 }  
DelayQueue
It is backed by a PriorityQueue internally.
 package java.util.concurrent;  
 /**
  * An unbounded {@linkplain BlockingQueue blocking queue} of
  * <tt>Delayed</tt> elements, in which an element can only be taken
  * when its delay has expired. The <em>head</em> of the queue is that
  * <tt>Delayed</tt> element whose delay expired furthest in the
  * past. If no delay has expired there is no head and <tt>poll</tt>
  * will return <tt>null</tt>. Expiration occurs when an element's
  * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less
  * than or equal to zero. Even though unexpired elements cannot be
  * removed using <tt>take</tt> or <tt>poll</tt>, they are otherwise
  * treated as normal elements. For example, the <tt>size</tt> method
  * returns the count of both expired and unexpired elements.
  * This queue does not permit null elements.
  */
 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>  
   implements BlockingQueue<E> {  
   /** Lock acquired by the queue operations shown in this class before they touch <tt>q</tt>. */
   private transient final ReentrantLock lock = new ReentrantLock();  
   /** Condition created from <tt>lock</tt>; awaited by <tt>take</tt> and timed <tt>poll</tt>, signalled by <tt>offer</tt>, <tt>poll</tt> and <tt>take</tt>. */
   private transient final Condition available = lock.newCondition();  
   /** The underlying priority queue that holds the elements. */
   private final PriorityQueue<E> q = new PriorityQueue<E>();  
   /**
    * Inserts the specified element into this queue, signalling all
    * waiters when the queue was empty beforehand or when the new element
    * compares less than the previous head.
    *
    * @param e the element to add
    * @return <tt>true</tt>
    */
   public boolean offer(E e) {
     final ReentrantLock mainLock = this.lock;
     mainLock.lock();
     try {
       // Capture the head as it was before the insertion.
       final E previousHead = q.peek();
       q.offer(e);
       final boolean headChanged =
           (previousHead == null) || (e.compareTo(previousHead) < 0);
       if (headChanged) {
         available.signalAll();
       }
       return true;
     } finally {
       mainLock.unlock();
     }
   }
   /**
    * Retrieves and removes the head of this queue, or returns <tt>null</tt>
    * if this queue has no elements with an expired delay.
    *
    * @return the head of this queue, or <tt>null</tt> if this
    *     queue has no elements with an expired delay
    */
   public E poll() {
     final ReentrantLock mainLock = this.lock;
     mainLock.lock();
     try {
       final E head = q.peek();
       // Empty queue: nothing to hand out.
       if (head == null) {
         return null;
       }
       // Head exists but its delay has not elapsed yet.
       if (head.getDelay(TimeUnit.NANOSECONDS) > 0) {
         return null;
       }
       final E taken = q.poll();
       assert taken != null;
       // More elements remain: wake the waiters so they re-check the head.
       if (q.size() != 0) {
         available.signalAll();
       }
       return taken;
     } finally {
       mainLock.unlock();
     }
   }
   /**
    * Retrieves and removes the head of this queue, waiting if necessary
    * until an element with an expired delay is available on this queue,
    * or the specified wait time expires.
    */
   public E poll(long timeout, TimeUnit unit) throws InterruptedException {
     // Remaining wait budget in nanoseconds, converted before locking.
     long remaining = unit.toNanos(timeout);
     final ReentrantLock mainLock = this.lock;
     mainLock.lockInterruptibly();
     try {
       while (true) {
         final E head = q.peek();

         if (head == null) {
           // Empty queue: give up once the budget is spent, else wait.
           if (remaining <= 0) {
             return null;
           }
           remaining = available.awaitNanos(remaining);
           continue;
         }

         final long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
         if (headDelay <= 0) {
           // Head has expired: remove it and wake waiters if more remain.
           final E taken = q.poll();
           assert taken != null;
           if (q.size() != 0) {
             available.signalAll();
           }
           return taken;
         }

         // Head not yet expired.
         if (remaining <= 0) {
           return null;
         }
         // Wait for the head's delay, but never longer than the budget.
         final long waitFor = Math.min(headDelay, remaining);
         final long unspent = available.awaitNanos(waitFor);
         // Charge the budget only for the time actually waited.
         remaining -= waitFor - unspent;
       }
     } finally {
       mainLock.unlock();
     }
   }
   /**
    * Retrieves and removes the head of this queue, waiting if necessary
    * until an element with an expired delay is available on this queue.
    */
   public E take() throws InterruptedException {
     final ReentrantLock lock = this.lock;
     lock.lockInterruptibly();
     try {
       for (;;) {
         E first = q.peek();
         if (first == null) {
           // Nothing queued: wait until signalled.
           available.await();
         } else {
           long delay = first.getDelay(TimeUnit.NANOSECONDS);
           if (delay > 0) {
             // Head not yet expired: wait at most its remaining delay.
             // The value returned by awaitNanos is not needed, because the
             // loop re-reads the head and its delay on every pass.
             available.awaitNanos(delay);
           } else {
             E x = q.poll();
             assert x != null;
             if (q.size() != 0)
               available.signalAll(); // wake up other takers
             return x;
           }
         }
       }
     } finally {
       lock.unlock();
     }
   }
   /**
    * Returns an iterator over all the elements (both expired and
    * unexpired) in this queue. The iterator does not return the
    * elements in any particular order. The returned
    * <tt>Iterator</tt> is a "weakly consistent" iterator that will
    * never throw {@link ConcurrentModificationException}, and
    * guarantees to traverse elements as they existed upon
    * construction of the iterator, and may (but is not guaranteed
    * to) reflect any modifications subsequent to construction.
    */
   public Iterator<E> iterator() {
     // Take an array copy of the contents and let the iterator walk that.
     final Object[] snapshot = toArray();
     return new Itr(snapshot);
   }
   /**
    * Snapshot iterator that works off a copy of the underlying q array.
    */
   private class Itr implements Iterator<E> {
     /** Array of all elements this iterator walks over. */
     final Object[] array;
     /** Index of the next element to return. */
     int cursor;
     /** Index of the last element returned, or -1 if there is none. */
     int lastRet;

     /**
      * Creates an iterator over the given array.
      *
      * @param array the elements to iterate over
      */
     Itr(Object[] array) {
       lastRet = -1;
       this.array = array;
     }

     /** Returns <tt>true</tt> while the cursor is inside the array. */
     public boolean hasNext() {
       return cursor < array.length;
     }

     /**
      * Returns the element at the cursor and advances it.
      *
      * @throws NoSuchElementException if the cursor is past the end
      */
     // The cast is unchecked; presumably the array only ever holds E
     // instances because it is produced from this queue - confirm at the
     // construction site.
     @SuppressWarnings("unchecked")
     public E next() {
       if (cursor >= array.length)
         throw new NoSuchElementException();
       lastRet = cursor;
       return (E) array[cursor++];
     }

     /**
      * Removes the last element returned by <tt>next</tt> from the
      * underlying queue, if it is still there.
      *
      * @throws IllegalStateException if there is no last-returned element
      */
     public void remove() {
       if (lastRet < 0)
         throw new IllegalStateException();
       Object x = array[lastRet];
       lastRet = -1;
       // Traverse underlying queue to find == element,
       // not just a .equals element.
       lock.lock();
       try {
         for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
           if (it.next() == x) {
             it.remove();
             return;
           }
         }
       } finally {
         lock.unlock();
       }
     }
   }
 }  
 /**
  * A mix-in style interface for marking objects that should be
  * acted upon after a given delay.
  *
  * <p>An implementation of this interface must define a
  * <tt>compareTo</tt> method that provides an ordering consistent with
  * its <tt>getDelay</tt> method.
  */
 public interface Delayed extends Comparable<Delayed> {  
   // Delay query for this object; presumably the result is expressed in the
   // given unit. DelayQueue in this file treats a TimeUnit.NANOSECONDS
   // result greater than zero as "not yet expired".
   long getDelay(TimeUnit unit);  
 }  
ExecutorCompletionService
 package java.util.concurrent;  
 /**
  * A <tt>CompletionService</tt> that hands every submitted task to the
  * supplied <tt>Executor</tt>, wrapped so that the task's future is added
  * to the completion queue from the wrapper's <tt>done()</tt> method.
  * <tt>take</tt> and <tt>poll</tt> read from that same queue.
  */
 public class ExecutorCompletionService<V> implements CompletionService<V> {
   /** Executor that runs the wrapped tasks. */
   private final Executor executor;
   /** The executor viewed as an AbstractExecutorService, or null if it is not one. */
   private final AbstractExecutorService aes;
   /** Queue that receives each task's future from <tt>QueueingFuture.done()</tt>. */
   private final BlockingQueue<Future<V>> completionQueue;

   /**
    * Wrapper run by the executor; its <tt>done()</tt> adds the wrapped
    * future to the completion queue.
    */
   private class QueueingFuture extends FutureTask<Void> {
     private final Future<V> task;

     QueueingFuture(RunnableFuture<V> task) {
       super(task, null);
       this.task = task;
     }

     protected void done() {
       completionQueue.add(task);
     }
   }

   /**
    * Creates a completion service using the given executor and queue.
    *
    * @param executor the executor to use
    * @param completionQueue the queue that collects task futures
    * @throws NullPointerException if either argument is null
    */
   public ExecutorCompletionService(Executor executor,
                                    BlockingQueue<Future<V>> completionQueue) {
     if (executor == null) {
       throw new NullPointerException();
     }
     if (completionQueue == null) {
       throw new NullPointerException();
     }
     this.executor = executor;
     if (executor instanceof AbstractExecutorService) {
       this.aes = (AbstractExecutorService) executor;
     } else {
       this.aes = null;
     }
     this.completionQueue = completionQueue;
   }

   /**
    * Builds the future for a callable: via <tt>aes.newTaskFor</tt> when
    * <tt>aes</tt> is set, otherwise as a plain <tt>FutureTask</tt>.
    */
   private RunnableFuture<V> newTaskFor(Callable<V> task) {
     if (aes != null) {
       return aes.newTaskFor(task);
     }
     return new FutureTask<V>(task);
   }

   /**
    * Builds the future for a runnable and fixed result: via
    * <tt>aes.newTaskFor</tt> when <tt>aes</tt> is set, otherwise as a
    * plain <tt>FutureTask</tt>.
    */
   private RunnableFuture<V> newTaskFor(Runnable task, V result) {
     if (aes != null) {
       return aes.newTaskFor(task, result);
     }
     return new FutureTask<V>(task, result);
   }

   /**
    * Submits a callable for execution.
    *
    * @param task the task to run
    * @return the future created for the task
    * @throws NullPointerException if <tt>task</tt> is null
    */
   public Future<V> submit(Callable<V> task) {
     if (task == null) {
       throw new NullPointerException();
     }
     final RunnableFuture<V> future = newTaskFor(task);
     final QueueingFuture wrapper = new QueueingFuture(future);
     executor.execute(wrapper);
     return future;
   }

   /**
    * Submits a runnable together with the result to associate with it.
    *
    * @param task the task to run
    * @param result the value passed along when the task's future is created
    * @return the future created for the task
    * @throws NullPointerException if <tt>task</tt> is null
    */
   public Future<V> submit(Runnable task, V result) {
     if (task == null) {
       throw new NullPointerException();
     }
     final RunnableFuture<V> future = newTaskFor(task, result);
     final QueueingFuture wrapper = new QueueingFuture(future);
     executor.execute(wrapper);
     return future;
   }

   /** Delegates to <tt>completionQueue.take()</tt>. */
   public Future<V> take() throws InterruptedException {
     return completionQueue.take();
   }

   /** Delegates to <tt>completionQueue.poll()</tt>. */
   public Future<V> poll() {
     return completionQueue.poll();
   }

   /** Delegates to <tt>completionQueue.poll(timeout, unit)</tt>. */
   public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
     return completionQueue.poll(timeout, unit);
   }
 }
ExecutorService
 /**
  * An <tt>Executor</tt> extension that declares task submission returning
  * <tt>Future</tt>s, bulk invocation of task collections, and shutdown and
  * termination queries.
  */
 public interface ExecutorService extends Executor {
   // --- single-task submission ---
   <T> Future<T> submit(Callable<T> task);
   <T> Future<T> submit(Runnable task, T result);
   Future<?> submit(Runnable task);

   // --- bulk invocation ---
   <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
     throws InterruptedException;
   <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
     throws InterruptedException;

   /**
    * Executes the given tasks, returning the result
    * of one that has completed successfully (i.e., without throwing
    * an exception), if any do. Upon normal or exceptional return,
    * tasks that have not completed are cancelled.
    * The results of this method are undefined if the given
    * collection is modified while this operation is in progress.
    */
   <T> T invokeAny(Collection<? extends Callable<T>> tasks)
     throws InterruptedException, ExecutionException;
   <T> T invokeAny(Collection<? extends Callable<T>> tasks,
           long timeout, TimeUnit unit)
     throws InterruptedException, ExecutionException, TimeoutException;

   // --- shutdown and termination ---
   boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
   void shutdown();
   List<Runnable> shutdownNow();
   boolean isShutdown();
   boolean isTerminated();
 }
Future task
 /**
  * Result handle of type <tt>V</tt>; it is the return type of the
  * <tt>submit</tt> methods shown elsewhere in this file.
  */
 public interface Future<V> {  
   /** Cancellation request; the flag name suggests it controls whether a running task may be interrupted. */
   boolean cancel(boolean mayInterruptIfRunning);  
   boolean isCancelled();  
   boolean isDone();  
   /** Returns the <tt>V</tt> result; declared to throw InterruptedException and ExecutionException. */
   V get() throws InterruptedException, ExecutionException;  
   /** Timed variant of <tt>get()</tt>; additionally declared to throw TimeoutException. */
   V get(long timeout, TimeUnit unit)  
     throws InterruptedException, ExecutionException, TimeoutException;  
 }  
 /** A <tt>Future</tt> that is also a <tt>Runnable</tt>. */
 public interface RunnableFuture<V> extends Runnable, Future<V> {  
   // Re-declares Runnable's run() on this interface.
   void run();  
 }  
 // Only the declaration is shown in this excerpt; the class body is left empty here.
 public class FutureTask<V> implements RunnableFuture<V> {}  
Post a Comment

Labels

Java (159) Lucene-Solr (112) Interview (61) All (58) J2SE (53) Algorithm (45) Soft Skills (38) Eclipse (33) Code Example (31) Linux (25) JavaScript (23) Spring (22) Windows (22) Web Development (20) Tools (19) Nutch2 (18) Bugs (17) Debug (16) Defects (14) Text Mining (14) J2EE (13) Network (13) Troubleshooting (13) PowerShell (11) Chrome (9) Design (9) How to (9) Learning code (9) Performance (9) Problem Solving (9) UIMA (9) html (9) Http Client (8) Maven (8) Security (8) bat (8) blogger (8) Big Data (7) Continuous Integration (7) Google (7) Guava (7) JSON (7) Shell (7) ANT (6) Coding Skills (6) Database (6) Lesson Learned (6) Programmer Skills (6) Scala (6) Tips (6) css (6) Algorithm Series (5) Cache (5) Dynamic Languages (5) IDE (5) System Design (5) adsense (5) xml (5) AIX (4) Code Quality (4) GAE (4) Git (4) Good Programming Practices (4) Jackson (4) Memory Usage (4) Miscs (4) OpenNLP (4) Project Managment (4) Spark (4) Testing (4) ads (4) regular-expression (4) Android (3) Apache Spark (3) Become a Better You (3) Concurrency (3) Eclipse RCP (3) English (3) Happy Hacking (3) IBM (3) J2SE Knowledge Series (3) JAX-RS (3) Jetty (3) Restful Web Service (3) Script (3) regex (3) seo (3) .Net (2) Android Studio (2) Apache (2) Apache Procrun (2) Architecture (2) Batch (2) Bit Operation (2) Build (2) Building Scalable Web Sites (2) C# (2) C/C++ (2) CSV (2) Career (2) Cassandra (2) Distributed (2) Fiddler (2) Firefox (2) Google Drive (2) Gson (2) How to Interview (2) Html Parser (2) Http (2) Image Tools (2) JQuery (2) Jersey (2) LDAP (2) Life (2) Logging (2) Python (2) Software Issues (2) Storage (2) Text Search (2) xml parser (2) AOP (1) Application Design (1) AspectJ (1) Chrome DevTools (1) Cloud (1) Codility (1) Data Mining (1) Data Structure (1) ExceptionUtils (1) Exif (1) Feature Request (1) FindBugs (1) Greasemonkey (1) HTML5 (1) Httpd (1) I18N (1) IBM Java Thread Dump Analyzer (1) JDK Source Code (1) JDK8 (1) JMX (1) Lazy Developer (1) Mac (1) Machine Learning (1) 
Mobile (1) My Plan for 2010 (1) Netbeans (1) Notes (1) Operating System (1) Perl (1) Problems (1) Product Architecture (1) Programming Life (1) Quality (1) Redhat (1) Redis (1) Review (1) RxJava (1) Solutions logs (1) Team Management (1) Thread Dump Analyzer (1) Visualization (1) boilerpipe (1) htm (1) ongoing (1) procrun (1) rss (1)

Popular Posts