001 package fj.control.parallel; 002 003 import fj.Effect; 004 import fj.F; 005 import fj.Unit; 006 import fj.P1; 007 008 /** 009 * Light weight actors for Java. Concurrency is controlled by a parallel Strategy. 010 * The Strategy serves as the Actor's execution engine, and as its mailbox. 011 * <p/> 012 * Given some effect, the Actor performs the effect on its messages using its Strategy, transforming them 013 * into unit-products. The unit-product represents a possibly running computation which is executing the effect. 014 * <p/> 015 * <b>NOTE:</b> An value of this type may process more than one message at a time, depending on its Strategy, 016 * and so is generally not thread safe unless its Effect is. For an actor that processes only one message at a time, 017 * see {@link QueueActor}. 018 * Author: Runar 019 */ 020 public final class Actor<A> { 021 022 private final Strategy<Unit> s; 023 private final F<A, P1<Unit>> f; 024 025 private Actor(final Strategy<Unit> s, final F<A, P1<Unit>> e) { 026 this.s = s; 027 f = new F<A, P1<Unit>>() { 028 public P1<Unit> f(final A a) { 029 return s.par(e.f(a)); 030 } 031 }; 032 } 033 034 035 /** 036 * Creates a new Actor that uses the given parallelization strategy and has the given side-effect. 037 * 038 * @param s The parallelization strategy to use for the new Actor. 039 * @param e The side-effect to apply to messages passed to the Actor. 040 * @return A new actor that uses the given parallelization strategy and has the given side-effect. 041 */ 042 public static <A> Actor<A> actor(final Strategy<Unit> s, final Effect<A> e) { 043 return new Actor<A>(s, P1.curry(Effect.Projection.e(e))); 044 } 045 046 /** 047 * Creates a new Actor that uses the given parallelization strategy and has the given side-effect. 048 * 049 * @param s The parallelization strategy to use for the new Actor. 050 * @param e The function projection of a side-effect to apply to messages passed to the Actor. 051 * @return A new actor that uses the given parallelization strategy and has the given side-effect. 052 */ 053 public static <A> Actor<A> actor(final Strategy<Unit> s, final F<A, P1<Unit>> e) { 054 return new Actor<A>(s, e); 055 } 056 057 /** 058 * Pass a message to this actor, applying its side-effect to the message. The side-effect is applied in a concurrent 059 * computation, resulting in a product referencing that computation. 060 * 061 * @param a The message to send to this actor. 062 * @return A unit-product that represents the action running concurrently. 063 */ 064 public P1<Unit> act(final A a) { 065 return f.f(a); 066 } 067 068 /** 069 * Contravariant functor pattern. Creates a new actor whose message is transformed by the given function 070 * before being passed to this actor. 071 * 072 * @param f The function to use for the transformation 073 * @return A new actor which passes its messages through the given function, to this actor. 074 */ 075 public <B> Actor<B> comap(final F<B, A> f) { 076 return actor(s, new F<B, P1<Unit>>() { 077 public P1<Unit> f(final B b) { 078 return act(f.f(b)); 079 } 080 }); 081 } 082 083 /** 084 * Transforms this actor to an actor on promises. 085 * 086 * @return A new actor, equivalent to this actor, that acts on promises. 087 */ 088 public Actor<Promise<A>> promise() { 089 return actor(s, new Effect<Promise<A>>() { 090 public void e(final Promise<A> b) { 091 b.to(Actor.this); 092 } 093 }); 094 } 095 096 }