


## mailop.pkg
# Compiled by:
# src/lib/std/standard.lib# Implementation of mailop values and the mailop combinators.
#
# Some important requirements on the implementation
# of base mailop values:
#
# 1) An is_ready, do_it or wait_for function
# is always called from inside an atomic region.
#
# 2) An is_ready returns an integer priority.
# This is 0 when not enabled,
# -1 for fixed priority and
# >0 for dynamic priority.
# The standard scheme is to associate a counter
# with the underlying synchronization value and
# to increase it by one for each synchronization attempt.
#
# 3) A wait_for is responsible for exiting the atomic region.
# A do_it should NOT leave the atomic region.
#
# 4) Each wait_for is responsible for executing the
# "clean_up" action prior to leaving the atomic region.
stipulate
package fat = fate; # fate is from src/lib/std/src/nj/fate.pkg package itt = internal_threadkit_types; # internal_threadkit_types is from src/lib/src/lib/thread-kit/src/core-thread-kit/internal-threadkit-types.pkg package ts = thread_scheduler; # thread_scheduler is from src/lib/src/lib/thread-kit/src/core-thread-kit/thread-scheduler.pkgherein
package mailop: (weak)
api {
include Mailop; # Mailop is from src/lib/src/lib/thread-kit/src/core-thread-kit/mailop.api atomic_cvar_set: internal_threadkit_types::Condition_Variable -> Void;
cvar_get_mailop: internal_threadkit_types::Condition_Variable -> Mailop( Void );
}{
capture_fate = fat::capture_fate;
throw_fate = fat::throw_fate;
call_with_current_fate = fat::call_with_current_fate;
resume_fate = fat::resume_fate;
# Some inline functions
# to improve performance:
fun map f
=
mapf
where
fun mapf [] => [];
mapf [a] => [f a];
mapf [a, b] => [f a, f b];
mapf [a, b, c] => [f a, f b, f c];
mapf (a ! b ! c ! d ! r) => (f a) ! (f b) ! (f c) ! (f d) ! (mapf r);
end;
end;
fun apply f
=
appf
where
fun appf [] => ();
appf (x ! r) => { f x; appf r;};
end;
end;
fun fold_left f init l
=
foldf (l, init)
where
fun foldf ([], accum) => accum;
foldf (x ! r, accum) => foldf (r, f (x, accum));
end;
end;
fun error msg
=
raise exception FAIL msg;
Mailop_Status == itt::Mailop_Status;
Mailop == itt::Mailop;
Base_Mailop(X) = itt::Base_Mailop(X);
# Condition variables.
#
# Because these variables are set inside
# atomic regions we have to use different
# conventions for clean-up, etc. Instead
# of requiring the wait_for fate
# to call the clean_up action and to leave
# the atomic region, we call the clean_up
# function when setting the condition variable
# (in atomic_cvar_set), and have the invariant
# that the wait_for fate is dispatched
# outside the atomic region.
# Set a condition variable.
# We assume that this function is always
# executed in an atomic region.
#
fun atomic_cvar_set (itt::CONDITION_VARIABLE state)
=
case *state
#
itt::CVAR_UNSET waiting
=>
{ ts::foreground_run_queue -> itt::THREADKIT_QUEUE { rear, ... };
#
fun add []
=>
*rear;
add ( { transaction_id=>REF itt::CANCELLED_TRANSACTION_ID, ... } ! r)
=>
add r;
add ( { transaction_id as REF (itt::TRANSACTION_ID tid), clean_up, fate } ! r)
=>
{ transaction_id
:=
itt::CANCELLED_TRANSACTION_ID;
clean_up ();
(tid, fate) ! (add r);
};
end;
state := itt::CVAR_SET 1;
rear := add waiting;
};
_ => error "cvar already set";
esac;
# The mailop constructor for
# waiting on a condition variable:
#
fun cvar_get_mailop (itt::CONDITION_VARIABLE state)
=
BASE_MAILOPS [is_ready]
where
fun wait_for { transaction_id, clean_up, next } # Reppy calls this fn blockFn
=
call_with_current_fate
(fn fate
=
{ waiting = case *state itt::CVAR_UNSET waiting => waiting;
/* */ _ => raise exception FAIL "Library bug: Unsupported case in cvar_get_mailop"; # Impossible because is_ready only queues us up if *state is not CVAR_SET.
esac;
#
item = { transaction_id, clean_up, fate };
state := itt::CVAR_UNSET (item ! waiting);
next ();
}
);
fun is_ready () # Reppy calls this fn pollFn
=
case *state
#
itt::CVAR_SET n
=>
{ state := itt::CVAR_SET (n+1);
#
MAILOP_READY { priority => n, do_it }
where
fun do_it () # Reppy calls this fn doFn
=
{ state := itt::CVAR_SET 1;
#
ts::reenable_thread_switching ();
};
end;
};
_ => MAILOP_UNREADY wait_for;
esac;
end;
# A mailop which is always ready
# and produces given result:
#
fun always_mailop result
=
BASE_MAILOPS [
fn () = itt::MAILOP_READY
{ priority => -1,
do_it => fn () = { ts::reenable_thread_switching (); # Reppy calls this field doFn
result;
}
}
];
# A mailop which is never ready:
#
never = BASE_MAILOPS [];
guard = GUARD;
with_nack = WITH_NACK;
# "bevt" == "base event"
fun choose (el: List( Mailop(X) ))
=
gather_bevts (reverse el, [])
where
fun gather_bevts ([], l) => BASE_MAILOPS l;
gather_bevts (BASE_MAILOPS [] ! r, l) => gather_bevts (r, l);
gather_bevts (BASE_MAILOPS [bev] ! r, base_mailops') => gather_bevts (r, bev ! base_mailops');
gather_bevts (BASE_MAILOPS base_mailops ! r, base_mailops') => gather_bevts (r, base_mailops @ base_mailops');
gather_bevts (mailops, []) => gather (mailops, []);
gather_bevts (mailops, l) => gather (mailops, [BASE_MAILOPS l]);
end
also
fun gather ([], [mailop]) => mailop;
gather ([], mailops) => CHOOSE mailops;
gather (CHOOSE mailops ! r, mailops') => gather (r, mailops @ mailops');
gather (BASE_MAILOPS base_mailops ! r, BASE_MAILOPS base_mailops' ! r') => gather (r, BASE_MAILOPS (base_mailops @ base_mailops') ! r');
gather (mailop ! r, mailops') => gather (r, mailop ! mailops');
end;
end;
fun wrap (mailop, wfn)
=
wrap' mailop
where
fun wrap_base_mailop is_ready ()
=
case (is_ready ())
#
MAILOP_READY { priority, do_it } => MAILOP_READY { priority, do_it => wfn o do_it };
MAILOP_UNREADY wait_for => MAILOP_UNREADY (wfn o wait_for);
esac;
fun wrap' (BASE_MAILOPS base_mailops) => BASE_MAILOPS (map wrap_base_mailop base_mailops);
#
wrap' (CHOOSE mailops) => CHOOSE (map wrap' mailops);
wrap' (GUARD g) => GUARD (fn () = wrap (g(), wfn));
wrap' (WITH_NACK f) => WITH_NACK (fn mailop = wrap (f mailop, wfn));
end;
end;
(==>) = wrap; # Infix synonym for readability.
fun wrap_handler (mailop, hfn)
=
wrap' mailop
where
fun wrap f x
=
f x
except
exn = hfn exn;
fun wrap_base_mailop is_ready ()
=
case (is_ready ())
#
MAILOP_READY { priority, do_it } => MAILOP_READY { priority, do_it => wrap do_it };
MAILOP_UNREADY wait_for => MAILOP_UNREADY (wrap wait_for);
esac;
fun wrap' (BASE_MAILOPS base_mailops) => BASE_MAILOPS (map wrap_base_mailop base_mailops);
wrap' (CHOOSE mailops) => CHOOSE (map wrap' mailops);
wrap' (GUARD g) => GUARD (fn () = wrap_handler (g(), hfn));
wrap' (WITH_NACK f) => WITH_NACK (fn mailop = wrap_handler (f mailop, hfn));
end;
end;
Mailop_Group X
= BASE_GROUP List( Base_Mailop(X) )
| GROUP List( Mailop_Group(X) )
| NACK_GROUP (itt::Condition_Variable, Mailop_Group(X))
;
/* +DEBUG
fun sayGroup (msg, eg) = let
fun f (BASE_GROUP l, sl) = "BASE_GROUP(" ! int::to_string (list::length l) ::()) ! sl
| f (GROUP l, sl) = "GROUP(" ! g (l, ")" ! sl)
| f (NACK_GROUP l, sl) = "NACK_GROUP(" ! f(#2 l, ")" ! sl)
also g ([], sl) = sl
| g ([x], sl) = f (x, sl)
| g (x ! r, sl) = f (x, ", " ! g (r, sl))
in
Debug::sayDebugId (string::cat (msg ! ": " ! f (eg, ["\n"])))
end
-DEBUG*/
# Force the evaluation of
# any guards in a mailop group:
#
fun force (BASE_MAILOPS l)
=>
BASE_GROUP l;
force mailop
=>
force' mailop
where
fun force' (GUARD g)
=>
force' (g ());
force' (WITH_NACK f)
=>
{ cvar = itt::CONDITION_VARIABLE (REF (itt::CVAR_UNSET []));
NACK_GROUP (cvar, force' (f (cvar_get_mailop cvar)));
};
force' (BASE_MAILOPS group)
=>
BASE_GROUP group;
force' (CHOOSE mailops)
=>
force_bl (mailops, [])
where
fun force_bl ([], base_mailops)
=>
BASE_GROUP base_mailops;
force_bl (mailop ! r, base_mailops')
=>
case (force' mailop)
BASE_GROUP base_mailops => force_bl (r, base_mailops @ base_mailops');
GROUP group => force_l (r, group @ [BASE_GROUP base_mailops']);
group => force_l (r, [group, BASE_GROUP base_mailops']);
esac;
end
also
fun force_l ([], [group]) => group;
force_l ([], l) => GROUP l;
force_l (mailop ! r, l)
=>
case (force' mailop, l)
#
(BASE_GROUP base_mailops, BASE_GROUP base_mailops' ! r')
=>
force_l (r, BASE_GROUP (base_mailops @ base_mailops') ! r');
(GROUP group, l)
=>
force_l (r, group @ l);
(group, l)
=>
force_l (r, group ! l);
esac;
end;
end;
end;
end;
end;
stipulate
count = REF 0;
fun random i
=
{ j = *count;
if (j == 1000000) count := 0;
else count := j+1;
fi;
int::rem (j, i);
};
herein
fun select_do_fn ([(_, do_it)], _)
=>
do_it;
select_do_fn (l, n)
=>
max (l, 0, 0, [])
where
fun priority -1 => n;
priority p => p;
end;
fun max ((p, do_it) ! r, max_p, k, do_its)
=>
{ p = priority p;
if (p > max_p) max (r, p, 1, [do_it]);
elif (p == max_p) max (r, max_p, k+1, do_it ! do_its);
else max (r, max_p, k, do_its);
fi;
};
max ([], _, k, [do_it])
=>
do_it;
max ([], _, k, do_its)
=>
list::nth (do_its, random k);
end;
end;
end;
end;
fun make_flag ()
=
{ flag = REF (itt::TRANSACTION_ID (ts::get_current_thread()));
(flag, fn () = flag := itt::CANCELLED_TRANSACTION_ID);
};
fun sync_on_one_mailop (is_ready: Base_Mailop(X))
=
{ ts::disable_thread_switching ();
#
case (is_ready ())
#
MAILOP_READY { do_it, ... } # Reppy calls this field doFn
=>
do_it ();
MAILOP_UNREADY wait_for
=>
{ my (flag, set_flag)
=
make_flag ();
wait_for { transaction_id=>flag, clean_up=>set_flag, next=>ts::reenable_thread_switching_and_dispatch_next_thread };
};
esac;
};
# This function handles the case of
# synchronizing on a list of base mailops
# (w/o any negative acknowledgements).
#
# It also handles the case of synchronizing
# on NEVER.
#
fun sync_on_base_mailops [] => ts::dispatch_next_thread ();
sync_on_base_mailops [bev] => sync_on_one_mailop bev;
sync_on_base_mailops base_mailops
=>
{ fun ext ([], wait_fors)
=>
capture_fate
(fn fate
=
{ throw_fate = throw_fate fate;
my (transaction_id, set_flag)
=
make_flag ();
fun log []
=>
ts::reenable_thread_switching_and_dispatch_next_thread ();
log (wait_for ! r)
=>
throw_fate
(wait_for
{ transaction_id,
clean_up => set_flag,
next => fn () = log r
}
);
end;
log wait_fors;
error "[log]";
}
);
ext (is_ready ! r, wait_fors)
=>
case (is_ready ())
#
MAILOP_READY { priority, do_it } => ext_rdy (r, [(priority, do_it)], 1);
MAILOP_UNREADY wait_for => ext (r, wait_for ! wait_fors);
esac;
end
# NOTE: Maybe we should just keep
# track of the max priority?
# What about fairness to fixed
# priority mailops (e::g., always, timeout?)
also
fun ext_rdy ([], do_its, n)
=>
select_do_fn (do_its, n) ();
ext_rdy (is_ready ! r, do_its, n)
=>
case (is_ready ())
#
MAILOP_READY { priority, do_it } => ext_rdy (r, (priority, do_it) ! do_its, n+1);
_ => ext_rdy (r, do_its, n);
esac;
end;
ts::disable_thread_switching ();
ext (base_mailops, []);
};
end;
# Walk the mailop group tree,
# collecting the base mailops
# (with associated ack flags),
# also a list of flag sets.
#
# A flag set is a
# (cCar, List(Flag(Ack)))
# pair, where the flags are
# those associated with the
# mailops covered by the nack
# cvar.
#
fun collect group
=
{ un_wrapped_flag
=
REF FALSE;
fun gather_wrapped (group, bl, flg_sets)
=
{ my (bl, _, flg_sets)
=
gather (group, bl, [], flg_sets);
(bl, flg_sets);
}
where
fun gather (BASE_GROUP base_mailops, bl, all_flgs, flg_sets)
=>
{ fun append ([], bl, all_flgs)
=>
(bl, all_flgs);
append (bev ! r, bl, all_flgs)
=>
{ flag = REF FALSE;
append (r, (bev, flag) ! bl, flag ! all_flgs);
};
end;
my (bl', all_flgs')
=
append (base_mailops, bl, all_flgs);
(bl', all_flgs', flg_sets);
};
gather (GROUP group, bl, all_flgs, flg_sets)
=>
{ fun f (group', (bl', all_flgs', flg_sets'))
=
gather (group', bl', all_flgs', flg_sets');
fold_left f (bl, all_flgs, flg_sets) group;
};
gather (NACK_GROUP (cvar, group), bl, all_flgs, flg_sets)
=>
{ my (bl', all_flgs', flg_sets')
=
gather (group, bl, [], flg_sets);
(bl', all_flgs' @ all_flgs, (cvar, all_flgs') ! flg_sets');
};
end;
end;
case group
#
GROUP _ =>
gather (group, [], [])
where
un_wrapped_flag = REF FALSE;
fun append ([], bl) => bl;
append (bev ! r, bl) => append (r, (bev, un_wrapped_flag) ! bl);
end;
fun gather (BASE_GROUP base_mailops, bl, flg_sets)
=>
(append (base_mailops, bl), flg_sets);
gather (GROUP group, bl, flg_sets)
=>
fold_left f (bl, flg_sets) group
where
fun f (group', (bl', flg_sets'))
=
gather (group', bl', flg_sets');
end;
gather (group as NACK_GROUP _, bl, flg_sets)
=>
gather_wrapped (group, bl, flg_sets);
end;
end;
group =>
gather_wrapped (group, [], []);
esac;
};
# This function handles the more
# complicated case of synchronization
# on groups of mailops where negative
# acknowledgements are involved.
#
fun sync_on_group group
=
{ my (bl, flg_sets)
=
collect group;
fun check_cvars ()
=
apply check_cvar flg_sets
where
# check_cvar checks the flags of a flag set.
# If they are all FALSE then the
# corresponding cvar is set to signal
# the negative ack.
#
fun check_cvar (cvar, flgs)
=
check_flgs flgs
where
fun check_flgs []
=>
atomic_cvar_set cvar;
check_flgs ((REF TRUE) ! _)
=>
();
check_flgs (_ ! r)
=>
check_flgs r;
end;
end;
end;
fun ext ([], wait_fors)
=>
capture_fate
(fn fate
=
{ throw_fate = throw_fate fate;
transaction_id
=
REF (itt::TRANSACTION_ID (ts::get_current_thread ()));
fun set_flag flag ()
=
{ transaction_id := itt::CANCELLED_TRANSACTION_ID;
flag := TRUE;
check_cvars ();
};
fun log []
=>
ts::reenable_thread_switching_and_dispatch_next_thread ();
log ((wait_for, flag) ! r)
=>
throw_fate
(wait_for { transaction_id,
clean_up => set_flag flag,
next => fn () = log r
}
);
end;
log wait_fors;
error "[log]";
}
);
ext ((is_ready, flag) ! r, wait_fors)
=>
case (is_ready ())
#
MAILOP_READY { priority, do_it } => ext_rdy (r, [(priority, (do_it, flag))], 1);
MAILOP_UNREADY wait_for => ext (r, (wait_for, flag) ! wait_fors);
esac;
end
# NOTE: maybe we should just
# keep track of the max priority?
# What about fairness to fixed
# priority mailops (e::g., always, timeout?)
#
also
fun ext_rdy ([], do_its, n)
=>
{ my (do_it, flag)
=
select_do_fn (do_its, n);
flag := TRUE;
check_cvars ();
do_it();
};
ext_rdy ((is_ready, flag) ! r, do_its, n)
=>
case (is_ready ())
#
MAILOP_READY { priority, do_it }
=>
ext_rdy (r, (priority, (do_it, flag)) ! do_its, n+1);
_ =>
ext_rdy (r, do_its, n);
esac;
end;
ts::disable_thread_switching ();
ext (bl, []);
}; # fun sync_on_group
fun do_mailop mailop
=
case (force mailop)
#
BASE_GROUP base_mailops => sync_on_base_mailops base_mailops;
group => sync_on_group group;
esac;
fun select mailops
=
{ fun force_bl ([], base_mailops)
=>
BASE_GROUP base_mailops;
force_bl (mailop ! r, base_mailops')
=>
case (force' mailop)
#
BASE_GROUP base_mailops => force_bl (r, base_mailops @ base_mailops');
#
GROUP group => force_l (r, group @ [BASE_GROUP base_mailops']);
group => force_l (r, [group, BASE_GROUP base_mailops']);
esac;
end
also
fun force_l ([], [group]) => group;
force_l ([], l) => GROUP l;
force_l (mailop ! r, l)
=>
case (force' mailop, l)
#
(BASE_GROUP base_mailops, BASE_GROUP base_mailops' ! r')
=>
force_l (r, BASE_GROUP (base_mailops @ base_mailops') ! r');
(GROUP group, l)
=>
force_l (r, group @ l);
(group, l)
=>
force_l (r, group ! l);
esac;
end
also
fun force' (GUARD g)
=>
force' (g ());
force' (WITH_NACK f)
=>
{ cvar = itt::CONDITION_VARIABLE (REF (itt::CVAR_UNSET []));
NACK_GROUP (cvar, force' (f (cvar_get_mailop cvar)));
};
force' (BASE_MAILOPS group)
=>
BASE_GROUP group;
force' (CHOOSE mailops)
=>
force_bl (mailops, []);
end;
case (force_bl (mailops, []))
#
BASE_GROUP base_mailops
=>
sync_on_base_mailops base_mailops;
group =>
sync_on_group group;
esac;
}; # fun select
}; # package mailop
end;
## COPYRIGHT (c) 1989-1991 John H. Reppy
## COPYRIGHT (c) 1995 AT&T Bell Laboratories.
## Subsequent changes by Jeff Prothero Copyright (c) 2010-2012,
## released under Gnu Public Licence version 3.


