PreviousUpNext

15.4.1073  src/lib/std/src/io/threadkit-binary-io-g.pkg

## threadkit-binary-io-g.pkg

# Compiled by:
#     src/lib/std/standard.lib


# This is the threadkit version of the binary_io generic package
#     src/lib/std/src/io/binary-io-g.pkg

# We are compiletime invoked by:
#     src/lib/std/src/posix/posix-threadkit-binary-io.pkg

stipulate
    package iox =  io_exceptions;                                       # io_exceptions                         is from   src/lib/std/src/io/io-exceptions.pkg
herein

    generic package threadkit_binary_io_g (

                                                                        # Threadkit_Winix_Stream_Readers_And_Writers            is from   src/lib/std/src/io/threadkit-winix-stream-readers-and-writers.api
                                                                        # threadkit_posix_binary_stream_readers_and_writers     is from   src/lib/std/src/posix/threadkit-posix-binary-stream-readers-and-writers.pkg
        package threadkit_winix_stream_readers_and_writers
            :
            Threadkit_Winix_Stream_Readers_And_Writers
                where  stream_readers_and_writers::Rw_Vector       == threadkit_binary_stream_readers_and_writers::Rw_Vector
                where  stream_readers_and_writers::Vector          == threadkit_binary_stream_readers_and_writers::Vector
                where  stream_readers_and_writers::Rw_Vector_Slice == threadkit_binary_stream_readers_and_writers::Rw_Vector_Slice
                where  stream_readers_and_writers::Vector_Slice    == threadkit_binary_stream_readers_and_writers::Vector_Slice
                where  stream_readers_and_writers::Element         == threadkit_binary_stream_readers_and_writers::Element
                where  stream_readers_and_writers::File_Position   == threadkit_binary_stream_readers_and_writers::File_Position
                where  stream_readers_and_writers::Stream_Reader   == threadkit_binary_stream_readers_and_writers::Stream_Reader
                where  stream_readers_and_writers::Stream_Writer   == threadkit_binary_stream_readers_and_writers::Stream_Writer;

      )

    : (weak)  Threadkit_Binary_Io                                       # Threadkit_Binary_Io   is from   src/lib/std/src/io/threadkit-binary-io.api

    {
        include threadkit;                                              # threadkit             is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit.pkg

        package pio = threadkit_winix_stream_readers_and_writers::stream_readers_and_writers;           # threadkit_winix_stream_readers_and_writers    is our argument.

        package a   = rw_vector_of_one_byte_unts;                               # rw_vector_of_one_byte_unts    is from   src/lib/std/src/rw-vector-of-one-byte-unts.pkg
        package rs  = rw_vector_slice_of_one_byte_unts;                         # rw_vector_slice_of_one_byte_unts      is from   src/lib/std/src/rw-vector-slice-of-one-byte-unts.pkg

        package v   = vector_of_one_byte_unts;                                  # vector_of_one_byte_unts               is from   src/lib/std/src/vector-of-one-byte-unts.pkg
        package vs  = vector_slice_of_one_byte_unts;                            # vector_slice_of_one_byte_unts is from   src/lib/std/src/vector-slice-of-one-byte-unts.pkg
        package pos = file_position;                            # file_position         is from   src/lib/std/file-position.pkg


        # Assign to a maildrop:
        #
        fun update_maildrop (mv, x)
            =
            {   empty mv;
                fill (mv, x);
            };

        # An element for initializing buffers:
        #
        some_element = (0u0:  one_byte_unt::Unt);

        vec_extract =  vs::to_vector o vs::make_slice;
        vec_get     =  v::get;
        rw_vec_set  =  a::set;
        empty_vec   =  v::from_list [];

        fun dummy_cleaner () = ();

        package pure_io {

            Vector   = v::Vector;
            Element  = v::Element;

            Stream_Reader =  pio::Stream_Reader;
            Stream_Writer =  pio::Stream_Writer;
            File_Position =  pio::File_Position;

            # ** Functional input streams **
            #
            Input_Stream
                =
                INPUT_STREAM  (Input_Buffer, Int)

            also
            Input_Buffer
                =
                INPUT_BUFFER
                  {
                    base_position:  Null_Or( File_Position ),
                    more:           Maildrop( More ),           # When this cell is empty, it means that 
                                                                # there is an outstanding request to the 
                                                                # server to extend the stream. 
                    data:  Vector,
                    info:  Info
                  }

            also
            More
              = MORE  Input_Buffer      # Forward link to additional data.
              | NOMORE                  # Placeholder for forward link.
              | TERMINATED              # Termination of the stream.

            also
            Info = INFO   { reader:             Stream_Reader,
                            read_vector:        Int -> Vector,
                            read_vec_mailop:    Int -> threadkit::Mailop( Vector ),

                            is_closed:          Ref( Bool ),
                            get_position:       Void -> Null_Or( File_Position ),
                            tail:               Maildrop(  Maildrop(  More ) ),         # Points to the more cell of the last buffer.

                            clean_tag:          threadkit_io_cleanup_at_shutdown::Tag
                          };

            fun info_of_ibuf (INPUT_BUFFER { info, ... } )
                =
                info;

            fun chunk_size_of_ibuf buf
                =
                {   (info_of_ibuf  buf)
                        ->
                        INFO { reader => pio::STREAM_READER { chunk_size, ... }, ... };

                    chunk_size;
                };

            fun read_vector (INPUT_BUFFER { info=>INFO { read_vector=>f, ... }, ... } )
                =
                f;

            fun input_exn (INFO { reader => pio::STREAM_READER { name, ... }, ... }, ml_op, cause)
                =
                raise exception  iox::IO  { function => ml_op,  name,  cause };

            More_Data = EOF | DATA  Input_Buffer;

            # Extend the stream by a chunk.
            # Invariant: the more m-variable is empty on entry and full on exit.
            #
            fun extend_stream (read_g, ml_op, buf as INPUT_BUFFER { more, info, ... } )
                =
                {   info ->   INFO { get_position, tail, ... };

                    base_position = get_position ();

                    chunk = read_g  (chunk_size_of_ibuf  buf);

                    if (v::length chunk == 0)

                        fill (more, NOMORE);
                        EOF;

                    else 

                        new_more = make_empty_maildrop ();

                        buf' = INPUT_BUFFER {
                                base_position,
                                data => chunk,
                                more => new_more,
                                info
                              };

                        # Note that we do not fill the newMore cell until
                        # after the tail has been updated.  This ensures
                        # that someone attempting to access the tail will
                        # not acquire the lock until after we are done.
                        #
                        update_maildrop (tail, new_more);
                        fill (more,       MORE buf');  #  releases lock!! 
                        fill (new_more, NOMORE);
                        DATA buf';
                    fi;
                }
                except
                    ex =  {   fill (more, NOMORE);
                              input_exn (info, ml_op, ex);
                          };


            # Get the next buffer in the stream,
            # extending it if necessary. 
            # If the stream must be extended,
            # we lock it by taking the value from the
            # more cell; the extend_stream function
            # is responsible for filling in the cell.
            #
            fun get_buffer (read_g, ml_op) (buf as INPUT_BUFFER { more, info, ... } )
                =
                get (peek more)
                where
                    fun get TERMINATED  =>  EOF;
                        get (MORE buf') =>  DATA buf';

                        get NOMORE
                            =>
                            case (empty more)

                                NOMORE => extend_stream (read_g, ml_op, buf);

                                next   => {   fill (more, next);
                                              get next;
                                          };
                            esac;
                    end;
                end;

            # Read a chunk that is at least the specified size 
            #
            fun read_chunk buf
                =
                {   (info_of_ibuf buf)
                        ->
                        INFO { read_vector, reader => pio::STREAM_READER { chunk_size, ... }, ... };

                    case (chunk_size - 1)
                        #
                        0 => (fn n = read_vector n);
                        k => (fn n = read_vector (int::quot (n+k, chunk_size) * chunk_size));
                                    #
                                    # Round up to next multiple of chunk_size 
                    esac;
                };

            fun generalized_input get_buf
                =
                get
                where
                    fun get (INPUT_STREAM (buf as INPUT_BUFFER { data, ... }, pos))
                        =
                        {   len = v::length data;
                            #
                            if (pos < len)
                                #
                                (vec_extract (data, pos, NULL), INPUT_STREAM (buf, len));
                            else
                                case (get_buf buf)
                                    #
                                    EOF       =>  (empty_vec, INPUT_STREAM (buf, len));
                                    DATA rest =>  get (INPUT_STREAM (rest, 0));
                                esac;
                            fi;
                        };
                end;

            # Terminate an input stream 
            #
            fun terminate (info as INFO { tail, clean_tag, ... } )
                =
                {   m = peek tail;
                    #
                    case (empty m)
                        #
                        (m' as MORE _)
                            =>
                            {   fill (m, m');
                                terminate info;
                            };

                        TERMINATED
                            =>
                            fill (m, TERMINATED);

                       _ => {   threadkit_io_cleanup_at_shutdown::remove_cleaner clean_tag;
                                fill (m, TERMINATED);
                            };
                    esac;
                };


            # Find the end of the stream 
            #
            fun find_eos (buf as INPUT_BUFFER { more, data, ... } )
                =
                case (peek more)
                    #
                    MORE buf =>  find_eos buf;
                    _        =>  INPUT_STREAM (buf, v::length data);
                esac;


            fun read (stream as INPUT_STREAM (buf, _))
                =
                generalized_input
                    (get_buffer (read_vector buf, "read"))
                    stream;

            fun read_one (INPUT_STREAM (buf, pos))
                =
                {   buf ->  INPUT_BUFFER { data, more, ... };

                    if (pos < v::length data)
                        THE (vec_get (data, pos), INPUT_STREAM (buf, pos+1));
                    else
                        get (peek more)
                        where
                            fun get (MORE buf) => read_one (INPUT_STREAM (buf, 0));
                                get TERMINATED => NULL;

                                get NOMORE
                                    =>
                                    case (empty more)

                                        NOMORE
                                            =>
                                            case (extend_stream (read_vector buf, "read_one", buf))
                                                DATA rest => read_one (INPUT_STREAM (rest, 0));
                                                EOF       => NULL;
                                            esac;

                                        next =>
                                            {   fill (more, next);
                                                get next;
                                            };
                                    esac;
                            end;
                        end;
                    fi;
                };

            fun read_n (INPUT_STREAM (buf, pos), n)
                =
                {   fun join (item, (list, stream))
                        =
                        (item ! list, stream);

                    fun input_list (buf as INPUT_BUFFER { data, ... }, i, n)
                        =
                        {   len = v::length data;

                            remain = len-i;

                            if (remain >= n)
                                 ([vec_extract (data, i, THE n)], INPUT_STREAM (buf, i+n));
                            else
                                 join ( vec_extract (data, i, NULL),
                                        next_buf (buf, n-remain)
                                      );
                            fi;
                          }

                    also
                    fun next_buf (buf as INPUT_BUFFER { more, data, ... }, n)
                        =
                        get (peek more)
                        where
                            fun get (MORE buf)
                                    =>
                                    input_list (buf, 0, n);

                                get TERMINATED
                                    =>
                                    ([], INPUT_STREAM (buf, v::length data));

                                get NOMORE
                                    =>
                                    case (empty more)

                                        NOMORE => (case (extend_stream (read_vector buf, "read_n", buf))
                                             EOF => ([], INPUT_STREAM (buf, v::length data));
                                            (DATA rest) => input_list (rest, 0, n); esac
                                         );             # end case

                                        next => {   fill (more, next);
                                                    get next;
                                                };
                                    esac;

                            end;
                        end;

                    my (data, stream)
                        =
                        input_list (buf, pos, n);

                    (v::cat data, stream);
                };

            fun read_all (stream as INPUT_STREAM (buf, _))
                =
                {   (info_of_ibuf  buf)
                        ->
                        INFO  { reader => pio::STREAM_READER { avail, ... }, ... };
                        

                    # Read a chunk that is as large as the available input.
                    # Note that for systems that use CR-LF for '\n', the
                    # size will be too large, but this should be okay.
                    #
                    fun big_chunk _
                        =
                        read_chunk  buf  delta
                        where
                            delta = case (avail())
                                        #
                                        NULL  =>  chunk_size_of_ibuf buf;
                                        THE n =>  n;
                                    esac;
                        end;

                    big_input
                        =
                        generalized_input
                            (get_buffer (big_chunk, "read_all"));

                    fun loop (v, stream)
                        =
                        if (v::length v == 0)  [];
                        else                   v ! loop (big_input stream);
                        fi;

                    data = v::cat (loop (big_input stream));

                    (data, find_eos buf);
                };

            fun input1evt       _ =  raise exception FAIL "input1Evt unimplemented";
            fun input_mailop     _ =  raise exception FAIL "inputEvt unimplemented";
            fun input_nevt      _ =  raise exception FAIL "inputNEvt unimplemented";
            fun input_all_mailop _ =  raise exception FAIL "inputAllEvt unimplemented";

            # Return THE k, if k <= amount
            # characters can be read without blocking. 
            #
            fun can_read (stream as INPUT_STREAM (buf, pos), amount)
                =
                if (amount < 0)
                    raise exception SIZE;
                else
                    try_input (buf, pos, amount)
                    where
    /******
                          readVecNB = (case buf
                               of (INPUT_BUFFER { info as INFO { readVecNB=NULL, ... }, ... } ) =>
                                    inputExn (info, "can_read", iox::NONBLOCKING_IO_NOT_SUPPORTED)
                                | (INPUT_BUFFER { info=INFO { readVecNB=THE f, ... }, ... } ) => f
                              )         # end case
    ******/
                        fun try_input (buf as INPUT_BUFFER { data, ... }, i, n)
                            =
                            {   len = v::length data;
                                remain = len - i;

                                remain >= n  ??  THE n
                                             ::  next_buf (buf, n - remain);
                            }

                        also
                        fun next_buf (INPUT_BUFFER { more, ... }, n)
                            =
                            get (peek more)
                            where
                                fun get (MORE buf) => try_input (buf, 0, n);
                                    get TERMINATED => THE (amount - n);
                                    get NOMORE => THE (amount - n);
                                end;
    /******
                                  | get NOMORE = (case md::mTake more
                                       of NOMORE => ((
                                            case extendStream (readVecNB, "can_read", buf)
                                             of EOF => THE (amount - n)
                                              | (DATA b) => tryInput (b, 0, n)
                                            )           # end case
                                              except iox::IO { cause=WOULD_BLOCK, ... } => THE (amount - n))
                                        | next => (md::mPut (more, next); get next)
                                      )         # end case
    ******/

                        end;

                    end;
                fi;

            # Close an input stream given its info package.
            # We need this function for the cleanup hook
            # to avoid a space leak.
            #
            fun close_in_info (INFO { is_closed=>REF TRUE, ... } )
                    =>
                    ();

                close_in_info (info as INFO { is_closed, reader => pio::STREAM_READER { close, ... }, ... } )
                    =>
                    {
    # ** We need some kind of lock on the input stream to do this safely!!! ** XXX BUGGO FIXME
                        terminate info;

                        is_closed := TRUE;

                        close()
                        except
                            ex =  input_exn (info, "close_input", ex);
                    };
            end;


            fun close_input (INPUT_STREAM (buf, _))
                =
                close_in_info (info_of_ibuf buf);


            fun end_of_stream (INPUT_STREAM (buf as INPUT_BUFFER { more, ... }, pos))
                =
                case (empty more)

                    next as MORE _
                        =>
                        {   fill (more, next);
                            FALSE;
                        };

                    next
                        =>
                        {   buf ->  INPUT_BUFFER { data, info=>INFO { is_closed, ... }, ... };

                            if (pos == v::length data)
                                #
                                case (next, *is_closed)
                                    #
                                    (NOMORE, FALSE)
                                        =>
                                        case (extend_stream (read_vector buf, "end_of_stream", buf))
                                            #
                                            EOF =>  TRUE;
                                            _   =>  FALSE;
                                        esac;

                                    _   =>
                                        {   fill (more, next);
                                            TRUE;
                                        };
                                esac;
                            else
                                fill (more, next);
                                FALSE;
                            fi;
                       };
                esac;


            fun make_instream (reader, data)
                =
                {   reader ->  pio::STREAM_READER { read_vector, read_vec_mailop, get_position, set_position, ... };
                    #
                    get_position
                        =
                        case (get_position, set_position)
                            #
                            (THE f, THE _)
                                =>
                                (fn () =  THE (f ()));

                             _   =>
                                 (fn () =  NULL);
                         esac;

                    more = make_full_maildrop NOMORE;
                    tag  = threadkit_io_cleanup_at_shutdown::add_cleaner dummy_cleaner;

                    info = INFO   { reader,
                                    read_vector,
                                    read_vec_mailop,
                                    get_position,
                                    #
                                    is_closed =>  REF FALSE,
                                    tail      =>  make_full_maildrop more,
                                    clean_tag =>  tag
                                  };

                    # * What should we do about the position in this case ?? *

                    # Suggestion: When building a stream with supplied initial data,
                    # nothing can be said about the positions inside that initial
                    # data (who knows where that data even came from!).


                    base_position
                        =
                        if (v::length data == 0)    get_position ();
                        else                        NULL;
                        fi;

                    buf = INPUT_BUFFER {
                            base_position, data,
                            info, more
                          };

                    stream =  INPUT_STREAM (buf, 0);

                    threadkit_io_cleanup_at_shutdown::rebind_cleaner
                      ( tag,
                        fn () =  close_in_info info
                      );

                    stream;
                };

            fun get_reader (INPUT_STREAM (buf, pos))
                =
                {   buf ->  INPUT_BUFFER { data, info as INFO { reader, ... }, more, ... };
                    #
                    fun get_data more
                        =
                        case (peek more)
                            #
                            MORE (INPUT_BUFFER { data, more=>more', ... } )
                                =>
                                data ! get_data more';

                             _ => [];
                        esac;


                    terminate info;

                    if (pos < v::length data)
                        ( reader,
                          v::cat (vec_extract (data, pos, NULL) ! get_data more)
                        );
                    else
                        ( reader,
                          v::cat (get_data more)
                        );
                    fi;
                };

    /*
          # * Position operations on instreams *
            enum in_pos = INP of {
                base:  pos,
                offset:  Int,
                info:  info
              }
    */

    /*
            fun getPosIn (INPUT_STREAM (buf, pos)) = (case buf
                   of INPUT_BUFFER { basePos=NULL, info, ... } =>
                        inputExn (info, "getPosIn", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED)
                    | INPUT_BUFFER { basePos=THE p, info, ... } => INP {
                          base = p, offset = pos, info = info
                        }
                  )             # end case
    */


    /*
            fun filePosIn (INP { base, offset, ... } ) =
                  position.+(base, file_position::from_int offset)
    */

            fun file_position_in (INPUT_STREAM (buf, pos))
                =
                case buf

                    INPUT_BUFFER { base_position=>NULL, info, ... }
                        =>
                        input_exn (info, "filePosIn", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED);

                    INPUT_BUFFER { base_position=>THE b, ... }
                        =>
                        file_position::(+) (b, file_position::from_int pos);
                esac;
    /*
            fun setPosIn (pos as INP { info as INFO { reader, ... }, ... } ) = let
                  fpos = filePosIn pos
                  my (PIO::READER rd) = reader
                  in
                    terminate info;
                    the rd.setPos fpos;
                    make_instream (PIO::READER rd, empty_vec)
                  end
    */

            # IO mailop constructors:
            # We exploit the "functional" nature of stream IO
            # to implement the mailop constructors.  These
            # constructors spawn a thread to do the operation
            # and and write the result in an iVariable that
            # serves as the synchronization value.
            #
            # NOTE: This implementation has the weakness that
            # it prevents shutdown when everything else is
            # deadlocked, since the thread that is spawned
            # to actually do the IO could proceed.

            stipulate

                Result X = RES  X | EXCEPTION  Exception;

                fun do_input input_op
                    =
                    {   fun read arg
                            =
                            RES (input_op arg)
                            except
                                ex = EXCEPTION ex;

                        fn arg
                            =
                            guard
                               .{   reply_1shot =  make_oneshot_maildrop ();

                                    make_thread "binary I/O" .{
                                        set (reply_1shot, read arg);
                                    };

                                    get'  reply_1shot
                                        ==>
                                        fn (RES x)        =>  x;
                                           (EXCEPTION ex) =>  raise exception ex;
                                        end;

                                };

                    };
            herein

                input1evt       = do_input read_one;
                input_mailop     = do_input read;

                input_nevt      = do_input read_n;
                input_all_mailop = do_input read_all;
            end;                                                # stipulate


            # ** Output streams **

            # An output stream is implemented
            # as a monitor using an mvar to
            # hold its data.

            Ostrm_Info
                =
                OSTRM_INFO
                  {
                    buffer:                     a::Rw_Vector,
                    first_free_byte_in_buffer:  Ref( Int ),

                    is_closed:                  Ref( Bool ),
                    buffering_mode:             Ref( iox::Buffering_Mode ),

                    writer:                     Stream_Writer,

                    write_rw_vector:            rs::Slice -> Void,
                    write_vector:               vs::Slice -> Void,

                    clean_tag:  threadkit_io_cleanup_at_shutdown::Tag
                  };

            Output_Stream
                =
                Maildrop( Ostrm_Info );

            fun raise_io_exception (OSTRM_INFO { writer => pio::STREAM_WRITER { name, ... }, ... }, ml_op, cause)
                =
                raise exception  iox::IO  { function => ml_op,  name,  cause };


            # Lock access to the stream and
            # make sure that it is not closed. 
            #
            fun lock_and_check_closed_out (strm_mv, ml_op)
                =
                case (empty  strm_mv)
                    #
                    (stream as OSTRM_INFO( { is_closed => REF TRUE, ... } ))
                        =>
                        {   fill (strm_mv, stream);
                            #
                            raise_io_exception (stream, ml_op, iox::CLOSED_IO_STREAM);
                        };

                    stream => stream;
                esac;


            fun flush_buffer (strm_mv, stream as OSTRM_INFO { buffer, first_free_byte_in_buffer, write_rw_vector, ... }, ml_op)
                =
                case *first_free_byte_in_buffer
                    #
                    0 => ();

                    n => {   write_rw_vector (rs::make_slice (buffer, 0, THE n));
                             #
                             first_free_byte_in_buffer := 0;
                         }
                         except
                             ex =  {   fill (strm_mv, stream);
                                       #
                                       raise_io_exception (stream, ml_op, ex);
                                   };
                esac;


            fun write (strm_mv, v)
                =
                {   my (stream as OSTRM_INFO os)
                        =
                        lock_and_check_closed_out (strm_mv, "write");

                    fun release ()
                        =
                        fill (strm_mv, stream);

                    os ->  { buffer, first_free_byte_in_buffer, buffering_mode, ... };


                    fun flush ()
                        =
                        flush_buffer (strm_mv, stream, "write");

                    fun flush_all ()
                        =
                        os.write_rw_vector  (rs::make_full_slice  buffer)
                        except
                            ex =  {   release();
                                      #
                                      raise_io_exception (stream, "write", ex);
                                  };


                    fun write_direct ()
                        =
                        {   case *first_free_byte_in_buffer
                                #
                                0 => ();
                                #
                                n => {   os.write_rw_vector  (rs::make_slice  (buffer, 0, THE n));
                                         #
                                         first_free_byte_in_buffer := 0;
                                     };
                            esac;

                            os.write_vector  (vs::make_full_slice  v);
                        }
                        except
                            ex =  {   release();
                                      #
                                      raise_io_exception  (stream, "write", ex);
                                  };

                    fun insert copy_vec
                        =
                        {   buf_len  =  a::length  buffer;
                            data_len =  v::length  v;

                            if (data_len >= buf_len)
                                #
                                write_direct ();
                            else
                                i = *first_free_byte_in_buffer;

                                avail = buf_len - i;

                                if (avail < data_len)

                                    copy_vec (v, 0, avail, buffer, i);

                                    flush_all ();

                                    copy_vec (v, avail, data_len-avail, buffer, 0);

                                    first_free_byte_in_buffer := data_len-avail;

                                else

                                    copy_vec (v, 0, data_len, buffer, i);

                                    first_free_byte_in_buffer :=  i + data_len;

                                    if (avail == data_len)   flush ();   fi;
                                fi;
                            fi;
                       };

                    case *buffering_mode
                        #
                        iox::NO_BUFFERING
                            =>
                            write_direct ();

                        _   =>
                            insert copy_vec
                            where
                                fun copy_vec (from, from_i, from_len, to, to_i)
                                    =
                                    rs::copy_vec
                                      { from => vs::make_slice (from, from_i, THE from_len),
                                        to,
                                        di => to_i
                                      };

                            end;
                    esac;

                    release ();
                };

            fun write_one (strm_mv, element)
                =
                release ()
                where
                    my (stream as OSTRM_INFO { buffer, first_free_byte_in_buffer, buffering_mode, write_rw_vector, ... } )
                        =
                        lock_and_check_closed_out (strm_mv, "write_one");

                    fun release ()
                        =
                        fill (strm_mv, stream);

                    case *buffering_mode
                        #
                        iox::NO_BUFFERING
                            =>
                            {   rw_vec_set (buffer, 0, element);

                                write_rw_vector (rs::make_slice (buffer, 0, THE 1))
                                except
                                    ex =  {   release();
                                              raise_io_exception (stream, "write_one", ex);
                                          };
                            };

                         _   =>
                            {   i = *first_free_byte_in_buffer;
                                #
                                i' = i+1;

                                rw_vec_set (buffer, i, element);

                                first_free_byte_in_buffer := i';

                                if (i' == a::length  buffer)
                                    #
                                    flush_buffer (strm_mv, stream, "write_one");
                                fi;
                            };
                    esac;
                end;

            fun flush strm_mv
                 =
                 {   stream =  lock_and_check_closed_out (strm_mv, "flush");
                     #  
                     flush_buffer (strm_mv, stream, "flush");
                     #  
                     fill (strm_mv, stream);
                 };

            fun close_output  strm_mv
                =
                {   (empty  strm_mv)
                        ->
                        (stream as OSTRM_INFO  { writer => pio::STREAM_WRITER { close, ... },  is_closed,  clean_tag, ... } );

                    if (not *is_closed)
                        #
                        flush_buffer (strm_mv, stream, "close");
                        is_closed := TRUE;
                        threadkit_io_cleanup_at_shutdown::remove_cleaner clean_tag;
                        close();
                    fi;

                    fill (strm_mv, stream);
                };

            fun make_outstream  (wr as pio::STREAM_WRITER { chunk_size, write_rw_vector, write_vector, ... },  mode)
                =
                stream
                where
                    fun iterate (f, size, subslice)
                        =
                        lp
                        where
                            fun lp sl
                                =
                                if (size sl != 0) 
                                    #
                                    n = f sl;

                                    lp (subslice (sl, n, NULL));
                                fi;
                        end;

                    write_rw_vector' = iterate (write_rw_vector, rs::length, rs::make_subslice);
                    write_vector'    = iterate (write_vector,    vs::length, vs::make_subslice);

                    # Install a dummy cleaner:
                    #
                    tag = threadkit_io_cleanup_at_shutdown::add_cleaner dummy_cleaner;

                    stream
                        =
                        make_full_maildrop (
                            OSTRM_INFO
                              {
                                buffer    =>  a::make_rw_vector (chunk_size, some_element),
                                first_free_byte_in_buffer       =>  REF 0,

                                is_closed    => REF FALSE,
                                buffering_mode => REF mode,

                                writer          => wr,
                                write_rw_vector => write_rw_vector',
                                write_vector    => write_vector',
                                clean_tag       => tag
                              }
                      );

                    threadkit_io_cleanup_at_shutdown::rebind_cleaner
                        (tag, fn () = close_output stream);
                end;


            fun get_writer strm_mv
                =
                {   my (stream as OSTRM_INFO { writer, buffering_mode, ... } )
                        =
                        lock_and_check_closed_out (strm_mv, "getWriter");

                    (writer, *buffering_mode)
                    before
                        fill (strm_mv, stream);
                };

            # Position operations on outstreams:
            #
            Out_Position
                =
                OUT_POSITION
                  {
                    pos:  pio::File_Position,
                    stream:  Output_Stream
                  };

            fun get_output_position strm_mv
                =
                {   my (stream as OSTRM_INFO { writer, ... } )
                        =
                        lock_and_check_closed_out (strm_mv, "getWriter");

                    fun release ()
                        =
                        fill (strm_mv, stream);

                    flush_buffer (strm_mv, stream, "get_output_position");

                    case writer
                        #
                        pio::STREAM_WRITER { get_position => THE f, ... }
                            =>
                            OUT_POSITION { pos => f(), stream => strm_mv }
                            except
                                ex =    {   release();
                                            raise_io_exception (stream, "get_output_position", ex);
                                        };
                        _   =>  {   release();
                                    raise_io_exception (stream, "get_output_position", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED);
                                }
                                before
                                    release ();
                    esac;
                };

            fun file_pos_out (OUT_POSITION { pos, stream=>strm_mv } )
                =
                {   fill (strm_mv, lock_and_check_closed_out (strm_mv, "file_pos_out"));
                    #
                    pos;
                };

            fun set_output_position (OUT_POSITION { pos, stream=>strm_mv } )
                =
                {   my (stream as OSTRM_INFO { writer, ... } )
                        =
                        lock_and_check_closed_out (strm_mv, "set_output_position");

                    fun release ()
                        =
                        fill (strm_mv, stream);

                    case writer
                        #
                        pio::STREAM_WRITER { set_position=>THE f, ... }
                            => 
                            f pos
                            except
                                ex =    {   release ();
                                            raise_io_exception (stream, "set_output_position", ex);
                                        };
                        _   =>
                            {   release ();
                                raise_io_exception (stream, "get_output_position", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED);
                            };
                    esac;

                    release ();
                };

            fun set_buffering_mode (strm_mv, mode)
                =
                {   (lock_and_check_closed_out (strm_mv, "setBufferMode"))
                        ->
                        (stream as OSTRM_INFO { buffering_mode, ... } );

                    if (mode == iox::NO_BUFFERING)
                        #                   
                        flush_buffer (strm_mv, stream, "setBufferMode");
                    fi;

                    buffering_mode := mode;

                    fill (strm_mv, stream);
                };

            fun get_buffering_mode  strm_mv
                =
                {   # Should we be checking for closed streams here???  XXX BUGGO FIXME
                    #
                    (lock_and_check_closed_out (strm_mv, "getBufferMode"))
                        ->
                        (stream as OSTRM_INFO { buffering_mode, ... } );

                    *buffering_mode
                    before
                        fill (strm_mv, stream);
                };

        };              # package pure_io 

        Vector  = v::Vector;
        Element = v::Element;

        Input_Stream  = Maildrop( pure_io::Input_Stream  );
        Output_Stream = Maildrop( pure_io::Output_Stream );



        # Input operations:

        fun read stream
            =
            {   my (v, stream')
                    =
                    pure_io::read (empty stream);

                fill (stream, stream');

                v;
            };

        fun read_one stream
            =
            case (pure_io::read_one (empty stream))

                THE (element, stream')
                    =>
                    {   fill (stream, stream');
                        THE element;
                    };

                NULL => NULL;
            esac;


        fun read_n (stream, n)
            =
            {   my (v, stream')
                    =
                    pure_io::read_n (empty stream, n);

                fill (stream, stream');

                v;
            };

        fun read_all (stream:  Input_Stream)
            =
            {   my (v, stream')
                    =
                    pure_io::read_all (empty stream);

                fill (stream, stream'); v;
            };

        fun input1evt         _ = raise exception FAIL "input1evt unimplemented";
        fun input_mailop     _ = raise exception FAIL "input_mailop unimplemented";
        fun input_nevt        _ = raise exception FAIL "input_nevt unimplemented";
        fun input_all_mailop _ = raise exception FAIL "input_ell_mailop unimplemented";

        fun can_read (stream, n)
            =
            pure_io::can_read (peek stream, n);


        fun lookahead (stream:  Input_Stream)
            =
            case (pure_io::read_one (peek stream))

                THE (element, _) => THE element;

                NULL => NULL;
            esac;

        fun close_input stream
            =
            {   my (s as pure_io::INPUT_STREAM (buf as pure_io::INPUT_BUFFER { data, ... }, _))
                    =
                    empty stream;

                pure_io::close_input s;

                fill (stream, pure_io::find_eos buf);
            };

        fun end_of_stream stream
            =
            pure_io::end_of_stream (peek stream);
    /*
        fun getPosIn stream = pure_io::getPosIn (mGet stream)
        fun setPosIn (stream, p) = mUpdate (stream, pure_io::setPosIn p)
    */


        # Output operations:

        fun write (stream, v)     =  pure_io::write     (peek stream, v);
        fun write_one (stream, c) =  pure_io::write_one (peek stream, c);

        fun flush        stream =  pure_io::flush        (peek stream);
        fun close_output stream =  pure_io::close_output (peek stream);

        fun get_output_position stream = pure_io::get_output_position (peek stream);

        fun set_output_position (stream, p as pure_io::OUT_POSITION { stream=>stream', ... } )
            =
            {   update_maildrop (stream, stream');
                pure_io::set_output_position p;
            };

        fun make_instream (stream:  pure_io::Input_Stream) =  make_full_maildrop stream;
        fun get_instream  (stream:  Input_Stream)          =  peek stream;
        fun set_instream  (stream:  Input_Stream, stream') =  update_maildrop (stream, stream');

        fun make_outstream (stream:  pure_io::Output_Stream) =  make_full_maildrop stream;
        fun get_outstream (stream:  Output_Stream)           =  peek stream;
        fun set_outstream (stream:  Output_Stream, stream')  =  update_maildrop (stream, stream');



        # Open files

        fun open_for_read fname
            =
            make_instream (pure_io::make_instream (threadkit_winix_stream_readers_and_writers::open_for_read fname, empty_vec))
            except
                ex =  raise exception iox::IO { function=>"open_for_read", name=>fname, cause=>ex };

        fun open_for_write  fname
            =
            make_outstream (pure_io::make_outstream (threadkit_winix_stream_readers_and_writers::open_for_write fname, iox::BLOCK_BUFFERING))
            except
                ex =  raise exception iox::IO { function=>"open", name=>fname, cause=>ex };

        fun open_for_append fname
            =
            make_outstream (pure_io::make_outstream (threadkit_winix_stream_readers_and_writers::open_for_append fname, iox::NO_BUFFERING))
            except
                ex =  raise exception iox::IO { function=>"open_for_append", name=>fname, cause=>ex };

    };  #  binary_io_g 
end;

## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2011,
## released under Gnu Public Licence version 3.


Comments and suggestions to: bugs@mythryl.org

PreviousUpNext