001    package fj.control.parallel;
002    
003    import fj.Effect;
004    import fj.F;
005    import fj.P;
006    import fj.P1;
007    import fj.Unit;
008    import static fj.Function.compose;
009    import static fj.control.parallel.Actor.actor;
010    
011    import java.util.Queue;
012    import java.util.concurrent.ConcurrentLinkedQueue;
013    import java.util.concurrent.atomic.AtomicBoolean;
014    
015    /**
016     * An Actor equipped with a queue. Messages are acted on in some (but any) order. This actor is guaranteed to
017     * act on only one message at a time, but the order in which they are acted upon is undefined.
018     * Author: Runar
019     */
020    public final class QueueActor<A> {
021      private final AtomicBoolean suspended = new AtomicBoolean(true);
022      private final Queue<A> mbox = new ConcurrentLinkedQueue<A>();
023    
024      private final Actor<Unit> act;
025      private final Actor<A> selfish;
026    
027      private QueueActor(final Strategy<Unit> s, final Effect<A> ea) {
028        act = actor(s, new Effect<Unit>() {
029          public void e(final Unit u) {
030            ea.e(mbox.remove());
031            if (mbox.isEmpty()) {
032              suspended.set(true);
033              work();
034            } else
035              act.act(u);
036          }
037        });
038        selfish =
039            actor(s, new Effect<A>() {
040              public void e(final A a) {
041                act(a);
042              }
043            });
044      }
045    
046      private P1<Unit> work() {
047        return suspended.compareAndSet(!mbox.isEmpty(), false) ?
048               act.act(Unit.unit()) : P.p(Unit.unit());
049      }
050    
051      /**
052       * Constructs an actor, equipped with a queue, that uses the given strategy and has the given effect.
053       *
054       * @param s The strategy to use to manage this actor's queue.
055       * @param e What this actor does with its messages.
056       * @return A new actor, equipped with a queue so that it processes one message at a time.
057       */
058      public static <A> QueueActor<A> queueActor(final Strategy<Unit> s, final Effect<A> e) {
059        return new QueueActor<A>(s, e);
060      }
061    
062      /**
063       * Constructs an actor, equipped with a queue, that uses the given strategy and has the given effect.
064       *
065       * @param s The strategy to use to manage this actor's queue.
066       * @param e What this actor does with its messages.
067       * @return A new actor, equipped with a queue so that it processes one message at a time.
068       */
069      public static <A> QueueActor<A> queueActor(final Strategy<Unit> s, final F<A, P1<Unit>> e) {
070        return queueActor(s, Effect.Projection.f(compose(P1.<Unit>__1(), e)));
071      }
072    
073      /**
074       * Provides an Actor representation of this QueueActor
075       *
076       * @return An Actor that represents this QueueActor
077       */
078      public Actor<A> asActor() {
079        return selfish;
080      }
081    
082      /**
083       * Submit a message to this actor's queue.
084       *
085       * @param a A message to submit to this actor's queue.
086       */
087      public void act(final A a) {
088        if (mbox.offer(a))
089          work();
090        else
091          selfish.act(a);
092      }
093    
094    }