PreviousUpNext

15.4.1069  src/lib/std/src/io/mailslot-io-g.pkg

## mailslot-io-g.pkg

# Compiled by:
#     src/lib/std/standard.lib



generic package mailslot_io_g (
    #
    package stream_readers_and_writers:  Threadkit_Stream_Readers_And_Writers;          # Threadkit_Stream_Readers_And_Writers  is from   src/lib/std/src/io/threadkit-stream-readers-and-writers.api

    package v:   Typelocked_Vector;                             # Typelocked_Vector             is from   src/lib/std/src/typelocked-vector.api
    package vs:  Typelocked_Vector_Slice;                       # Typelocked_Vector_Slice       is from   src/lib/std/src/typelocked-vector-slice.api
    package a:   Typelocked_Rw_Vector;                          # Typelocked_Rw_Vector          is from   src/lib/std/src/typelocked-rw-vector.api
    package rs:  Typelocked_Rw_Vector_Slice;                    # Typelocked_Rw_Vector_Slice    is from   src/lib/std/src/typelocked-rw-vector-slice.api

    sharing a::Rw_Vector ==           rs::Rw_Vector
                         == stream_readers_and_writers::Rw_Vector;

    sharing a::Vector    ==            v::Vector
                         ==           rs::Vector
                         ==           vs::Vector
                         == stream_readers_and_writers::Vector;

    sharing vs::Slice    ==           rs::Vector_Slice
                         == stream_readers_and_writers::Vector_Slice;

    sharing rs::Slice    == stream_readers_and_writers::Rw_Vector_Slice;
  )

: (weak)

api {

    package stream_readers_and_writers:  Threadkit_Stream_Readers_And_Writers;          # Threadkit_Stream_Readers_And_Writers  is from   src/lib/std/src/io/threadkit-stream-readers-and-writers.api

    make_reader:   threadkit::Mailslot( stream_readers_and_writers::Vector ) -> stream_readers_and_writers::Stream_Reader;
    make_writer:   threadkit::Mailslot( stream_readers_and_writers::Vector ) -> stream_readers_and_writers::Stream_Writer;

}
{
    package stream_readers_and_writers =   stream_readers_and_writers;


    include threadkit;                                          # threadkit                     is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit.pkg


    vextract = vs::to_vector o vs::make_slice;

    # Create a reader that is connected
    # to the output port of a slot. 
    #
    fun make_reader slot
        =
        {
            closed_1shot = make_oneshot_maildrop ();

            is_closed_mailop
                =
                get' closed_1shot
                    ==>
                   .{  raise exception io_exceptions::CLOSED_IO_STREAM;  };

            Request
              = READ  (Int, Mailop(Void), Mailslot(v::Vector))
              | CLOSE
              ;

            request_queue
                =
                make_mailqueue ();

            fun read_vec_mailop 0
                    =>
                    always_mailop (v::from_list []);

                read_vec_mailop n
                    =>
                    if (n < 0)
                        raise exception exceptions::SUBSCRIPT;
                    else
                        with_nack
                            (fn nack
                                =
                                {   reply_slot = make_mailslot ();

                                    push (request_queue, READ (n, nack, reply_slot));

                                    choose [
                                        take'  reply_slot,
                                        is_closed_mailop
                                    ];
                                }
                            );
                    fi;
            end;

            fun read_arr_mailop asl
                =
                {   my (buf, i, n)
                        =
                        rs::base asl;

                    read_vec_mailop n
                        ==>
                        (fn v =  {   a::copy_vec { to=>buf, di=>i, from=>v };
                                     v::length v;
                                 }
                        );
                };

            fun close ()
                =
                push (request_queue, CLOSE);

            fun get_data (THE v)
                   =>
                   v;

                get_data NULL
                    =>
                    {   v = take slot;

                        v::length v > 0  ??  v
                                         ::  get_data NULL;
                    };
            end;

            fun server buf
                =
                case (pull  request_queue)
                  
                     READ (n, nack, reply_slot)
                         =>
                         {   v = get_data buf;

                             if (v::length v > n)
                                 
                                  v' = vextract (v, 0, THE n);

                                  select [

                                     nack
                                         ==>
                                         .{ server (THE v); },

                                     give' (reply_slot, v)
                                         ==>
                                         .{  server (THE (vextract (v, n, NULL))); }
                                   ];

                             else
                                  select [

                                      nack
                                          ==>
                                          .{  server (THE v); },

                                      give' (reply_slot, v)
                                          ==>
                                          .{ server NULL; }
                                  ];
                             fi;
                        };

                     CLOSE
                         =>
                         {   set (closed_1shot, ());
                             closed_server ();
                         };
                esac

            also
            fun closed_server ()
                =
                {   pull  request_queue;
                    closed_server ();
                };

            make_thread' "mailslot_io" server NULL;

            stream_readers_and_writers::STREAM_READER
              {
                name           => "<channel>", 
                chunk_size     => 1024,                 #  ?? 
                read_vector    => do_mailop o read_vec_mailop,
                read_rw_vector => do_mailop o read_arr_mailop,
                read_vec_mailop,
                read_arr_mailop,
                avail      => fn () = NULL,             #  ?? 
                get_position     => NULL,
                set_position     => NULL,
                end_position     => NULL,
                verify_position  => NULL,
                close,
                io_descriptor     => NULL
              };
          };

    # Create a writer that is connected to the input port of a slot. 
    #
    fun make_writer ch
        =
        {
            closed_1shot = make_oneshot_maildrop ();

            closed_mailop
                =
                get' closed_1shot
                    ==>
                    .{  raise exception io_exceptions::CLOSED_IO_STREAM;  };

            slot' = make_mailslot ();

            fun buffer ()
                =
                select [

                    take' slot'
                        ==> 
                        (fn v = {   if (v::length v > 0)
                                       #        
                                       give (ch, v);
                                   fi;

                                   buffer ();
                               }
                        ),

                    closed_mailop
                ];

            fun write_vec_mailop arg
                =
                {   v = vs::to_vector arg;

                    choose [

                        closed_mailop,

                        give' (slot', v)
                            ==>
                            .{ v::length v; }
                      ];
                  };

            fun write_arr_mailop arg
                =
                {   v = rs::to_vector arg;

                    choose [

                        closed_mailop,

                        give' (slot', v)
                            ==>
                            .{ v::length v; }
                      ];
                  };

            fun close ()
                =
                set (closed_1shot, ());
          
            make_thread "mailslot io II" .{
                #
                buffer ();
                #
                ();
            };

            stream_readers_and_writers::STREAM_WRITER
              {
                name              =>  "<channel>",
                chunk_size        =>  1024,
                write_vector      =>  do_mailop o write_vec_mailop,
                write_rw_vector   =>  do_mailop o write_arr_mailop,
                write_vec_mailop,
                write_arr_mailop,
                get_position      =>  NULL,
                set_position      =>  NULL,
                end_position      =>  NULL,
                verify_position   =>  NULL,
                close,
                io_descriptor     =>  NULL
              };
        };
};


## COPYRIGHT (c) 1996 AT&T Research.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2011,
## released under Gnu Public Licence version 3.


Comments and suggestions to: bugs@mythryl.org

PreviousUpNext