Andreas Kersting
2019-May-19 09:31 UTC
[Rd] most robust way to call R API functions from a secondary thread
Hi,

As the subject suggests, I am looking for the most robust way to call an (arbitrary) function from the R API from a POSIX thread other than the main one in a package's code.

I know that "[c]alling any of the R API from threaded code is ‘for experts only’ and strongly discouraged. Many functions in the R API modify internal R data structures and might corrupt these data structures if called simultaneously from multiple threads. Most R API functions can signal errors, which must only happen on the R main thread." (https://cran.r-project.org/doc/manuals/r-release/R-exts.html#OpenMP-support)

Let me start with my understanding of the related issues and possible solutions:

1) R API functions are generally not thread-safe, and hence one must ensure, e.g. by using mutexes, that no two threads use the R API simultaneously.

2) R uses longjmps on errors and interrupts as well as for condition handling, and it is undefined behaviour to do a longjmp from one thread to another. Interrupts can be suspended before creating the threads by setting R_interrupts_suspended = TRUE. By wrapping the calls to functions from the R API with R_ToplevelExec(), longjmps across thread boundaries can be avoided. The only reason for R_ToplevelExec() itself to fail with an R-style error (longjmp) is a pointer protection stack overflow.

3) R_CheckStack() might be executed (indirectly), which will (probably) signal a stack overflow because it only works correctly when called from the main thread (see https://cran.r-project.org/doc/manuals/r-release/R-exts.html#Threading-issues). In particular, any function that does allocations, e.g. via allocVector3(), might end up calling it via GC -> finalizer -> ... -> eval. The only way around this problem which I could find is to adjust R_CStackLimit, which is outside of the official API. It can be set to -1 to disable the check or be changed to a value appropriate for the current thread.

4) R sets signal handlers for several signals and some of them make use of the R API; hence, issues 1) - 3) apply. Signal masks can be used to block delivery of signals to secondary threads in general, and to the main thread while other threads are using the R API.

I basically have the following questions:

a) Is my understanding of the issues accurate?
b) Are there more things to consider when calling the R API from secondary threads?
c) Are the solutions proposed appropriate? Are there scenarios in which they will fail to solve the issue? Or might they even cause new problems?
d) Are there alternative/better solutions?

Any feedback on this is highly appreciated.

Below you can find a template which combines the proposed solutions (and skips all non-illustrative checks of return values). Additionally, R_CheckUserInterrupt() is used in combination with R_UnwindProtect() to regularly check for interrupts from the main thread, while still being able to cleanly cancel the threads before fun_running_in_main_thread() is left via a longjmp. This is required, e.g., if the secondary threads use memory which was allocated in fun_running_in_main_thread() using e.g. R_alloc().
Best regards,
Andreas Kersting

#include <Rinternals.h>
#include <pthread.h>
#include <signal.h>
#include <stdint.h>

extern uintptr_t R_CStackLimit;
extern int R_PPStackTop;
extern int R_PPStackSize;

#include <R_ext/libextern.h>
LibExtern Rboolean R_interrupts_suspended;
LibExtern int R_interrupts_pending;
extern void Rf_onintr(void);

// mutex for exclusive access to the R API:
static pthread_mutex_t r_api_mutex = PTHREAD_MUTEX_INITIALIZER;

// a wrapper around R_CheckUserInterrupt() which can be passed to R_UnwindProtect():
SEXP check_interrupt(void *data) {
  R_CheckUserInterrupt();
  return R_NilValue;
}

// a wrapper around Rf_onintr() which can be passed to R_UnwindProtect():
SEXP my_onintr(void *data) {
  Rf_onintr();
  return R_NilValue;
}

// function called by R_UnwindProtect() to clean up on interrupt
void cleanfun(void *data, Rboolean jump) {
  if (jump) {
    // terminate threads cleanly ...
  }
}

void fun_calling_R_API(void *data) {
  // call some R API function, e.g. mkCharCE() ...
}

void *threaded_fun(void *td) {

  // ...

  pthread_mutex_lock(&r_api_mutex);

  // avoid false stack overflow error
  // (same unsigned type as R_CStackLimit, so the saved value round-trips):
  uintptr_t R_CStackLimit_old = R_CStackLimit;
  R_CStackLimit = (uintptr_t) -1;

  // R_ToplevelExec() below will call PROTECT 4x:
  if (R_PPStackTop > R_PPStackSize - 4) {
    // ppstack would overflow in R_ToplevelExec() -> handle this ...
  }

  // avoid longjmp to different thread:
  Rboolean ok = R_ToplevelExec(fun_calling_R_API, (void *) &some_data);

  // re-enable stack size checking:
  R_CStackLimit = R_CStackLimit_old;
  pthread_mutex_unlock(&r_api_mutex);

  if (!ok) {
    // handle error ...
  }

  // ...

  return NULL;
}

SEXP fun_running_in_main_thread() {

  // ...

  /* create continuation token for R_UnwindProtect():
   *
   * do this explicitly here before the threads are created because this might
   * fail in allocation or with pointer protection stack overflow
   */
  SEXP cont = PROTECT(R_MakeUnwindCont());

  /* block all signals:
   *
   * do this before the threads are created such that they inherit the mask
   */
  sigset_t block_set, prev_mask;
  sigfillset(&block_set);
  pthread_sigmask(SIG_SETMASK, &block_set, &prev_mask);

  // suspend interrupts:
  Rboolean __oldsusp__ = R_interrupts_suspended;
  R_interrupts_suspended = TRUE;

  // create threads running threaded_fun() ...

  for(;;) {
    // timed blocking check if threads are done ...

    // unblock signals, check for interrupts and run cleanfun if there is one:
    pthread_mutex_lock(&r_api_mutex);
    pthread_sigmask(SIG_SETMASK, &prev_mask, NULL);

    R_interrupts_suspended = __oldsusp__;
    if (R_interrupts_pending && ! R_interrupts_suspended) {
      R_UnwindProtect(my_onintr, NULL, cleanfun, (void *) clean_data, cont);
    }

    R_UnwindProtect(check_interrupt, NULL, cleanfun, (void *) clean_data, cont);

    R_interrupts_suspended = TRUE;

    pthread_sigmask(SIG_SETMASK, &block_set, NULL);
    pthread_mutex_unlock(&r_api_mutex);
  }

  // now all threads are dead

  UNPROTECT(1); // continuation token

  // unblock signals:
  pthread_sigmask(SIG_SETMASK, &prev_mask, NULL);

  // reset interrupt-suspension:
  R_interrupts_suspended = __oldsusp__;
  if (R_interrupts_pending && ! R_interrupts_suspended) {
    Rf_onintr();
  }

  // ...
}
Stepan
2019-May-20 09:45 UTC
[Rd] most robust way to call R API functions from a secondary thread
Hi Andreas,

note that with the introduction of ALTREP, as far as I understand, calls as "simple" as DATAPTR can execute arbitrary code (R or native). Even without ALTREP, if you execute user-provided R code via Rf_eval and such on some custom thread, you may end up executing native code of some package, which may assume it is executed only from the R main thread.

Could you (1) decompose your problem in such a way that in some initial phase you pull all the necessary data from R, then start the parallel computation, and then, again in the R main thread, "submit" the results back to the R world?

If you want something really robust, you can (2) "send" the requests for R API usage to the R main thread and pause the worker thread until it receives the results back. This looks similar to what the "later" package does. Maybe you can even use that package for your purposes?

Do you want to parallelize your code to achieve better performance? Even with your proposed solution, you need synchronization, and chances are that excessive synchronization will severely affect the expected performance benefits of parallelization. If you do not need to synchronize that much, then the question is whether you can make do with (1) or (2).

Best regards,
Stepan
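Option (2) from Stepan's reply, forwarding requests to the main thread and pausing the worker until the result is back, could be sketched roughly as follows. This is an illustration using only POSIX threads, not how the "later" package is implemented. All names (mailbox, call_on_main, serve_until_done, fake_api_call, demo_forwarding) are hypothetical, and fake_api_call merely stands in for a real R API call.

```c
#include <pthread.h>
#include <stddef.h>

typedef void *(*api_fn)(void *);

/* Single-slot mailbox shared by all workers (hypothetical design). */
typedef struct {
  pthread_mutex_t lock;
  pthread_cond_t posted;   /* worker -> main: request waiting or worker done */
  pthread_cond_t served;   /* main/worker -> workers: slot state changed */
  api_fn fn; void *arg; void *result;
  int state;               /* 0 = free, 1 = request posted, 2 = result ready */
  int active_workers;
} mailbox;

/* Called from a worker: blocks until the main thread has run fn(arg). */
void *call_on_main(mailbox *mb, api_fn fn, void *arg) {
  pthread_mutex_lock(&mb->lock);
  while (mb->state != 0) pthread_cond_wait(&mb->served, &mb->lock);
  mb->fn = fn; mb->arg = arg; mb->state = 1;
  pthread_cond_signal(&mb->posted);
  while (mb->state != 2) pthread_cond_wait(&mb->served, &mb->lock);
  void *res = mb->result;
  mb->state = 0;                       /* free the slot for the next worker */
  pthread_cond_broadcast(&mb->served);
  pthread_mutex_unlock(&mb->lock);
  return res;
}

/* Called from the main thread: runs requests until all workers are done. */
void serve_until_done(mailbox *mb) {
  pthread_mutex_lock(&mb->lock);
  for (;;) {
    while (mb->state != 1 && mb->active_workers > 0)
      pthread_cond_wait(&mb->posted, &mb->lock);
    if (mb->state != 1) break;         /* no workers left, nothing posted */
    mb->result = mb->fn(mb->arg);      /* executes on the main thread */
    mb->state = 2;
    pthread_cond_broadcast(&mb->served);
  }
  pthread_mutex_unlock(&mb->lock);
}

static mailbox mb = { .lock = PTHREAD_MUTEX_INITIALIZER,
                      .posted = PTHREAD_COND_INITIALIZER,
                      .served = PTHREAD_COND_INITIALIZER };
static pthread_t main_tid;
static int calls_on_main, calls_elsewhere;

/* Stand-in for an R API call: records which thread executed it. */
static void *fake_api_call(void *arg) {
  if (pthread_equal(pthread_self(), main_tid)) calls_on_main++;
  else calls_elsewhere++;
  return arg;
}

static void *worker(void *unused) {
  (void) unused;
  for (int i = 0; i < 100; i++) call_on_main(&mb, fake_api_call, NULL);
  pthread_mutex_lock(&mb.lock);
  mb.active_workers--;                 /* tell the main thread we are done */
  pthread_cond_signal(&mb.posted);
  pthread_mutex_unlock(&mb.lock);
  return NULL;
}

/* Returns the number of calls run on the main thread, or -1 if any ran elsewhere. */
int demo_forwarding(void) {
  enum { N = 4 };
  pthread_t t[N];
  int created[N];
  main_tid = pthread_self();
  calls_on_main = calls_elsewhere = 0;
  mb.active_workers = N;               /* set before any worker exists */
  for (int i = 0; i < N; i++) {
    created[i] = pthread_create(&t[i], NULL, worker, NULL) == 0;
    if (!created[i]) {
      pthread_mutex_lock(&mb.lock);
      mb.active_workers--;
      pthread_mutex_unlock(&mb.lock);
    }
  }
  serve_until_done(&mb);
  for (int i = 0; i < N; i++) if (created[i]) pthread_join(t[i], NULL);
  return calls_elsewhere == 0 ? calls_on_main : -1;
}
```

In this sketch every forwarded call costs two condition-variable handoffs, which illustrates the synchronization overhead mentioned in the reply. A real version would also have to interleave serving requests with R's own interrupt checks.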
Simon Urbanek
2019-May-20 13:54 UTC
[Rd] most robust way to call R API functions from a secondary thread
Stepan,

Andreas put a lot more thought into what you question in your reply. His question was how you can avoid what you were proposing and have proper threading under safe conditions. Having dealt with this before, I think Andreas' write-up is pretty much the most complete analysis I have seen. I'd wait for Luke to chime in as the ultimate authority if he gets to it.

The "classic" approach which you mention is to collect and allocate everything, then execute parallel code and then return. What Andreas is proposing is obviously much more efficient: you only synchronize on R API calls, which are likely a small fraction of the entire time, while you keep all threads alive. His question was how to do that safely. (BTW: I really like the touch of counting frames that toplevel exec can use ;) - it may make sense to deal with that edge case in R if we can ...)

Cheers,
Simon
Tierney, Luke
2019-May-20 17:29 UTC
[Rd] [External] most robust way to call R API functions from a secondary thread
Your analysis looks pretty complete to me and your solutions seem plausible. That said, I don't know that I would have the level of confidence yet that we haven't missed an important point that I would want before going down this route.

Losing stack checking is risky; it might eventually be possible to provide some support for this to be handled via a thread-local variable. Ensuring that R_ToplevelExec can't jump before entering the body function would be a good idea; if you want to propose a patch we can have a look.

Best,

luke

--
Luke Tierney
Ralph E. Wareham Professor of Mathematical Sciences
University of Iowa
Department of Statistics and Actuarial Science
241 Schaeffer Hall
Iowa City, IA 52242
Phone: 319-335-3386
Fax: 319-335-3017
email: luke-tierney at uiowa.edu
WWW: http://www.stat.uiowa.edu
Andreas Kersting
2019-May-21 19:54 UTC
[Rd] [External] most robust way to call R API functions from a secondary thread
Hi Luke,

Thanks also for your feedback! I will then follow the proposed route for the problem at hand and I will report back if I encounter any issues. I am also going to look into the issues of stack checking and R_ToplevelExec.

Regards,
Andreas

2019-05-20 19:29 GMT+02:00 Tierney, Luke <luke-tierney at uiowa.edu>:
> Your analysis looks pretty complete to me and your solutions seem
> plausible. That said, I don't know that I would have the level of
> confidence yet that we haven't missed an important point that I would
> want before going down this route.
>
> Losing stack checking is risky; it might be eventually possible to
> provide some support for this to be handled via a thread-local
> variable. Ensuring that R_ToplevelExec can't jump before entering the
> body function would be a good idea; if you want to propose a patch we
> can have a look.
>
> Best,
>
> luke
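On the stack-checking point: instead of disabling the check, a thread could in principle be given a limit that matches its own stack. One way to know those bounds is to start the thread on a stack whose address and size are chosen by the caller. This is a minimal sketch, assuming a POSIX system that provides pthread_attr_setstack() and posix_memalign(); stack_info_t, probe() and run_on_known_stack() are hypothetical names, and nothing here touches R's internal variables.

```c
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>

/* Hypothetical record handed to the thread so it knows its stack bounds. */
typedef struct {
    uintptr_t low;     /* lowest address of the stack region */
    size_t    size;    /* size of the region in bytes */
    int       inside;  /* set by the thread: 1 if a local lies in the region */
} stack_info_t;

/* Thread body: check that a local variable lies inside the known region. */
static void *probe(void *arg) {
    stack_info_t *si = arg;
    int local = 0;
    uintptr_t here = (uintptr_t) &local;
    si->inside = here >= si->low && here - si->low < si->size;
    return NULL;
}

/* Run probe() on a caller-allocated stack of `size` bytes.
 * Returns 1 or 0 as reported by probe(), or -1 if setup failed. */
int run_on_known_stack(size_t size) {
    void *stack = NULL;
    pthread_attr_t attr;
    pthread_t tid;
    stack_info_t si;
    int rc;

    /* Page-aligned memory that will serve as the thread's stack. */
    if (posix_memalign(&stack, (size_t) sysconf(_SC_PAGESIZE), size) != 0)
        return -1;
    si.low = (uintptr_t) stack;
    si.size = size;
    si.inside = 0;

    pthread_attr_init(&attr);
    rc = pthread_attr_setstack(&attr, stack, size);
    if (rc == 0)
        rc = pthread_create(&tid, &attr, probe, &si);
    pthread_attr_destroy(&attr);
    if (rc == 0)
        pthread_join(tid, NULL);  /* thread is finished before the stack is freed */
    free(stack);
    return rc == 0 ? si.inside : -1;
}
```

For example, run_on_known_stack(1 << 20) runs probe() on a 1 MiB stack. How such bounds would be fed into R's stack check, for instance through a thread-local variable as suggested in the reply, is left open here.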