-
Notifications
You must be signed in to change notification settings - Fork 557
Implement metrics for external queue #4292
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: series/3.x
Are you sure you want to change the base?
Changes from 15 commits
6da0c9c
08806ce
084adf6
af7b422
77b35ce
cdc302e
96bd55b
bb178dd
297c1cc
ee97bc3
8ab78e2
9386f2d
b7cb918
e3eb52f
d3304a4
6d548c0
2ce80fa
6f7212b
75f76ef
16f241f
02851e7
9e4156a
1f084bf
405d68b
69ea0dd
c2f62a4
8e940ba
448c999
0496df1
4e9115b
f790bc2
41932bf
a21af3e
5c0aeee
fa29e87
24a1d02
693dc11
3d9973e
60bedf1
0dd74d6
69f2d3a
e0ff288
1d03483
e2e1733
8a92974
9cd6a61
24721c7
f24fb0d
52be56c
c13a4c5
c7c6f5d
ae89bcb
062c66e
bac1d6a
0661804
268a6ea
4a6ce8e
d6baedc
11c705a
3759bff
d623124
ae146cb
33b9101
9286d86
1d4b6cd
ba72a2a
c94c294
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
package cats.effect.unsafe | ||
|
||
import java.util.concurrent.{ConcurrentLinkedQueue, ThreadLocalRandom} | ||
import java.util.concurrent.atomic.AtomicLong | ||
|
||
/** | ||
* A striped queue implementation inspired by the [[https://scal.cs.uni-salzburg.at/dq/ Scal]] | ||
|
@@ -33,6 +34,12 @@ import java.util.concurrent.{ConcurrentLinkedQueue, ThreadLocalRandom} | |
*/ | ||
private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) { | ||
|
||
// Metrics counters for tracking external queue submissions | ||
private val singletonsSubmittedCount = new AtomicLong(0) | ||
private val singletonsPresentCount = new AtomicLong(0) | ||
private val batchesSubmittedCount = new AtomicLong(0) | ||
private val batchesPresentCount = new AtomicLong(0) | ||
|
||
/** | ||
* Calculates the next power of 2 using bitwise operations. This value actually represents the | ||
* bitmask for the next power of 2 and can be used for indexing into the array of concurrent | ||
|
@@ -80,7 +87,15 @@ private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) { | |
def offer(a: A, random: ThreadLocalRandom): Unit = { | ||
val idx = random.nextInt(numQueues) | ||
queues(idx).offer(a) | ||
() | ||
|
||
// Track metrics - if it's a runnable but not an array, it's a singleton task | ||
if (!a.isInstanceOf[Array[?]]) { | ||
singletonsSubmittedCount.incrementAndGet() | ||
val _ = singletonsPresentCount.incrementAndGet() | ||
} else { | ||
batchesSubmittedCount.incrementAndGet() | ||
val _ = batchesPresentCount.incrementAndGet() | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -104,7 +119,7 @@ private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) { | |
* @param random | ||
* an uncontended source of randomness, used for randomly choosing a destination queue | ||
*/ | ||
def offerAll(as: Array[? <: A], random: ThreadLocalRandom): Unit = { | ||
def offerAll(as: Array[A], random: ThreadLocalRandom): Unit = { | ||
val nq = numQueues | ||
val len = as.length | ||
var i = 0 | ||
|
@@ -114,6 +129,10 @@ private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) { | |
queues(idx).offer(fiber) | ||
i += 1 | ||
} | ||
|
||
// Track as batch submissions | ||
batchesSubmittedCount.incrementAndGet() | ||
val _ = batchesPresentCount.incrementAndGet() | ||
} | ||
|
||
/** | ||
|
@@ -129,15 +148,23 @@ private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) { | |
val nq = numQueues | ||
val from = random.nextInt(nq) | ||
var i = 0 | ||
var a = null.asInstanceOf[A] | ||
var element = null.asInstanceOf[A] | ||
|
||
while ((a eq null) && i < nq) { | ||
while ((element eq null) && i < nq) { | ||
val idx = (from + i) & mask | ||
a = queues(idx).poll() | ||
element = queues(idx).poll() | ||
i += 1 | ||
} | ||
|
||
a | ||
if (element != null) { | ||
if (element.isInstanceOf[Array[?]]) { | ||
Atharva-Kanherkar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
val _ = batchesPresentCount.decrementAndGet() | ||
} else { | ||
val _ = singletonsPresentCount.decrementAndGet() | ||
} | ||
} | ||
|
||
element | ||
} | ||
|
||
/** | ||
|
@@ -156,7 +183,7 @@ private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) { | |
* @param a | ||
* the element to be removed | ||
*/ | ||
def remove(a: A): Unit = { | ||
def remove(a: A): Boolean = { | ||
val nq = numQueues | ||
var i = 0 | ||
var done = false | ||
|
@@ -165,6 +192,16 @@ private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) { | |
done = queues(i).remove(a) | ||
i += 1 | ||
} | ||
|
||
if (done) { | ||
if (a.isInstanceOf[Array[?]]) { | ||
Atharva-Kanherkar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
val _ = batchesPresentCount.decrementAndGet() | ||
} else { | ||
val _ = singletonsPresentCount.decrementAndGet() | ||
} | ||
} | ||
|
||
done | ||
} | ||
|
||
/** | ||
|
@@ -220,5 +257,42 @@ private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) { | |
queues(i).clear() | ||
i += 1 | ||
} | ||
|
||
// Reset present counters when clearing the queue | ||
singletonsPresentCount.set(0) | ||
batchesPresentCount.set(0) | ||
} | ||
|
||
/** | ||
* Returns the total number of singleton tasks submitted to this queue. | ||
*/ | ||
def getSingletonsSubmittedCount(): Long = singletonsSubmittedCount.get() | ||
|
||
/** | ||
* Returns the number of singleton tasks currently in this queue. | ||
*/ | ||
def getSingletonsPresentCount(): Long = singletonsPresentCount.get() | ||
|
||
/** | ||
* Returns the total number of batch tasks submitted to this queue. | ||
*/ | ||
def getBatchesSubmittedCount(): Long = batchesSubmittedCount.get() | ||
|
||
/** | ||
* Returns the number of batch tasks currently in this queue. | ||
*/ | ||
def getBatchesPresentCount(): Long = batchesPresentCount.get() | ||
} | ||
|
||
object ScalQueue { | ||
|
||
/** | ||
* Creates a new Scal queue. | ||
* | ||
* @param threadCount | ||
* the number of threads to load balance between | ||
* @return | ||
* a new Scal queue instance | ||
*/ | ||
def apply[A <: AnyRef](threadCount: Int): ScalQueue[A] = new ScalQueue(threadCount) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why did we add this method? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The apply method was added to follow the standard Scala idiom for object creation through companion objects, It's being used in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see! In Cats Effect's private internal code, we often don't follow Scala idioms and instead write code optimized for performance. In this case, we prefer using |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -121,9 +121,8 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( | |
worker.ownsPoller(poller) | ||
} else false | ||
} | ||
|
||
private[this] val externalQueue: ScalQueue[AnyRef] = | ||
new ScalQueue(threadCount << 2) | ||
ScalQueue[AnyRef](threadCount << 2) | ||
|
||
/** | ||
* Represents two unsigned 16 bit integers. The 16 most significant bits track the number of | ||
|
@@ -239,7 +238,6 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( | |
destQueue.enqueueBatch(batch, destWorker) | ||
} else if (element.isInstanceOf[Runnable]) { | ||
val fiber = element.asInstanceOf[Runnable] | ||
|
||
if (isStackTracing) { | ||
destWorker.active = fiber | ||
parkedSignals(dest).lazySet(false) | ||
|
@@ -529,6 +527,43 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( | |
externalQueue.offer(fiber, random) | ||
notifyParked(random) | ||
() | ||
|
||
Atharva-Kanherkar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
/** | ||
* Offers a batch of runnables to the external queue and updates batch metrics. | ||
* | ||
* @param batch | ||
* the batch of runnables to be offered to the external queue | ||
* @param random | ||
* a reference to an uncontended source of randomness | ||
* @return | ||
* true if the batch was successfully offered | ||
*/ | ||
private[unsafe] def offerBatchToExternalQueue( | ||
batch: Array[Runnable], | ||
random: ThreadLocalRandom): Boolean = { | ||
externalQueue.offer(batch, random) | ||
true // Assume success | ||
} | ||
|
||
/** | ||
* Offers multiple batches of runnables to the external queue and updates batch metrics. | ||
* | ||
* @param batches | ||
* the batches of runnables to be offered to the external queue | ||
* @param random | ||
* a reference to an uncontended source of randomness | ||
* @return | ||
* true if the batches were successfully offered | ||
*/ | ||
private[unsafe] def offerAllBatchesToExternalQueue( | ||
batches: Array[AnyRef], | ||
random: ThreadLocalRandom): Boolean = { | ||
for (batch <- batches) { | ||
externalQueue.offer(batch.asInstanceOf[Array[Runnable]], random) | ||
} | ||
true // Assume success | ||
} | ||
|
||
/** | ||
|
@@ -844,6 +879,37 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( | |
} | ||
sum | ||
} | ||
|
||
/** | ||
* Returns the total number of singleton tasks submitted to the external queue. | ||
*/ | ||
private[unsafe] def getSingletonsSubmittedCount(): Long = | ||
externalQueue.getSingletonsSubmittedCount() | ||
|
||
/** | ||
* Returns the total number of batch tasks submitted to the external queue. | ||
*/ | ||
private[unsafe] def getBatchesSubmittedCount(): Long = | ||
externalQueue.getBatchesSubmittedCount() | ||
|
||
/** | ||
* Returns the number of singleton tasks currently in the external queue. | ||
*/ | ||
private[unsafe] def getSingletonsPresentCount(): Long = | ||
externalQueue.getSingletonsPresentCount() | ||
|
||
/** | ||
* Returns the number of batch tasks currently in the external queue. | ||
*/ | ||
private[unsafe] def getBatchesPresentCount(): Long = externalQueue.getBatchesPresentCount() | ||
|
||
private[unsafe] def logQueueMetrics(): Unit = { | ||
println(s"[Thread Pool ${id}] Queue Metrics:") | ||
println(s" Singletons submitted: ${getSingletonsSubmittedCount()}") | ||
println(s" Singletons present: ${getSingletonsPresentCount()}") | ||
println(s" Batches submitted: ${getBatchesSubmittedCount()}") | ||
println(s" Batches present: ${getBatchesPresentCount()}") | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of exposing these on the WSTP, let's make a new |
||
} | ||
|
||
private object WorkStealingThreadPool { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -303,6 +303,13 @@ private[effect] final class WorkerThread[P <: AnyRef]( | |
private[unsafe] def ownsPoller(poller: P): Boolean = | ||
poller eq _poller | ||
|
||
/** | ||
* Returns the thread pool that owns this worker thread. | ||
* | ||
* @return | ||
* reference to the owning WorkStealingThreadPool | ||
*/ | ||
private[unsafe] def getPool(): WorkStealingThreadPool[P] = pool | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we still need this method? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another Good catch! The |
||
private[unsafe] def ownsTimers(timers: TimerHeap): Boolean = | ||
sleepers eq timers | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.