Implement metrics for external queue #4292

Open
wants to merge 67 commits into base: series/3.x
Changes from 15 commits
Commits
6da0c9c
Adding metrics
Atharva-Kanherkar Mar 5, 2025
08806ce
Added demo test and some other getter methods
Atharva-Kanherkar Mar 5, 2025
084adf6
Removed the unnecessary commit
Atharva-Kanherkar Mar 5, 2025
af7b422
removed unnecessary files
Atharva-Kanherkar Mar 5, 2025
77b35ce
Remove .scala-build/ files from repository
Atharva-Kanherkar Mar 5, 2025
cdc302e
Add metrics for external queue tracking in WorkStealingThreadPool
Atharva-Kanherkar Mar 5, 2025
96bd55b
Final commit
Atharva-Kanherkar Mar 5, 2025
bb178dd
Some changes
Atharva-Kanherkar Mar 5, 2025
297c1cc
Added the requested changes
Atharva-Kanherkar Mar 7, 2025
ee97bc3
Added the changes requested
Atharva-Kanherkar Mar 7, 2025
8ab78e2
Fixed compile errors
Atharva-Kanherkar Mar 8, 2025
9386f2d
Fixed warnings and formatting issues
Atharva-Kanherkar Mar 8, 2025
b7cb918
Fixed CI errors
Atharva-Kanherkar Mar 8, 2025
e3eb52f
Fixed CI errors
Atharva-Kanherkar Mar 8, 2025
d3304a4
fixed more CI errrors
Atharva-Kanherkar Mar 8, 2025
6d548c0
Fixed more CI errors
Atharva-Kanherkar Mar 8, 2025
2ce80fa
Move external queue metrics tracking to ScalQueue
Atharva-Kanherkar Mar 9, 2025
6f7212b
fixed ci errors
Atharva-Kanherkar Mar 9, 2025
75f76ef
Fixed different CI errors again
Atharva-Kanherkar Mar 9, 2025
16f241f
Fixed CI errors
Atharva-Kanherkar Mar 9, 2025
02851e7
fixed other ci errors
Atharva-Kanherkar Mar 9, 2025
9e4156a
Fixed CI errors
Atharva-Kanherkar Mar 9, 2025
1f084bf
Added requested changes
Atharva-Kanherkar Mar 10, 2025
405d68b
Added the requested changes, without the test
Atharva-Kanherkar Mar 11, 2025
69ea0dd
Added headers
Atharva-Kanherkar Mar 11, 2025
c2f62a4
Removed warnings
Atharva-Kanherkar Mar 11, 2025
8e940ba
Added the requested changes
Atharva-Kanherkar Mar 12, 2025
448c999
Trying to fix the CI
Atharva-Kanherkar Mar 12, 2025
0496df1
Formatted
Atharva-Kanherkar Mar 12, 2025
4e9115b
Fixed tests
Atharva-Kanherkar Mar 12, 2025
f790bc2
Update core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala
Atharva-Kanherkar Mar 12, 2025
41932bf
Update core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala
Atharva-Kanherkar Mar 12, 2025
a21af3e
Update core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala
Atharva-Kanherkar Mar 12, 2025
5c0aeee
Requested changes
Atharva-Kanherkar Mar 13, 2025
fa29e87
Update core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala
Atharva-Kanherkar Mar 14, 2025
24a1d02
Update core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala
Atharva-Kanherkar Mar 14, 2025
693dc11
Added requested changes
Atharva-Kanherkar Mar 14, 2025
3d9973e
Implement striped metrics counters in ScalQueue
Atharva-Kanherkar Mar 14, 2025
60bedf1
Fixed warnings
Atharva-Kanherkar Mar 14, 2025
0dd74d6
Update core/jvm/src/main/scala/cats/effect/unsafe/metrics/WorkStealin…
Atharva-Kanherkar Mar 18, 2025
69f2d3a
Added requested changes
Atharva-Kanherkar Mar 18, 2025
e0ff288
Fixed warnings
Atharva-Kanherkar Mar 18, 2025
1d03483
FIXED WARNINGS
Atharva-Kanherkar Mar 18, 2025
e2e1733
Removed un necessary comments
Atharva-Kanherkar Mar 19, 2025
8a92974
Adding requested changes
Atharva-Kanherkar Mar 20, 2025
9cd6a61
Added test
Atharva-Kanherkar Mar 20, 2025
24721c7
Removed not important file
Atharva-Kanherkar Mar 20, 2025
f24fb0d
Reformatting
Atharva-Kanherkar Mar 20, 2025
52be56c
Some changes
Atharva-Kanherkar Mar 20, 2025
c13a4c5
Fixed warnings
Atharva-Kanherkar Mar 20, 2025
c7c6f5d
Added tests
Atharva-Kanherkar Mar 21, 2025
ae89bcb
Fixed errors
Atharva-Kanherkar Mar 21, 2025
062c66e
ensured code quality
Atharva-Kanherkar Mar 21, 2025
bac1d6a
Update core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala
Atharva-Kanherkar Mar 26, 2025
0661804
Changes
Atharva-Kanherkar Mar 26, 2025
268a6ea
Added requested changes
Atharva-Kanherkar Mar 26, 2025
4a6ce8e
Changed names
Atharva-Kanherkar Apr 3, 2025
d6baedc
Fixed build.sbt
Atharva-Kanherkar Apr 3, 2025
11c705a
Merge remote-tracking branch 'origin/series/3.x' into external-queue-…
Atharva-Kanherkar Apr 8, 2025
3759bff
Update tests/jvm/src/test/scala/cats/effect/unsafe/ScalQueueSuite.scala
Atharva-Kanherkar Apr 12, 2025
d623124
Changes
Atharva-Kanherkar Apr 12, 2025
ae146cb
Added requested changes
Atharva-Kanherkar Apr 12, 2025
33b9101
Formatting
Atharva-Kanherkar Apr 12, 2025
9286d86
Added changes
Atharva-Kanherkar Apr 12, 2025
1d4b6cd
Added shorthands
Atharva-Kanherkar Apr 12, 2025
ba72a2a
Formatting fixes
armanbilge Apr 13, 2025
c94c294
Merge branch 'series/3.x' into external-queue-tracking
djspiewak Jul 23, 2025
8 changes: 4 additions & 4 deletions core/jvm/src/main/scala/cats/effect/unsafe/LocalQueue.scala
@@ -281,9 +281,10 @@ private final class LocalQueue extends LocalQueuePadding {

// Enqueue all of the batches of fibers on the batched queue with a bulk
// add operation.
external.offerAll(batches, random)
// Loop again for a chance to insert the original fiber to be enqueued
// on the local queue.
// (loop through each batch)
for (batch <- batches) {
external.offer(batch, random)
}
}

// None of the three final outcomes have been reached, loop again for a
@@ -702,7 +703,6 @@ private final class LocalQueue extends LocalQueuePadding {
totalSpilloverCount += SpilloverBatchSize
Tail.updater.lazySet(this, tl)
}

external.offer(batch, random)
return
}
88 changes: 81 additions & 7 deletions core/jvm/src/main/scala/cats/effect/unsafe/ScalQueue.scala
@@ -17,6 +17,7 @@
package cats.effect.unsafe

import java.util.concurrent.{ConcurrentLinkedQueue, ThreadLocalRandom}
import java.util.concurrent.atomic.AtomicLong

/**
* A striped queue implementation inspired by the [[https://scal.cs.uni-salzburg.at/dq/ Scal]]
@@ -33,6 +34,12 @@ import java.util.concurrent.{ConcurrentLinkedQueue, ThreadLocalRandom}
*/
private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) {

// Metrics counters for tracking external queue submissions
private val singletonsSubmittedCount = new AtomicLong(0)
private val singletonsPresentCount = new AtomicLong(0)
private val batchesSubmittedCount = new AtomicLong(0)
private val batchesPresentCount = new AtomicLong(0)

/**
* Calculates the next power of 2 using bitwise operations. This value actually represents the
* bitmask for the next power of 2 and can be used for indexing into the array of concurrent
@@ -80,7 +87,15 @@ private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) {
def offer(a: A, random: ThreadLocalRandom): Unit = {
val idx = random.nextInt(numQueues)
queues(idx).offer(a)
()

// Track metrics - if it's a runnable but not an array, it's a singleton task
if (!a.isInstanceOf[Array[?]]) {
singletonsSubmittedCount.incrementAndGet()
val _ = singletonsPresentCount.incrementAndGet()
} else {
batchesSubmittedCount.incrementAndGet()
val _ = batchesPresentCount.incrementAndGet()
}
}

/**
@@ -104,7 +119,7 @@ private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) {
* @param random
* an uncontended source of randomness, used for randomly choosing a destination queue
*/
def offerAll(as: Array[? <: A], random: ThreadLocalRandom): Unit = {
def offerAll(as: Array[A], random: ThreadLocalRandom): Unit = {
val nq = numQueues
val len = as.length
var i = 0
@@ -114,6 +129,10 @@ private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) {
queues(idx).offer(fiber)
i += 1
}

// Track as batch submissions
batchesSubmittedCount.incrementAndGet()
val _ = batchesPresentCount.incrementAndGet()
}

/**
@@ -129,15 +148,23 @@ private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) {
val nq = numQueues
val from = random.nextInt(nq)
var i = 0
var a = null.asInstanceOf[A]
var element = null.asInstanceOf[A]

while ((a eq null) && i < nq) {
while ((element eq null) && i < nq) {
val idx = (from + i) & mask
a = queues(idx).poll()
element = queues(idx).poll()
i += 1
}

a
if (element != null) {
if (element.isInstanceOf[Array[?]]) {
val _ = batchesPresentCount.decrementAndGet()
} else {
val _ = singletonsPresentCount.decrementAndGet()
}
}

element
}

/**
@@ -156,7 +183,7 @@ private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) {
* @param a
* the element to be removed
*/
def remove(a: A): Unit = {
def remove(a: A): Boolean = {
val nq = numQueues
var i = 0
var done = false
@@ -165,6 +192,16 @@ private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) {
done = queues(i).remove(a)
i += 1
}

if (done) {
if (a.isInstanceOf[Array[?]]) {
val _ = batchesPresentCount.decrementAndGet()
} else {
val _ = singletonsPresentCount.decrementAndGet()
}
}

done
}

/**
@@ -220,5 +257,42 @@ private[effect] final class ScalQueue[A <: AnyRef](threadCount: Int) {
queues(i).clear()
i += 1
}

// Reset present counters when clearing the queue
singletonsPresentCount.set(0)
batchesPresentCount.set(0)
}

/**
* Returns the total number of singleton tasks submitted to this queue.
*/
def getSingletonsSubmittedCount(): Long = singletonsSubmittedCount.get()

/**
* Returns the number of singleton tasks currently in this queue.
*/
def getSingletonsPresentCount(): Long = singletonsPresentCount.get()

/**
* Returns the total number of batch tasks submitted to this queue.
*/
def getBatchesSubmittedCount(): Long = batchesSubmittedCount.get()

/**
* Returns the number of batch tasks currently in this queue.
*/
def getBatchesPresentCount(): Long = batchesPresentCount.get()
}

object ScalQueue {

/**
* Creates a new Scal queue.
*
* @param threadCount
* the number of threads to load balance between
* @return
* a new Scal queue instance
*/
def apply[A <: AnyRef](threadCount: Int): ScalQueue[A] = new ScalQueue(threadCount)
Member

Why did we add this method?

Author

The apply method was added to follow the standard Scala idiom for object creation through companion objects. It's being used in WorkStealingThreadPool.scala line 241 to create the externalQueue.

Member

armanbilge Mar 10, 2025

I see! In Cats Effect's private internal code, we often don't follow Scala idioms and instead write code optimized for performance. In this case, we prefer using new instead of an apply method.
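
For reference, the two construction styles being discussed can be sketched as follows. `StripedQueue` and `ConstructionStyles` are hypothetical stand-ins, not the real `ScalQueue` or `WorkStealingThreadPool`; the sketch only shows that both call forms produce an equivalent instance and makes no claim about their relative cost.

```scala
// Hypothetical stand-in for an internal queue class (not the real ScalQueue).
final class StripedQueue[A <: AnyRef](val threadCount: Int)

object StripedQueue {
  // Companion-object factory: the idiomatic Scala style questioned in the review.
  def apply[A <: AnyRef](threadCount: Int): StripedQueue[A] =
    new StripedQueue[A](threadCount)
}

object ConstructionStyles {
  val threadCount = 4

  // Direct instantiation, the style the reviewer prefers for private internal code.
  val viaNew: StripedQueue[AnyRef] = new StripedQueue[AnyRef](threadCount << 2)

  // Construction through the companion's apply, as introduced by this PR.
  val viaApply: StripedQueue[AnyRef] = StripedQueue[AnyRef](threadCount << 2)
}
```

If the `apply` method is dropped as requested, only the `viaNew` form remains valid at the call site.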

}
@@ -121,9 +121,8 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
worker.ownsPoller(poller)
} else false
}

private[this] val externalQueue: ScalQueue[AnyRef] =
new ScalQueue(threadCount << 2)
ScalQueue[AnyRef](threadCount << 2)

/**
* Represents two unsigned 16 bit integers. The 16 most significant bits track the number of
@@ -239,7 +238,6 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
destQueue.enqueueBatch(batch, destWorker)
} else if (element.isInstanceOf[Runnable]) {
val fiber = element.asInstanceOf[Runnable]

if (isStackTracing) {
destWorker.active = fiber
parkedSignals(dest).lazySet(false)
@@ -529,6 +527,43 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
externalQueue.offer(fiber, random)
notifyParked(random)
()

}

/**
* Offers a batch of runnables to the external queue and updates batch metrics.
*
* @param batch
* the batch of runnables to be offered to the external queue
* @param random
* a reference to an uncontended source of randomness
* @return
* true if the batch was successfully offered
*/
private[unsafe] def offerBatchToExternalQueue(
batch: Array[Runnable],
random: ThreadLocalRandom): Boolean = {
externalQueue.offer(batch, random)
true // Assume success
}

/**
* Offers multiple batches of runnables to the external queue and updates batch metrics.
*
* @param batches
* the batches of runnables to be offered to the external queue
* @param random
* a reference to an uncontended source of randomness
* @return
* true if the batches were successfully offered
*/
private[unsafe] def offerAllBatchesToExternalQueue(
batches: Array[AnyRef],
random: ThreadLocalRandom): Boolean = {
for (batch <- batches) {
externalQueue.offer(batch.asInstanceOf[Array[Runnable]], random)
}
true // Assume success
}

/**
@@ -844,6 +879,37 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
}
sum
}

/**
* Returns the total number of singleton tasks submitted to the external queue.
*/
private[unsafe] def getSingletonsSubmittedCount(): Long =
externalQueue.getSingletonsSubmittedCount()

/**
* Returns the total number of batch tasks submitted to the external queue.
*/
private[unsafe] def getBatchesSubmittedCount(): Long =
externalQueue.getBatchesSubmittedCount()

/**
* Returns the number of singleton tasks currently in the external queue.
*/
private[unsafe] def getSingletonsPresentCount(): Long =
externalQueue.getSingletonsPresentCount()

/**
* Returns the number of batch tasks currently in the external queue.
*/
private[unsafe] def getBatchesPresentCount(): Long = externalQueue.getBatchesPresentCount()

private[unsafe] def logQueueMetrics(): Unit = {
println(s"[Thread Pool ${id}] Queue Metrics:")
println(s" Singletons submitted: ${getSingletonsSubmittedCount()}")
println(s" Singletons present: ${getSingletonsPresentCount()}")
println(s" Batches submitted: ${getBatchesSubmittedCount()}")
println(s" Batches present: ${getBatchesPresentCount()}")
}
Member

Instead of exposing these on the WSTP, let's make a new ExternalQueueMetrics interface in this file:

https://github.com/typelevel/cats-effect/blob/series/3.x/core/jvm/src/main/scala/cats/effect/unsafe/metrics/WorkStealingThreadPoolMetrics.scala
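
A rough sketch of what such an interface could look like is shown below. Only the name `ExternalQueueMetrics` comes from the comment above; `QueueCounters`, `ExternalQueueMetricsSketch`, and `metricsFor` are hypothetical names used for illustration, and the real wiring into `WorkStealingThreadPoolMetrics` may well differ.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical interface: the name is taken from the review comment, the members
// mirror the four counters added in this PR.
trait ExternalQueueMetrics {
  def singletonsSubmittedCount(): Long
  def singletonsPresentCount(): Long
  def batchesSubmittedCount(): Long
  def batchesPresentCount(): Long
}

// Stand-in for the counters kept inside ScalQueue (assumption: plain AtomicLongs).
final class QueueCounters {
  val singletonsSubmitted = new AtomicLong(0)
  val singletonsPresent = new AtomicLong(0)
  val batchesSubmitted = new AtomicLong(0)
  val batchesPresent = new AtomicLong(0)
}

object ExternalQueueMetricsSketch {
  // Wraps the counters in a read-only view, so the getters no longer need to
  // be exposed on the thread pool itself.
  def metricsFor(c: QueueCounters): ExternalQueueMetrics =
    new ExternalQueueMetrics {
      def singletonsSubmittedCount(): Long = c.singletonsSubmitted.get()
      def singletonsPresentCount(): Long = c.singletonsPresent.get()
      def batchesSubmittedCount(): Long = c.batchesSubmitted.get()
      def batchesPresentCount(): Long = c.batchesPresent.get()
    }
}
```

Each call reads the current counter value, so results may differ between invocations while the pool is running.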

}

private object WorkStealingThreadPool {
7 changes: 7 additions & 0 deletions core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala
@@ -303,6 +303,13 @@ private[effect] final class WorkerThread[P <: AnyRef](
private[unsafe] def ownsPoller(poller: P): Boolean =
poller eq _poller

/**
* Returns the thread pool that owns this worker thread.
*
* @return
* reference to the owning WorkStealingThreadPool
*/
private[unsafe] def getPool(): WorkStealingThreadPool[P] = pool
Member

Do we still need this method?

Author

Another good catch! The getPool() method isn't being used anywhere in the codebase. I'll remove it in the next commit.

private[unsafe] def ownsTimers(timers: TimerHeap): Boolean =
sleepers eq timers

@@ -85,6 +85,38 @@ sealed trait WorkStealingThreadPoolMetrics {
*/
def suspendedFiberCount(): Long

/**
* Returns the total number of singleton tasks submitted to the external queue.
*
* @note
* the value may differ between invocations
*/
def singletonsSubmittedCount(): Long

/**
* Returns the total number of batch tasks submitted to the external queue.
*
* @note
* the value may differ between invocations
*/
def batchesSubmittedCount(): Long

/**
* Returns the number of singleton tasks currently in the external queue.
*
* @note
* the value may differ between invocations
*/
def singletonsPresentCount(): Long

/**
* Returns the number of batch tasks currently in the external queue.
*
* @note
* the value may differ between invocations
*/
def batchesPresentCount(): Long

/**
* The list of worker-specific metrics of this work-stealing thread pool.
*/
@@ -263,6 +295,10 @@ object WorkStealingThreadPoolMetrics {
def blockedWorkerThreadCount(): Int = wstp.getBlockedWorkerThreadCount()
def localQueueFiberCount(): Long = wstp.getLocalQueueFiberCount()
def suspendedFiberCount(): Long = wstp.getSuspendedFiberCount()
def batchesSubmittedCount(): Long = wstp.getBatchesSubmittedCount()
def singletonsPresentCount(): Long = wstp.getSingletonsPresentCount()
def batchesPresentCount(): Long = wstp.getBatchesPresentCount()
def singletonsSubmittedCount(): Long = wstp.getSingletonsSubmittedCount()

val workerThreads: List[WorkerThreadMetrics] =
List.range(0, workerThreadCount()).map(workerThreadMetrics(wstp, _))
2 changes: 1 addition & 1 deletion tests/js/src/test/scala/cats/effect/IOPlatformSuite.scala
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2024 Typelevel
* Copyright 2020-2025 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.