Hi R users.

Apologies for the lack of a concrete reproducible example; the dataset is large, and I believe the size itself is the issue.

I have multiple, very large datasets for which I need to generate 0/1 absence/presence columns. Some have over 200M rows, with two string columns that each need presence/absence indicator columns based on the strings they contain; in one dataset, for example, one column has ~29k unique values and the other ~15k (no overlap between the two).

I use a combination of custom functions:

crewjanitormakeclean <- function(df, columns) {
  df <- df |>
    mutate(across(all_of(columns), ~ make_clean_names(., allow_dupes = TRUE)))
  return(df)
}

mass_pivot_wider <- function(df, column, prefix) {
  df <- df |>
    distinct() |>
    mutate(n = 1) |>
    pivot_wider(names_from = glue("{column}"), values_from = n,
                names_prefix = prefix, values_fill = list(n = 0))
  return(df)
}

sum_group_function <- function(df) {
  df <- df |>
    group_by(ID_Key) |>
    summarise(across(c(starts_with("column1_name_"), starts_with("column2_name_")),
                     ~ sum(.x, na.rm = TRUE))) |>
    ungroup()
  return(df)
}

and split the data into a list of 110k individual data frames based on ID_Key:

temp <-
  open_dataset(
    sources = input_files,
    format = 'csv',
    unify_schemas = TRUE,
    col_types = schema(
      "ID_Key" = string(),
      "column1" = string(),
      "column2" = string()
    )
  ) |> as_tibble()

keeptabs <- split(temp, temp$ID_Key)

I then use a multicore framework to distribute these functions across the ID_Key subsets when a multicore argument is enabled:

if (isTRUE(multicore)) {
  output <- mclapply(1:length(modtabs), function(i) crewjanitormakeclean(modtabs[[i]], c("string_columns_2", "string_columns_1")), mc.cores = numcores)
  output <- mclapply(1:length(modtabs), function(i) mass_pivot_wider(modtabs[[i]], "string_columns_1", "col_1_name_"), mc.cores = numcores)
  output <- mclapply(1:length(modtabs), function(i) mass_pivot_wider(modtabs[[i]], "string_columns_2", "col_2_name_"), mc.cores = numcores)
} else {
  output <- lapply(1:length(modtabs), function(i) crewjanitormakeclean(modtabs[[i]], c("string_columns_2", "string_columns_1")))
  output <- lapply(1:length(modtabs), function(i) mass_pivot_wider(modtabs[[i]], "string_columns_1", "col_1_name_"))
  output <- lapply(1:length(modtabs), function(i) mass_pivot_wider(modtabs[[i]], "string_columns_2", "col_2_name_"))
}

Finally, I collapse each ID_Key to a single row and row-bind the results, creating columns for whatever differs across ID_Keys after the pivot, using the following solution (78 upvotes at the time of this email):
https://stackoverflow.com/questions/3402371/combine-two-data-frames-by-rows-rbind-when-they-have-different-sets-of-columns

allNms <- unique(unlist(lapply(keeptabs, names)))

output <- do.call(rbind,
                  c(lapply(keeptabs,
                           function(x) data.frame(c(x, sapply(setdiff(allNms, names(x)),
                                                              function(y) NA))) |> as_tibble()),
                    make.row.names = FALSE)) |>
  mutate(across(c(starts_with("column1_name_"), starts_with("column2_name_")), ~ coalesce(.x, 0)))

However, I have noticed that the jobs seem to "hang" after a while: the first 30 or so child processes (numcores == 30 in the workflow, equal to the number of cores I reserved) run at 100% of the requested CPU, then several "zombie" processes appear, the cores drop to 0% and never proceed, and the run usually dies with a timeout or a "not all jobs ran to completion / failed to join" error of some kind.
This happens in both base R and RStudio, and I haven't been able to figure out whether the problem is the code, the size of the data, or our architecture, but I would appreciate any suggestions as to what I might do about it.

Before they are suggested: I have also tried the same approach with the foreach, snow, future, and furrr packages, and base parallel with mclapply seems to be the only thing that works for at least one dataset.

In case it has something to do with our architecture, here is what we are running and our loaded packages:

OS Information:
NAME="Red Hat Enterprise Linux"
VERSION="9.3 (Plow)"
ID="rhel"
ID_LIKE="fedora"
VERSION_ID="9.3"
PLATFORM_ID="platform:el9"
PRETTY_NAME="Red Hat Enterprise Linux 9.3 (Plow)"
CPE_NAME="cpe:/o:redhat:enterprise_linux:9::baseos"

Operating System: Red Hat Enterprise Linux 9.3 (Plow)
Kernel: Linux 5.14.0-362.13.1.el9_3.x86_64
Architecture: x86-64
Hardware Vendor: Dell Inc.
Hardware Model: PowerEdge R840
Firmware Version: 2.15.1

R Version:
R version 4.3.2 (2023-10-31) "Eye Holes", x86_64-pc-linux-gnu (svn rev 85441)

RStudio Server Version:
RStudio 2023.09.1+494 "Desert Sunflower" Release (cd7011dce393115d3a7c3db799dda4b1c7e88711, 2023-10-16) for RHEL 9

PPM Repo:
https://packagemanager.posit.co/cran/__linux__/rhel9/latest

attached base packages:
[1] parallel  stats  graphics  grDevices  datasets  utils  methods  base

other attached packages:
 [1] listenv_0.9.1  microbenchmark_1.5.0  dbplyr_2.4.0  duckplyr_0.4.1  readxl_1.4.3  fastDummies_1.7.3
 [7] glue_1.8.0  arrow_14.0.2.1  data.table_1.15.2  toolbox_0.1.1  janitor_2.2.0  lubridate_1.9.3
[13] forcats_1.0.0  stringr_1.5.1  dplyr_1.1.4  purrr_1.0.2  readr_2.1.5  tidyr_1.3.1
[19] tibble_3.2.1  ggplot2_3.5.0  tidyverse_2.0.0  duckdb_1.1.2  DBI_1.2.3  fs_1.6.3

Happy to provide additional information if it would be helpful.

Thank you in advance!
Maybe ask on the HPC list? [1]

A general tip: you may be running out of memory. If at all possible, extract the data subsets in the parent process and limit the amount of environment data passed into the child processes. That is, instead of using an integer counter to index into the modtabs list, iterate over the list itself so that each child process doesn't need a copy of the whole data set (a minimal sketch follows at the end of this message). Fork does let the parent and child processes share memory blocks temporarily, but that sharing is fragile: if you modify the data at all, the sharing stops for that memory block and you get an explosion of memory use.

[1] <https://stat.ethz.ch/mailman/listinfo/r-sig-hpc>

On December 10, 2024 4:20:28 PM PST, "Deramus, Thomas Patrick" <tderamus at mgb.org> wrote:
>Hi R users.
> [...]
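A minimal sketch of the tip above, not a drop-in replacement: `modtabs`, `numcores`, `crewjanitormakeclean()`, and the column names are taken from the original post.

library(parallel)

## Index-based version (each child captures an index plus the environment
## that holds the whole list and dereferences modtabs[[i]] itself):
## output <- mclapply(seq_along(modtabs),
##                    function(i) crewjanitormakeclean(modtabs[[i]],
##                                                     c("string_columns_2", "string_columns_1")),
##                    mc.cores = numcores)

## Element-based version: pass the list itself, so each forked child only
## works with its own element.
output <- mclapply(modtabs,
                   function(tab) crewjanitormakeclean(tab,
                                                      c("string_columns_2", "string_columns_1")),
                   mc.cores = numcores)

The same pattern applies to the mass_pivot_wider() calls.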
Hello Thomas,

Consider that the primary bottleneck may be memory usage and the complexity of pivoting extremely large datasets into wide format with tens of thousands of unique values per column. Expanding to that many columns inherently stresses both memory and CPU, and splitting into 110k separate data frames before pivoting and then combining them again likely adds further overhead and instability.

Evaluate whether the presence/absence transformation can be done in a more memory-efficient way without pivoting everything at once. Since you are dealing with extremely large data, a more incremental or streaming approach may be necessary: instead of splitting into thousands of individual data frames and pivoting each in parallel, consider a method that processes segments of the data, incrementally builds a large sparse matrix or other compressed representation, and combines the results at the end.

It is probably better to move away from `pivot_wider()` at this scale and try a data.table-based approach, which is often more memory-efficient and faster for large-scale operations in R. data.table's `dcast()` can handle large data more efficiently, and its in-memory operations often have less overhead than the tidyverse pivoting functions. Also consider using data.table's `fread()`, or `arrow::open_dataset()` followed by `as.data.table()`, to keep everything in data.table format. You can then do a single large `dcast()` to create the presence/absence columns by group. If the sets of categories are extremely large, process the categories in segments as mentioned above, write intermediate results to disk, and combine/merge the results at the end.

Limit parallelization when dealing with massive reshapes. Instead of trying to parallelize the entire pivot across thousands of subsets, run a single parallelized chunking pass that processes manageable subsets and writes out intermediate results (for example, using `fwrite()` for each subset). After processing, load and combine these intermediate results. This manual segmenting can avoid the "zombie" processes you mentioned, which I suspect arise from overly complex parallel nesting and excessive memory use.

If the presence/absence indicators are ultimately sparse (many zeros and few ones), consider storing the result in a sparse matrix format (for example, via the `Matrix` package); a small sketch of this idea follows after the data.table example below. Instead of creating thousands of dense integer columns, a sparse matrix representation should dramatically reduce memory. After processing the data into a sparse format, you can save it in a suitable file format and only convert to dense if absolutely necessary.

Below is a reworked code segment using data.table for a more scalable approach. Note that this is a conceptual template; in practice, adapt the chunk sizes and filtering operations to your workflow. The idea is to avoid creating 110k separate data frames and to handle the pivot in a data.table manner that is more robust and less memory intensive.
Here, presence/absence encoding is done by grouping and casting directly rather than by repeatedly splitting and row-binding.

library(data.table)
library(arrow)

# Step A: Load data efficiently as a data.table
dt <- as.data.table(
  open_dataset(
    sources = input_files,
    format = 'csv',
    unify_schemas = TRUE,
    col_types = schema(
      "ID_Key" = string(),
      "column1" = string(),
      "column2" = string()
    )
  ) |>
    collect()
)

# Step B: Clean values once
# Assume `crewjanitormakeclean` essentially standardizes the strings in these columns
dt[, column1 := janitor::make_clean_names(column1, allow_dupes = TRUE)]
dt[, column2 := janitor::make_clean_names(column2, allow_dupes = TRUE)]

# Step C: Create presence/absence indicators using data.table
# Use dcast to pivot wide: 1 marks presence, 0 absence.
# (plain length() would give counts if an ID_Key/value pair repeats, so cap at 1)
# For very large numbers of unique values, consider chunking if needed.
out1 <- dcast(dt[!is.na(column1)], ID_Key ~ column1,
              fun.aggregate = function(x) as.integer(length(x) > 0),
              value.var = "column1", fill = 0L)
out2 <- dcast(dt[!is.na(column2)], ID_Key ~ column2,
              fun.aggregate = function(x) as.integer(length(x) > 0),
              value.var = "column2", fill = 0L)

# Step D: Merge the two wide tables by ID_Key
# Fill missing columns with 0 using data.table on-the-fly operations
all_cols <- unique(c(names(out1), names(out2)))
out1_missing <- setdiff(all_cols, names(out1))
out2_missing <- setdiff(all_cols, names(out2))

# Add missing columns with 0
for (col in out1_missing) out1[, (col) := 0]
for (col in out2_missing) out2[, (col) := 0]

# Ensure column order alignment
setcolorder(out1, all_cols)
setcolorder(out2, all_cols)

# Combine by ID_Key (both tables now share the same columns)
final_dt <- rbindlist(list(out1, out2), use.names = TRUE, fill = TRUE)

# Step E: Summarize across ID_Key to collapse to one row per ID_Key
final_result <- final_dt[, lapply(.SD, sum, na.rm = TRUE),
                         by = ID_Key,
                         .SDcols = setdiff(names(final_dt), "ID_Key")]

# final_result now contains the combined presence/absence (0/1) indicators per ID_Key.

Hope this helps!

gregg
somewhereinArizona
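A small sketch of the sparse-matrix idea mentioned above, under stated assumptions: it reuses the cleaned data.table `dt` from Step B, borrows the `column1_name_` / `column2_name_` prefixes from the original post, and builds a single 0/1 indicator matrix with `Matrix::sparseMatrix()`. Adapt names and chunking to the real data.

library(data.table)
library(Matrix)

## Stack both string columns into one long (ID_Key, value) table, keeping one
## row per ID_Key/value pair (column1 and column2 values do not overlap,
## per the original post).
long <- unique(rbindlist(list(
  dt[!is.na(column1), .(ID_Key, value = paste0("column1_name_", column1))],
  dt[!is.na(column2), .(ID_Key, value = paste0("column2_name_", column2))]
)))

ids  <- factor(long$ID_Key)
vals <- factor(long$value)

## One sparse 0/1 indicator matrix: rows = ID_Keys, columns = category values.
## Duplicates were removed above, so entries are 0/1 rather than counts.
indicator <- sparseMatrix(
  i = as.integer(ids),
  j = as.integer(vals),
  x = 1L,
  dims = c(nlevels(ids), nlevels(vals)),
  dimnames = list(levels(ids), levels(vals))
)

## `indicator` stays sparse in memory; convert only small slices to dense if
## really needed, e.g. as.matrix(indicator[1:5, 1:10])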