001 package fj.control.parallel; 002 003 import fj.Effect; 004 import fj.F; 005 import fj.F2; 006 import fj.Function; 007 import fj.P; 008 import fj.P1; 009 import static fj.Function.compose; 010 import static fj.Function.curry; 011 import static fj.P1.fmap; 012 import static fj.P1.sequence; 013 import fj.data.Java; 014 import fj.data.List; 015 import fj.data.Array; 016 017 import java.util.concurrent.Callable; 018 import java.util.concurrent.CompletionService; 019 import java.util.concurrent.ExecutionException; 020 import java.util.concurrent.ExecutorService; 021 import java.util.concurrent.Future; 022 import java.util.concurrent.FutureTask; 023 024 /** 025 * Functional-style parallel evaluation strategies. 026 * A Strategy is a method of evaluating a product-1, yielding another product-1 from which the result of its evaluation 027 * can be retrieved at a later time. 028 * <p/> 029 * 030 * @version %build.number%<br> 031 * <ul> 032 * <li>$LastChangedRevision: 161 $</li> 033 * <li>$LastChangedDate: 2009-06-01 17:14:38 +1000 (Mon, 01 Jun 2009) $</li> 034 * <li>Author: runar</li> 035 * </ul> 036 */ 037 public final class Strategy<A> { 038 039 private final F<P1<A>, P1<A>> f; 040 041 private Strategy(final F<P1<A>, P1<A>> f) { 042 this.f = f; 043 } 044 045 /** 046 * Returns the functional representation of this Strategy, a function that evaluates a product-1. 047 * 048 * @return The function representing this strategy, which evaluates a product-1. 049 */ 050 public F<P1<A>, P1<A>> f() { 051 return f; 052 } 053 054 /** 055 * Constructs a strategy from the given evaluation function. 056 * 057 * @param f The execution function for the strategy 058 * @return A strategy that uses the given function to evaluate product-1s. 059 */ 060 public static <A> Strategy<A> strategy(final F<P1<A>, P1<A>> f) { 061 return new Strategy<A>(f); 062 } 063 064 /** 065 * Apply the strategy to the given product-1. 066 * 067 * @param a A P1 to evaluate according to this strategy. 068 * @return A P1 that yields the value from calling the given product-1. 069 */ 070 public P1<A> par(final P1<A> a) { 071 return f().f(a); 072 } 073 074 /** 075 * Promotes a function to a concurrent function. 076 * 077 * @param f A function to promote to a concurrent function. 078 * @return A function that executes concurrently when called, yielding a Future value. 079 */ 080 public <B> F<B, P1<A>> concurry(final F<B, A> f) { 081 return compose(f(), P1.<B, A>curry(f)); 082 } 083 084 /** 085 * Promotes a function of arity-2 to a concurrent function. 086 * 087 * @param f The function to promote to a concurrent function. 088 * @return A function that executes concurrently when called, yielding a product-1 that returns the value. 089 */ 090 public <B, C> F<B, F<C, P1<A>>> concurry(final F2<B, C, A> f) { 091 return new F<B, F<C, P1<A>>>() { 092 public F<C, P1<A>> f(final B b) { 093 return concurry(curry(f).f(b)); 094 } 095 }; 096 } 097 098 /** 099 * Waits for every Future in a list to obtain a value, and collects those values in a list. 100 * 101 * @param xs The list of Futures from which to get values. 102 * @return A list of values extracted from the Futures in the argument list. 103 */ 104 public static <A> List<P1<A>> mergeAll(final List<Future<A>> xs) { 105 return xs.map(Strategy.<A>obtain()); 106 } 107 108 /** 109 * Evaluates a list of product-1s in parallel. 110 * 111 * @param ps A list to evaluate in parallel. 112 * @return A list of the values of the product-1s in the argument. 113 */ 114 public P1<List<A>> parList(final List<P1<A>> ps) { 115 return sequence(ps.map(f())); 116 } 117 118 /** 119 * Maps the given function over the given list in parallel using this strategy. 120 * 121 * @param f A function to map over the given list in parallel. 122 * @param bs A list over which to map the given function in parallel. 123 * @return A product-1 that returns the list with all of its elements transformed by the given function. 124 */ 125 public <B> P1<List<A>> parMap(final F<B, A> f, final List<B> bs) { 126 return sequence(bs.map(concurry(f))); 127 } 128 129 /** 130 * Maps the given function over the given array in parallel using this strategy. 131 * 132 * @param f A function to map over the given array in parallel. 133 * @param bs An array over which to map the given function in parallel. 134 * @return A product-1 that returns the array with all of its elements transformed by the given function. 135 */ 136 public <B> P1<Array<A>> parMap(final F<B, A> f, final Array<B> bs) { 137 return sequence(bs.map(concurry(f))); 138 } 139 140 /** 141 * A strict version of parMap over lists. 142 * Maps the given function over the given list in parallel using this strategy, 143 * blocking the current thread until all values have been obtained. 144 * 145 * @param f A function to map over the given list in parallel. 146 * @param bs A list over which to map the given function in parallel. 147 * @return A list with all of its elements transformed by the given function. 148 */ 149 public <B> List<A> parMap1(final F<B, A> f, final List<B> bs) { 150 return compose(P1.<List<A>>__1(), parMapList(f)).f(bs); 151 } 152 153 /** 154 * A strict version of parMap over arrays. 155 * Maps the given function over the given arrays in parallel using this strategy, 156 * blocking the current thread until all values have been obtained. 157 * 158 * @param f A function to map over the given array in parallel. 159 * @param bs An array over which to map the given function in parallel. 160 * @return An array with all of its elements transformed by the given function. 161 */ 162 public <B> Array<A> parMap1(final F<B, A> f, final Array<B> bs) { 163 return compose(P1.<Array<A>>__1(), parMapArray(f)).f(bs); 164 } 165 166 /** 167 * Promotes a function to a parallel function on lists using this strategy. 168 * 169 * @param f A function to transform into a parallel function on lists. 170 * @return The function transformed into a parallel function on lists. 171 */ 172 public <B> F<List<B>, P1<List<A>>> parMapList(final F<B, A> f) { 173 return new F<List<B>, P1<List<A>>>() { 174 public P1<List<A>> f(final List<B> as) { 175 return parMap(f, as); 176 } 177 }; 178 } 179 180 /** 181 * First-class version of parMap on lists. 182 * 183 * @return A function that promotes another function to a parallel function on lists. 184 */ 185 public <B> F<F<B, A>, F<List<B>, P1<List<A>>>> parMapList() { 186 return new F<F<B, A>, F<List<B>, P1<List<A>>>>() { 187 public F<List<B>, P1<List<A>>> f(final F<B, A> f) { 188 return parMapList(f); 189 } 190 }; 191 } 192 193 /** 194 * First-class version of parMap1 on lists (parallel list functor). 195 * 196 * @return A function that promotes another function to a blocking parallel function on lists. 197 */ 198 public <B> F<F<B, A>, F<List<B>, List<A>>> parMapList1() { 199 return new F<F<B, A>, F<List<B>, List<A>>>() { 200 public F<List<B>, List<A>> f(final F<B, A> f) { 201 return new F<List<B>, List<A>>() { 202 public List<A> f(final List<B> bs) { 203 return parMap1(f, bs); 204 } 205 }; 206 } 207 }; 208 } 209 210 /** 211 * Promotes a function to a parallel function on arrays using this strategy. 212 * 213 * @param f A function to transform into a parallel function on arrays. 214 * @return The function transformed into a parallel function on arrays. 215 */ 216 public <B> F<Array<B>, P1<Array<A>>> parMapArray(final F<B, A> f) { 217 return new F<Array<B>, P1<Array<A>>>() { 218 public P1<Array<A>> f(final Array<B> as) { 219 return parMap(f, as); 220 } 221 }; 222 } 223 224 /** 225 * First-class version of parMap on arrays. 226 * 227 * @return A function that promotes another function to a parallel function on arrays. 228 */ 229 public <B> F<F<B, A>, F<Array<B>, P1<Array<A>>>> parMapArray() { 230 return new F<F<B, A>, F<Array<B>, P1<Array<A>>>>() { 231 public F<Array<B>, P1<Array<A>>> f(final F<B, A> f) { 232 return parMapArray(f); 233 } 234 }; 235 } 236 237 /** 238 * First-class version of parMap1 on arrays (parallel array functor). 239 * 240 * @return A function that promotes another function to a blocking parallel function on arrays. 241 */ 242 public <B> F<F<B, A>, F<Array<B>, Array<A>>> parMapArray1() { 243 return new F<F<B, A>, F<Array<B>, Array<A>>>() { 244 public F<Array<B>, Array<A>> f(final F<B, A> f) { 245 return new F<Array<B>, Array<A>>() { 246 public Array<A> f(final Array<B> bs) { 247 return parMap1(f, bs); 248 } 249 }; 250 } 251 }; 252 } 253 254 /** 255 * Binds the given function in parallel across the given list, using the given strategy, with a final join. 256 * 257 * @param s The strategy to use for parallelization. 258 * @param f The function to bind across the given list. 259 * @param as The list across which to bind the given function. 260 * @return A P1 containing the result of the parallel map operation after the final join. 261 */ 262 public static <A, B> P1<List<B>> parFlatMap(final Strategy<List<B>> s, 263 final F<A, List<B>> f, 264 final List<A> as) { 265 return fmap(List.<B>join()).f(s.parMap(f, as)); 266 } 267 268 /** 269 * Binds the given function in parallel across the given array, using the given strategy, with a final join. 270 * 271 * @param s The strategy to use for parallelization. 272 * @param f The function to bind across the given array. 273 * @param as The array across which to bind the given function. 274 * @return A P1 containing the result of the parallel map operation after the final join. 275 */ 276 public static <A, B> P1<Array<B>> parFlatMap(final Strategy<Array<B>> s, 277 final F<A, Array<B>> f, 278 final Array<A> as) { 279 return fmap(Array.<B>join()).f(s.parMap(f, as)); 280 } 281 282 /** 283 * Sequentially evaluates chunks (sub-sequences) of a list in parallel. Splits the list into chunks, 284 * evaluating the chunks simultaneously, but each chunk as a sequence. 285 * 286 * @param s The strategy to use for parallelization. 287 * @param chunkLength The length of each sequence. 288 * @param as The list to evaluate in parallel chunks. 289 * @return A product-1 containing the list of results extracted from the given list of product-1s. 290 */ 291 public static <A> P1<List<A>> parListChunk(final Strategy<List<A>> s, 292 final int chunkLength, 293 final List<P1<A>> as) { 294 return fmap(List.<A>join()).f(s.parList(as.partition(chunkLength).map(P1.<A>sequenceList()))); 295 } 296 297 /** 298 * Zips together two lists in parallel using a given function, with this strategy. 299 * Calls the given function once for each corresponding pair in the lists, position-wise, 300 * passing elements from the first list to the first argument of the function, and elements from the second list 301 * to the second argument of the function, yielding a list of the results. 302 * If the lists are not of the same length, the remaining elements of the longer list are ignored. 303 * 304 * @param f The function of arity-2 with which to zip. 305 * @param bs A list to zip with the given function. 306 * @param cs A list to zip with the given function. 307 * @return The list of the results of calling the given function on corresponding elements of the given lists. 308 */ 309 public <B, C> P1<List<A>> parZipWith(final F2<B, C, A> f, final List<B> bs, final List<C> cs) { 310 return sequence(bs.zipWith(cs, concurry(f))); 311 } 312 313 /** 314 * Zips together two arrays in parallel using a given function, with this strategy. 315 * Calls the given function once for each corresponding pair in the arrays, position-wise, 316 * passing elements from the first array to the first argument of the function, and elements from the second array 317 * to the second argument of the function, yielding a array of the results. 318 * If the arrays are not of the same length, the remaining elements of the longer array are ignored. 319 * 320 * @param f The function of arity-2 with which to zip. 321 * @param bs A array to zip with the given function. 322 * @param cs A array to zip with the given function. 323 * @return The array of the results of calling the given function on corresponding elements of the given arrays. 324 */ 325 public <B, C> P1<Array<A>> parZipWith(final F2<B, C, A> f, final Array<B> bs, final Array<C> cs) { 326 return sequence(bs.zipWith(cs, concurry(f))); 327 } 328 329 /** 330 * Lifts a given function of arity-2 so that it zips together two lists in parallel, 331 * using this strategy, calling the function once for each corresponding pair in the lists, position-wise. 332 * 333 * @param f The function of arity-2 with which to zip. 334 * @return A transformation that zips two lists using the argument function, in parallel. 335 */ 336 public <B, C> F2<List<B>, List<C>, P1<List<A>>> parZipListWith(final F2<B, C, A> f) { 337 return new F2<List<B>, List<C>, P1<List<A>>>() { 338 public P1<List<A>> f(final List<B> bs, final List<C> cs) { 339 return parZipWith(f, bs, cs); 340 } 341 }; 342 } 343 344 /** 345 * Lifts a given function of arity-2 so that it zips together two arrays in parallel, 346 * using this strategy, calling the function once for each corresponding pair in the arrays, position-wise. 347 * 348 * @param f The function of arity-2 with which to zip. 349 * @return A transformation that zips two arrays using the argument function, in parallel. 350 */ 351 public <B, C> F2<Array<B>, Array<C>, P1<Array<A>>> parZipArrayWith(final F2<B, C, A> f) { 352 return new F2<Array<B>, Array<C>, P1<Array<A>>>() { 353 public P1<Array<A>> f(final Array<B> bs, final Array<C> cs) { 354 return parZipWith(f, bs, cs); 355 } 356 }; 357 } 358 359 /** 360 * Returns a function which returns a product-1 which waits for the given Future to obtain a value. 361 * 362 * @return A function which, given a Future, yields a product-1 that waits for it. 363 */ 364 public static <A> F<Future<A>, P1<A>> obtain() { 365 return new F<Future<A>, P1<A>>() { 366 public P1<A> f(final Future<A> t) { 367 return obtain(t); 368 } 369 }; 370 } 371 372 /** 373 * Provides a product-1 that waits for the given future to obtain a value. 374 * 375 * @param t A Future for which to wait. 376 * @return A product-1 that waits for the given future to obtain a value. 377 */ 378 public static <A> P1<A> obtain(final Future<A> t) { 379 return new P1<A>() { 380 public A _1() { 381 try { 382 return t.get(); 383 } catch (InterruptedException e) { 384 Thread.currentThread().interrupt(); 385 throw new Error(e); 386 } catch (ExecutionException e) { 387 throw new Error(e); 388 } 389 } 390 }; 391 } 392 393 /** 394 * Returns an Effect that waits for a given Future to obtain a value, discarding the value. 395 * 396 * @return An effect, which, given a Future, waits for it to obtain a value, discarding the value. 397 */ 398 public static <A> Effect<Future<A>> discard() { 399 return new Effect<Future<A>>() { 400 public void e(final Future<A> a) { 401 Strategy.<A>obtain().f(a)._1(); 402 } 403 }; 404 } 405 406 /** 407 * Provides a simple parallelization strategy that creates, and discards, a new thread for 408 * every evaluation. 409 * 410 * @return a simple parallelization strategy that creates, and discards, a new thread for 411 * every evaluation. 412 */ 413 public static <A> Strategy<A> simpleThreadStrategy() { 414 return strategy(new F<P1<A>, P1<A>>() { 415 public P1<A> f(final P1<A> p) { 416 final FutureTask<A> t = new FutureTask<A>(Java.<A>P1_Callable().f(p)); 417 new Thread(t).start(); 418 return obtain(t); 419 } 420 }); 421 } 422 423 /** 424 * Provides a parallelization strategy that uses an ExecutorService to control the method and 425 * degree of parallelism. 426 * 427 * @param s The ExecutorService to use for scheduling evaluations. 428 * @return A Strategy that uses the provided ExecutorService to control the method and degree 429 * of parallelism. 430 */ 431 public static <A> Strategy<A> executorStrategy(final ExecutorService s) { 432 return strategy(new F<P1<A>, P1<A>>() { 433 public P1<A> f(final P1<A> p) { 434 return obtain(s.submit(Java.<A>P1_Callable().f(p))); 435 } 436 }); 437 } 438 439 /** 440 * Provides a parallelization strategy that uses a CompletionService to control the method and 441 * degree of parallelism, and where each parallel task's completion is registered with the service. 442 * 443 * @param s The CompletionService to use for scheduling evaluations and detect their completion. 444 * @return A Strategy that uses the provided CompletionService to control the method and degree of parallelism, 445 * and notifies the service of task completion. 446 */ 447 public static <A> Strategy<A> completionStrategy(final CompletionService<A> s) { 448 return strategy(new F<P1<A>, P1<A>>() { 449 public P1<A> f(final P1<A> p) { 450 return obtain(s.submit(Java.<A>P1_Callable().f(p))); 451 } 452 }); 453 } 454 455 /** 456 * Provides a strategy that performs sequential (non-concurrent) evaluation of its argument. 457 * 458 * @return A strategy that performs sequential (non-concurrent) evaluation of its argument. 459 */ 460 public static <A> Strategy<A> seqStrategy() { 461 return strategy(new F<P1<A>, P1<A>>() { 462 public P1<A> f(final P1<A> a) { 463 return P.p(a._1()); 464 } 465 }); 466 } 467 468 /** 469 * Provides a strategy that performs no evaluation of its argument. 470 * 471 * @return A strategy that performs no evaluation of its argument. 472 */ 473 public static <A> Strategy<A> idStrategy() { 474 return strategy(Function.<P1<A>>identity()); 475 } 476 477 /** 478 * Maps the given bijective transformation across this strategy (Exponential Functor pattern). 479 * 480 * @param f A transformation from this strategy's codomain to the resulting strategy's codomain. 481 * @param g A transformation from the resulting strategy's domain to this strategy's domain. 482 * @return A new strategy that maps to this strategy and back again. 483 */ 484 public <B> Strategy<B> xmap(final F<P1<A>, P1<B>> f, final F<P1<B>, P1<A>> g) { 485 return strategy(compose(f, compose(f(), g))); 486 } 487 488 /** 489 * Maps the given transformation across this strategy's domain (Invariant Functor pattern). 490 * 491 * @param f A transformation from this strategy's codomain to the resulting strategy's codomain. 492 * @return A new strategy that applies the given transformation after each application of this strategy. 493 */ 494 public Strategy<A> map(final F<P1<A>, P1<A>> f) { 495 return xmap(f, Function.<P1<A>>identity()); 496 } 497 498 /** 499 * Maps the given transformation across this strategy's codomain (Invariant Functor pattern). 500 * 501 * @param f A transformation from the resulting strategy's domain to this strategy's domain. 502 * @return A new strategy that applies the given transformation before each application of this strategy. 503 */ 504 public Strategy<A> comap(final F<P1<A>, P1<A>> f) { 505 return xmap(Function.<P1<A>>identity(), f); 506 } 507 508 /** 509 * Provides an error-handling strategy. Captures any uncaught runtime errors encountered by this strategy and applies 510 * the given side-effect to them. 511 * 512 * @param e The effect that should handle errors. 513 * @return A strategy that captures any runtime errors with a side-effect. 514 */ 515 public Strategy<A> errorStrategy(final Effect<Error> e) { 516 return errorStrategy(this, e); 517 } 518 519 /** 520 * Provides an error-handling strategy. Captures any uncaught runtime errors encountered by the given strategy 521 * and applies the given side-effect to them. 522 * 523 * @param s The strategy to equip with an error-handling effect. 524 * @param e The effect that should handle errors. 525 * @return A strategy that captures any runtime errors with a side-effect. 526 */ 527 public static <A> Strategy<A> errorStrategy(final Strategy<A> s, final Effect<Error> e) { 528 return s.comap(new F<P1<A>, P1<A>>() { 529 public P1<A> f(final P1<A> a) { 530 return new P1<A>() { 531 public A _1() { 532 try { 533 return a._1(); 534 } catch (Throwable t) { 535 final Error error = new Error(t); 536 e.e(error); 537 throw error; 538 } 539 } 540 }; 541 } 542 }); 543 } 544 545 /** 546 * Provides a normalising strategy that fully evaluates its Callable argument. 547 * 548 * @param s A non-normalising strategy to use for the evaluation. 549 * @return A new strategy that fully evaluates Callables, using the given strategy. 550 */ 551 public static <A> Strategy<Callable<A>> callableStrategy(final Strategy<Callable<A>> s) { 552 return s.comap(new F<P1<Callable<A>>, P1<Callable<A>>>() { 553 public P1<Callable<A>> f(final P1<Callable<A>> a) { 554 return P1.curry(Callables.<A>normalise()).f(a._1()); 555 } 556 }); 557 } 558 559 }