


## timeout-mailop.pkg
# Compiled by:
#     src/lib/std/standard.lib

# Mail_Ops for synchronizing on timeouts.
stipulate
package fat = fate; # fate is from src/lib/std/src/nj/fate.pkg
package itt = internal_threadkit_types; # internal_threadkit_types is from src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg
package mop = mailop; # mailop is from src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.pkg
package sch = thread_scheduler; # thread_scheduler is from src/lib/src/lib/thread-kit/src/core-thread-kit/thread-scheduler.pkg
package tim = time; # time is from src/lib/std/time.pkg
#
Mailop(X) = mop::Mailop(X);
herein
package timeout_mailop: (weak) api {
#
include Timeout_Mailop; # Timeout_Mailop is from src/lib/src/lib/thread-kit/src/core-thread-kit/timeout-mailop.api
reset_sleep_queue_to_empty
:
Void -> Void;
wake_sleeping_threads_whose_time_has_come
:
Void -> Void;
time_until_next_sleeping_thread_wakes
:
Void -> Null_Or( tim::Time );
}
{
# The list of threads waiting for timeouts.
# It is sorted in increasing order
# of time value (time_wait inserts each
# new entry at its sorted position).
#
# Entries whose transaction id has been set to
# itt::CANCELLED_TRANSACTION_ID are stale; they are
# dropped lazily whenever the queue is traversed.
#
# NOTE: We may want to use some sort of
# balanced search structure (e.g. a tree) in the future. XXX BUGGO FIXME
#
Item = ( tim::Time,			# Time at which the entry is due to be woken.
Void -> Void,				# Clean-up function, called when the entry is woken.
Ref (itt::Transaction_Id),		# Transaction id of the waiting mailop; CANCELLED_TRANSACTION_ID marks the entry as stale.
fat::Fate( Void )			# Fate handed to sch::enqueue_thread when the entry is woken.
);
#
# The queue itself; initially empty.
#
sleep_queue
=
REF ([]: List( Item ));
# Add a new entry to sleep_queue, keeping the
# queue sorted by increasing wakeup time.
#
#     wakeup_time:     When the entry should be woken.
#     clean_up:        Clean-up function to run at wakeup.
#     transaction_id:  Transaction id ref of the waiting mailop.
#     resume_fate:     Fate to enqueue at wakeup.
#
# Cancelled entries met on the way to the
# insertion point are discarded.
#
fun time_wait (wakeup_time, clean_up, transaction_id, resume_fate)
    =
    sleep_queue :=  insert_sorted  *sleep_queue
    where
        new_entry =  (wakeup_time, clean_up, transaction_id, resume_fate);

        fun insert_sorted ((_, _, REF itt::CANCELLED_TRANSACTION_ID, _) ! later)
                =>
                # Stale entry -- drop it in passing:
                #
                insert_sorted later;

            insert_sorted (queue as ((entry as (entry_time, _, _, _)) ! later))
                =>
                if (tim::(<) (entry_time, wakeup_time))
                    #
                    entry ! insert_sorted later;	# Entry is due strictly earlier: keep looking.
                else
                    new_entry ! queue;			# Found our slot: go in front of this entry.
                fi;

            insert_sorted []
                =>
                [ new_entry ];
        end;
    end;
# Return a copy of the given entry list with every
# entry whose transaction id is CANCELLED_TRANSACTION_ID
# removed.  Relative order of the surviving entries
# is preserved.
#
fun drop_cancelled_queue_entries []
        =>
        [];

    drop_cancelled_queue_entries ((_, _, REF itt::CANCELLED_TRANSACTION_ID, _) ! remaining)
        =>
        drop_cancelled_queue_entries remaining;

    drop_cancelled_queue_entries (live_entry ! remaining)
        =>
        live_entry ! drop_cancelled_queue_entries remaining;
end;
# Find all sleeping threads whose
# time has come and move them to
# run queue.
#
# For each entry that is due (wakeup time <= the
# scheduler's approximate current time) we:
#
#   o Enqueue its fate via sch::enqueue_thread.
#   o Mark its transaction as cancelled, so that any
#     other registration sharing the same transaction
#     id ref is recognised as stale and cannot resume
#     the same fate a second time.
#   o Run its clean-up function.
#
# Already-cancelled entries are skipped.
#
# Since the queue is sorted by wakeup time we stop at
# the first live entry which is not yet due.
#
# Return list of still-sleeping threads
# (with cancelled entries removed).
#
fun wake_and_remove_sleeping_threads_whose_time_has_come q
    =
    wake_them q
    where
        now = sch::get_approximate_time ();

        fun wake_them ((_, _, REF itt::CANCELLED_TRANSACTION_ID, _) ! rest)
                =>
                wake_them rest;

            wake_them (list as ((t', f, transaction_id as REF (itt::TRANSACTION_ID tid), k) ! rest))
                =>
                if (tim::(<=) (t', now))
                    #
                    sch::enqueue_thread (tid, k);
                    transaction_id := itt::CANCELLED_TRANSACTION_ID;	# Commit the transaction: the timeout has won.
                    f ();						# Cleanup function.
                    wake_them rest;
                else
                    drop_cancelled_queue_entries list;
                fi;

            wake_them [] => [];
        end;
    end;
# Report how long until the earliest live
# sleeper is due:
#
#     NULL                 if no live entries are queued.
#     THE tim::zero_time   if the earliest entry is already due.
#     THE remaining_time   otherwise.
#
# Cancelled entries are ignored for this
# computation; sleep_queue itself is not modified.
#
fun time_until_next_sleeping_thread_wakes ()
    =
    case (drop_cancelled_queue_entries *sleep_queue)
        #
        ((earliest_wakeup, _, _, _) ! _)
            =>
            {   now =  sch::get_approximate_time ();
                #
                if (tim::(<=) (earliest_wakeup, now))
                    #
                    THE (tim::zero_time);
                else
                    THE (tim::(-) (earliest_wakeup, now));
                fi;
            };

        [] => NULL;
    esac;
# Wake every queued sleeper which is due and
# store the still-sleeping remainder back into
# sleep_queue.  An empty queue is left untouched.
#
fun wake_sleeping_threads_whose_time_has_come ()
    =
    case *sleep_queue
        #
        [] => ();

        sleepers
            =>
            {   still_sleeping
                    =
                    wake_and_remove_sleeping_threads_whose_time_has_come  sleepers;
                #
                sleep_queue :=  still_sleeping;
            };
    esac;
# Forget every queued entry.
# Nothing is woken and no clean-up functions
# are run; the queue is simply replaced by
# the empty list.
#
fun reset_sleep_queue_to_empty ()
    =
    sleep_queue :=  ([]: List( Item ));
# NOTE: Unlike for most base mail_ops, the
# block functions of time-out mail_ops do not
# have to exit the critical section or execute
# the clean-up operation. This is done when
# they are removed from the waiting queue.
#
# Build a mailop which becomes ready once
# sleep_duration has elapsed, measured from the
# moment the mailop blocks.
#
fun timeout_in'  sleep_duration
    =
    itt::BASE_MAILOPS [ is_ready ]
    where
        # Blocking half.  (Reppy calls this fn blockFn.)
        # Queue the current fate to be woken sleep_duration
        # from now, then hand control on via next().
        #
        fun wait_for { transaction_id, clean_up, next }
            =
            {   # We use the precise clock here instead of sch::get_approximate_time ():
                # changed 2012-02-01 CrT because 100ms wait was coming back after 99ms,
                # triggering 'make check' alarm.
                #
                now         =  tim::get_current_time_utc ();
                wakeup_time =  tim::(+) (sleep_duration, now);
                #
                fat::call_with_current_fate
                    (fn resume_fate
                        =
                        {   time_wait (wakeup_time, clean_up, transaction_id, resume_fate);
                            #
                            next ();
                        }
                    );

                sch::reenable_thread_switching ();
            };

        # Polling half.  (Reppy calls this fn pollFn.)
        # A zero duration is ready at once; anything
        # else has to block via wait_for.
        #
        fun is_ready ()
            =
            (sleep_duration == tim::zero_time)
                ??  itt::MAILOP_READY
                      { priority => -1,
                        do_it    => sch::reenable_thread_switching		# Reppy calls this field doFn
                      }
                ::  itt::MAILOP_UNREADY wait_for;
    end;
# Synchronize on a timeout_in' mailop
# for the given duration.
#
fun sleep_for  sleep_duration
    =
    mop::do_mailop  timeout_mailop
    where
        timeout_mailop =  timeout_in'  sleep_duration;
    end;
# Build a mailop which becomes ready once
# the absolute time wakeup_time has arrived.
#
fun timeout_at'  wakeup_time
    =
    itt::BASE_MAILOPS [ is_ready ]
    where
        # Blocking half.  (Reppy calls this fn blockFn.)
        # Queue the current fate to be woken at
        # wakeup_time, then hand control on via next().
        #
        fun wait_for { transaction_id, clean_up, next }
            =
            {   fat::call_with_current_fate
                    (fn resume_fate
                        =
                        {   time_wait (wakeup_time, clean_up, transaction_id, resume_fate);
                            #
                            next ();
                        }
                    );

                sch::reenable_thread_switching ();
            };

        # Polling half.  (Reppy calls this fn pollFn.)
        # Ready at once if wakeup_time is not later than
        # the scheduler's approximate current time;
        # otherwise we must block via wait_for.
        #
        fun is_ready ()
            =
            {   now =  sch::get_approximate_time ();
                #
                tim::(<=) (wakeup_time, now)
                    ??  itt::MAILOP_READY
                          { priority => -1,
                            do_it    => sch::reenable_thread_switching	# Reppy calls this field doFn
                          }
                    ::  itt::MAILOP_UNREADY wait_for;
            };
    end;
# Synchronize on a timeout_at' mailop
# for the given absolute wakeup time.
#
fun sleep_until  wakeup_time
    =
    mop::do_mailop  timeout_mailop
    where
        timeout_mailop =  timeout_at'  wakeup_time;
    end;
};
end;


