001    package fj.control.parallel;
002    
003    import fj.Effect;
004    import fj.F;
005    import fj.F2;
006    import fj.P;
007    import fj.P1;
008    import fj.P2;
009    import fj.Unit;
010    import static fj.P.p;
011    import static fj.Function.curry;
012    import static fj.Function.identity;
013    import static fj.control.parallel.Actor.actor;
014    import static fj.control.parallel.Callables.normalise;
015    import static fj.control.parallel.QueueActor.queueActor;
016    import fj.data.Either;
017    import fj.data.List;
018    import fj.data.Option;
019    import static fj.data.Option.none;
020    import static fj.data.Option.some;
021    import fj.data.Stream;
022    
023    import java.util.LinkedList;
024    import java.util.Queue;
025    import java.util.concurrent.Callable;
026    import java.util.concurrent.CountDownLatch;
027    import java.util.concurrent.TimeUnit;
028    
029    /**
030     * Represents a non-blocking future value. Products, functions, and actors, given to the methods on this class,
031     * are executed concurrently, and the Promise serves as a handle on the result of the computation. Provides monadic
032     * operations so that future computations can be combined
033     * <p/>
034     * Author: Runar
035     */
036    public final class Promise<A> {
037    
038      private final Actor<P2<Either<P1<A>, Actor<A>>, Promise<A>>> actor;
039    
040      private final Strategy<Unit> s;
041    
042      private final CountDownLatch l = new CountDownLatch(1);
043      private volatile Option<A> v = none();
044      private final Queue<Actor<A>> waiting = new LinkedList<Actor<A>>();
045    
046      private Promise(final Strategy<Unit> s, final Actor<P2<Either<P1<A>, Actor<A>>, Promise<A>>> qa) {
047        this.s = s;
048        actor = qa;
049      }
050    
051      private static <A> Promise<A> mkPromise(final Strategy<Unit> s) {
052        final Actor<P2<Either<P1<A>, Actor<A>>, Promise<A>>> q =
053            queueActor(s, new Effect<P2<Either<P1<A>, Actor<A>>, Promise<A>>>() {
054              public void e(final P2<Either<P1<A>, Actor<A>>, Promise<A>> p) {
055                final Promise<A> snd = p._2();
056                final Queue<Actor<A>> as = snd.waiting;
057                if (p._1().isLeft()) {
058                  final A a = p._1().left().value()._1();
059                  snd.v = some(a);
060                  snd.l.countDown();
061                  while (!as.isEmpty())
062                    as.remove().act(a);
063                } else if (snd.v.isNone())
064                  as.add(p._1().right().value());
065                else
066                  p._1().right().value().act(snd.v.some());
067              }
068            }).asActor();
069        return new Promise<A>(s, q);
070      }
071    
072      /**
073       * Promises to provide the value of the given 1-product, in the future.
074       * Represents the unit function for promises.
075       *
076       * @param s The strategy with which to fulfil the promise.
077       * @param a The 1-product to evaluate concurrently.
078       * @return A promise representing the future result of evaluating the given 1-product.
079       */
080      public static <A> Promise<A> promise(final Strategy<Unit> s, final P1<A> a) {
081        final Promise<A> p = mkPromise(s);
082        p.actor.act(P.p(Either.<P1<A>, Actor<A>>left(a), p));
083        return p;
084      }
085    
086      /**
087       * Provides a first-class unit function for promises.
088       *
089       * @param s The strategy with which to fulfil promises.
090       * @return A function that, given a 1-product, yields a promise of that product's value.
091       */
092      public static <A> F<P1<A>, Promise<A>> promise(final Strategy<Unit> s) {
093        return new F<P1<A>, Promise<A>>() {
094          public Promise<A> f(final P1<A> a) {
095            return promise(s, a);
096          }
097        };
098      }
099    
100      /**
101       * Provides a promise to call the given Callable in the future.
102       *
103       * @param s The strategy with which to fulfil the promise.
104       * @param a The Callable to evaluate concurrently.
105       * @return A promise of a new Callable that will return the result of calling the given Callable.
106       */
107      public static <A> Promise<Callable<A>> promise(final Strategy<Unit> s, final Callable<A> a) {
108        return promise(s, new P1<Callable<A>>() {
109          public Callable<A> _1() {
110            return normalise(a);
111          }
112        });
113      }
114    
115      /**
116       * Transforms any function so that it returns a promise of a value instead of an actual value.
117       * Represents the Kleisli arrow for the Promise monad.
118       *
119       * @param s The strategy with which to fulfil the promise.
120       * @param f The function to turn into a promise-valued function.
121       * @return The given function transformed into a function that returns a promise.
122       */
123      public static <A, B> F<A, Promise<B>> promise(final Strategy<Unit> s, final F<A, B> f) {
124        return new F<A, Promise<B>>() {
125          public Promise<B> f(final A a) {
126            return promise(s, P1.curry(f).f(a));
127          }
128        };
129      }
130    
131      /**
132       * Promises to send a value to the given actor in the future.
133       *
134       * @param a An actor that will receive this Promise's value in the future.
135       */
136      public void to(final Actor<A> a) {
137        actor.act(P.p(Either.<P1<A>, Actor<A>>right(a), this));
138      }
139    
140      /**
141       * Provides a promise to apply the given function to this promise's future value (covariant functor pattern).
142       *
143       * @param f The function to apply to this promise's future value.
144       * @return A promise representing the future result of applying the given function to this promised value.
145       */
146      public <B> Promise<B> fmap(final F<A, B> f) {
147        return bind(promise(s, f));
148      }
149    
150      /**
151       * Promotes any function to a transformation between promises (covariant functor pattern).
152       *
153       * @param f The function to promote to a transformation between promises.
154       * @return That function lifted to a function on Promises.
155       */
156      public static <A, B> F<Promise<A>, Promise<B>> fmap_(final F<A, B> f) {
157        return new F<Promise<A>, Promise<B>>() {
158          public Promise<B> f(final Promise<A> a) {
159            return a.fmap(f);
160          }
161        };
162      }
163    
164      /**
165       * Turns a promise of a promise into just a promise. The join function for the Promise monad.
166       * Promise to give it a Promise of an A, and it will promise you an A in return.
167       *
168       * @param p A promise of a promise.
169       * @return The promised promise.
170       */
171      public static <A> Promise<A> join(final Promise<Promise<A>> p) {
172        final F<Promise<A>, Promise<A>> id = identity();
173        return p.bind(id);
174      }
175    
176      /**
177       * Turns a product of a promise into just a promise. Does not block on the product by calling it,
178       * but creates a new promise with a final join.
179       *
180       * @param s The strategy with which to fulfil the promise.
181       * @param p A product-1 of a promise to turn into just a promise.
182       * @return The joined promise.
183       */
184      public static <A> Promise<A> join(final Strategy<Unit> s, final P1<Promise<A>> p) {
185        return join(promise(s, p));
186      }
187    
188      /**
189       * Binds the given function over this promise, with a final join.
190       * The bind function for the Promise monad.
191       *
192       * @param f The function to bind over this promise.
193       * @return The result of applying the given function to this promised value.
194       */
195      public <B> Promise<B> bind(final F<A, Promise<B>> f) {
196        final Promise<B> r = mkPromise(s);
197        final Actor<B> ab = actor(s, new Effect<B>() {
198          public void e(final B b) {
199            r.actor.act(P.p(Either.<P1<B>, Actor<B>>left(P.p(b)), r));
200          }
201        });
202        to(ab.promise().comap(f));
203        return r;
204      }
205    
206      /**
207       * Performs function application within a promise (applicative functor pattern).
208       *
209       * @param pf The promised function to apply.
210       * @return A new promise after applying the given promised function to this promise.
211       */
212      public <B> Promise<B> apply(final Promise<F<A, B>> pf) {
213        return pf.bind(new F<F<A, B>, Promise<B>>() {
214          public Promise<B> f(final F<A, B> f) {
215            return fmap(f);
216          }
217        });
218      }
219    
220      /**
221       * Binds the given function to this promise and the given promise, with a final join.
222       *
223       * @param pb A promise with which to bind the given function.
224       * @param f  The function to apply to the given promised values.
225       * @return A new promise after performing the map, then final join.
226       */
227      public <B, C> Promise<C> bind(final Promise<B> pb, final F<A, F<B, C>> f) {
228        return pb.apply(fmap(f));
229      }
230    
231      /**
232       * Binds the given function to this promise and the given promise, with a final join.
233       *
234       * @param p A promise with which to bind the given function.
235       * @param f The function to apply to the given promised values.
236       * @return A new promise after performing the map, then final join.
237       */
238      public <B, C> Promise<C> bind(final P1<Promise<B>> p, final F<A, F<B, C>> f) {
239        return Promise.join(s, p).apply(fmap(f));
240      }
241    
242      /**
243       * Promotes a function of arity-2 to a function on promises.
244       *
245       * @param f The function to promote.
246       * @return A function of arity-2 promoted to map over promises.
247       */
248      public static <A, B, C> F<Promise<A>, F<Promise<B>, Promise<C>>> liftM2(final F<A, F<B, C>> f) {
249        return curry(new F2<Promise<A>, Promise<B>, Promise<C>>() {
250          public Promise<C> f(final Promise<A> ca, final Promise<B> cb) {
251            return ca.bind(cb, f);
252          }
253        });
254      }
255    
256      /**
257       * Turns a List of promises into a single promise of a List.
258       *
259       * @param s  The strategy with which to sequence the promises.
260       * @param as The list of promises to transform.
261       * @return A single promise for the given List.
262       */
263      public static <A> Promise<List<A>> sequence(final Strategy<Unit> s, final List<Promise<A>> as) {
264        return join(foldRight(s, liftM2(List.<A>cons()), promise(s, P.p(List.<A>nil()))).f(as));
265      }
266    
267      /**
268       * First-class version of the sequence function through a List.
269       *
270       * @param s The strategy with which to sequence a given list of promises.
271       * @return A function that turns a list of promises into a single promise of a list.
272       */
273      public static <A> F<List<Promise<A>>, Promise<List<A>>> sequence(final Strategy<Unit> s) {
274        return new F<List<Promise<A>>, Promise<List<A>>>() {
275          public Promise<List<A>> f(final List<Promise<A>> as) {
276            return sequence(s, as);
277          }
278        };
279      }
280    
281      /**
282       * Turns a Stream of promises into a single promise of a Stream.
283       *
284       * @param s  The strategy with which to sequence the promises.
285       * @param as The Stream of promises to transform.
286       * @return A single promise for the given Stream.
287       */
288      public static <A> Promise<Stream<A>> sequence(final Strategy<Unit> s, final Stream<Promise<A>> as) {
289        return join(foldRightS(s, curry(new F2<Promise<A>, P1<Promise<Stream<A>>>, Promise<Stream<A>>>() {
290          public Promise<Stream<A>> f(final Promise<A> o, final P1<Promise<Stream<A>>> p) {
291            return o.bind(new F<A, Promise<Stream<A>>>() {
292              public Promise<Stream<A>> f(final A a) {
293                return p._1().fmap(Stream.<A>cons_().f(a));
294              }
295            });
296          }
297        }), promise(s, P.p(Stream.<A>nil()))).f(as));
298      }
299    
300      /**
301       * First-class version of the sequence function through a Stream.
302       *
303       * @param s The strategy with which to sequence a given Stream of promises.
304       * @return A function that turns a list of promises into a single promise of a Stream..
305       */
306      public static <A> F<List<Promise<A>>, Promise<List<A>>> sequenceS(final Strategy<Unit> s) {
307        return new F<List<Promise<A>>, Promise<List<A>>>() {
308          public Promise<List<A>> f(final List<Promise<A>> as) {
309            return sequence(s, as);
310          }
311        };
312      }
313    
314      /**
315       * Transforms a product of a promise to a promise of a product.
316       *
317       * @param s The strategy with which to traverse the promise.
318       * @param p A product of a promise to traverse.
319       * @return A promised product.
320       */
321      public static <A> Promise<P1<A>> sequence(final Strategy<Unit> s, final P1<Promise<A>> p) {
322        return join(promise(s, p)).fmap(P.<A>p1());
323      }
324    
325      /**
326       * Performs a right-fold reduction across a list in constant stack space.
327       *
328       * @param s The strategy with which to fold the list.
329       * @param f The function to apply on each element of the list.
330       * @param b The beginning value to start the application from.
331       * @return The final result after the right-fold reduction.
332       */
333      public static <A, B> F<List<A>, Promise<B>> foldRight(final Strategy<Unit> s, final F<A, F<B, B>> f, final B b) {
334        return new F<List<A>, Promise<B>>() {
335          public Promise<B> f(final List<A> as) {
336            return as.isEmpty() ? promise(s, p(b)) : liftM2(f).f(promise(s, P.p(as.head()))).f(
337                join(s, P1.curry(this).f(as.tail())));
338          }
339        };
340      }
341    
342      /**
343       * Performs a right-fold reduction across a Stream in constant stack space.
344       *
345       * @param s The strategy with which to fold the Stream.
346       * @param f The function to apply on each element of the Stream.
347       * @param b The beginning value to start the application from.
348       * @return The final result after the right-fold reduction.
349       */
350      public static <A, B> F<Stream<A>, Promise<B>> foldRightS(final Strategy<Unit> s, final F<A, F<P1<B>, B>> f,
351                                                               final B b) {
352        return new F<Stream<A>, Promise<B>>() {
353          public Promise<B> f(final Stream<A> as) {
354            return as.isEmpty() ? promise(s, P.p(b)) : liftM2(f).f(promise(s, P.p(as.head()))).f(
355                Promise.<P1<B>>join(s, new P1<Promise<P1<B>>>() {
356                  public Promise<P1<B>> _1() {
357                    return f(as.tail()._1()).fmap(P.<B>p1());
358                  }
359                }));
360          }
361        };
362      }
363    
364      /**
365       * Waits if necessary for the computation to complete, and then retrieves its result.
366       *
367       * @return The promised value.
368       */
369      public A claim() {
370        try {
371          l.await();
372        } catch (InterruptedException e) {
373          throw new Error(e);
374        }
375        return v.some();
376      }
377    
378      /**
379       * Waits if necessary for the computation to complete, and then retrieves its result.
380       *
381       * @param timeout the maximum time to wait
382       * @param unit    the time unit of the timeout argument
383       * @return The promised value, or none if the timeout was reached.
384       */
385      public Option<A> claim(final long timeout, final TimeUnit unit) {
386        try {
387          if (l.await(timeout, unit))
388            return v;
389        } catch (InterruptedException e) {
390          throw new Error(e);
391        }
392        return none();
393      }
394    
395      /**
396       * Returns true if this promise has been fulfilled.
397       *
398       * @return true if this promise has been fulfilled.
399       */
400      public boolean isFulfilled() {
401        return v.isSome();
402      }
403    
404      /**
405       * Binds the given function across a promise of this promise (Comonad pattern).
406       *
407       * @param f A function to apply within a new promise of this promise.
408       * @return A new promise of the result of applying the given function to this promise.
409       */
410      public <B> Promise<B> cobind(final F<Promise<A>, B> f) {
411        return promise(s, new P1<B>() {
412          public B _1() {
413            return f.f(Promise.this);
414          }
415        });
416      }
417    
418      /**
419       * Duplicates this promise to a promise of itself (Comonad pattern).
420       *
421       * @return a promise of this promise.
422       */
423      public Promise<Promise<A>> cojoin() {
424        final F<Promise<A>, Promise<A>> id = identity();
425        return cobind(id);
426      }
427    
428      /**
429       * Applies a stream of comonadic functions to this promise, returning a stream of values.
430       *
431       * @param fs A stream of functions to apply to this promise.
432       * @return A stream of the results of applying the given stream of functions to this promise.
433       */
434      public <B> Stream<B> sequenceW(final Stream<F<Promise<A>, B>> fs) {
435        return fs.isEmpty()
436               ? Stream.<B>nil()
437               : Stream.cons(fs.head().f(this), new P1<Stream<B>>() {
438                 public Stream<B> _1() {
439                   return sequenceW(fs.tail()._1());
440                 }
441               });
442      }
443    
444    }