PreviousUpNext

15.4.907  src/lib/src/lib/thread-kit/src/core-thread-kit/oneshot-maildrop.pkg

## oneshot-maildrop.pkg

# Compiled by:
#     src/lib/std/standard.lib



# The implementation of Id-style synchronizing memory cells.



###          "We're fools whether we dance or not,
###           so we might as well dance."
###
###                   -- Japanese proverb



stipulate
    package fat =  fate;                        # fate                          is from   src/lib/std/src/nj/fate.pkg
    package itt =  internal_threadkit_types;    # internal_threadkit_types      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package tkq =  threadkit_queue;             # threadkit_queue               is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit-queue.pkg
    package ts  =  thread_scheduler;            # thread_scheduler              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/thread-scheduler.pkg
herein

    package   oneshot_maildrop
    :         Oneshot_Maildrop                  # Oneshot_Maildrop              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/oneshot-maildrop.api
    {
        Fate(X) =   fat::Fate(X);

        call_with_current_fate =  fat::call_with_current_fate;
        resume_fate            =  fat::resume_fate;

        # We use the same underlying representation
        # for both ivars and mvars:
        #
        Cell(X) =   CELL  { priority:  Ref( Int ),
                            read_q:    tkq::Threadkit_Queue( (Ref( itt::Transaction_Id ), Fate(X)) ),
                            value:     Ref(  Null_Or(X) )
                          };

        Oneshot_Maildrop(X) = Cell(X);

        exception MAY_NOT_FILL_ALREADY_FULL_ONESHOT_MAILDROP;

        fun new_cell ()
            =
            CELL { priority => REF 0,
                   value    => REF NULL,
                   read_q   => tkq::make_threadkit_queue ()
                 };

        fun same_cell (CELL { value=>v1, ... }, CELL { value=>v2, ... } )
            =
            v1 == v2;


        fun make_transaction_id ()
            =
            REF (itt::TRANSACTION_ID (ts::get_current_thread()));


        fun cancel_transaction_and_return_thread_id (transaction_id as REF (itt::TRANSACTION_ID thread_id))
                =>
                {   transaction_id :=   itt::CANCELLED_TRANSACTION_ID;
                    #
                    thread_id;
                };

            cancel_transaction_and_return_thread_id  (REF (itt::CANCELLED_TRANSACTION_ID))
                =>
                raise exception FAIL "Compiler bug:  Attempt to cancel already-cancelled transaction-id";                       # Never happens; here to suppress 'nonexhaustive match' compile warning.
        end;


        # Bump a priority value by one,
        # returning the old value:
        #
        fun bump_priority (p as REF n)
            =
            {   p := n+1;
                n;
            };

        # Result type of clean_and_remove:
        #   NO_ITEM  -- no live (uncancelled) entry was found in the queue;
        #   ITEM     -- the (transaction id, fate) pair that was taken off the queue.
        #
        Qy_Item X
          = NO_ITEM
          | ITEM  ((Ref(itt::Transaction_Id), Fate(X)) )
          ;

        # Functions to purge cancelled transactions from a threadkit queue:
        #
        stipulate

            fun clean []
                    =>
                    [];

                clean ((REF itt::CANCELLED_TRANSACTION_ID, _) ! r)
                    =>
                    clean r;

                clean l
                    =>
                    l;
            end;

            fun clean_rev ([], l)
                    =>
                    l;

                clean_rev ((REF itt::CANCELLED_TRANSACTION_ID, _) ! r, l)
                    =>
                    clean_rev (r, l);

                clean_rev (x ! r, l)
                    =>
                    clean_rev (r, x ! l);
            end;

        herein

            fun clean_and_check (priority, itt::THREADKIT_QUEUE { front, rear } )
                =
                clean_front *front
                where
                    fun clean_front []
                            =>
                            clean_rear *rear;

                        clean_front f
                            =>
                            case (clean f)

                                [] => clean_rear *rear;

                                f' => {   front := f';
                                          bump_priority priority;
                                      };
                            esac;
                    end

                    also
                    fun clean_rear []
                            =>
                            0;

                        clean_rear r
                            =>
                            {   rear := [];

                                case (clean_rev (r, []))

                                    [] => 0;

                                    rr => {   front := rr;
                                              bump_priority priority;
                                          };
                                esac;
                            };
                    end;
                end;

            fun clean_and_remove (itt::THREADKIT_QUEUE { front, rear, ... } )
                =
                clean_front *front
                where
                    fun clean_front []
                            =>
                            clean_rear *rear;

                        clean_front f
                            =>
                            case (clean f)

                                [] => clean_rear *rear;

                                (item ! rest) => {   front := rest;
                                                     ITEM item;
                                                 };
                            esac;

                    end

                    also
                    fun clean_rear []
                            =>
                            NO_ITEM;

                        clean_rear r
                            =>
                            {   rear := [];

                                case (clean_rev (r, []))

                                    [] => NO_ITEM;

                                    item ! rest
                                        =>
                                        {   front := rest;
                                            ITEM item;
                                        };
                                esac;
                            };
                    end;
                end;

            fun clean_and_enqueue (itt::THREADKIT_QUEUE { front, rear, ... }, item)
                =
                clean_front *front
                where
                    fun clean_front []
                            =>
                            clean_rear *rear;

                        clean_front f
                            =>
                            case (clean f)

                                [] =>  clean_rear *rear;

                                f' =>  {   front := f';
                                           rear := item ! *rear;
                                       };
                            esac;
                    end

                    also
                    fun clean_rear []
                            =>
                            front := [item];

                        clean_rear r
                            =>
                            case (clean_rev (r, []))
                                [] => {  front := [item];  rear  := []; };
                                rr => {  rear  := [item];  front := rr; };
                            esac;
                    end;
                end;
        end;                                    # stipulate


        # When a thread is resumed after being blocked
        # on an iGet or mGet operation there may be
        # other threads also blocked on the variable.
        #
        # This function is used to propagate the message
        # to all of the threads that are blocked on the
        # variable (or until one of them takes the value
        # in the mvar case).
        #
        # It must be called from an atomic region.
        # When the readQ is finally empty we leave
        # the atomic region.
        #
        # We must use "clean_and_remove" to get items
        # from the read_q in the unlikely event that
        # a single thread executes a choice of
        # multiple gets on the same variable.
        #
        fun relay_msg (read_q, msg)
            =
            case (clean_and_remove read_q)                                      # Take the next uncancelled reader off the queue, if any.
                #
                NO_ITEM =>   ts::reenable_thread_switching ();                  # No live reader left: leave the atomic region.
                #
                ITEM (transaction_id, fate)                                     # A waiting reader: its transaction id and the fate to resume with msg.
                    =>
                    call_with_current_fate
                        (fn my_fate
                            =
                            {   # Cancel the reader's transaction and pass our own fate plus the
                                # reader's thread id to the scheduler.
                                # NOTE(review): presumably this queues my_fate for later resumption and
                                # makes the reader's thread current -- confirm in thread_scheduler.
                                #
                                ts::enqueue_and_switch_current_thread
                                  (
                                    my_fate,
                                    cancel_transaction_and_return_thread_id  transaction_id
                                  );

                                resume_fate  fate  msg;                         # Deliver msg to the waiting reader's fate.
                            }
                        );
            esac;


        # I-variables
        #
        # Maildrop-level names for the cell constructor (new_cell)
        # and the cell identity test (same_cell):
        #
        make_oneshot_maildrop =  new_cell;
        same_oneshot_maildrop =  same_cell;

        # Store x in the maildrop and, if a live reader is waiting on it,
        # resume that reader with x.
        #
        # Raises MAY_NOT_FILL_ALREADY_FULL_ONESHOT_MAILDROP if the maildrop
        # already holds a value; thread switching is re-enabled before raising.
        #
        fun set (CELL { priority, read_q, value }, x)
            =
            {   ts::disable_thread_switching ();                                # Enter the atomic region.
                #
                case *value
                    #
                    NULL => {   value := THE x;                                 # Still empty: store the value first.
                                #
                                case (clean_and_remove  read_q)                 # Then look for the first uncancelled waiting reader.
                                    #
                                    NO_ITEM =>   ts::reenable_thread_switching ();      # Nobody waiting: leave the atomic region.
                                    #
                                    ITEM (transaction_id, fate)
                                        =>
                                        call_with_current_fate
                                            (
                                             fn my_fate
                                                =
                                                {   # Cancel the reader's transaction and pass our own fate plus
                                                    # the reader's thread id to the scheduler.
                                                    # NOTE(review): exact scheduling semantics live in thread_scheduler -- confirm there.
                                                    #
                                                    ts::enqueue_and_switch_current_thread
                                                      (
                                                        my_fate,
                                                        cancel_transaction_and_return_thread_id   transaction_id
                                                      );

                                                    priority := 1;              # Reset the priority counter now that the cell is full.

                                                    resume_fate  fate  x;       # Deliver x to the waiting reader's fate.
                                                }
                                            );
                                esac;
                            };

                    THE _ =>                                                    # Already full: a oneshot maildrop may only be set once.
                        {   ts::reenable_thread_switching ();
                            #
                            raise exception  MAY_NOT_FILL_ALREADY_FULL_ONESHOT_MAILDROP;
                        };
                esac;
            };

        # Return the value held by the maildrop.
        #
        # If the maildrop is still empty, the caller's fate is queued on read_q
        # and another thread is dispatched; when the fate is later resumed with
        # the value, the value is first relayed to any further waiting readers
        # and then returned.
        #
        fun get (CELL { priority, read_q, value } )
            =
            {   ts::disable_thread_switching ();                                # Enter the atomic region.
                #
                case *value
                    #         
                    NULL =>     {   msg =   call_with_current_fate              # msg is whatever value this fate is eventually resumed with.
                                                (fn fate
                                                    =
                                                    {   tkq::enqueue (read_q, (make_transaction_id(), fate));   # Register ourselves as a waiting reader.
                                                        #
                                                        ts::reenable_thread_switching_and_dispatch_next_thread ();
                                                    }
                                                );

                                    relay_msg (read_q, msg);                    # Pass the value on to the remaining readers; relay_msg re-enables thread switching once the queue is drained.

                                    msg;
                                };

                    THE v =>    {   ts::reenable_thread_switching ();           # Already full: leave the atomic region and return the value.
                                    v;
                                };
               esac;
            };

        # Mailop form of get: builds an itt::BASE_MAILOPS value whose only
        # element is the polling function is_ready.
        #
        # NOTE(review): do_it re-enables thread switching without disabling it first,
        # so is_ready/do_it are presumably invoked by the mailop machinery with thread
        # switching already disabled -- confirm against the mailop implementation.
        #
        fun get' (CELL { priority, read_q, value } )
            =
            itt::BASE_MAILOPS [is_ready]
            where
                # Used when the maildrop is still empty: queue our fate on read_q
                # under the supplied transaction id, then call next ().
                # When the fate is resumed with a value, run clean_up, relay the
                # value to any other waiting readers and return it.
                #
                fun wait_for { transaction_id, clean_up, next }                                         # Reppy calls this fn blockFn
                    =
                    {   msg =   call_with_current_fate
                                    (fn fate
                                        =
                                        {   tkq::enqueue
                                              ( read_q,
                                                (transaction_id,  fate)
                                              );

                                            next ();

                                            raise exception FAIL "maildrop: impossible";                # Execution should never get to here.
                                        }
                                    );

                        clean_up ();
                        relay_msg (read_q, msg);
                        msg;
                    };

                # Poll the maildrop: MAILOP_UNREADY with wait_for while it is empty,
                # otherwise MAILOP_READY carrying the bumped priority and a thunk
                # which yields the stored value.
                #
                fun is_ready ()                                                                         # Reppy calls this fn pollFn
                    =
                    case *value
                        #
                        NULL  =>    itt::MAILOP_UNREADY  wait_for;
                        #
                        THE v =>    itt::MAILOP_READY
                                      {
                                        priority =>  bump_priority  priority,
                                        #
                                        do_it    =>  .{   priority := 1;                                # Reppy calls this field doFn
                                                          ts::reenable_thread_switching ();
                                                          v;
                                                      }

                                      };
                   esac;
            end;

        fun nonblocking_get (CELL { priority, read_q, value } )
            =
            {   ts::disable_thread_switching ();
                #
                case *value
                    #
                    THE v =>    {   ts::reenable_thread_switching ();
                                    #
                                    THE v;
                                };

                    NULL  =>  NULL;
                esac;
            };

    };                                          # package oneshot_maildrop
end;

## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2012,
## released under Gnu Public Licence version 3.


Comments and suggestions to: bugs@mythryl.org

PreviousUpNext