Skip to content

Commit 7319db2

Browse files
Add a test Console
1 parent 74cdc89 commit 7319db2

File tree

2 files changed

+705
-0
lines changed

2 files changed

+705
-0
lines changed
Lines changed: 350 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
/*
2+
* Copyright 2020-2025 Typelevel
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package cats.effect.testkit
18+
19+
import cats.{Parallel, Show}
20+
import cats.data.{Chain, NonEmptyChain}
21+
import cats.effect.Concurrent
22+
import cats.effect.kernel.{Deferred, Ref, Resource}
23+
import cats.effect.std.{Console, Semaphore}
24+
import cats.effect.testkit.TestConsole.{ConsoleClosedException, TestStdInState}
25+
import cats.effect.testkit.TestConsole.TestStdInState._
26+
import cats.syntax.all._
27+
28+
import scala.annotation.tailrec
29+
import scala.util.control.NoStackTrace
30+
31+
import java.io.EOFException
32+
import java.nio.charset.Charset
33+
34+
/**
35+
* Implement a test version of [[cats.effect.std.Console]]
36+
*/
37+
final class TestConsole[F[_]: Parallel](
38+
stdInSemaphore: Semaphore[F],
39+
stdInStateRef: Ref[F, TestStdInState[F]],
40+
stdOutRef: Ref[F, Chain[String]],
41+
stdErrRef: Ref[F, Chain[String]],
42+
logsRef: Ref[F, Chain[String]],
43+
readIdRef: Ref[F, Int]
44+
)(implicit F: Concurrent[F])
45+
extends Console[F] {
46+
private val defaultCharset = Charset.defaultCharset()
47+
private def streamClosed = new EOFException("End Of File")
48+
private def log(msg: String): F[Unit] = logsRef.update(_.append(msg))
49+
50+
/**
51+
* Write a string to the simulated stdIn
52+
*
53+
* Blocked calls to [[readLineWithCharset]] will be woken up if `str` contains one or more
54+
* lines.
55+
*
56+
* @note
57+
* Blocked calls will be woken in a first-in-first-out order.
58+
*/
59+
def write[A](value: A, charset: Charset = defaultCharset)(implicit S: Show[A]): F[Unit] =
60+
log(show"Writing to stdin: $value") *> writeImpl(Chunk(value.show, charset))
61+
62+
/**
63+
* Write a string and a newline to the simulated stdIn
64+
*
65+
* At least one blocked call to [[readLineWithCharset]] will be woken up, if it exists.
66+
*
67+
* @note
68+
* Blocked calls will be woken in a first-in-first-out order.
69+
*/
70+
def writeln[A](value: A, charset: Charset = defaultCharset)(implicit S: Show[A]): F[Unit] =
71+
log(show"Writing line to stdin: $value") *> writeImpl(Chunk(show"$value\n", charset))
72+
73+
private def writeImpl(chunk: Chunk): F[Unit] =
74+
if (chunk.isEmpty) F.unit
75+
else
76+
stdInSemaphore.permit.use { _ =>
77+
stdInStateRef.get.flatMap {
78+
case Closed() => F.raiseError(ConsoleClosedException())
79+
case Ready(lines, partial) =>
80+
val (newLines, newPartial) = partial.append(chunk)
81+
stdInStateRef.set(Ready[F](lines.appendChain(newLines), newPartial))
82+
case Waiting(requests, buffer) =>
83+
val (lines, partial) = buffer.append(chunk)
84+
if (lines.isEmpty)
85+
stdInStateRef.set(Waiting[F](requests, partial))
86+
else {
87+
def loop(
88+
remainingLines: Chain[Line],
89+
remainingRequests: Chain[Deferred[F, Either[Throwable, Array[Byte]]]])
90+
: F[TestStdInState[F]] =
91+
(remainingLines.uncons, remainingRequests.uncons) match {
92+
case (None, None) =>
93+
Waiting[F](Chain.empty, PartialLine.empty).pure[F].widen
94+
case (None, Some(_)) =>
95+
Waiting[F](remainingRequests, partial).pure[F].widen
96+
case (Some((nextLine, otherLines)), None) =>
97+
Ready[F](NonEmptyChain.fromChainPrepend(nextLine, otherLines), partial)
98+
.pure[F]
99+
.widen
100+
case (Some((nextLine, otherLines)), Some((nextRequest, otherRequests))) =>
101+
nextRequest
102+
.complete(nextLine.bytes.asRight) >> loop(otherLines, otherRequests)
103+
}
104+
105+
loop(lines, requests).flatMap(stdInStateRef.set)
106+
}
107+
}
108+
}
109+
110+
override def readLineWithCharset(charset: Charset): F[String] =
111+
readIdRef.getAndUpdate(_ + 1).flatMap { readId =>
112+
stdInSemaphore
113+
.permit
114+
.use { _ =>
115+
log(s"Reading stdIn [id: $readId]") *>
116+
stdInStateRef.get.flatMap {
117+
case Closed() =>
118+
F.raiseError[Deferred[F, Either[Throwable, Array[Byte]]]](streamClosed)
119+
case Ready(lines, partial) =>
120+
val newState =
121+
NonEmptyChain
122+
.fromChain(lines.tail)
123+
.fold[TestStdInState[F]](Waiting(Chain.empty, PartialLine.empty))(
124+
Ready(_, partial))
125+
126+
stdInStateRef.set(newState) *>
127+
Deferred[F, Either[Throwable, Array[Byte]]].flatTap(
128+
_.complete(lines.head.bytes.asRight))
129+
case Waiting(requests, buffer) =>
130+
Deferred[F, Either[Throwable, Array[Byte]]].flatTap(d =>
131+
stdInStateRef.set(Waiting(requests.append(d), buffer)))
132+
}
133+
}
134+
.flatMap(_.get)
135+
.flatMap(_.traverse(bytes => Concurrent[F].catchNonFatal(new String(bytes, charset))))
136+
.flatTap {
137+
case Left(ex) => log(s"Read from stdin failed [id: $readId]: $ex")
138+
case Right(line) => log(s"Read from stdin [id: $readId]: $line")
139+
}
140+
.rethrow
141+
}
142+
143+
override def print[A](a: A)(implicit S: Show[A]): F[Unit] =
144+
log(show"print($a)") *> stdOutRef.update(_.append(a.show))
145+
146+
override def println[A](a: A)(implicit S: Show[A]): F[Unit] =
147+
log(show"println($a)") *> stdOutRef.update(_.append(a.show).append("\n"))
148+
149+
override def error[A](a: A)(implicit S: Show[A]): F[Unit] =
150+
log(show"error($a)") *> stdErrRef.update(_.append(a.show))
151+
152+
override def errorln[A](a: A)(implicit S: Show[A]): F[Unit] =
153+
log(show"errorln($a)") *> stdErrRef.update(_.append(a.show).append("\n"))
154+
155+
/**
156+
* Close the TestConsole
157+
*
158+
* Any blocked calls to [[readLineWithCharset]] terminate with a raised
159+
* [[java.io.EOFException]]
160+
*/
161+
def close: F[Unit] = stdInSemaphore.permit.use { _ =>
162+
stdInStateRef
163+
.get
164+
.flatTap(_ => log("Closing"))
165+
.flatMap {
166+
case Closed() => F.unit
167+
case Ready(lines, partial) =>
168+
log(s"Discarding ${lines.length} lines and ${partial.chunks.length} bytes from stdIn") *>
169+
stdInStateRef.set(Closed[F]())
170+
case Waiting(requests, buffer) =>
171+
log(s"Discarding ${buffer.chunks.length} bytes from stdIn")
172+
.unlessA(buffer.chunks.isEmpty) *>
173+
log(s"Notifying ${requests.length} pending read requests")
174+
.unlessA(requests.isEmpty) *>
175+
stdInStateRef.set(Closed[F]()) *> requests.parTraverse_(
176+
_.complete(streamClosed.asLeft))
177+
}
178+
.flatTap(_ => log("Closed"))
179+
}
180+
181+
/**
182+
* @return
183+
* The current contents of stdOut
184+
*/
185+
def stdOutContents: F[String] = stdOutRef.get.map(_.mkString_(""))
186+
187+
/**
188+
* @return
189+
* The current contents of stdErr
190+
*/
191+
def stdErrContents: F[String] = stdErrRef.get.map(_.mkString_(""))
192+
193+
/**
194+
* @return
195+
* A human-readable description of the activity log and current status of this instance.
196+
*
197+
* Handy for debugging failing or blocked tests.
198+
*/
199+
def activityLog: F[String] =
200+
(logsRef.get.map(_.mkString_("\n")), stdStateDescription).mapN { (logStr, stateStr) =>
201+
s"""|=== Activity Log ===
202+
|$logStr
203+
|=== Current State ===
204+
|$stateStr""".stripMargin
205+
}
206+
207+
/**
208+
* Clear the human-readable activity log
209+
*/
210+
def clearLog: F[Unit] = logsRef.set(Chain.empty)
211+
212+
private def stdStateDescription: F[String] = stdInStateRef.get.map {
213+
case Closed() => "Closed"
214+
case Ready(lines, partial) =>
215+
val linesStr = lines.mkString_("\n")
216+
val partialStr =
217+
if (partial.isEmpty) "No partial line"
218+
else s"Partial line: '${partial.render}'"
219+
220+
s"""Ready for read
221+
|$partialStr
222+
|--- Complete Lines ---
223+
|$linesStr""".stripMargin
224+
case Waiting(requests, buffer) =>
225+
val bufferStr =
226+
if (buffer.isEmpty) "No partial line"
227+
else s"Partial line: '${buffer.render}'"
228+
s"""Waiting for read
229+
|Pending requests: ${requests.length}
230+
|$bufferStr""".stripMargin
231+
}
232+
}
233+
object TestConsole {
234+
235+
/**
236+
* Create a [[TestConsole]] instance without lifecycle management
237+
*
238+
* <h>CAUTION</h>
239+
*
240+
* Be careful to ensure that [[TestConsole.close]] is called before the end of the test, to
241+
* make sure that no fibers are blocked waiting on a call to
242+
* [[TestConsole.readLineWithCharset]]
243+
*/
244+
def unsafe[F[_]: Concurrent: Parallel]: F[TestConsole[F]] =
245+
(
246+
Semaphore[F](1L),
247+
Ref.of[F, TestStdInState[F]](TestStdInState.Waiting[F](Chain.empty, PartialLine.empty)),
248+
Ref.empty[F, Chain[String]],
249+
Ref.empty[F, Chain[String]],
250+
Ref.empty[F, Chain[String]],
251+
Ref.of[F, Int](0)
252+
).mapN(new TestConsole[F](_, _, _, _, _, _))
253+
254+
/**
255+
* Create a resource which instantiates and closes a [[TestConsole]]
256+
*
257+
* This is the preferred usage pattern, as it ensures that no fibers are left blocked on calls
258+
* to [[TestConsole.readLineWithCharset]]
259+
*/
260+
def resource[F[_]: Concurrent: Parallel]: Resource[F, TestConsole[F]] =
261+
Resource.make[F, TestConsole[F]](unsafe[F])(_.close.recover {
262+
case ConsoleClosedException() => ()
263+
})
264+
265+
private[testkit] final case class ConsoleClosedException()
266+
extends IllegalStateException("Console is closed")
267+
with NoStackTrace
268+
269+
private[testkit] sealed trait TestStdInState[F[_]]
270+
private[testkit] object TestStdInState {
271+
final case class Chunk(value: String, charset: Charset) {
272+
def bytes: Array[Byte] = value.getBytes(charset)
273+
def isEmpty: Boolean = value.isEmpty
274+
def modify(f: String => String): Chunk = Chunk(f(value), charset)
275+
def split(char: Char): Option[(Chunk, Chunk)] = {
276+
val idx = value.indexOf(char.toInt)
277+
if (idx === -1) None
278+
else {
279+
val (head, tail) = value.splitAt(idx)
280+
Some((Chunk(head, charset), Chunk(tail.drop(1), charset)))
281+
}
282+
}
283+
}
284+
object Chunk {
285+
implicit val show: Show[Chunk] = Show.show(_.value)
286+
}
287+
288+
final case class PartialLine(chunks: Chain[Chunk]) {
289+
def isEmpty: Boolean = chunks.forall(_.isEmpty)
290+
291+
def render: String = chunks.mkString_("")
292+
293+
def toLine: Line = Line(chunks)
294+
295+
def append(chunk: Chunk): (Chain[Line], PartialLine) =
296+
if (chunk.value.startsWith("\n"))
297+
PartialLine.empty.append(chunk.modify(_.drop(1))).leftMap(_.prepend(toLine))
298+
else if (chunk.value.endsWith("\n")) {
299+
val (lines, lastLine) = append(chunk.modify(_.dropRight(1)))
300+
lines.append(lastLine.toLine) -> PartialLine.empty
301+
} else {
302+
if (chunk.isEmpty) (Chain.empty, this)
303+
else {
304+
@tailrec
305+
def loop(accum: Chain[Line], remaining: Chunk): (Chain[Line], PartialLine) =
306+
if (remaining.isEmpty) (accum, PartialLine.empty)
307+
else {
308+
remaining.split('\n') match {
309+
case None => (accum, PartialLine.one(remaining))
310+
case Some((head, tail)) => loop(accum.append(Line.one(head)), tail)
311+
}
312+
}
313+
314+
chunk.split('\n') match {
315+
case Some((head, tail)) =>
316+
loop(Chain.one(Line(chunks.append(head))), tail)
317+
case None =>
318+
if (isEmpty) (Chain.empty, PartialLine.one(chunk))
319+
else (Chain.empty, PartialLine(chunks.append(chunk)))
320+
}
321+
}
322+
}
323+
}
324+
object PartialLine {
325+
def one(c: Chunk): PartialLine = PartialLine(Chain.one(c))
326+
def empty: PartialLine = PartialLine(Chain.empty)
327+
}
328+
329+
final case class Line(chunks: Chain[Chunk]) {
330+
def isEmpty: Boolean = chunks.forall(_.isEmpty)
331+
def render: String = chunks.mkString_("")
332+
def bytes: Array[Byte] =
333+
chunks.map(_.bytes).toVector.toArray.flatten
334+
}
335+
object Line {
336+
def one(chunk: Chunk): Line = Line(Chain.one(chunk))
337+
def empty: Line = Line(Chain.empty)
338+
implicit val show: Show[Line] = Show.show(_.render)
339+
}
340+
341+
final case class Closed[F[_]]() extends TestStdInState[F]
342+
final case class Ready[F[_]](lines: NonEmptyChain[Line], partial: PartialLine)
343+
extends TestStdInState[F]
344+
345+
final case class Waiting[F[_]](
346+
requests: Chain[Deferred[F, Either[Throwable, Array[Byte]]]],
347+
buffer: PartialLine)
348+
extends TestStdInState[F]
349+
}
350+
}

0 commit comments

Comments
 (0)