Pipes, Channels, and Perl-Win32 
by Jean-Louis Leroy

Listing One
# ifndef channel_defined
# define channel_defined

# include <queue>
# include <cassert>
# include <windows.h>

// Thread-safe FIFO of T items shared between threads.
// A Win32 critical section protects the queue and the open flag; a
// manual-reset event is used to wake readers when items arrive or when the
// channel is closed.
template<class T>
class channel
    {
    public:
        // Scoped guard: enters the critical section on construction and
        // leaves it again on destruction.
        class lock
            {
            public:
                explicit lock(CRITICAL_SECTION& critsec) : critsec(critsec)
                    { EnterCriticalSection(&critsec); }
                ~lock()
                    { LeaveCriticalSection(&critsec); }
            private:
                // not copyable: a copy would leave the section a second time
                lock(const lock&);
                lock& operator=(const lock&);

                CRITICAL_SECTION& critsec;
            };

        // Creates the critical section and the event; the channel starts closed.
        channel();
        // Releases the critical section and the event handle.
        ~channel();

        // Discards any queued items, resets the event and marks the channel open.
        void open();
        // Appends one item; the channel must be open (checked with assert).
        void write(const T& item);
        // Non-blocking: pops one item into 'item'; returns 1, or 0 if empty.
        int read(T& item);
        // Like read(), but waits for an item; returns 0 once the channel is
        // closed and drained.
        int blocking_read(T& item);
        // Waits on the event for at most 'timeout' milliseconds; returns
        // nonzero if items are pending or the channel is still open.
        BOOL wait(DWORD timeout = INFINITE);
        // Number of queued items.
        int pending() const;
        BOOL is_open() const;
        BOOL is_closed() const;
        // Marks the channel closed and signals the event to release waiters.
        void close();

        // Bulk variants operating on [iter, last) or on n consecutive items.
        // The read forms return the position reached / the number of items
        // actually transferred.
        void write(const T* iter, const T* last);
        void write(const T* iter, int n);
        T* read(T* iter, T* last);
        int read(T* iter, int n);
        T* blocking_read(T* iter, T* last);
        int blocking_read(T* iter, int n);
    private:
        // not copyable: the object owns a critical section and an event
        // handle that must be released exactly once
        channel(const channel&);
        channel& operator=(const channel&);

        mutable CRITICAL_SECTION critsec;   // mutable so const queries can lock
        HANDLE event;
        std::queue<T> buf;                  // pending items, oldest first
        BOOL opened;
    };
// Returns the open flag, sampled while holding the critical section.
template<class T>
inline BOOL channel<T>::is_open() const
    {
    lock guard(critsec);
    const BOOL state = opened;
    return state;
    }
// Returns the negation of the open flag, sampled while holding the
// critical section.
template<class T>
inline BOOL channel<T>::is_closed() const
    {
    lock guard(critsec);
    return opened ? FALSE : TRUE;
    }
// Returns how many items are currently queued, read under the lock.
template<class T>
inline int channel<T>::pending() const
    {
    lock guard(critsec);
    const int count = static_cast<int>(buf.size());
    return count;
    }
// Builds a closed channel and creates its synchronisation objects.
template<class T>
channel<T>::channel() : opened(FALSE)
    {
    // The critical section and the event are created here rather than in
    // open(), because another thread might wait on this channel after it
    // has been closed.
    InitializeCriticalSection(&critsec);

    const BOOL manual_reset = TRUE;          // stays signalled until ResetEvent
    const BOOL initially_signalled = FALSE;
    event = CreateEvent(NULL, manual_reset, initially_signalled, NULL);
    }
// (Re)opens the channel: drops whatever is still queued, clears the event
// and raises the open flag, all under the lock.
template<class T>
void channel<T>::open()
    {
    lock guard(critsec);
    while (!buf.empty())
        buf.pop();
    ResetEvent(event);
    opened = TRUE;
    }
// Tears down the critical section, then closes the event handle if one
// was obtained.
template<class T>
channel<T>::~channel()
    {
    DeleteCriticalSection(&critsec);

    if (event != NULL)
        {
        CloseHandle(event);
        }
    }
// Appends a single item to the channel.
// The channel must be open (checked with assert). The event is signalled
// only when the queue goes from empty to non-empty; while items are
// pending it is already set, because readers reset it only after draining
// the queue.
template<class T>
void channel<T>::write(const T& item)
    {
    lock access(critsec);
    assert(opened);
    const bool was_empty = buf.empty();
    buf.push(item);
    // the original had a stray ';' after the condition, which turned the
    // test into an empty statement and signalled unconditionally
    if (was_empty)
        SetEvent(event);
    }
// Appends every item in [iter, last) to the channel under a single lock
// acquisition. An empty range is a no-op. The channel must be open
// (checked with assert). The event is signalled only when the queue was
// empty before the call.
template<class T>
void channel<T>::write(const T* iter, const T* last)
    {
    if (iter == last)
        return;
    lock access(critsec);
    assert(opened);
    const bool was_empty = buf.empty();
    for (; iter != last; ++iter)
        buf.push(*iter);
    // the original had a stray ';' after the condition, which turned the
    // test into an empty statement and signalled unconditionally
    if (was_empty)
        SetEvent(event);
    }
// Appends the n items starting at iter to the channel under a single lock
// acquisition. A count of zero or less is a no-op (a negative count used
// to run the copy loop far past the end of the input). The channel must be
// open (checked with assert). The event is signalled only when the queue
// was empty before the call.
template<class T>
void channel<T>::write(const T* iter, int n)
    {
    if (n <= 0)
        return;
    lock access(critsec);
    assert(opened);
    const bool was_empty = buf.empty();
    for (; n > 0; --n, ++iter)
        buf.push(*iter);
    // the original had a stray ';' after the condition, which turned the
    // test into an empty statement and signalled unconditionally
    if (was_empty)
        SetEvent(event);
    }
// Non-blocking read of one item.
// Returns 0 without touching 'item' when nothing is queued; otherwise
// moves the oldest item into 'item' and returns 1.
template<class T>
int channel<T>::read(T& item)
    {
    lock guard(critsec);
    if (buf.empty())
        return 0;

    item = buf.front();
    buf.pop();

    // Once drained, clear the event so readers block again - but only while
    // the channel is open.
    const bool drained = buf.empty();
    if (drained && opened)
        ResetEvent(event);
    return 1;
    }
// Non-blocking bulk read into [iter, last).
// Copies queued items until either the destination is full or the queue is
// empty, and returns the position just past the last item stored.
template<class T>
T* channel<T>::read(T* iter, T* last)
    {
    lock guard(critsec);
    for (; iter != last && !buf.empty(); ++iter)
        {
        *iter = buf.front();
        buf.pop();
        }

    // Clear the event when nothing is left and the channel is still open.
    const bool drained = buf.empty();
    if (drained && opened)
        ResetEvent(event);
    return iter;
    }
// Non-blocking bulk read of at most n items into the buffer at iter.
// Returns the number of items actually copied.
template<class T>
int channel<T>::read(T* iter, int n)
    {
    lock guard(critsec);
    int copied = 0;
    for (; copied < n && !buf.empty(); ++copied, ++iter)
        {
        *iter = buf.front();
        buf.pop();
        }

    // Clear the event when nothing is left and the channel is still open.
    const bool drained = buf.empty();
    if (drained && opened)
        ResetEvent(event);
    return copied;
    }
// Blocking read of one item.
// Alternates between trying a non-blocking read and waiting on the channel.
// Returns 1 once an item has been stored in 'item', or 0 as soon as wait()
// reports failure.
template<class T>
int channel<T>::blocking_read(T& item)
    {
    for (;;)
        {
        if (read(item))
            return 1;
        if (!wait())
            return 0;
        }
    }
// Blocking bulk read into [iter, last).
// Keeps reading and waiting until the destination is full or wait() reports
// failure; returns the position just past the last item stored.
template<class T>
T* channel<T>::blocking_read(T* iter, T* last)
    {
    for (;;)
        {
        iter = read(iter, last);
        if (iter == last)
            break;              // destination filled
        if (!wait())
            break;              // nothing more will arrive
        }
    return iter;
    }
// Blocking bulk read of n items into the buffer at iter.
// Keeps reading and waiting until n items have been transferred or wait()
// reports failure; returns the number of items actually stored.
template<class T>
int channel<T>::blocking_read(T* iter, int n)
    {
    int total = 0;
    for (;;)
        {
        const int got = read(iter, n);
        iter += got;
        total += got;
        n -= got;
        if (n == 0 || !wait())
            break;
        }
    return total;
    }
// Waits on the channel's event for at most 'timeout' milliseconds, then
// reports whether a further read is worthwhile: nonzero when items are
// queued or the channel is still open, zero when it is closed and empty.
template<class T>
BOOL channel<T>::wait(DWORD timeout)
    {
    // The channel may already be closed here and yet still hold data, so
    // the decision is made from the queue and the flag, not from the result
    // of the wait itself.
    WaitForSingleObject(event, timeout);

    lock guard(critsec);
    if (!buf.empty())
        return TRUE;
    return opened ? TRUE : FALSE;
    }
// Closes the channel and wakes anything waiting on it.
template<class T>
void channel<T>::close()
    {
    // Clear the open flag inside its own scope so the critical section has
    // been left before the event is signalled.
        {
        lock guard(critsec);
        opened = FALSE;
        }

    // "No more data" is itself information a reader has to receive, so the
    // event is set even though nothing was queued.
    SetEvent(event);
    }
# endif


Listing Two
# ifndef channelstream_defined
# define channelstream_defined

# include <iostream>
# include <streambuf>
# include "channel.h"

template<class Char>
class basic_channelbuf : public basic_streambuf<Char, char_traits<Char> >
    {
    public:
        basic_channelbuf();
        ~basic_channelbuf();
        void open(channel<Char>& c, BOOL autoclose);
        void close();
        void close_channel();

        BOOL is_open() const
            { return pc && pc->is_open(); }
    protected:
        virtual int_type underflow();
        virtual int_type overflow(int_type c = EOF);
        virtual int_type sync();
    private:
        channel<Char>* pc;
        enum { ibufsize = 1024, obufsize = 1024 };
        Char ibuf[ibufsize];
        Char obuf[obufsize];
        BOOL autoclose;
    };
typedef basic_channelbuf<char> channelbuf;
typedef basic_channelbuf<wchar_t> wchannelbuf;

// Stream front end over a basic_channelbuf.
//   Char      - character type carried by the channel
//   Base      - standard stream class to derive from (input, output or both)
//   Autoclose - default for the 'autoclose' argument of the constructor and
//               of open()
// NOTE(review): Base is handed the address of 'buf' before 'buf' has been
// constructed; this presumably relies on the stream constructor only storing
// the pointer - confirm against the library in use.
template<class Char, class Base, BOOL Autoclose>
class channelstream_ : public Base
    {
    public:
        // Creates a stream that is not attached to any channel yet.
        channelstream_() : Base(&buf)
            { }
        // Creates a stream and attaches it to channel c straight away.
        channelstream_(channel<Char>& c, BOOL autoclose = Autoclose)
            : Base(&buf)
            { open(c, autoclose); }
        // Attaches the underlying buffer to channel c.
        void open(channel<Char>& c, BOOL autoclose = Autoclose)
            { buf.open(c, autoclose); }
        // True when the underlying buffer is attached to an open channel.
        BOOL is_open() const
            { return buf.is_open(); }
        // Flushes and detaches the underlying buffer.
        void close()
            { buf.close(); }
        // Flushes, detaches and closes the channel.
        void close_channel()
            { buf.close_channel(); }
    private:
        basic_channelbuf<Char> buf;
    };

// Output and bidirectional streams pass TRUE as the autoclose default;
// input streams pass FALSE.
typedef channelstream_<char, std::ostream, TRUE> ochannelstream;
typedef channelstream_<char, std::istream, FALSE> ichannelstream;
typedef channelstream_<char, std::iostream, TRUE> channelstream;

typedef channelstream_<wchar_t, std::wostream, TRUE> wochannelstream;

typedef channelstream_<wchar_t, std::wistream, FALSE> wichannelstream;
typedef channelstream_<wchar_t, std::wiostream, TRUE> wchannelstream;

// Creates a detached buffer: no channel, and autoclose off so the flag never
// holds an indeterminate value before open() has been called.
template<class Char>
basic_channelbuf<Char>::basic_channelbuf() : pc(NULL), autoclose(FALSE)
    {
    }
// Closes the buffer on destruction when it is still attached to a channel.
template<class Char>
basic_channelbuf<Char>::~basic_channelbuf()
    {
    if (pc != NULL)
        {
        close();
        }
    }
// Attaches this buffer to channel c.
//   c  - channel to read from / write to; it is opened here if it is not
//        open already
//   ac - when nonzero, close() will also close the channel
// The put area is reset to the whole of obuf.
template<class Char>
void basic_channelbuf<Char>::open(channel<Char>& c, BOOL ac)
    {
    pc = &c;
    if (!c.is_open())
        c.open();
    autoclose = ac;
    // setp lives in a dependent base class, so it has to be reached through
    // this-> to be found by standard name lookup
    this->setp(obuf, obuf + obufsize);
    }
// Flushes pending output and detaches from the channel. The channel itself
// is closed only when autoclose was requested in open() and it is still
// open. Calling this on a buffer that is not attached (never opened, or
// already closed) does nothing.
template<class Char>
void basic_channelbuf<Char>::close()
    {
    if (!pc)
        return;     // detached: nothing to flush, and pc must not be dereferenced
    sync();
    if (autoclose && pc->is_open())
        pc->close();
    pc = NULL;
    }
// Flushes pending output, closes the channel if it is still open
// (regardless of the autoclose setting) and detaches from it. Calling this
// on a buffer that is not attached does nothing.
template<class Char>
void basic_channelbuf<Char>::close_channel()
    {
    if (!pc)
        return;     // detached: nothing to flush, and pc must not be dereferenced
    sync();
    if (pc->is_open())
        pc->close();
    pc = NULL;
    }
template<class Char>
basic_channelbuf<Char>::int_type
basic_channelbuf<Char>::underflow()
    {
    if (!pc->blocking_read(ibuf, 1))
       return traits_type::eof();
    setg(ibuf, ibuf, pc->read(ibuf + 1, ibuf + ibufsize));
    return traits_type::to_int_type(*gptr());
    }
template<class Char>
basic_channelbuf<Char>::int_type
basic_channelbuf<Char>::sync()
    {
    return overflow();
    }
template<class Char>
basic_channelbuf<Char>::int_type
basic_channelbuf<Char>::overflow(int_type c)
    {
    if (!is_open())
        return traits_type::eof();
    pc->write(pbase(), pptr());
    if (c != traits_type::eof())
        {
        Char tmp = traits_type::to_char_type(c);
        pc->write(tmp);
        }
    setp(obuf, obuf + obufsize);
    return 1;
    }
# endif


7


