Menggunakan Atomics.wait (), Atomics.notify () dan Atomics.waitAsync ()

Metode statis Atomics.wait () dan Atomics.notify () adalah primitif sinkronisasi tingkat rendah yang dapat digunakan untuk mengimplementasikan mutex dan mekanisme serupa lainnya. Tetapi, karena metode Atomics.wait()ini memblokir, itu tidak dapat dipanggil di utas utama (jika Anda mencoba melakukan ini, kesalahan akan muncul TypeError).



Mesin V8 sejak versi 8.7 mendukung opsi non-pemblokiran yang Atomics.wait()disebut Atomics.waitAsync () . Metode baru ini dapat digunakan di utas utama. Hari ini kami akan menunjukkan kepada Anda cara menggunakan API tingkat rendah ini untuk membuat mutex yang dapat berjalan secara sinkron (di thread pekerja) dan secara asinkron (di thread pekerja atau di thread utama).











Atomics.wait () dan Atomics.waitAsync ()



Metode Atomics.wait()dan Atomics.waitAsync()ambil parameter berikut:



  • buffer: array tipe Int32Arrayatau BigInt64Array, yang didasarkan pada SharedArrayBuffer.
  • index: indeks sebenarnya dari elemen dalam array.
  • expectedValue: nilai yang kami harapkan akan terwakili dalam memori, di lokasi yang dijelaskan dengan bufferdan index.
  • timeout: batas waktu dalam milidetik (opsional, defaultnya Infinity).


Atomics.wait()mengembalikan sebuah string. Jika nilai yang diharapkan tidak ditemukan di lokasi memori yang ditentukan, itu Atomics.wait()segera keluar, mengembalikan string not-equal. Jika tidak, utas akan diblokir. Salah satu kejadian berikut harus terjadi agar kunci dibuka. Yang pertama adalah panggilan dari utas lain dari suatu metode Atomics.notify()dengan indikasi tempat dalam memori yang diminati metode tersebut Atomics.wait(). Yang kedua adalah berakhirnya batas waktu. Dalam kasus pertama, ini akan Atomics.wait()mengembalikan string ok, di kasus kedua - nilai string timed-out.



Metode tersebut Atomics.notify()mengambil parameter berikut:



  • typedArray: array tipe Int32Arrayatau BigInt64Array, yang didasarkan pada SharedArrayBuffer.
  • index: indeks sebenarnya dari elemen dalam array.
  • count: jumlah agen yang menunggu pemberitahuan (parameter opsional, disetel ke default Infinity).


Metode ini Atomics.notify()memberi tahu sejumlah agen yang menunggu pemberitahuan di alamat yang dijelaskan typedArraydan indexmelewati mereka dalam urutan FIFO. Jika beberapa panggilan telah dibuat Atomics.wait()atau Atomics.waitAsync()menonton di tempat yang sama dalam memori, maka semuanya berakhir dalam antrian yang sama.



Tidak seperti metode Atomics.wait(), metode Atomics.waitAsync()langsung mengembalikan nilai di lokasi panggilan. Ini bisa menjadi salah satu dari nilai berikut:



  • { async: false, value: 'not-equal' } - jika lokasi memori yang ditentukan tidak mengandung nilai yang diharapkan.
  • { async: false, value: 'timed-out' } - hanya jika batas waktu disetel ke 0.
  • { async: true, value: promise } - dalam kasus lain.


Sebuah janji, setelah beberapa waktu, bisa berhasil diselesaikan dengan nilai string ok(jika sebuah metode dipanggil Atomics.notify(), yang memberikan informasi tentang tempat di memori yang diteruskan Atomics.waitAsync()). Itu bisa diselesaikan dengan nilai timed-out. Janji ini tidak pernah ditolak.



Contoh berikut menunjukkan dasar-dasar penggunaan Atomics.waitAsync():



const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
//                                     |  |  ^ - ()
//                                     |  ^  
//                                     ^ 

if (result.value === 'not-equal') {
  //   SharedArrayBuffer   .
} else {
  result.value instanceof Promise; // true
  result.value.then(
    (value) => {
      if (value == 'ok') { /*   */ }
      else { /*  - */ }
    });
}

//      :
Atomics.notify(i32a, 0);


Sekarang mari kita bicara tentang cara membuat mutex yang dapat digunakan dalam mode sinkron dan asinkron. Perlu dicatat bahwa implementasi versi sinkron mutex telah dibahas sebelumnya. Misalnya - dalam materi ini .



Dalam contoh ini, kami tidak akan menggunakan parameter timeoutsaat memanggil Atomics.wait()dan Atomics.waitAsync(). Parameter ini dapat digunakan untuk mengimplementasikan persyaratan terkait waktu tunggu.



Kelas kami yang AsyncLockmewakili mutex bekerja dengan buffer SharedArrayBufferdan mengimplementasikan metode berikut:



  • lock(): memblokir utas hingga kami memiliki kesempatan untuk menangkap mutex (hanya berlaku di utas pekerja).
  • unlock(): membebaskan mutex (yang ini sebaliknya lock()).
  • executeLocked(callback): mencoba mendapatkan kunci tanpa memblokir utas. Metode ini dapat digunakan di utas utama. Ia berencana untuk mengeksekusi callback pada saat kami dapat memperoleh kunci.


Mari kita lihat bagaimana metode ini dapat diterapkan. Deklarasi kelas menyertakan konstanta dan konstruktor yang membutuhkan buffer SharedArrayBuffer.



class AsyncLock {
  static INDEX = 0;
  static UNLOCKED = 0;
  static LOCKED = 1;

  constructor(sab) {
    this.sab = sab;
    this.i32a = new Int32Array(sab);
  }

  lock() {
    /* … */
  }

  unlock() {
    /* … */
  }

  executeLocked(f) {
    /* … */
  }
}


Di sini elemen i32a[0]berisi nilai LOCKEDatau UNLOCKED. Dia, sebagai tambahan, mewakili tempat dalam ingatan yang menarik Atomics.wait()dan Atomics.waitAsync(). Kelas AsyncLockmenyediakan kemampuan dasar berikut:



  1. i32a[0] == LOCKEDdan utas dalam keadaan menunggu (setelah dipanggil Atomics.wait()atau Atomics.waitAsync()), menonton i32a[0], akhirnya akan diberi tahu.
  2. Setelah utas diberi tahu, utas akan mencoba mendapatkan kunci. Jika berhasil, maka, saat membuka kunci, ia akan memanggil Atomics.notify().


Pengambilan dan pelepasan kunci sinkron



Pertimbangkan kode untuk metode lock()yang hanya bisa dipanggil dari thread pekerja.



lock() {
  while (true) {
    const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
                        /*   >>> */  AsyncLock.UNLOCKED,
                        /*   >>> */  AsyncLock.LOCKED);
    if (oldValue == AsyncLock.UNLOCKED) {
      return;
    }
    Atomics.wait(this.i32a, AsyncLock.INDEX,
                 AsyncLock.LOCKED); // <<< ,    
  }
}


Ketika sebuah metode dipanggil dari utas lock(), metode ini pertama kali mencoba mendapatkan kunci, menggunakannya Atomics.compareExchange()untuk mengubah status kunci dari UNLOCKEDmenjadi LOCKED. Metode ini Atomics.compareExchange()mencoba melakukan operasi atom untuk mengubah status kunci; metode ini mengembalikan nilai asli yang terletak di area memori yang ditentukan. Jika nilai aslinya adalah UNLOCKED, maka kita tahu bahwa perubahan status berhasil dan bahwa utas telah memperoleh kunci. Anda tidak perlu melakukan apapun.



Jika tidak Atomics.compareExchange()dapat mengubah status kunci, itu berarti ada utas lain yang menahan kunci. Akibatnya, utas tempat metode dipanggil lock()mencoba menggunakan metode tersebutAtomics.wait()untuk menunggu sampai kunci dilepaskan oleh utas lain. Jika nilai yang diharapkan masih disimpan di area memori yang diinginkan (dalam kasus kami - AsyncLock.LOCKED), maka panggilan akan Atomics.wait()memblokir utas. Pengembalian dari Atomics.wait()hanya akan terjadi ketika utas lain memanggil Atomics.notify().



Metode ini unlock()melepaskan kunci dengan menyetelnya di status UNLOCKEDdan memanggilnya Atomics.notify()untuk memberi tahu agen yang menunggu kunci untuk dilepaskan. Diasumsikan bahwa operasi perubahan status kunci selalu berhasil. Ini karena utas yang melakukan operasi ini menahan kunci. Oleh karena itu, tidak ada lagi yang harus memanggil metode ini saat ini unlock().



unlock() {
  const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
                      /*   >>> */  AsyncLock.LOCKED,
                      /*   >>> */  AsyncLock.UNLOCKED);
  if (oldValue != AsyncLock.LOCKED) {
    throw new Error('Tried to unlock while not holding the mutex');
  }
  Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}


Dalam kasus umum, semuanya terjadi seperti ini: kuncinya gratis dan thread T1 menangkapnya, mengubah statusnya menggunakan Atomics.compareExchange(). Thread T2 mencoba mendapatkan kunci dengan memanggil Atomics.compareExchange(), tetapi tidak dapat mengubah statusnya. Kemudian T2 memanggil Atomics.wait(), panggilan ini akan memblokir utas. Setelah beberapa waktu, utas T1 melepaskan kunci dan panggilan Atomics.notify(). Ini menyebabkan panggilan Atomics.wait()ke T2 kembali okdan utas T2 keluar dari kunci. T2 kemudian mencoba mendapatkan kunci lagi. Kali ini dia berhasil.



Ada dua kasus khusus di sini. Analisis mereka bertujuan untuk menunjukkan alasannya Atomics.wait(), dan Atomics.waitAsync()memeriksa nilai tertentu pada indeks yang ditentukan dari elemen array. Ini adalah kasusnya:



  • T1 , T2 . T2 , Atomics.compareExchange(), . T1 , T2 Atomics.wait(). T2 Atomics.wait(), not-equal. T2 .
  • T1 , T2 Atomics.wait() . T1 , T2 ( Atomics.wait()) Atomics.compareExchange() . , T3, . . Atomics.compareExchange() T2 . T2 Atomics.wait() , T3 .


Kasus khusus terakhir menunjukkan fakta bahwa mutex kami tidak berfungsi dengan adil. Mungkin saja utas T2 sedang menunggu kunci dilepaskan, tetapi T3 berhasil mendapatkannya segera setelah dilepaskan. Implementasi kunci yang lebih cocok untuk penggunaan dunia nyata dapat menggunakan beberapa status kunci yang ada untuk membedakan antara situasi di mana kunci hanya "diperoleh" dan di mana "ada konflik selama akuisisi".



Tangkapan kunci asinkron



Tidak executeLocked()seperti metode lock(), metode non-pemblokiran dapat dipanggil dari utas utama. Ia menerima, sebagai satu-satunya parameter, callback, dan menjadwalkan callback setelah penguncian yang berhasil diperoleh.



executeLocked(f) {
  const self = this;

  async function tryGetLock() {
    while (true) {
      const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
                          /*   >>> */  AsyncLock.UNLOCKED,
                          /*   >>> */  AsyncLock.LOCKED);
      if (oldValue == AsyncLock.UNLOCKED) {
        f();
        self.unlock();
        return;
      }
      const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
                                       AsyncLock.LOCKED);
                                   //  ^ ,    
      await result.value;
    }
  }

  tryGetLock();
}


Fungsi bagian dalam tryGetLock()pertama kali mencoba mendapatkan kunci dengan Atomics.compareExchange(). Jika memanggil metode ini menghasilkan perubahan status kunci yang berhasil, fungsi tersebut dapat memanggil callback, lalu melepaskan kunci dan keluar.



Jika panggilan Atomics.compareExchange()untuk mendapatkan kunci tidak memungkinkan, kita perlu mencoba melakukannya lagi, pada saat kunci mungkin akan bebas. Tetapi kami tidak dapat memblokir utas dan menunggu kunci dibuka. Sebagai gantinya, kami Atomics.waitAsync()menjadwalkan upaya baru untuk mendapatkan kunci menggunakan metode dan janji yang dikembalikannya.



Jika kita berhasil menjalankan metode ini Atomics.waitAsync(), maka promise yang dikembalikan oleh metode ini diselesaikan ketika thread yang menahan penguncian memanggilAtomics.notify()... Setelah itu, utas yang ingin mendapatkan kunci, seperti sebelumnya, mencoba melakukannya lagi.



Di sini, kasus-kasus khusus tersebut dimungkinkan yang merupakan karakteristik dari versi sinkron (kunci dilepaskan di antara panggilan Atomics.compareExchange()dan Atomics.waitAsync(); kunci ditangkap oleh utas lain, melakukan ini di antara saat-saat penyelesaian janji dan panggilan Atomics.compareExchange()). Oleh karena itu, dalam kode serupa yang berlaku di proyek nyata, ini harus diperhitungkan.



Hasil



Pada artikel ini, kami berbicara tentang primitif sinkronisasi tingkat rendah Atomics.wait(), Atomics.waitAsync()dan Atomics.notify(). Kami telah menganalisis contoh pembuatan mutex berdasarkan mereka, yang dapat digunakan baik di utas utama dan di utas pekerja.



Akankah Atomics.wait (), Atomics.waitAsync (), dan Atomics.notify () akan berguna dalam proyek Anda?



All Articles