PreviousUpNext

15.4.1075  src/lib/std/src/io/threadkit-file-g.pkg

## threadkit-file-g.pkg
#
# The threadkit version of file_g.

# Compiled by:
#     src/lib/std/standard.lib





stipulate
    package cv  =  vector_of_chars;                                                     # vector_of_chars               is from   src/lib/std/vector-of-chars.pkg
    package cvs =  vector_slice_of_chars;                                               # vector_slice_of_chars         is from   src/lib/std/src/vector-slice-of-chars.pkg
    package iox =  io_exceptions;                                                       # io_exceptions                 is from   src/lib/std/src/io/io-exceptions.pkg
    package wcs =  rw_vector_slice_of_chars;                                            # rw_vector_slice_of_chars      is from   src/lib/std/src/rw-vector-slice-of-chars.pkg
    package wcv =  rw_vector_of_chars;                                                  # rw_vector_of_chars            is from   src/lib/std/rw-vector-of-chars.pkg
herein

    generic package   threadkit_file_g   (
        #             ================
        #
        package winix_stream_readers_and_writers
            :
            api {
                include Threadkit_Winix_Stream_Readers_And_Writers;                                     # Threadkit_Winix_Stream_Readers_And_Writers    is from   src/lib/std/src/io/threadkit-winix-stream-readers-and-writers.api

                stdin:          Void   -> stream_readers_and_writers::Stream_Reader;
                stdout:         Void   -> stream_readers_and_writers::Stream_Writer;
                stderr:         Void   -> stream_readers_and_writers::Stream_Writer;

                string_reader:  String -> stream_readers_and_writers::Stream_Reader;
            }
                where  stream_readers_and_writers::Rw_Vector       ==  threadkit_text_stream_readers_and_writers::Rw_Vector
                where  stream_readers_and_writers::Vector          ==  threadkit_text_stream_readers_and_writers::Vector
                where  stream_readers_and_writers::Rw_Vector_Slice ==  threadkit_text_stream_readers_and_writers::Rw_Vector_Slice
                where  stream_readers_and_writers::Vector_Slice    ==  threadkit_text_stream_readers_and_writers::Vector_Slice
                where  stream_readers_and_writers::Element         ==  threadkit_text_stream_readers_and_writers::Element
                where  stream_readers_and_writers::File_Position   ==  threadkit_text_stream_readers_and_writers::File_Position
                where  stream_readers_and_writers::Stream_Reader   ==  threadkit_text_stream_readers_and_writers::Stream_Reader
                where  stream_readers_and_writers::Stream_Writer   ==  threadkit_text_stream_readers_and_writers::Stream_Writer;

    )

    : (weak) Threadkit_File                                                             # Threadkit_File                is from   src/lib/std/src/io/threadkit-file.api

    {
        include threadkit;                                                              # threadkit                     is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit.pkg

        stipulate
            package bio = winix_stream_readers_and_writers::stream_readers_and_writers;
        herein

            # Assign to an MVar 
            #
            fun m_update (mv, x)
                =
                ignore (swap (mv, x));

            # An element for initializing buffers:
            #
            some_element = '\000';

            vec_extract = cvs::to_vector o cvs::make_slice;
            vec_get = cv::get;
            rw_vec_set = wcv::set;
            substring_base = substring::base;
            empty_string = "";

            fun dummy_cleaner ()
                =
                ();

            package pio {                                                       # Exported to client packages.
                #
                Vector   = cv::Vector;
                Element  = cv::Element;

                Stream_Reader =  bio::Stream_Reader;
                Stream_Writer =  bio::Stream_Writer;
                File_Position =  bio::File_Position;

                # Functional input streams:
                #
                Input_Stream
                    =
                    INPUT_STREAM (Input_Buffer, Int)

                also
                Input_Buffer
                    =
                    INPUT_BUFFER
                      {
                        base_position:  Null_Or( File_Position ),

                        more:  Maildrop( More ),                # When this cell is empty, it means that 
                                                                # there is an outstanding request to the 
                                                                # server to extend the stream. 
                        data:  Vector,
                        info:  Info
                      }

                also
                More
                  = MORE  Input_Buffer  #  forward link to additional data 
                  | NOMORE              #  placeholder for forward link 
                  | TERMINATED          #  termination of the stream 

                also
                Info = INFO
                         { reader:           Stream_Reader,
                           #
                           read_vector:      Int -> Vector,
                           read_vec_mailop:  Int -> threadkit::Mailop( Vector ),
                           #
                           closed:           Ref( Bool ),
                           #
                           get_position:     Void -> Null_Or( File_Position ),
                           #
                           tail:             Maildrop(  Maildrop(  More ) ),    # Points to the more cell of the last buffer. 
                           #
                           clean_tag:        threadkit_io_cleanup_at_shutdown::Tag
                         };


                fun info_of_ibuf (INPUT_BUFFER { info, ... } )
                    =
                    info;


                fun chunk_size_of_ibuf  buf
                    =
                    {   (info_of_ibuf  buf)
                            ->
                            INFO { reader => bio::STREAM_READER { chunk_size, ... }, ... };

                        chunk_size;
                    };


                fun read_vector (INPUT_BUFFER { info=>INFO { read_vector => f, ... }, ... } )           # Should this be renamed get_read_vector? XXX QUERO FIXME
                    =
                    f;


                fun input_exn (INFO { reader => bio::STREAM_READER { name, ... }, ... }, ml_op, exn)
                    =
                    raise exception  iox::IO  { function=>ml_op,  name,  cause=>exn };

                 More_Data = EOF
                           | DATA  Input_Buffer
                           ;


                # Extend the stream by a chunk.
                # Invariant: the more m-variable
                # is empty on entry and full on exit.
                #
                fun extend_stream (read_fn, ml_op, buf as INPUT_BUFFER { more, info, ... } )
                    =
                    {
                        info ->  INFO { get_position, tail, ... };

                        base_position
                            =
                            get_position ();

                        chunk = read_fn (chunk_size_of_ibuf buf);


                        if (cv::length chunk == 0)

                             fill (more, NOMORE);
                             EOF;

                        else

                            new_more = make_empty_maildrop ();

                            buf' = INPUT_BUFFER
                                     {
                                       base_position,
                                       info,
                                       data => chunk,
                                       more => new_more
                                     };

                            # Note that we do not fill the more cell
                            # until after the tail has been updated.
                            #
                            # This ensures that someone attempting to
                            # access the tail will not acquire the lock
                            # until after we are done.
                            #
                            m_update (tail, new_more);

                            fill (more, MORE buf');  #  releases lock!! 
                            fill (new_more, NOMORE);

                            DATA buf';
                       fi;
                   }
                   except ex
                       =
                       {   fill (more, NOMORE);
                           input_exn (info, ml_op, ex);
                       };

                # Get the next buffer in the stream,
                # extending it if necessary.
                #
                # If the stream must be extended,
                # we lock it by taking the value from the
                # more cell; the extend_stream function
                # is responsible for filling in the cell.
                #
                fun get_buffer (read_fn, ml_op) (buf as INPUT_BUFFER { more, info, ... } )
                    =
                    get (peek more)
                    where
                        fun get TERMINATED  =>  EOF;
                            get (MORE buf') =>  DATA buf';

                            get NOMORE      =>  case (empty more)
                                                    #
                                                    NOMORE => extend_stream (read_fn, ml_op, buf);
                                                    next => { fill (more, next); get next;};
                                                esac;
                        end;
                    end;

                # Read a chunk that is at least the specified size:
                # 
                fun read_chunk buf
                    =
                    {   (info_of_ibuf  buf)
                            ->
                            INFO { read_vector, reader => bio::STREAM_READER { chunk_size, ... }, ... };

                        case (chunk_size - 1)
                            #
                            0 =>   fn n = read_vector n;
                            #
                            k =>   # Round up to next multiple of chunkSize:
                                   # 
                                   fn n =  read_vector (int::quot (n+k, chunk_size) * chunk_size);
                        esac;
                    };

                fun generalized_input get_buf
                    =
                    get
                    where 
                        fun get (INPUT_STREAM (buf as INPUT_BUFFER { data, ... }, pos))
                            =
                            {   len = cv::length data;
                                #
                                if (pos < len)
                                    #
                                    (vec_extract (data, pos, NULL), INPUT_STREAM (buf, len));
                                else
                                    case (get_buf buf)
                                        #
                                        DATA rest =>  get (INPUT_STREAM (rest, 0));
                                        EOF       =>  (empty_string, INPUT_STREAM (buf, len));
                                     esac;
                                fi;
                            };
                    end;

                # Terminate an input stream:
                # 
                fun terminate (info as INFO { tail, clean_tag, ... } )
                    =
                    {   m = peek tail;
                        #
                        case (empty m)
                            #
                            (m' as MORE _)
                                =>
                                {   fill (m, m');
                                    #
                                    terminate info;
                                };

                            TERMINATED
                                =>
                                fill (m, TERMINATED);

                            _ => {   threadkit_io_cleanup_at_shutdown::remove_cleaner clean_tag;
                                     fill (m, TERMINATED);
                                 };
                         esac;
                      };

                # Find the end of the stream:
                # 
                fun find_eos (buf as INPUT_BUFFER { more, data, ... } )
                    =
                    case (peek more)
                        #
                        MORE buf =>  find_eos buf;
                        _        =>  INPUT_STREAM (buf, cv::length data);
                    esac;


                fun read (stream as INPUT_STREAM (buf, _))
                    =
                    generalized_input (get_buffer (read_vector buf, "read")) stream;


                fun read_one (INPUT_STREAM (buf, pos))
                    =
                    {   buf ->  INPUT_BUFFER { data, more, ... };

                        if (pos < cv::length data)
                            #
                            THE (vec_get (data, pos), INPUT_STREAM (buf, pos+1));
                        else

                            fun get (MORE buf)
                                    =>
                                    read_one (INPUT_STREAM (buf, 0));

                                get TERMINATED
                                    =>
                                    NULL;

                                get NOMORE
                                    =>
                                    case (empty more)
                                        #
                                        NOMORE =>   case (extend_stream (read_vector buf, "read_one", buf))
                                                        #
                                                        EOF       => NULL;
                                                        DATA rest => read_one (INPUT_STREAM (rest, 0));
                                                    esac;

                                        next => {   fill (more, next);
                                                    get next;
                                                };
                                    esac;
                            end;

                            get (peek more);
                        fi;
                    };

                fun read_n (INPUT_STREAM (buf, pos), n)
                    =
                    {   fun join (item, (list, stream))
                            =
                            (item ! list, stream);


                        fun input_list (buf as INPUT_BUFFER { data, ... }, i, n)
                            =
                            {   len = cv::length data;
                                #
                                remain = len-i;

                                if (remain >= n)
                                    #
                                    ([vec_extract (data, i, THE n)], INPUT_STREAM (buf, i+n));
                                else
                                    join (vec_extract (data, i, NULL), next_buf (buf, n-remain));
                                fi;
                            }

                        also
                        fun next_buf (buf as INPUT_BUFFER { more, data, ... }, n)
                            =
                            get (peek more)
                            where
                                fun get (MORE buf)
                                        =>
                                        input_list (buf, 0, n);

                                    get TERMINATED
                                        =>
                                        ([], INPUT_STREAM (buf, cv::length data));

                                    get NOMORE
                                        =>
                                        case (empty more)

                                            NOMORE
                                                =>
                                                case (extend_stream (read_vector buf, "read_n", buf))

                                                    EOF       =>  ([], INPUT_STREAM (buf, cv::length data));
                                                    DATA rest =>  input_list (rest, 0, n);
                                                esac;

                                            next => {   fill (more, next);
                                                        get next;
                                                    };
                                        esac;
                                end;
                            end;

                        my (data, stream)
                            =
                            input_list (buf, pos, n);

                        (cv::cat data, stream);
                    };

                fun read_all (stream as INPUT_STREAM (buf, _))
                    =
                    {   (info_of_ibuf  buf)
                            ->
                            INFO { reader => bio::STREAM_READER { avail, ... }, ... };

                        # Read a chunk that is as large as the available input.
                        # Note that for systems that use CR-LF for '\n',
                        # the size will be too large, but this should be okay.
                        #
                        fun big_chunk _
                            =
                            {   delta = case (avail ())
                                            #
                                            THE n =>  n;
                                            NULL  =>  chunk_size_of_ibuf buf;
                                        esac;


                                read_chunk buf delta;
                            };

                        big_input
                            =
                            generalized_input (get_buffer (big_chunk, "read_all"));

                        fun loop (v, stream)
                            =
                            if (cv::length v == 0)  [];
                            else                    v ! loop (big_input stream);
                            fi;

                        data = cv::cat (loop (big_input stream));

                        (data, find_eos buf);
                    };

                # Return THE k, if k <= amount characters
                # can be read without blocking.
                # 
                fun can_read (stream as INPUT_STREAM (buf, pos), amount)
                    =
                    {
        /******
                      readVecNB = (case buf
                           of (INPUT_BUFFER { info as INFO { readVecNB=NULL, ... }, ... } ) =>
                                inputExn (info, "can_read", iox::NONBLOCKING_IO_NOT_SUPPORTED)
                            | (INPUT_BUFFER { info=INFO { readVecNB=THE f, ... }, ... } ) => f
                          )             # end case
        ******/
                      fun try_input (buf as INPUT_BUFFER { data, ... }, i, n)
                          =
                          {   len = cv::length data;

                              remain = len - i;

                              remain >= n   ??   THE n
                                            ::   next_buf (buf, n - remain);
                          }
                      also
                      fun next_buf (INPUT_BUFFER { more, ... }, n)
                          =
                          get (peek more)
                          where
                              fun get (MORE buf) => try_input (buf, 0, n);
                                  get TERMINATED => THE (amount - n);
        /******
                              | get NOMORE = (case md::mTake more
                                   of NOMORE => ((
                                        case extendStream (readVecNB, "can_read", buf)
                                         of EOF => THE (amount - n)
                                          | (DATA b) => tryInput (b, 0, n)
                                        )               # end case
                                          except iox::IO { cause=WOULD_BLOCK, ... } => THE (amount - n))
                                    | next => (md::mPut (more, next); get next)
                                  )             # end case
        ******/
                               get NOMORE => THE (amount - n);
                             end;

                          end;

                        if (amount < 0)   raise exception SIZE;
                        else              try_input (buf, pos, amount);
                        fi;
                    };

                # Close an input stream given its info package;
                # we need this function for the cleanup hook to
                # avoid a space leak.
                #
                fun close_in_info (INFO { closed=>REF TRUE, ... } )
                        =>
                        ();

                    close_in_info (info as INFO { closed, reader => bio::STREAM_READER { close, ... }, ... } )
                        =>
                        {
        # ** We need some kind of lock on the input stream to do this safely!!! ** XXX BUGGO FIXME
                            terminate info;

                            closed := TRUE;

                            close()
                            except
                                ex =  input_exn (info, "close_input", ex);
                        };
                end;

                fun close_input (INPUT_STREAM (buf, _))
                    =
                    close_in_info (info_of_ibuf buf);

                fun end_of_stream (INPUT_STREAM (buf as INPUT_BUFFER { more, ... }, pos))
                    =
                    case (empty more)

                        (next as MORE _)
                            =>
                            {   fill (more, next);
                                FALSE;
                            };

                        next
                            =>
                            {   buf ->  INPUT_BUFFER { data, info=>INFO { closed, ... }, ... };

                                if (pos == cv::length data)

                                    case (next, *closed)

                                        (NOMORE, FALSE)
                                            =>
                                            case (extend_stream (read_vector buf, "end_of_stream", buf))
                                                EOF => TRUE;
                                                _   => FALSE;
                                            esac;

                                       _ => {   fill (more, next);
                                                TRUE;
                                            };
                                    esac;

                                else
                                     fill (more, next);
                                     FALSE;
                                fi;
                            };
                    esac;

                fun make_instream' (reader, data)
                    =
                    {   reader ->  bio::STREAM_READER { read_vector, read_vec_mailop, get_position, set_position, ... };
                        #
                        get_position
                            =
                            case (get_position, set_position)
                                #
                                (THE f, THE _) =>   fn () =  THE (f());
                                _              =>   fn () =  NULL;
                            esac;

                        more =  make_full_maildrop  NOMORE;

                        closed_flag = REF FALSE;

                        clean_tag =  threadkit_io_cleanup_at_shutdown::add_cleaner  dummy_cleaner;

                        info = INFO {
                                 reader,
                                 read_vector,
                                 read_vec_mailop,
                                 get_position,
                                 clean_tag,
                                 #
                                 closed     => closed_flag,
                                 tail      => make_full_maildrop more
                               };

                        # What should we do about the position in this case ??
                        # Suggestion: When building a stream with supplied initial data,
                        # nothing can be said about the positions inside that initial
                        # data (who knows where that data even came from!).

                        base_position
                               =
                               if (cv::length data == 0)  get_position ();
                               else                      NULL;
                               fi;

                        buf = INPUT_BUFFER {
                                base_position, data,
                                info, more
                              };

                        stream =  INPUT_STREAM (buf, 0);

                        (clean_tag, stream);
                    };


                fun make_instream arg
                    =
                    {   my (tag, stream as INPUT_STREAM (INPUT_BUFFER { info, ... }, _))
                            =
                            make_instream' arg;

                        threadkit_io_cleanup_at_shutdown::rebind_cleaner (tag, fn () = close_in_info info);

                        stream;
                    };


                fun get_reader (INPUT_STREAM (buf, pos))
                    =
                    {   buf ->  INPUT_BUFFER { data, info as INFO { reader, ... }, more, ... };

                        fun get_data more
                            =
                            case (peek more)

                                (MORE (INPUT_BUFFER { data, more=>more', ... } ))
                                    =>
                                    data ! get_data more';

                                _ => [];
                            esac;


                        terminate info;

                        if (pos < cv::length data)
                             #  
                             (reader, cv::cat (vec_extract (data, pos, NULL) ! get_data more));
                        else (reader, cv::cat (get_data more));
                        fi;
                    };

        /*
              # * Position operations on instreams *
                enum in_pos = INP of {
                    base:  pos,
                    offset:  Int,
                    info:  info
                  }
        */

        /*
                fun getPosIn (INPUT_STREAM (buf, pos)) = (case buf
                       of INPUT_BUFFER { basePos=NULL, info, ... } =>
                            inputExn (info, "getPosIn", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED)
                        | INPUT_BUFFER { basePos=THE p, info, ... } => INP {
                              base = p, offset = pos, info = info
                            }
                      )         # end case
        */
        /*
                fun filePosIn (INP { base, offset, ... } ) =
                      position.+(base, file_position::from_int offset)
        */
                # Get the underlying file position of a stream:
                # 
                fun file_position_in (INPUT_STREAM (buf, pos))
                    =
                    case buf
                        #
                        INPUT_BUFFER { base_position=>NULL, info, ... }
                            =>
                            input_exn (info, "filePosIn", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED);

                        INPUT_BUFFER { base_position=>THE base, info, ... }
                            =>
                            {   info ->  INFO { reader => bio::STREAM_READER rd, read_vector, ... };

                                case (rd.get_position, rd.set_position)
                                    #
                                    (THE get_position, THE set_position)
                                        =>
                                        {   tmp_pos = get_position ();
                                            #
                                            fun read_n 0 => ();

                                                read_n n => case (cv::length (read_vector n))
                                                                #
                                                                0 =>  input_exn (info, "filePosIn", FAIL "bogus position");
                                                                k =>  read_n (n-k);
                                                            esac;
                                            end;

                                            set_position base;
                                            read_n pos;

                                            get_position ()
                                            before
                                                set_position tmp_pos;
                                      };

                                   _ => raise exception FAIL "filePosIn: impossible";

                                esac;
                           };
                    esac;

        /*
                fun setPosIn (pos as INP { info as INFO { reader, ... }, ... } ) = let
                      fpos = filePosIn pos
                      my (BIO::READER rd) = reader
                      in
                        terminate info;
                        the rd.setPos fpos;
                        make_instream (bio::READER rd, NULL)
                      end
        */

                # * Text stream specific operations *
                #
                fun read_line (INPUT_STREAM (buf as INPUT_BUFFER { data, ... }, pos))
                    =
                    {   fun join (item, (list, stream))
                            =
                            (item ! list, stream);

                        fun next_buf (is_empty, buf as INPUT_BUFFER { more, data, ... } )
                            =
                            get (peek more)
                            where
                                fun last ()
                                    =
                                    (if is_empty  []; else ["\n"];fi, INPUT_STREAM (buf, cv::length data));

                                fun get (MORE buf)
                                        =>
                                        scan_data (buf, 0);

                                    get NOMORE
                                        =>
                                        case (empty more)

                                            NOMORE
                                                =>
                                                case (extend_stream (read_vector buf, "read_line", buf))

                                                    DATA rest =>  scan_data (rest, 0);
                                                    EOF       =>  last ();
                                                esac;

                                            next
                                                =>
                                                {   fill (more, next);
                                                    get next;
                                                };
                                        esac;

                                    get TERMINATED
                                        =>
                                        last ();
                                end;
                            end

                        also
                        fun scan_data (buf as INPUT_BUFFER { data, ... }, i)
                            =
                            scan i
                            where
                                len = cv::length data;

                                fun scan j
                                    =
                                    if (j == len)

                                        join (vec_extract (data, i, NULL), next_buf (FALSE, buf));

                                    else
                                        if (vec_get (data, j) == '\n')
                                            #
                                            ([vec_extract (data, i, THE (j+1-i))], INPUT_STREAM (buf, j+1));
                                        else
                                            scan (j+1);
                                        fi;
                                    fi;
                            end;

                        my (data, stream)
                            =
                            if (cv::length data == pos)   next_buf (TRUE, buf);
                            else                          scan_data (buf, pos);
                            fi;

                        res_v = cv::cat data;

                        if (cv::length res_v == 0)   NULL;
                        else                         THE (res_v, stream);
                        fi;
                    };

                # IO mailop constructors:
                # We exploit the "functional" nature of stream IO to implement the mailop
                # constructors.  These constructors spawn a thread to do the operation
                # and and write the result in an iVariable that serves as the synchronization
                # value.
                # NOTE: this implementation has the weakness that it prevents shutdown when
                # everything else is deadlocked, since the thread that is spawned to actually
                # do the IO could proceed.            XXX BUGGO FIXME
                #    
                stipulate

                    Result(X) = RESULT(X)
                              | EXCEPTION  Exception
                              ;

                    include threadkit;                                                  # threadkit             is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit.pkg

                    fun do_input input_op
                        =
                        {   fun read arg
                                =
                                RESULT (input_op arg)
                                except
                                    ex =  EXCEPTION ex;

                            fn arg
                                =
                                guard
                                   .{   reply_1shot = make_oneshot_maildrop ();

                                        make_thread "text I/O" .{
                                            set (reply_1shot, read arg);
                                        };

                                        get' reply_1shot
                                            ==>
                                            fn (RESULT    x ) =>  x;
                                               (EXCEPTION ex) =>  raise exception ex;
                                            end;
                                    };

                          };
                herein

                    input1evt         = do_input read_one;
                    input_mailop      = do_input read;
                    input_nevt        = do_input read_n;
                    #
                    input_all_mailop  = do_input read_all;
                    input_line_mailop = do_input read_line;

                end;


                # ** Output streams **

                # an output stream is implemented as a monitor using an mvar to
                # hold its data.

                Output_Stream_Info
                    =
                    OUTPUT_STREAM_INFO
                      {
                        buffer:                         wcv::Rw_Vector,
                        first_free_byte_in_buffer:      Ref( Int ),

                        is_closed:                      Ref( Bool ),
                        buffering_mode:                 Ref( iox::Buffering_Mode ),

                        writer:                         Stream_Writer,

                        write_rw_vector:                wcs::Slice -> Void,
                        write_vector:                   cvs::Slice -> Void,

                        clean_tag:                      threadkit_io_cleanup_at_shutdown::Tag
                      };


                Output_Stream
                    =
                    Maildrop( Output_Stream_Info );


                fun is_nl '\n' =>  TRUE;
                    is_nl _    =>  FALSE;
                end;


                fun is_line_break (OUTPUT_STREAM_INFO { buffering_mode, ... } )
                    =
                    *buffering_mode == iox::LINE_BUFFERING
                      ??  is_nl
                      ::  (fn _ = FALSE);


                fun output_exn (OUTPUT_STREAM_INFO { writer => bio::STREAM_WRITER { name, ... }, ... }, ml_op, cause)
                    =
                    raise exception  iox::IO {  function => ml_op,  name,  cause  };

                # Lock access to the stream and make sure that it is not closed. 

                fun lock_and_check_closed_out (strm_mv, ml_op)
                    =
                    case (empty strm_mv)
                        #
                        stream as OUTPUT_STREAM_INFO( { is_closed=>REF TRUE, ... } )
                            =>
                            {   fill (strm_mv, stream);
                                output_exn (stream, ml_op, iox::CLOSED_IO_STREAM);
                            };

                        stream => stream;
                    esac;


                fun flush_buffer (strm_mv, stream as OUTPUT_STREAM_INFO { buffer, first_free_byte_in_buffer, write_rw_vector, ... }, ml_op)
                    =
                    case *first_free_byte_in_buffer
                        #
                        0 => ();

                        n => {   write_rw_vector (wcs::make_slice (buffer, 0, THE n));
                                 first_free_byte_in_buffer := 0;
                             }
                             except ex
                                 =
                                 {   fill (strm_mv, stream);
                                     output_exn (stream, ml_op, ex);
                                 };
                    esac;


                # A version of copyVec that checks for newlines, while it is copying.
                # This is used for LINE_BUFFERING output of strings and substrings.
                #
                fun line_buf_copy_vec (src, src_i, src_len, dst, dst_i)
                    =
                    cpy (src_i, dst_i, FALSE)
                    where

                        stop = src_i+src_len;

                        fun cpy (src_i, dst_i, lb)
                            =
                            if (src_i < stop)

                                 c = vec_get (src, src_i);

                                 rw_vec_set (dst, dst_i, c);

                                 cpy (src_i+1, dst_i+1, lb or is_nl c);
                            else
                                 lb;
                            fi;

                    end;

                # A version of copyVec for BLOCK_BUFFERING
                # output of strings and substrings. 
                #
                fun block_buf_copy_vec (from, from_i, from_len, to, to_i)
                    =
                    {   wcs::copy_vec {
                            from => cvs::make_slice (from, from_i, THE from_len),
                            to,
                            di => to_i
                        };

                        FALSE;
                    };

                fun write (strm_mv, v)
                    =
                    {   my (stream as OUTPUT_STREAM_INFO os)
                            =
                            lock_and_check_closed_out (strm_mv, "write");

                        fun release ()
                            =
                            fill (strm_mv, stream);

                        os -> { buffer, first_free_byte_in_buffer, buffering_mode, ... };

                        fun flush ()
                            =
                            flush_buffer (strm_mv, stream, "write");

                        fun flush_all ()
                            =
                            os.write_rw_vector  (wcs::make_full_slice  buffer)
                            except ex
                                =
                                {   release();
                                    #
                                    output_exn (stream, "write", ex);
                                };

                        fun write_direct ()
                            =
                            {   case *first_free_byte_in_buffer
                                    0 =>  ();
                                    n =>  { os.write_rw_vector (wcs::make_slice (buffer, 0, THE n));
                                            first_free_byte_in_buffer := 0;
                                          };
                                esac;

                                os.write_vector (cvs::make_full_slice v);
                            }
                            except ex
                                =
                                {   release ();
                                    output_exn (stream, "write", ex);
                                };

                        fun insert copy_vec
                            =
                            {
                                buf_len  =  wcv::length buffer;
                                data_len =  cv::length v;

                                if (data_len >= buf_len)

                                     write_direct();

                                else

                                     i = *first_free_byte_in_buffer;

                                     avail = buf_len - i;

                                     if (avail < data_len)

                                          wcs::copy_vec
                                                  { from => cvs::make_slice (v, 0, THE avail),
                                                    to   => buffer,
                                                    di   => i
                                                  };
                                          flush_all();
                                          needs_flush = copy_vec (v, avail, data_len-avail, buffer, 0);

                                          first_free_byte_in_buffer := data_len-avail;

                                          needs_flush  ?:  flush ();
                                     else
                                          needs_flush = copy_vec (v, 0, data_len, buffer, i);

                                          first_free_byte_in_buffer := i + data_len;

                                          if (needs_flush  or  avail == data_len)
                                             flush ();
                                          fi;
                                     fi;
                                fi;
                            };

                          case *buffering_mode
                              #
                              iox::NO_BUFFERING    =>  write_direct ();
                              iox::LINE_BUFFERING  =>  insert line_buf_copy_vec;
                              iox::BLOCK_BUFFERING =>  insert block_buf_copy_vec;
                          esac;

                          release();
                      };

                fun write_one (strm_mv, element)
                    =
                    {   (lock_and_check_closed_out (strm_mv, "write_one"))
                            ->
                            (stream as OUTPUT_STREAM_INFO { buffer, first_free_byte_in_buffer, buffering_mode, write_rw_vector, ... } );

                        fun release ()
                            =
                            fill (strm_mv, stream);

                        case *buffering_mode
                            #
                            iox::NO_BUFFERING
                                =>
                                {   rw_vec_set (buffer, 0, element);
                                    #
                                    write_rw_vector (wcs::make_slice (buffer, 0, THE 1))
                                    except ex
                                        =
                                        {   release();
                                            output_exn (stream, "write_one", ex);
                                        };
                                };

                            iox::LINE_BUFFERING
                                =>
                                {   i = *first_free_byte_in_buffer;

                                    i' = i+1;

                                    rw_vec_set (buffer, i, element);

                                    first_free_byte_in_buffer := i';

                                    if (i' == wcv::length buffer  or  is_nl element)
                                        #
                                        flush_buffer (strm_mv, stream, "write_one");
                                    fi;
                              };

                            iox::BLOCK_BUFFERING
                                =>
                                {   i  = *first_free_byte_in_buffer;
                                    i' = i+1;

                                    rw_vec_set (buffer, i, element);

                                    first_free_byte_in_buffer := i';

                                    if (i' == wcv::length buffer)
                                         flush_buffer (strm_mv, stream, "write_one");
                                    fi;
                              };
                        esac;

                        release();
                    };

                fun flush strm_mv
                    =
                    {   stream =  lock_and_check_closed_out (strm_mv, "flush");

                        flush_buffer (strm_mv, stream, "flush");
                        fill (strm_mv, stream);
                    };

                fun close_output  strm_mv
                    =
                    {   (empty  strm_mv)
                            ->
                            (stream as OUTPUT_STREAM_INFO { writer => bio::STREAM_WRITER { close, ... }, is_closed, clean_tag, ... } );

                        if (not *is_closed)
                            #
                            flush_buffer (strm_mv, stream, "close");
                            is_closed := TRUE;
                            threadkit_io_cleanup_at_shutdown::remove_cleaner clean_tag;
                            close ();
                        fi;

                        fill (strm_mv, stream);
                    };

                fun make_outstream' (wr as bio::STREAM_WRITER { chunk_size, write_rw_vector, write_vector, ... }, mode)
                    =
                    {   fun iterate (f, size, subslice)
                            =
                            lp
                            where
                                fun lp sl
                                    =
                                    if (size sl != 0)
                                        #
                                        n = f sl;
                                        #
                                        lp (subslice (sl, n, NULL));
                                    fi;
                            end;

                        write_rw_vector' =  iterate (write_rw_vector, wcs::length, wcs::make_subslice);
                        write_vector'    =  iterate (write_vector,    cvs::length, cvs::make_subslice);

                        # Install a dummy cleaner:

                        tag =  threadkit_io_cleanup_at_shutdown::add_cleaner dummy_cleaner;

                        stream =    make_full_maildrop
                                        (
                                            OUTPUT_STREAM_INFO
                                              {
                                                buffer => wcv::make_rw_vector (chunk_size, some_element),
                                                first_free_byte_in_buffer => REF 0,
                                                is_closed => REF FALSE,
                                                buffering_mode => REF mode,
                                                writer => wr,
                                                write_rw_vector => write_rw_vector',
                                                write_vector => write_vector',
                                                clean_tag => tag
                                              }
                                        );

                        (tag, stream);
                    };

                fun make_outstream  arg
                    =
                    {   (make_outstream'  arg)
                            ->
                            (tag, stream);

                        threadkit_io_cleanup_at_shutdown::rebind_cleaner (tag, fn () = close_output  stream);

                        stream;
                    };

                fun get_writer  strm_mv
                    =
                    {   (lock_and_check_closed_out (strm_mv, "getWriter"))
                            ->
                            (stream as OUTPUT_STREAM_INFO { writer, buffering_mode, ... } );

                        (writer, *buffering_mode)
                        before
                            fill (strm_mv, stream);
                    };

                # Position operations on outstreams
                #
                Out_Position
                    =
                    OUT_POSITION  {
                      pos:     bio::File_Position,
                      stream:  Output_Stream
                    };

                fun get_output_position strm_mv
                    =
                    {
                        (lock_and_check_closed_out (strm_mv, "getWriter"))
                            ->
                            (stream as OUTPUT_STREAM_INFO { writer, ... } );

                        fun release ()
                            =
                            fill (strm_mv, stream);

                        flush_buffer (strm_mv, stream, "get_output_position");

                        case writer
                            #
                            bio::STREAM_WRITER { get_position=>THE f, ... }
                                =>
                                OUT_POSITION { pos => f(), stream => strm_mv }
                                except ex
                                    =
                                    {   release();
                                        #
                                        output_exn (stream, "get_output_position", ex);
                                    };
                            _   =>
                                {   release();
                                    #
                                    output_exn (stream, "get_output_position", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED);
                                }
                                before release();
                        esac;
                    };

                fun file_pos_out (OUT_POSITION { pos, stream=>strm_mv } )
                    =
                    {   fill (strm_mv, lock_and_check_closed_out (strm_mv, "filePosOut"));
                        #
                        pos;
                    };

                fun set_output_position (OUT_POSITION { pos, stream=>strm_mv } )
                    =
                    {   (lock_and_check_closed_out (strm_mv, "set_output_position"))
                            ->
                            (stream as OUTPUT_STREAM_INFO { writer, ... } );

                        fun release ()
                            =
                            fill (strm_mv, stream);

                        case writer
                            #
                            bio::STREAM_WRITER { set_position => THE f, ... }
                                =>
                                (f pos)
                                except ex
                                    =
                                    {   release ();
                                        #
                                        output_exn (stream, "set_output_position", ex);
                                    };

                           _ => {   release();
                                    #
                                    output_exn (stream, "get_output_position", iox::RANDOM_ACCESS_IO_NOT_SUPPORTED);
                                };
                        esac;

                        release();
                    };

                fun set_buffering_mode  (strm_mv,  mode)
                    =
                    {   (lock_and_check_closed_out (strm_mv, "setBufferMode"))
                            ->
                            (stream as OUTPUT_STREAM_INFO { buffering_mode, ... } );

                        if (mode == iox::NO_BUFFERING)
                            #
                            flush_buffer (strm_mv, stream, "setBufferMode");
                        fi;

                        buffering_mode := mode;

                        fill (strm_mv, stream);
                    };

                fun get_buffering_mode  strm_mv
                    =
                    {
        # * should we be checking for closed streams here??? *

                        (lock_and_check_closed_out (strm_mv, "getBufferMode"))
                            ->
                            (stream as OUTPUT_STREAM_INFO { buffering_mode, ... } );

                        *buffering_mode
                        before
                            fill (strm_mv, stream);
                    };

                # Text stream specific operations
                #
                fun write_substring (strm_mv, ss)
                    =
                    {   my (stream as OUTPUT_STREAM_INFO os)
                            =
                            lock_and_check_closed_out (strm_mv, "write_substring");

                        fun release ()
                            =
                            fill (strm_mv, stream);

                        (substring_base ss) ->  (v, data_start, data_len);
                        os                  ->  { buffer, first_free_byte_in_buffer, buffering_mode, ... };

                        buf_len =  wcv::length  buffer;

                        fun flush ()
                            =
                            flush_buffer (strm_mv, stream, "write_substring");

                        fun flush_all ()
                            =
                            (os.write_rw_vector (wcs::make_full_slice  buffer)
                             except ex
                                =
                                {   release();
                                    output_exn (stream, "write_substring", ex);
                                }
                            );

                        fun write_direct ()
                            =
                            {   case *first_free_byte_in_buffer
                                    #
                                    0 => ();

                                    n => {   os.write_rw_vector (wcs::make_slice (buffer, 0, THE n));
                                             first_free_byte_in_buffer := 0;
                                         };
                                esac;

                                os.write_vector (cvs::make_slice (v, data_start, THE data_len));
                            }
                            except ex
                                =
                                {   release ();
                                    #
                                    output_exn (stream, "write_substring", ex);
                                };

                        fun insert copy_vec
                            =
                            {   buf_len  =  wcv::length buffer;
                                data_len =  cv::length v;

                                if (data_len >= buf_len)
                                    #
                                    write_direct ();

                                else

                                    i = *first_free_byte_in_buffer;

                                    avail = buf_len - i;

                                    if (avail < data_len)
                                        #
                                        wcs::copy_vec
                                            { from => cvs::make_slice (v, data_start, THE avail),
                                              to   => buffer,
                                              di   => i
                                            };

                                        flush_all();

                                        needs_flush = copy_vec (v, avail, data_len-avail, buffer, 0);

                                        first_free_byte_in_buffer := data_len-avail;

                                        needs_flush   ?:  flush ();

                                    else

                                        needs_flush
                                            =
                                            copy_vec (v, data_start, data_len, buffer, i);

                                        first_free_byte_in_buffer := i + data_len;

                                        if (needs_flush or avail == data_len)   flush();   fi;
                                    fi;
                                fi;
                            };

                          case *buffering_mode
                              #
                              iox::NO_BUFFERING => write_direct ();
                              iox::LINE_BUFFERING => insert line_buf_copy_vec;
                              iox::BLOCK_BUFFERING => insert block_buf_copy_vec;
                          esac;

                          release();
                      };

            };          #  pure_io 

            Vector  = cv::Vector;
            Element = cv::Element;

            Input_Stream  = Maildrop( pio::Input_Stream  );
            Output_Stream = Maildrop( pio::Output_Stream );

            # Input operations
            #
            fun read stream
                =
                {   my (v, stream')
                        =
                        pio::read (empty stream);

                    fill (stream, stream'); v;
                };

            fun read_one stream
                =
                case (pio::read_one (empty stream))
                    #
                    THE (element, stream')
                        =>
                        {   fill (stream, stream');
                            THE element;
                        };

                    NULL => NULL;
                esac;

            fun read_n (stream, n)
                =
                {   my (v, stream')
                        =
                        pio::read_n (empty stream, n);

                    fill (stream, stream'); v;
                };

            fun read_all (stream:  Input_Stream)
                =
                {   my (v, stream')
                        =
                        pio::read_all (empty stream);

                    fill (stream, stream'); v;
                };


            # Mailop-value constructors:
            #
            stipulate

                Result(X)
                  = RESULT     X
                  | EXCEPTION  Exception
                  ;

                fun send_mailop (slot, v)
                    =
                    give' (slot, RESULT v);

                fun send_exn_mailop (slot, exn)
                    =
                    give' (slot, EXCEPTION exn);

                fun receive'  slot
                    =
                    take'  slot
                        ==>
                        fn (RESULT    v  ) =>  v;
                           (EXCEPTION exn) =>  raise exception exn;
                        end;

                fun do_input input_mailop (stream:  Input_Stream) nack
                    =
                    {   reply_slot = make_mailslot ();

                        fun input_thread ()
                            =
                            {   stream' = empty stream;

                                nack_mailop
                                    =
                                    nack
                                        ==>
                                        (fn _ = fill (stream, stream'));

                                fun handle_input (result, stream'')
                                    =
                                    select
                                      [
                                        send_mailop (reply_slot, result)
                                            ==>
                                            (fn _ =  fill (stream, stream'')),

                                        nack_mailop
                                      ];

                                (select
                                  [
                                    input_mailop stream'
                                        ==>
                                        handle_input,
                                    nack_mailop
                                  ]
                                )
                                except exn
                                    =
                                    select [

                                        send_exn_mailop (reply_slot, exn)
                                            ==>
                                            (fn _ =  fill (stream, stream')),

                                        nack_mailop
                                    ];
                            };

                        make_thread "text I/O II" input_thread;

                        receive' reply_slot;
                    };
            herein

                fun input1evt (stream:  Input_Stream)
                    =
                    with_nack
                        (do_input  input_mailop  stream)
                    where
                        fun input_mailop (stream:  pio::Input_Stream)
                            =
                            pio::input1evt stream
                                ==>
                                fn THE (s, stream') => (THE s, stream');
                                   NULL             => (NULL, stream);
                                end;
                    end;

                fun input_mailop stream
                    =
                    with_nack
                        (do_input  pio::input_mailop  stream);

                fun input_nevt (stream, n)
                    =
                    with_nack
                        (do_input
                            (fn stream' = pio::input_nevt (stream', n))
                            stream
                        );

                fun input_all_mailop stream
                    =
                    with_nack
                        (do_input  pio::input_all_mailop  stream);

            end;                 # stipulate

            fun can_read (stream, n)
                =
                pio::can_read (peek stream, n);

            fun lookahead (stream:  Input_Stream)
                =
                case (pio::read_one (peek stream))

                     THE (element, _)
                         =>
                         THE element;

                     NULL => NULL;
                esac;

            fun close_input stream
                =
                {   (empty  stream)
                        ->
                        (s as pio::INPUT_STREAM (buffer as pio::INPUT_BUFFER { data, ... }, _));


                    pio::close_input  s;
                    fill (stream, pio::find_eos buffer);
                };

            fun end_of_stream stream
                =
                pio::end_of_stream (peek stream);
        /*
            fun getPosIn stream = pio::getPosIn (md::mGet stream)
            fun setPosIn (stream, p) = mUpdate (stream, pio::setPosIn p)
        */

            # Output operations:
            #
            fun write (stream, v)     =  pio::write (peek stream, v);
            fun write_one (stream, c) =  pio::write_one (peek stream, c);

            fun flush        stream   =  pio::flush        (peek stream);
            fun close_output stream   =  pio::close_output (peek stream);

            fun get_output_position stream
                =
                pio::get_output_position (peek stream);

            fun set_output_position (stream, p as pio::OUT_POSITION { stream=>stream', ... } )
                =
                {   m_update (stream, stream');
                    pio::set_output_position p;
                };

            fun make_instream (stream:  pio::Input_Stream) =  make_full_maildrop stream;
            fun get_instream  (stream:  Input_Stream)          =  peek stream;
            fun set_instream  (stream:  Input_Stream, stream') =  m_update (stream, stream');

            fun make_outstream (stream:  pio::Output_Stream) =  make_full_maildrop stream;
            fun get_outstream  (stream:  Output_Stream)          =  peek stream;
            fun set_outstream  (stream:  Output_Stream, stream') =  m_update (stream, stream');

            # Figure out the proper buffering mode for a given writer 
            #
            fun buffering (bio::STREAM_WRITER { io_descriptor => NULL, ... } )
                    =>
                    iox::BLOCK_BUFFERING;

                buffering (bio::STREAM_WRITER { io_descriptor => THE iod, ... } )
                    =>
                    winix::io::kind iod == winix::io::kind::tty
                    ??     iox::LINE_BUFFERING
                    ::     iox::BLOCK_BUFFERING;
            end;

            # Open files:
            #
            fun open_for_read fname
                =
                make_instream (pio::make_instream (winix_stream_readers_and_writers::open_for_read fname, empty_string))
                except
                    ex =  raise exception iox::IO { function=>"open_for_read", name=>fname, cause=>ex };

            fun open_for_write  fname
                =
                {   wr =   winix_stream_readers_and_writers::open_for_write fname;

                    make_outstream (pio::make_outstream (wr, buffering wr))
                    except
                        ex =  raise exception iox::IO { function=>"open", name=>fname, cause=>ex };
                };

            fun open_for_append fname
                =
                make_outstream (pio::make_outstream (winix_stream_readers_and_writers::open_for_append fname, iox::NO_BUFFERING))
                except ex
                    =
                    raise exception iox::IO { function=>"open_for_append", name=>fname, cause=>ex };

            # Text stream specific operations:
            #
            fun read_line stream
                =
                null_or::map
                    (fn (s, stream') = { fill (stream, stream'); s;})
                    (pio::read_line (empty stream));

            fun write_substring (stream, ss)
                =
                pio::write_substring (peek stream, ss);

            fun open_string src
                =
                make_instream (pio::make_instream (winix_stream_readers_and_writers::string_reader src, empty_string))
                except ex
                    =
                    raise exception iox::IO { function=>"open_for_read", name=>"<string>", cause=>ex };

            package mailslot_io
                =
                mailslot_io_g (
                    package stream_readers_and_writers = bio;
                    package v= vector_of_chars;         # vector_of_chars               is from   src/lib/std/vector-of-chars.pkg
                    package a= rw_vector_of_chars;              # rw_vector_of_chars    is from   src/lib/std/rw-vector-of-chars.pkg
                    package vs= vector_slice_of_chars;  # vector_slice_of_chars is from   src/lib/std/src/vector-slice-of-chars.pkg
                    package rs= rw_vector_slice_of_chars;       # rw_vector_slice_of_chars      is from   src/lib/std/src/rw-vector-slice-of-chars.pkg
                );

            # Open an Input_Stream that is connected
            # to the output port of a channel.
            # 
            fun open_slot_in slot
                =
                make_instream (pio::make_instream (mailslot_io::make_reader slot, empty_string));

            # Open an Output_Stream that is connected
            # to the input port of a slot.
            #
            fun open_slot_out ch
                =
                make_outstream (pio::make_outstream (mailslot_io::make_writer ch, iox::NO_BUFFERING));

            # * Standard streams *
            stipulate

                package sio = pio;

                fun make_std_in rebind
                    =
                    {   my (tag, stream)
                            =
                            sio::make_instream'(winix_stream_readers_and_writers::stdin(), empty_string);

                        if rebind
                           threadkit_io_cleanup_at_shutdown::rebind_cleaner (tag, dummy_cleaner);
                        fi;

                        stream;
                      };

                fun make_std_out rebind
                    =
                    {
                        wr = winix_stream_readers_and_writers::stdout();

                        my (tag, stream)
                            =
                            sio::make_outstream'(wr, buffering wr);

                        if rebind
                            threadkit_io_cleanup_at_shutdown::rebind_cleaner  (tag,  fn () = sio::flush stream);
                        fi;

                        stream;
                    };

                fun make_std_err rebind
                    =
                    {   my (tag, stream)
                            =
                            sio::make_outstream'
                                ( winix_stream_readers_and_writers::stderr (),
                                  iox::NO_BUFFERING
                                );

                        if rebind
                           threadkit_io_cleanup_at_shutdown::rebind_cleaner  (tag,  fn () =  sio::flush stream);
                        fi;

                        stream;
                    };
            herein

                # Build the standard streams.
                # 
                # Since we are not currently running threadkit,
                # we cannot do the cleaner renaming here,
                # but that is okay,
                # since these are just place holders.
                #
                stdin  =  make_instream  (make_std_in  FALSE);
                stdout =  make_outstream (make_std_out FALSE);
                stderr =  make_outstream (make_std_err FALSE);

                fun print s
                    =
                    {   stream' = empty stdout;

                        pio::write (stream', s);
                        pio::flush stream';
                        fill (stdout, stream');
                    };

                fun scan_stream  scan_g
                    =
                    do_it
                    where
                        scan = scan_g  pio::read_one;

                        fun do_it stream
                            =
                            {   instream = get_instream stream;

                                case (scan instream)
                                    #
                                    THE (item, instream')
                                        =>
                                        {   set_instream (stream, instream');
                                            THE item;
                                        };

                                    NULL => NULL;
                                esac;
                            };
                    end;

                # Establish a hook function to rebuild the I/O stack 
                #
                                                        my _ =
                threadkit_io_cleanup_at_shutdown::std_stream_hook
                    :=
                    (fn ()
                        =
                        {   set_instream  (stdin,  make_std_in  TRUE);
                            set_outstream (stdout, make_std_out TRUE);
                            set_outstream (stderr, make_std_err TRUE);

                            runtime_internals::print_hook                       # runtime_internals     is from   src/lib/std/src/nj/runtime-internals.pkg
                                :=
                                print;
                        }
                    );
            end;                                                                # stipulate
        end;                                                                    # stipulate
    };                                                                          # threadkit_file_g 
end;

## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2011,
## released under Gnu Public Licence version 3.


Comments and suggestions to: bugs@mythryl.org

PreviousUpNext