Friday, 15 April 2016

Producer–Consumer Program Using JDBC in Java

Features of the program are :
1) Publisher threads (producers) insert a new entry into the database table and add it to ‘taskQueue’ for tracking.
2) Subscriber threads (consumers) delete the entries one by one from the DB table.
3) The maximum capacity of taskQueue is 100, i.e. at most 100 tracked entries are in the DB table at a time.
4) Both threads run infinitely.

Source Code: Publisher.java

import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.List;

/**
 * Producer side of the demo: inserts rows into {@code trx_pub} and records each
 * insert in a shared queue that acts as a bounded counter of outstanding rows.
 *
 * <p>A single instance may be run by several threads at once. All per-thread
 * state (the JDBC connection, the prepared statement and the sequence counter)
 * is therefore kept in locals of {@link #run()} rather than in instance fields.
 * The shared queue is only touched while holding its monitor.
 */
public class Publisher implements Runnable {

    // NOTE(review): connection settings and credentials are hard-coded as in the
    // original demo; move them to configuration before using this anywhere real.
    private static final String JDBC_URL = "jdbc:postgresql://localhost:5432/postgres";
    private static final String JDBC_USER = "postgres";
    private static final String JDBC_PASSWORD = "oracle";

    private static final String INSERT_SQL =
            "Insert into trx_pub(trx_id,curr_date,pub_code,sub_code)values(nextval('trx_id_1sq'),current_date,?,?);";

    /** Pause before each insert so the demo output stays readable. */
    private static final long PAUSE_MILLIS = 100L;

    private final List<Integer> taskQueue;
    final int MAX;

    /**
     * @param sharedQueue queue shared with the subscribers; also used as the lock
     * @param sharedI     maximum number of entries allowed in {@code sharedQueue}
     */
    public Publisher(List<Integer> sharedQueue, int sharedI) {
        this.MAX = sharedI;
        this.taskQueue = sharedQueue;
    }

    /**
     * Opens a connection for the calling thread and keeps producing entries until
     * the thread is interrupted or a database error occurs. The connection and the
     * statement are always closed on the way out.
     */
    @Override
    public void run() {
        int i = 0;
        try (Connection c = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
             PreparedStatement insert = c.prepareStatement(INSERT_SQL)) {
            while (!Thread.currentThread().isInterrupted()) {
                produce(insert, i++);
            }
        } catch (InterruptedException e) {
            // Interruption is the shutdown signal: restore the flag for the caller
            // and leave the loop instead of spinning on.
            Thread.currentThread().interrupt();
        } catch (SQLException e) {
            System.err.println("Publisher " + Thread.currentThread().getName()
                    + " stopping after database error");
            e.printStackTrace();
        }
    }

    /**
     * Waits for free capacity, inserts one row and then records it in the queue.
     *
     * @param insert prepared INSERT owned by the calling thread
     * @param i      value recorded in the queue for this row
     * @throws InterruptedException if interrupted while pausing or waiting for capacity
     * @throws SQLException         if the INSERT fails; the queue is left unchanged
     */
    private void produce(PreparedStatement insert, int i) throws InterruptedException, SQLException {
        // Throttle outside the monitor so other threads are not blocked meanwhile.
        Thread.sleep(PAUSE_MILLIS);

        synchronized (taskQueue) {
            while (taskQueue.size() >= MAX) {
                System.out.println("Queue is full. Thread: " + Thread.currentThread().getName()
                        + " is Waiting for Maintainer");
                taskQueue.wait();
            }
            insert.setString(1, Thread.currentThread().getName());
            insert.setString(2, "SUB");
            // Insert first: if it throws, no phantom entry is left in the queue.
            insert.executeUpdate();
            taskQueue.add(i);
            taskQueue.notifyAll();
        }
    }
}

Source Code: Subscriber.java

 import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

/**
 * Consumer side of the demo: for every entry in the shared queue, deletes the row
 * with the smallest {@code trx_id} from {@code trx_pub} and removes the entry.
 *
 * <p>A single instance may be run by several threads at once. The JDBC connection
 * and prepared statement are therefore locals of {@link #run()} rather than
 * instance fields. The shared queue is only touched while holding its monitor.
 */
public class Subscriber implements Runnable {

    // NOTE(review): connection settings and credentials are hard-coded as in the
    // original demo; move them to configuration before using this anywhere real.
    private static final String JDBC_URL = "jdbc:postgresql://localhost:5432/postgres";
    private static final String JDBC_USER = "postgres";
    private static final String JDBC_PASSWORD = "oracle";

    private static final String DELETE_SQL =
            "delete from trx_pub where trx_id in(select min(trx_id) from trx_pub);";

    /** Pause before each delete so the demo output stays readable. */
    private static final long PAUSE_MILLIS = 100L;

    private final List<Integer> taskQueue;

    /**
     * @param sharedQueue queue shared with the publishers; also used as the lock
     */
    public Subscriber(List<Integer> sharedQueue) {
        this.taskQueue = sharedQueue;
    }

    /**
     * Opens a connection for the calling thread and keeps consuming entries until
     * the thread is interrupted or a database error occurs. The connection and the
     * statement are always closed on the way out.
     */
    @Override
    public void run() {
        try (Connection c = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
             PreparedStatement delete = c.prepareStatement(DELETE_SQL)) {
            while (!Thread.currentThread().isInterrupted()) {
                consume(delete);
            }
        } catch (InterruptedException e) {
            // Interruption is the shutdown signal: restore the flag for the caller
            // and leave the loop instead of spinning on.
            Thread.currentThread().interrupt();
        } catch (SQLException e) {
            System.err.println("Subscriber " + Thread.currentThread().getName()
                    + " stopping after database error");
            e.printStackTrace();
        }
    }

    /**
     * Waits for an entry, deletes one row and then removes the entry from the queue.
     *
     * @param delete prepared DELETE owned by the calling thread
     * @throws InterruptedException if interrupted while pausing or waiting for an entry
     * @throws SQLException         if the DELETE fails; the queue is left unchanged
     */
    private void consume(PreparedStatement delete) throws InterruptedException, SQLException {
        // Throttle outside the monitor so other threads are not blocked meanwhile.
        Thread.sleep(PAUSE_MILLIS);

        synchronized (taskQueue) {
            while (taskQueue.isEmpty()) {
                System.out.println("Queue is empty. Subscriber: " + Thread.currentThread().getName()
                        + " is waiting ");
                taskQueue.wait();
            }
            // Delete first: if it throws, the queue entry is still there to retry.
            delete.executeUpdate();
            int i = taskQueue.remove(0);
            System.out.println("Consumed: " + i);
            taskQueue.notifyAll();
        }
    }
}


Source Code: Maintainer.java (main method)

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class Maintainer {

                 
                public static void main(String[] args) {
                                // TODO Auto-generated method stub
                               
                                List<Integer> mylist=new ArrayList<>();
                                 int MAX_CAPACITY=100;
                                 
                                 Publisher p1=new Publisher(mylist,MAX_CAPACITY);
                                 Subscriber s1=new Subscriber(mylist);
                                 Thread  t4=new Thread(s1);
                                  t4.setName("S1");
                                  Thread  t5=new Thread(s1);
                                  t5.setName("S2");
                                Thread  t1=new Thread(p1);
                                  t1.setName("A");
                                  Thread t2=new Thread(p1);
                                  t2.setName("B");
                                  Thread t3=new Thread(p1);
                                  t3.setName("C");
                                 t1.start();
                                 t2.start();
                                t3.start();
                                t4.start();
                                t5.start();
                }}

No comments:

Post a Comment