001 package fj.control.parallel; 002 003 import fj.Effect; 004 import fj.F; 005 import fj.F2; 006 import fj.P; 007 import fj.P1; 008 import fj.P2; 009 import fj.Unit; 010 import static fj.P.p; 011 import static fj.Function.curry; 012 import static fj.Function.identity; 013 import static fj.control.parallel.Actor.actor; 014 import static fj.control.parallel.Callables.normalise; 015 import static fj.control.parallel.QueueActor.queueActor; 016 import fj.data.Either; 017 import fj.data.List; 018 import fj.data.Option; 019 import static fj.data.Option.none; 020 import static fj.data.Option.some; 021 import fj.data.Stream; 022 023 import java.util.LinkedList; 024 import java.util.Queue; 025 import java.util.concurrent.Callable; 026 import java.util.concurrent.CountDownLatch; 027 import java.util.concurrent.TimeUnit; 028 029 /** 030 * Represents a non-blocking future value. Products, functions, and actors, given to the methods on this class, 031 * are executed concurrently, and the Promise serves as a handle on the result of the computation. Provides monadic 032 * operations so that future computations can be combined 033 * <p/> 034 * Author: Runar 035 */ 036 public final class Promise<A> { 037 038 private final Actor<P2<Either<P1<A>, Actor<A>>, Promise<A>>> actor; 039 040 private final Strategy<Unit> s; 041 042 private final CountDownLatch l = new CountDownLatch(1); 043 private volatile Option<A> v = none(); 044 private final Queue<Actor<A>> waiting = new LinkedList<Actor<A>>(); 045 046 private Promise(final Strategy<Unit> s, final Actor<P2<Either<P1<A>, Actor<A>>, Promise<A>>> qa) { 047 this.s = s; 048 actor = qa; 049 } 050 051 private static <A> Promise<A> mkPromise(final Strategy<Unit> s) { 052 final Actor<P2<Either<P1<A>, Actor<A>>, Promise<A>>> q = 053 queueActor(s, new Effect<P2<Either<P1<A>, Actor<A>>, Promise<A>>>() { 054 public void e(final P2<Either<P1<A>, Actor<A>>, Promise<A>> p) { 055 final Promise<A> snd = p._2(); 056 final Queue<Actor<A>> as = snd.waiting; 057 if (p._1().isLeft()) { 058 final A a = p._1().left().value()._1(); 059 snd.v = some(a); 060 snd.l.countDown(); 061 while (!as.isEmpty()) 062 as.remove().act(a); 063 } else if (snd.v.isNone()) 064 as.add(p._1().right().value()); 065 else 066 p._1().right().value().act(snd.v.some()); 067 } 068 }).asActor(); 069 return new Promise<A>(s, q); 070 } 071 072 /** 073 * Promises to provide the value of the given 1-product, in the future. 074 * Represents the unit function for promises. 075 * 076 * @param s The strategy with which to fulfil the promise. 077 * @param a The 1-product to evaluate concurrently. 078 * @return A promise representing the future result of evaluating the given 1-product. 079 */ 080 public static <A> Promise<A> promise(final Strategy<Unit> s, final P1<A> a) { 081 final Promise<A> p = mkPromise(s); 082 p.actor.act(P.p(Either.<P1<A>, Actor<A>>left(a), p)); 083 return p; 084 } 085 086 /** 087 * Provides a first-class unit function for promises. 088 * 089 * @param s The strategy with which to fulfil promises. 090 * @return A function that, given a 1-product, yields a promise of that product's value. 091 */ 092 public static <A> F<P1<A>, Promise<A>> promise(final Strategy<Unit> s) { 093 return new F<P1<A>, Promise<A>>() { 094 public Promise<A> f(final P1<A> a) { 095 return promise(s, a); 096 } 097 }; 098 } 099 100 /** 101 * Provides a promise to call the given Callable in the future. 102 * 103 * @param s The strategy with which to fulfil the promise. 104 * @param a The Callable to evaluate concurrently. 105 * @return A promise of a new Callable that will return the result of calling the given Callable. 106 */ 107 public static <A> Promise<Callable<A>> promise(final Strategy<Unit> s, final Callable<A> a) { 108 return promise(s, new P1<Callable<A>>() { 109 public Callable<A> _1() { 110 return normalise(a); 111 } 112 }); 113 } 114 115 /** 116 * Transforms any function so that it returns a promise of a value instead of an actual value. 117 * Represents the Kleisli arrow for the Promise monad. 118 * 119 * @param s The strategy with which to fulfil the promise. 120 * @param f The function to turn into a promise-valued function. 121 * @return The given function transformed into a function that returns a promise. 122 */ 123 public static <A, B> F<A, Promise<B>> promise(final Strategy<Unit> s, final F<A, B> f) { 124 return new F<A, Promise<B>>() { 125 public Promise<B> f(final A a) { 126 return promise(s, P1.curry(f).f(a)); 127 } 128 }; 129 } 130 131 /** 132 * Promises to send a value to the given actor in the future. 133 * 134 * @param a An actor that will receive this Promise's value in the future. 135 */ 136 public void to(final Actor<A> a) { 137 actor.act(P.p(Either.<P1<A>, Actor<A>>right(a), this)); 138 } 139 140 /** 141 * Provides a promise to apply the given function to this promise's future value (covariant functor pattern). 142 * 143 * @param f The function to apply to this promise's future value. 144 * @return A promise representing the future result of applying the given function to this promised value. 145 */ 146 public <B> Promise<B> fmap(final F<A, B> f) { 147 return bind(promise(s, f)); 148 } 149 150 /** 151 * Promotes any function to a transformation between promises (covariant functor pattern). 152 * 153 * @param f The function to promote to a transformation between promises. 154 * @return That function lifted to a function on Promises. 155 */ 156 public static <A, B> F<Promise<A>, Promise<B>> fmap_(final F<A, B> f) { 157 return new F<Promise<A>, Promise<B>>() { 158 public Promise<B> f(final Promise<A> a) { 159 return a.fmap(f); 160 } 161 }; 162 } 163 164 /** 165 * Turns a promise of a promise into just a promise. The join function for the Promise monad. 166 * Promise to give it a Promise of an A, and it will promise you an A in return. 167 * 168 * @param p A promise of a promise. 169 * @return The promised promise. 170 */ 171 public static <A> Promise<A> join(final Promise<Promise<A>> p) { 172 final F<Promise<A>, Promise<A>> id = identity(); 173 return p.bind(id); 174 } 175 176 /** 177 * Turns a product of a promise into just a promise. Does not block on the product by calling it, 178 * but creates a new promise with a final join. 179 * 180 * @param s The strategy with which to fulfil the promise. 181 * @param p A product-1 of a promise to turn into just a promise. 182 * @return The joined promise. 183 */ 184 public static <A> Promise<A> join(final Strategy<Unit> s, final P1<Promise<A>> p) { 185 return join(promise(s, p)); 186 } 187 188 /** 189 * Binds the given function over this promise, with a final join. 190 * The bind function for the Promise monad. 191 * 192 * @param f The function to bind over this promise. 193 * @return The result of applying the given function to this promised value. 194 */ 195 public <B> Promise<B> bind(final F<A, Promise<B>> f) { 196 final Promise<B> r = mkPromise(s); 197 final Actor<B> ab = actor(s, new Effect<B>() { 198 public void e(final B b) { 199 r.actor.act(P.p(Either.<P1<B>, Actor<B>>left(P.p(b)), r)); 200 } 201 }); 202 to(ab.promise().comap(f)); 203 return r; 204 } 205 206 /** 207 * Performs function application within a promise (applicative functor pattern). 208 * 209 * @param pf The promised function to apply. 210 * @return A new promise after applying the given promised function to this promise. 211 */ 212 public <B> Promise<B> apply(final Promise<F<A, B>> pf) { 213 return pf.bind(new F<F<A, B>, Promise<B>>() { 214 public Promise<B> f(final F<A, B> f) { 215 return fmap(f); 216 } 217 }); 218 } 219 220 /** 221 * Binds the given function to this promise and the given promise, with a final join. 222 * 223 * @param pb A promise with which to bind the given function. 224 * @param f The function to apply to the given promised values. 225 * @return A new promise after performing the map, then final join. 226 */ 227 public <B, C> Promise<C> bind(final Promise<B> pb, final F<A, F<B, C>> f) { 228 return pb.apply(fmap(f)); 229 } 230 231 /** 232 * Binds the given function to this promise and the given promise, with a final join. 233 * 234 * @param p A promise with which to bind the given function. 235 * @param f The function to apply to the given promised values. 236 * @return A new promise after performing the map, then final join. 237 */ 238 public <B, C> Promise<C> bind(final P1<Promise<B>> p, final F<A, F<B, C>> f) { 239 return Promise.join(s, p).apply(fmap(f)); 240 } 241 242 /** 243 * Promotes a function of arity-2 to a function on promises. 244 * 245 * @param f The function to promote. 246 * @return A function of arity-2 promoted to map over promises. 247 */ 248 public static <A, B, C> F<Promise<A>, F<Promise<B>, Promise<C>>> liftM2(final F<A, F<B, C>> f) { 249 return curry(new F2<Promise<A>, Promise<B>, Promise<C>>() { 250 public Promise<C> f(final Promise<A> ca, final Promise<B> cb) { 251 return ca.bind(cb, f); 252 } 253 }); 254 } 255 256 /** 257 * Turns a List of promises into a single promise of a List. 258 * 259 * @param s The strategy with which to sequence the promises. 260 * @param as The list of promises to transform. 261 * @return A single promise for the given List. 262 */ 263 public static <A> Promise<List<A>> sequence(final Strategy<Unit> s, final List<Promise<A>> as) { 264 return join(foldRight(s, liftM2(List.<A>cons()), promise(s, P.p(List.<A>nil()))).f(as)); 265 } 266 267 /** 268 * First-class version of the sequence function through a List. 269 * 270 * @param s The strategy with which to sequence a given list of promises. 271 * @return A function that turns a list of promises into a single promise of a list. 272 */ 273 public static <A> F<List<Promise<A>>, Promise<List<A>>> sequence(final Strategy<Unit> s) { 274 return new F<List<Promise<A>>, Promise<List<A>>>() { 275 public Promise<List<A>> f(final List<Promise<A>> as) { 276 return sequence(s, as); 277 } 278 }; 279 } 280 281 /** 282 * Turns a Stream of promises into a single promise of a Stream. 283 * 284 * @param s The strategy with which to sequence the promises. 285 * @param as The Stream of promises to transform. 286 * @return A single promise for the given Stream. 287 */ 288 public static <A> Promise<Stream<A>> sequence(final Strategy<Unit> s, final Stream<Promise<A>> as) { 289 return join(foldRightS(s, curry(new F2<Promise<A>, P1<Promise<Stream<A>>>, Promise<Stream<A>>>() { 290 public Promise<Stream<A>> f(final Promise<A> o, final P1<Promise<Stream<A>>> p) { 291 return o.bind(new F<A, Promise<Stream<A>>>() { 292 public Promise<Stream<A>> f(final A a) { 293 return p._1().fmap(Stream.<A>cons_().f(a)); 294 } 295 }); 296 } 297 }), promise(s, P.p(Stream.<A>nil()))).f(as)); 298 } 299 300 /** 301 * First-class version of the sequence function through a Stream. 302 * 303 * @param s The strategy with which to sequence a given Stream of promises. 304 * @return A function that turns a list of promises into a single promise of a Stream.. 305 */ 306 public static <A> F<List<Promise<A>>, Promise<List<A>>> sequenceS(final Strategy<Unit> s) { 307 return new F<List<Promise<A>>, Promise<List<A>>>() { 308 public Promise<List<A>> f(final List<Promise<A>> as) { 309 return sequence(s, as); 310 } 311 }; 312 } 313 314 /** 315 * Transforms a product of a promise to a promise of a product. 316 * 317 * @param s The strategy with which to traverse the promise. 318 * @param p A product of a promise to traverse. 319 * @return A promised product. 320 */ 321 public static <A> Promise<P1<A>> sequence(final Strategy<Unit> s, final P1<Promise<A>> p) { 322 return join(promise(s, p)).fmap(P.<A>p1()); 323 } 324 325 /** 326 * Performs a right-fold reduction across a list in constant stack space. 327 * 328 * @param s The strategy with which to fold the list. 329 * @param f The function to apply on each element of the list. 330 * @param b The beginning value to start the application from. 331 * @return The final result after the right-fold reduction. 332 */ 333 public static <A, B> F<List<A>, Promise<B>> foldRight(final Strategy<Unit> s, final F<A, F<B, B>> f, final B b) { 334 return new F<List<A>, Promise<B>>() { 335 public Promise<B> f(final List<A> as) { 336 return as.isEmpty() ? promise(s, p(b)) : liftM2(f).f(promise(s, P.p(as.head()))).f( 337 join(s, P1.curry(this).f(as.tail()))); 338 } 339 }; 340 } 341 342 /** 343 * Performs a right-fold reduction across a Stream in constant stack space. 344 * 345 * @param s The strategy with which to fold the Stream. 346 * @param f The function to apply on each element of the Stream. 347 * @param b The beginning value to start the application from. 348 * @return The final result after the right-fold reduction. 349 */ 350 public static <A, B> F<Stream<A>, Promise<B>> foldRightS(final Strategy<Unit> s, final F<A, F<P1<B>, B>> f, 351 final B b) { 352 return new F<Stream<A>, Promise<B>>() { 353 public Promise<B> f(final Stream<A> as) { 354 return as.isEmpty() ? promise(s, P.p(b)) : liftM2(f).f(promise(s, P.p(as.head()))).f( 355 Promise.<P1<B>>join(s, new P1<Promise<P1<B>>>() { 356 public Promise<P1<B>> _1() { 357 return f(as.tail()._1()).fmap(P.<B>p1()); 358 } 359 })); 360 } 361 }; 362 } 363 364 /** 365 * Waits if necessary for the computation to complete, and then retrieves its result. 366 * 367 * @return The promised value. 368 */ 369 public A claim() { 370 try { 371 l.await(); 372 } catch (InterruptedException e) { 373 throw new Error(e); 374 } 375 return v.some(); 376 } 377 378 /** 379 * Waits if necessary for the computation to complete, and then retrieves its result. 380 * 381 * @param timeout the maximum time to wait 382 * @param unit the time unit of the timeout argument 383 * @return The promised value, or none if the timeout was reached. 384 */ 385 public Option<A> claim(final long timeout, final TimeUnit unit) { 386 try { 387 if (l.await(timeout, unit)) 388 return v; 389 } catch (InterruptedException e) { 390 throw new Error(e); 391 } 392 return none(); 393 } 394 395 /** 396 * Returns true if this promise has been fulfilled. 397 * 398 * @return true if this promise has been fulfilled. 399 */ 400 public boolean isFulfilled() { 401 return v.isSome(); 402 } 403 404 /** 405 * Binds the given function across a promise of this promise (Comonad pattern). 406 * 407 * @param f A function to apply within a new promise of this promise. 408 * @return A new promise of the result of applying the given function to this promise. 409 */ 410 public <B> Promise<B> cobind(final F<Promise<A>, B> f) { 411 return promise(s, new P1<B>() { 412 public B _1() { 413 return f.f(Promise.this); 414 } 415 }); 416 } 417 418 /** 419 * Duplicates this promise to a promise of itself (Comonad pattern). 420 * 421 * @return a promise of this promise. 422 */ 423 public Promise<Promise<A>> cojoin() { 424 final F<Promise<A>, Promise<A>> id = identity(); 425 return cobind(id); 426 } 427 428 /** 429 * Applies a stream of comonadic functions to this promise, returning a stream of values. 430 * 431 * @param fs A stream of functions to apply to this promise. 432 * @return A stream of the results of applying the given stream of functions to this promise. 433 */ 434 public <B> Stream<B> sequenceW(final Stream<F<Promise<A>, B>> fs) { 435 return fs.isEmpty() 436 ? Stream.<B>nil() 437 : Stream.cons(fs.head().f(this), new P1<Stream<B>>() { 438 public Stream<B> _1() { 439 return sequenceW(fs.tail()._1()); 440 } 441 }); 442 } 443 444 }