Streaming Data to HTTP using Akka Streams with Exponential Backoff on 429 Too Many Requests

Posted on March 12, 2019 by


HTTP/REST is probably the most used protocol to exchange data between different services, especially in today’s microservice world.

Before I even start, let me make it clear this project/example builds on this blog post from Colin Breck: https://blog.colinbreck.com/backoff-and-retry-error-handling-for-akka-streams/ .

I stumbled on his post while working on an integration project, but I wanted to build on it to include a couple more features. I also wanted to put together a small GitHub project that can be used to play with different features and settings, while making the approach a little more general.

Working on integration projects using Akka Streams, I was looking for a way to stream potentially large amounts of data over HTTP while slowing down when the server starts to complain.

Now, the ‘proper’ way that a server tells the client to slow down is by sending a 429 Too Many Requests code.

In this case, the behavior we want is exponential backoff: keep retrying (up to a maximum number of times), but wait an exponentially increasing amount of time between attempts (for instance, doubling the delay at every try).
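As a rough illustration of the doubling schedule described above, here is a minimal sketch in plain Scala. The names (`BackoffSketch`, `delay`) and the 100 ms / 1 s values are made up for this example; they are not part of Akka or of the project.

```scala
import scala.concurrent.duration._

object BackoffSketch {
  // Delay before retry number `attempt` (0-based):
  // doubles on every attempt and is capped at `max`.
  def delay(attempt: Int, min: FiniteDuration, max: FiniteDuration): FiniteDuration = {
    val factor = math.pow(2, attempt.toDouble)
    val millis = math.min(min.toMillis * factor, max.toMillis.toDouble)
    millis.toLong.millis
  }
}
```

For example, `(0 until 5).map(n => BackoffSketch.delay(n, 100.millis, 1.second))` gives 100, 200, 400, 800 and 1000 ms: the delay doubles until it hits the cap.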

This is well explained in Colin’s blog post here: https://blog.colinbreck.com/backoff-and-retry-error-handling-for-akka-streams/ . It is achieved by building a small stream for each request, which is retried on failure using Akka Streams’ built-in exponential backoff.

At first I was skeptical, since constructing a stream for every request seemed expensive. However, in my tests (run against a local HTTP server) each successful request took only a few milliseconds, including parsing the response.

One improvement I needed, however, was the ability to select which responses should be retried and which should cause the stream to fail altogether.

  
// generate random data
val ids = 1.to(10).map { _ => Id(UUID.randomUUID().toString) }
// a shared kill switch used to terminate the *entire stream* if needed
val killSwitch = KillSwitches.shared("ks")
// create the stream, using the killSwitch
val aggregate = Source(ids) // the data source
  .viaMat(killSwitch.flow)(Keep.right)
  // up to 4 parallel connections to the HTTP server
  .mapAsync(parallelism = 4)(getResponse)
  .map(_.value)
  // we calculate the sum; it should be equal to ids.length
  .runWith(Sink.fold(0)(_ + _))

In the above code, we create the stream, which sends simple data (a random UUID string) and receives a simple response (a number). For the sake of simplicity, the server returns a 1 on success, so the stream calculates the sum of all the ones. The materialized value of the stream must then be equal to the number of events the source emits (in this case, 10), no matter how many times requests were retried.

This is the model for our request and our response.

  // our simple model
  final case class Id(id:String)
  final case class Response(value:Int)

The magic is done by the getResponse function, which creates a mini-stream with backoff, as described in Colin’s blog post:

<code>// use a superpool vs SingleRequest: connection pooling, automatic keepalive, etc.
val pool = Http().superPool[Id]()  

def getResponse(id: Id): Future[Model.Response] = timedFuture {
    RestartSource.withBackoff(
      minBackoff = 20.milliseconds,
      maxBackoff = 30.seconds,
      randomFactor = 0.2,
      maxRestarts = 10
    ) { () =>
        val request = HttpRequest(uri = s"http://localhost:8080/${id.id}")
        Source.single((request, id)).via(pool)
          .mapAsync(parallelism = 1)(handleResponse)
      }.runWith(Sink.head).recover {
        case _ => throw StreamFailedAfterMaxRetriesExceptionForId(id)
      }
  }</code>

Now, there is a lot going on here. We’re connecting to our own test server, sending a UUID in the request URI. We are using a SuperPool, which handles HTTP connections, keepalive, etc. for us (see https://doc.akka.io/docs/akka-http/current/client-side/request-level.html#flow-based-variant).

We set all the parameters for exponential backoff (see https://doc.akka.io/docs/akka/2.5/stream/operators/RestartSource/withBackoff.html ). With maxRestarts = 10, a failing request is restarted up to 10 times before the mini-stream fails. We then catch that failure and rethrow it as our own error.
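To get a feel for what these parameters mean in practice, the sketch below estimates the delay before each restart. It assumes the delay is min(maxBackoff, minBackoff * 2^n), stretched by a random factor between 1 and 1 + randomFactor. That is my understanding of how Akka computes it, so treat the numbers as an approximation. The object and method names are hypothetical.

```scala
object BackoffBudget {
  // Values copied from the getResponse example above.
  val minBackoffMs = 20L
  val maxBackoffMs = 30000L
  val randomFactor = 0.2
  val maxRestarts  = 10

  // Delay before restart number `n` (0-based), ignoring jitter.
  def baseDelayMs(n: Int): Long =
    math.min(maxBackoffMs.toDouble, minBackoffMs * math.pow(2, n.toDouble)).toLong

  // Smallest and largest delay once jitter is applied (assumed model, see text).
  def boundsMs(n: Int): (Long, Long) = {
    val base = baseDelayMs(n)
    (base, math.round(base * (1.0 + randomFactor)))
  }

  // Total time spent waiting if every one of the restarts is used.
  val totalMinMs: Long = (0 until maxRestarts).map(baseDelayMs).sum
  val totalMaxMs: Long = (0 until maxRestarts).map(n => boundsMs(n)._2).sum
}
```

Under this model the waits add up to between 20460 ms and 24552 ms if all 10 restarts are used, and the longest single base delay is 10240 ms, so the 30 second maxBackoff is never reached with these settings.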

Now, the last part is the handleResponse function. You’ll likely need to change this function to fit your needs, since it depends on how the server you’re dealing with behaves. Some servers will rate limit your requests by sending back 429 errors, but others may not be sophisticated enough and will just start failing with 500s.

It also depends on your requirements: there may be a condition (like a 500) that means you’re done streaming. The code below shows how to handle the two situations differently, by applying exponential backoff only on 429 and terminating the stream on 500:

<code>  val handleResponse:PartialFunction[(Try[HttpResponse],Id),Future[Response]]  =  {
      // we can connect, and we get a valid response. Response however could be 500, etc.
    case (Success(s), _) => s match {
      case HttpResponse(StatusCodes.OK, _, entity, _) =>
        Unmarshal(entity).to[Response]
      case HttpResponse(StatusCodes.TooManyRequests,_,entity,_) =>
        entity.discardBytes()
        throw TooManyRequestsException
      case HttpResponse(statusCode, _, entity, _) =>
        entity.discardBytes()
        killSwitch.shutdown()
        throw DatabaseUnexpectedException(statusCode)
    }
    // something went wrong, can't connect.
    case (Failure(f), _) => throw f
  }</code>

It’s worth noting that exponential backoff also applies when you can’t connect to the server. We first handle the Try, which fails if we can’t connect, and then handle the different status codes within a Success, which means we got a response from the server, though that response may itself be an error.
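The decision logic in handleResponse can also be written down as a small table, which makes it easy to adapt and to unit test without Akka. The following is only a sketch: `RetryDecision`, `Outcome` and `classify` are hypothetical names, and the status is modelled as a plain `Int` wrapped in a `Try` rather than a real `HttpResponse`.

```scala
import scala.util.{Failure, Success, Try}

object RetryDecision {
  sealed trait Outcome
  case object Deliver          extends Outcome // pass the response downstream
  case object RetryWithBackoff extends Outcome // fail the mini-stream so it restarts
  case object StopStream       extends Outcome // trigger the kill switch

  // `result` is a Failure when the connection itself failed,
  // otherwise it carries the HTTP status code of the response.
  def classify(result: Try[Int]): Outcome = result match {
    case Success(200) => Deliver
    case Success(429) => RetryWithBackoff
    case Success(_)   => StopStream
    case Failure(_)   => RetryWithBackoff
  }
}
```

If your server signals overload with 500s or 503s instead of 429, you would move those codes to the `RetryWithBackoff` branch.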

To test it, we can first create a server that randomly returns either OK or Too Many Requests:

<code>  val server = new TestHttpServer(0.2D, StatusCodes.TooManyRequests)</code>

You’ll see output like the following:

Return ok
Return ok
Return error
Return ok
[WARN] [03/12/2019 13:12:25.770] [QuickStart-akka.actor.default-dispatcher-9] [RestartWithBackoffSource(akka://QuickStart)] Restarting graph due to failure. stack_trace:
com.nuvola_tech.akka.streams.Model$TooManyRequestsException$
....

Return ok
(Sum : 10,
Map(6ab726e5-78c9-44ed-830b-c94dff66c7c3 -> 1,
f931f6fa-7917-48ad-bb58-5a5ec53f65ee -> 1,
d3ece91a-2b5e-4868-bc43-f60ef1d657dd -> 2,
7e943c2f-fc08-44fa-8667-be1b0e7253d7 -> 1,
5496e659-9083-49cd-8864-88af82e49c20 -> 1,
0dc9a0ab-e0ac-45e9-be26-a1bfdf8dd5ce -> 2,
64486cfc-a1ac-4663-aed5-51bd547765bb -> 1,
b39521a2-7592-4508-bf6f-81a7cedd33a5 -> 1,
33e8e8ed-c7f9-49e4-bd1c-e6ce83e2150c -> 1,
56b6547b-a635-48cb-98aa-004de4abff65 -> 1))
Execution time avg: 98: List(46, 3, 3, 371, 3, 3, 4, 351)

all terminated

The (edited) output above shows that two requests failed and were retried. This is also reflected in the execution times. The sum is correct (10).
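The execution times come from the timedFuture wrapper around getResponse, which this post does not show. Here is a minimal sketch of what such a helper could look like, assuming it simply records the elapsed milliseconds of each future; the version in the repository may well differ, and the `Timing` object and `timings` queue are names I made up.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{ExecutionContext, Future}

object Timing {
  // Elapsed times in milliseconds, one entry per completed future.
  val timings = new ConcurrentLinkedQueue[Long]()

  // Runs `body` and records how long the resulting future took to complete,
  // whether it succeeded or failed. The result itself is passed through unchanged.
  def timedFuture[T](body: => Future[T])(implicit ec: ExecutionContext): Future[T] = {
    val start = System.nanoTime()
    body.andThen { case _ =>
      timings.add((System.nanoTime() - start) / 1000000L)
    }
  }
}
```

Because the measurement wraps the whole mini-stream, a retried request shows up as a single, longer entry that includes the backoff delay.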

If we set the error probability to 1, the stream will fail after it has retried the request the maximum number of times, with com.nuvola_tech.akka.streams.Model$StreamFailedAfterMaxRetriesExceptionForId.
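To make the "give up after the maximum number of restarts" behavior concrete, here is a plain-Scala loop that mimics it. This is not RestartSource, only a simplified stand-in without any delays: it assumes one initial attempt plus at most maxRestarts restarts, and the real operator's accounting may differ in details. `RetryLimitSketch`, `GaveUp` and `runWithRestarts` are hypothetical names.

```scala
import scala.util.{Failure, Success, Try}

object RetryLimitSketch {
  // Stand-in for the project's StreamFailedAfterMaxRetriesExceptionForId.
  final case class GaveUp(id: String, attempts: Int)
      extends RuntimeException(s"giving up on $id after $attempts attempts")

  // Calls `attempt` until it succeeds or `maxRestarts` restarts have been used.
  // No sleeping here: only the counting is illustrated.
  def runWithRestarts[T](id: String, maxRestarts: Int)(attempt: Int => Try[T]): Try[T] = {
    @annotation.tailrec
    def loop(n: Int): Try[T] = attempt(n) match {
      case ok @ Success(_)               => ok
      case Failure(_) if n < maxRestarts => loop(n + 1)
      case Failure(_)                    => Failure(GaveUp(id, n + 1))
    }
    loop(0)
  }
}
```

With an attempt function that always fails and `maxRestarts = 10`, the loop makes 11 attempts and then returns a `Failure` wrapping `GaveUp`, which corresponds to the error probability = 1 case above.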

Finally, if you set the test server to return InternalServerError, the stream will terminate at the first error:

Starting HTTP server
Return ok
Return ok
Return ok
Return error
[WARN] [03/12/2019 13:20:31.399] [QuickStart-akka.actor.default-dispatcher-10] [RestartWithBackoffSource(akka://QuickStart)] Restarting graph due to failure. stack_trace:
com.nuvola_tech.akka.streams.Model$DatabaseUnexpectedException
Return ok
(Sum : 4,
Map(85966d9d-bbb3-4560-a06f-784cc20f043f -> 1,
42503ccb-b194-4d21-a831-f92a9ca21e47 -> 1,
5a83b3ef-4816-4b77-8058-df7ee90c9bc1 -> 2,
5d716bcd-42c9-4a76-9503-b662d10d1455 -> 1))
Execution time avg: 337: List(351, 324)

Note that the sum is now 4, because only 4 calls were successful. Note also that one request was retried before the kill switch (which works in parallel) was able to terminate the stream. This was OK in my case.

You can play with the code on GitHub here: https://github.com/nuvolatech/akka-streams-http-example

Posted in: programming