Sedikit tentang komputasi paralel di R

Publikasi sangat singkat. Banyak orang berpikir bahwa komputasi paralel di R sangat sulit dan tidak berlaku untuk tugas mereka saat ini.





Iya dan tidak. Jika Anda tidak dengan sengaja mempelajari teori, perangkat keras, dan segala macam detail, maka Anda dapat menggambar "3 dan 1/2" dari resep yang hampir universal. Contoh-contoh ini sengaja dibuat mirip dengan tugas-tugas produktif, dan bukan beberapa baris sintetis yang dikebiri.





Ini adalah kelanjutan dari serangkaian publikasi sebelumnya .





Paket bekas

Memuat paket
library(tidyverse)
library(magrittr)
library(stringi)
library(glue)

library(dqrng)

library(iterators)
library(future)
library(foreach)
library(doFuture)

library(tictoc)
library(futile.logger)
library(lgr) #      `lgr`

library(hrbrthemes)
      
      



Pola paralelisasi

Pola 1. Paralelisasi perhitungan tidyverse

Situasi. Ada skrip yang berisi banyak pipeline untuk tidyverse



.





Contoh tugas. Mari kita hitung rata-rata jumlah kuadrat dari angka-angka tersebut. Untuk meningkatkan efisiensi komputasi paralel, penting untuk mengurangi jumlah transfer data antar utas. Kami menggunakan paket furrr .





pipeline `tidyverse`
registerDoFuture()
# future::plan(multiprocess)
workers <- parallel::detectCores() - 1
future::plan(multisession, workers = workers)

num_row <- 1:10^6

ff_seq <- function(x) x^2

ff_par <- function(x) mean(x^2)

tic(" ")
lst1 <- num_row %>%
  purrr::map_dbl(ff_seq) %>%
  mean()
toc()

tic(" ,  1")
lst2 <- num_row %>%
  furrr::future_map_dbl(ff_seq) %>%
  mean()
toc() 

tic(" ,  2")
lst2 <- num_row %>%
  split(cut(seq_along(.), workers, labels = FALSE)) %>%
  furrr::future_map_dbl(ff_par) %>%
  mean()
toc()
      
      



Secara alami, hasilnya tergantung pada platform perangkat keras dan OS tempat semuanya berjalan. Saat uji coba, saya memiliki tata letak ini:





 : 7.23 sec elapsed
 ,  1: 3.43 sec elapsed
 ,  2: 0.64 sec elapsed
      
      



Windows dan Linux sangat berbeda dalam cara mereka memparalelkan. Linux dalam produksi lebih disukai daripada Windows.





Pola 2. Paralelisasi manual lokal

. . , . , %<-%



.





#  ,  20   10^5 
nn <- 10^5
tic("Generating sample data.frame")
df <- 100 %>%
  # stri_rand_strings(length = 10, pattern = "[a-z]") %>%
  sample(10^4:10^5, .) %>%
  sample(20 * nn, replace = TRUE) %>%
  matrix(byrow = TRUE, ncol = 20) %>%
  as_tibble(.name_repair = "universal") %>%
  mutate(user_id = as.character(sample(1:as.integer(nn/10), n(), replace = TRUE))) %>%
  #   
  mutate(ver = sample(1:20, n(), replace = TRUE)) %>%
  select(user_id, ver, everything())
toc()

#       
demo_fpath <- here::here("temp", "demo_data.xlsx")
openxlsx::write.xlsx(df, demo_fpath, asTable = TRUE)
      
      



plan(multisession, workers = parallel::detectCores() - 2)
# plan(sequential)
# https://github.com/HenrikBengtsson/future

# ,     2
tic("   ")
tic("  ")
res_lst <- list()
for (j in 1:6) {
  res_lst[[j]] <- { readxl::read_excel(demo_fpath) %>% head(5)}
}
toc()
seq_df <- bind_rows(res_lst)
toc()


tic("   ")
tic("  ")
df1 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df2 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df3 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df4 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df5 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df6 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
toc()
par_df <- bind_rows(df1, df2, df3, df4, df5, df6)
toc()

all_equal(seq_df, par_df)
      
      



. . :





   : 46.23 sec elapsed
   : 37.82 sec elapsed
      
      



3.

. , , .





.

. $C_n^k$



. .





.




#  
flog_logname <- here::here("log", "job_futile.log")
lgr_logname <- here::here("log", "job_lgr.log")

initLogging <- function(log_file){

  lgr <- get_logger_glue("logger")

  lgr$set_propagate(FALSE)
  lgr$set_threshold("all")
  lgr$set_appenders(list(
    console = AppenderConsole$new(
      threshold = "info"
    ),
    file = AppenderFile$new(
      file = log_file,
      threshold = "all"
    )
  ))

  lgr  
}

invisible(flog.appender(appender.tee(flog_logname)))
invisible(flog.threshold(INFO))
lgr <- initLogging(lgr_logname)
      
      



"Start batch processing" %T>%
  flog.info() %T>%
  lgr$info()

#   
# https://github.com/HenrikBengtsson/doFuture
# https://cran.r-project.org/web/packages/future/vignettes/future-1-overview.html
registerDoFuture()
# future::plan(multiprocess)
future::plan(multisession, workers = parallel::detectCores())
# future::plan(sequential)
# plan(future.callr::callr)

tic("Batch processing")
start_time <- Sys.time()

foreach(it = iter(jobs_tbl, by = "row"), .export = c("start_time"), 
        # .packages = 'futile.logger',
        .verbose = FALSE, .inorder = FALSE, .errorhandling = "remove") %dopar% {

          start <- Sys.time() - start_time

          #    
          flog.appender(appender.tee(flog_logname))
          lgr <- initLogging(lgr_logname)

          res <- arrangements::npermutations(k = it$k, n = it$n, bigz = TRUE)

          # https://www.jottr.org/2020/11/06/future-1.20.1-the-future-just-got-a-bit-brighter/
          message("     message from thread")

          glue("Step {it$idx_str} finished. RAM used {capture.output(pryr::mem_used())}.",
               "PID: {Sys.getpid()}",
               "Elapsed {round(difftime(Sys.time(), start_time, units = 'mins'), digits = 2)} min(s) ----------->",
               .sep = " ") %T>%
            flog.info() %T>%
            lgr$info()

          #   
          return(list(pid = Sys.getpid(), start = start, finish = Sys.time() - start_time))
        } -> output_lst
flog.info("Foreach finished")

checkmate::assertList(output_lst, any.missing = FALSE, null.ok = FALSE, min.len = 1)
output_tbl <- dplyr::bind_rows(output_lst)
# rm(output_lst)

#    --------------
future::plan(sequential)
gc(reset = TRUE, full = TRUE)
flog.info(capture.output(toc()))
      
      



() () . windows.





.
#      
output_tbl %>%
  mutate_at("pid", as.factor) %>%
  mutate_at(vars(start, finish), as.numeric) %>%
  ggplot(aes(start, pid, colour = pid)) +
  geom_point(size = 3, alpha = .7) +
  geom_point(aes(x=finish), shape = 4, size = 3, colour = "black") +
  geom_vline(aes(xintercept = start, colour = pid), lty = "dashed", alpha = 0.7) +
  ggthemes::scale_fill_tableau("Tableau 10") +
  theme_ipsum_rc() +
  xlim(c(0, 5))
      
      



, , , . , « »:





  1. (worker) . (, , …), . () .





  2. , core - 1, . , reduce , . .





  3. .





  4. , . , , ( , , API ..). .





  5. , . .





  6. Untuk sejumlah tugas yang terkait dengan permintaan sinkron panjang dari sistem eksternal (perwakilan tipikal adalah REST API / Web scrapping), Anda dapat membuat lebih banyak kalkulator daripada inti yang tersedia. Mereka masih menggantung sebagian besar waktu dalam mode siaga. Dapat dijalankan sebagai proses OS terpisah dengan mengkonfigurasi backend yang sesuai. registerDoFuture();



    plan(future.callr::callr).



    Ini adalah sisa 1/2 resep.





Publikasi sebelumnya - "Nuansa pengoperasian R solusi di lingkungan perusahaan?" ...








All Articles