In this part we’ll cover the classical problems that occur when dealing with synchronization.
But within this paradigm, we don’t encounter the same problems as when using semaphores. Mutual exclusion is not an issue, since we never share any resource, the big problem today will be synchronization and coordination.
We’ll see that many problems have the solution of a server-client architecture.
Barriers
Let’s quickly recap what our barriers should be able to do:
-module(barrier).
% Initialize barrier for ‘Expected’ processesinit(Expected) -> % TODO
% Block at ‘Barrier’ until all processes have reached itwait(Barrier) -> % TODO
Since we are talking about processes, it’s natural to have a process for the barrier itself. This process keeps track of what other processes that has arrived at the barrier point.
When a new process arrives at the barrier, it sends a arrived
message to the barrier process.
When the list of all arrived processes is complete, the barrier process sends a continue
message to everyone.
After notifying all other processes, the barrier processes itself, goes back to its initial state.
So we need to implement the barrier’s event loop as a server function:
barrier(Arrived, Expected, PidRefs)
Let’s implement this:
% event loop of barrier for ‘Expected’ processes% Arrived: number of processes arrived so far% PidRefs: list of {Pid, Ref} of processes arrived so far
% All processes arrived notify all waiting processes:barrier(Arrived, Expected, PidRefs) when Arrived =:= Expected -> [To ! {continue, Ref} || {To, Ref} <- PidRefs],
% Reset barrier barrier(0, Expected, []);
% Still waiting for some processesbarrier(Arrived, Expected, PidRefs) -> receive {arrived, From, Ref} -> % one more arrived: add {From, Ref} to PidRefs list: barrier(Arrived + 1, Expected, [{From, Ref}|PidRefs])end.
Now for the wait
function:
% Block at ‘Barrier’ until all processes have reached itwait(Barrier) -> % Notify barrier of arrival Ref = make _ ref(),
% Wait for signal to continue Barrier ! {arrived, self(), Ref},
receive {continue, Ref} -> through end.
And finally, the init
function, simple:
% Initialize barrier for ‘Expected’ processesinit(Expected) -> spawn(fun () -> barrier(0, Expected, []) end).
Resource allocator
Let’s recap the problem, an *allocator grants users, exclusive access to a number of resources.
Users asynchronously request and release resources back. The allocator ensures exclusive access to a single user, and keeps tracks of the number of available resources.
So our module would look like:
-module(allocator).
% Register allocator with list of Resourcesinit(Resources) -> % TODO
% Get N resources from allocatorrequest(N) -> % TODO
% Release Resources to allocatorrelease(Resources) -> % TODO
The user would perform something like:
user() -> % How many resources are needed? N = howMany(),
% Get resources from allocator Resources = allocator:request(N),
% Do something with resources use(Resources),
% Release resources allocator:release(Resources),
user().
Again, in the message-passing world, using a server-client architecture often solves the problem.
We dedicate a process to the allocator, which keeps track of list of resources.
When a process requests for some resources that are available, the allocator sends a granted
message.
Then accordingly removes those resources from the list.
When a process releases some resources, the allocator sends a released
, and then adds the resources to the list.
If requests exceed the availability, the fall into our built-in mailbox. The allocator process will resolve this as soon as they pattern-match again (resources available again).
allocator(Resources) -> % Count how many resources are available Available = length(Resources), receive
% Serve requests if enough resources are available {request, From, Ref, N} when N =< Available ->
% Granted ++ Remaining =:= Resources % Length(Granted) =:= N {Granted, Remaining} = lists:split(N, Resources),
% Send resources to requesting process From ! {granted, Ref, Granted},
% Continue with Remaining resources allocator(Remaining);
% Serve releases {releases, From, Ref, Released} -> % Notify releasing process From ! {released, Ref},
% Continue with previous and released resources allocator(Resources ++ Released)
The request function:
% Get N resources from allocator, gets blocked if not availablerequest(N) -> Ref = make_ref(), allocator ! {request, self(), Ref, N}, recieve {granted, Ref, Granted} -> Granted end.
% Release Resources to allocatorrelease(Resources) -> Ref = make_ref(), allocator ! {release, self(), Ref, Resources}, recieve {released, Ref} -> released end.
Producer-consumer
Recap; Implement a buffer
such that:
- Producers and consumers access the buffer atomically
- Consumers block when the buffer is empty
- Producers block when the buffer is full (bounded buffer variant)
-module(buffer).
% Initialize buffer with size Boundinit_buffer(Bound) -> % TODO
% Put Item in Buffer; Block if fullput(Buffer, Item) -> % TODO
% Get Item from Buffer; Block if emptyget(Buffer) -> % TODO
The producer and buffer:
producer(Buffer) -> Item = produce(), buffer:put(Buffer, Item), producer(Buffer).
consumer(Buffer) -> Item = buffer:get(Buffer), % Do something with Item
consume(Item), consumer(Buffer).
At this point you pretty much can see the pattern here that arises:
buffer(Content, Count, Bound) -> receive
% Serve gets when buffer not empty {get, From, Ref} when Count > 0 -> % Match first item [First | Rest] = Content,
% Send it out From ! {item, Ref, First},
% Remove it from buffer buffer(Rest, Count-1, Bound);
% Serve puts when buffer not full {put, From, Ref, Item} when Count < Bound ->
% Send ack From ! {done, Ref},
% Add item to end buffer(Content ++ [Item], Count + 1, Bound)end.
In this solution, both a bounded and unbounded will work - due to Erlang’s order between numbers and atoms!
Now for get and put:
% Get item from ‘Buffer’; block if emptyget(Buffer) -> Ref = make_ref(), Buffer ! {get, self(), Ref}, receive {item, Ref, Item} -> Item end.
% Put ‘Item’ in ‘Buffer’; block if fullput(Buffer, Item) -> Ref = make_ref(), Buffer ! {put, self(), Ref, Item}, receive {done, Ref} -> done end.
Readers-writers
-module(board).
% Register board with Nameinit(Name) -> % TODO
% Get read access to Boardbegin_read(Board) -> % TODO
% Release read access to Boardend_read(Board) -> % TODO
% Get write access to Boardbegin_write(Board) -> % TODO
% Release write access to Boardend_write(Board) -> % TODO
Our first naive server function would be:
% ‘Readers’ active readers and ‘Writers’ active writersboard_row(Readers, Writers) ->receive {begin_read, From, Ref} when Writers =:= 0 -> From ! {ok_ to_ read, Ref}, board_row(Readers+1, Writers);
{begin_write, From, Ref} when (Writers =:= 0) and (Readers =:= 0) -> From ! {ok_ to_ write, Ref}, board_row(Readers, Writers+1);
{end_read, From, Ref} -> From ! {ok, Ref}, board_row(Readers-1, Writers);
{end_write, From, Ref} -> From ! {ok, Ref}, board_row(Readers, Writers-1)end.
Just as our naive solution when using semaphores, this doesn’t prevent starvation due to this version prioritizes readers.
The solution based on two monitors is a approach here, but it’s quite cumbersome for a message-passing program.
We instead implement two macro states:
- Empty - no readers or writers
- Readers - Readers but no writers
The initial board is in empty state, then:
- When board is in state
emtpy
:- Read requests - served immediately, then switches to
readers
state. - Write requests - served immediately and synchronously, wait until writing ends, then go into
empty
state.
- Read requests - served immediately, then switches to
- When board is in state
readers
:- Read requests - served immediately and stays in
readers
. - Write requests - served as soon as possible, board waits until all reading ends, then request is served. Back to
empty
state.
- Read requests - served immediately and stays in
For this we’ll need two server functions, empty_board
and readers_board
:
% Board with no readers and no writersempty_board() -> receive
% Serve read request {begin_read, From, Ref} ->
% Notify reader From ! {ok_to_read, Ref},
% Board has one reader readers_board(1);
% Serve write request synchronously {begin_write, From, Ref} -> % Notify writer From ! {ok_to_write, Ref},
% Wait for writer to finish Receive {end_write, _From, _Ref} -> % Board is empty again empty_board() endend.
% Board with no readers (and no writers)readers_ board(0) -> empty_ board();
% Board with ‘Readers’ active readers% (and no writers)readers_board(Readers) -> receive
% Serve write request {begin_write, From, Ref} -> % Wait until all ‘Readers’ have finished [receive {end_read, _From, _Ref} -> end_read end || _ <- lists:seq(1, Readers)],
% Notify writer From ! {ok_to_write, Ref},
% Wait for writer to finish receive {end_write, _From, _Ref} -> empty_board() end;
% Serve read request {begin_read, From, Ref} -> % Notify reader From ! {ok _ to _ read, Ref},
% Board has one more reader readers _ board(Readers+1);
% Serve end read {end_read, From, Ref} ->
% Board has one less reader readers_board(Readers-1)end.
Dining Philosophers
-module(philosophers).
% Set up table of N philosophersinit(N) -> % TODO
% Philosopher picks up Forkget_fork(Fork) -> % TODO
% Philosopher releases Forkput_fork(Fork) -> % TODO
We could explore the solutions we did based on locking and breaking symmetry - but there is a solution which better fits into the message-passing paradigm
We have a waiter (process) who supervises access to the table. So each philosopher asks for permission to sit at the table before picking up both forks.
So, as long as the waiter allows strictly fewer philosopher than the total number of forks to sit around the table, deadlock and starvation are avoided.
Waiter interface:
% Ask Waiter to be seated; may waitsit(Waiter) -> % TODO
% Ssk Waiter to leaveleave(Waiter) -> % TODO
Our server function:
waiter(Eating, Seats) -> receive
% Serve as long as seats are available {sit, From, Ref} when Eating < Seats -> From ! {ok_to_sit, Ref},
% One more eating waiter(Eating+1, Seats);
% Can leave at any time {leave, From, Ref} -> From ! {ok_to_leave, Ref},
% One less eating waiter(Eating-1, Seats)end.
And sit
and leave
:
% ask Waiter to be seated; may waitsit(Waiter) -> Ref = make _ ref(), Waiter ! {sit, self(), Ref}, receive {ok_to_sit, Ref} -> ok end.
% ask Waiter to leaveleave(Waiter) -> Ref = make _ ref(), Waiter ! {leave, self(), Ref}, receive {ok_to_leave, Ref} -> ok end.
Now, each fork
is also a process, which keeps track of whether the for is free or not.
Server function:
% Fork not held by anyonefork() -> receive {get, From, Ref} -> From ! {ack, Ref},
% Fork held fork(From)end.
% a fork held by Ownerfork(Owner) -> receive {put, Owner, _ Ref} -> % Fork not held fork()end.
and the get
and put
for the forks:
% Pick up Fork; block until availableget_fork(Fork) -> Ref = make _ ref(), Fork ! {get, self(), Ref}, receive {ack, Ref} -> ack end.
% Put down Forkput_fork(Fork) -> Ref = make _ ref(), Fork ! {put, self(), Ref}.
And finally, the init
function for the whole problem:
% Set up table of ‘N’ philosophersinit(N) -> % Spawn waiter process Waiter = spawn(fun () -> waiter(0, N-1) end),
% [1, 2, ..., N] Ids = lists:seq(1,N),
% Spawn fork processes Forks = [spawn(fun fork/0) || _ <- Ids],
% Spawn philosopher processes [spawn(fun () -> Left = lists:nth(I, Forks),
% 1-based indexes Right = lists:nth(1+(I rem N), Forks), philosopher(#forks{left=Left, right=Right}, Waiter) end) || I <- Ids].