001 package fj.control.parallel; 002 003 import fj.Effect; 004 import fj.F; 005 import fj.P; 006 import fj.P1; 007 import fj.Unit; 008 import static fj.Function.compose; 009 import static fj.control.parallel.Actor.actor; 010 011 import java.util.Queue; 012 import java.util.concurrent.ConcurrentLinkedQueue; 013 import java.util.concurrent.atomic.AtomicBoolean; 014 015 /** 016 * An Actor equipped with a queue. Messages are acted on in some (but any) order. This actor is guaranteed to 017 * act on only one message at a time, but the order in which they are acted upon is undefined. 018 * Author: Runar 019 */ 020 public final class QueueActor<A> { 021 private final AtomicBoolean suspended = new AtomicBoolean(true); 022 private final Queue<A> mbox = new ConcurrentLinkedQueue<A>(); 023 024 private final Actor<Unit> act; 025 private final Actor<A> selfish; 026 027 private QueueActor(final Strategy<Unit> s, final Effect<A> ea) { 028 act = actor(s, new Effect<Unit>() { 029 public void e(final Unit u) { 030 ea.e(mbox.remove()); 031 if (mbox.isEmpty()) { 032 suspended.set(true); 033 work(); 034 } else 035 act.act(u); 036 } 037 }); 038 selfish = 039 actor(s, new Effect<A>() { 040 public void e(final A a) { 041 act(a); 042 } 043 }); 044 } 045 046 private P1<Unit> work() { 047 return suspended.compareAndSet(!mbox.isEmpty(), false) ? 048 act.act(Unit.unit()) : P.p(Unit.unit()); 049 } 050 051 /** 052 * Constructs an actor, equipped with a queue, that uses the given strategy and has the given effect. 053 * 054 * @param s The strategy to use to manage this actor's queue. 055 * @param e What this actor does with its messages. 056 * @return A new actor, equipped with a queue so that it processes one message at a time. 057 */ 058 public static <A> QueueActor<A> queueActor(final Strategy<Unit> s, final Effect<A> e) { 059 return new QueueActor<A>(s, e); 060 } 061 062 /** 063 * Constructs an actor, equipped with a queue, that uses the given strategy and has the given effect. 064 * 065 * @param s The strategy to use to manage this actor's queue. 066 * @param e What this actor does with its messages. 067 * @return A new actor, equipped with a queue so that it processes one message at a time. 068 */ 069 public static <A> QueueActor<A> queueActor(final Strategy<Unit> s, final F<A, P1<Unit>> e) { 070 return queueActor(s, Effect.Projection.f(compose(P1.<Unit>__1(), e))); 071 } 072 073 /** 074 * Provides an Actor representation of this QueueActor 075 * 076 * @return An Actor that represents this QueueActor 077 */ 078 public Actor<A> asActor() { 079 return selfish; 080 } 081 082 /** 083 * Submit a message to this actor's queue. 084 * 085 * @param a A message to submit to this actor's queue. 086 */ 087 public void act(final A a) { 088 if (mbox.offer(a)) 089 work(); 090 else 091 selfish.act(a); 092 } 093 094 }