====== Παραδείγματα Χρήσης Εργαλείων Συγχρονισμού ====== ===== Blocking Queue με Locks ===== Ας υποθέσουμε ότι έχετε το αρχείο κειμένου {{:java:largedict.txt.zip|largedict.txt}} για το οποίο θέλετε να κάνετε τα εξής. Έχετε **Ν** νήματα που διαβάζουν από το αρχείο και **Μ** νήματα που στο νέο αρχείο. Θέλετε το νέο αρχείο που θα γραφεί να είναι ίδιο με το αρχικό αρχείο που διαβάστηκε. Το αρχείο κειμένου διαβάζεται και γράφεται γραμμή-γραμμή. Για την επικοινωνία μεταξύ αναγνωστών και εγγραφέων θα χρησιμοποιηθεί ένα blocking queue με σχετικά περιορισμένη χωρητικότητα **Κ**, όπου **K < min(M,N)**. Επίσης για την υλοποίηση του συγχρονισμού διαθέτουμε δύο κλειδαριές **α)** μία κλειδαριά η οποία βεβαιώνει ότι μόνο ένα νήμα διαβάζει από το αρχείο και γράφει στο //Blocking Queue// και **β)** μία κλειδαριά η οποία βεβαιώνει ότι μόνο ένα νήμα διαβάζει από το //Blocking Queue// και γράφει στο νέο αρχείο. Για τον παρακάτω παράδειγμα κώδικα που υλοποιεί τον συγχρονισμό μεταξύ των νημάτων ανάγνωσης και εγγραφής, μεταγλωττίστε και εκτελέστε αρχικά απενεργοποιώντας τις δύο κλειδαριές και στη συνέχεια ενεργοποιώντας τες. Ποια διαφορά παρατηρείτε στο τελικό αρχείο στην μία και την άλλη περίπτωση. Σε τι οφείλεται η διαφορά αυτή; import java.util.concurrent.*; import java.util.concurrent.locks.*; import java.io.*; public class BlockingQueueWithLocks { public static void main(String args[]) { BufferedReader in; FileWriter out; if( args.length == 0 ) { System.err.println("Insufficient number of arguments!"); } String filename = args[0]; try { in = new BufferedReader(new FileReader(filename)); out = new FileWriter(filename+".copy"); } catch(IOException ex) { ex.printStackTrace(); return ; } boolean useLocks = true; ArrayBlockingQueue queue = new ArrayBlockingQueue<>(2); ReentrantLock readLock = new ReentrantLock(); ReentrantLock writeLock = new ReentrantLock(); FileReaderQueueWriter w1 = new FileReaderQueueWriter(in, queue, writeLock, "QueueWriter-1", useLocks); FileReaderQueueWriter w2 = new FileReaderQueueWriter(in, queue, writeLock, "QueueWriter-2", useLocks); FileReaderQueueWriter w3 = new FileReaderQueueWriter(in, queue, writeLock, "QueueWriter-3", useLocks); FileWriterQueueReader r1 = new FileWriterQueueReader(out, queue, readLock, "QueueReader-1", useLocks); FileWriterQueueReader r2 = new FileWriterQueueReader(out, queue, readLock, "QueueReader-2", useLocks); FileWriterQueueReader r3 = new FileWriterQueueReader(out, queue, readLock, "QueueReader-3", useLocks); w1.start(); w2.start(); w3.start(); r1.start(); r2.start(); r3.start(); } } class FileReaderQueueWriter extends Thread { BufferedReader in; BlockingQueue queue; ReentrantLock lock; boolean useLocks; public FileReaderQueueWriter(BufferedReader in, BlockingQueue queue, ReentrantLock lock, String name, boolean useLocks) { this.in = in; this.queue = queue; this.lock = lock; setName(name); this.useLocks = useLocks; } void printError(String msg) { System.err.println("["+getName()+"] "+msg); } void printOut(String msg) { System.out.println("["+getName()+"] "+msg); } public void run() { int i=0; String input; try { while( true ) { if( useLocks ) lock.lock(); input = in.readLine(); if( input != null) { queue.put(input); if( useLocks ) lock.unlock(); } else { if( useLocks ) lock.unlock(); break; } } } catch(IOException ex) { printError("IOException while reading!"); } catch(InterruptedException ex) { printError("Queue put method interrupted."); } finally { if( useLocks && lock.isHeldByCurrentThread() ) lock.unlock(); try { in.close(); printError("Buffered Reader closed!"); } catch( IOException ex) { printError("IOException while closing! Stream already closed."); } } } } class FileWriterQueueReader extends Thread { FileWriter writer; BlockingQueue queue; ReentrantLock lock; boolean useLocks; public FileWriterQueueReader(FileWriter writer, BlockingQueuequeue, ReentrantLock lock, String name, boolean useLocks) { this.writer = writer; this.queue = queue; this.lock = lock; setName(name); this.useLocks = useLocks; } void printError(String msg) { System.err.println("["+getName()+"] "+msg); } void printOut(String msg) { System.out.println("["+getName()+"] "+msg); } public void run() { try { String input; while( true) { if( useLocks ) lock.lock(); if( (input = queue.poll(500, TimeUnit.MILLISECONDS)) != null ) { input += "\n"; writer.append( input.subSequence(0, input.length()), 0, input.length() ); if( useLocks ) lock.unlock(); } else { if( useLocks ) lock.unlock(); break; } } } catch(IOException ex) { printError("Error while writing to file!"); } catch(InterruptedException ex) { printError("Tired of waiting. Queue is probably empty!"); } finally { if( useLocks && lock.isHeldByCurrentThread() ) lock.unlock(); try { writer.close(); printError("File Writer closed!"); } catch( IOException ex) { printError("IOException while closing! Stream already closed."); } } } } ===== Υλοποίηση μίας διασυνδεδεμένης λίστας η οποία είναι συγχρονισμένη ===== Όπως ίσως θα γνωρίζετε η διασυνδεδεμένες λίστες που υπάρχουν στο πακέτο **[[https://docs.oracle.com/javase/7/docs/api/java/util/package-frame.html|java.util]]** δεν είναι συγχρονισμένες, δηλαδή δεν μπορούν να παρέχουν καμία εγγύηση ότι εάν προσπελαστούν ταυτόχρονα από δύο ή περισσότερα νήματα η δομή θα διατηρήσει την εσωτερική της συνοχή και το περιεχόμενο της θα είναι σωστό. Προκειμένου να μπορούμε να προσπελάσουμε μία λίστα από περισσότερα του ενός νήματα δημιουργούμε την κλάση SynchronizedList η οποία χρησιμοποιεί ένα [[http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.ReadLock.html|java.util.concurrent.locks.ReentrantLock.ReadLock]] για διάβασμα και ένα [[http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.ReadLock.html|ReentrantReadWriteLock.WriteLock.html]] για μεταβολή του περιεχομένου της λίστας. Δείτε πως διαμορφώνεται η κλάση καθώς και ένα παράδειγμα χρήσης της κλάσης αυτής. import java.util.*; import java.util.concurrent.*; import java.util.concurrent.locks.*; class SynchronizedList { ArrayList list; ReentrantReadWriteLock lock; Lock rlock, wlock; public SynchronizedList() { list = new ArrayList<>(); lock = new ReentrantReadWriteLock(); rlock = lock.readLock(); wlock = lock.writeLock(); } public SynchronizedList(int capacity) { list = new ArrayList<>(capacity); lock = new ReentrantReadWriteLock(); rlock = lock.readLock(); wlock = lock.writeLock(); } public void listWriteLock() { wlock.lock(); } public void listWriteUnlock() { wlock.unlock(); } public void listReadLock() { rlock.lock(); } public void listReadUnlock() { rlock.unlock(); } public void add(E e) { wlock.lock(); list.add(e); wlock.unlock(); } public void add(int index, E e) { wlock.lock(); list.add(index, e); wlock.unlock(); } public boolean addAll(Collection c) { wlock.lock(); boolean result = list.addAll(c); wlock.unlock(); return result; } public void clear() { wlock.lock(); list.clear(); wlock.unlock(); } public boolean contains(Object o) { rlock.lock(); boolean result = list.contains(o); rlock.unlock(); return result; } public void ensureCapacity(int minCapacity) { wlock.lock(); list.ensureCapacity(minCapacity); wlock.unlock(); } public E get(int index) { rlock.lock(); E e = list.get(index); rlock.unlock(); return e; } public int indexOf(Object o) { rlock.lock(); int index = list.indexOf(o); rlock.unlock(); return index; } public boolean isEmpty() { rlock.lock(); boolean empty = list.isEmpty(); rlock.unlock(); return empty; } public int lastIndexOf(Object o) { rlock.lock(); int lastIndex = list.lastIndexOf(o); rlock.unlock(); return lastIndex; } public E remove(int index) { wlock.lock(); E e = list.remove(index); wlock.unlock(); return e; } public E set(int index, E element) { wlock.lock(); E e = list.set(index, element); wlock.unlock(); return e; } public int size() { rlock.lock(); int lsize = list.size(); rlock.unlock(); return lsize; } List subList(int fromIndex, int toIndex) { rlock.lock(); List newlist = list.subList(fromIndex, toIndex); rlock.unlock(); return newlist; } class SyncrhonizedIterator implements Iterator { Iterator it; Lock rlock; Lock wlock; public SyncrhonizedIterator(Lock readlock, Lock writelock) { rlock = readlock; wlock = writelock; it = list.iterator(); } public boolean hasNext() { rlock.lock(); boolean hasnext = it.hasNext(); rlock.unlock(); return hasnext; } public E next() { rlock.lock(); Object o = it.next(); rlock.unlock(); return (E)o; } public void remove() { wlock.lock(); it.remove(); wlock.unlock(); } } } class ListModifierThread extends Thread { Random rand; SynchronizedList list; //ArrayList list; public ListModifierThread(SynchronizedList list) { //public ListModifierThread(ArrayList list) { this.list = list; rand = new Random( new Date().getTime() ); } public void run() { for(int i=0; i<1000; i++) list.add( rand.nextInt(1000) ); for(int i=0; i<1000; i++) list.remove(0); if( list.isEmpty() ) System.out.println( this.getName() +": list is empty!"); else System.out.println( this.getName() +": list is NOT empty!"); } } class ListSynchronizer { public static void main(String []args) { SynchronizedList list = new SynchronizedList<>(); //ArrayList list = new ArrayList<>(); for(int i=0; i<10; i++) { (new ListModifierThread(list)).start(); } } } Αντί για τη συγχρονισμένη υλοποίηση της διασυνδεδεμένης λίστας που χρησιμοποιήσαμε μπορείτε να χρησιμοποιήσετε ένα απλό java.util.ArrayList. Αλλάξτε τον κώδικα και δείτε την συμπεριφορά (το πρόγραμμα εμφανίζει //IndexOutOfBoundException//).