-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix DefaultExecutor having a separate thread and being used for cleanup work and running Dispatchers.Unconfined #4277
base: develop
Are you sure you want to change the base?
Conversation
17af696
to
660ee49
Compare
389e9eb
to
448ecaa
Compare
a1ef099
to
4cb7adc
Compare
val delayTimeMillis = delayNanosToMillis(delayedTask.nanoTime - now) | ||
DefaultExecutor.invokeOnTimeout(delayTimeMillis, delayedTask, EmptyCoroutineContext) | ||
startThreadOrObtainSleepingThread()?.let { | ||
it.executeAfter(0L, {}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did the comment get lost? send an empty task to unpark the waiting event loop
this one
* | ||
* `close` needs to run somewhere, but it can't run on the closed dispatcher. | ||
* | ||
* On the JVM and Native, we reschedule to the thread pool backing `Dispatchers.IO`, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Reading as it goes] From the contract standpoint: here we change the behaviour -- previously, all such cleanups have been executed synchronously (because DefaultExecutor
is single-threaded); with this change, it's no longer the case.
Not sure if it will actually affect anyone, rather pointing it out
* If we were to create a separate view of [Dispatchers.IO] for cleanup, this number would be exceeded, which | ||
* is a regression. | ||
*/ | ||
Dispatchers.IO.dispatch(Dispatchers.IO, task) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the writeup!
unTrackTask(this) /** see the comment in [startThreadOrObtainSleepingThread] */ | ||
try { | ||
while (true) { | ||
Thread.interrupted() // just reset interruption flag |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please account #4399 when rebasing on develop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you elaborate? What's the relation between that PR and this piece of code?
override fun startThreadOrObtainSleepingThread(): Thread? { | ||
// Check if the thread is already running | ||
_thread.value?.let { return it } | ||
/* Now we know that at the moment of this call the thread was not initially running. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please use a consistent style of comments here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The rules I use for writing comments are roughly this:
- If I have to use code references, I use
/** */
. - If not, then I use
//
if it's a short phrase that fits on one line of code. - For multiline comments or comments that are at risk of becoming multiline,
/* */
.
I could just always use /** */
everywhere, but why not use the other forms if the language provides them?
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) { | ||
trackTask() // this is needed for virtual time support | ||
fun dispatch( | ||
block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false, track: Boolean = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false, track: Boolean = true | |
block: Runnable, taskContext: TaskContext = NonBlockingContext, | |
fair: Boolean = false, track: Boolean = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But why? It's 117 characters, so it fits the 120-character limit.
val task = createTask(block, taskContext) | ||
if (track) trackTask(task) // this is needed for virtual time support |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An idea: you can simplify the code here and avoid extra checks with the following trick:
- Untrackable tasks (which are extremely rare) are wrapped into
NonTrackableRunnable : Runnable
- The only time tracker is the test one -- it can always check if
obj
is an instance of non-trackable runnable
/** A view separate from [Dispatchers.IO]. | ||
* [Int.MAX_VALUE] instead of `1` to avoid needlessly using the [LimitedDispatcher] machinery. */ | ||
private val ioView by lazy { Dispatchers.IO.limitedParallelism(Int.MAX_VALUE) } | ||
// Without `lazy`, there is a cycle between `ioView` and `DefaultDelay` initialization, leading to a segfault. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably should be above the declaration line.
Could you please also drop a clarification here?
I.e.:
there is a cycle between
ioViewand
DefaultDelay initialization
->
there is a cycle between
ioViewand
DefaultDelay initialization (limitedParallelism -> LimitedDispatcher -> DefaultDelay dependency in constructor -> this file -> ioView)
Before this change, DefaultExecutor was occasionally used for executing the work of dispatchers that no longer function. This is no longer the case: instead, Dispatchers.IO is used for that on our multithreaded targets.
Tests segfault on my machine with this stacktrace: 0 kfun:kotlinx.coroutines.CoroutineDispatcher#limitedParallelism(kotlin.Int;kotlin.String?){}kotlinx.coroutines.CoroutineDispatcher-trampoline () at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt:176 1 0x00000000003bed59 in kfun:kotlinx.coroutines.CoroutineDispatcher#limitedParallelism$default(kotlin.Int;kotlin.String?;kotlin.Int){}kotlinx.coroutines.CoroutineDispatcher (_this=0x0, parallelism=2147483647, name=0x0, $mask0=2) at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt:176 2 0x00000000004bfa97 in kfun:kotlinx.coroutines.$init_global#internal.18 () at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/native/src/DefaultDelay.kt:101 3 0x0000000000cac194 in CallInitGlobalPossiblyLock () 4 0x00000000004bfb60 in kfun:kotlinx.coroutines#<get-DefaultDelay>(){}kotlinx.coroutines.Delay () at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/native/src/DefaultDelay.kt:1 5 0x0000000000496840 in kfun:kotlinx.coroutines.internal.LimitedDispatcher#<init>(kotlinx.coroutines.CoroutineDispatcher;kotlin.Int;kotlin.String?){} ($this=0x7ffff64a0668, dispatcher=0x7ffff64a0620, parallelism=64, name=0x0) at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt:26 6 0x00000000003bec1d in kfun:kotlinx.coroutines.CoroutineDispatcher#limitedParallelism(kotlin.Int;kotlin.String?){}kotlinx.coroutines.CoroutineDispatcher (_this=0x7ffff64a0620, parallelism=64, name=0x0) at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt:178 7 0x00000000004c53ad in kfun:kotlinx.coroutines.MultiWorkerDispatcher.limitedParallelism#internal (_this=0x7ffff64a0620, parallelism=64, name=0x0) at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt:151 8 0x0000000000b3de7a in kfun:kotlinx.coroutines.CoroutineDispatcher#limitedParallelism(kotlin.Int;kotlin.String?){}kotlinx.coroutines.CoroutineDispatcher-trampoline () at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt:176 9 0x00000000003bed59 in kfun:kotlinx.coroutines.CoroutineDispatcher#limitedParallelism$default(kotlin.Int;kotlin.String?;kotlin.Int){}kotlinx.coroutines.CoroutineDispatcher (_this=0x7ffff64a0620, parallelism=64, name=0x0, $mask0=2) at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt:176 10 0x00000000004c13c0 in kfun:kotlinx.coroutines.DefaultIoScheduler.<init>#internal ($this=0x7ffff6630700) at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/native/src/Dispatchers.kt:27 11 0x00000000004c127f in kfun:kotlinx.coroutines.DefaultIoScheduler.$init_global#internal () at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/native/src/Dispatchers.kt:1
8ed3f71
to
0857fee
Compare
Fixes #4262