System.Threading.Channels - produsen-konsumen dan asinkron berperforma tinggi tanpa alokasi dan tumpukan penyelaman

Halo lagi. Beberapa waktu yang lalu saya menulis tentang alat lain yang tidak banyak dikenal untuk para penghobi kinerja tinggi - System.IO.Pipelines . Pada intinya, System.Threading.Channels yang dipertimbangkan (selanjutnya disebut "saluran") dibangun berdasarkan prinsip-prinsip yang sama seperti Pipa, memecahkan masalah yang sama - Produser-Konsumen. Namun, ia memiliki api yang jauh lebih sederhana yang dengan anggun akan bergabung ke dalam segala jenis kode perusahaan. Pada saat yang sama, ia menggunakan asynchrony tanpa alokasi dan tanpa menumpuk, bahkan dalam kasus asinkron! (Tidak selalu, tetapi sering).







Daftar Isi







pengantar



Masalah Produser / Konsumen ditemui di jalan programmer cukup sering dan selama lebih dari selusin tahun. Edsger Dijkstra sendiri memiliki andil dalam memecahkan masalah ini - ia muncul dengan ide menggunakan semaphores untuk menyinkronkan utas ketika mengorganisir kerja berdasarkan produsen / konsumen. Dan meskipun solusinya dalam bentuk yang paling sederhana diketahui dan agak sepele, di dunia nyata pola ini (Produser / Konsumen) dapat terjadi dalam bentuk yang jauh lebih rumit. Juga, standar pemrograman modern memaksakan sidik jari mereka, kode ini ditulis lebih disederhanakan dan rusak untuk digunakan kembali lebih lanjut. Semuanya dilakukan untuk menurunkan ambang batas untuk menulis kode berkualitas tinggi dan menyederhanakan proses ini. Dan namespace yang dimaksud - System.Threading.Channels - adalah langkah lain menuju tujuan itu.



Saya melihat System.IO.Pipelines beberapa waktu lalu. Itu membutuhkan pekerjaan yang lebih penuh perhatian dan pemahaman yang mendalam tentang masalah ini, Span dan Memori digunakan, dan untuk pekerjaan yang efisien diperlukan untuk tidak memanggil metode yang jelas (untuk menghindari alokasi memori yang tidak perlu) dan terus-menerus berpikir dalam byte. Karena itu, antarmuka pemrograman dari Pipelines adalah non-sepele dan tidak intuitif.



Dalam System.Threading.Channels, pengguna disajikan dengan api yang lebih sederhana untuk bekerja dengannya. Perlu disebutkan bahwa meskipun kesederhanaan api, alat ini sangat dioptimalkan dan kemungkinan besar tidak akan mengalokasikan memori selama kerjanya. Mungkin ini disebabkan oleh fakta bahwa ValueTask banyak digunakan di bawah tenda , dan bahkan dalam kasus asynchrony nyata, IValueTaskSource digunakan, yang digunakan kembali untuk operasi lebih lanjut. Di sinilah seluruh kepentingan pelaksanaan Saluran terletak.



Saluran bersifat generik, jenis generiknya, seperti yang Anda duga, jenis yang akan diproduksi dan dikonsumsi. Menariknya, implementasi kelas Channel, yang cocok dalam 1 baris (sumber github ):



namespace System.Threading.Channels
{
    public abstract class Channel<T> : Channel<T, T> { }
}


Dengan demikian, kelas utama saluran diparameterisasi oleh 2 jenis - secara terpisah untuk saluran produsen dan saluran konsumen. Tetapi untuk saluran yang direalisasikan, ini tidak digunakan.

Bagi mereka yang terbiasa dengan Pipeline, pendekatan umum untuk memulai akan terasa akrab. Yaitu. Kami membuat 1 kelas pusat dari mana kami menarik secara terpisah produsen ( ChannelWriter ) dan konsumen ( ChannelReader ). Terlepas dari namanya, perlu diingat bahwa ini adalah produsen / konsumen, dan bukan pembaca / penulis dari tugas multi-threading klasik lain dengan nama yang sama. ChannelReader mengubah status saluran umum (mengeluarkan nilai), yang tidak lagi tersedia. Ini berarti bahwa dia lebih suka tidak membaca, tetapi mengkonsumsi. Tapi kita akan berkenalan dengan implementasinya nanti.



Awal pekerjaan. Saluran



Memulai dengan saluran dimulai dengan kelas Saluran <T> abstrak dan kelas Saluran statis yang menciptakan implementasi yang paling tepat. Selanjutnya, dari Saluran umum ini, Anda bisa mendapatkan ChannelWriter untuk menulis ke saluran dan ChannelReader untuk konsumsi dari saluran. Saluran adalah gudang informasi umum untuk ChannelWriter dan ChannelReader, sehingga di situlah semua data disimpan. Dan sudah logika rekaman atau konsumsi mereka tersebar di ChannelWriter dan ChannelReader. Secara konvensional, saluran dapat dibagi menjadi 2 kelompok - tidak terbatas dan terbatas. Yang pertama lebih sederhana dalam implementasi, Anda dapat menulis di dalamnya tanpa batas (selama memori memungkinkan). Yang terakhir terbatas pada nilai maksimum tertentu dari jumlah catatan.



Ini mengikuti sifat asinkron yang sedikit berbeda. Dalam saluran tanpa batas, operasi penulisan akan selalu selesai secara sinkron, tidak ada yang berhenti menulis ke saluran. Untuk saluran terbatas, situasinya berbeda. Dengan perilaku standar (yang dapat diganti), operasi penulisan akan berakhir secara sinkron selama ada ruang untuk instance baru di saluran. Setelah pipa penuh, operasi tulis tidak akan selesai sampai ruang dibebaskan (setelah konsumen mengkonsumsi yang dikonsumsi). Oleh karena itu, di sini operasi akan benar-benar tidak sinkron dengan mengubah utas dan perubahan terkait (atau tanpa mengubah, yang akan dijelaskan nanti).



Untuk sebagian besar, perilaku pembaca adalah sama - jika ada sesuatu di saluran, maka pembaca hanya membacanya dan berakhir di sinkronisasi. Jika tidak ada apa-apa, maka menunggu seseorang untuk menuliskan sesuatu.



Kelas statis Saluran berisi 4 metode untuk membuat saluran di atas:



Channel<T> CreateUnbounded<T>();
Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options);
Channel<T> CreateBounded<T>(int capacity);
Channel<T> CreateBounded<T>(BoundedChannelOptions options);


Jika diinginkan, Anda dapat menentukan opsi yang lebih akurat untuk membuat saluran yang akan membantu mengoptimalkannya untuk kebutuhan yang ditentukan.



UnboundedChannelOptions berisi 3 properti, yang disetel ke false secara default:



  1. AllowSynchronousContinuations — , , . -. , . , , , . , , , . , - - , ;
  2. SingleReader — , . , ;
  3. SingleWriter — , ;


BoundedChannelOptions berisi 3 properti yang sama dan 2 lainnya di atas



  1. AllowSynchronousContinuations - sama;
  2. SingleReader - sama;
  3. SingleWriter - sama;
  4. Kapasitas - jumlah catatan yang ditempatkan di saluran. Parameter ini juga merupakan parameter konstruktor;
  5. FullMode - enumerasi BoundedChannelFullMode, yang memiliki 4 opsi, menentukan perilaku ketika mencoba menulis ke saluran lengkap:

    • Tunggu - Menunggu ruang kosong untuk menyelesaikan operasi asinkron
    • DropNewest - item yang ditulis menimpa yang terbaru yang ada, berakhir secara sinkron
    • DropOldest - item yang ditulis menimpa item tertua yang ada berakhir secara sinkron
    • DropWrite - item yang sedang ditulis tidak ditulis, itu berakhir secara serempak




Bergantung pada parameter yang diteruskan dan metode yang dipanggil, salah satu dari 3 implementasi akan dibuat: SingleConsumerUnboundedChannel , UnboundedChannel , BoundedChannel . Tetapi ini tidak begitu penting, karena kita akan menggunakan saluran melalui saluran kelas dasar <TWrite, TRead>.



Ini memiliki 2 properti:



  • ChannelReader <TRead> Reader {get; set yang dilindungi; }
  • ChannelWriter <TWrite> Writer {get; set yang dilindungi; }


Dan juga, 2 operator casting implisit ke ChannelReader <TRead> dan ChannelWriter <TWrite>.



Contoh memulai dengan saluran:



Channel<int> channel = Channel.CreateUnbounded<int>();
//  
ChannelWriter<int> writer = channel.Writer;
ChannelReader<int> reader = channel.Reader; 
// 
ChannelWriter<int> writer = channel;
ChannelReader<int> reader = channel;


Data disimpan dalam antrian. Untuk 3 jenis, 3 antrian yang berbeda digunakan - ConcurrentQueue <T>, Deque <T> dan SingleProducerSingleConsumerQueue <T>. Pada titik ini, saya merasa sudah ketinggalan zaman dan melewatkan banyak koleksi baru yang paling sederhana. Tapi saya segera kecewa - itu bukan untuk semua orang. Mereka diberi label internal, sehingga tidak dapat digunakan. Tetapi jika Anda tiba-tiba membutuhkannya di prod - Anda dapat menemukannya di sini (SingleProducerConsumerQueue) dan di sini (Deque) . Implementasi yang terakhir ini cukup sederhana. Saya menyarankan Anda untuk membacanya, Anda dapat mempelajarinya dengan sangat cepat.



Jadi, mari kita mulai mempelajari secara langsung ChannelReader dan ChannelWriter, serta detail implementasi yang menarik. Semuanya menjadi asinkron, tidak ada alokasi memori menggunakan IValueTaskSource.



ChannelReader - konsumen



Ketika objek konsumen diminta, salah satu implementasi kelas abstrak ChannelReader <T> dikembalikan. Sekali lagi, tidak seperti Pipeline, API sederhana dan ada beberapa metode. Cukup ketahui daftar metode untuk memahami cara menggunakan ini dalam praktek.



Metode:



  1. Penyelesaian Tugas properti get-only Virtual {get; }

    Objek bertipe Tugas yang selesai saat saluran ditutup;
  2. Property get-only virtual int Count {get; }

    Di sini harus dicatat bahwa jumlah objek saat ini yang tersedia untuk dibaca dikembalikan;
  3. Bangku properti hanya dapatkan virtual CanCount {get; }

    Menunjukkan apakah properti Count tersedia;
  4. bool TryRead(out T item)

    . bool, , . out ( null, );
  5. ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)

    ValueTask true, , . ValueTask false, ( );
  6. ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)

    . , . .



    , TryRead WaitToReadAsync. ( cancelation tokens), — TryRead. , while(true) WaitToReadAsync. true, , TryRead. TryRead , , . — , WaitToReadAsync, , , .

    , , - .




ChannelWriter - pabrikan



Semuanya mirip dengan konsumen, jadi mari kita lihat metode segera:



  1. Metode virtual bool TryComplete (Exception? Error = null)

    Mencoba untuk menandai saluran sebagai selesai, mis. menunjukkan bahwa tidak ada lagi data yang akan ditulis untuk itu. Sebagai parameter opsional, Anda dapat melempar pengecualian yang menyebabkan penghentian saluran. Mengembalikan nilai true jika berhasil diselesaikan, jika tidak palsu (jika saluran telah selesai atau tidak mendukung penghentian);
  2. Metode abstrak bool TryWrite (item T) Berusaha

    untuk menulis nilai ke saluran. Mengembalikan nilai true jika berhasil dan false jika tidak.
  3. Metode abstrak ValueTask <bool> WaitToWriteAsync (PembatalanToken cancellationToken = default)

    Mengembalikan ValueTask dengan nilai sebenarnya yang akan selesai ketika ada ruang untuk menulis di saluran. Nilai akan salah jika penulisan ke saluran tidak lagi diizinkan;
  4. Metode Virtual ValueTask WriteAsync (item T, PembatalanToken cancellationToken = default)

    Menulis secara sinkron ke saluran. Misalnya, jika saluran penuh, operasi akan benar-benar tidak sinkron dan akan selesai hanya setelah membebaskan ruang untuk catatan ini;
  5. Metode batal Selesai (Pengecualian? Kesalahan = null)

    Hanya mencoba menandai saluran sebagai selesai menggunakan TryComplete, dan jika terjadi kegagalan melempar pengecualian.


Contoh kecil di atas (untuk dengan mudah memulai eksperimen Anda sendiri):



Channel<int> unboundedChannel = Channel.CreateUnbounded<int>();

//      ,        
ChannelWriter<int> writer = unboundedChannel;
ChannelReader<int> reader = unboundedChannel;

//     
int objectToWriteInChannel = 555;
await writer.WriteAsync(objectToWriteInChannel);
//  ,     ,   ,  
writer.Complete();

//         
int valueFromChannel = await reader.ReadAsync();


Sekarang mari kita beralih ke bagian yang paling menarik.



Sinkronisasi tanpa alokasi



Dalam proses menulis dan mempelajari kode, saya menyadari bahwa hampir tidak ada yang menarik dalam implementasi semua operasi ini. Secara umum, Anda bisa menggambarkannya dengan cara ini - menghindari kunci yang tidak perlu menggunakan koleksi kompetitif dan berlimpahnya penggunaan ValueTask, yang merupakan struktur yang menghemat memori. Namun, saya segera mengingatkan Anda bahwa tidak perlu dengan cepat menelusuri semua file di PC Anda dan mengganti semua Tugas dengan ValueTask. Masuk akal hanya dalam kasus-kasus di mana operasi dalam banyak kasus berakhir secara sinkron. Bagaimanapun, seperti yang kita ingat, dengan asinkron, sangat mungkin untuk mengubah utas, yang berarti bahwa tumpukan tidak lagi sama dengan sebelumnya. Bagaimanapun, seorang profesional kinerja sejati tahu - jangan mengoptimalkan sebelum masalah muncul.



Satu hal yang baik adalah bahwa saya tidak akan mendaftarkan diri sebagai seorang profesional, dan oleh karena itu saatnya untuk mencari tahu apa rahasia penulisan kode asinkron tanpa alokasi memori, yang sekilas terdengar terlalu bagus untuk kebenaran. Tetapi itu terjadi.



IValueTaskSource Interface



Mari kita mulai perjalanan kita dari awal - struktur ValueTask , yang ditambahkan dalam .net core 2.0 dan diubah pada 2.1. Di dalam struktur ini, ada bidang objek _obj rumit. Mudah ditebak, berdasarkan nama yang dapat dijelaskan sendiri, bahwa satu dari 3 hal dapat disembunyikan di bidang ini - null, Tugas / Tugas <T> atau IValueTaskSource. Sebenarnya, ini berasal dari cara Anda membuat ValueTask.



Seperti yang dipastikan pabrikan, struktur ini hanya boleh digunakan secara jelas - dengan kata kunci yang menunggu. Artinya, Anda tidak harus menerapkan menunggu berkali-kali pada ValueTask yang sama, gunakan kombinator, tambahkan beberapa kelanjutan, dll. Selain itu, Anda tidak boleh mendapatkan hasilnya dari ValueTask lebih dari sekali. Dan ini disebabkan oleh fakta bahwa kami mencoba untuk memahami - penggunaan kembali semua hal ini tanpa mengalokasikan memori.



Saya telah menyebutkan antarmuka IValueTaskSource . Dialah yang membantu menghemat memori. Ini dilakukan dengan menggunakan kembali IValueTaskSource itu sendiri beberapa kali untuk banyak tugas. Tetapi justru karena penggunaan kembali ini tidak ada cara untuk mencoba-coba dengan ValueTask.



Jadi IValueTaskSource. Antarmuka ini memiliki 3 metode, dengan mengimplementasikan yang Anda akan berhasil menghemat memori dan waktu pada alokasi byte yang berharga.



  1. GetResult - Disebut sekali, ketika di mesin negara, dibentuk pada saat runtime untuk metode asinkron, hasilnya diperlukan. ValueTask memiliki metode GetResult, yang memanggil metode antarmuka dengan nama yang sama, yang, seperti yang kita ingat, dapat disimpan di bidang _obj.
  2. GetStatus - Dipanggil oleh mesin negara untuk menentukan status operasi. Juga melalui ValueTask.
  3. OnCompleted - Sekali lagi, dipanggil oleh mesin negara untuk menambahkan kelanjutan ke tugas yang luar biasa pada saat itu.


Tetapi meskipun antarmuka yang sederhana, implementasi akan memerlukan beberapa keterampilan. Dan di sini Anda dapat mengingat tentang apa yang kami mulai dengan - Saluran . Implementasi ini menggunakan kelas AsyncOperationyang merupakan implementasi dari IValueTaskSource. Kelas ini tersembunyi di belakang pengubah akses internal. Tapi ini tidak mengganggu pemahaman mekanisme dasar. Pertanyaannya adalah, mengapa tidak memberikan implementasi IValueTaskSource kepada massa? Alasan pertama (demi kesenangan) adalah ketika palu ada di tangan, paku ada di mana-mana, ketika implementasi IValueTaskSource ada di tangan, ada pekerjaan yang buta huruf dengan memori di mana-mana. Alasan kedua (lebih masuk akal) adalah bahwa meskipun antarmuka sederhana dan universal, implementasi nyata optimal ketika menggunakan nuansa aplikasi tertentu. Dan mungkin karena alasan inilah Anda dapat menemukan implementasi di berbagai bagian .net yang hebat dan kuat, seperti AsyncOperation di bawah penutup saluran, AsyncIOOperation di dalam soket API baru, dan sebagainya.

Namun, dalam keadilan, masih ada satu implementasi yang umum -ManualResetValueTaskSourceCore . Tapi ini sudah terlalu jauh dari topik artikel.



Bandingkan Pertukaran



Cukup metode populer dari kelas populer yang menghindari overhead primitif sinkronisasi klasik. Saya pikir sebagian besar sudah terbiasa dengan itu, tetapi masih layak dijelaskan dalam 3 kata, karena konstruksi ini cukup sering digunakan dalam AsyncOperation.

Dalam literatur arus utama, fungsi ini disebut membandingkan dan menukar (CAS). Di .net tersedia di kelas Interlocked .



Tanda tangannya adalah sebagai berikut:



public static T CompareExchange<T>(ref T location1, T value, T comparand) where T : class;


Ada juga kelebihan dengan objek int, panjang, mengambang, ganda, IntPtr.



Metode itu sendiri adalah atomik, yaitu dijalankan tanpa gangguan. Membandingkan 2 nilai dan, jika sama, memberikan nilai baru ke variabel. Mereka memecahkan masalah ketika Anda perlu memeriksa nilai suatu variabel dan mengubah variabel tergantung padanya.



Katakanlah Anda ingin menambahkan variabel jika nilainya kurang dari 10.



Lalu ada 2 utas.



Streaming 1 Streaming 2
Memeriksa nilai variabel untuk suatu kondisi (yaitu, jika kurang dari 10), yang berfungsi -
Antara memeriksa dan mengubah nilai Menetapkan variabel nilai yang tidak memenuhi suatu kondisi (misalnya, 15)
Mengubah nilainya, meskipun seharusnya tidak, karena kondisinya tidak lagi terpenuhi -




Saat menggunakan metode ini, Anda dapat mengubah nilai yang Anda inginkan, atau tidak berubah, saat menerima nilai variabel yang sebenarnya.



location1 adalah variabel yang nilainya ingin kita ubah. Ini dibandingkan dengan pembanding, dalam hal kesetaraan, nilai ditulis di lokasi1. Jika operasi berhasil, metode ini akan mengembalikan nilai masa lalu dari variabel location1. Jika tidak, nilai sebenarnya dari location1 akan dikembalikan.

Lebih dalam lagi, ada instruksi bahasa assembly, cmpxchg, yang melakukan ini. Dialah yang digunakan di bawah tenda.



Menyelam tumpukan



Mempertimbangkan semua kode ini, saya menemukan beberapa referensi untuk "Stack Dive". Ini adalah hal yang sangat keren dan menarik, yang sebenarnya sangat tidak diinginkan. Intinya adalah bahwa dengan eksekusi berkelanjutan secara sinkron kita dapat kehabisan sumber daya tumpukan.



Katakanlah kita memiliki 10.000 tugas, dengan penuh gaya



//code1
await ...
//code2


Misalkan tugas pertama menyelesaikan eksekusi dan dengan demikian membebaskan kelanjutan dari yang kedua, yang kami segera mulai jalankan secara serempak di utas ini, yaitu, mengambil sepotong tumpukan dengan bingkai kelanjutan ini. Pada gilirannya, kelanjutan ini akan membuka blokir kelanjutan dari tugas ketiga, yang juga akan segera kami laksanakan. Dll Jika tidak ada lagi menunggu dalam kelanjutan atau sesuatu yang entah bagaimana mengguyur tumpukan, maka kita hanya akan menghabiskan ruang tumpukan sepenuhnya. Apa yang dapat menyebabkan StackOverflow dan aplikasi macet. Dalam ulasan kode saya, saya akan menyebutkan bagaimana AsyncOperation berjuang melawan ini.



AsyncOperation sebagai implementasi IValueTaskSource



Kode sumber .



Di dalam AsyncOperation, ada bidang _continuation dari tipe Action <object>. Lapangan digunakan untuk, percaya atau tidak, kelanjutan. Tetapi, seperti yang sering terjadi dalam kode yang terlalu modern, bidang memiliki tanggung jawab tambahan (seperti pengumpul sampah dan bit terakhir dalam referensi tabel metode). Field _continuation dari seri yang sama. Ada 2 nilai khusus yang dapat disimpan di bidang ini, selain kelanjutan itu sendiri dan nol. s_availableSentinel dan s_completedSentinel . Bidang-bidang ini menunjukkan bahwa operasi masing-masing tersedia dan selesai. Ini tersedia hanya untuk digunakan kembali untuk operasi yang sepenuhnya tidak sinkron.



Juga AsyncOperation mengimplementasikan IThreadPoolWorkItemdengan metode tunggal - void Execute () => SetCompletionAndInvokeContinuation (). Metode SetCompletionAndInvokeContinuation melakukan kelanjutan. Dan metode ini disebut baik secara langsung dalam kode AsyncOperation, atau melalui Execute yang disebutkan. Setelah semua, tipe yang mengimplementasikan IThreadPoolWorkItem dapat dilemparkan ke dalam kumpulan thread entah bagaimana seperti ThreadPool ini. Tidak amanQueueUserWorkItem (ini, preferLocal: false).



Metode Execute akan dieksekusi oleh kumpulan thread.



Eksekusi kelanjutan itu sendiri cukup sepele.



Kelanjutan disalin ke variabel lokal dan s_completedSentinel ditulis di tempatnya- objek boneka buatan (atau penjaga, saya tidak tahu bagaimana berbicara kepada saya dalam pidato kami), yang menunjukkan bahwa tugas sudah selesai. Dan kemudian salinan lokal dari kelanjutan nyata dijalankan. Dengan ExecutionContext, tindakan ini diposting ke konteks. Tidak ada rahasia di sini. Kode ini dapat dipanggil langsung oleh kelas - hanya dengan memanggil metode yang merangkum tindakan ini, atau melalui antarmuka IThreadPoolWorkItem di kumpulan thread. Sekarang Anda dapat menebak bagaimana fungsi dengan eksekusi lanjutan bekerja secara sinkron.



Metode pertama dari antarmuka IValueTaskSource adalah GetResult ( github ).



Sederhana saja, dia:



  1. _currentId.

    _currentId — , . . ;
  2. _continuation - s_availableSentinel. , , AsyncOperation . , (pooled = true);
  3. _result.

    _result TrySetResult .


Metode TrySetResult ( github ).



Metode ini sepele. - ia menyimpan parameter yang diterima dalam _result dan penyelesaian sinyal, yaitu, ia memanggil metode SignalCompleteion , yang cukup menarik.



Metode SignalCompletion ( github ).



Metode ini menggunakan semua yang kita bicarakan di awal.



Pada awalnya, jika _continuation == null, kita menulis boneka s_completedSentinel.



Selanjutnya, metode ini dapat dibagi menjadi 4 blok. Saya harus mengatakan segera untuk kesederhanaan memahami skema, blok ke-4 hanyalah eksekusi sinkron dari kelanjutan. Artinya, eksekusi sepele dari kelanjutan melalui metode, seperti yang saya jelaskan dalam paragraf tentang IThreadPoolWorkItem.



  1. _schedulingContext == null, .. ( if).

    _runContinuationsAsynchronously == true, , — ( if).

    IThreadPoolWorkItem . AsyncOperation . .

    , if ( , ), , 2 3 , — .. 4 ;
  2. _schedulingContext is SynchronizationContext, ( if).

    _runContinuationsAsynchronously = true. . , , . , . 2 , :

    sc.Post(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this);
    


    . , , ( , ), 4 — ;
  3. , 2 . .

    , _schedulingContext TaskScheduler, . , 2, .. _runContinuationsAsynchronously = true TaskScheduler . , Task.Factory.StartNew . .
  4. — . , .


Metode kedua dari antarmuka IValueTaskSource adalah GetStatus ( github )

Sama seperti donat Petersburg.



Jika _continuation! = _CompletedSentinel, lalu kembalikan ValueTaskSourceStatus.Pending

Jika kesalahan == null, maka kembalikan ValueTaskSourceStatus. Berhasil

Jika _error.SourceException adalah OperationCanceledException, lalu kembalikan ValueTaskSourceStatus.Canceled

Nah, karena sudah banyak nilai yang muncul di sini, kembalilah sebagai nilai , lalu kembalilah keaslian, lalu kembalikan Nilai kiriman dari



ketiga , kembalilah keaslian, kembalilah keaslian , lalu kembalikan nilai ke kala, kembalikan nilai kiriman ke sini, kembalilah ke kiriman, lalu kembalikan. , tetapi metode yang paling kompleks dari antarmuka IValueTaskSource adalah OnCompleted ( github )



Metode ini menambahkan kelanjutan yang dieksekusi setelah selesai.



Menangkap Konteks Eksekusi dan Konteks Sinkronisasi sesuai kebutuhan.



Selanjutnya, Interlocked.CompareExchange , dijelaskan di atas, digunakan untuk menyimpan kelanjutan di lapangan, membandingkannya dengan nol. Saya mengingatkan Anda bahwa CompareExchange mengembalikan nilai variabel saat ini.



Jika simpan kelanjutan telah berlalu, maka nilai yang ada di variabel sebelum pembaruan dikembalikan, yaitu, nol. Ini berarti bahwa operasi tidak selesai pada saat merekam kelanjutan. Dan orang yang menyelesaikannya sendiri akan mengerti segalanya (seperti yang kita lihat di atas). Dan tidak masuk akal bagi kita untuk melakukan tindakan tambahan apa pun. Dan ini melengkapi pekerjaan metode.



Jika menyimpan nilai tidak berhasil, artinya, sesuatu selain null dikembalikan dari CompareExchange. Dalam hal ini, seseorang berhasil memasukkan nilai lebih cepat dari kami. Artinya, salah satu dari 2 situasi terjadi - baik tugas diselesaikan lebih cepat daripada yang kita dapatkan di sini, atau ada upaya untuk menulis lebih dari 1 kelanjutan, yang tidak dapat dilakukan.



Jadi, kami memeriksa nilai yang dikembalikan, apakah itu sama dengan s_completedSentinel - itu akan ditulis jika sudah selesai.



  • Jika ini bukan s_completedSentinel , maka kami tidak digunakan sesuai rencana - mereka mencoba menambahkan lebih dari satu kelanjutan. Yaitu, yang sudah ditulis, dan yang kita tulis. Dan ini adalah situasi yang luar biasa;
  • s_completedSentinel, , , . , _runContinuationsAsynchronously = false.

    , , OnCompleted, awaiter'. . , AsyncOperation — System.Threading.Channels. , . , . , , ( ) . , awaiter' , , . awaiter'.

    Untuk menghindari situasi ini, terlepas dari segalanya, perlu untuk memulai melanjutkan secara tidak sinkron. Ini dijalankan sesuai dengan skema yang sama dengan 3 blok pertama dalam metode SignalCompleteion - cukup dalam kumpulan, dalam konteks, atau melalui pabrik dan penjadwal


Dan berikut adalah contoh kelanjutan sinkron:



class Program
    {
        static async Task Main(string[] args)
        {
            Channel<int> unboundedChannel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
            {
                AllowSynchronousContinuations = true
            });

            ChannelWriter<int> writer = unboundedChannel;
            ChannelReader<int> reader = unboundedChannel;

            Console.WriteLine($"Main, before await. Thread id: {Thread.CurrentThread.ManagedThreadId}");

            var writerTask = Task.Run(async () =>
            {
                Thread.Sleep(500);
                int objectToWriteInChannel = 555;
                Console.WriteLine($"Created thread for writing with delay, before await write. Thread id: {Thread.CurrentThread.ManagedThreadId}");
                await writer.WriteAsync(objectToWriteInChannel);
                Console.WriteLine($"Created thread for writing with delay, after await write. Thread id: {Thread.CurrentThread.ManagedThreadId}");
            });

            //Blocked here because there are no items in channel
            int valueFromChannel = await reader.ReadAsync();
            Console.WriteLine($"Main, after await (will be processed by created thread for writing). Thread id: {Thread.CurrentThread.ManagedThreadId}");

            await writerTask;

            Console.Read();
        }
    }


Output:



Utama, sebelum menunggu. Id

utas : 1 Utas yang dibuat untuk menulis dengan penundaan, sebelum menunggu menulis. Id

utas : 4 Utama, setelah menunggu (akan diproses oleh utas yang dibuat untuk ditulis). Id

utas : 4 Utas yang dibuat untuk penulisan dengan penundaan, setelah menunggu penulisan. Id utas: 4



All Articles