This tutorial will show you how to perform parallel computation on a Hadoop cluster in R using Rhipe, with CSV files as input and output.

The EVE Online computer game universe consists of 5201 solar systems for players to explore and conquer in virtual spaceships. Each solar system is connected to an average of 2.6 other systems by jump gates, which allow instantaneous travel between systems.

A virtual spaceship is only truly safe when it is docked inside an indestructible space station. Inside a station players can repair damaged armor, reconfigure their weapons and shields, safely store precious cargo, and wait for reinforcements to arrive. There are 6101 space stations in the universe, although they are not evenly distributed – 3495 systems contain no station at all.

1132 of those space stations are ‘conquerable’, meaning the group of players who have exercised their military strength to claim that solar system as their own can decide who is allowed to dock their spaceship in the station. Conquered stations are partially named by players, for example: “L-C3O7 VII – Moon 5 – RIP Vile Rat”, a station orbiting the 5th moon of the 7th planet in solar system L-C3O7, named in memorial to Sean Smith.

The remaining stations are always open to the public and are controlled by Non-Player Characters (NPCs). 1638 systems contain at least one of these safe havens. Knowing where to find the nearest NPC station can be very useful when you have been ambushed by space pirates in an unfriendly region.

The following R scripts perform a parallel recursive search from the 3564 solar systems in EVE Online that don’t contain an NPC space station, through the network of jump gates, to the nearest solar system with an NPC space station.

This is my software stack, based on instructions provided by Tessera.io:

  • Rhipe v0.75.1.4
  • R version 3.2.0
  • Java(TM) SE Runtime Environment (build 1.7.0_79-b15)
  • Hadoop 2.6.0-cdh5.4.2
  • CentOS release 6.6
  • Linux 2.6.32-504.23.4.el6.x86_64

You will need two input files: eve_systems.csv and eve_jumpgates.csv. These were extracted from Steve Ronuken’s MySQL conversion of the official EVE Static Data Export.

Create a home folder in the Hadoop filesystem (HDFS) for your user. Use sudo to impersonate the hdfs user for these commands.

sudo -u hdfs hadoop fs -mkdir /user/clint
sudo -u hdfs hadoop fs -chown clint /user/clint
sudo -u hdfs hadoop fs -chmod 700 /user/clint

If you stop a running R script that has started a Hadoop job, you have to kill the Hadoop job manually by finding its job ID; otherwise it will keep running in the background.

mapred job -list
mapred job -kill job_1435560979137_0045

This R script creates an archive file containing all of your R packages, then copies it into HDFS. You only need to run this once before you start (and again every time you update R or its packages).

#! /usr/bin/Rscript

library(Rhipe)
rhinit()

rhmkdir("/user/clint/bin")
hdfs.setwd("/user/clint/bin")
bashRhipeArchive("R.Pkg")

Now the main R script begins. In Hadoop the workers are called mappers and the worker function is called a map. Any objects referenced from the mapper are automatically copied to HDFS by Rhipe.

#! /usr/bin/Rscript

library(Rhipe)
rhinit()
rhoptions(zips = '/user/clint/bin/R.Pkg.tar.gz')
rhoptions(runner = 'sh ./R.Pkg/library/Rhipe/bin/RhipeMapReduce.sh')
hdfs.setwd("/user/clint")

eve_systems <- read.csv("eve_systems.csv")
eve_jumpgates <- read.csv("eve_jumpgates.csv")
solar_system_list <- eve_systems[1][[1]]
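
To experiment without the real data, a pair of toy data frames can stand in for the two CSV files. This is only a sketch: the column names are the ones the scripts in this tutorial use, it assumes solarSystemID is the first column of eve_systems.csv, and the IDs below are made up.

```r
# Toy stand-ins for the two CSV files (IDs are invented for illustration).
eve_systems <- data.frame(
    solarSystemID = c(1, 2, 3, 4),
    stations      = c(0, 0, 2, 0)
)
eve_jumpgates <- data.frame(
    fromSolarSystemID = c(1, 2, 2, 3, 3, 4),
    toSolarSystemID   = c(2, 1, 3, 2, 4, 3)
)

# Same extraction as in the main script: the first column as a plain vector.
solar_system_list <- eve_systems[1][[1]]

# Systems with no station are the ones the search will start from.
no_station <- eve_systems$solarSystemID[eve_systems$stations == 0]
print(no_station)
```

Each jump gate appears once per direction here, which is what the search function appears to expect when it looks up neighbours by fromSolarSystemID.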

Randomise the input to evenly distribute the load. Without doing this, some mappers will have mostly short routes while others have mostly long routes.

solar_system_list <- sample(solar_system_list)
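
The effect of shuffling can be seen in a small sketch. It assumes, purely for illustration, that each mapper receives a contiguous chunk of the input and that similar route lengths sit next to each other in the unshuffled list. The route lengths and the chunk size are hypothetical.

```r
set.seed(42)  # only so the illustration is reproducible

# Hypothetical route lengths: eight short routes followed by eight long ones.
route_length <- rep(c(1, 10), each = 8)

# Split the 16 inputs into 4 contiguous chunks, standing in for 4 mappers.
chunk <- rep(1:4, each = 4)

# Total work per chunk, before and after shuffling the input order.
load_sorted   <- tapply(route_length, chunk, sum)
load_shuffled <- tapply(sample(route_length), chunk, sum)

print(load_sorted)
print(load_shuffled)
```

Without shuffling, two chunks carry almost all of the work. After shuffling, the same total is spread across the chunks in whatever mix the random order produces.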

The numbers from 1 to N will become input for the mappers to be used as indexes for reading solar system IDs from solar_system_list.

N <- length(solar_system_list)

Set this to the number of available CPUs in your cluster.

mapper_count <- 128

Set the recursion depth limit to a low number for faster results, excluding longer routes. You will find 1009 results at recursion depth 5, 3179 at depth 25, and 3556 at 35.

recursion_limit <- 5

Define the recursive search function:

BestRoute <- NULL
NearestStationRecursion <- function(solarSystemID, visitedSystems) {
    # Keep a list of solar systems already visited on this route
    visitedSystems <- c(visitedSystems,solarSystemID)
    
    # Check neighbouring systems to see if they have NPC stations.
    for (i in which(eve_jumpgates$fromSolarSystemID == solarSystemID)) {
        toSolarSystemID <- eve_jumpgates[i,]["toSolarSystemID"][[1]]
        toSolarSystem <- eve_systems[which(eve_systems$solarSystemID == toSolarSystemID),]
        if (toSolarSystem["stations"] != 0) {
            NewRoute <- c(visitedSystems,toSolarSystemID)
            if (is.null(BestRoute) || length(NewRoute) < length(BestRoute)) {
                # This is the best route discovered so far.
                BestRoute <<- NewRoute
                return(NULL)
            }
        }
    }
    
    # If no matches were found, continue recursion through all neighbours.
    for (i in which(eve_jumpgates$fromSolarSystemID == solarSystemID)) {
        toSolarSystemID <- eve_jumpgates[i,]["toSolarSystemID"][[1]]
        
        # But only continue if we haven't visited this system already.
        if (!is.element(toSolarSystemID,visitedSystems)) {
            if (length(visitedSystems)+1 >= recursion_limit) {
                # Stop if the new route would exceed depth limit.
                return(NULL)
            }
            else if (!is.null(BestRoute) && length(visitedSystems)+1 >= length(BestRoute)) {
                # Stop if the new route would be longer than a route already found.
                return(NULL)
            }
            else {
                # This route is good, go deeper!
                NearestStationRecursion(toSolarSystemID,visitedSystems)
            }
        }
    }
}
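
Results from the recursive search can be sanity-checked on a small graph with a different technique, breadth-first search. The sketch below is not the function used in this tutorial. The name nearest_station_bfs, the max_jumps parameter and the toy data are all hypothetical, and it assumes every system ID in the gate table also appears in the systems table.

```r
# Toy graph: a chain 1 - 2 - 3 - 4 where only system 4 has a station.
toy_systems <- data.frame(solarSystemID = c(1, 2, 3, 4),
                          stations      = c(0, 0, 0, 1))
toy_gates <- data.frame(
    fromSolarSystemID = c(1, 2, 2, 3, 3, 4),
    toSolarSystemID   = c(2, 1, 3, 2, 4, 3)
)

# Breadth-first search: returns the route to the nearest system with a
# station, or NULL if none is reachable within max_jumps jumps.
nearest_station_bfs <- function(start, systems, gates, max_jumps = 35) {
    has_station <- function(id) {
        systems$stations[systems$solarSystemID == id] != 0
    }
    if (has_station(start)) return(start)
    routes  <- list(start)  # every route that ends on the current frontier
    visited <- start
    for (jump in seq_len(max_jumps)) {
        next_routes <- list()
        for (route in routes) {
            here <- route[length(route)]
            neighbours <- gates$toSolarSystemID[gates$fromSolarSystemID == here]
            for (nb in neighbours) {
                if (nb %in% visited) next
                if (has_station(nb)) return(c(route, nb))
                visited <- c(visited, nb)
                next_routes[[length(next_routes) + 1]] <- c(route, nb)
            }
        }
        if (length(next_routes) == 0) return(NULL)
        routes <- next_routes
    }
    NULL
}

print(nearest_station_bfs(1, toy_systems, toy_gates))
```

Because breadth-first search expands one jump at a time, the first station it meets is a nearest one, so it needs no global best-route variable.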

Define the map function. map.values is a list of inputs for this mapper. map.keys is a list of indexes for the input list. rhcollect emits a key-value pair from the mapper; the key and value can be any R object.

map1 <- expression({        
    sapply(seq_along(map.keys), function(key) {

        thisSolarSystemID <- solar_system_list[map.values[[key]]]
        thisSolarSystem <- eve_systems[which(eve_systems$solarSystemID == thisSolarSystemID),]
        if (thisSolarSystem["stations"] != 0) {
            # This system already has an NPC station. Skip it.
        }
        else {
            BestRoute <<- NULL

            # Start the recursion.
            NearestStationRecursion(thisSolarSystemID,NULL)

            if (!is.null(BestRoute)) {
                rhcollect(thisSolarSystemID, tail(BestRoute,1))
            }
        }
    })
})
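
A map expression can be tried locally, without Hadoop, by supplying stand-ins for the objects Rhipe provides. In this sketch the rhcollect below is a local stub rather than Rhipe's function, map.keys and map.values are filled in by hand on the assumption that they arrive as lists of indexes, and toy_map is a simplified, hypothetical expression, not the map1 defined above.

```r
# Local stub: store each emitted pair instead of sending it to Hadoop.
collected <- list()
rhcollect <- function(key, value) {
    collected[[length(collected) + 1]] <<- list(key, value)
}

# Hand-made inputs for one imaginary mapper (assumed shape).
map.keys   <- as.list(1:3)
map.values <- as.list(1:3)

solar_system_list <- c(30000064, 30000128, 30000192)

# A simplified map expression: look up each ID and emit it with its index.
toy_map <- expression({
    for (i in seq_along(map.keys)) {
        id <- solar_system_list[map.values[[i]]]
        rhcollect(id, i)
    }
})

eval(toy_map)
str(collected)
```

Run this in a session where Rhipe is not attached, or rename the stub, so that it does not mask the real rhcollect.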

Finally, rhwatch runs the job and saves results to HDFS.

rhwatch_output <- rhwatch(
    map = map1,
    input = N,
    output = rhfmt("eve_output", type = "sequence"),
    mapred = list(mapreduce.job.maps = mapper_count,
        mapreduce.task.timeout = 0),
    readback = FALSE
)

Now the results are stored in HDFS. Read them out and save them to a CSV file with another R script:

#! /usr/bin/Rscript

library(Rhipe)
rhinit()
hdfs.setwd("/user/clint")

# Read the output back from HDFS.
messy_output <- rhread("eve_output")

# Reshape the output data.
flat_list <- unlist(messy_output)
n <- length(flat_list)
startsystem <- flat_list[seq(1,n,2)]
endsystem <- flat_list[seq(2,n,2)]
tidy_output <- data.frame(cbind(startsystem,endsystem))
write.csv(tidy_output, file = "eve_output.csv", row.names = FALSE)
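
The reshaping step can be tried on a hand-made list before running it against real job output. This sketch assumes rhread returns a list in which each element is a two-item list of key and value. The toy values are taken from the sample output shown in this tutorial, and the sapply-based extraction is an alternative to the unlist approach above, not a replacement the original script requires.

```r
# Toy stand-in for rhread() output: a list of list(key, value) pairs.
messy_output <- list(
    list(30000064, 30000069),
    list(30000128, 30000126)
)

# Pull keys and values out by position rather than by flattening.
tidy_output <- data.frame(
    startsystem = sapply(messy_output, function(kv) kv[[1]]),
    endsystem   = sapply(messy_output, function(kv) kv[[2]])
)

print(tidy_output)
```

Extracting by position keeps working if a key or value ever holds more than one element, whereas the odd/even indexing on a flattened vector relies on every pair contributing exactly two numbers.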

If everything worked, the contents of eve_output.csv should look like this:

"startsystem","endsystem"
30000064,30000069
30000128,30000126
30000192,30000190
30000256,30000205
30001024,30001026
30001280,30001269