java:java_util_concurrent
Differences
This shows you the differences between two versions of the page.
Both sides previous revisionPrevious revisionNext revision | Previous revision | ||
java:java_util_concurrent [2017/03/21 07:39] – [Το interface BlockingQueue] gthanos | java:java_util_concurrent [2017/03/21 14:36] (current) – [Semaphore] gthanos | ||
---|---|---|---|
Line 1: | Line 1: | ||
- | ====== Έτοιμα εργαλεία συγχρονισμού στο πακέτο java.util.concurrent | + | ====== Έτοιμα εργαλεία συγχρονισμού στο πακέτο java.util.concurrent |
Το πακέτο [[http:// | Το πακέτο [[http:// | ||
Line 11: | Line 11: | ||
Ένα //blocking queue// είναι μία ουρά μηνυμάτων (//queue//) στην οποία μπορούν να γράφουν ή να διαβάζουν από αυτή με ασφάλεια δύο ή περισσότερα νήματα. Όταν η ουρά είναι άδεια και ένα νήμα επιχειρήσει να διαβάσει αυτό μπλοκάρει μέχρι κάποιο άλλο νήμα να γράψει στην ουρά. Αντίστοιχα, | Ένα //blocking queue// είναι μία ουρά μηνυμάτων (//queue//) στην οποία μπορούν να γράφουν ή να διαβάζουν από αυτή με ασφάλεια δύο ή περισσότερα νήματα. Όταν η ουρά είναι άδεια και ένα νήμα επιχειρήσει να διαβάσει αυτό μπλοκάρει μέχρι κάποιο άλλο νήμα να γράψει στην ουρά. Αντίστοιχα, | ||
- | {{ :java:design_and_implement_a_blocking_queue-600x0.png?450 | + | |
+ | {{ :java:blockingqueue.png? }} | ||
Οι βασικές μέθοδοι που υποστηρίζονται από ένα //blocking queue// είναι οι εξής: | Οι βασικές μέθοδοι που υποστηρίζονται από ένα //blocking queue// είναι οι εξής: | ||
Line 31: | Line 32: | ||
===== Semaphore ===== | ===== Semaphore ===== | ||
- | Στο πακέτο [[http:// | + | Στο πακέτο |
- | * Δέσμευση πόρων (acquire permits): | + | * Δέσμευση πόρων (//acquire permits//): |
- | * **acquire(): | + | * **[[https:// |
- | * **acquire(int N):** Δέσμευει Ν permits εφόσον είναι διαθέσιμα, | + | * **[[https:// |
- | * **acquireUninterruptibly(): | + | * **[[https:// |
- | * **acquireUninterruptibly(int permits):** Δέσμευει Ν permits εφόσον είναι διαθέσιμα, | + | * **[[https:// |
- | * **tryAcquire(): | + | * **[[https:// |
- | * **tryAcquire(int N):** Επιχειρεί να δεσμεύσει N permits, εφόσον αυτό είναι διαθέσιμα. Επιστρέφει **true/ | + | * **[[https:// |
- | * **tryAcquire(int permits, long timeout, TimeUnit unit):** Δέσμευει ένα permit, εφόσον αυτό είναι διαθέσιμο, | + | * **[[https:// |
- | * **drainPermits(): | + | * **[[https:// |
- | * Απελευθέρωση πόρων (release permits): | + | * Απελευθέρωση πόρων (//release permits//): |
- | * **release(): | + | * **[[https:// |
- | * **release(int | + | * **[[https:// |
- | | + | Νήματα που αναμένουν ένα ή περισσότερους πόρους του σηματοφορέα και μπορούν να ικανοποιηθούν από την απελευθέρωση πόρων ξυπνούν και χρονοπρογραμματίζεται η εκτέλεση τους. |
- | + | ||
- | Άλλες χρήσιμες μέθοδοι είναι: **availablePermits**, | + | |
+ | Άλλες χρήσιμες μέθοδοι είναι: | ||
+ | * **[[https:// | ||
+ | * **[[https:// | ||
+ | * **[[https:// | ||
===== Lock ===== | ===== Lock ===== | ||
- | To interface [[http:// | + | To interface |
+ | * **lock():** Επιχειρεί να κλειδώσει την κλειδαριά. Εάν η κλειδαριά είναι κλειδωμένη αναμένει το ξεκλείδωμα της. | ||
+ | * **tryLock(): | ||
+ | * **tryLock(long time, TimeUnit unit):** Επιχειρεί να κλειδώσει τη κλειδαριά. Εάν η κλειδαριά είναι κλειδωμένη αναμένει για μέγιστο χρονικό διάστημα που περιγράφεται από τα ορίσματα της μεθόδου και εάν δεν γίνει διαθέσιμη επιστρέφει χωρίς να κλειδώσει. | ||
+ | * **unlock(): | ||
- | Υλοποίηση του παραπάνω interface είναι η κλάση [[http:// | + | Υλοποίηση του παραπάνω |
===== ReadWriteLock ===== | ===== ReadWriteLock ===== | ||
- | Αντίστοιχα με τo interface | + | Το //interface// [[https:// |
+ | ReadWriteLock]] | ||
- | * ΜΟΝΟ ένα νήμα μπορεί να γράψει. | + | * ΜΟΝΟ ένα νήμα μπορεί να γράψει. Κανένα νήμα δεν μπορεί να διαβάσει. |
* Πολλαπλά νήματα μπορούν να διαβάσουν εάν δεν υπάρχει νήμα που θέλει να γράψει. | * Πολλαπλά νήματα μπορούν να διαβάσουν εάν δεν υπάρχει νήμα που θέλει να γράψει. | ||
Το παραπάνω interface υλοποιείται μεσω της κλάσης [[http:// | Το παραπάνω interface υλοποιείται μεσω της κλάσης [[http:// | ||
- | ===== Atomic Integer, Long, Boolean, Reference ===== | ||
- | ===== Παραδείγματα Χρήσης Εργαλείων Συγχρονισμού ===== | ||
- | ==== Blocking Queue με locks ==== | + | ===== Atomic Integer, Long, Boolean, Reference ===== |
- | Ας υποθέσουμε ότι έχετε | + | Στο πακέτο [[https:// |
- | + | ||
- | Για την επικοινωνία μεταξύ αναγνωστών και εγγραφέων θα χρησιμοποιηθεί ένα blocking queue με σχετικά περιορισμένη χωρητικότητα **Κ**, όπου **K < min(M,N)**. Επίσης για | + | |
- | + | ||
- | Δείτε το παρακάτω παράδειγμα | + | |
- | + | ||
- | <code java UtilConcurrentDemo1.java> | + | |
- | import java.util.concurrent.*; | + | |
- | import java.util.concurrent.locks.*; | + | |
- | import java.io.*; | + | |
- | + | ||
- | public class UtilConcurrentDemo1 { | + | |
- | + | ||
- | public static void main(String args[]) { | + | |
- | BufferedReader in; | + | |
- | FileWriter out; | + | |
- | + | ||
- | if( args.length == 0 ) { | + | |
- | System.err.println(" | + | |
- | } | + | |
- | String filename = args[0]; | + | |
- | + | ||
- | try { | + | |
- | in = new BufferedReader(new FileReader(filename)); | + | |
- | out = new FileWriter(filename+" | + | |
- | } | + | |
- | catch(IOException ex) { | + | |
- | ex.printStackTrace(); | + | |
- | return ; | + | |
- | } | + | |
- | + | ||
- | boolean useLocks = false; | + | |
- | + | ||
- | ArrayBlockingQueue< | + | |
- | ReentrantLock readLock = new ReentrantLock(); | + | |
- | ReentrantLock writeLock = new ReentrantLock(); | + | |
- | FileReaderQueueWriter w1 = new FileReaderQueueWriter(in, | + | |
- | FileReaderQueueWriter w2 = new FileReaderQueueWriter(in, | + | |
- | FileReaderQueueWriter w3 = new FileReaderQueueWriter(in, | + | |
- | FileWriterQueueReader r1 = new FileWriterQueueReader(out, | + | |
- | FileWriterQueueReader r2 = new FileWriterQueueReader(out, | + | |
- | FileWriterQueueReader r3 = new FileWriterQueueReader(out, | + | |
- | w1.start(); | + | |
- | w2.start(); | + | |
- | w3.start(); | + | |
- | r1.start(); | + | |
- | r2.start(); | + | |
- | r3.start(); | + | |
- | } | + | |
- | } | + | |
- | + | ||
- | class FileReaderQueueWriter extends Thread { | + | |
- | BufferedReader in; | + | |
- | BlockingQueue< | + | |
- | ReentrantLock lock; | + | |
- | boolean useLocks; | + | |
- | + | ||
- | public FileReaderQueueWriter(BufferedReader in, BlockingQueue< | + | |
- | this.in = in; | + | |
- | this.queue = queue; | + | |
- | this.lock = lock; | + | |
- | setName(name); | + | |
- | this.useLocks = useLocks; | + | |
- | } | + | |
- | + | ||
- | void printError(String msg) { | + | |
- | System.err.println(" | + | |
- | } | + | |
- | + | ||
- | void printOut(String msg) { | + | |
- | System.out.println(" | + | |
- | } | + | |
- | + | ||
- | public void run() { | + | |
- | + | ||
- | int i=0; String input; | + | |
- | try { | + | |
- | while( true ) { | + | |
- | if( useLocks ) | + | |
- | lock.lock(); | + | |
- | input = in.readLine(); | + | |
- | if( input != null) { | + | |
- | queue.put(input); | + | |
- | + | ||
- | if( useLocks ) | + | |
- | lock.unlock(); | + | |
- | } | + | |
- | else { | + | |
- | if( useLocks ) | + | |
- | lock.unlock(); | + | |
- | break; | + | |
- | } | + | |
- | } | + | |
- | } | + | |
- | catch(IOException ex) { | + | |
- | printError(" | + | |
- | } | + | |
- | catch(InterruptedException ex) { | + | |
- | printError(" | + | |
- | } | + | |
- | finally { | + | |
- | if( useLocks && lock.isHeldByCurrentThread() ) | + | |
- | lock.unlock(); | + | |
- | try { | + | |
- | in.close(); | + | |
- | printError(" | + | |
- | } catch( IOException ex) { | + | |
- | printError(" | + | |
- | } | + | |
- | } | + | |
- | } | + | |
- | } | + | |
- | + | ||
- | class FileWriterQueueReader extends Thread { | + | |
- | FileWriter writer; | + | |
- | BlockingQueue< | + | |
- | ReentrantLock lock; | + | |
- | boolean useLocks; | + | |
- | + | ||
- | public FileWriterQueueReader(FileWriter writer, BlockingQueue< | + | |
- | this.writer = writer; | + | |
- | this.queue = queue; | + | |
- | this.lock = lock; | + | |
- | setName(name); | + | |
- | this.useLocks = useLocks; | + | |
- | } | + | |
- | + | ||
- | void printError(String msg) { | + | |
- | System.err.println(" | + | |
- | } | + | |
- | + | ||
- | void printOut(String msg) { | + | |
- | System.out.println(" | + | |
- | } | + | |
- | + | ||
- | public void run() { | + | |
- | try { | + | |
- | String input; | + | |
- | while( true) { | + | |
- | if( useLocks ) | + | |
- | lock.lock(); | + | |
- | if( (input = queue.poll(500, | + | |
- | input += " | + | |
- | writer.append( input.subSequence(0, | + | |
- | if( useLocks ) | + | |
- | lock.unlock(); | + | |
- | } | + | |
- | else { | + | |
- | if( useLocks ) | + | |
- | lock.unlock(); | + | |
- | break; | + | |
- | } | + | |
- | + | ||
- | } | + | |
- | } catch(IOException ex) { | + | |
- | | + | |
- | } catch(InterruptedException ex) { | + | |
- | | + | |
- | } finally { | + | |
- | if( useLocks && lock.isHeldByCurrentThread() ) | + | |
- | lock.unlock(); | + | |
- | try { | + | |
- | writer.close(); | + | |
- | printError(" | + | |
- | } catch( IOException ex) { | + | |
- | printError(" | + | |
- | } | + | |
- | } | + | |
- | + | ||
- | } | + | |
- | } | + | |
- | </ | + | |
- | + | ||
- | ==== Υλοποίηση μίας διασυνδεδεμένης λίστας η οποία είναι συγχρονισμένη ==== | + | |
- | + | ||
- | Όπως ίσως θα γνωρίζετε η διασυνδεδεμένες λίστες που υπάρχουν στο πακέτο java.util δεν είναι συγχρονισμένες, δηλαδή δεν | + | |
- | + | ||
- | Δείτε πως διαμορφώνεται η κλάση καθώς και ένα παράδειγμα χρήσης της κλάσης αυτής. | + | |
- | + | ||
- | <code java UtilConcurrentDemo2.java> | + | |
- | import java.util.*; | + | |
- | import java.util.concurrent.*; | + | |
- | import java.util.concurrent.locks.*; | + | |
- | + | ||
- | class SynchronizedList< | + | |
- | + | ||
- | ArrayList< | + | |
- | ReentrantReadWriteLock lock; | + | |
- | Lock rlock, wlock; | + | |
- | + | ||
- | public SynchronizedList() { | + | |
- | list = new ArrayList<> | + | |
- | lock = new ReentrantReadWriteLock(); | + | |
- | rlock = lock.readLock(); | + | |
- | wlock = lock.writeLock(); | + | |
- | } | + | |
- | + | ||
- | public SynchronizedList(int capacity) { | + | |
- | list = new ArrayList<> | + | |
- | lock = new ReentrantReadWriteLock(); | + | |
- | rlock = lock.readLock(); | + | |
- | wlock = lock.writeLock(); | + | |
- | } | + | |
- | + | ||
- | public void listWriteLock() { | + | |
- | wlock.lock(); | + | |
- | } | + | |
- | + | ||
- | public void listWriteUnlock() { | + | |
- | wlock.unlock(); | + | |
- | } | + | |
- | + | ||
- | public void listReadLock() { | + | |
- | rlock.lock(); | + | |
- | } | + | |
- | + | ||
- | public void listReadUnlock() { | + | |
- | rlock.unlock(); | + | |
- | } | + | |
- | + | ||
- | public void add(E e) { | + | |
- | wlock.lock(); | + | |
- | list.add(e); | + | |
- | wlock.unlock(); | + | |
- | } | + | |
- | + | ||
- | public void add(int index, E e) { | + | |
- | wlock.lock(); | + | |
- | list.add(index, | + | |
- | wlock.unlock(); | + | |
- | } | + | |
- | + | ||
- | public boolean addAll(Collection<? | + | |
- | wlock.lock(); | + | |
- | boolean result = list.addAll(c); | + | |
- | wlock.unlock(); | + | |
- | return result; | + | |
- | } | + | |
- | + | ||
- | public void clear() { | + | |
- | wlock.lock(); | + | |
- | list.clear(); | + | |
- | wlock.unlock(); | + | |
- | } | + | |
- | + | ||
- | public boolean contains(Object o) { | + | |
- | rlock.lock(); | + | |
- | boolean result = list.contains(o); | + | |
- | rlock.unlock(); | + | |
- | return result; | + | |
- | } | + | |
- | + | ||
- | public void ensureCapacity(int minCapacity) { | + | |
- | wlock.lock(); | + | |
- | list.ensureCapacity(minCapacity); | + | |
- | wlock.unlock(); | + | |
- | } | + | |
- | + | ||
- | public E get(int index) { | + | |
- | rlock.lock(); | + | |
- | E e = list.get(index); | + | |
- | rlock.unlock(); | + | |
- | return e; | + | |
- | } | + | |
- | + | ||
- | public int indexOf(Object o) { | + | |
- | rlock.lock(); | + | |
- | int index = list.indexOf(o); | + | |
- | rlock.unlock(); | + | |
- | return index; | + | |
- | } | + | |
- | + | ||
- | public boolean isEmpty() { | + | |
- | rlock.lock(); | + | |
- | boolean empty = list.isEmpty(); | + | |
- | rlock.unlock(); | + | |
- | return empty; | + | |
- | } | + | |
- | + | ||
- | public int lastIndexOf(Object o) { | + | |
- | rlock.lock(); | + | |
- | int lastIndex = list.lastIndexOf(o); | + | |
- | rlock.unlock(); | + | |
- | return lastIndex; | + | |
- | } | + | |
- | + | ||
- | public E remove(int index) { | + | |
- | wlock.lock(); | + | |
- | E e = list.remove(index); | + | |
- | wlock.unlock(); | + | |
- | return e; | + | |
- | } | + | |
- | + | ||
- | public E set(int index, E element) { | + | |
- | wlock.lock(); | + | |
- | E e = list.set(index, | + | |
- | wlock.unlock(); | + | |
- | return e; | + | |
- | } | + | |
- | + | ||
- | public int size() { | + | |
- | rlock.lock(); | + | |
- | int lsize = list.size(); | + | |
- | rlock.unlock(); | + | |
- | return lsize; | + | |
- | } | + | |
- | + | ||
- | List< | + | |
- | rlock.lock(); | + | |
- | List< | + | |
- | rlock.unlock(); | + | |
- | return newlist; | + | |
- | } | + | |
- | + | ||
- | class SyncrhonizedIterator implements Iterator< | + | |
- | Iterator it; | + | |
- | Lock rlock; | + | |
- | Lock wlock; | + | |
- | + | ||
- | public SyncrhonizedIterator(Lock readlock, Lock writelock) { | + | |
- | rlock = readlock; | + | |
- | wlock = writelock; | + | |
- | it = list.iterator(); | + | |
- | } | + | |
- | + | ||
- | public boolean hasNext() { | + | |
- | rlock.lock(); | + | |
- | boolean hasnext = it.hasNext(); | + | |
- | rlock.unlock(); | + | |
- | return hasnext; | + | |
- | } | + | |
- | + | ||
- | public E next() { | + | |
- | rlock.lock(); | + | |
- | Object o = it.next(); | + | |
- | rlock.unlock(); | + | |
- | return (E)o; | + | |
- | } | + | |
- | + | ||
- | public void remove() { | + | |
- | wlock.lock(); | + | |
- | it.remove(); | + | |
- | wlock.unlock(); | + | |
- | } | + | |
- | } | + | |
- | } | + | |
- | + | ||
- | class ListModifierThread extends Thread { | + | |
- | Random rand; | + | |
- | SynchronizedList< | + | |
- | + | ||
- | public ListModifierThread(SynchronizedList< | + | |
- | this.list = list; | + | |
- | rand = new Random( new Date().getTime() ); | + | |
- | } | + | |
- | + | ||
- | public void run() { | + | |
- | for(int i=0; i<1000; i++) | + | |
- | list.add( rand.nextInt(1000) ); | + | |
- | list.listWriteLock(); | + | * [[https:// |
- | while( list.size() > 0 ) | + | * [[https://docs.oracle.com/ |
- | System.out.println( list.remove( list.size()-1 ) ); | + | * [[https:// |
- | + | * [[https:// | |
- | if( list.isEmpty() ) | + | * [[https:// |
- | System.out.println( this.getName() +": list is empty!" | + | * [[https:// |
- | else | + | * [[https:// |
- | System.out.println( this.getName() +": list is NOT empty!" | + | |
- | list.listWriteUnlock(); | + | |
- | } | + | |
- | } | + | |
- | + | ||
- | class ListSynchronizer { | + | |
- | public static void main(String []args) { | + | |
- | SynchronizedList< | + | |
- | + | ||
- | for(int i=0; i<10; i++) { | + | |
- | (new ListModifierThread(list)).start(); | + | |
- | } | + | |
- | } | + | |
- | } | + | |
- | </ | + | |
- | + | ||
- | <WRAP tip 80% center round> | + | |
- | Αντί για τη συγχρονισμένη υλοποίηση | + | |
- | </ | + | |
- | + | ||
- | + | ||
- | + | ||
- | + | ||
- | + | ||
- | + | ||
+ | Οι βασικές μέθοδοι που διαθέτουν οι παραπάνω κλάσεις είναι οι εξής: | ||
+ | * **compareAndSet: | ||
+ | * **get:** Επιστρέφει την τρέχουσα τιμή. | ||
+ | * **getAndSet: | ||
+ | * **lazySet: | ||
+ | * **set:** Θέτει μία νέα τιμή για την μεταβλητή. | ||
java/java_util_concurrent.1490081981.txt.gz · Last modified: 2017/03/21 07:39 (external edit)