PreviousUpNext

15.4.905  src/lib/src/lib/thread-kit/src/core-thread-kit/mailqueue.pkg

## mailqueue.pkg

# Compiled by:
#     src/lib/std/standard.lib


# Unbounded queues of thread-to-thread mail messages.


stipulate
    package itt =  internal_threadkit_types;    # internal_threadkit_types      is from   src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
    package ts  =  thread_scheduler;            # thread_scheduler              is from   src/lib/src/lib/thread-kit/src/core-thread-kit/thread-scheduler.pkg
herein

    package mailqueue: (weak)
    api {

        include Mailqueue;                              # Mailqueue                     is from   src/lib/src/lib/thread-kit/src/core-thread-kit/mailqueue.api

        reset_mailqueue:  Mailqueue(X) -> Void;

    }
    {
        # Local aliases for the fate type and the two fate operations
        # this package uses, so the code below can name them without
        # the fate:: qualifier.
        #
        Fate(X) =   fate::Fate(X);

        call_with_current_fate =  fate::call_with_current_fate;
        resume_fate            =  fate::resume_fate;

        # A functional FIFO queue kept as two lists.
        #
        #   front:  Items in removal order; dequeue takes from its head.
        #   rear:   Newly added items, newest first; enqueue conses onto it,
        #           and dequeue reverses it into 'front' once 'front' is used up.
        #
        Queue(X)
            =
            { front:  List(X),
              rear:   List(X)
            };

        fun enqueue ( { front, rear }, x)
            =
            { front,
              rear => x ! rear
            };

        fun dequeue ( { front => x ! r, rear } ) =>  ( { front=>r, rear }, x);
            dequeue ( { front => [],    rear } ) =>  dequeue { front=>list::reverse rear, rear=> [] };
        end;

        # The state of a mailqueue.
        # The queue of the NONEMPTY constructor should
        # never be empty (use EMPTY instead).
        #
        #   EMPTY q
        #       No messages are waiting.  q holds the readers that are
        #       waiting for one, each as a pair of (transaction-id cell,
        #       fate to resume with the message).  Entries whose cell holds
        #       CANCELLED_TRANSACTION_ID are skipped by clean_and_remove.
        #
        #   NONEMPTY (priority, q)
        #       q holds the undelivered messages.
        #       The Int is the priority that pull' reports when it finds the
        #       queue ready: pull' adds one to it on each such poll, and
        #       get_msg sets it back to 1 whenever a message is removed and
        #       others remain.
        #
        State(X)
          = EMPTY      Queue( (Ref( itt::Transaction_Id ), Fate(X)))
          | NONEMPTY  (Int, Queue(X))
          ;

        # A mailqueue is a mutable cell holding the current State.
        # Every operation below reads and replaces the contents of this cell.
        #
        Mailqueue(X) = MAILQUEUE  Ref( State(X) );

        fun reset_mailqueue (MAILQUEUE state)
            =
            state :=  EMPTY { front => [], rear => [] };

        fun make_mailqueue ()
            =
            MAILQUEUE (REF (EMPTY { front => [], rear => [] } ));

        fun same_mailqueue
            ( MAILQUEUE s1,
              MAILQUEUE s2
            )
            =
            s1 == s2;


        fun make_transaction_id ()
            =
            REF (itt::TRANSACTION_ID (ts::get_current_thread()));


        fun cancel_transaction_and_return_thread_id (transaction_id as REF (itt::TRANSACTION_ID thread_id))
                =>
                {   transaction_id :=   itt::CANCELLED_TRANSACTION_ID;
                    #
                    thread_id;
                };

            cancel_transaction_and_return_thread_id  (REF (itt::CANCELLED_TRANSACTION_ID))
                =>
                raise exception FAIL "Compiler bug:  Attempt to cancel already-cancelled transaction-id";                       # Never happens; here to suppress 'nonexhaustive match' compile warning.
        end;
            

        # Result type of clean_and_remove.
        #
        #   NO_ITEM
        #       The reader queue held no uncancelled entry.
        #
        #   ITEM (transaction_id, fate, state)
        #       The first uncancelled reader's transaction-id cell and fate,
        #       plus the EMPTY state that is left once that reader (and the
        #       cancelled entries skipped on the way to it) has been removed.
        #
        Qy_Item X
          = NO_ITEM
          |    ITEM  (Ref(itt::Transaction_Id), Fate(X), State(X))
          ;

        stipulate

            fun clean [] => [];
                clean ((REF itt::CANCELLED_TRANSACTION_ID, _) ! r) => clean r;
                clean l => l;
            end;

            fun clean_rev ([], l)
                    =>
                    l;

                clean_rev ((REF itt::CANCELLED_TRANSACTION_ID, _) ! r, l)
                    =>
                    clean_rev (r, l);

                clean_rev (x ! r, l)
                    =>
                    clean_rev (r, x ! l);
            end;

        herein

            fun clean_and_remove (q as { front, rear } )
                =
                clean_front front
                where

                    fun clean_front []
                            =>
                            clean_rear rear;

                        clean_front f
                            =>
                            case (clean f)
                                #
                                [] => clean_rear rear;

                                ((id, k) ! rest)
                                    =>
                                    ITEM (id, k, EMPTY { front=>rest, rear } );
                            esac;
                    end

                    also
                    fun clean_rear []
                            =>
                            NO_ITEM;

                        clean_rear r
                            =>
                            case (clean_rev (r, []))
                                #
                                []             =>  NO_ITEM;
                                (id, k) ! rest =>  ITEM (id, k, EMPTY { front=>rest, rear => [] } );
                            esac;
                    end;
                end;
        end;

        # Send message x to a mailqueue.
        #
        # Thread switching is disabled for the duration of the state update.
        # What happens next depends on the current state:
        #
        #   EMPTY, no live reader waiting:   x becomes the sole queued message.
        #   EMPTY, a live reader waiting:    x is handed straight to that reader's fate.
        #   NONEMPTY:                        x is appended behind the messages already queued.
        #
        fun push (MAILQUEUE state, x)
            =
            {   ts::disable_thread_switching ();

                case *state
                    #
                    EMPTY q
                        =>
                        case (clean_and_remove q)
                            #
                            # Nobody is waiting: start a message queue holding
                            # just x, with the priority counter at 1.
                            #
                            NO_ITEM =>
                                {
                                    state := NONEMPTY (1, { front => [x], rear => [] } );
                                    ts::reenable_thread_switching();
                                };

                            # A reader is waiting.  state' is the reader queue with
                            # that reader (and any cancelled entries before it) removed.
                            #
                            ITEM (transaction_id, get_fate, state')
                                =>
                                call_with_current_fate
                                    (fn fate
                                        =
                                        {   state := state';

                                            # Cancel the reader's transaction id (yielding its
                                            # thread id) and pass that, with our own fate, to the scheduler.
                                            # NOTE(review): presumably this queues the sender for
                                            # later resumption and makes the reader's thread current --
                                            # confirm against thread-scheduler.pkg.
                                            #
                                            ts::enqueue_and_switch_current_thread
                                                ( fate,
                                                  cancel_transaction_and_return_thread_id  transaction_id
                                                );

                                            # Deliver x: the reader's pending call_with_current_fate returns x.
                                            #
                                            resume_fate  get_fate  x;
                                        }
                                    );
                        esac;

                    NONEMPTY (p, q)
                        => 
                        # We force a context switch here
                        # to prevent a producer from
                        # outrunning a consumer.
                        #
                        # The priority counter p is carried over unchanged.
                        #
                        call_with_current_fate
                            (fn fate
                                =
                                {   state := NONEMPTY (p, enqueue (q, x));

                                    ts::reenable_thread_switching_and_yield_to_next_thread  fate;
                                }
                            );
                 esac;
            };

        fun get_msg (state, q)
            =
            {   my (q', msg)
                    =
                    dequeue q;

                case q'
                    #
                    { front => [],
                      rear  => []
                    }
                        =>
                        state := EMPTY { front => [], rear => [] };

                    _   =>
                        state := NONEMPTY (1, q');

                esac;

                ts::reenable_thread_switching ();

                msg;
            };

        # Receive the next message from a mailqueue, waiting if none is queued.
        #
        # With a message available it is removed via get_msg (which also
        # re-enables thread switching).  Otherwise the caller's fate is
        # added to the queue of waiting readers and another thread is
        # dispatched; the message arrives when push resumes that fate.
        #
        fun pull (MAILQUEUE state)
            =
            {
                ts::disable_thread_switching ();

                case *state
                    #
                    EMPTY q
                        =>
                        {   # Register ourselves as a waiting reader under a fresh
                            # transaction id, then give up the processor.  'msg' is
                            # bound to the value push later passes to resume_fate.
                            #
                            msg =   call_with_current_fate
                                        (
                                         fn get_fate
                                            =
                                            {   state := EMPTY (enqueue (q, (make_transaction_id(), get_fate)));

                                                ts::reenable_thread_switching_and_dispatch_next_thread ();
                                            }
                                        );

                            # Runs after we have been resumed with the message.
                            # NOTE(review): presumably switching is disabled again at
                            # this point by the sender's push -- confirm against thread-scheduler.pkg.
                            #
                            ts::reenable_thread_switching ();

                            msg;
                         };

                  # The priority counter is not needed here; get_msg resets it.
                  #
                  NONEMPTY (priority, q)
                      =>
                      get_msg (state, q);

                esac;
            };

        fun pull' (MAILQUEUE state)
            =
            {
                fun wait_for { transaction_id, clean_up, next }
                    =
                    {   q = case *state     EMPTY    q =>  q;
                            /* */           NONEMPTY _ =>  raise exception FAIL "Compiler bug: Unsupported NONEMPTY case in poll'/wait_for";    # Should be impossible, since is_ready() (below) only queues us up if *state is EMPTY.
                            esac;

                        msg = call_with_current_fate
                                  (fn get_fate
                                      =
                                      {   state := EMPTY (enqueue (q, (transaction_id, get_fate)));

                                          next ();

                                          raise exception FAIL "Mailqueue: impossible";
                                      }
                                  );

                        clean_up ();

                        ts::reenable_thread_switching ();

                        msg;
                    };

                fun is_ready ()
                    =
                    case *state
                        #
                        EMPTY _ =>   itt::MAILOP_UNREADY wait_for;
                        #
                        NONEMPTY (priority, q)
                            =>
                            {   state := NONEMPTY (priority+1, q);

                                itt::MAILOP_READY
                                  {
                                    priority,
                                    do_it =>   .{  get_msg (state, q);  }
                                  };
                            };
                    esac;


                itt::BASE_MAILOPS  [ is_ready ];
            };

        fun nonblocking_pull (MAILQUEUE state)
            =
            {   ts::disable_thread_switching();

                case *state
                    #
                    EMPTY q
                        =>
                        {   ts::reenable_thread_switching ();
                            NULL;
                        };

                    NONEMPTY (priority, q)
                        =>
                        THE (get_msg (state, q));
                esac;
            };
    };                          # package mailqueue
end;



## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2012,
## released under Gnu Public Licence version 3.



Comments and suggestions to: bugs@mythryl.org

PreviousUpNext