Worker Nodes
These are the nodes that
will run map or reduce jobs. They are not expected to stay around forever, and
in order to join the system, the user visiting the third-party
website must opt in.
UI Elements - /src/views/worker.jade, /src/public/javascripts/worker.coffee
For the demo we talked
about having a basic website that explains Countdera
with the actual client on the side. This will allow audience members to join
the network and read about it at the same time.
Opt-in view
Something that says
“Click to help Science” or something to that effect.
Participant view
This could simply be a
white square with lines of text being appended to a scrollable element. The
lines of text would be status messages that are set at various parts of the
mapping or reducing jobs.
Functions - /src/public/javascripts/client.coffee, /src/public/javascripts/lib/models/client.coffee
These are the actions that
will need to be performed in JavaScript on the
worker nodes.
opt_in()
// not actually there; run is just called
This gets called when
the user clicks on the opt-in button. When this happens, initialize a client
object to get an Id, start heartbeating, and mark
state as IDLE.
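A minimal sketch of this flow, assuming a Client constructor and a send_message transport stand-in (both names, the id generation, and the 5-second heartbeat interval are assumptions, not the actual implementation):

```javascript
// Sketch of the opt-in flow. `send_message` stands in for the
// real Firebase transport; the id generation and heartbeat
// interval are assumptions.
function Client(send_message) {
  this.send_message = send_message;
  this.id = null;
  this.state = null;
  this.heartbeat_timer = null;
}

Client.prototype.opt_in = function () {
  // In the real system the id would come from Firebase.
  this.id = "client-" + Math.random().toString(36).slice(2);
  this.state = "IDLE";
  var self = this;
  this.heartbeat_timer = setInterval(function () {
    self.send_message({ type: "HEARTBEAT", id: self.id });
  }, 5000);
};
```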
finish_job()
When a node gets a JOB_DONE message from the server, it should
clean up its state and become IDLE
again. This includes clearing out local map_output
data and all reduce output lines.
Mapper Functions - same files as Functions
start_map(map_start_message)
This function gets called
when the client object receives a MAP_START
type message from the server. This message needs to contain the job_id and the url
of the data that this mapper should retrieve. The first thing the client should
do is update its state from IDLE to MAPPER. The
client can then call get_data(url) on the url
and retrieve the mapping code from Firebase with the job_id.
Once get_data(url)
completes, the mapper should run the mapping code on the data to generate the
map output data with run_map_job(data, map_code) where data is an array of lines of the input file
and map_code is the string representation of the
mapping code retrieved from Firebase.
Once run_map_job
completes, store the resulting data on the client object and call map_done(job_id).
get_data(url)
This function needs to
request the page specified by the url
and parse the data into an array of lines of strings. If we want to be fancy
with this function, we could try to find a library that strips away html so we
can parse arbitrary web pages (maybe something like readability?). This
function should return a list of lines of the input file and will likely need
to use a deferred so the start_map function can wait
on the data.
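A sketch of the parsing half of this, with the actual AJAX request abstracted behind a fetch_text callback (fetch_text is an assumption standing in for the real deferred-based request; parse_lines is a hypothetical helper):

```javascript
// Split raw text into non-empty lines; get_data wires this to a
// transport. `fetch_text(url, cb)` is a stand-in for the real
// AJAX/deferred machinery.
function parse_lines(text) {
  return text.split("\n").filter(function (line) {
    return line.length > 0;
  });
}

function get_data(url, fetch_text, cb) {
  fetch_text(url, function (text) {
    cb(parse_lines(text));
  });
}
```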
run_map_job(data, map_code)
This function actually
performs the mapping job. One way to do this is to first make a nested function
called emit(key, object) that, when called, adds the
key, object pair to an array that we will output later. Now that the emit
function is in scope, we can define another function called map(data)
by evaluating the string representation of the mapping code (e.g. with eval or
the Function constructor). In order for this to work we need to make sure that
the mapping code has been stripped of its first and last lines (those lines
exist only to show users the map function interface they are working with).
Once map(data)
has been defined, it can be called on the actual data, and then we output the
array that is added to by our emit(key, object) function. Note that since we’re
sending this, the object that they emit for each key can’t have functions or
anything that can’t be serialized through JSON.
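Under those constraints, run_map_job could look like the following sketch (the Function-constructor wrapping and the exact shape of the stripped code are assumptions):

```javascript
// Runs user-supplied mapping code against the input lines.
// `map_code` is assumed to be the body of the user's map
// function with its first and last lines already stripped.
function run_map_job(data, map_code) {
  var output = [];
  // emit collects (key, object) pairs; objects must be
  // JSON-serializable since they are sent over Firebase.
  function emit(key, object) {
    output.push({ key: key, value: object });
  }
  // Build map(data) from the code string with emit in scope.
  var map = new Function("data", "emit", map_code);
  map(data, emit);
  return output;
}
```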
map_done(job_id)
This function updates
the client’s state to MAPPER_DONE. It
then sends a message to the server MAP_DONE
that includes the job_id it was working on. The
server will then send back a REDUCE_NODES
message that will need to be processed.
send_map_data(reduce_node_list)
This function handles the REDUCE_NODES message sent by
the server, which informs the mapper of the list of reducers to send the map
output data to. Whenever the mapper gets one of these messages, it should begin
sending all of the map output data to the proper reducers.
The sending could work
as follows: Given the number of reducers and the ordered list of them, iterate
through the list of all (key, object) pairs in the map_output
and hash the key, modulo the result with the number of reducers and send the
pair to the reducer at that index. This message should be a MAP_OUTPUT message that contains the index of
the mapper and the pair (the index being the index of the URL that was
requested in the list of URLs for the job). Firebase guarantees FIFO ordering
and delivery but we still need to make sure the reducer knows when it has all
of the results from the given mapper. Since we have FIFO, an easy way to do
this would be to send an END_OF_MAP_OUTPUT
message to the reducer that also includes the index so it knows when it has all
of the results from a mapper. In addition, we will need to send a START_OF_MAP_OUTPUT message to the reducer to
inform it to invalidate what it has already received from a mapper with our
index in the event that a previous mapper died.
After sending all of the
map data, the mapper needs to remain in the MAPPER_DONE
state until the entire job is done so that if a reducer dies, the server can
send another REDUCE_NODES message and
recover.
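The hash-and-modulo routing could be sketched like this (the djb2-style string hash and the send callback are assumptions; any stable hash works, and send stands in for pushing on the reducer's Firebase ref):

```javascript
// Route a (key, object) pair to a reducer by hashing the key.
// The djb2-style hash here is an assumption; any stable string
// hash would do.
function hash_key(key) {
  var h = 5381;
  for (var i = 0; i < key.length; i++) {
    h = ((h * 33) + key.charCodeAt(i)) >>> 0;
  }
  return h;
}

function reducer_for(key, reduce_node_list) {
  return reduce_node_list[hash_key(key) % reduce_node_list.length];
}

// `send` stands in for pushing a MAP_OUTPUT message to the
// reducer over Firebase; mapper_index is the index of the URL
// this mapper was assigned.
function send_map_data(map_output, reduce_node_list, mapper_index, send) {
  map_output.forEach(function (pair) {
    send(reducer_for(pair.key, reduce_node_list), {
      name: "MAP_OUTPUT",
      index: mapper_index,
      pair: pair
    });
  });
}
```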
Reducer Functions - same files as Functions
start_reduce(start_reduce_message)
When a worker node
receives a START_REDUCE message, the
message should contain the job_id and the number of
mappers involved in the job. It’s important that the reducers identify
mappers only by their index (instead of by id), because the actual mappers could
change, die, or be restarted. At this point the worker node should change its
state to REDUCER and it should start
listening for MAP_OUTPUT messages.
add_map_output(map_output_message)
The reducer should build
up a map on the client object of the map_output. The
map should go from mapper_index to the map output
pairs from each mapper. Once an END_OF_MAP_OUTPUT
message is received from a mapper index, a boolean
array should be updated, and once the last one is received, we can call the do_reduce()
method for the reducer.
There’s an issue here
with determining if the mapper dies while sending the map output… the reducer
could wait here forever. We might need a system so that if it’s been more than
15 seconds or so since the last map_output came from
a mapper that isn’t done, we tell the server to restart the mapper.
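A sketch of this bookkeeping, assuming the message shapes described above (the ReducerState name and message field names are assumptions):

```javascript
// Per-job reducer state: map output grouped by mapper index,
// plus a done flag per mapper. Names are assumptions.
function ReducerState(num_mappers) {
  this.map_output = {};                        // mapper_index -> [pairs]
  this.done = new Array(num_mappers).fill(false);
}

ReducerState.prototype.handle = function (msg) {
  if (msg.name === "START_OF_MAP_OUTPUT") {
    // A restarted mapper: invalidate anything already received.
    this.map_output[msg.index] = [];
    this.done[msg.index] = false;
  } else if (msg.name === "MAP_OUTPUT") {
    (this.map_output[msg.index] = this.map_output[msg.index] || [])
      .push(msg.pair);
  } else if (msg.name === "END_OF_MAP_OUTPUT") {
    this.done[msg.index] = true;
  }
};

ReducerState.prototype.all_done = function () {
  return this.done.every(Boolean);  // time to call do_reduce()
};
```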
do_reduce()
This should only be
called after the reducer has collected all of the map output in the mapper_index -> map_output pairs map. Once this is
called, this method should iterate through it and generate a new map from key
to lists of objects. Once this new map has been generated, we need
to build up a local emit(output_line)
and reduce(key, list_of_objects) method to call the
reduce code with. Once again, the reduce code is retrieved from Firebase with
the job_id. These functions are generated in a similar manner to the mapper’s
functions. After we make the functions, we iterate through the new map and pass
each key, list_of_objects pair to the reduce
function.
The emit(line)
function should output each line of the output, and each line should be
associated with the key that the reduce function was called with, so we should
effectively build up a new map of key -> list of output lines. This map
should then be sent to the IOServer once it is
completely built. This will be done in the finish_reduce() method. // which I
call immediately
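A sketch of the grouping and reduce-execution steps (the Function-constructor wrapping mirrors the mapper sketch and is an assumption, as is the stripped shape of reduce_code):

```javascript
// Group all received (key, value) pairs by key, then run the
// user's reduce code on each group. `reduce_code` is assumed to
// be the stripped body of reduce(key, list_of_objects).
function do_reduce(map_output_by_mapper, reduce_code) {
  // key -> list of objects
  var grouped = {};
  Object.keys(map_output_by_mapper).forEach(function (idx) {
    map_output_by_mapper[idx].forEach(function (pair) {
      (grouped[pair.key] = grouped[pair.key] || []).push(pair.value);
    });
  });

  // key -> list of output lines, built up via emit(line).
  var output = {};
  var reduce = new Function("key", "list_of_objects", "emit", reduce_code);
  Object.keys(grouped).forEach(function (key) {
    function emit(line) {
      (output[key] = output[key] || []).push(line);
    }
    reduce(key, grouped[key], emit);
  });
  return output;
}
```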
finish_reduce()
This function should be
called when we have a full map of key -> output line. This function should
add to an append-only queue on the OUTPUT node that the IOServer
is watching. The output objects should be added to a node named the job_id under the output node. Each output object should
include both the key and the list of lines. Once all objects have been sent to
Firebase, the reducer should send a REDUCE_DONE
message to the server. This should also change the node’s state to REDUCER_DONE.
Client Nodes
These are the nodes that
want to run a job. The client-side JavaScript here
will handle submitting the job, monitoring its progress, and displaying a link
to the output file once it has been flushed to the disk.
UI Elements - /src/views/master.jade, /src/public/javascripts/master.coffee
This will be the page
that people see when they want to run their jobs. It should first show a textarea where they can paste their map code, along with the
function header at the top and a close brace at the bottom, with a comment
indicating that they shouldn’t touch those lines. When someone hits Save
after entering the map code and tags, we call save_map_code(). Then we do the same
thing for reduce; then we show a textarea for the url input, and when they click run, we fire off start_job().
Functions - /src/public/javascripts/master.coffee, /src/public/javascripts/lib/models/master.coffee
init()
This function makes a
Firebase call to generate an id for the job that will be run. This will allow
the mapper and reducer code ids to be stored and will include all current
statuses of the job.
save_map_code(map_code, tag_list)
This function takes in
the string representation of the code and pushes it onto the MAP_CODE reference, which will generate a unique id for it. This id
is then pushed onto references under MAP_CODE_TAGS
(one for each tag in tag_list) for eventual easy
lookup. In addition, the map_id is saved in the job
state under the JOB_STATE reference’s
entry for the job_id generated in init().
save_reduce_code(reduce_code, tag_list)
This function does the
same thing as save_map_code but uses the REDUCE_CODE and REDUCE_CODE_TAGS
references instead.
save_urls(url_list)
This function adds the
list of urls to the job’s state reference
and calls start_job(job_id).
start_job(job_id)
This function adds a
watcher on a status tag of the job reference so that we can display neat things
to the person running the job and know when it has finished. At this point, we
also send a START_JOB message to the
server with the job_id included.
finish_job(job_id)
When the client sees
that the link to the output of the job is in the job reference on Firebase, it
sends a FINISH_JOB message to the
server.
Server
At this point, the
server is in charge of knowing what nodes are alive in the system and making
sure jobs run to completion. This will be accomplished by the workers sending
messages to the server and the server sending messages to the workers and
updating the job state that the client is watching.
watch_job(job_id)
This is called when the
server receives a START_JOB message. At
this point, the server should create a new JobWatcher
object with the job_id, add it to a list of them, and
call the JobWatcher’s run()
method. Messages that are processed from the server will need to be dispatched
to the proper JobWatcher to handle. They should just
be pushed onto an array of messages that the JobWatcher
has.
finish_job(job_id)
When the server receives
a FINISH_JOB message, it calls this
method. This makes sure that the JobWatcher is
killed.
JobWatcher.run()
This method will start a
setInterval loop to check on the job and make sure
that progress is being made. This method will contain all of the logic for
updating the state value of the job that the client is watching and should
essentially operate as a state machine. When this is called for the first time,
it should update the state field on the right Firebase job reference to
STARTING before starting the loop.
Inside the loop, we
should read the state value of the job. Below I’ve listed out the possible
states and when to move from one to the other.
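A skeleton of the watcher loop, with the Firebase reads and writes abstracted behind injected functions (read_state, write_state, the step method name, and the 1-second polling interval are all assumptions; the per-state logic is elided):

```javascript
// Skeleton of the JobWatcher polling loop. `read_state` and
// `write_state` stand in for reads/writes of the job's Firebase
// state field; per-state transition logic is elided.
function JobWatcher(job_id, read_state, write_state) {
  this.job_id = job_id;
  this.messages = [];   // dispatched here by the server
  this.read_state = read_state;
  this.write_state = write_state;
  this.timer = null;
}

JobWatcher.prototype.run = function () {
  this.write_state("STARTING");
  var self = this;
  this.timer = setInterval(function () {
    var state = self.read_state();
    self.step(state);   // state-machine transition, elided here
  }, 1000);
};

JobWatcher.prototype.step = function (state) {
  // Per-state progress checks and recovery logic go here.
};

JobWatcher.prototype.kill = function () {
  clearInterval(this.timer);
};
```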
IOServer
This server watches the OUTPUT Firebase reference for child additions.
When a node is added, the result of a job is being appended to that node. We
start creating a file with the name of the node that was added and start
watching that node for line additions. For each addition, we write out the key
and each line to disk and remove the entry from the Firebase node to save
space.
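The per-message write path could be sketched as follows, with the actual file append behind a callback (append_line stands in for appending to the job's output file, e.g. via fs.appendFileSync; the tab-separated key/line format is an assumption):

```javascript
// Write one reducer output record to disk. `append_line` stands
// in for appending to the file named after the output node; the
// key\tline format is an assumption.
function handle_reducer_output(message, append_line) {
  message.lines.forEach(function (line) {
    append_line(message.key + "\t" + line);
  });
}
```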
Under the
IO_SERVER_MESSAGE_REF there will be three types of messages for reducers to
write out their output. The types are START_REDUCER_OUTPUT, REDUCER_OUTPUT, and STOP_REDUCER_OUTPUT. START_REDUCER_OUTPUT and
STOP_REDUCER_OUTPUT will look like the following:
{ name: START_REDUCER_OUTPUT, reducer: id, job: jobid }
{ name: STOP_REDUCER_OUTPUT, reducer: id, job: jobid }
The REDUCER_OUTPUT message will look like:
{ name: REDUCER_OUTPUT, reducer: id, key: key, lines: lines }
{ name: START_JOB, numReducers: number, job: jobid }