


## mailqueue.pkg
# Compiled by:
#     src/lib/std/standard.lib

# Unbounded queues of thread-to-thread mail messages.
stipulate
# Short local names for the thread-kit internals this package builds on.
# Each declaration sits on its own line: a '#' comment runs to end-of-line, so
# anything following it on the same line (the 'ts' alias, the 'herein' keyword)
# would otherwise be silently commented out.
package itt =  internal_threadkit_types;	# internal_threadkit_types is from src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
package ts  =  thread_scheduler;		# thread_scheduler is from src/lib/src/lib/thread-kit/src/core-thread-kit/thread-scheduler.pkg
herein
package mailqueue: (weak)
api {
    include Mailqueue;		# Mailqueue is from src/lib/src/lib/thread-kit/src/core-thread-kit/mailqueue.api

    # Exported in addition to the Mailqueue api:
    # puts a mailqueue back into its initial empty state.
    # (This declaration must be on its own line -- placed after the
    # comment above it would itself be commented out and never exported.)
    #
    reset_mailqueue: Mailqueue(X) -> Void;
}
{
# Local shorthands for the 'fate' facilities used below, so the rest of
# this package can name them without the fate:: prefix.
# ("Fate" is Mythryl's term for a continuation.)
# NOTE(review): the 'fate' package is not bound anywhere in this file;
# presumably it is globally visible -- confirm.
#
Fate(X) = fate::Fate(X);
call_with_current_fate = fate::call_with_current_fate;
resume_fate = fate::resume_fate;
# A functional FIFO queue represented as a pair of lists.
#
# enqueue conses new items onto 'rear' (so 'rear' is newest-first);
# dequeue takes items off the head of 'front' and, once 'front' is
# used up, reverses 'rear' to obtain a new 'front'.
#
Queue(X)
=
{ front: List(X),
rear: List(X)
};
# Append 'item' at the tail end of a queue.
# The 'front' list is passed through untouched; the new item
# becomes the head of the (newest-first) 'rear' list.
#
fun enqueue ( { front => head_items, rear => tail_items }, item)
    =
    { front =>  head_items,
      rear  =>  item ! tail_items
    };
# Remove the oldest item from a queue.
#
# Returns (remaining_queue, item).
#
# Items are taken from the head of 'front'.  When 'front' is used up,
# the newest-first 'rear' list is reversed to become the new 'front'.
#
# Raises FAIL when the queue holds nothing at all.  Callers in this
# package only dequeue from NONEMPTY states, so this should never
# trigger -- but the previous formulation recursed forever in that
# case (reversing an empty 'rear' just reproduces the same empty
# queue), and get_msg calls us with thread switching disabled, so a
# broken invariant would have hung silently instead of being reported.
#
fun dequeue ( { front => oldest ! rest, rear } )
        =>
        ( { front => rest, rear }, oldest);

    dequeue ( { front => [], rear } )
        =>
        case (list::reverse rear)
            #
            oldest ! rest =>  ( { front => rest, rear => [] }, oldest);
            []            =>  raise exception FAIL "mailqueue: attempt to dequeue from an empty queue";
        esac;
end;
# The state of a mailqueue.
#
#   EMPTY waiters
#       No message is pending.  'waiters' queues the readers currently
#       blocked on the mailqueue, each recorded as a pair of
#       (transaction-id cell, fate to resume with the message).
#       Entries whose transaction-id has been cancelled are skipped
#       and dropped by clean_and_remove.
#
#   NONEMPTY (priority, messages)
#       At least one message is pending.  'priority' is the value
#       pull' reports in MAILOP_READY; pull' increments it on each
#       poll, while push and get_msg (re)set it to 1.
#
# The queue of the NONEMPTY constructor should
# never be empty (use EMPTY instead).
#
State(X)
= EMPTY Queue( (Ref( itt::Transaction_Id ), Fate(X)))
| NONEMPTY (Int, Queue(X))
;
# A mailqueue is a single mutable cell holding its current State.
# All operations below read and overwrite this cell; same_mailqueue
# compares two mailqueues by comparing their cells.
#
Mailqueue(X) = MAILQUEUE Ref( State(X) );
# Put a mailqueue back into its initial condition:
# the EMPTY state with no waiting readers.
# Whatever the state cell held before is simply overwritten.
#
fun reset_mailqueue (MAILQUEUE cell)
    =
    {   nothing_queued =  { front => [], rear => [] };
        #
        cell := EMPTY nothing_queued;
    };
# Create a fresh mailqueue.
# It starts out in the EMPTY state with no waiting readers.
#
fun make_mailqueue ()
    =
    {   initial_state =  EMPTY { front => [], rear => [] };
        #
        MAILQUEUE (REF initial_state);
    };
# TRUE iff both arguments are the very same mailqueue,
# decided by comparing their state cells with '=='.
#
fun same_mailqueue (MAILQUEUE cell_a, MAILQUEUE cell_b)
    =
    cell_a == cell_b;
# Allocate a new transaction-id cell tagged with the thread
# that ts::get_current_thread reports at the time of the call.
#
fun make_transaction_id ()
    =
    {   current_thread =  ts::get_current_thread ();
        #
        REF (itt::TRANSACTION_ID current_thread);
    };
# Mark a live transaction-id cell as cancelled and
# return the thread id that was stored in it.
#
# Raises FAIL if the cell is already cancelled.
#
fun cancel_transaction_and_return_thread_id  transaction_id
    =
    case *transaction_id
        #
        itt::TRANSACTION_ID thread_id
            =>
            {   transaction_id :=  itt::CANCELLED_TRANSACTION_ID;
                #
                thread_id;
            };

        itt::CANCELLED_TRANSACTION_ID
            =>
            # Never happens; present only to keep the match exhaustive.
            #
            raise exception FAIL "Compiler bug: Attempt to cancel already-cancelled transaction-id";
    esac;
# Result of clean_and_remove on a queue of waiting readers:
#
#   NO_ITEM
#       No un-cancelled reader was found.
#
#   ITEM (transaction_id, fate, state)
#       The first un-cancelled reader (its transaction-id cell and
#       its fate), plus the State to store in the mailqueue now
#       that this reader has been taken out of the queue.
#
Qy_Item X
= NO_ITEM
| ITEM (Ref(itt::Transaction_Id), Fate(X), State(X))
;
stipulate
# Drop cancelled entries from the head of a list of waiting readers,
# stopping at the first entry that is still live.
# (Cancelled entries further down the list are left in place.)
#
fun clean ((REF itt::CANCELLED_TRANSACTION_ID, _) ! remaining)
        =>
        clean remaining;

    clean waiters					# Empty list, or list starting with a live entry.
        =>
        waiters;
end;
# Reverse 'pending' onto 'live', leaving out every cancelled entry.
# Called as  clean_rev (rear, [])  to turn a newest-first 'rear' list
# into an oldest-first list holding only live readers.
#
fun clean_rev (pending, live)
    =
    case pending
        #
        [] =>   live;

        (REF itt::CANCELLED_TRANSACTION_ID, _) ! older
            =>
            clean_rev (older, live);

        entry ! older
            =>
            clean_rev (older, entry ! live);
    esac;
herein
# Find and remove the first un-cancelled reader in a queue of waiters.
#
# The 'front' list is searched first (skipping cancelled entries at
# its head); only if it yields nothing is the 'rear' list reversed and
# filtered.  Returns ITEM with the reader and the EMPTY state holding
# the remaining waiters, or NO_ITEM when no live reader exists.
#
fun clean_and_remove { front, rear }
    =
    case (clean front)
        #
        (id, k) ! rest
            =>
            ITEM (id, k, EMPTY { front => rest, rear } );

        []  =>
            case (clean_rev (rear, []))
                #
                (id, k) ! rest =>  ITEM (id, k, EMPTY { front => rest, rear => [] } );
                []             =>  NO_ITEM;
            esac;
    esac;
end;
# Post message 'x' to a mailqueue.
#
# Runs with thread switching disabled from entry until one of the
# ts:: calls below re-enables it or switches threads.
#
#   - Messages already pending: append x behind them, then yield.
#   - No messages but a live reader is waiting: remove that reader,
#     cancel its transaction id, hand our own fate to
#     ts::enqueue_and_switch_current_thread together with the reader's
#     thread id, and resume the reader's fate with x.
#   - No messages and no live reader: store x as the sole pending
#     message (priority 1) and re-enable thread switching.
#
fun push (MAILQUEUE state, x)
    =
    {   ts::disable_thread_switching ();
        #
        case *state
            #
            NONEMPTY (priority, backlog)
                =>
                # Force a context switch here so that
                # a producer cannot outrun a consumer.
                #
                call_with_current_fate
                    (fn sender_fate
                        =
                        {   state := NONEMPTY (priority, enqueue (backlog, x));
                            ts::reenable_thread_switching_and_yield_to_next_thread  sender_fate;
                        }
                    );

            EMPTY waiting_readers
                =>
                case (clean_and_remove waiting_readers)
                    #
                    ITEM (reader_id, reader_fate, state_sans_reader)
                        =>
                        call_with_current_fate
                            (fn sender_fate
                                =
                                {   state := state_sans_reader;
                                    ts::enqueue_and_switch_current_thread
                                      ( sender_fate,
                                        cancel_transaction_and_return_thread_id  reader_id
                                      );
                                    resume_fate  reader_fate  x;
                                }
                            );

                    NO_ITEM
                        =>
                        {   state := NONEMPTY (1, { front => [x], rear => [] } );
                            ts::reenable_thread_switching ();
                        };
                esac;
        esac;
    };
# Take the oldest message out of non-empty message queue 'q',
# store the resulting state in the 'state' cell, re-enable
# thread switching and return the message.
#
# If that was the last message the mailqueue becomes EMPTY (with no
# waiting readers); otherwise it stays NONEMPTY with priority reset to 1.
#
fun get_msg (state, q)
    =
    {   my (remaining, msg)
            =
            dequeue q;

        next_state
            =
            case remaining
                #
                { front => [], rear => [] } =>  EMPTY { front => [], rear => [] };
                _                           =>  NONEMPTY (1, remaining);
            esac;

        state := next_state;
        ts::reenable_thread_switching ();
        msg;
    };
# Blocking read of the next message from a mailqueue.
#
# If a message is pending it is returned at once via get_msg.
# Otherwise the caller's fate is appended to the queue of waiting
# readers under a fresh transaction id and the scheduler is asked to
# dispatch the next thread; the call returns with the message once a
# push resumes that fate.
#
fun pull (MAILQUEUE state)
    =
    {   ts::disable_thread_switching ();
        #
        case *state
            #
            NONEMPTY (_, backlog)
                =>
                get_msg (state, backlog);

            EMPTY waiting_readers
                =>
                {   received
                        =
                        call_with_current_fate
                            (fn reader_fate
                                =
                                {   state := EMPTY (enqueue (waiting_readers, (make_transaction_id (), reader_fate)));
                                    ts::reenable_thread_switching_and_dispatch_next_thread ();
                                }
                            );

                    ts::reenable_thread_switching ();
                    received;
                };
        esac;
    };
# Mailop (selectable) version of pull.
#
# Returns itt::BASE_MAILOPS holding a single poll function, is_ready:
#
#   - Mailqueue EMPTY: reports MAILOP_UNREADY with wait_for, which
#     appends the caller's fate to the waiting-reader queue under the
#     supplied transaction id, calls next(), and -- once resumed with a
#     message -- runs clean_up, re-enables thread switching and returns
#     the message.
#
#   - Mailqueue NONEMPTY: bumps the stored priority by one and reports
#     MAILOP_READY with the previous priority and a thunk that fetches
#     the message via get_msg.
#
# NOTE(review): wait_for and do_it both end by re-enabling thread
# switching, so presumably the mailop machinery invokes them with
# thread switching disabled -- confirm against the caller.
#
fun pull' (MAILQUEUE state)
    =
    {
        fun wait_for { transaction_id, clean_up, next }
            =
            {   q = case *state
                        #
                        EMPTY q    =>  q;
                        NONEMPTY _ =>  raise exception FAIL "Compiler bug: Unsupported NONEMPTY case in pull'/wait_for";	# Should be impossible, since is_ready() (below) only queues us up if *state is EMPTY.
                    esac;

                msg = call_with_current_fate
                          (fn get_fate
                              =
                              {   state := EMPTY (enqueue (q, (transaction_id, get_fate)));
                                  next ();
                                  raise exception FAIL "Mailqueue: impossible";		# next() is not expected to return.
                              }
                          );

                clean_up ();
                ts::reenable_thread_switching ();
                msg;
            };

        fun is_ready ()
            =
            case *state
                #
                EMPTY _ =>  itt::MAILOP_UNREADY wait_for;
                #
                NONEMPTY (priority, q)
                    =>
                    {   state := NONEMPTY (priority+1, q);

                        itt::MAILOP_READY
                          {
                            priority,
                            do_it => .{ get_msg (state, q); }
                          };
                    };
            esac;

        itt::BASE_MAILOPS [ is_ready ];
    };
# Non-blocking read from a mailqueue.
#
# Returns  THE msg  when a message is pending (removing it via get_msg),
# or  NULL  when none is.  Thread switching is disabled while the
# state is inspected and is re-enabled again on both paths.
#
fun nonblocking_pull (MAILQUEUE state)
    =
    {   ts::disable_thread_switching ();
        #
        case *state
            #
            NONEMPTY (_, backlog)
                =>
                THE (get_msg (state, backlog));

            EMPTY _
                =>
                {   ts::reenable_thread_switching ();
                    NULL;
                };
        esac;
    };
}; # package mailqueue
end;
## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2012,
## released under Gnu Public Licence version 3.


