Christian Krause
2018-Feb-12 19:08 UTC
[Rd] [parallel] fixes load balancing of parLapplyLB
Dear R-Devel List,

**TL;DR:** The function **parLapplyLB** of the parallel package has [reportedly][1] (see also the attached RRD output) not been doing its job, i.e. not actually balancing the load. My colleague Dirk Sarpe and I found the cause of the problem and we also have a patch to fix it (attached). A similar fix has also been provided [here][2].

[1]: https://stackoverflow.com/questions/38230831/why-does-parlapplylb-not-actually-balance-load
[2]: https://bugs.r-project.org/bugzilla3/show_bug.cgi?id=16792

## The Call Chain

First, we traced the relevant R function calls through the code, beginning with `parLapplyLB`:

1. **parLapplyLB:** clusterApply.R:177, calls **splitList**, then **clusterApplyLB**
2. **splitList:** clusterApply.R:157
3. **clusterApplyLB:** clusterApply.R:87, calls **dynamicClusterApply**
4. **dynamicClusterApply:** clusterApply.R:39

## splitList

We used both our whiteboard and an R session to manually *run* a few examples. We were using lists of 100 elements and 5 workers. First, let's take a look at **splitList**:

```r
> sapply(parallel:::splitList(1:100, 5), length)
[1] 20 20 20 20 20

> sapply(parallel:::splitList(1:97, 5), length)
[1] 20 19 19 19 20

> sapply(parallel:::splitList(1:97, 20), length)
[1] 5 5 5 5 4 5 5 5 5 5 4 5 5 5 5 4 5 5 5 5
```

As we can see in the examples, the work is distributed as equally as possible.

## dynamicClusterApply

**dynamicClusterApply** works this way (simplified):

1. it first gives a chunk to each worker
2. once a worker comes back with the result, it is given the next chunk

**This is the important part:** As long as there are **more** chunks than workers, there will be load balancing. If there are fewer chunks than workers, each worker will get **at most one chunk** and there is **no** load balancing.

## parLapplyLB

This is how **parLapplyLB** splits the input list (with a bit of refactoring, for readability):

```r
parLapplyLB <- function(cl = NULL, X, fun, ...)
{
    cl <- defaultCluster(cl)

    chunks <- splitList(X, length(cl))

    do.call(c,
            clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...),
            quote = TRUE)
}
```

For our examples, the chunks have these sizes:

```r
> sapply(parallel:::splitList(1:100, 5), length)
[1] 20 20 20 20 20
```

There we have it: 5 chunks, 5 workers. With this work distribution, there can't possibly be any load balancing, because each worker is given a single chunk and then stops working, since there are no more chunks.

Instead, **parLapplyLB** should look like this (patch is attached):

```r
parLapplyLB <- function(cl = NULL, X, fun, ...)
{
    cl <- defaultCluster(cl)

    chunkSize <- max(length(cl), ceiling(length(X) / length(cl)))

    chunks <- splitList(X, chunkSize)

    do.call(c,
            clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...),
            quote = TRUE)
}
```

Examples with a cluster of 5 workers:

```r
# length(cl) < length(X)
> sapply(parallel:::splitList(1:100, ceiling(100 / 5)), length)
[1] 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5

# length(cl) >= length(X)
> sapply(parallel:::splitList(1:4, 4), length)
[1] 1 1 1 1
# one worker idles here, but we can't do better than that
```

With this patch, the number of chunks is larger than the number of workers whenever possible, and then load balancing should work.
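To see the two chunkings side by side in a running session, here is a minimal editorial sketch (not part of the patch or the attached files): it assumes a local 5-worker PSOCK cluster and a made-up `slow_fun` with varying runtime. Only the second chunking leaves `clusterApplyLB` anything to hand out dynamically.

```r
library(parallel)

cl <- makeCluster(5)                 # assumption: local 5-worker PSOCK cluster

X <- 1:100
slow_fun <- function(x) {            # made-up task with varying runtime
    Sys.sleep(runif(1, 0, 0.05))
    x^2
}

## current parLapplyLB chunking: 5 chunks for 5 workers -- each worker gets
## exactly one chunk, so there is nothing left to distribute dynamically
chunks_current <- parallel:::splitList(X, length(cl))

## patched chunking: 20 chunks of 5 -- a worker that finishes early is
## immediately given the next chunk
chunks_patched <- parallel:::splitList(X, ceiling(length(X) / length(cl)))

res <- do.call(c,
               clusterApplyLB(cl, x = chunks_patched, fun = lapply, slow_fun),
               quote = TRUE)

stopCluster(cl)
```

The result is the same as `lapply(X, slow_fun)` in both cases; only the scheduling differs.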
Best Regards

--
Christian Krause
Scientific Computing Administration and Support
Phone: +49 341 97 33144
Email: christian.krause at idiv.de

German Centre for Integrative Biodiversity Research (iDiv) Halle-Jena-Leipzig
Deutscher Platz 5e, 04103 Leipzig, Germany

Attachments:
- fixes-parLapplyLB.patch (text/x-patch, 676 bytes): <https://stat.ethz.ch/pipermail/r-devel/attachments/20180212/927bc7d1/attachment.bin>
- r-parallel-load-balancing.png (image/png, 47263 bytes): <https://stat.ethz.ch/pipermail/r-devel/attachments/20180212/927bc7d1/attachment.png>
Christian Krause
2018-Feb-19 18:21 UTC
[Rd] [parallel] fixes load balancing of parLapplyLB
Dear R-Devel List,

I have installed R 3.4.3 with the patch applied on our cluster and ran a *real-world* job of one of our users to confirm that the patch works to my satisfaction. Here are the results.

The original was a series of jobs, all essentially doing the same stuff using bootstrapped data, so for the original there is more data and I show the arithmetic mean with standard deviation. The confirmation with the patched R was only a single instance of that series of jobs.

## Job Efficiency

The job efficiency is defined as (this is what the `qacct-efficiency` tool below computes):

```
efficiency = cputime / cores / wallclocktime * 100%
```

In simpler words: how well did the job utilize its CPU cores. It shows the percentage of time the job was actually doing stuff, as opposed to the difference:

```
wasted = 100% - efficiency
```

... which, essentially, tells us how much of the resources were wasted, i.e. CPU cores just idling, without being used by anyone. We care a lot about that because, for our scientific computing cluster, wasted resources is like burning money.

### original

This takes the entire series from our job accounting database, filters the successful jobs, calculates the efficiency and then shows the average and standard deviation of the efficiency:

```
$ qacct -j 4433299 | qacct-success | qacct-efficiency | meansd
n=945 · 61.7276 ± 7.78719
```

This takes the entire series from our job accounting database, filters the successful jobs, calculates the efficiency and does a sort of histogram-like binning before calculating mean and standard deviation (to get a more detailed impression of the distribution when the standard deviation of the previous command is comparatively high):

```
$ qacct -j 4433299 | qacct-success | qacct-efficiency | meansd-bin -w 10 | sort -gk1 | column -t
10 - 20  ->  n=3    · 19.21666666666667   ± 0.9112811494447459
20 - 30  ->  n=6    · 26.418333333333333  ± 2.665996374091058
30 - 40  ->  n=12   · 35.11583333333334   ± 2.8575783082671196
40 - 50  ->  n=14   · 45.35285714285715   ± 2.98623361591005
50 - 60  ->  n=344  · 57.114593023255814  ± 2.1922005551774415
60 - 70  ->  n=453  · 64.29536423841049   ± 2.8334788433963856
70 - 80  ->  n=108  · 72.95592592592598   ± 2.5219474143639276
80 - 90  ->  n=5    · 81.526              ± 1.2802265424525452
```

I have attached an example graph from our monitoring system of a single instance in my previous mail. There you can see that the load balancing does not actually work, i.e. it behaves the same as `parLapply`. This is reflected in the job efficiency.

### patch applied

This is the single instance I used to confirm that the patch works:

```
$ qacct -j 4562202 | qacct-efficiency
97.36
```

The graph from our monitoring system is attached. As you can see, the load balancing works to a satisfying degree and the efficiency is well above 90%, which was what I had hoped for :-)

## Additional Notes

The list used in this job's `parLapplyLB` is 5812 elements long. With the `splitList` chunking from the patch, you get 208 lists of about 28 elements (208 chunks of size 28). The job ran on 28 CPU cores and had a wallclock time of 120351.590 seconds, i.e. 33.43 hours. Thus, the function we apply to our list takes about 580 seconds per list element, i.e. about 10 minutes. I suppose, for that runtime, we would get even better load balancing if we reduced the chunk size even further, maybe even down to 1, thus getting our efficiency even closer to 100%.
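As a quick plausibility check of these numbers (an editorial sketch; the CPU time is back-computed from the reported 97.36 % rather than taken from the accounting database):

```r
## efficiency = cputime / cores / wallclocktime * 100
cores     <- 28
wallclock <- 120351.590                  # seconds, from qacct
cputime   <- 0.9736 * cores * wallclock  # back-computed from the 97.36 % above
cputime / cores / wallclock * 100        # 97.36

## chunking the patched parLapplyLB produces for this job:
n_chunks <- ceiling(5812 / cores)        # 208 chunks
range(sapply(parallel:::splitList(1:5812, n_chunks), length))
## [1] 27 28   -- 208 chunks of about 28 elements, as stated above
```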
Of course, for really short-running functions, a higher chunk size may be more efficient because of the overhead. In our case, the overhead is negligible and that is why the low chunk size works really well. In contrast, for smallish lists with short-running functions, you might not even need load balancing and `parLapply` suffices. It only becomes an issue when the runtime of the function is high and / or varying.

In our case, the entire runtime of the entire series of jobs was:

```
$ qacct -j 4433299 | awk '$1 == "wallclock" { sum += $2 } END { print sum, "seconds" }'
4.72439e+09 seconds
```

That's about 150 years on a single core or 7.5 years on a 20-core server! Our user was constantly using about 500 cores, so this took about 110 days. If you compare this to my 97% efficiency example, the jobs could have been finished in 75 days instead ;-)

## Upcoming Patch

If this patch gets applied to the R code base (and I hope it will :-)) my colleague and I will submit another patch that adds the chunk size as an optional parameter to all of the load balancing functions. With that parameter, users of these functions *can* decide for themselves which chunk size they prefer for their code. As mentioned before, the most efficient chunk size depends on the applied function's runtime, which is the only thing R does not know and users really should be allowed to specify explicitly. The default of this new optional parameter would be the one we used here, which would make that upcoming patch fully source-compatible. A sketch of what such an interface could look like follows below.
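A hypothetical sketch of that interface (illustrative only, not the actual upcoming patch; the argument name `chunk.size`, the helper name `parLapplyLB2` and the exact default are assumptions):

```r
library(parallel)

## Illustrative only -- the chunk size becomes an optional argument and the
## default reproduces the chunking of the patch above.
parLapplyLB2 <- function(cl = NULL, X, fun, ..., chunk.size = NULL)
{
    cl <- parallel:::defaultCluster(cl)

    if (is.null(chunk.size))
        chunk.size <- length(cl)   # default: chunks of about length(cl) elements

    nchunks <- max(length(cl), ceiling(length(X) / chunk.size))
    chunks  <- parallel:::splitList(X, nchunks)

    do.call(c,
            clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...),
            quote = TRUE)
}

## e.g. maximal balancing for long-running tasks:
## parLapplyLB2(cl, X, f, chunk.size = 1)
```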
Best Regards

On 02/12/2018 08:08 PM, Christian Krause wrote:
> [original message quoted in full; see above]

--
Christian Krause
Scientific Computing Administration and Support
Email: christian.krause at idiv.de
Office: BioCity Leipzig 5e, Room 3.201.3
Phone: +49 341 97 33144

German Centre for Integrative Biodiversity Research (iDiv) Halle-Jena-Leipzig
Deutscher Platz 5e, 04103 Leipzig, Germany

Attachments:
- krausec-parLapplyLB-fixed.png (image/png, 42277 bytes): <https://stat.ethz.ch/pipermail/r-devel/attachments/20180219/645ca8fb/attachment.png>
- signature.asc (application/pgp-signature, 833 bytes): <https://stat.ethz.ch/pipermail/r-devel/attachments/20180219/645ca8fb/attachment.sig>
Henrik Bengtsson
2018-Feb-19 21:11 UTC
[Rd] [parallel] fixes load balancing of parLapplyLB
Hi,

I'm trying to understand the rationale for your proposed amount of splitting and, more precisely, why that one is THE one.

If I put labels on your example numbers from one of your previous posts:

```r
nbrOfElements <- 97
nbrOfWorkers <- 5
```

With these, there are two extremes in how you can split up the processing in chunks such that all workers are utilized:

(A) Each worker, called multiple times, processes one element each time:

```r
> nbrOfElements <- 97
> nbrOfWorkers <- 5
> nbrOfChunks <- nbrOfElements
> sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
 [1] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
[30] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
[59] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
[88] 1 1 1 1 1 1 1 1 1 1
```

(B) Each worker, called once, processes multiple elements:

```r
> nbrOfChunks <- nbrOfWorkers
> sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
[1] 20 19 19 19 20
```

I understand that neither of these two extremes may be the best when it comes to orchestration overhead and load balancing. Instead, the best might be somewhere in-between, e.g.

(C) Each worker, called multiple times, processes multiple elements:

```r
> nbrOfChunks <- nbrOfElements / nbrOfWorkers
> sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
 [1] 5 5 5 5 4 5 5 5 5 5 4 5 5 5 5 4 5 5 5 5
```

However, there are multiple alternatives between the two extremes, e.g.

```r
> nbrOfChunks <- scale * nbrOfElements / nbrOfWorkers
```

So, is there a reason why you argue for scale = 1.0 to be the optimal?
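To make the spectrum between these extremes concrete, here is a small sketch (an editorial illustration, not from the original mail; rounding the chunk count up with `ceiling` and capping it at `nbrOfElements` are assumptions about how a fractional count would be handled):

```r
nbrOfElements <- 97
nbrOfWorkers  <- 5

for (scale in c(1, 2, 4, Inf)) {
    nbrOfChunks <- min(nbrOfElements,
                       ceiling(scale * nbrOfElements / nbrOfWorkers))
    sizes <- sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
    cat(sprintf("scale = %3s: %2d chunks, sizes %d..%d\n",
                scale, length(sizes), min(sizes), max(sizes)))
}
## scale =   1: 20 chunks, sizes 4..5   (C)
## scale =   2: 39 chunks, sizes 2..3
## scale =   4: 78 chunks, sizes 1..2
## scale = Inf: 97 chunks, sizes 1..1   (A)
```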
FYI, in future.apply::future_lapply(X, FUN, ...) there is a 'future.scheduling' scale factor(*) argument, where the default future.scheduling = 1 corresponds to (B) and future.scheduling = +Inf to (A). Using future.scheduling = 4 achieves the amount of load balancing you propose in (C).

(*) Different definition from the above 'scale'.

(Disclaimer: I'm the author)

/Henrik

On Mon, Feb 19, 2018 at 10:21 AM, Christian Krause <christian.krause at idiv.de> wrote:
> [previous message quoted in full; see above]