Saturday, November 23, 2019

Unit testing in Functional Programming (part 1)


Mocking in FP is considered a code smell, if not a downright antipattern but you can't avoid it with clever testing techniques. You must refactor the code.

This is what I had to do with some FS2 code from here. Now, this code demonstrates FS2's abilities wonderfully but it's not great to use in production as it's hard to test. Take this code in the link:

    val stream =
      consumerStream(consumerSettings)
        .evalTap(_.subscribeTo("topic"))
        .flatMap(_.stream)
        .mapAsync(25) { committable =>
          processRecord(committable.record)
            .as(committable.offset)
        }
        .through(commitBatchWithin(500, 15.seconds))

Not only is this hard to test, it's very dependent on Kafka specific code.

We  can abstract it so it looks like this:

  def pipeline[K, C, P, R, O, T](s:              Stream[IO, K],
                                 subscribe:      K => IO[Unit],
                                 toRecords:      K => Stream[IO, C],
                                 commitRead:     C => IO[P],
                                 producerPipe:   Pipe[IO, P, R],
                                 toWriteRecords: R => O,
                                 commitWrite:    Pipe[IO, O, T]): Stream[IO, T] = 
    s.evalTap(subscribe).flatMap(toRecords).mapAsync(25)(commitRead).through(producerPipe).map(toWriteRecords).through(commitWrite)

This makes testing easier for two reasons:

  1. The code has no dependency on Kafka.
  2. I can submit my own functions that use FS2's in-memory streams.
Now, the test looks like this. The domain object look like this:


  case class Kafka()
  case class Record(id: Int)
  case class ProducerRecords(id: Int)
  case class CommittableOffset(id: Int)



and although their names have a superficial resemblance to Kafka classes, this is just to make things conceptually simpler. We're dealing with a much higher layer of abstraction than that.

For instance, instead have having to connect to an embedded Kafka instance, I can have:

      val records = (1 to nToRead).map(x => Record(x))
      
      val toRecords: Kafka => Stream[IO, Record] =
        _ => Stream.emits(records).covary[IO]

and pass this as the toRecords argument to the function we want to test, pipeline. It's all in memory, no ports and sockets to worry about.

[Aside: from the docs: given a pure stream, "to convert to an effectful stream, use covary"]

The takeaway point is this: if you want to use FP to test your OO code and avoid mocks, it isn't going to work. You need to do a root and branch refactoring to get the benefits of avoiding Mockito. But your code will also be more abstract and therefore more re-usable.

No comments:

Post a Comment