Giới thiệu Mutex trong Kotlin Coroutines

Trong quá trình làm việc với Kotlin khi gặp phải những vấn đề mà yêu cầu cần xử lý các công việc đồng thời với nhau thì Coroutines hẳn không còn xa lạ với mỗi chúng ta. Tuy nhiên khi có nhiều tiến trình công việc mà cùng tương tác với một nguồn tài nguyên (shared resources) thì vấn đề về xung đột tài nguyên có thể xảy ra. Giống như việc 2 người thợ đốn củi dùng chung 1 chiếc rừu nếu trong lúc làm việc (cùng 1 thời điểm) mà 2 người cùng cần dùng đến cây rừu để thực hiện công việc của mình thì rất có thể sẽ xảy ra tranh chấp. Việc chúng ta cần lúc này là phải đảm bảo được việc phân bổ tài nguyên dùng chung ấy sao cho hợp lý cụ thể ở đây là ai (hay công việc nào) đã dùng tài nguyên trước thì người đến sau (hay công việc phía sau) sẽ phải đợi để có thể sử dụng tài nguyên chung đó. Trong Kotlin Coroutines để giải quyết được vấn đề này thì Mutex chính là thứ chúng ta cần.

Cũng giống với synchronized của Java, Mutex sẽ đứng ra đảm bảo sẽ chỉ có 1 tiến trình hay một luồng công việc được phép truy cập vào 1 nguồn tài nguyên chung tại bất kỳ thời điểm nào. Trong Kotlin với nhiều Coroutines cùng truy cập vào 1 tài nguyên, Coroutine nào đang sử dụng tài nguyên đó trước thì các Coroutines khác sẽ tạm ngừng (suspend) bởi Mutex, và chờ cho đến đến khi tài nguyên dùng chung đó không còn được sử dụng (bởi Coroutine trước đó) nữa thì chúng mới có thể tiếp tục thực hiện công việc của mình với dữ liệu dùng chung đó.

Mutex hoạt động thế nào?

  1. Khi một coroutine yêu cầu truy cập vào 1 tài nguyên dùng chung hàm lock() của Mutex sẽ được gọi để bắt đầu việc kiểm soát tài nguyên chung đó.
  2. Mutex sẽ kiểm tra xem đang có coroutine nào sử dụng tài nguyên chung không.
  3. Nếu không có coroutine nào đang dùng tài nguyên chung đó, coroutine đó sẽ được trao quyền sử dụng tài nguyên chung đó để thực hiện công việc của mình.
  4. Nếu tài nguyên chung đang được sử dụng thì Mutex sẽ tạm ngừng các coroutines mà đang muốn truy cập vào tài nguyên chung đó và sẽ cho các coroutines được tiếp tục chạy khi mà tài nguyên chung được coroutine trước đó sử dụng xong.
  5. Khi tài nguyên chung được sử dụng xong bởi coroutine đang chiếm dữ, hàm unlock() của Mutex sẽ được gọi để cho phép các coroutines đang chờ đến lượt được quyền sử dụng dùng tài nguyên đó.

Mutex quản lý các coroutines chờ đến lượt sử dụng tài nguyên chung thông qua một hàng đợi, coroutine nào đến trước thì sẽ được sử dụng tài nguyên trước hay FIFO (first-in-first-out).

Ví dụ sử dụng Mutex

Giả sử ta có 2 tiến trình chạy đồng thời cùng sử dụng chung 1 biến đếm (counter), mỗi tiến trình sẽ thực hiện việc tăng giá trị của biến đếm đó lên 5000 và sau khi thực hiện xong cả 2 tiến trình biến đếm (counter) của chúng ta sẽ thu được giá trị là 10000.

Trường hợp không sử dụng Mutex.

import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.Default

private var counter = 0

suspend fun main( ) {

    val job1 = CoroutineScope(Default).launch {
        for (i in 1..500) {
            incrementCounter()
        }
    }

    val job2 = CoroutineScope(Default).launch {
        for (i in 1..500) {
            incrementCounter()
        }
    }

    joinAll(job1, job2)

    println("Counter: $counter")
}

private fun incrementCounter() {
    for (i in 0 until 10) {
        counter++
    }
}

Sau 5 lần chạy kết quả mà ta thu được lần lượt sẽ là:

Với kết quả các lần chạy ở trên ta có thể dễ dàng nhận ra một điều là các job có sự xung đột về tài nguyên chung ở đây là biến counter dẫn đến việc sẽ có nhiều trường hợp kết quả mà ta thu được không đúng với kỳ vọng. Giờ ta hãy sử dụng Mutex để đảm bảo lại tính đúng đắn của kết quả.

Trường hợp sử dụng Mutex

import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.Default
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

private val mutex = Mutex()
private var counter = 0

suspend fun main( ) {
    val job1 = CoroutineScope(Default).launch {
        for (i in 1..500) {
            incrementCounter()
        }
    }

    val job2 = CoroutineScope(Default).launch {
        for (i in 1..500) {
            incrementCounter()
        }
    }

    joinAll(job1, job2)

    println("Counter: $counter")
}

private suspend fun incrementCounter() {
    mutex.withLock {
        for (i in 0 until 10) {
            counter++
        }
    }
}

Kết quả sau 5 lần chạy thu được:

Mutex đã điều phối việc sử dụng biến counter để đảm bảo kết quả thu được đúng như kỳ vọng của chúng ta.

Leave a Reply

Your email address will not be published. Required fields are marked *