Akka Streams & HTTP

Musings from production

@gamsd, blog.gamsd.com

What is this all about

Akka Streams, implementation on top of Akka

Akka HTTP, a lean and performant library for HTTP services on top of Akka Streams

Who we are

Vivareal, largest real estate portal in Brazil

Your host tonight

Oh, Canada!

scalaupnorth.com

Why should we bother

Life is hard

Monoliths are hard

Microservices are harder

Why should we bother

Coupling

Asynchronicity

Error handling

Performance and elasticity

We need better tools

First steps with Scala

Dojos didn't work

Streams for tools

HTTP for APIs

Streams for tools

Moving data everywhere

No threading mess

Preformance for free (almost)

Streams for tools


val images = TableQuery[Images]
val query = images.take(100).result
val fromDatabase: DatabasePublisher[Image] = db.stream(query)

Source(fromDatabase)
  .map({x => println(x); x})
  .mapAsyncUnordered(p)(gatherMoreInfo)
  .map(transformInfo)
  .mapAsyncUnordered(p)(updateImage)
  .map(logSuccessOrError)
  .to(Sink.ignore)
  .run()
					

HTTP for APIs

Akka Streams led to Akka HTTP

Spray-based, performant, supported by Typesafe/Lightbend

Other libs are fine, but we had to pick one

HTTP for APIs


val routes = (get & path("echo" / Segment)) { s =>
  val f = Future.successful(s)
  complete(f)
}

Http().bindAndHandle(routes, "localhost", 8080)
					

Impressions

Performant, despite not being optmized

Library, not a framework

Easy to extend with directives and implicits

Security directives


def routes(user: User) = (get & path("echo" / Segment)) { s =>
  val message = s"User: ${user.name}, Message: $s"
  complete(message)
}

val authRoutes =
  authenticateBasic(realm = "Lançamentos", CustomAuthenticator.authenticate) { user =>
    routes(user)
  }

Http().bindAndHandle(authRoutes, "localhost", 8080)
					

Custom directives


val requestLogging =
  logRequestResult(LoggingMagnet(_ => RequestLogging.logRequestResult)) &
  handleRejections(RejectionHandler.default)

def routes = (get & path("echo" / Segment)) { s =>
  complete(s)
}

val loggingRoutes = requestLogging { routes }

Http().bindAndHandle(loggingRoutes, "localhost", 8080)
					

Implicits


implicit def responseWithBody(r: (StatusCode, JValue)): ToResponseMarshallable = {
  val (status, json) = r
  status -> HttpEntity(`application/json`, write(json))
}
					

Problems

Lots of stuff to learn

FileUpload

Monitoring

On the other hand

No DI mess

Shorter code

Same old mistakes left behind (mostly)

Moving forward

Upgrading

Fast release cycles

From 1.0 to 2.0 in no time, seamless for us

Documented migration paths (mostly)

Integral part of Akka, from 2.4.2

Solutions

Kamon.io + NewRelic

FileUploadDirectives

Kamon.io + NewRelic

github.com/gamsd/kamon-akka-http-newrelic

FileUpload: from this


(post & path("upload")) {
  entity(as[Multipart.FormData]) { (formData) =>
    val uploadedUrlsFuture = formData.parts.map(_.entity.dataBytes).mapAsync(parallelism = 1)(part =>
      part
        .map(_.toArray)
        .runFold(Array[Byte]())((totalBytes, bytes) => totalBytes ++ bytes)
        .map(fileService.upload(_))
      ).grouped(1000).runWith(Sink.head)

    val response = uploadedUrlsFuture.map(PhotosResource(_)).map[ToResponseMarshallable](OK -> _)
    complete(response)
  }
}
                    

FileUpload: to this


val routes = (post & path("upload")) {

  uploadedFile("file") { case (metadata, file) =>
    val length = file.length() / 1024
    val message = s"Uploaded file length: $length K\n"

    file.delete()
    complete(message)
  }
}

Http().bindAndHandle(routes, "localhost", 8080)
                    

Learning is not a problem

Streams FTW

Streaming APIs


val routes = (path("stream") & get) {

  val source = Source(1 to 10)
  val delimiter = "\r\n"

  val bsSource = source
    .map(i => s"""{"count": "$i"}""")
    .map(_.replace("\\r", ""))
    .map(ByteString.fromString(_) ++ ByteString(delimiter))

  val entity = HttpEntity.Chunked.fromData(MediaTypes.`application/json`, bsSource)
  val response = HttpResponse(OK, entity = entity)

  complete(response)
}
                    

End-to-end streams, part 1


def stream(): Source[Wat, Any] = {

  def chunkConsumer(res: HttpResponse) = {
    res.entity.dataBytes
      .via(Framing.delimiter(ByteString(delimiter), frameSize, allowTruncation = true))
      .map[Wat](bs => { parse(bs.utf8String).extract[Wat] })
  }

  val req = HttpRequest(method = HttpMethods.GET, uri = s"http://$host:$port/stream")
  val res: Source[Wat, Any] = Source.single(req).via(client).flatMapConcat(chunkConsumer)

  res
}
                    

End-to-end streams, part 2


val mat = StreamingClient.stream()
  .map(_.toString)
  .map({x => println(x); x})
  .map(_ => 1)
  .toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)
  .run()
                    

Graphs


private val graph = RunnableGraph.fromGraph(GraphDSL.create(Sink.ignore) { implicit builder =>
  sink =>

  val in = Source(1 to 25)

  val f1 = Flow[Int].map(pingPong)

  val fOk  = Flow[(Int, Option[String])] filter (_._2.isDefined) map (_._2.get) map System.out.println
  val fNok = Flow[(Int, Option[String])] filter (_._2.isEmpty) map (x => s"I don't care for ${x._1}") map System.err.println

  val bcast = builder.add(Broadcast[(Int, Option[String])](2))
  val merge = builder.add(Merge[Unit](2))

  in ~> f1 ~> bcast ~> fOk  ~> merge ~> sink.in
              bcast ~> fNok ~> merge

  ClosedShape
})
                    

Closing thoughts

Closing thoughts

Powerful toolkit

Low-level and high-level DSLs

Akka Streams in the small, Akka Actors in the large

Akka HTTP as integration layer

Scala at Vivareal

Scala at Vivareal

Async APIs

Geocoding

Processing at scale

Data science

engenharia.vivareal.com.br

We're hiring!

www.vivareal.com.br/empresa/carreira/vagas


guilherme.dantas _ vivareal.com

Thank you!

Q => A


github.com/gamsd/akka-http-starter

Akka Streams & HTTP

Musings from production

@gamsd, blog.gamsd.com