I have found a solution that appears to work. Instead of calling
mpi.close.Rslaves() to shut down the slaves, I use
mpi.bcast.cmd(q("no")) and rely on the scheduler/mpirun to shut down
anything else. However, because I really don't know what I am doing, I
would appreciate it if anyone who sees something wrong with this would
let me know.
Mark Lyman, Statistician
ATK Launch Systems
mark.lyman@atk.com
(435) 863-2863
From: Lyman, Mark
Sent: Wednesday, July 16, 2008 10:06 AM
To: 'hyu@stats.uwo.ca'
Cc: 'r-help@r-project.org'; Palmer, Michael
Subject: Problem with mpi.close.Rslaves()
I am running R 2.7.0 on a SUSE 9.1 Linux cluster with openmpi-1.0.1 and a
job scheduler that dispatches jobs. I have tried running one of the Rmpi
examples at http://ace.acadiau.ca/math/ACMMaC/Rmpi/examples.html, and it
seems to work, except that mpi.close.Rslaves() hangs: the slaves are
closed, but the master never finishes its script. Below are the example
script and the call to R. The job is being run on a single 4-processor
machine. Any suggestions?
Also, is Rmpi using rexec to communicate? If it doesn't already, can it
use ssh?
mpirun -np 4 -machinefile /var/spool/PBS/aux/90361.head \
  /apps/R/R270/bin/R CMD BATCH --save Rmpi_test4.R
# Initialize MPI
library("Rmpi")
# Function the slaves will call to perform a validation on the
# fold number given in each task they receive.
# Assumes: thedata, fold and p have been broadcast to the slaves
# (foldNumber comes from each task).
foldslave <- function() {
  # Note the use of the tag for sent messages:
  #   1=ready_for_task, 2=done_task, 3=exiting
  # Note the use of the tag for received messages:
  #   1=task, 2=done_tasks
  junk <- 0
  done <- 0
  while (done != 1) {
    # Signal being ready to receive a new task
    mpi.send.Robj(junk,0,1)
    # Receive a task
    task <- mpi.recv.Robj(mpi.any.source(),mpi.any.tag())
    task_info <- mpi.get.sourcetag()
    tag <- task_info[2]
    if (tag == 1) {
      foldNumber <- task$foldNumber
      rss <- double(p)
      for (i in 1:p) {
        # produce a linear model on the first i variables on
        # training data
        templm <- lm(y~.,data=thedata[fold!=foldNumber,1:(i+1)])
        # produce predicted data from test data
        yhat <- predict(templm,newdata=thedata[fold==foldNumber,1:(i+1)])
        # get rss of yhat-y
        localrssresult <- sum((yhat-thedata[fold==foldNumber,1])^2)
        rss[i] <- localrssresult
      }
      # Send a results message back to the master
      results <- list(result=rss,foldNumber=foldNumber)
      mpi.send.Robj(results,0,2)
    }
    else if (tag == 2) {
      done <- 1
    }
    # We'll just ignore any unknown messages
  }
  mpi.send.Robj(junk,0,3)
}
# We're in the parent.
# first make some data
n <- 1000 # number of obs
p <- 30 # number of variables
# Create data as a set of n samples of p independent variables,
# make a "random" beta with higher weights in the front.
# Generate y's as y = beta*x + random
x <- matrix(rnorm(n*p),n,p)
beta <- c(rnorm(p/2,0,5),rnorm(p/2,0,.25))
y <- x %*% beta + rnorm(n,0,20)
thedata <- data.frame(y=y,x=x)
fold <- rep(1:10,length=n)
fold <- sample(fold)
summary(lm(y~x))
# Now, send the data to the slaves
mpi.bcast.Robj2slave(thedata)
mpi.bcast.Robj2slave(fold)
mpi.bcast.Robj2slave(p)
# Send the function to the slaves
mpi.bcast.Robj2slave(foldslave)
# Call the function in all the slaves to get them ready to
# undertake tasks
mpi.bcast.cmd(foldslave())
# Create task list
tasks <- vector('list')
for (i in 1:10) {
  tasks[[i]] <- list(foldNumber=i)
}
# Create data structure to store the results
rssresult <- matrix(0,p,10)
junk <- 0
closed_slaves <- 0
n_slaves <- mpi.comm.size()-1
while (closed_slaves < n_slaves) {
  # Receive a message from a slave
  message <- mpi.recv.Robj(mpi.any.source(),mpi.any.tag())
  message_info <- mpi.get.sourcetag()
  slave_id <- message_info[1]
  tag <- message_info[2]
  if (tag == 1) {
    # slave is ready for a task. Give it the next task, or tell it tasks
    # are done if there are none.
    if (length(tasks) > 0) {
      # Send a task, and then remove it from the task list
      mpi.send.Robj(tasks[[1]], slave_id, 1)
      tasks[[1]] <- NULL
    }
    else {
      mpi.send.Robj(junk, slave_id, 2)
    }
  }
  else if (tag == 2) {
    # The message contains results. Do something with the results.
    # Store them in the data structure
    foldNumber <- message$foldNumber
    rssresult[,foldNumber] <- message$result
  }
  else if (tag == 3) {
    # A slave has closed down.
    closed_slaves <- closed_slaves + 1
  }
}
# plot the results
plot(apply(rssresult,1,mean))
mpi.close.Rslaves()
mpi.exit()
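If it helps to separate the computation from the MPI start-up and shutdown problem, the same per-fold RSS table can be produced in a single R session without Rmpi. This is only a sketch under stated assumptions, not part of the original example: it regenerates data with the script's recipe under an arbitrary set.seed(1), and fold_rss() is a hypothetical helper that applies the same lm()/predict() steps as foldslave() to one fold at a time. Applied over folds 1:10 it gives a p x 10 matrix laid out like rssresult.

```r
# Serial cross-check of the MPI cross-validation (no Rmpi needed).
# Assumption: data regenerated with the same recipe as the script;
# set.seed(1) is arbitrary and only makes this sketch reproducible.
set.seed(1)
n <- 1000  # number of obs
p <- 30    # number of variables
x <- matrix(rnorm(n * p), n, p)
beta <- c(rnorm(p / 2, 0, 5), rnorm(p / 2, 0, 0.25))
y <- x %*% beta + rnorm(n, 0, 20)
thedata <- data.frame(y = y, x = x)
fold <- sample(rep(1:10, length.out = n))

# Hypothetical helper: the lm()/predict() steps foldslave() runs for one task,
# returning the test-fold RSS for models on the first 1, 2, ..., p variables.
fold_rss <- function(k, thedata, fold, p) {
  train <- fold != k
  test <- fold == k
  sapply(seq_len(p), function(i) {
    fit <- lm(y ~ ., data = thedata[train, 1:(i + 1)])
    yhat <- predict(fit, newdata = thedata[test, 1:(i + 1)])
    sum((yhat - thedata[test, 1])^2)
  })
}

# One column per fold: a p x 10 matrix laid out like rssresult.
rss_serial <- sapply(1:10, fold_rss, thedata = thedata, fold = fold, p = p)

# Mean test RSS per model size (the quantity the script plots).
mean_rss <- rowMeans(rss_serial)
```

To check a cluster run, the same sapply() call can be made in the master on its own thedata, fold and p after the while loop; since the steps are identical, all.equal() against rssresult should then report TRUE.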