Acting Concurrent

The Akka actor model for Java and Scala

By Dennis M. Sosnoski, ScalableScala.com / Sosnoski Software Associates Ltd

## About me

Java and Scala developer, writer, consultant and training instructor

Open source developer

* Apache CXF web services for Java
* JiBX XML data binding for Java
* Other older projects

Author (mostly IBM developerWorks):

* [JVM Concurrency series](http://www.ibm.com/developerworks/views/java/libraryview.jsp?search_by=jvm+concurrency:)

Most recent experience in enterprise systems, especially data integration and web services

Performance a special area of expertise

Consulting and training services to clients worldwide

## Outline

* Concurrency basics
* Java 8 concurrency features
* Scala concurrency features
* Actor model concurrency
* Introduction to Akka
  - ActorSystem and Actors
  - Messages and message passing
  - Actor roles
  - Scala and Java usage
  - Actor-based I/O
* Retrofitting Java applications to Akka
* Scaling Akka systems
* Effective actor designs

## What is concurrency

Multiple computations operating simultaneously and potentially interacting

1. Time-shared computations on a single processor
2. Computations running simultaneously on multiple cores within a processor
3. Computations running simultaneously on multiple physically-separated processors

Long history of analysis of concurrency models and approaches

## Concurrency for multiple CPUs

Until the last decade, processor performance grew by increasing clock speed

* Exponential performance growth over decades
* Based on ever smaller and faster switches
* But all exponential growth eventually comes to an end!

Now performance grows mainly by adding cores

* Four or more cores now common on both computers and devices
* Likely to be sixteen in another 10 years

Performant applications require concurrency to use multiple cores

## Concurrency across systems

Enterprise applications need to be scalable

* Single system speed not enough for internet-level loads
* Spreading load to multiple systems allows scaling
* Cloud computing just a special case

Coarse-grained scaling distributes users across multiple systems

* Helps limit response time as number of users increases
* Doesn't help with response time for more-complex requests

Finer-grained scaling distributes tasks across multiple systems

* Add systems as needed to handle increased load
* Distribute tasks based on interactions
* Allows faster processing of complex requests (sometimes)

## Concurrency limitations

Performance gain from concurrency limited

For doing more of a task:

* Limited by amount of communications and coordination required
* If completely independent, unlimited scalability (e.g. static website)

For doing a task faster:

* Limited by proportion of code that has to be sequential (Amdahl's law)
* Limited by external delays

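To put a number on the Amdahl's law limit, here is a minimal sketch of the standard formula, speedup = 1 / ((1 - p) + p / n), where p is the fraction of the work that can run in parallel and n is the number of cores. The class and method names are illustrative only, not taken from the talk.

```java
/** Amdahl's law: upper bound on speedup when only part of a task can run in parallel. */
final class Amdahl {
    private Amdahl() {}

    /**
     * @param parallelFraction fraction of the work that can be parallelized (0.0 to 1.0)
     * @param cores number of cores working on the parallel part
     * @return the best-case speedup relative to a single core
     */
    static double speedup(double parallelFraction, int cores) {
        if (parallelFraction < 0.0 || parallelFraction > 1.0 || cores < 1) {
            throw new IllegalArgumentException("fraction must be in [0,1] and cores >= 1");
        }
        double sequentialFraction = 1.0 - parallelFraction;
        return 1.0 / (sequentialFraction + parallelFraction / cores);
    }

    public static void main(String[] args) {
        // Show how the speedup flattens out as cores are added to a 90%-parallel task.
        for (int cores : new int[] {1, 4, 16, 1024}) {
            System.out.printf("%4d cores: %.2fx%n", cores, speedup(0.9, cores));
        }
    }
}
```

With 90% of the work parallelizable, the speedup stays below 10x no matter how many cores are added, because the sequential 10% always has to run on its own.
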
## Blocking vs. non-blocking

Concurrency requires handling asynchronous events (completions, in particular)

Two basic approaches to handling asynchronous events:

* Have thread wait for event (blocking)
* Have event provider signal arrival (non-blocking)
  - Set a flag the main thread can check
  - Execute application code supplied by user (callback)
  - Add event to queue or such

Blocking limits performance and scalability:

* Ties up threads (limited resource)
* Adds overhead from context switches
* Adds unpredictable latency from context switches

Non-blocking best for scalable applications

## Java 8 concurrency

Big gains for concurrent programming in Java 8:

* CompletableFuture
  - One-time-only completion with result or throwable
  - Can block, poll, use callback, combine/compose futures
* Streams
  - Parallel execution of computations
  - Controlled merging of results from threads
* Improved ForkJoinPool

## CompletableFuture blocking

    // task definitions
    private static CompletableFuture<Integer> task1(int input) {
        return TimedEventSupport.delayedSuccess(1, input + 1);
    }
    private static CompletableFuture<Integer> task2(int input) {
        return TimedEventSupport.delayedSuccess(2, input + 2);
    }
    private static CompletableFuture<Integer> task3(int input) {
        return TimedEventSupport.delayedSuccess(3, input + 3);
    }
    private static CompletableFuture<Integer> task4(int input) {
        return TimedEventSupport.delayedSuccess(1, input + 4);
    }
    
    /**
     * Run events with blocking waits.
     * 
     * @return future for result (already complete)
     */
    private static CompletableFuture<Integer> runBlocking() {
        Integer i1 = task1(1).join();
        CompletableFuture<Integer> future2 = task2(i1);
        CompletableFuture<Integer> future3 = task3(i1);
        Integer result = task4(future2.join() + future3.join()).join();
        return CompletableFuture.completedFuture(result);
    }
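
The `TimedEventSupport.delayedSuccess` helper is used above but not shown. As a rough idea of what such a helper might look like, here is a hypothetical stand-in built on a scheduled executor. The class name `TimedEventSketch`, the explicit `TimeUnit` parameter, and the daemon timer thread are all assumptions; the helper in the slides takes only a delay and a value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the TimedEventSupport helper used in the slides. */
final class TimedEventSketch {
    // Single daemon timer thread, so pending timers never keep the JVM alive.
    private static final ScheduledExecutorService TIMER =
        Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "timed-event-sketch");
            thread.setDaemon(true);
            return thread;
        });

    private TimedEventSketch() {}

    /** Returns a future that completes with {@code value} once {@code delay} has elapsed. */
    static <T> CompletableFuture<T> delayedSuccess(long delay, TimeUnit unit, T value) {
        CompletableFuture<T> future = new CompletableFuture<>();
        // No thread waits here: the timer completes the future when the delay expires.
        TIMER.schedule(() -> { future.complete(value); }, delay, unit);
        return future;
    }
}
```

The caller gets the future back immediately, so it can choose between blocking with `join()` and attaching a continuation such as `thenCompose`.
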

## CompletableFuture non-blocking

 

    /** Run events with composition. */
    private static CompletableFuture<Integer> runNonblocking() {
        return task1(1)
            .thenCompose(i1 -> task2(i1).thenCombine(task3(i1), (i2, i3) -> i2 + i3))
            .thenCompose(i4 -> task4(i4));
    }
    
    /** Run task2 and task3 and combine the results. This is just a refactoring of {@link
     * #runNonblocking()} to make it easier to understand the code. */
    private static CompletableFuture<Integer> runTask2and3(Integer i1) {
        CompletableFuture<Integer> task2 = task2(i1);
        CompletableFuture<Integer> task3 = task3(i1);
        BiFunction<Integer, Integer, Integer> sum = (a, b) -> a + b;
        return task2.thenCombine(task3, sum);
    }
    
    /** Run events with composition. This is just a refactoring of {@link #runNonblocking()} to make
     * it easier to understand the code. */
    private static CompletableFuture<Integer> runNonblockingAlt() {
        CompletableFuture<Integer> task1 = task1(1);
        CompletableFuture<Integer> comp123 = task1.thenCompose(EventComposition::runTask2and3);
        return comp123.thenCompose(EventComposition::task4);
    }

## Streams

Automatically partition work to be done across threads

    private final List<ChunkDistanceChecker> chunkCheckers;
    
    /**
     * Find best match to word executing chunk checkers in parallel. Each chunk checker finds the best
     * match for the set of words it knows. The individual results are combined here to find the 
     * overall best result.
     */
    public DistancePair bestMatch(String target) {
        return chunkCheckers.parallelStream()
            .map(checker -> checker.bestDistance(target))
            .reduce(DistancePair.worstMatch(), (a, b) -> DistancePair.best(a, b));
    }

Great for the special case of parallel operations
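
The `ChunkDistanceChecker` and `DistancePair` classes are not shown in the slides. The following self-contained sketch follows the same map-then-reduce shape with hypothetical stand-ins: `Match` plays the role of `DistancePair`, and `distance` is a deliberately crude placeholder rather than a real edit distance.

```java
import java.util.List;

/** Sketch of a parallel best-match search over chunks of words (all names hypothetical). */
final class ParallelBestMatch {
    /** A candidate word and its distance from the target; smaller is better. */
    static final class Match {
        final String word;
        final int distance;

        Match(String word, int distance) {
            this.word = word;
            this.distance = distance;
        }

        /** Identity value for the reduction: loses to any real match. */
        static Match worst() {
            return new Match("", Integer.MAX_VALUE);
        }

        /** Associative combiner: keeps the earlier match on ties. */
        static Match best(Match a, Match b) {
            return b.distance < a.distance ? b : a;
        }
    }

    private ParallelBestMatch() {}

    /** Placeholder distance: length difference plus mismatched character positions. */
    static int distance(String a, String b) {
        int shared = Math.min(a.length(), b.length());
        int diff = Math.abs(a.length() - b.length());
        for (int i = 0; i < shared; i++) {
            if (a.charAt(i) != b.charAt(i)) {
                diff++;
            }
        }
        return diff;
    }

    /** Sequential scan of one chunk, the unit of work handed to each thread. */
    static Match bestInChunk(List<String> chunk, String target) {
        Match best = Match.worst();
        for (String word : chunk) {
            best = Match.best(best, new Match(word, distance(word, target)));
        }
        return best;
    }

    /** Chunks are checked in parallel, then the per-chunk results are merged. */
    static Match bestMatch(List<List<String>> chunks, String target) {
        return chunks.parallelStream()
            .map(chunk -> bestInChunk(chunk, target))
            .reduce(Match.worst(), Match::best);
    }
}
```

The reduction is safe to run in parallel because `Match.best` is associative and `Match.worst()` acts as its identity, which is what `Stream.reduce` requires.
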

## Scala concurrency

Similar features to Java 8 for some time:

* Promise / Future
  - Promise is one-time-only completion with result or throwable
  - Future comes from a promise
  - Can block, poll, use callback, use standard collections methods
* Streams
  - Parallel execution of computations
  - Use standard collections methods to merge results
* Improved ForkJoinPool

But also async macro

## Promise / Future blocking

  // task definitions
  def task1(input: Int) = TimedEvent.delayedSuccess(1, input + 1)
  def task2(input: Int) = TimedEvent.delayedSuccess(2, input + 2)
  def task3(input: Int) = TimedEvent.delayedSuccess(3, input + 3)
  def task4(input: Int) = TimedEvent.delayedSuccess(1, input + 4)
  
  /** Run tasks with blocking waits. */
  def runBlocking() = {
    val v1 = Await.result(task1(1), Duration.Inf)
    val future2 = task2(v1)
    val future3 = task3(v1)
    val v2 = Await.result(future2, Duration.Inf)
    val v3 = Await.result(future3, Duration.Inf)
    val v4 = Await.result(task4(v2 + v3), Duration.Inf)
    val result = Promise[Int]
    result.success(v4)
    result.future
  }

## Promise / Future non-blocking

Can use standard collections (monadic) operators such as flatMap and foreach

  /** Run tasks with flatMap. */
  def runFlatMap() = {
    task1(1) flatMap {v1 =>
      val a = task2(v1)
      val b = task3(v1)
      a flatMap { v2 =>
        b flatMap { v3 => task4(v2 + v3) }}
    }
  }

Use a `for` comprehension to compose more complex cases

## async macro

Scala macros perform compile-time transformations of code

async macro converts what looks like sequential code to asynchronous:

  /** Run tasks with async macro. */
  def runAsync(): Future[Int] = {
    async {
      val v1 = await(task1(1))
      val a = task2(v1)
      val b = task3(v1)
      await(task4(await(a) + await(b)))
    }
  }

Similar to C# and F# feature, but cleaner and more flexible

Some limitations, but a very powerful abstraction

## async details

async has several effects:

  • Declares block to be asynchronous
  • Executes block asynchronously (by default)
  • Returns result as a future
  • Allows non-blocking await to be used inside block (suspends until future completed)

Implemented with a generated state machine class

  • Attaches to onComplete handling for each await in turn
  • Completion passes on failure or moves to next await until done

Limitations a result of implementation:

  • Can't use await inside nested closure, class definition
  • Can't use await inside nested try / catch block
  • Debugging can be confusing (as with other callback approaches)

## Managed concurrency issues

Great for simple situations

Complex scenarios run into problems:

* Blocking approaches can have deadlocks, starvation, etc.
* Non-blocking approaches have their own issues
  - Code gets cumbersome with many different events involved
  - Building large chains of futures can add overhead
  - Most approaches basically a variation on callbacks
  - Callback hell common when debugging

Is it possible to abstract management out of the code?

## Actor model

Long-established approach to analyzing and modeling concurrency

Actors are light-weight entities with state

* May have multiple (many) instances of a particular actor type
* Each actor a processing step

All interactions in terms of messages:

* Do something in response to a received message
* Initiate something external with a sent message (to another actor)

Large body of practical experience from Erlang

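The core idea can be sketched in plain Java without any actor library: private state, a mailbox, and one message handled at a time. This is a toy illustration only, not how Akka implements actors; `SummingActor`, `Add` and `Report` are hypothetical names, and a single-thread executor's queue stands in for the mailbox.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Toy actor: private state that changes only in response to messages, one at a time. */
final class SummingActor {
    /** Immutable message asking the actor to add a value to its total. */
    static final class Add {
        final int value;

        Add(int value) {
            this.value = value;
        }
    }

    /** Message asking for the current total; the reply arrives through the future. */
    static final class Report {
        final CompletableFuture<Integer> replyTo = new CompletableFuture<>();
    }

    // The executor's queue acts as the mailbox; one thread means one message at a time.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

    // Touched only on the mailbox thread, so no locking is needed.
    private int total;

    /** Fire-and-forget send: enqueue the message and return immediately. */
    void tell(Object message) {
        mailbox.execute(() -> receive(message));
    }

    /** Message handler, always run on the mailbox thread. Unknown messages are dropped. */
    private void receive(Object message) {
        if (message instanceof Add) {
            total += ((Add) message).value;
        } else if (message instanceof Report) {
            ((Report) message).replyTo.complete(total);
        }
    }

    /** Stops accepting messages; already queued messages are still processed. */
    void stop() {
        mailbox.shutdown();
    }
}
```

Callers never touch `total` directly. They send `Add` messages and later a `Report`, whose future completes with the total once every earlier message from the same sender has been handled.
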
## Akka concurrency

Akka a Scala library, for both Java and Scala applications

Designed to support writing correct concurrent, fault-tolerant and scalable applications

* Actors
* Fault tolerance
* Location transparency

Supports scaling both up and out

* Scaling up to use added resources effectively within a system
* Scaling out to effectively use added systems in a cloud/cluster

Applications needing high throughput and low latency are ideal candidates for Akka

## Akka actors

Abstracts out the details of threading and synchronization

* Thread needed to run an actor only when an incoming message is available
* Messages are immutable, so no synchronization required

Provides message guarantees:

* At-most-once delivery (as opposed to at-least-once, exactly-once)
* Message ordering enforced between each actor pair (in absence of priority)

High performance message queues (mailboxes)

* No blocking on the "hot path"
* Many different versions of queues

## Actor flexibility

Usage is up to you and application needs:

* Walk-on actors that do one thing and then die
* Well-known actors that perform "starring" roles
* Pool of actors sharing the same role
  - Typically used when resources associated with actors
  - Example: database connections

Keep actor execution non-blocking where possible

* Avoids overhead of context switches
* Avoids unpredictable latency from core startups

## Scala example setup

Creates an actor of type Root and sends it a Start message

  • ! is an alias for the tell method to send a message with no response
  • ? is an alias for the ask method to send a message with Future response

timeout is an implicit argument to some of the ask calls on the next slide

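  // Parameterless messages are case objects, so sending and matching on the bare name works
  case object Start
  case object Generate
  case class Number(n: Int)
  case object Report
  case class Sum(n: Int)
  
  // Timeout for ask (?) calls; needs import scala.concurrent.duration._
  implicit val timeout = Timeout(2.seconds)
  
  val system = ActorSystem("actor-demo")
  system.actorOf(Props[Root]) ! Start
  Thread.sleep(2000)
  system.shutdown() // replaced by terminate() in Akka 2.4 and later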

## Scala example actors

  class Root extends Actor {
    def receive = {
      case Start => {
        val summer = context.actorOf(Props[Summer])
        context.actorOf(Props[Generator]).tell(Generate, summer)
        context.actorOf(Props[Generator]).tell(Generate, summer)
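        // Demo shortcuts: Await blocks inside the actor, and Report is not ordered
        // relative to the generators' Number messages, so the sum can come up short
        val sum = summer ? Report
        println("Sum of values is " + Await.result(sum, Duration.Inf))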
      }
    }
  }

  class Generator extends Actor {
    var lastOut = 0
    def receive = {
      case Generate => {
        lastOut = lastOut + 1
        sender ! Number(lastOut)
      }
    }
  }
  
  class Summer extends Actor {
    var total = 0
    def receive = {
      case Number(n) => total = total + n
      case Report => sender ! Sum(total)
    }
  }

## Java example messages and main

    public static final Object START = new Object();
    public static final Object STOP = new Object();
    public static final Object GENERATE = new Object();
    public static final Object REPORT = new Object();

    public static class Number {
        public final int value;
        public Number(int val) {
            value = val;
        }
    }

    public static class Sum {
        public final int value;
        public Sum(int val) {
            value = val;
        }
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("actor-demo-java");
        ActorRef root = system.actorOf(Props.create(Root.class));
        root.tell(START, ActorRef.noSender());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) { /* ignored */
        }
        root.tell(STOP, ActorRef.noSender());
    }

## Java example actors part 1

    public static class Root extends UntypedActor {
        private final ActorRef summer = getContext().actorOf(Props.create(Summer.class));
        @Override
        public void onReceive(Object msg) throws Exception {
            if (msg == START) {
                getContext().actorOf(Props.create(Generator.class)).tell(GENERATE, summer);
                getContext().actorOf(Props.create(Generator.class)).tell(GENERATE, summer);
            } else if (msg == STOP) {
                Future<Object> future = ask(summer, REPORT, 1000);
                Object result = Await.result(future, Duration.create(1, TimeUnit.SECONDS));
                System.out.println("Sum of values is " + ((Sum)result).value);
            } else {
                unhandled(msg);
            }
        }
    }

    public static class Generator extends UntypedActor {
        private int lastOut;
        @Override
        public void onReceive(Object msg) throws Exception {
            if (msg == GENERATE) {
                getSender().tell(new Number(++lastOut), self());
            } else {
                unhandled(msg);
            }
        }
    }

## Java example actors part 2

    public static class Summer extends UntypedActor {
        private int total;

        @Override
        public void onReceive(Object msg) throws Exception {
            if (msg instanceof Number) {
                total += ((Number) msg).value;
            } else if (msg == REPORT) {
                getSender().tell(new Sum(total), self());
            } else {
                unhandled(msg);
            }
        }

    }

## Actor supervision

Actors supervise the actors they create

When an actor fails (throws an exception) the supervisor can:

* Resume the failed actor
* Restart the failed actor
* Escalate the problem to its supervisor

Easy to fail up to a level where everything can be restarted

"Let it Crash" philosophy simpler than error handling

* Error handling requires you to know what problems can occur
* Fault recovery handles unknown conditions as well as known

## Actor systems and dispatchers

Akka actor system organizes a hierarchy of actors

* Common configuration for actors in system
* Entry point for creating or looking up actors

Dispatchers actually run actors

* Every dispatcher is also an `ExecutionContext`
* Each actor system has a default dispatcher
* Other dispatchers can be created as needed

Actor system configuration (normally from file) links actors to dispatchers

Changing the configuration lets you restructure your system

* Statically, in configuration file
* Dynamically, in code responding to events

## Routers

Routers distribute messages to a pool of actors of the same type

* Round-robin routes to each in turn
* Random routes to any
* Smallest mailbox routes to least backed-up
* Broadcast routes to all
* Scatter-gather first completed sends to all, uses response from first completed
* Consistent hashing routes based on message

Don't *need* to use routers - only when using a pool

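To make two of these strategies concrete, here is a plain-Java sketch of the selection logic only. It is not Akka's router implementation or API; `RoutingSketch` and its members are hypothetical names, and routees are represented simply by their index in the pool.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Selection logic for two routing strategies, with routees identified by pool index. */
final class RoutingSketch {
    /** Round-robin: hands out indexes 0, 1, ..., n-1 and then wraps around. */
    static final class RoundRobin {
        private final AtomicLong next = new AtomicLong();
        private final int routees;

        RoundRobin(int routees) {
            if (routees < 1) {
                throw new IllegalArgumentException("need at least one routee");
            }
            this.routees = routees;
        }

        /** Safe to call from several threads; each call advances the shared counter. */
        int select() {
            return (int) (next.getAndIncrement() % routees);
        }
    }

    private RoutingSketch() {}

    /**
     * Smallest mailbox: picks the routee with the fewest queued messages.
     *
     * @param queuedPerRoutee current queue length for each routee (must not be empty)
     * @return index of the least backed-up routee; the first one wins ties
     */
    static int smallestMailbox(List<Integer> queuedPerRoutee) {
        int best = 0;
        for (int i = 1; i < queuedPerRoutee.size(); i++) {
            if (queuedPerRoutee.get(i) < queuedPerRoutee.get(best)) {
                best = i;
            }
        }
        return best;
    }
}
```

Round-robin needs no knowledge of the routees' load, while smallest mailbox needs a view of every queue length, which is one reason a strategy like that is harder to apply to remote routees.
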
## Location transparency

Actors have names (set explicitly, or generated by default)

Name is decoupled from how the actor is deployed

* Deployed locally by default
* Simple configuration change to make it remote
* Can also adapt topology at runtime

Lets you scale out to cluster or cloud without code changes

## Akka I/O

Akka tries to avoid blocking wherever possible

* Blocking ties up a thread waiting for something
* Thread switches cost performance

I/O a common cause of blocking (especially network I/O for connected applications)

`akka.io` incorporates asynchronous networking into actor model:

* UDP
* TCP

Originally part of Spray project, which still supplies other support

Further extensions part of Reactive Streams initiative

## Akka UDP server

UDP socket server example:

  • Bind to listen to a local socket
  • Echo any received datagrams back to sender

  class UdpEchoService(address: InetSocketAddress) extends Actor {
    import context.system
    IO(Udp) ! Udp.Bind(self, address)

    def receive = {
      case Udp.Bound(_) =>
        context.become(ready(sender))
    }

    def ready(socket: ActorRef): Receive = {
      case Udp.Received(data, remote) =>
        socket ! Udp.Send(data, remote)
      case Udp.Unbind => socket ! Udp.Unbind
      case Udp.Unbound => context.stop(self)
    }
  }

## Akka UDP client

UDP socket client example:

  • "Bind" to a local socket
  • Send string messages as UDP to remote address
  • Print received UDP messages to console

  class UdpEchoClient(local: InetSocketAddress, remote: InetSocketAddress) extends Actor {
    import context.system
    IO(Udp) ! Udp.Bind(self, local)

    def receive = {
      case Udp.Bound(_) =>
        context.become(ready(sender))
    }

    def ready(socket: ActorRef): Receive = {
      case msg: String =>
        socket ! Udp.Send(ByteString(msg, "UTF-8"), remote)
      case Udp.Received(data, _) =>
        println(s"Client received ${data.decodeString("UTF-8")}")
      case Udp.Unbind => socket ! Udp.Unbind
      case Udp.Unbound => context.stop(self)
    }
  }

## Designing actor systems

Forget object-oriented concepts! (at actor level)

Identify potentially concurrent operations (processing steps or separate)

* Different application events
* Processing stages in a particular event

Often convenient to supplement with special-purpose actors

* Initialization actor to set up main actor
* Pooled resource actors for limited resources
* Delegation actors for handling interactions
  - Sequence of message exchanges can use sequence of actors
* Facade-type actors for control

## Effective actor designs

Actor task should be significant processing

* Low overhead passing messages intra-JVM
  - But still much higher than simple method call
* Much higher overhead passing messages cross-JVM
  - Need to have larger tasks to justify the overhead
  - Pluggable serialization formats can help

Low creation / termination overhead for actors

* Basic memory usage about 200 bytes per Akka actor
* Little overhead to initialize
* Often convenient to use one-offs

## Retrofitting Java applications

Existing Java applications may not be designed for concurrency

Look at application data flows in terms of actors

Some potential issues:

* Not supposed to pass mutable data in message (but not enforced)
* The real issue is sharing mutable state across actors (don't do it!)
* No problem as long as mutable data is passed on (and not held)

Clean approach combines Scala and Java:

* Use Scala for actual actor implementation and message handling
* Call existing Java code directly from Scala to perform operations
* Gives you cleaner Scala Akka code without wholesale change

Example of WSDL analysis application

## Distributed Akka systems

Akka supports automatic cluster membership and notification

* "Self-organizing" cluster structure
* Nodes can come and go as needed
* Actors can register to receive cluster messages

Configuration gives you control over distribution:

* How actors can be distributed across nodes
* Routers to determine how messages are passed to actors

Doesn't avoid all distributed system issues:

* Message communication overhead (processor, time, and bandwidth)
* Communications failures and cluster partitioning

But actor model makes many issues easier to handle

## References

* Scala language site - http://www.scala.org
* Typesafe site - http://www.typesafe.com
* Akka site - http://www.akka.io
* Effective Actors - http://www.infoq.com/presentations/akka-scala-actor-patterns
* Scaling out with Akka Actors - http://www.infoq.com/presentations/akka-scala-actors-distributed-system
* Scalable Scala site - http://www.scalablescala.com

Sosnoski Software Associates Ltd.

* New Zealand and worldwide - http://www.sosnoski.co.nz
* United States - http://www.sosnoski.com

Dennis Sosnoski - dms@sosnoski.com