On Scala's futures
This is a write up of a lunch session on Scala's Future
and Promise
abstractions. We'll implement parts of these abstractions and gloss over
details like synchronization to focus on the abstractions themselves.
You can clone fgeller/futures.scala and follow the examples via tags, but this write-up should be self-contained as well.
Background
Commonly the JVM runs a single OS process and we use threads for performing
parallel or asynchronous computation. Scala offers thin abstractions wrapping
around such threads called Future and Promise. These abstractions grew out of
SIP-14 which included learnings from akka as well as Scala's own
implementation of actors that is deprecated by now. Compared to actors a
Future
is a thinner abstraction with more emphasis on composability.
Basics
Let's look at a simple multi-threaded application in Java:
public class Runner {
public static void main(String[] args) {
new Thread() {
public void run() {
System.out.println("hello, from elsewhere.");
}
}.start();
try { Thread.sleep(1); } catch (InterruptedException ex) {}
System.out.println("hello, from runner.");
}
}
We create a new Thread
object and override the run
method to print a simple
greeting. We start this thread right away and briefly block the main thread by
sleeping for one millisecond, before we print a greeting from the main
thread. It prints the following:
hello, from elsewhere.
hello, from runner.
What would this look like in Scala?
new Thread() {
override def run(): Unit = {
println("hello, from elsewhere.")
}
}.start()
Thread.sleep(1)
println("hello, from runner.")
We drop some of the boilerplate code, but we're doing the same thing as in the Java example above. It still prints:
hello, from elsewhere.
hello, from runner.
How would we achieve this with futures?
import scala.concurrent._
import ExecutionContext.Implicits._
val f: Future[Unit] =
Future { println("hello, from elsewhere.") }
Thread.sleep(1)
println("hello, from runner.")
This still prints:
hello, from elsewhere.
hello, from runner.
Some differences to notice when compared to the examples using Java's thread represenation:
- We need to import the desired abstractions from
scala.concurrent
- We import a global execution context, implicitly defining that our futures should be scheduled on this default thread pool.
- The
Future
is parameterized, identifying the type of the computation that we perform. In our caseprintln
"produces" aUnit
.
Let's do this ourselves.
So let's try to implement this:
class Future[T](value: T)
object Future {
def apply[T](v: T): Future[T] = { new Future(v) }
}
println("hello, world.")
val of: Future[Unit] = Future(println("In the future!"))
We create a class Future
that holds a given value and an apply
method in its
companion object to construct instances of this class. It prints the
following:
hello, world.
In the future!
Looks good right?
Ok, ok, let's check whether it's multi-threaded:
def log(msg: String) = println(s"${Thread.currentThread}: $msg")
class Future[T](value: T)
object Future {
def apply[T](v: T): Future[T] = { new Future(v) }
}
log("hello, world.")
val of: Future[Unit] = Future(log("In the future!"))
So we wrap the println
call and include the current thread. This should tell
us what thread a given println
expression is evaluated on. It prints the
following:
Thread[main,5,main]: hello, world.
Thread[main,5,main]: In the future!
So I guess that didn't work. Let's actually add multi-threading:
def log(msg: String) = println(s"${Thread.currentThread}: $msg")
class Future[T]() { var value: T = _ }
object Future {
def apply[T](v: T): Future[T] = {
val result = new Future[T]()
val thread = new Thread() {
override def run(): Unit = { result.value = v }
}
thread.start()
result
}
}
log("hello, world.")
val of: Future[Unit] = Future(log("In the future!"))
So we create a new Thread
instance in our apply
method and change the
container class to allow us to assign the value from the outside by making the
instance variable re-assignable. That should do it, right? Let's see what it
prints:
Thread[main,5,main]: hello, world.
Thread[main,5,main]: In the future!
Well, not really. What's missing? Scala is an eager or applicative order
language. This means that it evaluates the arguments to a function call before
evaluating the function itself. This means that we evaluate the call to log
before we even start the evaluation of the apply
method, let alone start a new
thread.
What to do? Compiler magic to the rescue!
def log(msg: String) = println(s"${Thread.currentThread}: $msg")
class Future[T]() { var value: T = _ }
object Future {
def apply[T](v: β T): Future[T] = {
val result = new Future[T]()
val thread = new Thread() {
override def run(): Unit = { result.value = v }
}
thread.start()
result
}
}
log("hello, world.")
val of: Future[Unit] = Future(log("In the future!"))
The only thing that changes in the above example is the following line:
def apply[T](v: β T): Future[T] = {
We added a β to the type which means that this argument will only be evaluated when needed. We could do this ourselves by wrapping the argument in a closure like this:
val of: Future[Unit] = Future.apply({ () β log("In the future!")})
This would delay the execution of the call to log
until the closure is
actually evaluated. The β type annotation is essentially syntactic sugar for
wrapping function arguments in closures.
So did that actually work? This is what it prints:
Thread[main,5,main]: hello, world.
Thread[Thread-0,5,main]: In the future!
Looks like we're successfully printing from different threads now!
Futures and Promises
Let's step back and look at what the SIP says about a future value:
- A future is an abstraction which represents a value which may become available at some point.
- A Future object either holds a result of a computation or an exception in the case that the computation failed.
- An important property of a future is that it is in effect immutable - it can never be written to or failed by the holder of the Future object.
Our implementation seems to work for the first part, but we're missing the second and third statements. Let's first focus on the second statement:
We currently neglect the fact that a computation might fail and only
implicitly encode that a computation hasn't finished through the null
value
that we use to initialize the container. Let's use Option[A]
to encode a
possibly unfinished computation and Try[B]
to encode a possibly failed or
succeeded computation:
import scala.util._
def log(msg: String) = println(s"${Thread.currentThread}: $msg")
class Future[T]() { var value = Option.empty[Try[T]] }
object Future {
def apply[T](v: β T): Future[T] = {
val result = new Future[T]()
val thread = new Thread() {
override def run(): Unit = { result.value = Some(Try(v)) }
}
thread.start()
result
}
}
log("hello, world.")
val of: Future[Unit] = Future(log("In the future!"))
The type of the contained value changes to Option[Try[T]]
and we wrap the
evaluation of a computation in a Try
to capture possible failures.
That should tick two boxes. How about that third statement:
- An important property of a future is that it is in effect immutable - it can never be written to or failed by the holder of the Future object.
That's certainly not true - we use a var
to enable setting the container
from a different thread. That's where promises come in. If we continue reading
the SIP we get to this part:
- While futures are defined as a type of read-only placeholder object created for a result which doesnβt yet exist, a promise can be thought of as a writeable, single-assignment container, which completes a future.
Turns out we were writing a promise all along! (Ignoring the single-assignment part for now). But still, how do these immutable futures work? The REPL offers help:
import scala.concurrent._
import ExecutionContext.Implicits.global
println(s"Our future is ${Future(42)}")
This prints the following:
Our future is scala.concurrent.impl.Promise$DefaultPromise@87aac27
So a future is a promise, is a future, is a promise? Can we do this then?
import scala.concurrent._
import ExecutionContext.Implicits.global
println(s"Our future is ${Future(42): Promise[Int]}")
The typer doesn't like that:
found : scala.concurrent.Future[Int]
required: scala.concurrent.Promise[Int]
How do we get a Future
from a Promise
in Scala? We generally call future
on
the promise - so how is that implemented?
private[concurrent] trait Promise[T]
extends scala.concurrent.Promise[T] with scala.concurrent.Future[T] {
def future: this.type = this
}
So a promise is a promise is a future! We're simply restricting access to the
mutability to the Promise
type and the immutable parts to the Future
. That
looks doable, let's try!
First, let's switch everything we have so far to a Promise:
import scala.util._
def log(msg: String) = println(s"${Thread.currentThread}: $msg")
class Promise[T]() { var value = Option.empty[Try[T]] }
object Promise {
def apply[T](v: β T): Promise[T] = {
val result = new Promise[T]()
val thread = new Thread() {
override def run(): Unit = { result.value = Some(Try(v)) }
}
thread.start()
result
}
}
log("hello, world.")
val of: Promise[Unit] = Promise(log("In the future!"))
This still prints:
Thread[main,5,main]: hello, world.
Thread[Thread-3,5,main]: In the future!
Let's try the rest in parts. First we wrap our implementation in a namespace that we can restrict access to -- I'll use objects in this example, but packages would work the same:
import scala.util._
object our {
trait Promise[T] { def complete(value: Try[T]): Promise[T] }
trait Future[T] {}
}
We define traits to represent our two abstractions to the outside and add a
complete
method on promises that allows an external caller to write a value to
the container. Let's add the implementation of apply
using this complete
method:
import scala.util._
object our {
trait Promise[T] { def complete(value: Try[T]): Promise[T] }
trait Future[T] {}
object Future {
def apply[T](v: β T): Future[T] = {
val result = new impl.Promise[T]()
val thread = new Thread() {
override def run(): Unit = { result.complete(Try(v)) }
}
thread.start()
result.future
}
}
}
And then we add the implementation of our promise:
import scala.util._
object our {
trait Promise[T] { def complete(value: Try[T]): Promise[T] }
trait Future[T] {}
object Future {
// left out for brevity
}
object impl {
private[our] class Promise[T] extends our.Promise[T] with our.Future[T] {
def future: Future[T] = this
private var value = Option.empty[Try[T]]
def complete(v: Try[T]): Promise[T] = {
if (this.value.isDefined)
throw new IllegalStateException("Can only complete a promise once.")
this.value = Some(v)
this
}
}
}
}
We provide an accessor to view the underlying promise as a future and protect against completing the promise multiple times.
And the full example:
import scala.util._
object our {
trait Promise[T] { def complete(value: Try[T]): Promise[T] }
trait Future[T] {}
object Future {
def apply[T](v: β T): Future[T] = {
val result = new impl.Promise[T]()
val thread = new Thread() {
override def run(): Unit = { result.complete(Try(v)) }
}
thread.start()
result.future
}
}
object impl {
private[our] class Promise[T] extends our.Promise[T] with our.Future[T] {
def future: Future[T] = this
private var value = Option.empty[Try[T]]
def complete(v: Try[T]): Promise[T] = {
if (this.value.isDefined)
throw new IllegalStateException("Can only complete a promise once.")
this.value = Some(v)
this
}
}
}
}
import our._
def log(msg: String) = println(s"${Thread.currentThread}: $msg")
log("hello, world")
val of: Future[Unit] = Future(log("In the future!"))
This will print the following:
Thread[main,5,main]: hello, world Thread[Thread-0,5,main]: In the future!
Now let's provide access to the value of a future. Our first attempt is to install a callback that will be evaluated when the promise is completed. We add the ability to add a callback on the future:
trait Future[T] { def onComplete(fun: Try[T] β Unit): Unit }
The callback will receive a Try[T]
which encodes the result of the computation
and we'll ignore the result of the provided closure. To our implementation we
add a Set[Try[T] β Unit] to keep track of the installed callbacks. We'll use
a set because the order of these callbacks is not guaranteed.
private[our] class Promise[T] extends Future[T] {
def future: Future[T] = this
private var value = Option.empty[Try[T]]
private var onCompletes = mutable.Set.empty[Try[T] β Unit]
def onComplete(fun: Try[T] β Unit): Unit = {
this.value match {
case Some(v) β fun(v)
case None β onCompletes += fun
}
}
}
And then we need to make sure that we execute the installed callbacks on completion of our promise:
private[our] class Promise[T] extends Future[T] {
def complete(v: Try[T]): Promise[T] = {
if (this.value.isDefined)
throw new IllegalStateException("Can only complete a promise once.")
this.value = Some(v)
this.onCompletes.foreach(_(v))
this
}
}
Here's the full example:
import scala.util._
import scala.collection.mutable
object our {
trait Promise[T] { def complete(value: Try[T]): Promise[T] }
trait Future[T] { def onComplete(fun: Try[T] β Unit): Unit }
object Future {
def apply[T](v: β T): Future[T] = {
val result = new impl.Promise[T]()
val thread = new Thread() {
override def run(): Unit = { result.complete(Try(v)) }
}
thread.start()
result.future
}
}
object impl {
private[our] class Promise[T] extends Future[T] {
def future: Future[T] = this
private var value = Option.empty[Try[T]]
private var onCompletes = mutable.Set.empty[Try[T] β Unit]
def onComplete(fun: Try[T] β Unit): Unit = {
this.value match {
case Some(v) β fun(v)
case None β onCompletes += fun
}
}
def complete(v: Try[T]): Promise[T] = {
if (this.value.isDefined)
throw new IllegalStateException("Can only complete a promise once.")
this.value = Some(v)
this.onCompletes.foreach(_(v))
this
}
}
}
}
import our._
def log(msg: String) = println(s"${Thread.currentThread}: $msg")
val of: Future[Unit] = Future(log("In the future!"))
of.onComplete { value β log(s"Our future: ${value}") }
It prints the following:
Thread[Thread-0,5,main]: In the future!
Thread[Thread-0,5,main]: Our future: Success(())
We started out by claiming that Scala's abstractions over Threads have an
emphasis on composability and our current API doesn't allow for that. We throw
away the result of installed callbacks and don't allow chaining when we want
to install callbacks. Let's add a map
combinator that enables composability
through chaining of defered computations. The only required change is the
implementation of the combinator on Future
:
object our {
trait Future[T] {
def onComplete(fun: Try[T] β Unit): Unit
def map[U](fun: T β U): Future[U] = {
val result = new impl.Promise[U]()
this.onComplete {
case Success(v) β result.complete(Try(fun(v)))
case Failure(th) β log("uhoh... not evaluating fun")
}
result.future
}
}
}
The combinator creates a new promise and returns the corresponding future. It
also installs a callback on the future that it is invoked on. In the callback
we complete the new promise with the application of the result of the first
future to the closure given to map
. So we "thread" the result from the first
future through to the next future, applying the closure given to map
along the
way. This allows for ordered chaining of closures like we do in this modified
version of our example:
import scala.util._
import scala.collection.mutable
object our {
trait Promise[T] { def complete(value: Try[T]): Promise[T] }
trait Future[T] {
def onComplete(fun: Try[T] β Unit): Unit
def map[U](fun: T β U): Future[U] = {
val result = new impl.Promise[U]()
this.onComplete {
case Success(v) β result.complete(Try(fun(v)))
case Failure(th) β log("uhoh... not evaluating fun")
}
result.future
}
}
object Future {
def apply[T](v: β T): Future[T] = {
val result = new impl.Promise[T]()
val thread = new Thread() {
override def run(): Unit = { result.complete(Try(v)) }
}
thread.start()
result.future
}
}
object impl {
private[our] class Promise[T] extends Future[T] {
def future: Future[T] = this
private var value = Option.empty[Try[T]]
private var onCompletes = mutable.Set.empty[Try[T] β Unit]
def onComplete(fun: Try[T] β Unit): Unit = {
this.value match {
case Some(v) β fun(v)
case None β onCompletes += fun
}
}
def complete(v: Try[T]): Promise[T] = {
if (this.value.isDefined)
throw new IllegalStateException("Can only complete a promise once.")
this.value = Some(v)
this.onCompletes.foreach(_(v))
this
}
}
}
}
import our._
def log(msg: String) = println(s"${Thread.currentThread}: $msg")
Future(23).map(_ + 23).map(_.toString).map(log)
Which simply prints:
Thread[Thread-0,5,main]: 46
Now that we have the map
combinator the next step would be to add the flatMap
combinator for composing functions that produce futures. It would also allow
us to use for comprehensions that allow us to compose functions in a concise
manner. But I'll leave that and the other combinators as an exercise to the
reader or a second part to this post :)
Conclusion
Please note that the multi-threaded nature of our examples means that the main
thread might have finished and died before execution of all other threads has
finished. You can get around that by either waiting on the main thread or
implementing an abstraction similar to Await
using a polling mechanism.
Have a look at Scala's implementation or the docs on scala-lang.org for more details. Happy hacking!