- Create shared Semaphore object , lock and unlock in consumer - producer blocks by limiting thread accessible count to 1
- Output will be like produce 1 and consume 1 , produce 2 and consume 2 ...
- Producer.java
public class Producer implements Runnable { private static List
LIST; private static Semaphore semaphore; public Producer(List LISTv, Semaphore semaphoreV) { LIST = LISTv; semaphore = semaphoreV; } public void run() { produce(); } private static void produce() { try { int i = 1; while (true) { semaphore.acquire(); LIST.add(i); System.out.println(i + " Produced"); i++; semaphore.release(); if (i > 100) { break; } } } catch (Exception e) { e.printStackTrace(); } } } - Consumer.java
public class Consumer implements Runnable { private static List
LIST; private static Semaphore semaphore; public Consumer(List LISTv, Semaphore semaphoreV) { LIST = LISTv; semaphore = semaphoreV; } public void run() { consume(); } private static void consume() { int index = 0; try { while (true) { semaphore.acquire(); index = LIST.size() - 1; System.out.println(LIST.get(index) + " Removed"); LIST.remove(index); semaphore.release(); } } catch (Exception e) { e.printStackTrace(); } } } - App.java
public class App { private static List
LIST = new ArrayList (); private static Semaphore SEMAPHORE = new Semaphore(1, true); public static void main(String[] args) { Thread producer = new Thread(new Producer(LIST, SEMAPHORE)); Thread consumer = new Thread(new Consumer(LIST, SEMAPHORE)); producer.start(); consumer.start(); } } - Output
1 Produced 1 Removed 2 Produced 2 Removed 3 Produced 3 Removed 4 Produced 4 Removed 5 Produced 5 Removed 6 Produced 6 Removed 7 Produced 7 Removed 8 Produced 8 Removed 9 Produced 9 Removed 10 Produced 10 Removed
Showing posts with label Mutitheading. Show all posts
Showing posts with label Mutitheading. Show all posts
Monday, 22 October 2018
Implement Producer Consumer Pattern using Semaphores
Implement Producer Consumer Pattern using Blocking Queue
- BlockingQueue amazingly simplifies implementation of Producer-Consumer design pattern by providing outofbox support of blocking on put() and take().
- No need of manual empty or full check, Blocking Queue handle it internally.
- Only put and take operation required
- Output is like one N produce and then consume like FIFO ordedr
- Producer.java
public class Producer implements Runnable { private static BlockingQueue
QUEUE; public Producer(BlockingQueue QUEUE_V) { QUEUE = QUEUE_V; } public void run() { try { int i = 1; while (true) { QUEUE.put(i); System.out.println(i + " Produced"); i++; } } catch (Exception e) { e.printStackTrace(); } } }
public class Consumer implements Runnable { private static BlockingQueue
QUEUE; public Consumer(BlockingQueue QUEUE_V) { QUEUE = QUEUE_V; } public void run() { consumer(); } private static void consumer() { int item; try { Thread.sleep(1000); while (true) { item = QUEUE.take(); System.out.println(item + " Consumed"); } } catch (Exception e) { e.printStackTrace(); } } }
-
private static BlockingQueue
QUEUE = new ArrayBlockingQueue (10); public static void main(String[] args) { Thread producer = new Thread(new Producer(QUEUE)); Thread consumer = new Thread(new Consumer(QUEUE)); producer.start(); consumer.start(); }
1 Produced 2 Produced 3 Produced 4 Produced 5 Produced 6 Produced 7 Produced 8 Produced 9 Produced 10 Produced 1 Consumed 11 Produced 2 Consumed 3 Consumed 4 Consumed 5 Consumed 12 Produced 6 Consumed 13 Produced 7 Consumed 14 Produced 8 Consumed 15 Produced 9 Consumed 16 Produced 10 Consumed 17 Produced 11 Consumed 18 Produced
Producer Consumer Problem using Synchronized block
- If List is full then our PRODUCER thread waits until CONSUMER thread consume one item and make space in your queue and call notify() method to inform PRODUCER thread. Both wait() and notify() method are called on shared object which is List in our case.
- Need synchronised block
- Check for List size manually and wait for consumer and producer threads
- Since it is synchronised,
- Needs to wait till N production
- Needs to wait till N Consumption
- Producer.java
public class Producer implements Runnable { private static List
LIST; private static int SIZE; public Producer(List LIST_V, int SIZE_V) { LIST = LIST_V; SIZE = SIZE_V; } public void run() { producer(); } private static void producer() { try { int i = 1; while (true) { synchronized (LIST) { if (LIST.size() == SIZE) { System.out.println("Producer Waiting for consumer to consume object"); LIST.wait(); } LIST.add(i); System.out.println(i + " Produced"); LIST.notify(); } i++; if (i > 100) { break; } } } catch (Exception e) { e.printStackTrace(); } } } - Consumer.java
public class Consumer implements Runnable { private static List
LIST; public Consumer(List LIST_V) { LIST = LIST_V; } public void run() { consumer(); } private static void consumer() { int index = 0; try { Thread.sleep(2000); while (true) { synchronized (LIST) { if (LIST.size() == 0) { System.out.println("Consumer is waiting for producer to produce"); LIST.wait(); } System.out.println(LIST.get(index) + " Consumed"); LIST.remove(index); LIST.notify(); } } } catch (Exception e) { e.printStackTrace(); } } } - App.java
public class App { private static List
LIST = new ArrayList (); private static int SIZE = 10; public static void main(String[] args) { Thread producer = new Thread(new Producer(LIST, SIZE)); Thread consumer = new Thread(new Consumer(LIST)); producer.start(); consumer.start(); } } - Output
1 Produced 2 Produced 3 Produced 4 Produced 5 Produced 6 Produced 7 Produced 8 Produced 9 Produced 10 Produced Producer Waiting for consumer to consume object 1 Consumed 2 Consumed 3 Consumed 4 Consumed 5 Consumed 6 Consumed 7 Consumed 8 Consumed 9 Consumed 10 Consumed Consumer is waiting for producer to produce 11 Produced 12 Produced 13 Produced
Friday, 12 October 2018
Java Executor Framework
- It is the first concurrent utility framework in java and used for standardising invocation, scheduling, execution and control of asynchronous tasks in parallel threads.
- Executor implementation in java uses thread pools which consists of worker threads. The entire management of worker threads is handled by the framework. So the overhead in memory management is much reduced compared to earlier multithreading approaches.
- The Java Executor framework creates tasks by using instances of Runnable or Callable. In case of Runnable, the run () method does not return a value or throw any checked exception. But Callable is a more functional version in that area. It defines a call () method that allows the return of some computed value which can be used in future processing and it also throws an exception if necessary.
- The FutureTask class is another important component which is used to get future information about the processing. An instance of this class can wrap either a Callable or a Runnable. You can get an instance of this as the return value of submit () method of an ExecutorService. You can also manually wrap your task in a FutureTask before calling execute () method.
- Following are the functional steps to implement the Java ThreadPoolExecutor.
- Create an executor
- Executor class has a number of static factory methods to create an ExecutorService depending upon the requirement of the application.
- The newFixedThreadPool () returns a ThreadPoolExecutor instance with an initialized and unbounded queue and a fixed number of threads.
- The newCachedThreadPool () returns a ThreadPoolExecutor instance initialized with an unbounded queue and unbounded number of threads
- newFixedThreadPool ()
- No extra thread is created during execution
- If there is no free thread available the task has to wait and then execute when one thread is free
- newCachedThreadPool ()
- Existing threads are reused if available. But if no free thread is available, a new one is created and added to the pool to complete the new task. Threads that have been idle for longer than a timeout period will be removed automatically from the pool.
- This is a fixed pool of 10 threads.
- This is a cached thread pool
- Following is an example of customized thread pool executor. The parameter values depend upon the application need. Here the core pool is having 8 threads which can run concurrently and the maximum number is 12. The queue is capable of keeping 250 tasks. Here one point should be remembered that the pool size should be kept on a higher side to accommodate all tasks. The idle time limit is kept as 5 ms.
- Create one or more tasks and put in the queue
- After creating the executor now it’s time for creating tasks. Create one or more tasks to be performed as instances of either Runnable or Callable. In this framework, all the tasks are created and populated in a queue. After the task creation is complete the populated queue is submitted for concurrent execution.
- Submit the task to the Executor
- After creating the ExecutorService and proposed tasks, you need to submit the task to the executor by using either submit () or execute () method. Now as per your configuration the tasks will be picked up from the queue and run concurrently. For example if you have configured 5 concurrent executions, then 5 tasks will be picked up from the queue and run in parallel. This process will continue till all the tasks are finished from the queue.
- Execute the task
- Next the actual execution of the tasks will be managed by the framework. The Executor is responsible for managing the task’s execution, thread pool, synchronization and queue. If the pool has less than its configured number of minimum threads, new threads will be created as per requirement to handle queued tasks until that limit is reached. If the number is higher than the configured minimum, then the pool will not start any more threads. Instead, the task is queued until a thread is freed up to process the request. If the queue is full, then a new thread is started to handle it. But again it depends upon the type of constructor used during executor creation.
- Shutdown the Executor
- The termination is executed by invoking its shutdown () method. You can choose to terminate it gracefully, or abruptly.
private static final Executor executor = Executors.newFixedThreadPool(10);
private static ExecutorService exec = Executors.newCachedThreadPool();
private static final Executor executor = new ThreadPoolExecutor(5, 12, 50000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(250));
Semaphore
- A Semaphore is a thread synchronization construct that can be used either to send signals between threads to avoid missed signals, or to guard a critical section like you would with a lock
- Simple Semaphore implementation:
- The take() method sends a signal which is stored internally in the Semaphore. The release() method waits for a signal. When received the signal flag is cleared again, and the release() method exited.
// Prototype pattern public abstract class Prototype implements Cloneable { public Prototype clone() throws CloneNotSupportedException{ return (Prototype) super.clone(); } } public class ConcretePrototype1 extends Prototype { @Override public Prototype clone() throws CloneNotSupportedException { return (ConcretePrototype1)super.clone(); } } public class ConcretePrototype2 extends Prototype { @Override public Prototype clone() throws CloneNotSupportedException { return (ConcretePrototype2)super.clone(); } }
- Using a semaphore like this you can avoid missed signals. You will call take() instead of notify() and release() instead of wait().
- Using Semaphores for Signaling
- Here is a simplified example of two threads signaling each other using a Semaphore:
- MainApp.java
- SendingThread.java
- RecevingThread.java
- Semaphore to block threads
Semaphore semaphore = new Semaphore();
SendingThread sender = new SendingThread(semaphore);
ReceivingThread receiver = new ReceivingThread(semaphore);
receiver.start();
sender.start();
public class SendingThread {
Semaphore semaphore = null;
public SendingThread(Semaphore semaphore){
this.semaphore = semaphore;
}
public void run(){
while(true){
//do something, then signal
this.semaphore.take();
}
}
}
public class RecevingThread {
Semaphore semaphore = null;
public ReceivingThread(Semaphore semaphore){
this.semaphore = semaphore;
}
public void run(){
while(true){
this.semaphore.release();
//receive signal, then do something...
}
}
}
Multi-threading Interview Questions Java
- How Volatile in Java works?
- The Java volatile keyword cannot be used with method or class and it can only be used with a variable.
- Java volatile keyword also guarantees visibility and ordering and write to any volatile variable happens before any read into the volatile variable.
- Example: Singleton Class
public class Singleton{ private static volatile Singleton _instance; //volatile variable public static Singleton getInstance(){ if(_instance == null){ synchronized(Singleton.class){ if(_instance == null) _instance = new Singleton(); } } return _instance; }
- writer thread comes out of synchronized block, memory will not be synchronized and value of _instance will not be updated in main memory. With Volatile keyword in Java, this is handled by Java himself and such updates will be visible by all reader threads.
- If a variable is not shared between multiple threads, you don't need to use volatile keyword with that variable.
- Both T1 and T2 can refer to a class containing this variable. You can then make this variable volatile, and this means that changes to that variable are immeditately visible in both threads.
-
public class App { public static volatile boolean isEven = true; public static void main(String[] args) { Object mutex = new Object(); Thread odd = new Thread(new Runnable() { @Override public void run() { try { int i = 0; while (i < 20) { synchronized (mutex) { if (isEven) { mutex.wait(); } System.out.println("Odd"); isEven = true; mutex.notify(); } i++; } } catch (Exception e) { e.printStackTrace(); } } }); Thread even = new Thread(new Runnable() { @Override public void run() { try { int i = 0; while (i < 20) { synchronized (mutex) { if (!isEven) { mutex.wait(); } System.out.println("Even"); isEven = false; mutex.notify(); } i++; } } catch (Exception e) { e.printStackTrace(); } } }); odd.start(); even.start(); } }
- Volatile keyword in Java guarantees that value of the volatile variable will always be read from main memory and not from Thread's local cache.
- In Java reads and writes are atomic for all variables declared using Java volatile keyword (including long and double variables).
- Using the volatile keyword in Java on variables reduces the risk of memory consistency errors because any write to a volatile variable in Java establishes a happens-before relationship with subsequent reads of that same variable.
- Java volatile keyword doesn't mean atomic, its common misconception that after declaring volatile ++ will be atomic, to make the operation atomic you still need to ensure exclusive access using synchronized method or block in Java.
- How is CountDownLatch used in Java Multithreading?
- CountDownLatch works in latch principle, the main thread will wait until the gate is open. One thread waits for n threads, specified while creating the CountDownLatch.
- Any thread, usually the main thread of the application, which calls CountDownLatch.await() will wait until count reaches zero or it's interrupted by another thread.
- All other threads are required to count down by calling CountDownLatch.countDown() once they are completed or ready.
- As soon as count reaches zero, the waiting thread continues. One of the disadvantages/advantages of CountDownLatch is that it's not reusable: once count reaches zero you cannot use CountDownLatch any more.
- can we make array volatile in java?
- Yes, you can make an array (both primitive and reference type array e.g. an int array and String array) volatile in Java
- But declaring an array volatile does NOT give volatile access to it's fields. you're declaring the reference itself volatile, not it's elements.
- protected volatile int[] primes = new int[10];
- then if you assign a new array to primes variable, change will be visible to all threads, but changes to individual indices will not be covered under volatile guarantee i.e
- primes = new int[20];
- will follow the "happens-before" rule and cause memory barrier refresh visible to all threads
- primes[0] = 10;
- will not visible changes in all threads
- Same for collections also
- In other words you're declaring a volatile set of elements, not a set of volatile elements. The solution here is to use AtomicIntegerArray in case you want to use integers
- Thread Local?
- The ThreadLocal class in Java enables you to create variables that can only be read and writte by the same thread.
-
private ThreadLocal
myThreadLocal = new ThreadLocal (); - Now you can only store strings in the ThreadLocal instance.
-
myThreadLocal.set("Hello ThreadLocal"); String threadLocalValue = myThreadLocal.get();
- How immutable objects manage memory ?
- The advantage we get with String is that a common pool of string literals is kept by the virtual machine stopping the Heap getting filled up . The reasoning behind this is that much of the memory of a program can be taken up with storing commonly used strings.
- How to throw exceptions from Runnable.run?
- Do not use
Runnable
interface from Thread library, but instead create your own interface with the modified signature that allows checked exception to be thrown public interface MyRunnable { void myRun ( ) throws MyException; }
- You may even create an adapter that converts this interface to real Runnable ( by handling checked exception ) suitable for use in Thread framework.
- Difference Between Daemon and User Threads?
- Java offers two types of threads: user threads and daemon threads.
- JVM will wait for all active user threads to finish their execution before it shutdown itself.
- Daemon thread doesn't get that preference, JVM will exit and close the Java program even if there is a daemon thread running in the background
- Daemon threads are low-priority threads whose only role is to provide services to user threads..
- A daemon thread is a thread that does not prevent the JVM from exiting when the user thread finishes but the thread is still running. An example for a daemon thread is the garbage collection.
- That’s why infinite loops, which typically exist in daemon threads, will not cause problems, because any code, including the finally blocks, won’t be executed once all user threads have finished their execution. For this reason, daemon threads are not recommended for I/O tasks.
// Java program to demonstrate the usage of // setDaemon() and isDaemon() method. public class DaemonThread extends Thread { public DaemonThread(String name){ super(name); } public void run() { // Checking whether the thread is Daemon or not if(Thread.currentThread().isDaemon()) { System.out.println(getName() + " is Daemon thread"); } else { System.out.println(getName() + " is User thread"); } } public static void main(String[] args) { DaemonThread t1 = new DaemonThread("t1"); DaemonThread t2 = new DaemonThread("t2"); DaemonThread t3 = new DaemonThread("t3"); // Setting user thread t1 to Daemon t1.setDaemon(true); // starting first 2 threads t1.start(); t2.start(); // Setting user thread t3 to Daemon t3.setDaemon(true); t3.start(); } } ######OUT PUT#### t1 is Daemon thread t2 is User thread
Synchronization in Java
- If your code is executing in a multi-threaded environment, you need synchronization for objects, which are shared among multiple threads, to avoid any corruption of state or any kind of unexpected behavior.
- Synchronization in Java will only be needed if shared object is mutable.
- JVM guarantees that Java synchronized code will only be executed by one thread at a time.
- we can not have synchronized variable in java. Using synchronized keyword with a variable is illegal and will result in compilation error. You can use java synchronized keyword only on synchronized method or synchronized block.
- we need to take care is that static synchronized method locked on class object lock and nonstatic synchronized method locks on current object (this). So it’s possible that both static and nonstatic java synchronized method running in parallel.
-
public class Counter{ private static int count = 0; public static synchronized int getCount(){ return count; } public synchoronized setCount(int count){ this.count = count; } }
- In this example of Java, the synchronization code is not properly synchronized because both getCount() and setCount() are not getting locked on the same object and can run in parallel which may result in the incorrect count.
- Whenever a thread enters into java synchronized method or blocks it acquires a lock and whenever it leaves java synchronized method or block it releases the lock. The lock is released even if thread leaves synchronized method after completion or due to any Error or Exception.
- Object level lock
- Java Thread acquires an object level lock when it enters into an instance synchronized java method
- Class level lock
- Acquires a class level lock when it enters into static synchronized java method.
- Re-entrant Lock
- if a java synchronized method calls another synchronized method which requires the same lock then the current thread which is holding lock can enter into that method without acquiring the lock.
- Locks
- One Major disadvantage of Java synchronized keyword is that it doesn't allow concurrent read, which can potentially limit scalability.
- By using the concept of lock stripping and using different locks for reading and writing, you can overcome this limitation of synchronized in Java. You will be glad to know that java.util.concurrent.locks.ReentrantReadWriteLock provides ready-made implementation of ReadWriteLock in Java.
- One more limitation of java synchronized keyword is that it can only be used to control access to a shared object within the same JVM. If you have more than one JVM and need to synchronize access to a shared file system or database, the Java synchronized keyword is not at all sufficient. You need to implement a kind of global lock for that.
- Java synchronized block is better than java synchronized method in Java because by using synchronized block you can only lock critical section of code and avoid locking the whole method which can possibly degrade performance.
- Do not synchronize on the non-final field on synchronized block in Java. because the reference of the non-final field may change anytime and then different thread might synchronizing on different objects i.e. no synchronization at all.
- Locks vs Synchronisation
- Synchronisation
- One thread at a time and other threads waiting
- Cannot do multi read even no write happening
- Cannot interrupt any thread which is waiting for acquire lock
- Cannot change priority of threads
- Locks
- Lock comes under java.util.concurrent.Lock
- One thread at a time and other threads waiting
- ReetantReadWriteLock.readLock
- ReetantReadWriteLock.writeLock
- Allows multiple reads as long as no write happens
- tryLock ->if lock available lock it
- Fairness Policy
- Longest waiting will get higher priority
- Able interrupt thread