CONCURRENT AND DISTRIBUTED SYSTEMS LECTURE 6A TRANSCRIPT (C) 2023 DJ Greaves, University of Cambridge, Computer Laboratory.

L6-A ACTIVE OBJECTS AND RELIABLE MESSAGE SYSTEMS

Good morning, everybody. Last time we looked at deadlock and liveness, with avoidance, detection and recovery considerations for deadlock. We talked about priority scheduling, priority inversion and priority inversion avoidance using priority inheritance. Concurrency turns out to be pretty hard. It would be nice if we could avoid some of these hassles of shared memory. It's shared mutable memory that really is the hassle, and it would be nice if we could program at a higher level where we don't need to worry about the concurrency so much. Some may argue that functional programming is the way forward, because any part of the program can be evaluated as soon as its input data is ready: nothing ever changes, and intrinsically there's a lot of parallelism. But the underlying hardware that we use is imperative hardware. Memory is a very important mutable resource, and totally ignoring the capability of the memory to change its value is limiting. Nonetheless, the most effective concurrent and parallel systems today do tend to be programmed in a far more declarative style than ever before.

THIS TIME

This time we'll look at concurrency without shared data. We'll look at higher-level models that could be implemented as a library or hardwired into programming languages. We'll describe active objects as found in Ada and look at message passing as found in Occam and Erlang. We'll then move up the abstraction hierarchy and consider transactions, which consist of a sequence of atomic operations. These will need to meet the ACID properties. We'll be wanting to run several of these transactions at once, but we'll want the effect that they are isolated from each other, so that it appears that they were all run one after the other. If the side effects of these transactions correspond to some nominal serial execution order, then we'll say that the result was a SERIALIZABLE consequence of running those transactions. And we'll introduce the history graph as one means of determining whether an interleaving of transactions was serializable. The ACID properties, of course, you may have come across if you have database experience, which I hope you have, although the first-year course hardly spoke about data mutation. And overall, we're starting to move away from the relatively boring low-level concurrency mechanisms towards implementing some interesting distributed systems.

The examples so far have all considered shared memory, mutable shared memory: threads can arbitrarily read and write variables in that shared memory space. Of course, between processes on a time-sharing computer and in distributed systems, we don't normally have shared memory directly, although on the same computer the operating system will give you a shared memory pool to share between processes if you use the shmget system call or similar (e.g. mmap). Threads within a process will share a memory space, although each will have its own private stack. With shared mutable memory we have inevitable race conditions that we need to program defensively against. The mainstream alternative paradigm is to completely ban shared data. Different threads can still be programmed imperatively, and can still have conventional mutable read and write access to their own local data, but only one thread has access to any particular mutable variable.
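To make that concrete, here is a minimal sketch of the idea in Go (which is not one of the languages covered in this lecture; the names ownCounter and request are invented for illustration). Only the owning goroutine ever touches the variable n; every other thread must send it a message and, if it cares, wait for the reply.

    package main

    import "fmt"

    // request asks the owner to add delta and to report the new value.
    type request struct {
        delta int
        reply chan int
    }

    // ownCounter is the one and only thread with access to n.
    func ownCounter(requests chan request) {
        n := 0 // private mutable state: no locks needed, no races possible
        for req := range requests {
            n += req.delta
            req.reply <- n
        }
    }

    func main() {
        requests := make(chan request)
        go ownCounter(requests)
        reply := make(chan int)
        requests <- request{delta: 5, reply: reply}
        fmt.Println(<-reply) // prints 5
    }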
It's easy enough to set up a concurrent system of that nature, but we will need communication means between these concurrent activities so that one can request an operation from another, and those request operations need to be thread-safe. So it's a change in a fundamental design dimension: rather than using shared data, we will use explicit communication between concurrent tasks. And of course, on today's multiprocessor systems, that's what the cache system is already doing for us: it's sending messages between processor cores as one processor updates a variable that another processor wants to read. You could argue that you don't really need caches and cache consistency, especially since these days you have to put fence instructions in to ensure sequential consistency. But to do away with caches entirely would be a silly thing to do. They give huge performance benefits in terms of providing a system of memories at different points in the design hierarchy, which each trade off density with access time efficiently. It's the cache consistency which is perhaps more arguable as to whether we really need it.

CLICK

Previously we looked at the monitor as a structure for concurrency control. As you'll recall, the monitor had entry points that threads could invoke. The active object is a related concept, except that we will keep the threads inside the object boundary as well. In fact, there'll be just one thread inside each object, permanently in it. We'll still nominally have the concept of entry points that threads from other objects can invoke, but we'll think of it more as sending a message between objects rather than a thread executing remotely inside another object. The remote invocation may return only when the requested task is completed, which might be sensible if there's a result or return code. Or it can potentially be done in a posted or asynchronous way, where it returns immediately and the receiver gets on with the work in parallel. All of the same complexities that we've had before still exist under the bonnet. There can be multiple incoming requests at once, and we will have to provide mutual exclusion if there really is a parallel implementation inside the active object. And to maintain reliability we probably need back pressure, to defer new actions arriving if we're currently very busy or all of our local storage for messages is full up. That might be the case, for instance, if the producer wants to insert in a buffer, but that buffer is full. We've seen that, but now the system's going to have its own hidden buffers as well, potentially, where the same thing can happen, but in a less apparent way, since those buffers are generated by the system rather than declared by the programmer. As with the monitor, within the active object we're going to have total mutual exclusion, so we don't need to program defensively inside the active object.

CLICK

Here's a concrete example in the Ada programming language. We have an infinite loop introduced by the loop keyword. This is what we call a process loop. It's eternal: it loops forever, and it accepts work, and then it handles the work, and then it loops back to the top to receive the next work item. Then we have the select keyword, which you'll also find as a Unix system call primitive; it's where we can wait for some number of named events or conditions. (Not that this would be implemented with system calls. This will all be compiled into one user program, and much of the message passing will be removed by the compiler for efficiency.)
Our SELECT statement has two accept clauses with signatures: accept an insert of an item, or accept a consume of an item. And each accept is qualified with an explicit condition in the WHEN guard predicate. The first one, the insert operation, can only happen when the count is less than the buffer size. We see two operations on the count variable and two tests on it, but there's no chance of these racing, owing to the intrinsic semantics of the active object (only one thread). If more than one of the messages insert and consume happen to be in the input queue, then it may be non-deterministic which one is processed next, although obviously the qualifying guard condition in the WHEN clause must hold. Not shown here, but there's an extension available in some systems which is called the chord. In a chordal system there will be an input queue for each possible message type that can be received, and the ACCEPT construct will be prepared to dequeue simultaneously from several of these input queues according to patterns present in its argument. So the system will match up patterns of arriving messages and process them together. Anyway, the exact syntax and semantics of various implementations doesn't matter. I'm just trying to give a general flavour of active object systems here.
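Here is a rough Go analogue of that Ada bounded buffer, as a sketch only: Go has no WHEN guards, so the usual idiom of disabling a select case by setting its channel to nil stands in for them, and all the names are invented for illustration. As in the Ada version, the count (here len(buf)) is read and tested by only one thread, so nothing can race.

    package main

    import "fmt"

    // boundedBuffer is the active object: one goroutine owns all the state.
    func boundedBuffer(insert <-chan int, consume chan<- int, size int) {
        var buf []int
        for { // the eternal process loop
            // Emulate the WHEN guards: a nil channel never fires in select.
            ins, cons := insert, consume
            if len(buf) >= size {
                ins = nil // guard: accept insert only when count < size
            }
            var head int
            if len(buf) == 0 {
                cons = nil // guard: accept consume only when count > 0
            } else {
                head = buf[0]
            }
            select { // like the Ada select: waits for an enabled accept
            case item := <-ins:
                buf = append(buf, item)
            case cons <- head:
                buf = buf[1:]
            }
        }
    }

    func main() {
        insert := make(chan int)
        consume := make(chan int)
        go boundedBuffer(insert, consume, 2)
        insert <- 10
        insert <- 20
        fmt.Println(<-consume, <-consume) // prints 10 20
    }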
CLICK

As I said, the interactions between our active object instances could be implemented with message passing: passing real messages in FIFO buffers between the instances. But typically a good active object system will optimise away quite a lot of that where it can, by applying wait-for graphs and other optimisations to the source code. The remote procedure call (RPC) paradigm for distributed computing is, of course, also a message passing system. The message will include the name of the operation that needs to be invoked along with the arguments, and it must be routed to the appropriate destination agent or machine. The receiving entity must have a thread waiting to receive these messages, and when it receives a message, it will invoke the code associated with that message. Return values can be sent back in another message. Java has a standard RPC package called RMI, Remote Method Invocation. In general, RPC systems can support various semantics about what to do in the case of unreliable message delivery. As I said at the outset, the difference between parallel programming and distributed systems tends to boil down largely to the likelihood of errors: the likelihood of machines being available or switched off, and the recovery techniques applied in the case of an error. There'll be much more about that with Doctor X in the second half of this course. But of course message passing can also just run on a single machine, either between threads of a process or between processes.

CLICK

Generally, when we're talking about message passing, we're thinking roughly of sending something like an e-mail. The sender prepares the contents locally and then issues the SEND primitive. The system at some time later delivers a copy of that to the receiver, and the receiver looks at the message. This is asynchronous: the sender doesn't need to wait for message delivery, but it may of course choose to wait for a reply. But if the receiver is busy and the available FIFOs are full, then we may get back pressure, and the sender has to wait before it can send the message. The queues may not be FIFO in some systems. The receiving process is also asynchronous. The messages are first delivered into some mailbox and later retrieved, and the message is a copy of the data: there is no actual sharing. Although we normally think of these systems as asynchronous, in a lossless implementation the back pressure will tend to cause a degree of synchronisation. But that's not the same as a synchronous message passing system, which we'll discuss on the next slide.

CLICK

Here's our finite state machine view of message passing. We have machine 1 communicating with machine 2. In these diagrams, we will have actions and guards on our arcs. The send operation is an action and the receive can potentially be a guard, but sometimes the two get conflated, as we'll see. The send operation is denoted with the exclamation mark, which I shall call pling. So C1!18 means send the number 18 down the channel named C1. As background, we have some number of machines and we have some named channels with a fixed wiring between them. The receive operation is denoted with a question mark: machine 2 is doing ?C1 as it moves from state 1 to 2. And in synchronous message passing, the send and receive operations are completely synchronous: they happen at the same instant. In the Esterel language this is called an 'emit', and the same idea appears in certain process calculus disciplines that you'll no doubt study in other courses. In this example, machine 1 emits 18 down C1 as it moves from A to B, and it emits 22 down C1 as it moves from B to C. Now C1 is also looked at by machine 2, and machine 2 will only move from state 1 to 2 if it's receiving the value 18 on channel C1. Likewise, it will only move from 2 to 3 if it's receiving 22 on channel C1. If it's in state 2 and it gets another value, such as 18 on channel C1 again, then it will go off somewhere else, not shown. Also shown are side effects on local mutable variables, for instance setting Q to 3. Now the essential thing about synchronous machine composition in this case is that M1 moves at exactly the same time as machine M2 moves from 1 to 2, or potentially from 2 to wherever it goes off to the right, because both of those arcs are guarded by the same value being received on channel C1. So is the sending operation an action that should be performed as the arc is taken, or is it a guard that must hold before we can take that edge? Well, in synchronous message passing it's both, and indeed a channel doesn't really need to have a direction specified for it. But I mention this just for comparison and completeness with respect to asynchronous message passing, which is mostly what we'll be doing in everyday concurrent system design, although there are certain system design languages which build on the synchronous message passing discipline.

CLICK

In asynchronous message passing, we have FIFO queues between the participating machines; the FIFO discipline isn't strictly necessary, but it is the default discipline. And now the system is asynchronous in that the time at which a message is put into a FIFO queue is not the same time as the message is taken out: it's taken out afterwards. Now any real FIFO has some limited finite capacity, and the question arises as to what to do when the FIFO is full. Well, just in the same way that attempting to read from an empty FIFO should be a blocking operation, so should attempting to write to a full FIFO. If we don't write to something that's full, we can't get overrun: we have lossless overall behaviour. So, quickly, in this example we see that unconditionally machine 1 moves from A to B while emitting 18 down channel C1, but channel C1 must not be full, otherwise that's an intrinsic guard condition on that arc being taken. And ditto when it comes to write 22 afterwards. Now machine 2 in this variant is unconditionally moving from state 1 to 2, and as it does so, it dequeues from channel C1 and stores the resulting value in local variable X. Again, that's not completely unconditional: it does rely on the queue being non-empty, otherwise that machine will be blocked, or at least it can't take that arc out of state 1.
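For flavour, here is how the two disciplines look in Go (a sketch, not anything from the slides): an unbuffered channel behaves like the synchronous rendezvous of the previous slide, and a buffered channel behaves like the bounded FIFO just described, complete with blocking on full and on empty.

    package main

    import "fmt"

    func main() {
        // Synchronous: an unbuffered channel is a rendezvous -- the send
        // and the receive happen at the same instant, like an emit.
        c1 := make(chan int)
        go func() { c1 <- 18 }() // blocks until a receiver is ready
        fmt.Println(<-c1)        // prints 18

        // Asynchronous: a buffered channel is a bounded FIFO. A send
        // blocks only when the FIFO is full -- that blocking is the back
        // pressure which keeps the system lossless.
        c2 := make(chan int, 2)
        c2 <- 1
        c2 <- 2                 // the FIFO is now full
        go func() { c2 <- 3 }() // this send blocks until a slot frees up
        fmt.Println(<-c2, <-c2, <-c2) // prints 1 2 3
    }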
The advantages of message passing are that the copy semantics can avoid race conditions, at least directly on the data: the reader and the writer have separate copies. The intrinsic FIFOs of asynchronous message passing provide a flexible means for variations of one sort or another. We can batch up messages and send them all at once; similarly, we could batch up a set of replies. There's more freedom in scheduling: we can choose when to receive, whom to receive from and which messages to prioritise, although some of that will require more advanced queuing than FIFO at the receiver. And we can support broadcast and multicast: we can send messages to many recipients. We can also record and log the message stream for audit or debug purposes.

CLICK

So, not surprisingly, message passing has been used directly as the basis for some high-level language designs. A notable example is Occam, which was developed in Bristol by David May and colleagues for the Transputer project, and he has designed some similar languages in recent decades as well. Occam was directly based on CSP, Communicating Sequential Processes, from Tony Hoare: it's a projection of that theoretical process algebra into a real-world usable language. There are no shared variables, and all inter-process communication is along the named channels. We've already seen the query and pling operators being used for input and output, although there's a slight variation in syntax here with the receive primitive, in that we must always receive into a named variable. What we haven't quite seen before are the ALT, SEQ and PAR constructors. These use block structure. In an everyday imperative language such as C or Java, we have the block construct for serial execution of a number of imperative commands: in C, we just use braces and we terminate each command with a semicolon, and in ML the semicolon is used in the same way for side-effecting commands. Well, in Occam, that is called SEQ, and we see in this example two SEQ blocks where the statements that follow them are put in sequence. Occam, like Fortran, Python and F#, is a white-space-sensitive language, so the scope of the SEQ blocks is explicit from the indentation. But as well as SEQ, we have the PAR construct, which has exactly the same syntax, but rather than executing its arguments in sequence, they're all put in parallel, so that's a bit like our thread create primitive. And at the end of the PAR block, all of the threads join before we proceed onwards. There isn't one in this small fragment. The third construct for grouping imperative statements is the ALT construct. This indicates non-deterministic choice: any one, but just one, of the embedded statements will be executed. And all of these combinators can be nested arbitrarily within each other, just like any other statement. The branches of the ALT construct have explicit guards as to when they might be able to run: the first branch requires that count1 is less than 100 and that we can receive something from channel C1 and store it in the variable called data. The first SEQ block will execute each time that holds, reading a new value in from C1, incrementing the counter variable and writing out the data to another channel called merged. And the other branch of the ALT block does the same, but for channel C2. The overall effect of this block of code is to merge the data from channels C1 and C2 and write it out on the merged channel, while also keeping a count of how many data were read from each input channel.
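As a sketch of how those three combinators map onto a more familiar notation, here is a loose Go rendering (illustrative only: Go is not Occam, and a bare select is less expressive than a guarded ALT).

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        c1 := make(chan int, 1)
        c2 := make(chan int, 1)

        // SEQ: ordinary statement ordering, as in any imperative block.
        c1 <- 1
        c2 <- 2

        // PAR: start the branches concurrently; join at the end of the block.
        var wg sync.WaitGroup
        wg.Add(2)
        go func() { defer wg.Done(); fmt.Println("branch one") }()
        go func() { defer wg.Done(); fmt.Println("branch two") }()
        wg.Wait() // all branches have joined before we proceed

        // ALT: non-deterministic choice -- exactly one ready branch runs.
        // (The nil-channel trick from the earlier sketch can play the
        // role of Occam's boolean guards such as count1 < 100.)
        select {
        case v := <-c1:
            fmt.Println("took channel one:", v)
        case v := <-c2:
            fmt.Println("took channel two:", v)
        }
    }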
CLICK

Another language that takes a fairly novel approach is Erlang. The erlang is the unit of telephony traffic: it's one person speaking for one hour. And the Erlang language was developed for the control of telephone exchanges. It's a functional language, and as we know, functional languages are in principle more amenable to parallel processing and distributed computing. But the principal motivation for this language was live update of a running system: we can't afford to reset a complete telephone exchange every time we want to add a new telephony feature to it. Being functional, we cannot change the values of any variables; we can simply create fresh ones, and the old ones will eventually go out of scope. This should all sound familiar to those of us who use languages like ML or F# on a regular basis. Erlang is said to use the actor paradigm, which means that we can spawn lightweight processes, each of which will be a function with a closure containing its free variables. The language system will work out which ones actually need their own thread and which can just be inlined and more efficiently handled in other ways. But there is a mutable aspect to the runtime system model, to the virtual machine, which is the concept of a local mail inbox for each process. You might think that we need to change something in order to upgrade the program so that it has new behaviour. But if our functions are first-class items, then they can be received from a mailbox and run as well. So it's just a matter of an external agent sending new software in. And software that stops executing is just garbage collected, so it can all be quite elegant.

CLICK

Here's a fragment of an Erlang program: the producer-consumer example. None of the details of the language, of course, are examinable; I'm just giving you a general idea of this sort of thing. We've defined a module called producer_consumer, and we're going to export its start method so that others can communicate with this. I think the slash zero means that it doesn't take any arguments: it's a unit function. The start method, if we want to call it, just executes a spawn statement which takes a function definition. This will run that function while not blocking the caller thread: the caller thread is going to return. So once we've sent the start message to this module, it will continue computing, running the code inside it. Looking now at the definition of loop, it consists of a receive statement. It can pattern-match three types of message that it might receive in its mailbox. Two of the possible messages are produce and consume, but it will also receive a stop message. We notice that the body of the stop handler doesn't call loop again, so that will end the infinite tail-recursion iteration. So it's not infinite after all.
If we receive the PRODUCE message with an argument item, then we will call the subroutine ENTER(ITEM), which isn't defined here but will be in scope somehow. If we receive the CONSUME message, then we'll also receive as an argument some channel identifier, and we're going to invoke the pling operator on that channel. As in Occam, pling is the "send a message on a channel" operator, and the value that we're going to send on that channel is whatever is returned from the remove-item subroutine call. The channel identifier PID is actually a process identifier: that means we're sending messages to the inbox of a particular instance. If we want to do I/O to control the telephone system in some way, set up a call or whatever, then we'll be able to send messages to processes that are well known to exist, which set up and tear down calls and so on, or add something to somebody's bill, which is possibly of greater interest.

CLICK

What have we learned about message passing? It's a way of sidestepping some of the issues associated with shared memory concurrency. If there's no direct access to the data, then there can't be any localised race conditions. Threads receive a message, dispatch based on it and handle it accordingly. We must choose the size of our processes. We need sufficiently many of them to get a good parallel speed-up, but we don't want so many that we lose our familiar programming paradigms and get overwhelmed by the quirkiness of the operating environment. If we have a lot of very small processes, it can all get a bit cumbersome and awkward. A bad design will model each mutable variable of a shared memory system as a process and export globally-called read and write methods: we won't have achieved anything useful. Inside the process, we can program in whatever programming language paradigm we like: functional, imperative or even logical, Prolog-like ways are possible. RMI in Java, or similar in other languages, is a pretty natural programming paradigm, and when we come to look at distributed computing, we will want an easy-to-use messaging system, and so there are rich libraries available. E.g., programming in the cloud, we can expect complete racks to get turned on by management systems as they notice our load is increasing, and then we'll have to interface our program with some sort of load-balancing agent that will distribute work onto those additionally powered-up racks, that sort of thing. For those interested, you might like to read up about the Kilim system (named after the kilim rug), which supports thousands and thousands of threads that all communicate with each other very efficiently, owing to the design of the system. Overall, we have eliminated some of the issues associated with shared memory, but we will still run into concurrent programming bugs: these systems are just as prone to deadlock and livelock, etc. In Occam or whatever, it's pretty easy to write a program that needs to read a message from channel A before channel B, whereas the sender is going to send it on B before A, and hence deadlock (there's a sketch of exactly this at the very end of this transcript).

CLICK ... second half of lecture, introduction to transactions, is in another file ...
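Finally, as promised just above the break, here is a minimal sketch of the A-before-B ordering deadlock, again in Go for illustration; the same trap exists in Occam, Erlang or any other message-passing system.

    package main

    func main() {
        a := make(chan int) // unbuffered: a send must rendezvous with a receive
        b := make(chan int)

        go func() {
            b <- 2 // sender insists on B first: blocks, nobody reads B yet
            a <- 1
        }()

        <-a // receiver insists on A first: blocks, nothing is sent on A
        <-b
        // Neither side can ever proceed; the Go runtime aborts with
        // "fatal error: all goroutines are asleep - deadlock!"
    }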