Saturday, April 3, 2010

Multithreading - Task Synchronization - Barrier Class - C# 4.0

4.0 release has lots of stuff for multithreading. One of them is the Barrier class focused towards the synchronization of the tasks. This class is placed in the namespace System.Threading (System.dll).
Formally, this class enables multiple tasks to cooperatively work on an algorithm in parallel through multiple phases. A Barrier enforces the stopping of execution between a number of threads preventing further execution until all threads have reached the given point. On other way, all threads may start their work independently but can not proceed beyond a certain point of time until other threads have not reached to that point.
This class reminds me the typical college trips (i.e. Trip will start from point X and everyone should meet at point X). Let’s see how this class achieve the stated functionality.
Assume there are 3 friends Tom, Tomm and Tommm (feel free to change ). On Friday night, they decided to go for a trip. So, they decided that we will meet at the Trip centre to enter the new city. To simplify the scenario, let’s assume either all 3 will go for the Trip or they will cancel the Trip. So, they registered their trip with the centre. Trip canter is responsible to tell the exact position of the trip members and give a signal to start or cancel the Trip.
So, here is your Trip centre and its token (say registration token):
static Barrier tripCenter;
static CancellationToken token;

Now, here is how registration will happen:
// Register 3 members to the Barrier
tripCenter = new Barrier(3);

Before we proceed further on Trip, let’s see what else this Barrier class has to offer during the registration:
1. Class offers the flexibility to increase or decrease the number of participants at any point of time after the registration process.
// Changed the plan. Let's make it five participants.

// 5th Person is in trouble and he is not joining us. Settle on 4.

2. You can ask for the notification once everyone has reached at the barrier. You can provide the Action at the time of registration. Here is what we will define for this trip
tripCenter = new Barrier(3, (barrier) =>
Console.WriteLine("Every One has Arrived. All of you [Total - {0} ] can proceed for your Trip now. My duty for phase {1} is over. Cheers.", barrier.ParticipantCount, barrier.CurrentPhaseNumber);

Next step is to provide the task to be executed by the participants before reaching at the barrier. This task is the set of actions performed by your workers (or threads). Here we go for the task:
static void MakeMyTrip(string name, TimeSpan timeToReachPetrolPump)
Console.WriteLine("[{0}] Leaving House", name);

// Perform some work
Console.WriteLine("[{0}] Arrived at Petrol Pump", name);

// Need to sync here

// Perform some more work
Console.WriteLine("[{0}] Starting the Trip Now", name);
catch (OperationCanceledException)

Console.WriteLine("[{0}] Problem in Bike... Trip Cancelled!! Going
home!", name);

Let’s assume that things will go fine for this trip and ignore the Catch block for a while. We will discuss it shortly. As per the task:
1. Participant will leave the house and will reach at petrol pump.
2. Once they have reached at the Trip centre, they need to signal this event to barrier. (Inform Trip Centre that you have arrived).
3. No one can proceed further without the signal from the barrier.
4. How Barrier maintains this data is with every call to SignalAndWait(), the number of signals received by the barrier is incremented. Once the number of signals received reaches the number of participants the Barrier was constructed with, all threads are then allowed to continue execution.
And, the last step is to Activate your participant for the Trip.

var tom = new Thread(() => MakeMyTrip("tom", TimeSpan.FromSeconds(0)));

var tomm = new Thread(() => MakeMyTrip("tomm", TimeSpan.FromSeconds(4)));

var tommm = new Thread(() => MakeMyTrip("tommm", TimeSpan.FromSeconds(6))); tommm.Start();

// Allow them to complete their trip

If you will execute this program, you will observe that participants are leaving the home and reaching at the petrol pump in random order. But no one is allowed to proceed beyond the barrier until all participants are not ready. This class provides a god help otherwise we would have written code with Events (which involves manual intervention for the synchronization).
So far things are good as we assumed that there won’t be any problem to anyone and everyone will reach at the barrier. This is something not practical. There may be several scenarios when thread or worker may stop it’s execution (say data fault). In this case, worker may inform the other worker threads (participants in our case) about the cancellation of the Trip. And therefore, above untouched Catch block comes into the picture.
In the beginning, we have declared Barrier class reference along with reference of the class CancellationToken. This class provides the cancellation token for the given Barrier.

var source = new CancellationTokenSource();

// Get the Token
token = source.Token;

// Activate the Worker Threads

// Not Happy with the Idea, Cancel the Trip

Once the source has raised the alarm for cancel, OperationCanceledException exception will be raised to all waiting worker threads.

Finally, the action defined for the barrier. Once all participants have reached to the barrier, barrier will execute the action defined at the time of registration. In our, there is an announcement by the Trip Centre. This action will not be executed by the barrier if the Source has aborted the task (say when Trip is cancelled, there will be no announcement).

You saw that how we can use barrier for the synchronization of the tasks. But there are several other aspects which can be discussed here:
1. What will happen if any worker of the barrier has been aborted without any notice?
2. Will other threads throw time out exception or will wait indefinitely.
3. Can we define any time limit on each worker to reach at barrier?
I will keep these questions open for the discussion.

1 comment:

  1. code for avoid busy waiting() in threads on pintos oparating system