Rebecca Payne
2016-Aug-19 18:22 UTC
[R] Efficiently parallelize across columns of a data.table
I am trying to parallelize a task across columns of a data.table using
foreach and doParallel. My data is large relative to my system memory
(about 40%), so I'm avoiding making any copies of the full table. The
function I am parallelizing is fairly simple, taking as input a fixed set
of columns and a single unique column per task. I'd like to send only the
small subset of columns actually needed to the worker nodes, and I'd also
like the option to send only a subset of rows. My initial attempts to
parallelize did not work as expected, and seemed to copy the entire
data.table to every worker node.

### start code ###
library(data.table)
library(foreach)
library(doParallel)
registerDoParallel()

anotherVar = "Y"
someVars = paste0("X", 1:20)
N = 100000000
# I've chosen N such that my R session consumes ~15GB of memory,
# according to top, right after DT is created
DT = as.data.table(matrix(rnorm(21*N), ncol=21))
setnames(DT, c(anotherVar, someVars))

MyFun = function(inDT, inX, inY){
  cor(inDT[[inX]], inDT[[inY]])
}

# Warning: will throw an error in the macOS GUI
corrWithY_1 = foreach(i = seq_along(someVars), .combine = c) %dopar%
  MyFun(DT[, c(anotherVar, someVars[i]), with=FALSE], someVars[i], anotherVar)
# Watching top, all of the worker processes also appear to consume the
# full ~15GB of system memory

gc()

# So I tried creating an entirely separate subset of DT to send to the
# workers, and then removing it by hand. This task, too, appears to take
# ~15GB of memory per worker according to top.
MyFun2 = function(DT, anotherVar, uniqueVar){
  tmpData = DT[, c(anotherVar, uniqueVar), with=FALSE]
  out = MyFun(tmpData, anotherVar, uniqueVar)
  rm(tmpData)
  return(out)
}

corrWithY_2 = foreach(i = seq_along(someVars), .combine = c) %dopar%
  MyFun2(DT, anotherVar, someVars[i])
### end code ###

Another thing I've tried is to send only the name of DT and its
environment to the worker nodes, but `get` doesn't seem to be able to
fetch only a subset of rows from DT, which I would need to do frequently.

Questions:
1. Is top accurately reflecting my R session's memory usage?
2. If so, is there a way to parallelize over the columns of a data.table
without copying the entire table to every worker node?
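For reference, here is a minimal sketch of one way to keep each task's data small, using the same variable names as above but a much smaller N so it runs quickly. It substitutes parallel::mclapply for the foreach/doParallel loop (an assumption on my part, not what the post uses): the shared column is pulled out once as a plain vector, and each task reads only the one extra column it needs.

```r
library(data.table)
library(parallel)

set.seed(1)
N <- 1000                       # tiny N for illustration only
anotherVar <- "Y"
someVars <- paste0("X", 1:20)
DT <- as.data.table(matrix(rnorm(21 * N), ncol = 21))
setnames(DT, c(anotherVar, someVars))

# Extract the shared column once as a plain numeric vector; each forked
# task then touches only two column vectors, never the whole table.
y <- DT[[anotherVar]]
corrWithY <- unlist(mclapply(someVars,
                             function(v) cor(DT[[v]], y),
                             mc.cores = 2))
```

A row subset would work the same way, e.g. `cor(DT[[v]][idx], y[idx])` for some index vector `idx`, since indexing a plain vector copies only the selected elements.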
Peter Langfelder
2016-Aug-20 04:01 UTC
[R] Efficiently parallelize across columns of a data.table
Last time I looked (admittedly a few years back), on unix-alikes (which
you seem to be using, based on your use of top), foreach/doParallel used
forking. This means each worker gets a copy of the entire R session,
__but__ modern operating systems do not actually copy on spawn; they only
copy on write (i.e., when the worker process starts modifying the existing
variables). I believe top shows memory use as if the copy had actually
occurred (what the operating system promises to each worker).

I would run the code and monitor usage of swap space - as long as the
system isn't swapping to disk, I would not worry about copying the table
to every slave node, since the copy doesn't really happen unless the
worker processes modify the table.

HTH,

Peter

On Fri, Aug 19, 2016 at 11:22 AM, Rebecca Payne <rebeccapayne at gmail.com> wrote:
> I am trying to parallelize a task across columns of a data.table using
> foreach and doParallel. My data is large relative to my system memory
> (about 40%) so I'm avoiding making any copies of the full table. The
> function I am parallelizing is pretty simple, taking as input a fixed set
> of columns and a single unique column per task. I'd like to send only the
> small subset of columns actually needed to the worker nodes. I'd also like
> the option to only send a subset of rows to the worker nodes. My initial
> attempts to parallelize did not work as expected, and seemed to copy the
> entire data.table to every worker node.
______________________________________________
R-help at r-project.org mailing list -- To UNSUBSCRIBE and more, see
https://stat.ethz.ch/mailman/listinfo/r-help
PLEASE do read the posting guide http://www.R-project.org/posting-guide.html
and provide commented, minimal, self-contained, reproducible code.
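The fork-vs-socket distinction behind the explanation above can be seen directly with the base parallel package. This is a small sketch with a hypothetical vector `x` standing in for the big data.table: fork-based workers inherit the parent's objects via copy-on-write, while a PSOCK cluster starts fresh R processes and must have objects serialized over explicitly.

```r
library(parallel)

set.seed(1)
x <- rnorm(1e6)   # stand-in for the large object living in the master

# Fork-based workers (unix-alikes only) see x without any explicit
# transfer; the OS shares the memory pages until a worker writes to them.
res_fork <- mclapply(1:4, function(i) sum(x) + i, mc.cores = 2)

# A PSOCK cluster starts fresh R processes, so x must be shipped with
# clusterExport() -- this serializes a real copy to each worker.
cl <- makeCluster(2)
clusterExport(cl, "x")
res_sock <- parLapply(cl, 1:4, function(i) sum(x) + i)
stopCluster(cl)

identical(unlist(res_fork), unlist(res_sock))  # TRUE: same results either way
```

Only the transfer mechanics differ; on a forked worker, reading x (as the cor() calls in the original code do) should not trigger the copy that top appears to report.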
Rebecca Payne
2016-Aug-20 19:41 UTC
[R] Efficiently parallelize across columns of a data.table
Makes sense. Thanks for the clear explanation.

Rebecca

On Friday, August 19, 2016, Peter Langfelder <peter.langfelder at gmail.com> wrote:
> Last time I looked (admittedly a few years back), on unix-alikes
> (which you seem to be using, based on your use of top),
> foreach/doParallel used forking. This means each worker gets a copy of
> the entire R session, __but__ modern operating systems do not actually
> copy on spawn, they only copy on write (i.e., when the worker process
> starts modifying the existing variables). I believe top shows memory
> use as if the copy actually occurred (what the operating system
> promises to each worker).
>
> I would run the code and monitor usage of swap space - as long as the
> system isn't swapping to disk, I would not worry about copying the
> table to every slave node, since the copy doesn't really happen unless
> the worker processes modify the table.
>
> HTH,
>
> Peter
>
> On Fri, Aug 19, 2016 at 11:22 AM, Rebecca Payne <rebeccapayne at gmail.com> wrote:
> > I am trying to parallelize a task across columns of a data.table using
> > foreach and doParallel. My data is large relative to my system memory
> > (about 40%) so I'm avoiding making any copies of the full table. The
> > function I am parallelizing is pretty simple, taking as input a fixed set
> > of columns and a single unique column per task. I'd like to send only the
> > small subset of columns actually needed to the worker nodes. I'd also like
> > the option to only send a subset of rows to the worker nodes. My initial
> > attempts to parallelize did not work as expected, and seemed to copy the
> > entire data.table to every worker node.