PreviousUpNext

15.4.903  src/lib/src/lib/thread-kit/src/core-thread-kit/maildrop.pkg

## maildrop.pkg
#
# The implementation of Id-style synchronizing memory cells.
# These are essentially concurrency-safe replacements for REF cells.

# Compiled by:
#     src/lib/std/standard.lib






###          "We're fools whether we dance or not,
###           so we might as well dance."
###
###                   -- Japanese proverb



stipulate
    package tkq =  threadkit_queue;             # threadkit_queue               is from   src/lib/src/lib/thread-kit/src/core-thread-kit/threadkit-queue.pkg
    package itt =  internal_threadkit_types;    # internal_threadkit_types      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package ts  =  thread_scheduler;            # thread_scheduler              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/thread-scheduler.pkg
herein

    package   maildrop
    :         Maildrop                          # Maildrop                      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/maildrop.api
    {
        # Local alias for the fate type exported by package 'fate'.
        #
        Fate(X)
            =
            fate::Fate(X);

        # Local short names for the fate capture and fate resume functions.
        #
        call_with_current_fate = fate::call_with_current_fate;
        resume_fate            = fate::resume_fate;

        # Representation of a maildrop:
        #
        #     priority:  Counter reported in itt::MAILOP_READY records; advanced by bump_priority.
        #     read_q:    Queue of (transaction-id, fate) pairs, one per thread blocked waiting for a value.
        #     value:     NULL while the cell is empty, THE x while it holds x.
        #
        Cell(X)
            =
            CELL {
                priority:  Ref( Int ),
                read_q:    tkq::Threadkit_Queue( (Ref( itt::Transaction_Id ), Fate(X)) ),
                value:     Ref(  Null_Or(X) )
            };

        # A maildrop is just a cell.
        #
        Maildrop(X) = Cell(X);

        # Raised by 'fill' when the maildrop already holds a value.
        #
        exception MAY_NOT_FILL_ALREADY_FULL_MAILDROP;

        fun new_cell ()
            =
            CELL { priority => REF 0,
                   value    => REF NULL,
                   read_q   => tkq::make_threadkit_queue ()
                 };

        fun same_cell (CELL { value=>v1, ... }, CELL { value=>v2, ... } )
            =
            v1 == v2;

        fun make_transaction_id ()
            =
            REF (itt::TRANSACTION_ID (ts::get_current_thread()));


        fun cancel_transaction_and_return_thread_id (transaction_id as REF (itt::TRANSACTION_ID thread_id))
                =>
                {   transaction_id :=   itt::CANCELLED_TRANSACTION_ID;
                    #
                    thread_id;
                };

            cancel_transaction_and_return_thread_id  (REF (itt::CANCELLED_TRANSACTION_ID))
                =>
                raise exception FAIL "Compiler bug:  Attempt to cancel already-cancelled transaction-id";                       # Never happens; here to suppress 'nonexhaustive match' compile warning.
        end;


        fun bump_priority (p as REF n)                                                                          # Bump a priority value by one, returning the old value.
            =
            {   p := n+1;
                n;
            };


        # Result type of clean_and_remove:
        #
        #     NO_ITEM   The queue held no live (uncancelled) entry.
        #     ITEM      The oldest live entry: a reader's transaction id plus the fate it queued.
        #
        Qy_Item(X)
          #
          = NO_ITEM
          | ITEM  ((Ref itt::Transaction_Id,  Fate(X)))
          ;

        # Functions to clean cancelled transactions out of maildrop read queues.
        #
        stipulate

            fun clean []
                    =>
                    [];

                clean ((REF itt::CANCELLED_TRANSACTION_ID, _) ! r)
                    =>
                    clean r;

                clean l
                    =>
                    l;
            end;

            fun clean_rev ([], l)
                    =>
                    l;

                clean_rev ((REF itt::CANCELLED_TRANSACTION_ID, _) ! r, l)
                    =>
                    clean_rev (r, l);

                clean_rev (x ! r, l)
                    =>
                    clean_rev (r, x ! l);
            end;

        herein

            fun clean_and_check (priority, itt::THREADKIT_QUEUE { front, rear } )
                =
                clean_front *front
                where
                    fun clean_front []
                            =>
                            clean_rear *rear;

                        clean_front f
                            =>
                            case (clean f)

                                [] => clean_rear *rear;

                                f' => {   front := f';
                                          bump_priority priority;
                                      };
                            esac;
                    end

                    also
                    fun clean_rear []
                            =>
                            0;

                        clean_rear r
                            =>
                            {   rear := [];

                                case (clean_rev (r, []))

                                    [] => 0;

                                    rr => {   front := rr;
                                              bump_priority priority;
                                          };
                                esac;
                            };
                    end;
                end;

            fun clean_and_remove (itt::THREADKIT_QUEUE { front, rear, ... } )
                =
                clean_front *front
                where
                    fun clean_front []
                            =>
                            clean_rear *rear;

                        clean_front f
                            =>
                            case (clean f)

                                [] => clean_rear *rear;

                                (item ! rest) => {   front := rest;
                                                     ITEM item;
                                                 };
                            esac;

                    end

                    also
                    fun clean_rear []
                            =>
                            NO_ITEM;

                        clean_rear r
                            =>
                            {   rear := [];

                                case (clean_rev (r, []))

                                    [] => NO_ITEM;

                                    item ! rest
                                        =>
                                        {   front := rest;
                                            ITEM item;
                                        };
                                esac;
                            };
                    end;
                end;

            fun clean_and_enqueue (itt::THREADKIT_QUEUE { front, rear, ... }, item)
                =
                clean_front *front
                where
                    fun clean_front []
                            =>
                            clean_rear *rear;

                        clean_front f
                            =>
                            case (clean f)

                                [] =>  clean_rear *rear;

                                f' =>  {   front := f';
                                           rear := item ! *rear;
                                       };
                            esac;
                    end

                    also
                    fun clean_rear []
                            =>
                            front := [item];

                        clean_rear r
                            =>
                            case (clean_rev (r, []))
                                [] => {  front := [item];  rear  := []; };
                                rr => {  rear  := [item];  front := rr; };
                            esac;
                    end;
                end;
        end;                                    # stipulate


        # When a thread is resumed after being blocked
        # on a peek or swap operation there may be
        # other threads also blocked on the maildrop.
        #
        # This function is used to propagate the message
        # to the threads that are blocked on the maildrop:
        # each call resumes the oldest live reader in
        # read_q (if any) with msg.  Readers resumed inside
        # peek/swap call relay_msg again in their turn; a
        # reader resumed inside 'empty' takes the value
        # and does not.
        #
        # It must be called from an atomic region
        # (that is, with thread switching disabled).
        # When read_q has no live reader left we leave
        # the atomic region via ts::reenable_thread_switching.
        #
        # We must use clean_and_remove to get items
        # from read_q in the unlikely event that
        # a single thread executes a choice of
        # multiple gets on the same maildrop.
        #
        fun relay_msg (read_q, msg)
            =
            case (clean_and_remove read_q)

                NO_ITEM
                    =>
                    ts::reenable_thread_switching ();                           # Nobody left to wake: end the atomic region.

                ITEM (transaction_id, fate)
                    =>
                    call_with_current_fate
                        (fn my_fate
                            =
                            {   # Queue ourselves (my_fate) under the reader's thread id,
                                # marking the reader's transaction cancelled as we go ...
                                #
                                ts::enqueue_and_switch_current_thread
                                    (my_fate, cancel_transaction_and_return_thread_id transaction_id);

                                # ... then continue as the reader, delivering msg to it.
                                #
                                resume_fate  fate  msg;
                            }
                        );
            esac;

        fun impossible ()
            =
            raise exception FAIL "maildrop: impossible";


        # M-variables:
        #
        make_empty_maildrop
            =
            new_cell;


        fun make_full_maildrop x
            =
            CELL { priority => REF 0,
                   read_q   => tkq::make_threadkit_queue (),
                   value    => REF (THE x)
                 };

        same_maildrop = same_cell;


        # Put x into an empty maildrop.
        #
        # If a live reader is waiting in read_q, the current thread is
        # queued with the scheduler and control passes to that reader,
        # which receives x.
        #
        # Raises MAY_NOT_FILL_ALREADY_FULL_MAILDROP if the maildrop
        # already holds a value; thread switching is re-enabled first.
        #
        fun fill (CELL { priority, read_q, value }, x)
            =
            {   ts::disable_thread_switching ();

                case *value

                    NULL =>
                        {   value := THE x;

                            case (clean_and_remove read_q)

                                NO_ITEM
                                    =>
                                    ts::reenable_thread_switching ();           # No reader waiting: just leave the atomic region.

                                ITEM (transaction_id, fate)
                                    =>
                                    call_with_current_fate
                                        (fn my_fate
                                            =
                                            {   # Queue ourselves under the reader's thread id,
                                                # cancelling the reader's transaction as we go.
                                                #
                                                ts::enqueue_and_switch_current_thread
                                                    (my_fate, cancel_transaction_and_return_thread_id transaction_id);

                                                priority := 1;

                                                resume_fate  fate  x;           # Continue as the reader, handing it x.
                                            }
                                        );
                            esac;
                        };

                    THE _ =>
                        {   ts::reenable_thread_switching ();
                            #
                            raise exception MAY_NOT_FILL_ALREADY_FULL_MAILDROP;
                        };
                esac;
            };


        # Mailop form of 'empty': returns itt::BASE_MAILOPS wrapping a
        # single readiness test on this maildrop.
        #
        # NOTE(review): is_ready does not disable thread switching itself,
        # while do_it and wait_for both re-enable it -- presumably the mailop
        # machinery invokes them inside an atomic region; confirm against
        # the code that consumes itt::BASE_MAILOPS.
        #
        fun empty' (CELL { priority, read_q, value } )
            =
            {   # Used when the maildrop is empty: queue our fate on read_q
                # under the supplied transaction id and call next ().
                # Execution continues after call_with_current_fate only when
                # something resumes the queued fate with a value v.
                #
                fun wait_for { transaction_id, clean_up, next }                         # Reppy calls this fn blockFn
                    =
                    {   v = call_with_current_fate
                                (fn fate
                                    =
                                    {   tkq::enqueue (read_q, (transaction_id, fate));
                                        next ();
                                        impossible ();                                  # Only reached if next () returns.
                                    }
                                );

                        clean_up ();
                        value := NULL;                                                  # We took the value, so the maildrop is empty again.
                        ts::reenable_thread_switching ();
                        v;
                    };

                # Report whether the take can complete immediately.
                #
                fun is_ready ()                                                         # Reppy calls this fn pollFn
                    =
                    case *value
                        #
                        NULL  =>    itt::MAILOP_UNREADY wait_for;
                        #
                        THE v =>    itt::MAILOP_READY
                                      {
                                        priority  =>   bump_priority priority,

                                        do_it     =>  .{   value := NULL;                               # Reppy calls this field doFn
                                                           ts::reenable_thread_switching ();
                                                           v;
                                                       }
                                      };
                    esac;


                itt::BASE_MAILOPS [is_ready];
            };


        fun nonblocking_empty (CELL { priority, read_q, value } )
            =
            {   ts::disable_thread_switching();

                case *value

                    THE v
                        =>
                        {   value := NULL;
                            ts::reenable_thread_switching ();
                            THE v;
                        };

                    NULL => NULL;
                esac;
            };


        # Take the value out of the maildrop, leaving it empty.
        #
        # If the maildrop is currently empty, the calling thread queues
        # its fate on read_q and yields to the scheduler; it continues
        # once that fate is resumed with a value (see fill and relay_msg).
        #
        fun empty (CELL { priority, read_q, value } )
            =
            {   ts::disable_thread_switching();

                case *value

                    NULL =>
                        {   v = call_with_current_fate
                                    (fn fate
                                        =
                                        {   tkq::enqueue (read_q, (make_transaction_id(), fate));
                                            #
                                            ts::reenable_thread_switching_and_dispatch_next_thread ();
                                        }
                                    );

                            # We get here when the queued fate is resumed with v.
                            #
                            value := NULL;
                            ts::reenable_thread_switching ();
                            v;
                        };

                    THE v =>
                        {   value := NULL;
                            ts::reenable_thread_switching ();
                            v;
                        };
                esac;
            };


        # Read the value in the maildrop without removing it.
        #
        # If the maildrop is currently empty, the calling thread queues
        # its fate on read_q and yields to the scheduler; once resumed
        # with a value it passes that value on to the next waiting
        # reader via relay_msg before returning it.
        #
        fun peek (CELL { priority, read_q, value } )
            =
            {   ts::disable_thread_switching ();

                case *value

                    NULL
                        =>
                        {   v = call_with_current_fate
                                    (fn fate
                                        =
                                        {   tkq::enqueue (read_q, (make_transaction_id(), fate));
                                            #
                                            ts::reenable_thread_switching_and_dispatch_next_thread ();
                                        }
                                    );

                            # relay_msg re-enables thread switching
                            # itself once read_q has no live reader left.
                            #
                            relay_msg (read_q, v);

                            v;
                        };

                    THE v
                        =>
                        {   ts::reenable_thread_switching ();
                            v;
                        };
                esac;
            };


        # Mailop form of 'peek': returns itt::BASE_MAILOPS wrapping a
        # single readiness test on this maildrop.  The stored value is
        # never cleared.
        #
        # NOTE(review): is_ready does not disable thread switching itself --
        # presumably the mailop machinery invokes it inside an atomic
        # region; confirm against the code that consumes itt::BASE_MAILOPS.
        #
        fun peek' (CELL { priority, read_q, value } )
            =
            {
                # Used when the maildrop is empty: queue our fate on read_q
                # under the supplied transaction id and call next ().
                # Execution continues after call_with_current_fate only when
                # something resumes the queued fate with a value v.
                #
                fun wait_for { transaction_id, clean_up, next }                         # Reppy calls this fn blockFn
                    =
                    {   v = call_with_current_fate
                                (fn fate
                                    =
                                    {   tkq::enqueue (read_q, (transaction_id, fate));
                                        next ();
                                        impossible ();                                  # Only reached if next () returns.
                                    }
                                );

                        clean_up();
                        relay_msg (read_q, v);                                          # Pass v on to the next waiting reader, if any.
                        v;
                    };

                # Report whether the read can complete immediately.
                #
                fun is_ready ()                                                         # Reppy calls this fn pollFn
                    =
                    case *value
                        #
                        NULL  =>    itt::MAILOP_UNREADY  wait_for;
                        #
                        THE v =>    itt::MAILOP_READY
                                      {
                                        priority =>   bump_priority priority,
                                        do_it    =>  .{   ts::reenable_thread_switching ();             # Reppy calls this field doFn
                                                          v;
                                                      }
                                      };
                    esac;

                itt::BASE_MAILOPS [is_ready];
            };


        fun nonblocking_peek (CELL { priority, read_q, value } )
            =
            {   ts::disable_thread_switching ();

                case *value

                    THE v
                        =>
                        {   ts::reenable_thread_switching ();
                            THE v;
                        };

                    NULL => NULL;
                esac;
            };


        # Swap the current contents of the cell with a new value,
        # returning the old contents.
        #
        # This function has the effect of an
        # 'empty' followed by a 'fill',
        # except that it is guaranteed to be atomic.
        #
        # It is also somewhat more efficient.
        #
        # If the maildrop is currently empty, the calling thread queues
        # its fate on read_q and yields to the scheduler until that
        # fate is resumed with a value.
        #
        fun swap (CELL { priority, read_q, value }, new_v)
            =
            {   ts::disable_thread_switching ();

                case *value

                    NULL =>
                        {   v = call_with_current_fate
                                    (fn fate
                                        =
                                        {   tkq::enqueue (read_q, (make_transaction_id(), fate));
                                            #
                                            ts::reenable_thread_switching_and_dispatch_next_thread ();
                                        }
                                    );

                            # We get here when the queued fate is resumed with v.
                            #
                            value := THE new_v;

                            # Relay the new value to
                            # any other blocked threads:
                            #
                            relay_msg (read_q, new_v);
                            v;
                        };

                    THE v =>
                        {   value := THE new_v;
                            ts::reenable_thread_switching ();
                            v;
                        };
                esac;
            };


        # Mailop form of 'swap': returns itt::BASE_MAILOPS wrapping a
        # single readiness test on this maildrop.  When the mailop
        # completes, the maildrop holds new_v and the result is the
        # value it held (or was handed) before.
        #
        # NOTE(review): is_ready does not disable thread switching itself --
        # presumably the mailop machinery invokes it inside an atomic
        # region; confirm against the code that consumes itt::BASE_MAILOPS.
        #
        fun swap' (CELL { priority, read_q, value }, new_v)
            =
            {
                # Used when the maildrop is empty: queue our fate on read_q
                # under the supplied transaction id and call next ().
                # Execution continues after call_with_current_fate only when
                # something resumes the queued fate with a value v.
                #
                fun wait_for { transaction_id, clean_up, next }                                 # Reppy calls this fn blockFn
                    =
                    {   v = call_with_current_fate
                                (fn fate
                                    =
                                    {   tkq::enqueue (read_q, (transaction_id, fate));
                                        next ();
                                        impossible();                                           # Only reached if next () returns.
                                    }
                                );

                        clean_up ();
                        value := THE new_v;
                        relay_msg (read_q, new_v);                                              # Pass new_v on to the next waiting reader, if any.
                        v;
                    };

                # Report whether the swap can complete immediately.
                #
                fun is_ready ()
                    =
                    case *value
                        #
                        NULL  =>  itt::MAILOP_UNREADY wait_for;

                        THE v =>  itt::MAILOP_READY
                                    {
                                      priority =>   bump_priority priority,

                                      do_it    =>  .{   value := THE new_v;                             # Reppy calls this field doFn
                                                        ts::reenable_thread_switching ();
                                                        v;
                                                    }
                                    };
                    esac;

                itt::BASE_MAILOPS [is_ready];
            };

    };                                          # package maildrop 
end;

## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2012,
## released under Gnu Public Licence version 3.




Comments and suggestions to: bugs@mythryl.org

PreviousUpNext