In this part we’ll cover the classical problems that occur when dealing with synchronization.
But within this paradigm we don’t encounter the same problems as with semaphores: mutual exclusion is not an issue, since we never share any resource. The big problems today are synchronization and coordination.
We’ll see that many of these problems can be solved with a client-server architecture.
Barriers
Let’s quickly recap what our barriers should be able to do:
-module(barrier).
% Initialize barrier for ‘Expected’ processes
init(Expected) ->
% TODO
% Block at ‘Barrier’ until all processes have reached it
wait(Barrier) ->
% TODO
Since we are talking about processes, it’s natural to have a process for the barrier itself. This process keeps track of which processes have arrived at the barrier point.
When a new process arrives at the barrier, it sends an arrived message to the barrier process.
When the list of arrived processes is complete, the barrier process sends a continue message to everyone.
After notifying all other processes, the barrier process itself goes back to its initial state.
So we need to implement the barrier’s event loop as a server function:
barrier(Arrived, Expected, PidRefs)
Let’s implement this:
% event loop of barrier for ‘Expected’ processes
% Arrived: number of processes arrived so far
% PidRefs: list of {Pid, Ref} of processes arrived so far
% All processes arrived notify all waiting processes:
barrier(Arrived, Expected, PidRefs) when Arrived =:= Expected ->
[To ! {continue, Ref} || {To, Ref} <- PidRefs],
% Reset barrier
barrier(0, Expected, []);
% Still waiting for some processes
barrier(Arrived, Expected, PidRefs) ->
receive
{arrived, From, Ref} ->
% one more arrived: add {From, Ref} to PidRefs list:
barrier(Arrived + 1, Expected, [{From, Ref}|PidRefs])
end.
Now for the wait function:
% Block at ‘Barrier’ until all processes have reached it
wait(Barrier) ->
% Notify barrier of arrival
Ref = make_ref(),
Barrier ! {arrived, self(), Ref},
% Wait for signal to continue
receive {continue, Ref} -> through end.
And finally, the init function, which is simple:
% Initialize barrier for ‘Expected’ processes
init(Expected) ->
spawn(fun () -> barrier(0, Expected, []) end).
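As a quick usage sketch (not from the original; the demo function, worker ids, and messages here are made up for illustration), two workers synchronizing at a barrier:

```erlang
% Spawn two workers that must both reach the barrier
% before either proceeds past it.
demo() ->
    Barrier = init(2),
    Self = self(),
    Worker = fun (Id) ->
        % ... work before the barrier ...
        wait(Barrier),
        % Only reached once both workers have arrived
        Self ! {past_barrier, Id}
    end,
    spawn(fun () -> Worker(1) end),
    spawn(fun () -> Worker(2) end),
    % Collect both notifications (arrival order is not guaranteed)
    [receive {past_barrier, _} -> ok end || _ <- [1, 2]],
    ok.
```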
Resource allocator
Let’s recap the problem: an allocator grants users exclusive access to a number of resources.
Users asynchronously request resources and release them back. The allocator ensures exclusive access to a single user, and keeps track of the number of available resources.
So our module would look like:
-module(allocator).
% Register allocator with list of Resources
init(Resources) ->
% TODO
% Get N resources from allocator
request(N) ->
% TODO
% Release Resources to allocator
release(Resources) ->
% TODO
The user would perform something like:
user() ->
% How many resources are needed?
N = howMany(),
% Get resources from allocator
Resources = allocator:request(N),
% Do something with resources
use(Resources),
% Release resources
allocator:release(Resources),
user().
Again, in the message-passing world, using a server-client architecture often solves the problem.
We dedicate a process to the allocator, which keeps track of the list of resources.
When a process requests some resources that are available, the allocator sends a granted message and removes those resources from the list.
When a process releases some resources, the allocator sends a released message and adds the resources back to the list.
If a request exceeds the current availability, its message simply stays in the process mailbox. The allocator serves it as soon as the guard matches again (that is, when enough resources become available).
allocator(Resources) ->
% Count how many resources are available
Available = length(Resources),
receive
% Serve requests if enough resources are available
{request, From, Ref, N} when N =< Available ->
% Granted ++ Remaining =:= Resources
% length(Granted) =:= N
{Granted, Remaining} = lists:split(N, Resources),
% Send resources to requesting process
From ! {granted, Ref, Granted},
% Continue with Remaining resources
allocator(Remaining);
% Serve releases
{release, From, Ref, Released} ->
% Notify releasing process
From ! {released, Ref},
% Continue with previous and released resources
allocator(Resources ++ Released)
end.
The request function:
% Get N resources from allocator, gets blocked if not available
request(N) ->
Ref = make_ref(),
allocator ! {request, self(), Ref, N},
receive {granted, Ref, Granted} -> Granted end.
% Release Resources to allocator
release(Resources) ->
Ref = make_ref(),
allocator ! {release, self(), Ref, Resources},
receive {released, Ref} -> released end.
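The init function was left as TODO. Since request and release send to the registered name allocator, a minimal sketch (an assumption, not shown above) registers the server process under that name:

```erlang
% Register allocator with list of Resources (sketch)
init(Resources) ->
    % request/1 and release/1 send to the name 'allocator'
    register(allocator, spawn(fun () -> allocator(Resources) end)).
```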
Producer-consumer
Recap: implement a buffer such that:
- Producers and consumers access the buffer atomically
- Consumers block when the buffer is empty
- Producers block when the buffer is full (bounded buffer variant)
-module(buffer).
% Initialize buffer with size Bound
init_buffer(Bound) ->
% TODO
% Put Item in Buffer; Block if full
put(Buffer, Item) ->
% TODO
% Get Item from Buffer; Block if empty
get(Buffer) ->
% TODO
The producer and consumer:
producer(Buffer) ->
Item = produce(),
buffer:put(Buffer, Item),
producer(Buffer).
consumer(Buffer) ->
Item = buffer:get(Buffer),
% Do something with Item
consume(Item),
consumer(Buffer).
At this point, the pattern that arises should be familiar:
buffer(Content, Count, Bound) ->
receive
% Serve gets when buffer not empty
{get, From, Ref} when Count > 0 ->
% Match first item
[First | Rest] = Content,
% Send it out
From ! {item, Ref, First},
% Remove it from buffer
buffer(Rest, Count-1, Bound);
% Serve puts when buffer not full
{put, From, Ref, Item} when Count < Bound ->
% Send ack
From ! {done, Ref},
% Add item to end
buffer(Content ++ [Item], Count + 1, Bound)
end.
In this solution, both a bounded and an unbounded buffer work, thanks to Erlang’s term ordering between numbers and atoms: if Bound is an atom such as infinity, the guard Count < Bound is always true, since any number compares less than any atom.
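The module’s init_buffer was also left as TODO; a minimal sketch (not from the original) that, by the ordering rule just mentioned, gives an unbounded buffer when passed the atom infinity:

```erlang
% Initialize buffer with size Bound (sketch)
% Passing the atom 'infinity' gives an unbounded buffer,
% since Count < infinity holds for every number Count.
init_buffer(Bound) ->
    spawn(fun () -> buffer([], 0, Bound) end).
```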
Now for get and put:
% Get item from ‘Buffer’; block if empty
get(Buffer) ->
Ref = make_ref(),
Buffer ! {get, self(), Ref},
receive {item, Ref, Item} -> Item end.
% Put ‘Item’ in ‘Buffer’; block if full
put(Buffer, Item) ->
Ref = make_ref(),
Buffer ! {put, self(), Ref, Item},
receive {done, Ref} -> done end.
Readers-writers
-module(board).
% Register board with Name
init(Name) ->
% TODO
% Get read access to Board
begin_read(Board) ->
% TODO
% Release read access to Board
end_read(Board) ->
% TODO
% Get write access to Board
begin_write(Board) ->
% TODO
% Release write access to Board
end_write(Board) ->
% TODO
Our first naive server function would be:
% ‘Readers’ active readers and ‘Writers’ active writers
board_row(Readers, Writers) ->
receive
{begin_read, From, Ref} when Writers =:= 0 ->
From ! {ok_to_read, Ref},
board_row(Readers+1, Writers);
{begin_write, From, Ref} when (Writers =:= 0) and (Readers =:= 0) ->
From ! {ok_to_write, Ref},
board_row(Readers, Writers+1);
{end_read, From, Ref} -> From ! {ok, Ref},
board_row(Readers-1, Writers);
{end_write, From, Ref} -> From ! {ok, Ref},
board_row(Readers, Writers-1)
end.
Just like our naive semaphore-based solution, this doesn’t prevent starvation, because this version prioritizes readers.
The solution based on two monitors is an option here, but it’s quite cumbersome in a message-passing program.
We instead implement two macro states:
- Empty - no readers or writers
- Readers - Readers but no writers
The initial board is in the empty state; then:
- When the board is in state empty:
  - Read requests are served immediately; the board switches to the readers state.
  - Write requests are served immediately and synchronously: the board waits until writing ends, then goes back to the empty state.
- When the board is in state readers:
  - Read requests are served immediately; the board stays in readers.
  - Write requests are served as soon as possible: the board waits until all reading ends, then the request is served; when writing ends, it goes back to the empty state.
For this we’ll need two server functions, empty_board and readers_board:
% Board with no readers and no writers
empty_board() ->
receive
% Serve read request
{begin_read, From, Ref} ->
% Notify reader
From ! {ok_to_read, Ref},
% Board has one reader
readers_board(1);
% Serve write request synchronously
{begin_write, From, Ref} ->
% Notify writer
From ! {ok_to_write, Ref},
% Wait for writer to finish
receive
{end_write, _From, _Ref} ->
% Board is empty again
empty_board()
end
end.
% Board with no readers (and no writers)
readers_board(0) -> empty_board();
% Board with ‘Readers’ active readers
% (and no writers)
readers_board(Readers) ->
receive
% Serve write request
{begin_write, From, Ref} ->
% Wait until all ‘Readers’ have finished
[receive {end_read, _From, _Ref} -> end_read end || _ <- lists:seq(1, Readers)],
% Notify writer
From ! {ok_to_write, Ref},
% Wait for writer to finish
receive
{end_write, _From, _Ref} -> empty_board()
end;
% Serve read request
{begin_read, From, Ref} ->
% Notify reader
From ! {ok_to_read, Ref},
% Board has one more reader
readers_board(Readers+1);
% Serve end read
{end_read, _From, _Ref} ->
% Board has one less reader
readers_board(Readers-1)
end.
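The client-side functions from the module interface were left as TODO. A minimal sketch (an assumption, not shown in the original) following the same make_ref-based request/reply pattern as the other examples, with reply atoms matching the server above:

```erlang
% Register board with Name (sketch)
init(Name) ->
    register(Name, spawn(fun empty_board/0)).

% Get read access to Board
begin_read(Board) ->
    Ref = make_ref(),
    Board ! {begin_read, self(), Ref},
    receive {ok_to_read, Ref} -> ok end.

% Release read access to Board (no reply expected)
end_read(Board) ->
    Board ! {end_read, self(), make_ref()}.

% Get write access to Board
begin_write(Board) ->
    Ref = make_ref(),
    Board ! {begin_write, self(), Ref},
    receive {ok_to_write, Ref} -> ok end.

% Release write access to Board (no reply expected)
end_write(Board) ->
    Board ! {end_write, self(), make_ref()}.
```

Note that end_read and end_write are asynchronous here, since the two-state server above never replies to them.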
Dining Philosophers
-module(philosophers).
% Set up table of N philosophers
init(N) ->
% TODO
% Philosopher picks up Fork
get_fork(Fork) ->
% TODO
% Philosopher releases Fork
put_fork(Fork) ->
% TODO
We could revisit the solutions based on locking and breaking symmetry, but there is a solution that fits the message-passing paradigm better.
We have a waiter (process) who supervises access to the table. So each philosopher asks for permission to sit at the table before picking up both forks.
So, as long as the waiter allows strictly fewer philosophers than the total number of forks to sit around the table, deadlock and starvation are avoided.
Waiter interface:
% Ask Waiter to be seated; may wait
sit(Waiter) ->
% TODO
% Ask Waiter to leave
leave(Waiter) ->
% TODO
Our server function:
waiter(Eating, Seats) ->
receive
% Serve as long as seats are available
{sit, From, Ref} when Eating < Seats ->
From ! {ok_to_sit, Ref},
% One more eating
waiter(Eating+1, Seats);
% Can leave at any time
{leave, From, Ref} ->
From ! {ok_to_leave, Ref},
% One less eating
waiter(Eating-1, Seats)
end.
And the sit and leave functions:
% ask Waiter to be seated; may wait
sit(Waiter) ->
Ref = make_ref(),
Waiter ! {sit, self(), Ref},
receive {ok_to_sit, Ref} -> ok end.
% ask Waiter to leave
leave(Waiter) ->
Ref = make_ref(),
Waiter ! {leave, self(), Ref},
receive {ok_to_leave, Ref} -> ok end.
Now, each fork is also a process, which keeps track of whether the fork is free or held.
Server function:
% Fork not held by anyone
fork() ->
receive
{get, From, Ref} ->
From ! {ack, Ref},
% Fork held
fork(From)
end.
% a fork held by Owner
fork(Owner) ->
receive
{put, Owner, _Ref} ->
% Fork not held
fork()
end.
And the get and put functions for the forks:
% Pick up Fork; block until available
get_fork(Fork) ->
Ref = make_ref(),
Fork ! {get, self(), Ref},
receive {ack, Ref} -> ack end.
% Put down Fork
put_fork(Fork) ->
Ref = make_ref(),
Fork ! {put, self(), Ref}.
And finally, the init function for the whole problem:
% Set up table of ‘N’ philosophers
init(N) ->
% Spawn waiter process
Waiter = spawn(fun () -> waiter(0, N-1) end),
% [1, 2, ..., N]
Ids = lists:seq(1,N),
% Spawn fork processes
Forks = [spawn(fun fork/0) || _ <- Ids],
% Spawn philosopher processes
[spawn(fun () ->
Left = lists:nth(I, Forks),
% 1-based indexes
Right = lists:nth(1+(I rem N), Forks),
philosopher(#forks{left=Left, right=Right}, Waiter)
end) || I <- Ids].
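The init above uses a forks record and a philosopher/2 loop that the original leaves undefined. A possible sketch (the record definition and the think/eat stand-ins are assumptions; in a real module the record must be declared before its first use):

```erlang
% Record holding a philosopher's left and right fork (assumed)
-record(forks, {left, right}).

% Stand-ins for actual thinking and eating (assumed)
think() -> timer:sleep(10).
eat() -> timer:sleep(10).

% A philosopher's life: think, get seated, eat, leave; repeat
philosopher(Forks = #forks{left = Left, right = Right}, Waiter) ->
    think(),
    % Ask the waiter for a seat; may block until one is free
    sit(Waiter),
    % With at most N-1 philosophers seated, both forks
    % eventually become available: no deadlock
    get_fork(Left),
    get_fork(Right),
    eat(),
    put_fork(Left),
    put_fork(Right),
    % Free the seat so other philosophers can sit down
    leave(Waiter),
    philosopher(Forks, Waiter).
```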