| Both sides previous revision
Previous revision
Next revision
|
Previous revision
Next revision
Both sides next revision
|
java:java_util_concurrent [2017/03/20 13:54] gthanos [Blocking Queue με locks] |
java:java_util_concurrent [2017/03/21 12:29] gthanos [Atomic Integer, Long, Boolean, Reference] |
| ====== Έτοιμα εργαλεία συγχρονισμού στο πακέτο java.util.concurrent ====== | ====== Έτοιμα εργαλεία συγχρονισμού στο πακέτο java.util.concurrent & java.util.concurrent.locks ====== |
| |
| Το πακέτο [[http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-frame.html|java.util.concurrent]] παρέχει κάποια έτοιμα εργαλεία συγχρονισμού μεταξύ νημάτων στην java. Τα βασικά εργαλεία είναι τα εξής: | Το πακέτο [[http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-frame.html|java.util.concurrent]] παρέχει κάποια έτοιμα εργαλεία συγχρονισμού μεταξύ νημάτων στην java. Τα βασικά εργαλεία είναι τα εξής: |
| ===== Το interface BlockingQueue ===== | ===== Το interface BlockingQueue ===== |
| |
| Ένα //blocking queue// είναι μία ουρά μηνυμάτων (//queue//) στην οποία μπορούν να γράφουν ή να διαβάζουν από αυτή με ασφάλεια δύο ή περισσότερα νήματα. Όταν η ουρά είναι άδεια και ένα νήμα πάει να διαβάσει αυτό μπλοκάρει μέχρι κάποιο άλλο νήμα να γράψει στην ουρά. Αντίστοιχα, όταν ένα νήμα επιχειρεί να γράψει σε μία γεμάτη ουρά το νήμα μπλοκάρει μέχρι να ελευθερωθεί μία θέση στην ουρά μηνυμάτων από κάποιο άλλο νήμα που διάβασε. Στο παρακάτω σχήμα βλέπετε την σχηματική αναπαράσταση ενός blocking queue. | Ένα //blocking queue// είναι μία ουρά μηνυμάτων (//queue//) στην οποία μπορούν να γράφουν ή να διαβάζουν από αυτή με ασφάλεια δύο ή περισσότερα νήματα. Όταν η ουρά είναι άδεια και ένα νήμα επιχειρήσει να διαβάσει αυτό μπλοκάρει μέχρι κάποιο άλλο νήμα να γράψει στην ουρά. Αντίστοιχα, όταν ένα νήμα επιχειρεί να γράψει σε μία γεμάτη ουρά το νήμα μπλοκάρει μέχρι να ελευθερωθεί μία θέση στην ουρά μηνυμάτων από κάποιο άλλο νήμα που διάβασε. Στο παρακάτω σχήμα βλέπετε την σχηματική αναπαράσταση ενός blocking queue. |
| {{ :java:design_and_implement_a_blocking_queue-600x0.png?450 }} | |
| | {{ :java:blockingqueue.png? }} |
| |
| Οι βασικές μέθοδοι που υποστηρίζονται από ένα //blocking queue// είναι οι εξής: | Οι βασικές μέθοδοι που υποστηρίζονται από ένα //blocking queue// είναι οι εξής: |
| |
| ^ ^ Throws exception ^ Special value ^ Blocks ^ Times out ^ | ^ ^ Throws exception ^ Special value ^ Blocks ^ Times out ^ |
| | **Insert** | add(e) | offer(e) | put(e) | offer(e, time, unit) | | | **Insert** | [[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html#add(E)|add(e)]] | [[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html#offer(E)|offer(e)]] | [[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html#put(E)|put(e)]] | [[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html#offer(E,%20long,%20java.util.concurrent.TimeUnit)|offer(e, time, unit)]] | |
| | **Remove** | remove() | poll() | take() | poll(time, unit) | | | **Remove** | [[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html#remove(java.lang.Object)|remove()]] | [[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html#poll(long,%20java.util.concurrent.TimeUnit)|poll()]] | [[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html#take()|take()]] | [[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html#poll(long,%20java.util.concurrent.TimeUnit)|poll(time, unit)]] | |
| | **Examine** | element() | peek() | not applicable | not applicable | | | **Examine** | [[https://docs.oracle.com/javase/7/docs/api/java/util/Queue.html#element()|element()]] | [[https://docs.oracle.com/javase/7/docs/api/java/util/Queue.html#peek()|peek()]] | not applicable | not applicable | |
| | |
| | Όλες οι παραπάνω μέθοδοι εξασφαλίζουν ότι μόνο ένα νήμα θα προσπελάσει τα κρίσιμα τμήματα κώδικα της δομή blocking queue (//thread-safe//), χωρίς όμως να μπορούν να προδιαγράψουν τη σειρά εκτέλεσης των νημάτων. |
| |
| Υλοποιήσεις του interface Blocking Queue: | Υλοποιήσεις του interface Blocking Queue: |
| ===== Semaphore ===== | ===== Semaphore ===== |
| |
| Στο πακέτο [[http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-frame.html|java.util.concurrent]] υλοποιείται η κλάση του [[java:semaphores|σηματοφορέα]] έτοιμη προς χρήση. Οι βασικές μέθοδοι για τον σηματοφορέα είναι: | Στο πακέτο **[[http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-frame.html|java.util.concurrent]]** υλοποιείται η κλάση του [[java:semaphores|σηματοφορέα]] έτοιμη προς χρήση. Οι βασικές μέθοδοι για τον σηματοφορέα είναι: |
| |
| * Δέσμευση πόρων (acquire permits): | * Δέσμευση πόρων (//acquire permits//): |
| * **acquire():** Δέσμευει ένα permit, εφόσον αυτό είναι διαθέσιμο, διαφορετικά το νήμα μπλοκάρει. Εάν κατά την αναμονή το νήμα διακοπεί από άλλο νήμα τότε η μέθοδος "πετάει" [[https://docs.oracle.com/javase/7/docs/api/java/lang/InterruptedException.html|InterruptedException]]. | * **[[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html#acquire()|acquire()]]:** Δέσμευει ένα permit, εφόσον αυτό είναι διαθέσιμο, διαφορετικά το νήμα μπλοκάρει. Εάν κατά την αναμονή το νήμα διακοπεί από άλλο νήμα τότε η μέθοδος "πετάει" [[https://docs.oracle.com/javase/7/docs/api/java/lang/InterruptedException.html|InterruptedException]]. |
| * **acquire(int N):** Δέσμευει Ν permits εφόσον είναι διαθέσιμα, διαφορετικά μπλοκάρει. Εάν κατά την αναμονή το νήμα διακοπεί από άλλο νήμα τότε η μέθοδος "πετάει" [[https://docs.oracle.com/javase/7/docs/api/java/lang/InterruptedException.html|InterruptedException]]. | * **[[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html#acquire(int)|acquire(int N)]]:** Δέσμευει Ν permits εφόσον είναι διαθέσιμα, διαφορετικά μπλοκάρει. Εάν κατά την αναμονή το νήμα διακοπεί από άλλο νήμα τότε η μέθοδος "πετάει" [[https://docs.oracle.com/javase/7/docs/api/java/lang/InterruptedException.html|InterruptedException]]. |
| * **acquireUninterruptibly():** Δέσμευει ένα permit, εφόσον αυτό είναι διαθέσιμο, διαφορετικά το νήμα μπλοκάρει. Εάν κατά την αναμονή το νήμα διακοπεί από άλλο νήμα τότε αυτό εξακολουθεί και περιμένει. | * **https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html#acquireUninterruptibly()|acquireUninterruptibly()]]:** Δέσμευει ένα permit, εφόσον αυτό είναι διαθέσιμο, διαφορετικά το νήμα μπλοκάρει. Εάν κατά την αναμονή το νήμα διακοπεί από άλλο νήμα τότε αυτό εξακολουθεί και περιμένει. |
| * **acquireUninterruptibly(int permits):** Δέσμευει Ν permits εφόσον είναι διαθέσιμα, διαφορετικά μπλοκάρει. Εάν κατά την αναμονή το νήμα διακοπεί από άλλο νήμα τότε αυτό εξακολουθεί και περιμένει. | * **[[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html#acquireUninterruptibly(int)|acquireUninterruptibly(int permits)]]:** Δέσμευει Ν permits εφόσον είναι διαθέσιμα, διαφορετικά μπλοκάρει. Εάν κατά την αναμονή το νήμα διακοπεί από άλλο νήμα τότε αυτό εξακολουθεί και περιμένει. |
| * **tryAcquire():** Επιχειρεί να δεσμεύσει ένα permit, εφόσον αυτό είναι διαθέσιμο. Επιστρέφει **true/false** σε περίπτωση επιτυχίας/αποτυχίας. | * **[[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html#tryAcquire()|tryAcquire()]]:** Επιχειρεί να δεσμεύσει ένα permit, εφόσον αυτό είναι διαθέσιμο. Επιστρέφει **true/false** σε περίπτωση επιτυχίας/αποτυχίας. |
| * **tryAcquire(int N):** Επιχειρεί να δεσμεύσει N permits, εφόσον αυτό είναι διαθέσιμα. Επιστρέφει **true/false** σε περίπτωση επιτυχίας/αποτυχίας. | * **[[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html#tryAcquire(int)|tryAcquire(int N)]]:** Επιχειρεί να δεσμεύσει N permits, εφόσον αυτό είναι διαθέσιμα. Επιστρέφει **true/false** σε περίπτωση επιτυχίας/αποτυχίας. |
| * **tryAcquire(int permits, long timeout, TimeUnit unit):** Δέσμευει ένα permit, εφόσον αυτό είναι διαθέσιμο, διαφορετικά το νήμα μπλοκάρει για χρονικό διάστημα ίσο με //timeout//. Επιστρέφει true εφόσον τα ζητούμενα permits είναι διαθέσιμα και false έαν το χρονικό όριο //timeout// παρέλθει χωρίς τα ζητούμενα permits να γίνουν διαθέσιμα. Εάν κατά την αναμονή το νήμα διακοπεί από άλλο νήμα τότε η μέθοδος "πετάει" [[https://docs.oracle.com/javase/7/docs/api/java/lang/InterruptedException.html|InterruptedException]]. | * **[[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html#tryAcquire(int,%20long,%20java.util.concurrent.TimeUnit)|tryAcquire(int permits, long timeout, TimeUnit unit)]]:** Δέσμευει ένα permit, εφόσον αυτό είναι διαθέσιμο, διαφορετικά το νήμα μπλοκάρει για χρονικό διάστημα ίσο με //timeout//. Επιστρέφει true εφόσον τα ζητούμενα permits είναι διαθέσιμα και false έαν το χρονικό όριο //timeout// παρέλθει χωρίς τα ζητούμενα permits να γίνουν διαθέσιμα. Εάν κατά την αναμονή το νήμα διακοπεί από άλλο νήμα τότε η μέθοδος "πετάει" [[https://docs.oracle.com/javase/7/docs/api/java/lang/InterruptedException.html|InterruptedException]]. |
| * **drainPermits():** Λαμβάνει όλα τα διαθέσιμα permits. Επιστρέφει τον αριθμό τους. | * **[[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html#drainPermits()|drainPermits()]]:** Λαμβάνει όλα τα διαθέσιμα permits. Επιστρέφει τον αριθμό τους. |
| * Απελευθέρωση πόρων (release permits): | * Απελευθέρωση πόρων (//release permits//): |
| * **release():** | * **[[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html#release()|release()]]:** Απελευθερώνει ένα πόρο από τον σηματοφορέα. Δεν υπάρχει καμία προϋπόθεση το νήμα που απελευθερώνει ένα πόρο να τον έχει καταλάβει μέσω της //acquire// προηγούμενα. |
| * **release(int permits):** | * **[[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html#release(int)|release(int Ν)]]:** Απελευθερώνει **Ν** πόρους από τον σηματοφορέα. Δεν υπάρχει καμία προϋπόθεση το νήμα που απελευθερώνει τους πόρους να τους έχει καταλάβει μέσω της //acquire// προηγούμενα. |
| * **release:** | Νήματα που αναμένουν ένα ή περισσότερους πόρους του σηματοφορέα και μπορούν να ικανοποιηθούν από την απελευθέρωση πόρων ξυπνούν και χρονοπρογραμματίζεται η εκτέλεση τους. |
| | |
| Άλλες χρήσιμες μέθοδοι είναι: **availablePermits**, **getQueuedThreads** | |
| |
| | Άλλες χρήσιμες μέθοδοι είναι: |
| | * **[[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html#availablePermits()|availablePermits]]:** Επιστρέφει τον αριθμό των διαθέσιμων πόρων του σηματοφορέα. |
| | * **[[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html#getQueuedThreads()|getQueuedThreads]]:** Επιστρέφει τη λίστα με τα νήματα που περιμένουν να καταλάβουν πόρους του σηματοφορέα. |
| | * **[[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html#getQueueLength()|getQueuedLength]]:** Επιστρέφει τον αριθμό των νημάτων που περιμένουν να καταλάβουν πόρους του σηματοφορέα. |
| ===== Lock ===== | ===== Lock ===== |
| |
| To interface [[http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Lock.html| java.util.concurrent.locks.Lock]] προδιαγράφει μία κλειδαριά, η οποία επιτρέπει μόνο ένα νήμα να κλειδώσει την κλειδαριά. Όλα τα νήματα που δεν καταφέρνουν να κλειδώσουν παραμένουν ανενεργά μέχρι η κλειδαριά να ξεκλειδώσει. | To interface **[[http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Lock.html| java.util.concurrent.locks.Lock]]** προδιαγράφει μία κλειδαριά, η οποία επιτρέπει μόνο σε ένα νήμα κάθε φορά να την κλειδώσει. Όλα τα νήματα που δεν καταφέρνουν να κλειδώσουν παραμένουν ανενεργά μέχρι η κλειδαριά να ξεκλειδώσει. Οι βασικές μέθοδοι που προδιαγράφει το συγκεκριμένο interface είναι οι εξής: |
| | * **lock():** Επιχειρεί να κλειδώσει την κλειδαριά. Εάν η κλειδαριά είναι κλειδωμένη αναμένει το ξεκλείδωμα της. |
| | * **tryLock():** Κλειδώνει την κλειδαριά μόνο εάν αυτή είναι ελεύθερη. Διαφορετικά επιστρέφει χωρίς να κλειδώσει. |
| | * **tryLock(long time, TimeUnit unit):** Επιχειρεί να κλειδώσει τη κλειδαριά. Εάν η κλειδαριά είναι κλειδωμένη αναμένει για μέγιστο χρονικό διάστημα που περιγράφεται από τα ορίσματα της μεθόδου και εάν δεν γίνει διαθέσιμη επιστρέφει χωρίς να κλειδώσει. |
| | * **unlock():** Ξεκλειδώνει την κλειδαριά. |
| | |
| | Υλοποίηση του παραπάνω //interface// είναι η κλάση [[http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantLock.html|java.util.concurrent.locks.ReentrantLock]], η οποία επιτρέπει το πολλαπλό κλείδωμα της κλειδαριάς από το ίδιο νήμα. Το ξεκλείδωμα της κλειδαριάς πραγματοποιείται μόνον εφόσον το νήμα που κλείδωσε ξεκλειδώσει τόσες φορές όσες έχει προηγούμενα κλειδώσει. Η συγκεκριμένη κλάση δίνει την δυνατότητα του ισότιμου χρονοπρογραμματισμού των νημάτων που περιμένουν, εφόσον κληθεί [[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantLock.html#ReentrantLock(boolean)|ο κατασκευαστής]] με όρισμα //true//. |
| |
| Υλοποίηση του παραπάνω interface είναι η κλάση [[http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantLock.html|java.util.concurrent.locks.ReentrantLock]], η οποία επιτρέπει το πολλαπλό κλείδωμα της κλειδαριάς από το ίδιο νήμα. Το ξεκλείδωμα της κλειδαριάς πραγματοποιείται μόνο εφόσον το νήμα που κλείδωσε ξεκλειδώσει τόσες φόρές όσες έχει προηγούμενα κλειδώσει. | |
| |
| |
| ===== ReadWriteLock ===== | ===== ReadWriteLock ===== |
| |
| Αντίστοιχα με τo interface Lock μόνο που προδιαγράφονται 2 κλειδαριές, μία για διάβασμα και μία για γράψιμο. Η διαφορά με την προηγούμενη κλειδαριά είναι ότι την κλειδαριά που επιτρέπει το διάβασμα μπορούν να την κλειδώσουν παράλληλα πολλά νήματα που θέλουν να διαβάσουν εάν δεν υπάρχει νήμα που θέλει να γράψει. Αντίστοιχα την κλειδαριά για γράψιμο μπορεί να την κλειδώσει μόνο ένα νήμα. Παράλληλα δεν επιτρέπειται η πρόσβαση και στην κλειδαριά που επιτρέπει το διάβασμα. | Το //interface// [[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReadWriteLock.html| |
| | ReadWriteLock]] προδιαγράφει δύο κλειδαριές, μία για διάβασμα και μία για γράψιμο. Η διαφορά με την προηγούμενη κλειδαριά είναι ότι την κλειδαριά που επιτρέπει το διάβασμα μπορούν να την κλειδώσουν παράλληλα πολλά νήματα που θέλουν να διαβάσουν εάν δεν υπάρχει νήμα που θέλει να γράψει. Αντίστοιχα την κλειδαριά που επιτρέπει το γράψιμο μπορεί να την κλειδώσει μόνο ένα νήμα και παράλληλα απαγορεύεται η πρόσβαση στην κλειδαριά που επιτρέπει το διάβασμα. Συνοπτικά: |
| |
| * ΜΟΝΟ ένα νήμα μπορεί να γράψει. | * ΜΟΝΟ ένα νήμα μπορεί να γράψει. Κανένα νήμα δεν μπορεί να διαβάσει. |
| * Πολλαπλά νήματα μπορούν να διαβάσουν εάν δεν υπάρχει νήμα που θέλει να γράψει. | * Πολλαπλά νήματα μπορούν να διαβάσουν εάν δεν υπάρχει νήμα που θέλει να γράψει. |
| |
| Το παραπάνω interface υλοποιείται μεσω της κλάσης [[http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.html||java.util.concurrent.locks.ReentrantReadWriteLock]]. Η κλάση αυτή περιέχει δύο άλλες εσωτερικές κλάσεις την [[http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.ReadLock.html|ReentrantReadWriteLock.ReadLock]] και [[http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.WriteLock.html|ReentrantReadWriteLock.WriteLock]]. Η πρώτη χρησιμοποιείται για διάβασμα και η δεύτερη για γράψιμο. | Το παραπάνω interface υλοποιείται μεσω της κλάσης [[http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.html||java.util.concurrent.locks.ReentrantReadWriteLock]]. Η κλάση αυτή περιέχει δύο άλλες εσωτερικές κλάσεις την [[http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.ReadLock.html|ReentrantReadWriteLock.ReadLock]] και [[http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.WriteLock.html|ReentrantReadWriteLock.WriteLock]]. Η πρώτη χρησιμοποιείται για διάβασμα και η δεύτερη για γράψιμο. |
| ===== Atomic Integer, Long, Boolean, Reference ===== | ===== Atomic Integer, Long, Boolean, Reference ===== |
| | |
| | Στο πακέτο [[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/atomic/package-summary.html|java.util.concurrent.atomic]] η γλώσσα παρέχει ισοδύναμους των βασικών τύπων δεδομένων οι οποίοι όμως μπορούν να προσπελαστούν από δύο ή περισσότερα νήματα χωρίς να προκύψει ασάφεια ως προς την τιμή τους. Οι τύποι αυτοί είναι οι εξής: |
| | |
| | * [[AtomicBoolean:]] Μία boolean μεταβλητή η οποία μπορεί να αλλάξει τιμή ατομικά. |
| | * [[AtomicInteger:]] Μία Integer μεταβλητή η οποία μπορεί να αλλάξει τιμή ατομικά. |
| | * [[AtomicIntegerArray:]] Ένας πίνακας από ακεραίους του οποίου τα στοιχεία μπορούν να αλλάξουν τιμή ατομικά. |
| | * [[AtomicLong:]] Μία Long μεταβλητή η οποία μπορεί να αλλάξει τιμή ατομικά. |
| | * [[AtomicLongArray:]] Ένας πίνακας από ακεραίους τύπου Long του οποίου τα στοιχεία μπορούν να αλλάξουν τιμή ατομικά. |
| | * [[AtomicReference:]] Μία αναφορά σε αντικείμενο η οποία μπορεί να αλλάξει ατομικά. |
| | * [[AtomicReferenceArray:]] Ένας πίνακας από αναφορές του οποίου τα στοιχεία μπορούν να αλλάξουν τιμή ατομικά. |
| | |
| | Οι βασικές μέθοδοι που διαθέτουν οι παραπάνω κλάσεις είναι οι εξής: |
| | * **compareAndSet:** Η μέθοδος αλλάζει την τιμή της μεταβλητής με την προϋπόθεση ότι η υφιστάμενη τιμή ταυτίζεται με το 1ο όρισμα της μεθόδου. |
| | * **get:** Επιστρέφει την τρέχουσα τιμή. |
| | * **getAndSet:** Επιστρέφει την υφιστάμενη τιμή και την ανανεώνει με μία νέα. |
| | * **lazySet:** Θέτει μία νέα τιμή για την μεταβλητή. |
| | * **set:** Θέτει μία νέα τιμή για την μεταβλητή. |
| | |
| |
| ===== Παραδείγματα Χρήσης Εργαλείων Συγχρονισμού ===== | ===== Παραδείγματα Χρήσης Εργαλείων Συγχρονισμού ===== |
| |
| ==== Blocking Queue με locks ==== | ===== Blocking Queue με Locks ===== |
| |
| Ας υποθέσουμε ότι έχετε το αρχείο κειμένου {{:java:largedict.txt.zip|largedict.txt}} για το οποίο θέλετε να κάνετε τα εξής. Έχετε **Ν** νήματα που διαβάζουν από το αρχείο και **Μ** νήματα που στο νέο αρχείο. Θέλετε το νέο αρχείο που θα γραφεί να είναι ίδιο με το αρχικό αρχείο που διαβάστηκε. Το αρχείο κειμένου διαβάζεται και γράφεται γραμμή-γραμμή. | Ας υποθέσουμε ότι έχετε το αρχείο κειμένου {{:java:largedict.txt.zip|largedict.txt}} για το οποίο θέλετε να κάνετε τα εξής. Έχετε **Ν** νήματα που διαβάζουν από το αρχείο και **Μ** νήματα που στο νέο αρχείο. Θέλετε το νέο αρχείο που θα γραφεί να είναι ίδιο με το αρχικό αρχείο που διαβάστηκε. Το αρχείο κειμένου διαβάζεται και γράφεται γραμμή-γραμμή. |
| |
| Για την επικοινωνία μεταξύ αναγνωστών και εγγραφέων θα χρησιμοποιηθεί ένα blocking queue με σχετικά περιορισμένη χωρητικότητα **Κ**, όπου **K < min(M,N)**. | Για την επικοινωνία μεταξύ αναγνωστών και εγγραφέων θα χρησιμοποιηθεί ένα blocking queue με σχετικά περιορισμένη χωρητικότητα **Κ**, όπου **K < min(M,N)**. Επίσης για την υλοποίηση του συγχρονισμού διαθέτουμε δύο κλειδαριές **α)** μία κλειδαριά η οποία βεβαιώνει ότι μόνο ένα νήμα διαβάζει από το αρχείο και γράφει στο //Blocking Queue// και **β)** μία κλειδαριά η οποία βεβαιώνει ότι μόνο ένα νήμα διαβάζει από το //Blocking Queue// και γράφει στο νέο αρχείο. |
| |
| Δείτε το παρακάτω παράδειγμα κώδικα που υλοποιεί τον συγχρονισμό μεταξύ των νημάτων ανάγνωσης και εγγραφής. | <WRAP tip 80% center round> |
| | Για τον παρακάτω παράδειγμα κώδικα που υλοποιεί τον συγχρονισμό μεταξύ των νημάτων ανάγνωσης και εγγραφής, μεταγλωττίστε και εκτελέστε αρχικά απενεργοποιώντας τις δύο κλειδαριές και στη συνέχεια ενεργοποιώντας τες. Ποια διαφορά παρατηρείτε στο τελικό αρχείο στην μία και την άλλη περίπτωση. Σε τι οφείλεται η διαφορά αυτή; |
| | </WRAP> |
| |
| <code java UtilConcurrentDemo1.java> | <code java BlockingQueueWithLocks.java> |
| import java.util.concurrent.*; | import java.util.concurrent.*; |
| import java.util.concurrent.locks.*; | import java.util.concurrent.locks.*; |
| import java.io.*; | import java.io.*; |
| |
| public class UtilConcurrentDemo1 { | public class BlockingQueueWithLocks { |
| | |
| public static void main(String args[]) { | public static void main(String args[]) { |
| } | } |
| | |
| boolean useLocks = false; | boolean useLocks = true; |
| | |
| ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2); | ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2); |
| ==== Υλοποίηση μίας διασυνδεδεμένης λίστας η οποία είναι συγχρονισμένη ==== | ==== Υλοποίηση μίας διασυνδεδεμένης λίστας η οποία είναι συγχρονισμένη ==== |
| |
| Όπως ίσως θα γνωρίζετε η διασυνδεδεμένες λίστες που υπάρχουν στο πακέτο java.util δεν είναι συγχρονισμένες, δηλαδή δεν μπορούν να προσπελαστούν ταυτόχρονα από δύο ή περισσότερα νήματα. Προκειμένου να μπορούμε να προσπελάσουμε μία λίστα από περισσότερα του ενός νήματα δημιουργούμε την κλάση SynchronizedList η οποία χρησιμοποιεί ένα [[http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.ReadLock.html|java.util.concurrent.locks.ReentrantLock.ReadLock]] για διάβασμα και ένα [[http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.ReadLock.html|ReentrantReadWriteLock.WriteLock.html]] για μεταβολή του περιεχομένου της λίστας. | Όπως ίσως θα γνωρίζετε η διασυνδεδεμένες λίστες που υπάρχουν στο πακέτο **[[https://docs.oracle.com/javase/7/docs/api/java/util/package-frame.html|java.util]]** δεν είναι συγχρονισμένες, δηλαδή δεν μπορούν να παρέχουν καμία εγγύηση ότι εάν προσπελαστούν ταυτόχρονα από δύο ή περισσότερα νήματα η δομή θα διατηρήσει την εσωτερική της συνοχή και το περιεχόμενο της θα είναι σωστό. Προκειμένου να μπορούμε να προσπελάσουμε μία λίστα από περισσότερα του ενός νήματα δημιουργούμε την κλάση SynchronizedList η οποία χρησιμοποιεί ένα [[http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.ReadLock.html|java.util.concurrent.locks.ReentrantLock.ReadLock]] για διάβασμα και ένα [[http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.ReadLock.html|ReentrantReadWriteLock.WriteLock.html]] για μεταβολή του περιεχομένου της λίστας. |
| |
| Δείτε πως διαμορφώνεται η κλάση καθώς και ένα παράδειγμα χρήσης της κλάσης αυτής. | Δείτε πως διαμορφώνεται η κλάση καθώς και ένα παράδειγμα χρήσης της κλάσης αυτής. |
| |
| <code java UtilConcurrentDemo2.java> | <code java SynchronizedList.java> |
| import java.util.*; | import java.util.*; |
| import java.util.concurrent.*; | import java.util.concurrent.*; |
| Random rand; | Random rand; |
| SynchronizedList<Integer> list; | SynchronizedList<Integer> list; |
| | //ArrayList<Integer> list; |
| | |
| public ListModifierThread(SynchronizedList<Integer> list) { | public ListModifierThread(SynchronizedList<Integer> list) { |
| | //public ListModifierThread(ArrayList<Integer> list) { |
| this.list = list; | this.list = list; |
| rand = new Random( new Date().getTime() ); | rand = new Random( new Date().getTime() ); |
| list.add( rand.nextInt(1000) ); | list.add( rand.nextInt(1000) ); |
| | |
| list.listWriteLock(); | for(int i=0; i<1000; i++) |
| while( list.size() > 0 ) | list.remove(0); |
| System.out.println( list.remove( list.size()-1 ) ); | |
| | |
| if( list.isEmpty() ) | if( list.isEmpty() ) |
| else | else |
| System.out.println( this.getName() +": list is NOT empty!"); | System.out.println( this.getName() +": list is NOT empty!"); |
| list.listWriteUnlock(); | |
| } | } |
| } | } |
| public static void main(String []args) { | public static void main(String []args) { |
| SynchronizedList<Integer> list = new SynchronizedList<>(); | SynchronizedList<Integer> list = new SynchronizedList<>(); |
| | //ArrayList<Integer> list = new ArrayList<>(); |
| | |
| for(int i=0; i<10; i++) { | for(int i=0; i<10; i++) { |