## About me
Java and Scala developer, writer, consultant and training instructor
Open source developer
* Apache CXF web services for Java
* JiBX XML data binding for Java
* Other older projects
Author (mostly IBM developerWorks):
* [JVM Concurrency series](http://www.ibm.com/developerworks/views/java/libraryview.jsp?search_by=jvm+concurrency:)
Most recent experience in enterprise systems, especially data integration and web services
Performance a special area of expertise
Consulting and training services to clients worldwide
## Outline
* Concurrency basics
* Java 8 concurrency features
* Scala concurrency features
* Actor model concurrency
* Introduction to Akka
- ActorSystem and Actors
- Messages and message passing
- Actor roles
- Scala and Java usage
- Actor-based I/O
* Retrofitting Java applications to Akka
* Scaling Akka systems
* Effective actor designs
## What is concurrency
Multiple computations operating simultaneously and potentially interacting
1. Time-shared computations on a single processor
2. Computations running simultaneously on multiple cores within a processor
3. Computations running simultaneously on multiple physically-separated processors
Long history of analysis of concurrency models and approaches
## Concurrency for multiple CPUs
Until the last decade, processor performance grew by increasing clock speed
* Exponential performance growth over decades
* Based on ever smaller and faster switches
* But all exponential growths eventually come to an end!
Now performance grows mainly by adding cores
* Four or more cores now common on both computers and devices
* Likely to be sixteen in another 10 years
Performant applications require concurrency to use multiple cores
## Concurrency across systems
Enterprise applications need to be scalable
* Single system speed not enough for internet-level loads
* Spreading load to multiple systems allows scaling
* Cloud computing just a special case
Coarse-grained scaling distributes users across multiple systems
* Helps limit response time as number of users increases
* Doesn't help with response time for more-complex requests
Finer-grained scaling distributes tasks across multiple systems
* Add systems as needed to handle increased load
* Distribute tasks based on interactions
* Allows faster processing of complex requests (sometimes)
## Concurrency limitations
Performance gain from concurrency limited
For doing more of a task:
* Limited by amount of communications and coordination required
* If completely independent, unlimited scalability (e.g. static website)
For doing a task faster:
* Limited by proportion of code that has to be sequential (Amdahl's law; see the formula below)
* Limited by external delays
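For reference, Amdahl's law caps the speedup on N processors when only a fraction p of the work can be parallelized:

speedup(N) = 1 / ((1 - p) + p / N)

Even with p = 0.95, the speedup can never exceed 20x, no matter how many processors are added.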
## Blocking vs. non-blocking
Concurrency requires handling asynchronous events (completions, in particular)
Two basic approaches to handling asynchronous events:
* Have thread wait for event (blocking)
* Have event provider signal arrival (non-blocking)
  * Set a flag the main thread can check
  * Execute application code supplied by user (callback)
  * Add event to a queue or similar
Blocking limits performance and scalability:
* Ties up threads (limited resource)
* Adds overhead from context switches
* Adds unpredictable latency from context switches
Non-blocking best for scalable applications
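To make the contrast concrete, here is a minimal Scala sketch using the standard library Future (names and values are illustrative only):
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

val work: Future[Int] = Future { 6 * 7 }

// Blocking: the current thread is parked until the result arrives (or the timeout expires)
val answer: Int = Await.result(work, 10.seconds)

// Non-blocking: register a callback and let the thread move on to other work
work.onComplete {
  case Success(value) => println(s"Got $value")
  case Failure(error) => println(s"Failed: $error")
}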
## Java 8 concurrency
Big gains for concurrent programming in Java 8:
* CompletableFuture
  * One-time-only completion with result or throwable
  * Can block, poll, use callback, combine/compose futures
* Streams
  * Parallel execution of computations
  * Controlled merging of results from threads
* Improved ForkJoinPool
## CompletableFuture blocking
// task definitions
private static CompletableFuture<Integer> task1(int input) {
return TimedEventSupport.delayedSuccess(1, input + 1);
}
private static CompletableFuture<Integer> task2(int input) {
return TimedEventSupport.delayedSuccess(2, input + 2);
}
private static CompletableFuture<Integer> task3(int input) {
return TimedEventSupport.delayedSuccess(3, input + 3);
}
private static CompletableFuture<Integer> task4(int input) {
return TimedEventSupport.delayedSuccess(1, input + 4);
}
/**
* Run events with blocking waits.
*
* @return future for result (already complete)
*/
private static CompletableFuture<Integer> runBlocking() {
Integer i1 = task1(1).join();
CompletableFuture<Integer> future2 = task2(i1);
CompletableFuture<Integer> future3 = task3(i1);
Integer result = task4(future2.join() + future3.join()).join();
return CompletableFuture.completedFuture(result);
}
## CompletableFuture non-blocking
/** Run events with composition. */
private static CompletableFuture<Integer> runNonblocking() {
return task1(1).thenCompose(i1 -> ((CompletableFuture<Integer>)task2(i1)
.thenCombine(task3(i1), (i2,i3) -> i2+i3)))
.thenCompose(i4 -> task4(i4));
}
/** Run task2 and task3 and combine the results. This is just a refactoring of {@link
* #runNonblocking()} to make it easier to understand the code. */
private static CompletableFuture<Integer> runTask2and3(Integer i1) {
CompletableFuture<Integer> task2 = task2(i1);
CompletableFuture<Integer> task3 = task3(i1);
BiFunction<Integer, Integer, Integer> sum = (a, b) -> a + b;
return task2.thenCombine(task3, sum);
}
/** Run events with composition. This is just a refactoring of {@link #runNonblocking()} to make
* it easier to understand the code. */
private static CompletableFuture<Integer> runNonblockingAlt() {
CompletableFuture<Integer> task1 = task1(1);
CompletableFuture<Integer> comp123 = task1.thenCompose(EventComposition::runTask2and3);
return comp123.thenCompose(EventComposition::task4);
}
## Streams
Automatically partition work to be done across threads
private final List<ChunkDistanceChecker> chunkCheckers;
/**
* Find best match to word executing chunk checkers in parallel. Each chunk checker finds the best
* match for the set of words it knows. The individual results are combined here to find the
* overall best result.
*/
public DistancePair bestMatch(String target) {
return chunkCheckers.parallelStream()
.map(checker -> checker.bestDistance(target))
.reduce(DistancePair.worstMatch(), (a, b) -> DistancePair.best(a, b));
}
Great for the special case of parallel operations
## Scala concurrency
Similar features to Java 8 for some time:
* Promise / Future
  * Promise is one-time-only completion with result or throwable
  * Future comes from a promise
  * Can block, poll, use callback, use standard collections methods
* Streams
  * Parallel execution of computations
  * Use standard collections methods to merge results
* Improved ForkJoinPool
But also the `async` macro
## Promise / Future blocking
// task definitions
def task1(input: Int) = TimedEvent.delayedSuccess(1, input + 1)
def task2(input: Int) = TimedEvent.delayedSuccess(2, input + 2)
def task3(input: Int) = TimedEvent.delayedSuccess(3, input + 3)
def task4(input: Int) = TimedEvent.delayedSuccess(1, input + 4)
/** Run tasks with blocking waits. */
def runBlocking() = {
val v1 = Await.result(task1(1), Duration.Inf)
val future2 = task2(v1)
val future3 = task3(v1)
val v2 = Await.result(future2, Duration.Inf)
val v3 = Await.result(future3, Duration.Inf)
val v4 = Await.result(task4(v2 + v3), Duration.Inf)
val result = Promise[Int]
result.success(v4)
result.future
}
## Promise / Future non-blocking
Can use standard collection (monadic) operators such as foreach, map, and flatMap
/** Run tasks with flatMap. */
def runFlatMap() = {
task1(1) flatMap {v1 =>
val a = task2(v1)
val b = task3(v1)
a flatMap { v2 =>
b flatMap { v3 => task4(v2 + v3) }}
}
}
A `for` comprehension handles more complex cases
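For example, a minimal sketch of the same task chain written as a for comprehension (assuming the task definitions above and an implicit ExecutionContext in scope):
/** Run tasks with a for comprehension (equivalent to the flatMap version). */
def runForComp(): Future[Int] =
  for {
    v1 <- task1(1)
    // start task2 and task3 before waiting on either, so they run concurrently
    a = task2(v1)
    b = task3(v1)
    v2 <- a
    v3 <- b
    v4 <- task4(v2 + v3)
  } yield v4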
## The `async` macro
Scala macros perform compile-time transformations of code
The `async` macro converts what looks like sequential code into asynchronous code:
/** Run tasks with async macro. */
def runAsync(): Future[Int] = {
async {
val v1 = await(task1(1))
val a = task2(v1)
val b = task3(v1)
await(task4(await(a) + await(b)))
}
}
Similar to the async/await feature in C# and F#, but cleaner and more flexible
Some limitations, but a very powerful abstraction
## `async` details
`async` has several effects:
- Declares the block to be asynchronous
- Executes the block asynchronously (by default)
- Returns the result as a future
- Allows non-blocking `await` to be used inside the block (suspends until the future completes)
Implemented with a generated state machine class
- Attaches to onComplete handling for each await in turn
- Completion passes on failure or moves to next await until done
Limitations are a result of the implementation:
- Can't use `await` inside a nested closure or class definition
- Can't use `await` inside a nested try / catch block
- Debugging can be confusing (as with other callback approaches)
## Managed concurrency issues
Great for simple situations
Complex scenarios run into problems:
* Blocking approaches can have deadlocks, starvation, etc.
* Non-blocking approaches have their own issues
  * Code gets cumbersome with many different events involved
  * Building large chains of futures can add overhead
  * Most approaches basically a variation on callbacks
  * Callback hell common when debugging
Is it possible to abstract management out of the code?
## Actor model
Long-established approach to analyzing and modeling concurrency
Actors are light-weight entities with state
* May have multiple (many) instances of a particular actor type
* Each actor a processing step
All interactions in terms of messages:
* Do something in response to a received message
* Initiate something external with a sent message (to another actor)
Large body of practical experience from Erlang
## Akka concurrency
Akka a Scala library, for both Java and Scala applications
Designed to support writing correct concurrent, fault-tolerant and scalable applications
* Actors
* Fault tolerance
* Location transparency
Supports scaling both up and out
* Scaling up to use added resources effectively within a system
* Scaling out to effectively use added systems in a cloud/cluster
Applications needing high throughput and low latency are ideal candidates for Akka
## Akka actors
Abstracts out the details of threading and synchronization
* Thread only needed to run an actor when an incoming message is available
* Messages are immutable, so no synchronization required
Provides message guarantees:
* At-most-once delivery (as opposed to at-least-once, exactly-once)
* Message ordering enforced between each actor pair (in absence of priority)
High performance message queues (mailboxes)
* No blocking on the "hot path"
* Many different versions of queues
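As a minimal sketch (the configuration name and Worker actor are hypothetical), an alternative mailbox implementation can be selected in configuration and assigned when an actor is created:
# application.conf - a single-consumer, non-blocking queue suited to high-throughput actors
single-consumer-mailbox {
  mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
}

// Scala: attach the mailbox to an actor when it is created
val worker = system.actorOf(Props[Worker].withMailbox("single-consumer-mailbox"), "worker")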
## Actor flexibility
Usage is up to you and application needs:
* Walk-on actors that do one thing and then die
* Well-known actors that perform "starring" roles
* Pool of actors sharing the same role
  * Typically used when resources associated with actors
  * Example: database connections
Keep actor execution non-blocking where possible
* Avoids overhead of context switches
* Avoids unpredictable latency from core startups
## Scala example setup
Creates an actor of type Root and sends it a Start message
- `!` is an alias for the `tell` method to send a message with no response
- `?` is an alias for the `ask` method to send a message with a `Future` response
`timeout` is an implicit argument to the ask call on the next slide
case class Start()
case class Generate()
case class Number(n: Int)
case class Report()
case class Sum(n: Int)
implicit val timeout = Timeout(2000)
val system = ActorSystem("actor-demo")
system.actorOf(Props[Root]) ! Start
Thread sleep 2000
system shutdown
## Scala example actors
class Root extends Actor {
def receive = {
case Start => {
val summer = context.actorOf(Props[Summer])
context.actorOf(Props[Generator]).tell(Generate, summer)
context.actorOf(Props[Generator]).tell(Generate, summer)
val sum = summer ? Report
println("Sum of values is " + Await.result(sum, Duration.Inf))
}
}
}
class Generator extends Actor {
var lastOut = 0
def receive = {
case Generate => {
lastOut = lastOut + 1
sender ! Number(lastOut)
}
}
}
class Summer extends Actor {
var total = 0
def receive = {
case Number(n) => total = total + n
case Report => sender ! Sum(total)
}
}
## Java example messages and main
public static final Object START = new Object();
public static final Object STOP = new Object();
public static final Object GENERATE = new Object();
public static final Object REPORT = new Object();
public static class Number {
public final int value;
public Number(int val) {
value = val;
}
}
public static class Sum {
public final int value;
public Sum(int val) {
value = val;
}
}
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("actor-demo-java");
ActorRef root = system.actorOf(Props.create(Root.class));
root.tell(START, ActorRef.noSender());
try {
Thread.sleep(1000);
} catch (InterruptedException e) { /* ignored */
}
root.tell(STOP, ActorRef.noSender());
}
## Java example actors part 1
public static class Root extends UntypedActor {
private final ActorRef summer = getContext().actorOf(Props.create(Summer.class));
@Override
public void onReceive(Object msg) throws Exception {
if (msg == START) {
getContext().actorOf(Props.create(Generator.class)).tell(GENERATE, summer);
getContext().actorOf(Props.create(Generator.class)).tell(GENERATE, summer);
} else if (msg == STOP) {
Future<Object> future = ask(summer, REPORT, 1000);
Object result = Await.result(future, Duration.create(1, TimeUnit.SECONDS));
System.out.println("Sum of values is " + ((Sum)result).value);
} else {
unhandled(msg);
}
}
}
public static class Generator extends UntypedActor {
private int lastOut;
@Override
public void onReceive(Object msg) throws Exception {
if (msg == GENERATE) {
getSender().tell(new Number(++lastOut), self());
} else {
unhandled(msg);
}
}
}
## Java example actors part 2
public static class Summer extends UntypedActor {
private int total;
@Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof Number) {
total += ((Number) msg).value;
} else if (msg == REPORT) {
getSender().tell(new Sum(total), self());
} else {
unhandled(msg);
}
}
}
## Actor supervision
Actors supervise the actors they create
When an actor fails (throws an exception) the supervisor can:
* Resume the failed actor
* Restart the failed actor
* Stop the failed actor
* Escalate the problem to its own supervisor
Easy to fail up to a level where everything can be restarted
"Let it Crash" philosophy simpler than error handling
* Error handling requires you to know what problems can occur
* Fault recovery handles unknown conditions as well as known
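As a minimal sketch (the Worker child actor is hypothetical), a parent declares these choices through its supervisor strategy:
import akka.actor.{Actor, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

class Supervisor extends Actor {
  // Restart a failing child up to 3 times per minute; escalate anything unexpected
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
      case _: ArithmeticException   => Resume
      case _: IllegalStateException => Restart
      case _: Exception             => Escalate
    }

  val worker = context.actorOf(Props[Worker], "worker") // Worker is a placeholder child actor

  def receive = {
    case msg => worker forward msg
  }
}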
## Actor systems and dispatchers
Akka actor system organizes a hierarchy of actors
* Common configuration for actors in system
* Entry point for creating or looking up actors
Dispatchers actually run actors
* Every dispatcher is also an `ExecutionContext`
* Each actor system has a default dispatcher
* Other dispatchers can be created as needed
Actor system configuration (normally from file) links actors to dispatchers
Changing the configuration lets you restructure your system
* Statically, in configuration file
* Dynamically, in code responding to events
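As a minimal sketch (the dispatcher name and Worker actor are hypothetical), a dispatcher is defined in application.conf and assigned when the actor is created:
# application.conf - a fork-join dispatcher with bounded parallelism
my-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 2
    parallelism-factor = 2.0
    parallelism-max = 8
  }
  throughput = 5
}

// Scala: run this actor on the custom dispatcher instead of the default one
val worker = system.actorOf(Props[Worker].withDispatcher("my-dispatcher"), "worker")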
## Routers
Routers distribute messages to a pool of actors of the same type
* Round-robin routes to each actor in turn
* Random routes to any actor
* Smallest-mailbox routes to the least backed-up actor
* Broadcast routes to all actors
* Scatter-gather-first-completed routes to all actors and uses the first response received
* Consistent-hashing routes based on a hash of the message
Don't *need* to use routers - only when using a pool
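As a minimal sketch (assuming the Akka 2.3+ pool router API and a hypothetical Worker actor type), a round-robin pool looks like a single actor to its clients:
import akka.actor.{ActorSystem, Props}
import akka.routing.RoundRobinPool

val system = ActorSystem("router-demo")
// Five Worker instances behind one ActorRef; messages are distributed round-robin
val workers = system.actorOf(RoundRobinPool(5).props(Props[Worker]), "workers")
workers ! "job-1" // handled by the first worker
workers ! "job-2" // handled by the second worker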
## Location transparency
Actors have names (set explicitly, or generated by default)
Name is decoupled from how it is deployed
* Deployed locally by default
* Simple configuration change to make it remote
* Can also adapt topology at runtime
Lets you scale out to cluster or cloud without code changes
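As a minimal sketch (system names, host address, and the Worker actor are hypothetical, and Akka remoting must be enabled), a configuration change is enough to deploy an actor on another node:
# application.conf - deploy the actor named "worker" on a remote node
akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
    deployment {
      /worker {
        remote = "akka.tcp://worker-system@10.0.0.2:2552"
      }
    }
  }
}

// Scala: the creating code does not change at all
val worker = system.actorOf(Props[Worker], "worker")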
## Akka I/O
Akka tries to avoid blocking wherever possible
* Blocking ties up a thread waiting for something
* Thread switches cost performance
I/O a common cause of blocking (especially network I/O for connected applications)
`akka.io` incorporates asynchronous networking into actor model:
* UDP
* TCP
Originally part of Spray project, which still supplies other support
Further extensions part of Reactive Streams initiative
## Akka UDP server
UDP socket server example:
- Bind to listen to a local socket
- Echo any received datagrams back to sender
class UdpEchoService(address: InetSocketAddress) extends Actor {
import context.system
IO(Udp) ! Udp.Bind(self, address)
def receive = {
case Udp.Bound(_) =>
context.become(ready(sender))
}
def ready(socket: ActorRef): Receive = {
case Udp.Received(data, remote) =>
socket ! Udp.Send(data, remote)
case Udp.Unbind => socket ! Udp.Unbind
case Udp.Unbound => context.stop(self)
}
}
## Akka UDP client
UDP socket client example:
- "Bind" to a local socket
- Send string messages as UDP to remote address
- Print received UDP messages to console
class UdpEchoClient(local: InetSocketAddress, remote: InetSocketAddress) extends Actor {
import context.system
IO(Udp) ! Udp.Bind(self, local)
def receive = {
case Udp.Bound(_) =>
context.become(ready(sender))
}
def ready(socket: ActorRef): Receive = {
case msg: String =>
socket ! Udp.Send(ByteString(msg, "UTF-8"), remote)
case Udp.Received(data, _) =>
println(s"Client received ${data.decodeString("UTF-8")}")
case Udp.Unbind => socket ! Udp.Unbind
case Udp.Unbound => context.stop(self)
}
}
## Designing actor systems
Forget object-oriented concepts! (at actor level)
Identify potentially concurrent operations (separate events or processing steps)
* Different application events
* Processing stages in a particular event
Often convenient to supplement with special-purpose actors
* Initialization actor to setup main actor
* Pooled resource actors for limited resources
* Delegation actors for handling interactions
* Sequence of message exchanges can use sequence of actors
* Facade-type actors for control
## Effective actor designs
Actor task should be significant processing
* Low overhead passing messages intra-JVM
  * But still much higher than a simple method call
* Much higher overhead passing messages cross-JVM
  * Need to have larger tasks to justify the overhead
  * Pluggable serialization formats can help
Low creation / termination overhead for actors
* Basic memory usage about 200 bytes per Akka actor
* Little overhead to initialize
* Often convenient to use one-offs
## Retrofitting Java applications
Existing Java applications may not be designed for concurrency
Look at application data flows in terms of actors
Some potential issues:
* Not supposed to pass mutable data in message (but not enforced)
* The real issue is sharing mutable state across actors (don't do it!)
* No problem as long as mutable data is handed off (passed on and not retained by the sender)
Clean approach combines Scala and Java:
* Use Scala for actual actor implementation and message handling
* Call existing Java code directly from Scala to perform operations
* Gives you cleaner Scala Akka code without wholesale change
Example of WSDL analysis application
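As a minimal sketch (the LegacyWsdlAnalyzer class and message types are hypothetical), a thin Scala actor can wrap existing Java code like that analyzer so only the concurrency layer changes:
import akka.actor.Actor

case class Analyze(path: String)
case class Analyzed(path: String, report: String)

// The Scala actor owns messaging and concurrency; the existing Java class does the real work
class AnalyzerActor(analyzer: LegacyWsdlAnalyzer) extends Actor {
  def receive = {
    case Analyze(path) =>
      val report = analyzer.analyze(path) // direct call into unchanged Java code
      sender ! Analyzed(path, report)
  }
}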
## Distributed Akka systems
Akka supports automatic cluster membership and notification
* "Self-organizing" cluster structure
* Nodes can come and go as needed
* Actors can register to receive cluster messages
Configuration gives you control over distribution:
* How actors can be distributed across nodes
* Routers to determine how messages are passed to actors
Doesn't avoid all distributed system issues:
* Message communication overhead (processor, time, and bandwidth)
* Communications failures and cluster partitioning
But actor model makes many issues easier to handle
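As a minimal sketch (assuming the akka-cluster module is on the classpath; ClusterWatcher is a hypothetical actor), an actor registers for cluster membership events like this:
import akka.actor.Actor
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

class ClusterWatcher extends Actor {
  val cluster = Cluster(context.system)

  // Register for membership events on start, unregister on stop
  override def preStart(): Unit =
    cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember])
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case MemberUp(member)          => println(s"Member up: ${member.address}")
    case UnreachableMember(member) => println(s"Member unreachable: ${member.address}")
    case _: MemberEvent            => // ignore other membership transitions
  }
}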
## References
Scala language site - http://www.scala-lang.org
Typesafe site - http://www.typesafe.com
Akka site - http://www.akka.io
Effective Actors - http://www.infoq.com/presentations/akka-scala-actor-patterns
Scaling out with Akka Actors - http://www.infoq.com/presentations/akka-scala-actors-distributed-system
Scalable Scala site - http://www.scalablescala.com
Sosnoski Software Associates Ltd.
* New Zealand and worldwide - http://www.sosnoski.co.nz
* United States - http://www.sosnoski.com
Dennis Sosnoski - dms@sosnoski.com