зачекайте та повідомте () Методи в Java

1. Вступ

У цій статті ми розглянемо один з найбільш фундаментальних механізмів Java - синхронізацію потоків.

Спочатку ми обговоримо деякі основні терміни та методології, пов’язані з паралельністю.

І ми розробимо простий додаток - де ми будемо мати справу з проблемами паралельності, з метою кращого розуміння wait () та notify ().

2. Синхронізація потоків у Java

У багатопотоковому середовищі кілька потоків можуть спробувати змінити один і той же ресурс. Якщо потоками не керувати належним чином, це, звичайно, призведе до проблем узгодженості.

2.1. Захищені блоки в Java

Один інструмент, який ми можемо використовувати для координації дій кількох потоків на Java - це захищені блоки. Такі блоки зберігають перевірку на певний стан перед відновленням виконання.

З огляду на це, ми використаємо:

  • Object.wait () - для призупинення потоку
  • Object.notify () - для пробудження потоку

Це можна краще зрозуміти з наступної схеми, яка зображує життєвий цикл нитки :

Зверніть увагу, що існує безліч способів контролювати цей життєвий цикл; однак у цій статті ми зосередимося лише на wait () та notify ().

3. Очікування () Метод

Простіше кажучи, коли ми викликаємо wait () - це змушує поточний потік чекати, поки якийсь інший потік викличе notify () або notifyAll () на тому самому об'єкті.

Для цього поточний потік повинен володіти монітором об’єкта. На думку Javadocs, це може статися, коли:

  • ми виконали метод синхронізованого екземпляра для даного об’єкта
  • ми виконали тіло синхронізованого блоку для даного об'єкта
  • виконуючи синхронізовані статичні методи для об'єктів типу Class

Зауважте, що лише один активний потік може одночасно володіти монітором об’єкта.

Цей метод wait () постачається з трьома перевантаженими підписами. Давайте подивимось на них.

3.1. почекай ()

Метод wait () змушує поточний потік нескінченно чекати, поки інший потік не викличе notify () для цього об’єкта або notifyAll () .

3.2. чекати (довгий тайм-аут)

За допомогою цього методу ми можемо вказати тайм-аут, після якого потік буде автоматично пробуджуватися. Потік можна пробудити до досягнення тайм-ауту за допомогою notify () або notifyAll ().

Зверніть увагу, що виклик очікування (0) - це те саме, що виклик очікування ().

3.3. зачекати (довгий тайм-аут, int nanos)

Це ще один підпис, що забезпечує ту саму функціональність, з тією лише різницею, що ми можемо забезпечити більш високу точність.

Загальний період очікування (в наносекундах) обчислюється як 1_000_000 * час очікування + нанос.

4. notify () та notifyAll ()

Метод notify () використовується для пробудження потоків, які чекають доступу до монітора цього об'єкта.

Існує два способи повідомлення потоків очікування.

4.1. сповістити ()

Для всіх потоків, що очікують на моніторі цього об’єкта (за допомогою будь-якого з методів wait () ), метод notify () повідомляє будь-який з них про довільне пробудження. Вибір, який саме потік пробудити, є недетермінованим і залежить від реалізації.

Оскільки notify () пробуджує один випадковий потік, його можна використовувати для реалізації взаємовиключного блокування там, де потоки виконують подібні завдання, але в більшості випадків було б більш життєздатним реалізувати notifyAll () .

4.2. notifyAll ()

Цей метод просто пробуджує всі потоки, які очікують на моніторі цього об'єкта.

Пробуджені нитки завершаться звичайним способом - як і будь-яка інша нитка.

Але перш ніж ми дозволимо продовжувати їх виконання, завжди визначайте швидку перевірку умови, необхідної для продовження роботи з потоком - тому що можуть бути деякі ситуації, коли нитка прокинулася без отримання сповіщення (цей сценарій обговорюється далі в прикладі) .

5. Проблема синхронізації відправника та одержувача

Тепер, коли ми зрозуміли основи, давайте пройдемо просту програму відправника - одержувача, яка використовуватиме методи wait () та notify () для налаштування синхронізації між ними:

  • Відправник повинен послати пакет даних в приймач
  • Приймач не може обробляти пакет даних до тих пір , відправник не буде завершеним відправкою
  • Подібним чином відправник не повинен намагатися надіслати інший пакет, якщо одержувач вже не обробив попередній пакет

Let's first create Data class that consists of the data packet that will be sent from Sender to Receiver. We'll use wait() and notifyAll() to set up synchronization between them:

public class Data { private String packet; // True if receiver should wait // False if sender should wait private boolean transfer = true; public synchronized void send(String packet) { while (!transfer) { try { wait(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); Log.error("Thread interrupted", e); } } transfer = false; this.packet = packet; notifyAll(); } public synchronized String receive() { while (transfer) { try { wait(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); Log.error("Thread interrupted", e); } } transfer = true; notifyAll(); return packet; } }

Let's break down what's going on here:

  • The packet variable denotes the data that is being transferred over the network
  • We have a boolean variable transfer – which the Sender and Receiver will use for synchronization:
    • If this variable is true, then the Receiver should wait for Sender to send the message
    • If it's false, then Sender should wait for Receiver to receive the message
  • The Sender uses send() method to send data to the Receiver:
    • If transfer is false, we'll wait by calling wait() on this thread
    • But when it is true, we toggle the status, set our message and call notifyAll() to wake up other threads to specify that a significant event has occurred and they can check if they can continue execution
  • Similarly, the Receiver will use receive() method:
    • If the transfer was set to false by Sender, then only it will proceed, otherwise we'll call wait() on this thread
    • When the condition is met, we toggle the status, notify all waiting threads to wake up and return the data packet that was Receiver

5.1. Why Enclose wait() in a while Loop?

Since notify() and notifyAll() randomly wakes up threads that are waiting on this object's monitor, it's not always important that the condition is met. Sometimes it can happen that the thread is woken up, but the condition isn't actually satisfied yet.

We can also define a check to save us from spurious wakeups – where a thread can wake up from waiting without ever having received a notification.

5.2. Why Do We Need to Synchronize send() and receive() Methods?

We placed these methods inside synchronized methods to provide intrinsic locks. If a thread calling wait() method does not own the inherent lock, an error will be thrown.

We'll now create Sender and Receiver and implement the Runnable interface on both so that their instances can be executed by a thread.

Let's first see how Sender will work:

public class Sender implements Runnable { private Data data; // standard constructors public void run() { String packets[] = { "First packet", "Second packet", "Third packet", "Fourth packet", "End" }; for (String packet : packets) { data.send(packet); // Thread.sleep() to mimic heavy server-side processing try { Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); Log.error("Thread interrupted", e); } } } }

For this Sender:

  • We're creating some random data packets that will be sent across the network in packets[] array
  • For each packet, we're merely calling send()
  • Then we're calling Thread.sleep() with random interval to mimic heavy server-side processing

Finally, let's implement our Receiver:

public class Receiver implements Runnable { private Data load; // standard constructors public void run() { for(String receivedMessage = load.receive(); !"End".equals(receivedMessage); receivedMessage = load.receive()) { System.out.println(receivedMessage); // ... try { Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); Log.error("Thread interrupted", e); } } } }

Here, we're simply calling load.receive() in the loop until we get the last “End” data packet.

Let's now see this application in action:

public static void main(String[] args) { Data data = new Data(); Thread sender = new Thread(new Sender(data)); Thread receiver = new Thread(new Receiver(data)); sender.start(); receiver.start(); }

We'll receive the following output:

First packet Second packet Third packet Fourth packet 

And here we are – we've received all data packets in the right, sequential order and successfully established the correct communication between our sender and receiver.

6. Conclusion

In this article, we discussed some core synchronization concepts in Java; more specifically, we focused on how we can use wait() and notify() to solve interesting synchronization problems. And finally, we went through a code sample where we applied these concepts in practice.

Before we wind down here, it's worth mentioning that all these low-level APIs, such as wait(), notify() and notifyAll() – are traditional methods that work well, but higher-level mechanism are often simpler and better – such as Java's native Lock and Condition interfaces (available in java.util.concurrent.locks package).

Щоб отримати додаткову інформацію про пакет java.util.concurrent , відвідайте наш огляд статті o java.util.concurrent, а Блокування та стан описані в посібнику до java.util.concurrent.Locks тут.

Як завжди, усі фрагменти коду, використані в цій статті, доступні на GitHub.