001    package fj.control.parallel;
002    
003    import fj.Effect;
004    import fj.F;
005    import fj.Unit;
006    import fj.P1;
007    
008    /**
009     * Light weight actors for Java. Concurrency is controlled by a parallel Strategy.
010     * The Strategy serves as the Actor's execution engine, and as its mailbox.
011     * <p/>
012     * Given some effect, the Actor performs the effect on its messages using its Strategy, transforming them
013     * into unit-products. The unit-product represents a possibly running computation which is executing the effect.
014     * <p/>
015     * <b>NOTE:</b> An value of this type may process more than one message at a time, depending on its Strategy,
016     * and so is generally not thread safe unless its Effect is. For an actor that processes only one message at a time,
017     * see {@link QueueActor}.
018     * Author: Runar
019     */
020    public final class Actor<A> {
021    
022      private final Strategy<Unit> s;
023      private final F<A, P1<Unit>> f;
024    
025      private Actor(final Strategy<Unit> s, final F<A, P1<Unit>> e) {
026        this.s = s;
027        f = new F<A, P1<Unit>>() {
028          public P1<Unit> f(final A a) {
029            return s.par(e.f(a));
030          }
031        };
032      }
033    
034    
035      /**
036       * Creates a new Actor that uses the given parallelization strategy and has the given side-effect.
037       *
038       * @param s The parallelization strategy to use for the new Actor.
039       * @param e The side-effect to apply to messages passed to the Actor.
040       * @return A new actor that uses the given parallelization strategy and has the given side-effect.
041       */
042      public static <A> Actor<A> actor(final Strategy<Unit> s, final Effect<A> e) {
043        return new Actor<A>(s, P1.curry(Effect.Projection.e(e)));
044      }
045    
046      /**
047       * Creates a new Actor that uses the given parallelization strategy and has the given side-effect.
048       *
049       * @param s The parallelization strategy to use for the new Actor.
050       * @param e The function projection of a side-effect to apply to messages passed to the Actor.
051       * @return A new actor that uses the given parallelization strategy and has the given side-effect.
052       */
053      public static <A> Actor<A> actor(final Strategy<Unit> s, final F<A, P1<Unit>> e) {
054        return new Actor<A>(s, e);
055      }
056    
057      /**
058       * Pass a message to this actor, applying its side-effect to the message. The side-effect is applied in a concurrent
059       * computation, resulting in a product referencing that computation.
060       *
061       * @param a The message to send to this actor.
062       * @return A unit-product that represents the action running concurrently.
063       */
064      public P1<Unit> act(final A a) {
065        return f.f(a);
066      }
067    
068      /**
069       * Contravariant functor pattern. Creates a new actor whose message is transformed by the given function
070       * before being passed to this actor.
071       *
072       * @param f The function to use for the transformation
073       * @return A new actor which passes its messages through the given function, to this actor.
074       */
075      public <B> Actor<B> comap(final F<B, A> f) {
076        return actor(s, new F<B, P1<Unit>>() {
077          public P1<Unit> f(final B b) {
078            return act(f.f(b));
079          }
080        });
081      }
082    
083      /**
084       * Transforms this actor to an actor on promises.
085       *
086       * @return A new actor, equivalent to this actor, that acts on promises.
087       */
088      public Actor<Promise<A>> promise() {
089        return actor(s, new Effect<Promise<A>>() {
090          public void e(final Promise<A> b) {
091            b.to(Actor.this);
092          }
093        });
094      }
095    
096    }