Skip to content

Commit 17af696

Browse files
committed
Fix VirtualTimeSource not being able to skip delays
1 parent 18c9a0c commit 17af696

File tree

8 files changed

+40
-28
lines changed

8 files changed

+40
-28
lines changed

Diff for: kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt

+10-7
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ internal abstract class AbstractTimeSource {
1010
abstract fun currentTimeMillis(): Long
1111
abstract fun nanoTime(): Long
1212
abstract fun wrapTask(block: Runnable): Runnable
13-
abstract fun trackTask()
14-
abstract fun unTrackTask()
13+
abstract fun trackTask(obj: Any)
14+
abstract fun unTrackTask(obj: Any)
1515
abstract fun registerTimeLoopThread()
1616
abstract fun unregisterTimeLoopThread()
1717
abstract fun parkNanos(blocker: Any, nanos: Long) // should return immediately when nanos <= 0
@@ -80,18 +80,21 @@ internal inline fun wrapTask(block: Runnable): Runnable =
8080
*
8181
* If the second thread is not tracked, the first thread effectively enters a spin loop until the second thread
8282
* physically changes the shared state.
83+
*
84+
* Every call to [trackTask] must be accompanied by a call to [unTrackTask] with the same [obj],
85+
* but [unTrackTask] can be called even if the corresponding [trackTask] wasn't called.
8386
*/
8487
@InlineOnly
85-
internal inline fun trackTask() {
86-
timeSource?.trackTask()
88+
internal inline fun trackTask(obj: Any) {
89+
timeSource?.trackTask(obj)
8790
}
8891

8992
/**
90-
* Decrements the number of tasks not under our control. See [trackTask] for more details.
93+
* Marks the task [obj] as complete. If [obj] wasn't tracked, does nothing. See [trackTask] for more details.
9194
*/
9295
@InlineOnly
93-
internal inline fun unTrackTask() {
94-
timeSource?.unTrackTask()
96+
internal inline fun unTrackTask(obj: Any) {
97+
timeSource?.unTrackTask(obj)
9598
}
9699

97100
/**

Diff for: kotlinx-coroutines-core/jvm/src/DefaultDelay.kt

+4-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package kotlinx.coroutines
33

44
import kotlinx.atomicfu.*
55
import kotlinx.coroutines.internal.*
6+
import kotlinx.coroutines.scheduling.*
7+
import kotlinx.coroutines.scheduling.scheduleBackgroundIoTask
68
import kotlin.coroutines.*
79
import kotlin.time.Duration
810

@@ -105,7 +107,7 @@ private object DefaultDelayImpl : EventLoopImplBase(), Runnable {
105107
This means that whatever thread is going to be running by the end of this function,
106108
it's going to notice the tasks it's supposed to run.
107109
We can return `null` unconditionally. */
108-
ioView.dispatch(ioView, this)
110+
scheduleBackgroundIoTask(this)
109111
return null
110112
}
111113

@@ -156,6 +158,7 @@ private fun defaultDelayRunningUnconfinedLoop(): Nothing {
156158
)
157159
}
158160

161+
159162
/** A view separate from [Dispatchers.IO].
160163
* [Int.MAX_VALUE] instead of `1` to avoid needlessly using the [LimitedDispatcher] machinery. */
161164
private val ioView = Dispatchers.IO.limitedParallelism(Int.MAX_VALUE)

Diff for: kotlinx-coroutines-core/jvm/src/Executors.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor)
129129
try {
130130
executor.execute(wrapTask(block))
131131
} catch (e: RejectedExecutionException) {
132-
unTrackTask()
132+
unTrackTask(block)
133133
cancelJobOnRejection(context, e)
134134
rescheduleTaskFromClosedDispatcher(block)
135135
}

Diff for: kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

+5-3
Original file line numberDiff line numberDiff line change
@@ -391,9 +391,11 @@ internal class CoroutineScheduler(
391391
* - Concurrent [close] that effectively shutdowns the worker thread.
392392
* Used for [yield].
393393
*/
394-
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
395-
trackTask() // this is needed for virtual time support
394+
fun dispatch(
395+
block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false, track: Boolean = true
396+
) {
396397
val task = createTask(block, taskContext)
398+
if (track) trackTask(task) // this is needed for virtual time support
397399
val isBlockingTask = task.isBlocking
398400
// Invariant: we increment counter **before** publishing the task
399401
// so executing thread can safely decrement the number of blocking tasks
@@ -588,7 +590,7 @@ internal class CoroutineScheduler(
588590
val thread = Thread.currentThread()
589591
thread.uncaughtExceptionHandler.uncaughtException(thread, e)
590592
} finally {
591-
unTrackTask()
593+
unTrackTask(task)
592594
}
593595
}
594596

Diff for: kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt

+8-4
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,11 @@ private object UnlimitedIoScheduler : CoroutineDispatcher() {
3737

3838
@InternalCoroutinesApi
3939
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
40-
DefaultScheduler.dispatchWithContext(block, BlockingContext, true)
40+
DefaultScheduler.dispatchWithContext(block, BlockingContext, fair = true, track = true)
4141
}
4242

4343
override fun dispatch(context: CoroutineContext, block: Runnable) {
44-
DefaultScheduler.dispatchWithContext(block, BlockingContext, false)
44+
DefaultScheduler.dispatchWithContext(block, BlockingContext, fair = false, track = true)
4545
}
4646

4747
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
@@ -58,6 +58,10 @@ private object UnlimitedIoScheduler : CoroutineDispatcher() {
5858
}
5959
}
6060

61+
internal fun scheduleBackgroundIoTask(block: Runnable) {
62+
DefaultScheduler.dispatchWithContext(block, BlockingContext, fair = false, track = false)
63+
}
64+
6165
// Dispatchers.IO
6266
internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {
6367

@@ -126,8 +130,8 @@ internal open class SchedulerCoroutineDispatcher(
126130
coroutineScheduler.dispatch(block, fair = true)
127131
}
128132

129-
internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean) {
130-
coroutineScheduler.dispatch(block, context, fair)
133+
internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean, track: Boolean) {
134+
coroutineScheduler.dispatch(block, context, fair = fair, track = track)
131135
}
132136

133137
override fun close() {

Diff for: kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt

+9-9
Original file line numberDiff line numberDiff line change
@@ -48,30 +48,29 @@ internal class VirtualTimeSource(
4848
@Volatile
4949
private var time: Long = 0
5050

51-
private var trackedTasks = 0
51+
private val trackedTasks = HashSet<Any>()
5252

5353
private val threads = ConcurrentHashMap<Thread, ThreadStatus>()
5454

5555
override fun currentTimeMillis(): Long = TimeUnit.NANOSECONDS.toMillis(time)
5656
override fun nanoTime(): Long = time
5757

5858
override fun wrapTask(block: Runnable): Runnable {
59-
trackTask()
59+
trackTask(block)
6060
return Runnable {
6161
try { block.run() }
62-
finally { unTrackTask() }
62+
finally { unTrackTask(block) }
6363
}
6464
}
6565

6666
@Synchronized
67-
override fun trackTask() {
68-
trackedTasks++
67+
override fun trackTask(obj: Any) {
68+
trackedTasks.add(obj)
6969
}
7070

7171
@Synchronized
72-
override fun unTrackTask() {
73-
assert(trackedTasks > 0)
74-
trackedTasks--
72+
override fun unTrackTask(obj: Any) {
73+
trackedTasks.remove(obj)
7574
}
7675

7776
@Synchronized
@@ -125,7 +124,7 @@ internal class VirtualTimeSource(
125124
return
126125
}
127126
if (threads[mainThread] == null) return
128-
if (trackedTasks != 0) return
127+
if (trackedTasks.isNotEmpty()) return
129128
val minParkedTill = minParkedTill()
130129
if (minParkedTill <= time) return
131130
time = minParkedTill
@@ -145,6 +144,7 @@ internal class VirtualTimeSource(
145144
isShutdown = true
146145
wakeupAll()
147146
while (!threads.isEmpty()) (this as Object).wait()
147+
assert(trackedTasks.isEmpty()) { "There are still tracked tasks: $trackedTasks" }
148148
}
149149

150150
private fun wakeupAll() {

Diff for: kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import org.junit.runners.*
99
import kotlin.test.*
1010

1111
@RunWith(Parameterized::class)
12-
class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase(disableOutCheck = true) {
12+
class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() {
1313
companion object {
1414
@Parameterized.Parameters(name = "{0}")
1515
@JvmStatic

Diff for: kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,11 @@ internal fun SchedulerCoroutineDispatcher.blocking(parallelism: Int = 16): Corou
101101

102102
@InternalCoroutinesApi
103103
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
104-
this@blocking.dispatchWithContext(block, BlockingContext, true)
104+
this@blocking.dispatchWithContext(block, BlockingContext, fair = true, track = true)
105105
}
106106

107107
override fun dispatch(context: CoroutineContext, block: Runnable) {
108-
this@blocking.dispatchWithContext(block, BlockingContext, false)
108+
this@blocking.dispatchWithContext(block, BlockingContext, fair = false, track = true)
109109
}
110110
}.limitedParallelism(parallelism)
111111
}

0 commit comments

Comments
 (0)