Features of the program are :
1) Publisher Threads (Producer) insert a new entry in database table and put it in ‘taskQueue’ for tracking.
2) Subscriber threads (Consumer) will delete the entry one by one from DB table.
3) Max capacity of taskQueue is 100 i.e. maximum 100 entries will insert in DB table.
4) Both threads run infinitely.
Source Code: Publisher.java
import
java.sql.Connection;
Source Code: Main method
2) Subscriber threads (Consumer) will delete the entry one by one from DB table.
3) Max capacity of taskQueue is 100 i.e. maximum 100 entries will insert in DB table.
4) Both threads run infinitely.
Source Code: Publisher.java
import
java.sql.Connection;
import java.sql.Date;
import
java.sql.DriverManager;
import
java.sql.PreparedStatement;
import
java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Calendar;
import
java.util.List;
public class Publisher implements
Runnable {
private final
List<Integer> taskQueue;
final int
MAX;
PreparedStatement
pstmt;
public Publisher(List<Integer> sharedQueue,int
sharedI) {
this.MAX=sharedI;
this.taskQueue=sharedQueue;
}
@Override
public void run() {
int i=0;
try {
Connection c=DriverManager.getConnection("jdbc:postgresql://localhost:5432/postgres",
"postgres", "oracle");
String
ins_into_pub_log="Insert
into
trx_pub(trx_id,curr_date,pub_code,sub_code)values(nextval('trx_id_1sq'),current_date,?,?);";
pstmt=c.prepareStatement(ins_into_pub_log);
while(true){
try {
produce(i++);
}
catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private void
produce(int i)
throws InterruptedException,
SQLException {
synchronized (taskQueue)
{
while(taskQueue.size()==MAX)
{
System.out.println("Queue
is full. Thread: "+ Thread.currentThread().getName() + " is Waiting for Maintainer");
taskQueue.wait();
}
Thread.sleep(100);
taskQueue.add(i);
pstmt.setString(1,Thread.currentThread().getName());
pstmt.setString(2, "SUB");
pstmt.execute();
taskQueue.notifyAll();
}}}
Source Code: Subscriber.java
import
java.sql.DriverManager;
import
java.sql.PreparedStatement;
import
java.sql.SQLException;
import
java.util.List;
public class Subscriber implements
Runnable {
private List<Integer> taskQueue;
PreparedStatement
pstmt;
public Subscriber(List<Integer> sharedQueue) {
// TODO Auto-generated constructor stub
this.taskQueue=sharedQueue;
}
@Override
public void
run() {
// TODO Auto-generated method stub
try {
Connection
c=DriverManager.getConnection("jdbc:postgresql://localhost:5432/postgres",
"postgres", "oracle");
String
del_from_pub_log="delete
from trx_pub where trx_id in(select min(trx_id) from trx_pub);";
pstmt=c.prepareStatement(del_from_pub_log);
while(true){
try {
consume();
}
catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private void
consume() throws
InterruptedException, SQLException {
// TODO Auto-generated method stub
synchronized (taskQueue)
{
while(taskQueue.isEmpty()){
System.out.println("Queue
is empty. Subscriber: "+Thread.currentThread().getName() + " is waiting ");;
taskQueue.wait();
}
Thread.sleep(100);
int i =
(Integer) taskQueue.remove(0);
System.out.println("Consumed: " + i);
pstmt.execute();
taskQueue.notifyAll();}}}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import
java.util.ArrayList;
import
java.util.List;
public class Maintainer {
public static
void main(String[] args) {
// TODO Auto-generated method stub
List<Integer>
mylist=new
ArrayList<>();
int MAX_CAPACITY=100;
Publisher p1=new Publisher(mylist,MAX_CAPACITY);
Subscriber s1=new Subscriber(mylist);
Thread t4=new
Thread(s1);
t4.setName("S1");
Thread
t5=new
Thread(s1);
t5.setName("S2");
Thread t1=new Thread(p1);
t1.setName("A");
Thread t2=new Thread(p1);
t2.setName("B");
Thread t3=new Thread(p1);
t3.setName("C");
t1.start();
t2.start();
t3.start();
t4.start();
t5.start();
}}
No comments:
Post a Comment