Distributing Data Using TLT30G 
by Oleg Kiselyov



Example 1: 

(a)
declare -x LAPFILENAME=test_output.dat
receiver cp @f /tmp/@n.1

(b)
set LAPFILENAME=test_output.dat
receiver copy @f \temp\@n

(c)
~/croot/Frag-Assem> /etc/mknod /tmp/mypipe p
~/croot/Frag-Assem> declare -x LAPFILENAME=/tmp/mypipe
~/croot/Frag-Assem> ( declare -x SESSION_TTL=1;
declare -x LAPFILEERR=1000; declare -x BC_LOG_FILE=/tmp/rec.log;
receiver cp @f /tmp/@n.1 )&
~/croot/Frag-Assem> broadcaster -s20 broadcaster.cc
  ~> diff /tmp/broadcaster.cc.1 croot/Frag-Assem/broadcaster.cc
[no differences found]

(d)
~/croot/Frag-Assem> declare -x LAPNETPORT=4000
~/croot/Frag-Assem> ( declare -x SESSION_TTL=1;
declare -x LAPNETERR=400; declare -x BC_LOG_FILE=/tmp/rec.log;
receiver cp @f /tmp/@n.1 )&
~/croot/Frag-Assem> declare -x LAPNETHOST=localhost
~/croot/Frag-Assem> broadcaster -s60 broadcaster.cc


Example 2:
                                // Send the entire Segment0 packet
void Segment0::send(LAPOut& out_link) const
{
  SegmentOutStream(out_link) << (SegmentHeader)(*this)
        << file_size << MTU << max_seq_no
        << byte_array(file_name,sizeof(file_name));
}


Example 3:
class LAPOut
{
public:
  virtual void write_byte(const unsigned char a_byte) = 0;
  virtual void begin_packet(void) = 0;
  virtual void finish_packet(void) = 0;
};

Example 4:
                        // Handle a new packet just received
void Sessions::digest(SegmentInStream& packet)
{
  SessionLink * sp;
  if( (sp = find(packet.q_header().session_id)) != 0 )
    sp->ingest(packet);
  else
  {
    sp = new SessionLink(sh_command,packet);
    sp->next = sessions;        // that won't be run if an exception was
    sessions = sp;              // thrown from the constructor... good!
  }
}



Listing One
                  // Broadcast the file, return FALSE if the broadcast was 
                  // interrupted (because file was changed in the meantime)
bool FileBeingSent::do_broadcast(LAPOut& out_link)
{
  Segment0::log("Starting");
  Segment0::send(out_link);
  Segment0::send(out_link);  // and do it again, 0th segment is important

  char buffer[256];
  assert( MTU < sizeof(buffer) );
  rewind(fp);
  for(register unsigned long i=1; i<= max_seq_no; i++)
  {
    if( was_modified() )
      return Logger::log("Session aborted: the file was changed"),
      false;
    if( fread(buffer,sizeof(char),MTU,fp) <= 0 )
      perror("reading input file"),
      _error("EOF or error reading an input file");

    SegmentOutStream(out_link) << SegmentHeader(i,session_id,MTU)
                                             << byte_array(buffer,MTU);
  }
  Logger::log("Finished broadcast in session %d",session_id);
  return true;
}

Listing Two
#ifndef __GNUC__
#pragma once
#else
#pragma interface
#endif

#ifndef __Segment_h
#define __Segment_h

#include "LAP.h"
#include "myenv.h"

class SegmentOutStream;
class SegmentInStream;

typedef unsigned short ID;

            // All segments start with this....
struct SegmentHeader
{
  const unsigned long seq_number;   // Sequence number
  const ID session_id;          // ID of the current session
  const unsigned char data_length;  // Length of the data only
                    // Must be within (1,255)
  inline int q_header_size(void) const  // True header size, w/o any padding
    { return sizeof(seq_number) + sizeof(session_id) + sizeof(data_length); }
  SegmentHeader(const unsigned long _seq_number, const short _session_id,
                                                      const int _data_length)
    : seq_number(_seq_number), session_id(_session_id),
      data_length(_data_length)
    { assert( _data_length > 1 && _data_length < 255 ); }
  void write(SegmentOutStream& out_packet) const;
  void read(SegmentInStream& in_packet);
};
class Segment0 : public SegmentHeader
{
protected:
  char file_name[20];           // only the basic portion of name, w/o path
  const unsigned long file_size;
  unsigned long max_seq_no;     // That is, how many data packets
  const unsigned char MTU;      // Max transmission unit: data_length
                                // in all the packets to follow
  inline int q_data_size(void) const
    { return sizeof(file_size) + sizeof(MTU) +
             sizeof(max_seq_no) + sizeof(file_name); }
         // _Partially_ fill out Segment0 based on what found in some_packet
  Segment0(const SegmentHeader& some_packet);
    static unsigned char default_MTU;
  public:
  enum { max_MTU = 255 };  // None of the segments can be bigger than that
  Segment0(const char * full_file_name);
  void send(LAPOut& out_link) const;
  void log(const char * title) const;   // Log the contents of Segment0,
                    // which represents a session complete a partially 
                    // finished Segment0 from a packet
  void complete(SegmentInStream& packet) throw (rc_bad_packet);
  unsigned int q_MTU(void) const        { return MTU; }
  unsigned int q_max_seq_no(void) const     { return max_seq_no; }
  static void set_default_MTU(const int new_MTU);
  static int  get_default_MTU(void);
};
            // A stream to help compose a packet
class SegmentOutStream
{
  LAPOut& out_link;
  CRC16 crc;
  void write_byte(const unsigned char a_byte)
  { out_link.write_byte(a_byte); crc << a_byte; }
public:
  SegmentOutStream(LAPOut& _out_link);
  ~SegmentOutStream(void);      // write the CRC and flush the packet
  SegmentOutStream& operator << (const unsigned char a_byte)
        { write_byte(a_byte); return *this; }
  SegmentOutStream& operator << (const unsigned short a_short);
  SegmentOutStream& operator << (const unsigned long a_long);
  SegmentOutStream& operator << (const SegmentHeader& seg_header);
        // Write an array of bytes
        // Array: a representation for an array. Note, for safety an Array 
        // object cannot be constructed explicitly, either on stack or on 
        // heap. An Array object can only be constructed via a friend 
        // function byte_array, and is always transient.
  class Array
  {
    friend SegmentOutStream;
    friend SegmentInStream;
    const char * const ptr;
    const unsigned long size;
    Array(const char * byte_array, const unsigned long _size) 
                                                 // private constructor
    : ptr(byte_array), size(_size) {}
    Array& operator = (const Array&); // unimplemented and forbidden
    Array(const Array&);              // unimplemented and forbidden
  public:
    friend inline Array byte_array(const char * ptr, const unsigned long size)
        { return Array(ptr,size); }
  };
  SegmentOutStream& operator << (const Array& array);
};
inline SegmentOutStream&
SegmentOutStream::operator << (const SegmentHeader& seg_header)
{
  seg_header.write(*this);
  return *this;
}
      // Stream to help de-compose a packet. When stream is fully constructed,
      //  it contains complete packet, with CRC and other checks performed
class SegmentInStream
{
  unsigned char buffer[Segment0::max_MTU+1];    // The buffer where the packet 
                // is received. Two ptrs in buffer for the curent reading
                // and writing position
  const unsigned char * read_ptr;
  unsigned char * write_ptr;
  CRC16 curr_crc;       // CRC accumulated while the packet is being received
                // The packet header is being read as a part
                // of the receiving process
  SegmentHeader header;
                // Take the current byte from the buffer
  unsigned char take_byte(void)
    { assert( write_ptr > read_ptr ); return *read_ptr++; }
                // Receive a byte from the link and into the buffer
  void receive_byte(LAPIn& in_link);
public:
                // Receive a packet. It throws up if there was an input error
  SegmentInStream(LAPIn& in_link);
  const SegmentHeader& q_header(void) const { return header; }
  SegmentInStream& operator >> (unsigned char& a_byte)
        { a_byte = take_byte(); return *this; }
  SegmentInStream& operator >> (unsigned short& a_short);
  SegmentInStream& operator >> (unsigned long& a_long);
  SegmentInStream& operator >> (const SegmentOutStream::Array& array);
  void write(FILE * fp);    // Dump the rest if the stream into a file
};
#endif



5


