import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.io.*;

/**
 * Demo program that copies a text file line by line through a small bounded
 * queue, using several producer threads (file -> queue) and several consumer
 * threads (queue -> file).
 *
 * <p>Usage: {@code java BlockingQueueWithLocks <file>}. The copy is written to
 * {@code <file>.copy}.
 */
public class BlockingQueueWithLocks {

    /** How many producer threads, and how many consumer threads, {@code main} starts. */
    private static final int THREADS_PER_ROLE = 3;

    /** Capacity of the bounded queue shared by all producers and consumers. */
    private static final int QUEUE_CAPACITY = 2;

    /**
     * Opens the input file and its {@code .copy} output, then starts the
     * producer and consumer threads. Returns as soon as the threads are
     * started; it does not wait for them.
     *
     * @param args command-line arguments; {@code args[0]} is the path of the
     *             file to copy
     */
    public static void main(String args[]) {
        if (args.length == 0) {
            System.err.println("Insufficient number of arguments!");
            // Without this return the code below would index args[0] and crash.
            return;
        }
        String filename = args[0];

        BufferedReader in = null;
        FileWriter out;
        try {
            in = new BufferedReader(new FileReader(filename));
            out = new FileWriter(filename + ".copy");
        } catch (IOException ex) {
            ex.printStackTrace();
            // The reader may already be open when creating the writer fails.
            closeQuietly(in);
            return;
        }

        boolean useLocks = true;
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
        // One lock is shared by all consumers, the other by all producers.
        ReentrantLock readLock = new ReentrantLock();
        ReentrantLock writeLock = new ReentrantLock();

        Thread[] producers = new Thread[THREADS_PER_ROLE];
        Thread[] consumers = new Thread[THREADS_PER_ROLE];
        for (int idx = 0; idx < THREADS_PER_ROLE; idx++) {
            int number = idx + 1;
            producers[idx] = new FileReaderQueueWriter(
                    in, queue, writeLock, "QueueWriter-" + number, useLocks);
            consumers[idx] = new FileWriterQueueReader(
                    out, queue, readLock, "QueueReader-" + number, useLocks);
        }

        // Same start order as before: all producers first, then all consumers.
        for (Thread producer : producers) {
            producer.start();
        }
        for (Thread consumer : consumers) {
            consumer.start();
        }
    }

    /** Closes {@code resource} if it is non-null, ignoring any failure to close. */
    private static void closeQuietly(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ignored) {
            // Best effort: we are already on a failure path and about to exit.
        }
    }
}

/**
 * Producer thread: reads lines from a {@link BufferedReader} and puts each one
 * on a {@link BlockingQueue} until the reader reports end of input.
 *
 * <p>When {@code useLocks} is true, each "read one line and enqueue it" step
 * runs while holding the supplied lock, so threads sharing the same lock
 * perform that step one at a time.
 *
 * <p>The thread closes the reader when it finishes, whether normally or after
 * an error.
 * NOTE(review): when several instances share one reader, the first to finish
 * closes it for all of them; a sibling that then calls {@code readLine()} gets
 * an {@link IOException} and reports "IOException while reading!". Removing
 * that needs a shared completion signal, which the current constructor cannot
 * carry.
 */
class FileReaderQueueWriter extends Thread {
    final BufferedReader in;
    final BlockingQueue<String> queue;
    final ReentrantLock lock;
    final boolean useLocks;

    /**
     * @param in       source of lines; closed by this thread when it finishes
     * @param queue    destination queue for the lines read
     * @param lock     lock held around each read-and-enqueue step; only used
     *                 when {@code useLocks} is true
     * @param name     thread name, also used as the prefix of log messages
     * @param useLocks whether to guard each step with {@code lock}
     */
    public FileReaderQueueWriter(BufferedReader in, BlockingQueue<String> queue,
                                 ReentrantLock lock, String name, boolean useLocks) {
        this.in = in;
        this.queue = queue;
        this.lock = lock;
        setName(name);
        this.useLocks = useLocks;
    }

    /** Prints {@code msg} to standard error, prefixed with this thread's name. */
    void printError(String msg) {
        System.err.println("[" + getName() + "] " + msg);
    }

    /** Prints {@code msg} to standard output, prefixed with this thread's name. */
    void printOut(String msg) {
        System.out.println("[" + getName() + "] " + msg);
    }

    /**
     * Transfers lines from the reader to the queue until end of input, an I/O
     * error, or interruption, then closes the reader.
     */
    @Override
    public void run() {
        try {
            boolean more = true;
            while (more) {
                more = transferNextLine();
            }
        } catch (IOException ex) {
            printError("IOException while reading!");
        } catch (InterruptedException ex) {
            printError("Queue put method interrupted.");
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
        } finally {
            try {
                in.close();
                printError("Buffered Reader closed!");
            } catch (IOException ex) {
                printError("IOException while closing! Stream already closed.");
            }
        }
    }

    /**
     * Reads one line and enqueues it, holding the lock for the whole step when
     * locking is enabled. The lock is always released, including on exceptions.
     *
     * @return {@code false} once the reader has reached end of input
     */
    private boolean transferNextLine() throws IOException, InterruptedException {
        if (useLocks) {
            lock.lock();
        }
        try {
            String line = in.readLine();
            if (line == null) {
                return false;
            }
            // Blocks while the queue is full.
            queue.put(line);
            return true;
        } finally {
            if (useLocks) {
                lock.unlock();
            }
        }
    }
}

/**
 * Consumer thread: takes lines from a {@link BlockingQueue} and writes each
 * one, followed by {@code "\n"}, to a {@link FileWriter}.
 *
 * <p>The thread stops when no line arrives within the poll timeout
 * (500 ms unless another value is passed to the six-argument constructor).
 * When {@code useLocks} is true, each "poll one line and write it" step runs
 * while holding the supplied lock.
 *
 * <p>The thread closes the writer when it finishes, whether normally or after
 * an error.
 * NOTE(review): termination is timeout-based, and when several instances share
 * one writer the first to time out closes it for all of them. A line that
 * arrives later would fail to be written by a sibling. A reliable end-of-input
 * signal would need shared state the current constructors cannot carry.
 */
class FileWriterQueueReader extends Thread {
    /** Poll timeout used by the five-argument constructor. */
    static final long DEFAULT_POLL_TIMEOUT_MILLIS = 500L;

    final FileWriter writer;
    final BlockingQueue<String> queue;
    final ReentrantLock lock;
    final boolean useLocks;
    private final long pollTimeoutMillis;

    /**
     * Creates a consumer that gives up after 500 ms without a new line.
     *
     * @param writer   destination for the lines; closed by this thread when it
     *                 finishes
     * @param queue    source queue of lines
     * @param lock     lock held around each poll-and-write step; only used
     *                 when {@code useLocks} is true
     * @param name     thread name, also used as the prefix of log messages
     * @param useLocks whether to guard each step with {@code lock}
     */
    public FileWriterQueueReader(FileWriter writer, BlockingQueue<String> queue,
                                 ReentrantLock lock, String name, boolean useLocks) {
        this(writer, queue, lock, name, useLocks, DEFAULT_POLL_TIMEOUT_MILLIS);
    }

    /**
     * Creates a consumer with an explicit poll timeout.
     *
     * @param writer            destination for the lines; closed by this thread
     *                          when it finishes
     * @param queue             source queue of lines
     * @param lock              lock held around each poll-and-write step; only
     *                          used when {@code useLocks} is true
     * @param name              thread name, also used as the prefix of log
     *                          messages
     * @param useLocks          whether to guard each step with {@code lock}
     * @param pollTimeoutMillis how long to wait for a line before stopping, in
     *                          milliseconds
     * @throws IllegalArgumentException if {@code pollTimeoutMillis} is negative
     */
    public FileWriterQueueReader(FileWriter writer, BlockingQueue<String> queue,
                                 ReentrantLock lock, String name, boolean useLocks,
                                 long pollTimeoutMillis) {
        if (pollTimeoutMillis < 0) {
            throw new IllegalArgumentException(
                    "pollTimeoutMillis must be >= 0 but was " + pollTimeoutMillis);
        }
        this.writer = writer;
        this.queue = queue;
        this.lock = lock;
        setName(name);
        this.useLocks = useLocks;
        this.pollTimeoutMillis = pollTimeoutMillis;
    }

    /** Prints {@code msg} to standard error, prefixed with this thread's name. */
    void printError(String msg) {
        System.err.println("[" + getName() + "] " + msg);
    }

    /** Prints {@code msg} to standard output, prefixed with this thread's name. */
    void printOut(String msg) {
        System.out.println("[" + getName() + "] " + msg);
    }

    /**
     * Writes queued lines to the writer until the poll times out, an I/O error
     * occurs, or the thread is interrupted, then closes the writer.
     */
    @Override
    public void run() {
        try {
            boolean more = true;
            while (more) {
                more = writeNextLine();
            }
        } catch (IOException ex) {
            printError("Error while writing to file!");
        } catch (InterruptedException ex) {
            printError("Tired of waiting. Queue is probably empty!");
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
        } finally {
            try {
                writer.close();
                printError("File Writer closed!");
            } catch (IOException ex) {
                printError("IOException while closing! Stream already closed.");
            }
        }
    }

    /**
     * Polls one line and writes it, holding the lock for the whole step when
     * locking is enabled. The lock is always released, including on exceptions.
     *
     * @return {@code false} when no line arrived within the poll timeout
     */
    private boolean writeNextLine() throws IOException, InterruptedException {
        if (useLocks) {
            lock.lock();
        }
        try {
            String line = queue.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
            if (line == null) {
                return false;
            }
            // readLine() stripped the terminator on the producer side; add one back.
            writer.write(line + "\n");
            return true;
        } finally {
            if (useLocks) {
                lock.unlock();
            }
        }
    }
}