Java Concurrency Tips


Executors
executor.execute vs submit
execute can only accept Runnable, execute returns nothing
- void execute(Runnable task)
- used for fire and forget

submit can accept Runnable and Callable, submit returns Future
Future submit(Callable task)
Future submit(Runnable task)
- used when need get the result

Exception handling - if task throws unhanded exception
- execute: the UncaughtExceptionHandler for the Thread running the task to be invoked. By default it prints exception to System.err. We can register global UncaughtExceptionHandler: Thread.setDefaultUncaughtExceptionHandler();
- submit: Any exception thrown will be bound in Future. Call future.get will throw an ExecutionException with the original Throwable as its cause.

RejectedExecutionHandler: ThreadPoolExecutor.CallerRunsPolicy(), NewThreadRunsPolicy

Threadpool
-Isolation: Allocate a separate thread pool for slow web requests within the same application

- corePoolSize 30, maximumPoolSize 50, ArrayBlockingQueue(100): what it means

Deadlock prevention
- Same Lock Ordering
- Lock Timeout

Mutual Exclusion
Hold and Wait
No Preemption

Circular Wait

Lock
Intrinsic Lock - synchronized
- every object and class is logically associated with a monitor
- wait, notify, notifyAll
ReentrantLock
- acquire lock interruptibly and with timeout - tryLock
- lockInterruptibly: used to interrupt thread when it is waiting for lock
- fair lock
- better performance: uses atomic variable and faster CAS operation
- get list of waiting thread for lock
disadvantages
- need put it into try-finally block, code unreadable
- easier to make mistake: forget to unlock in finally block
associate a counter with each thread

Condition
- wait for/notify a particular condition to become true
- can have multiple conditions on same lock
- enables the effect of having multiple wait-sets per object
- factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object
- Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.

- await releases the lock, suspend the thread, signal, signalAll
- BoundedBuffer
Condition notFull = lock.newCondition();
Condition notEmpty = lock.newCondition();

Semaphore
- can be released by other threads, useful for deadlock recovery
- P(acquire), V(release)

- a counter (integer) that allows a thread to get into a critical region if the value of the counter is greater than 0
- implementation: no lock, CAS
- acquire/(permits), tryAcquire, release/(permits)
CountDownLatch
CountDownLatch countDownLatch = new CountDownLatch(TOTAL_EVENTS);
countDownLatch.countDown();
countDownLatch.await();
Barrier
CyclicBarrier
- We can reuse CyclicBarrier even if Barrier is broken but can't reuse CountDownLatch

Programming Concurrency on the JVM
Read-write lock - ReentrantReadWriteLock
Thread local storage
Prefer local thread variables over static and shared when possible
Avoiding deadlocks by ordering the locks
Using atomic variables instead of synchronization
Holding locks for as short a time as possible
Avoid executing inside the critical section the code you don't control.

Use lock instead of synchronize
Use CyclicBarrier and CountdownLatch instead of wait/notify

volatile
- visibility: any thread that reads a field will see the most recently written value
- operations on it not be reordered

- not cached in registers or in caches; always read from and write to main memory - thus a little slower

Ensure Visibility
- cross the memory barrier on getters, use volatile, atomicXXX, readlock etc
Ensure Atomicity

Don't start threads in constructors
- use static factory methods, thread pool
Use tryLock - instead of lock
Cancel a task - future.cancel, executor.cancel

Initialization-on-demand holder idiom
Double-checked locking

AtomicLongArray
The long elements in the AtomicLongArray can be updated atomically
- compareAndSet: compare the value of a given element with a specified value, and if the two values are equal, set a new value for that element.
- getAndAdd(index, incrment), getAndIncrement(index), incrementAndGet

AtomicReferenceArray
AtomicReference

Concurrent data structures
ConcurrentHashMap
- guarantees O(log(n)) operations
- sorted, ceilingEntry/Key, floorEntry/Key

ConcurrentSkipListMap
ConcurrentSkipListSet
- thread-safe implementation of the SortedSet
- concurrent crud
- slower than TreeSet, don't use it unless in highly ...
- size is slow, O(n)
- head, nil are in every skip list
- when add, a random number generator is used to determine the levels of the new node
ConcurrentLinkedDeque
CopyOnWriteArrayList
CompletionService
extend thread class vs implement runnable interface
- use threadpool to share threads if implement runnable

Missed Signals - link
boolean wasSignalled = false;
synchronized(myMonitorObject){
  while(!wasSignalled){
    try{
      myMonitorObject.wait();
     } catch(InterruptedException e){...}
  }
  wasSignalled = false;
}

Don't call wait() on constant String's or global objects
LongAdder
- use more memory
- prefer AtomicLong when contention is low

unsafe

unsafe.getAndAddLong(this, valueOffset, 1L)
Practices
Implement AtomicInteger
synchronized
int currentValue;
int previousValue;

Implement ReadWriteLock and ReentrantReadWriteLock - link
private Map readingThreads =
     new HashMap();

 private int writeAccesses    = 0;
 private int writeRequests    = 0;
 private Thread writingThread = null;
Implement Lock and ReEntrantLock
int lockHoldCount;
long IdOfThreadCurrentlyHoldingLock;
IdOfThreadCurrentlyHoldingLock=Thread.currentThread().getId();
synchronized

Implement Semaphore
int permits; synchronized
while(this.permits == 0) wait();

Implement CyclicBarrier
2 or more threads wait for each other to reach a common barrier point. When all threads have reached common barrier point (i.e. when all threads have called await() method), all waiting threads are released.
CyclicBarrier(int parties)

int initialParties; //total parties
int partiesAwait; //parties yet to arrive
synchronized await
-
partiesAwait--;
if(partiesAwait>0){
  this.wait();
}

Implementing (Thread Safe) Stacklink
AtomicReference
- CAS
- if(head.compareAndSet(oldHead, newHead))
Design threadsafe real time counter
private AtomicLongArray counter = new AtomicLongArray(GRANULARITY);
private volatile int pos = 0;
- Another thread that update pos every second
void incrementPosition(){
    counter.set((pos + 1)%GRANULARITY, 0);
    pos = (pos + 1)%GRANULARITY;

}

Person A holds A, PB holds B, PC holds C, how can we guarantee always print ABC
- Use Semaphore

Print even and odd numbers using threads
Dining Philosophers Problem

Labels

adsense (5) Algorithm (69) Algorithm Series (35) Android (7) ANT (6) bat (8) Big Data (7) Blogger (14) Bugs (6) Cache (5) Chrome (19) Code Example (29) Code Quality (7) Coding Skills (5) Database (7) Debug (16) Design (5) Dev Tips (63) Eclipse (32) Git (5) Google (33) Guava (7) How to (9) Http Client (8) IDE (7) Interview (88) J2EE (13) J2SE (49) Java (186) JavaScript (27) JSON (7) Learning code (9) Lesson Learned (6) Linux (26) Lucene-Solr (112) Mac (10) Maven (8) Network (9) Nutch2 (18) Performance (9) PowerShell (11) Problem Solving (11) Programmer Skills (6) regex (5) Scala (6) Security (9) Soft Skills (38) Spring (22) System Design (11) Testing (7) Text Mining (14) Tips (17) Tools (24) Troubleshooting (29) UIMA (9) Web Development (19) Windows (21) xml (5)