Worker Nodes

These are the nodes that will run map or reduce jobs. They are not expected to stay around forever, and to join the system, a user visiting the third-party website must opt in.

 

UI Elements - /src/views/worker.jade, /src/public/javascripts/worker.coffee

For the demo we talked about having a basic website that explains Countdera with the actual client on the side. This will allow audience members to join the network and read about it at the same time.

 

Opt-in view

Something that says “Click to help Science” or something to that effect.

 

Participant view

This could simply be a white square with lines of text being appended to a scrollable element. The lines of text would be status messages that are set at various parts of the mapping or reducing jobs.

 

Functions - /src/public/javascripts/client.coffee, /src/public/javascripts/lib/models/client.coffee

These are actions that will need to be performed in JavaScript for the worker nodes.

 

opt_in()

//not actually there, run is just called

This gets called when the user clicks the opt-in button. When this happens, initialize a client object to get an id, start heartbeating, and mark the state as IDLE.
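A minimal sketch of what this flow could look like in CoffeeScript (the Client model, clients_ref, and the five-second heartbeat interval are all assumptions, and the legacy Firebase API is assumed throughout):

opt_in = ->
  client = new Client()
  client.id = clients_ref.push().name()   # legacy Firebase: push() with no args allocates a unique key
  client.state = 'IDLE'
  # heartbeat so the server can tell we are still alive
  setInterval (-> clients_ref.child(client.id).child('heartbeat').set(Date.now())), 5000
  client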

 

finish_job()

When a node gets a JOB_DONE message from the server, it should clean up its state and become IDLE again. This includes clearing out local map_output data and all reduce output lines.
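A minimal sketch of the cleanup, using the client fields assumed in the sketches below:

finish_job = ->
  client.map_output = null      # mapper-side output pairs
  client.reduce_output = null   # reducer-side key -> output lines map
  client.state = 'IDLE'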

 

Mapper Functions - same files as Functions

start_map(map_start_message)

This function gets called when the client object receives a MAP_START type message from the server. This message needs to contain the job_id and the url of the data that this mapper should retrieve. The first thing the client should do is update its state from IDLE to MAPPER. The client can then call get_data(url) on the url and retrieve the mapping code from Firebase with the job_id.

 

Once get_data(url) completes, the mapper should run the mapping code on the data to generate the map output data with run_map_job(data, map_code) where data is an array of lines of the input file and map_code is the string representation of the mapping code retrieved from Firebase.

 

Once run_map_job completes, store the resulting data on the client object and call map_done(job_id).
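Put together, the flow could look roughly like this (get_map_code is a hypothetical Firebase fetch keyed by job_id, and the message field names are assumptions):

start_map = (msg) ->
  client.state = 'MAPPER'
  code_promise = get_map_code(msg.job_id)
  get_data(msg.url).then (lines) ->
    code_promise.then (map_code) ->
      client.map_output = run_map_job(lines, map_code)
      map_done(msg.job_id)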

 

get_data(url)

This function needs to request the page specified by the url and parse the data into an array of lines of strings. If we want to be fancy with this function, we could try to find a library that strips away HTML so we can parse arbitrary web pages (maybe something like readability?). This function should return a list of lines of the input file and will likely need to use a deferred so the start_map function can wait on the data.
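A minimal sketch using a jQuery deferred; it assumes the target URL allows cross-origin requests and serves plain text:

get_data = (url) ->
  $.get(url).then (body) ->
    # drop empty lines; the chained deferred resolves with this array
    line for line in body.split('\n') when line.length > 0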

 

run_map_job(data, map_code)

This function actually performs the mapping job. One way to do this is to first make a nested function called emit(key, object) that, when called, adds the (key, object) pair to an array that we will output later. Now that the emit function is in scope, we can define another function called map(data) by evaluating the string representation of the mapping code (e.g. with eval or new Function). For this to work we need to make sure that the mapping code has been stripped of its first and last lines (which is why users see a fixed map function header they shouldn't touch).

 

Once map(data) has been defined, it can be called on the actual data, and then we output the array that was built up by our emit(key, object) function. Note that since we're sending this over the wire, the object emitted for each key can't contain functions or anything else that can't be serialized as JSON.
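A sketch of this approach. Here emit is passed in explicitly via new Function rather than captured from the enclosing scope by eval, which avoids scoping surprises; map_code is assumed to already be the bare function body, stripped of its header and closing brace:

run_map_job = (data, map_code) ->
  output = []
  emit = (key, obj) -> output.push [key, obj]   # obj must survive JSON serialization
  map = new Function('data', 'emit', map_code)
  map(data, emit)
  output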

 

map_done(job_id)

This function updates the client's state to MAPPER_DONE. It then sends a MAP_DONE message to the server that includes the job_id it was working on. The server will then send back a REDUCE_NODES message that will need to be processed.
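A sketch, assuming a server_message_ref message queue and these message field names:

map_done = (job_id) ->
  client.state = 'MAPPER_DONE'
  server_message_ref.push { name: 'MAP_DONE', job: job_id, mapper: client.id }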

 

send_map_data(reduce_node_list)

This function handles the REDUCE_NODES message, which the server sends to inform the mapper of a list of reducers to send the map output data to. Whenever the mapper gets one of these messages, it should begin sending all of the map output data to the proper reducers.

 

The sending could work as follows: given the number of reducers and the ordered list of them, iterate through the list of all (key, object) pairs in the map_output, hash the key, take the result modulo the number of reducers, and send the pair to the reducer at that index. This should be a MAP_OUTPUT message that contains the pair and the index of the mapper (the index being the position of the requested URL in the job's list of URLs). Firebase guarantees FIFO ordering and delivery, but we still need to make sure the reducer knows when it has all of the results from a given mapper. Since we have FIFO, an easy way to do this is to send an END_OF_MAP_OUTPUT message to the reducer that also includes the index, so it knows when it has all of the results from a mapper. In addition, we will need to send a START_OF_MAP_OUTPUT message to the reducer to tell it to invalidate anything it has already received from a mapper with our index, in case a previous mapper died.
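A sketch of the partitioning loop; hash_string is a hypothetical helper, and each entry in reduce_node_list is assumed to be a Firebase ref for that reducer's message queue:

hash_string = (s) ->
  h = 0
  h = ((h * 31) + s.charCodeAt(i)) | 0 for i in [0...s.length]
  Math.abs h

send_map_data = (reduce_node_list) ->
  n = reduce_node_list.length
  for ref in reduce_node_list
    ref.push { name: 'START_OF_MAP_OUTPUT', mapper: client.map_index }
  for [key, obj] in client.map_output
    reduce_node_list[hash_string(key) % n].push { name: 'MAP_OUTPUT', mapper: client.map_index, key: key, value: obj }
  for ref in reduce_node_list
    ref.push { name: 'END_OF_MAP_OUTPUT', mapper: client.map_index }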

 

After sending all of the map data, the mapper needs to remain in the MAPPER_DONE state until the entire job is done so that if a reducer dies, the server can send another REDUCE_NODES message and recover.

 

Reducer Functions - same files as Functions

 

start_reduce(start_reduce_message)

When a worker node receives a START_REDUCE message, the message should contain the job_id and the number of mappers involved in the job. It's important that reducers identify mappers by index (instead of by id) because the actual mapper nodes could change, die, or be restarted. At this point the worker node should change its state to REDUCER and start listening for MAP_OUTPUT messages.
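A sketch, assuming the message carries num_mappers and that each worker listens on its own Firebase message ref:

start_reduce = (msg) ->
  client.state = 'REDUCER'
  client.job_id = msg.job_id
  client.map_outputs = ([] for i in [0...msg.num_mappers])    # keyed by mapper index, not id
  client.mapper_done = (false for i in [0...msg.num_mappers])
  client.message_ref.on 'child_added', (snap) -> add_map_output snap.val()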

 

add_map_output(map_output_message)

The reducer should build up a map on the client object of the map_output, going from mapper_index to the map output pairs from that mapper. Once an END_OF_MAP_OUTPUT message is received for a mapper index, a boolean array should be updated, and once the last one is received, we can call the reducer's do_reduce() method.
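A sketch of the bookkeeping, using the fields initialized in start_reduce above:

add_map_output = (msg) ->
  switch msg.name
    when 'START_OF_MAP_OUTPUT'
      client.map_outputs[msg.mapper] = []    # a restarted mapper invalidates its predecessor's output
      client.mapper_done[msg.mapper] = false
    when 'MAP_OUTPUT'
      client.map_outputs[msg.mapper].push [msg.key, msg.value]
    when 'END_OF_MAP_OUTPUT'
      client.mapper_done[msg.mapper] = true
      do_reduce() if client.mapper_done.every((d) -> d)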

 

There’s an issue here with determining if the mapper dies while sending the map output… the reducer could wait here forever. We might need a system so that if it’s been more than 15 seconds or so since the last map_output came from a mapper that isn’t done, we tell the server to restart the mapper.

 

do_reduce()

This should only be called after the reducer has filled in the map from mapper index to map output pairs. Once this is called, this method should iterate through it and generate a new map from each key to the list of objects for that key. Once this new map has been generated, we need to build up local emit(output_line) and reduce(key, list_of_objects) functions to call the reduce code with. Once again the reduce code is retrieved from Firebase using the job_id. These functions are generated in the same manner as the mapper's functions. After we make the functions, we iterate through the new map and pass each (key, list_of_objects) pair to the reduce function.

 

The emit(line) function should output each line of the output, and each line should be associated with the key the reduce function was called with, so we effectively build up a new map of key -> list of output lines. This map should then be sent to the IOServer once it is completely built. This will be done in the finish_reduce() method. //which I call immediately
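A sketch of the whole method; current_key threads the key being reduced into the emit closure, and client.reduce_code is assumed to have already been fetched from Firebase by job_id:

do_reduce = ->
  grouped = {}
  for pairs in client.map_outputs
    for [key, obj] in pairs
      grouped[key] ?= []
      grouped[key].push obj
  client.reduce_output = {}
  current_key = null
  emit = (line) ->
    client.reduce_output[current_key] ?= []
    client.reduce_output[current_key].push line
  reduce = new Function('key', 'values', 'emit', client.reduce_code)   # same strategy as run_map_job
  for key, values of grouped
    current_key = key
    reduce(key, values, emit)
  finish_reduce()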

 

finish_reduce()

This function should be called when we have a full map of key -> output lines. It should add to an append-only queue on the OUTPUT node that the IOServer is watching. The output objects should be added to a node named after the job_id under the OUTPUT node, and each output object should include both the key and the list of lines. Once all objects have been sent to Firebase, the reducer should send a REDUCE_DONE message to the server and change its state to REDUCER_DONE.
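A sketch; output_ref points at the OUTPUT node described above, and all ref names are assumptions:

finish_reduce = ->
  job_output_ref = output_ref.child(client.job_id)
  for key, lines of client.reduce_output
    job_output_ref.push { key: key, lines: lines }
  server_message_ref.push { name: 'REDUCE_DONE', job: client.job_id, reducer: client.id }
  client.state = 'REDUCER_DONE'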

 

Client Nodes

These are the nodes that want to run a job. The client side javascript here will handle submitting the job, monitoring its progress, and displaying a link to the output file once it has been flushed to the disk.

 

UI Elements - /src/views/master.jade, /src/public/javascripts/master.coffee

This will be the page that people see when they want to run their jobs. It should first show a textarea where they can paste their map code, with the function header at the top and a closing brace at the bottom, plus a comment indicating that they shouldn't touch those lines. When someone hits Save after entering the map code and tags, we call save_map_code(). We then do the same thing for the reduce code, show a textarea for the url input, and, when they click Run, fire off start_job().

Functions - /src/public/javascripts/master.coffee, /src/public/javascripts/lib/models/master.coffee


init()

This function makes a Firebase call to generate an id for the job that will be run. This entry will store the mapper and reducer code ids and hold all current statuses of the job.
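A one-line sketch, assuming the legacy Firebase API where push() allocates a unique key:

init = ->
  job_ref = job_state_ref.push()
  job_id = job_ref.name()    # name() returns the generated key in the legacy API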

 

save_map_code(map_code, tag_list)

This function takes in the string representation of the code and pushes it onto the MAP_CODE reference, which generates a unique id for it. This id is then pushed onto references under MAP_CODE_TAGS (one for each tag in tag_list) for eventual easy lookup. In addition, the map_id is saved in the job state under the JOB_STATE reference's entry for the job_id generated in init().
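A sketch with hypothetical ref names mirroring the references above:

save_map_code = (map_code, tag_list) ->
  map_id = map_code_ref.push(map_code).name()
  map_code_tags_ref.child(tag).push(map_id) for tag in tag_list
  job_state_ref.child(job_id).child('map_id').set map_id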

 

save_reduce_code(reduce_code, tag_list)

This function does the same thing as save_map_code but uses the REDUCE_CODE and REDUCE_CODE_TAGS references instead.

 

save_urls(url_list)

This function adds the list of urls to the job's entry under the JOB_STATE reference and then calls start_job(job_id).

 

start_job(job_id)

This function adds a watcher on a status tag of the job reference so that we can display neat things to the person running the job and know when it has finished. At this point, we also send a START_JOB message to the server with the job_id included.
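A sketch; the status field name and the update_status_display UI hook are assumptions:

start_job = (job_id) ->
  job_state_ref.child(job_id).child('status').on 'value', (snap) ->
    update_status_display snap.val()
  server_message_ref.push { name: 'START_JOB', job: job_id }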

 

finish_job(job_id)

When the client sees that the link to the output of the job is in the job reference on Firebase, it sends a FINISH_JOB message to the server.

 

Server

At this point, the server is in charge of knowing what nodes are alive in the system and making sure jobs run to completion. This will be accomplished by the workers sending messages to the server and the server sending messages to the workers and updating the job state that the client is watching.

 

watch_job(job_id)

This is called when the server receives a START_JOB message. At this point, the server should create a new JobWatcher object with the job_id, add it to a list of them, and call the JobWatcher's run() method. Messages received by the server will need to be dispatched to the proper JobWatcher to handle; they should just be pushed onto an array of messages that the JobWatcher holds.
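A sketch of the dispatch plumbing, using a map from job_id to watcher rather than a bare list (an implementation choice, not settled design):

watchers = {}

watch_job = (job_id) ->
  watchers[job_id] = new JobWatcher(job_id)
  watchers[job_id].run()

# incoming worker messages are routed to the watcher for their job
dispatch_message = (msg) ->
  watchers[msg.job]?.messages.push msg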

 

finish_job(job_id)

When the server receives a FINISH_JOB message, it calls this method. This makes sure that the JobWatcher is killed.

JobWatcher.run()

This method will start a setInterval loop to check on the job and make sure that progress is being made. This method will contain all of the logic for updating the state value of the job that the client is watching and should essentially operate as a state machine. When this is called for the first time, it should update the state field on the right Firebase job reference to STARTING before starting the loop.
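A skeletal sketch, assuming a one-second tick and a state field under the job's reference:

class JobWatcher
  constructor: (@job_id) ->
    @messages = []    # worker messages dispatched to this watcher
  run: ->
    job_state_ref.child(@job_id).child('state').set 'STARTING'
    @interval = setInterval (=> @check()), 1000
  kill: ->
    clearInterval @interval
  check: ->
    # read the job's current state, drain @messages, and advance the state machine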

 

Inside the loop, we should read the state value of the job. Below I’ve listed out the possible states and when to move from one to the other.

 

 

IOServer

This server watches the OUTPUT Firebase reference for child additions. When a node is added, the result of a job is being appended to that node. We start creating a file with the name of the node that was added and start watching that node for line additions. For each addition, we write out the key and each line to disk and remove the entry from the Firebase node to save space.
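A sketch of that watcher (Node.js; the legacy snapshot API with name() and ref(), and the output file naming, are assumptions):

fs = require 'fs'

output_ref.on 'child_added', (job_snap) ->
  stream = fs.createWriteStream "#{job_snap.name()}.out", flags: 'a'
  job_snap.ref().on 'child_added', (entry_snap) ->
    { key, lines } = entry_snap.val()
    stream.write "#{key}\t#{line}\n" for line in lines
    entry_snap.ref().remove()    # free space in Firebase once the lines hit disk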

 

Under the IO_SERVER_MESSAGE_REF there will be three types of messages for reducers to write out their output. The types are START_REDUCER_OUTPUT, REDUCER_OUTPUT, and STOP_REDUCER_OUTPUT. START_REDUCER_OUTPUT and STOP_REDUCER_OUTPUT will look like the following:

 

{ name: START_REDUCER_OUTPUT, reducer: id, job: jobid }

{ name: STOP_REDUCER_OUTPUT, reducer: id, job: jobid }

 

The REDUCER_OUTPUT will look like

 

{ name: REDUCER_OUTPUT, reducer: id, key: key, lines: lines }

 

Finally, the IOServer is also told when a job starts and how many reducers it should expect output from:

{ name: START_JOB, numReducers: number, job: jobid }