Thread Communication Mechanisms in Parallel Algorithms
by Lalit Pant


Listing One
// Multiway tree definition
//
// NOTE(review): the traversal listings refer to this template as `Tree<T>`;
// the name declared here is `MTree`.
// NOTE(review): MTree declares a destructor and holds a raw Node*, but no
// copy constructor / copy assignment is visible. If ~MTree() deletes the
// nodes, copying a tree would free them twice -- confirm the elided members
// make the class non-copyable.
template <class T> class MTree
{
public :
   // A tree node: holds one element (m_val) and pointers to its children.
   class Node
   {
   public:
      // Access to the element stored in this node.
      T& operator *();
      T* operator ->();
      // ...
   private:
      T m_val;
      std::vector<Node*> m_children;
      // ...
   };

   // Enumerates the child nodes of a node; obtained from children().
   class Enumeration
   {
   public:
      // Returns the next child node. The traversal listings call it only
      // while hasMoreElements() returns true.
      Node& nextElement();
      // Presumably true while children remain to be enumerated.
      bool hasMoreElements();
      // Releases the enumeration. The traversal listings call this once
      // they have finished with an enumeration returned by children().
      void free();
   };

   MTree();
   ~MTree();
   // returns tree root
   Node& root();
   // returns parent of given node
   Node& parent(Node& child);
   // returns an enumeration over the child nodes of given node
   Enumeration& children(Node& parent);
private:
   Node *m_pRoot;
   // ...
};


Listing Two
// Sequential level order traversal
// Visits the tree breadth-first starting at the root: nodes are taken from
// a FIFO queue, the operation is applied, and the node's children are
// appended behind everything already pending.
// Function arguments :
// t - tree being traversed
// oper - the operation to be performed on each node; invoked as oper(node)
//        with a Tree<T>::Node&
template<class T, class NodeOp>
void levelOrderTraversal(Tree<T>& t, const NodeOp& oper)
{
   // Node and Enumeration depend on T, so they must be named with `typename`.
   typedef typename Tree<T>::Node Node;
   typedef typename Tree<T>::Enumeration Enumeration;

   std::queue<Node*> pending;
   pending.push(&t.root());
   while (!pending.empty())
   {
      Node* node = pending.front();
      pending.pop();
      oper(*node);

      // Queue this node's children behind the current level.
      Enumeration& e = t.children(*node);
      while (e.hasMoreElements())
      {
         pending.push(&e.nextElement());
      }
      e.free();
   }
}
// Operation to be performed on each node
// Passed as second template parameter to the function template above
template <class T> struct NodeOper
{
   void operator() (Tree<T>::Node& node) const
   {
    cout << *node << endl;
   }
};


Listing Three
// Concurrency engine
// Requests are put into the engine with enque(); internal threads take them
// out with deque() and process them in the background using the Work
// functor given to the constructor.
template <class Request, class Work> class ConcurrencyEngine
{
public :
   // `explicit`: an engine must not be created by implicit conversion from
   // a Work object.
   explicit ConcurrencyEngine(Work& w);
   // NOTE(review): the meaning of the int result is not shown here --
   // confirm the success/failure convention with the implementation.
   int init();
   // put request into engine
   void enque(Request* r);
private :
   Request* deque(); // Called by internal threads
   // ...
};

// While instantiating an engine, a functor needs to be supplied as the 2nd
// template parameter. Its function call operator is called in the background
// by the concurrency engine with a pointer to an instance of the first
// template parameter (the Request type) -- presumably each request that was
// handed to the engine's enque().
template <class Request> struct SomeFunctor
{
   void operator() (Request *r);
   // ...
};


Listing Four
// Parallel level order traversal
// Seeds a concurrency engine with the root; the LotAlgo functor run by the
// engine enqueues each node's children and applies the operation.
// Function arguments :
// t - tree being traversed
// d - the operation to be performed on each node
template<class T, class Do>
void levelOrderTraversalMt(Tree<T>& t, const Do& d)
{
   // Tree<T>::Node depends on T, so it must be named with `typename`.
   typedef typename Tree<T>::Node Node;
   typedef LotAlgo<T, Do> Algo; // LotAlgo defined in listing five

   Algo algo(t, d);
   ConcurrencyEngine<Node, Algo> q(algo);
   // The functor enqueues child nodes back into the engine, so it needs a
   // pointer to it.
   algo.setEngine(&q);
   // NOTE(review): init()'s int result is ignored; its success/failure
   // convention is not shown -- confirm and check it.
   q.init();
   q.enque(&t.root());
   // NOTE(review): `q` and `algo` are destroyed when this function returns.
   // Unless ~ConcurrencyEngine waits for all outstanding requests, worker
   // threads may still be using them -- confirm against the engine.
}


Listing Five
// Level order traversal functor
template <class T, class Do>
struct LotAlgo
{
   Tree<T>& m_t;
   const Do& m_work;
   ConcurrencyEngine<Tree<T>::Node, LotAlgo<T, Do> >*  m_q;
   ...
   void operator() (Tree<T>::Node* r)
   {
      Tree<T>::Enumeration& e =3D m_t.children(*r);
      // enque child nodes back into the engine
      for(; e.hasMoreElements(); )
      {
         m_q->enque(&e.nextElement());
      }
      // work on the current node
      m_work(r);
      e.free();
   }
};


Listing Six
// Condition variable interface
// A Cv is bound to an existing mutex handle. The producer consumer queue
// calls wait() and signal() between WaitForSingleObject and ReleaseMutex on
// that same handle, re-checking its condition in a loop around wait().
class Cv
{
public :
    // `explicit`: a Cv must not be created by implicit conversion from a
    // HANDLE.
    explicit Cv(HANDLE lock);
    ~Cv();
    // NOTE(review): presumably releases `lock`, blocks until signalled and
    // re-acquires `lock` before returning -- the queue's usage relies on
    // this. Confirm against the implementation.
    void wait();
    void signal();
};


Listing Seven
// Producer Consumer queue
template <class Message> class PCQueue
{
private:
   HANDLE m_lock;
   Cv      notEmpty;                         
   Cv      notFull;                          
   queue<Message>   m_queue;                          
   size_t         maxSize;
public:
   PCQueue(size_t size=3D1500)                    
      : maxSize(size), m_lock(CreateMutex(0, 0, 0)), notEmpty(m_lock),
        notFull(m_lock)
   {
   }
   ~PCQueue()                    
   {
      CloseHandle(m_lock);
   }
   Message read(void)
   {
      WaitForSingleObject(m_lock, INFINITE);
      while(m_queue.empty())
      {                         
         notEmpty.wait();                              
      }
      Message result =3D m_queue.front();
      m_queue.pop_front();

      notFull.signal();
      ReleaseMutex(m_lock);
      return result;
   }
   void write(Message m)
   {
      WaitForSingleObject(m_lock, INFINITE);
      while(m_queue.size() =3D=3D maxSize)
      {
         notFull.wait();                               
      }
      queue.push_back(m);
      notEmpty.signal();                               
      ReleaseMutex(m_lock);
   }
};

Listing Eight
// Engine dispatch mechanism using producer consumer queue
template <class Request, class Work> class ConcurrencyEngine
{
 ...
private :
 ...
 PCQueue<Request *> m_pcQueue;
};
// Puts a request into the engine by writing its pointer into the producer
// consumer queue; PCQueue::write waits while the queue is full.
template <class Request, class Work>
void ConcurrencyEngine<Request, Work>::enque(Request* r)
{
   this->m_pcQueue.write(r);
}
// Called by internal threads: takes the next request pointer out of the
// producer consumer queue; PCQueue::read waits while the queue is empty.
template <class Request, class Work>
Request* ConcurrencyEngine<Request, Work>::deque()
{
   Request* const next = this->m_pcQueue.read();
   return next;
}

Listing Nine
// Engine dispatch mechanism using completion port
template <class Request, class Work> class ConcurrencyEngine
{
   ...
private :
   ...
   HANDLE m_ioPort; // completion port for enqued requests
};
// Puts a request into the engine by posting a completion packet to the
// completion port. The Request pointer travels as the completion key, which
// is a pointer-sized ULONG_PTR; `unsigned long` is only 32 bits on 64-bit
// Windows and would truncate the pointer.
template <class Request, class Work>
void ConcurrencyEngine<Request, Work>::enque(Request* r)
{
   // NOTE(review): the BOOL result is not checked; if the post fails, the
   // request is never queued.
   PostQueuedCompletionStatus(m_ioPort, sizeof(Request *),
                              reinterpret_cast<ULONG_PTR>(r), 0);
}
// Called by internal threads: waits on the completion port for a packet
// posted by enque() and returns the Request pointer carried in its
// completion key. Returns a null pointer when no packet was dequeued.
// NOTE(review): the internal threads' handling of a null result is not shown;
// confirm they do not pass it on to the Work functor.
template <class Request, class Work>
Request* ConcurrencyEngine<Request, Work>::deque()
{
   DWORD read = 0;
   // The completion key is written as a ULONG_PTR (pointer-sized); a DWORD
   // is too small to hold it on 64-bit Windows.
   ULONG_PTR key = 0;
   OVERLAPPED *pOv = 0;
   // ...
   if (!GetQueuedCompletionStatus(m_ioPort, &read, &key, &pOv, INFINITE)
       && pOv == 0)
   {
      // Failure with a null OVERLAPPED means no packet was dequeued (e.g.
      // the port handle was closed), so `key` carries no request.
      return 0;
   }
   return reinterpret_cast<Request *>(key);
}




