Locality scheduling redesign

Assumptions

  • A batch consists of a sequence of files F1 ... Fn and a set of jobs J1 ... Jm operating on these files.
  • Each job uses a contiguous set of files.
  • A given file may be used by many jobs.
  • The density of jobs in the file sequence may be variable.
  • Several batches may be in progress concurrently, for the same or different applications.


Goals

  • To complete batches quickly.
  • To minimize the amount of data transfer to hosts.

Scheduling policy

The naive policy, in which we dispatch jobs in order to whatever host requests work, essentially sends every file to every host, so it fails to achieve the second goal.

The ideal policy would start each host at a different point in the job space, spaced according to their speeds. This would potentially send each file to only a single host. However, it's impractical for various reasons: replication, the unreliability of hosts, and the unpredictability of their speeds.

Instead, we use a policy in which the set of hosts is divided into teams, and each team works on a different area of the job space. Teams should have these properties:

  • The fastest host in a team should be no faster than the total of the other hosts. Otherwise, it could get unboundedly far ahead of the others.
  • Subject to the above, teams should be as small as possible. A good size might be 10 or 20 hosts.
  • The hosts in a team should belong to different users (for validation purposes).

Because of host churn, team membership is dynamic; e.g. a team may be empty for a period.
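The team-formation constraints above can be sketched as follows. This is only an illustrative greedy partition, not the actual BOINC code; `form_teams` and `target_size` are hypothetical names, and host speed stands in for expavg_credit.

```python
def form_teams(host_speeds, target_size=10):
    """Greedily partition hosts into teams (illustrative sketch).

    Constraints from the design: the fastest host in a team must be no
    faster than the total of the others, and teams should be small.
    """
    # Sort fastest first, so each fast host accumulates enough slower
    # hosts behind it to balance its speed.
    hosts = sorted(host_speeds, reverse=True)
    teams, team = [], []
    for speed in hosts:
        team.append(speed)
        balanced = max(team) <= sum(team) - max(team)
        if len(team) >= target_size and balanced:
            teams.append(team)
            team = []
    if team:
        if teams:
            # Leftover (slowest) hosts: fold into the last team, which
            # can only improve its balance.
            teams[-1].extend(team)
        else:
            teams.append(team)  # too few hosts for a balanced team
    return teams
```

Since membership is dynamic, a real implementation would rerun something like this incrementally as hosts arrive and disappear, rather than as a one-shot partition.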


Locality cursors

A cursor consists of

  • a team of hosts
  • a range of jobs
  • status information (see below)

Note: we discussed having a separate notion of "job range", allowing cursors to move from one job range to another, and allowing job ranges to be subdivided. I think this is needlessly complex.


New tables:

batch
    // this table already exists; we may need to add fields to it

batch_host              // batch/host association table
    host_id integer
    batch_id integer
    cursor_id integer

locality_cursor
    batch_id integer
    first_job_num integer
    last_job_num integer
        // range of jobs to be done
    expavg_credit double
        // sum of expavg_credit of hosts in the team
    remaining_credit double
        // estimated credit of unfinished jobs
        // zero means all jobs finished
    first_unfinished_job_num integer
        // all jobs before this have been completed
    first_ungenerated_job_num integer
        // we've generated workunit records for all jobs before this

workunit (new fields)
    cursor_id integer
    job_num integer

Creating a batch

To create a batch:

  • create batch record
  • based on # of hosts, # of jobs, and job density, create locality_cursor records
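The cursor-creation step might look like the following sketch. The function name, the per-team size, and the crude one-credit-per-job estimate for remaining_credit are all assumptions for illustration; the field names match the locality_cursor schema above.

```python
def make_cursors(n_jobs, n_hosts, team_size=10):
    """Create one cursor per prospective team, each with a contiguous,
    roughly equal slice of the job space (illustrative sketch)."""
    n_cursors = max(1, n_hosts // team_size)
    n_cursors = min(n_cursors, n_jobs)   # avoid empty job ranges
    cursors = []
    base, extra = divmod(n_jobs, n_cursors)
    first = 0
    for i in range(n_cursors):
        size = base + (1 if i < extra else 0)
        cursors.append({
            "first_job_num": first,
            "last_job_num": first + size - 1,
            "expavg_credit": 0.0,             # filled in as hosts join
            "remaining_credit": float(size),  # crude: 1 credit per job
            "first_unfinished_job_num": first,
            "first_ungenerated_job_num": first,
        })
        first += size
    return cursors
```

Variable job density (an assumption in this batch model) could be handled by weighting slice sizes instead of splitting evenly.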


Assigning hosts to cursors


    est_time_left(cursor) = cursor.remaining_credit / cursor.expavg_credit

This is an estimate of the time needed to complete the cursor's jobs, given its current team.

For each batch:

If this is a new host (i.e. no batch_host record) then

  • assign the host to the cursor for this batch with the greatest est_time_left().
  • create batch_host record
  • add host's expavg_credit to cursor's expavg_credit

Otherwise, consider moving this host to a different cursor. Let C = host's cursor, and let D = the cursor for which est_time_left() is greatest. If est_time_left(C) < .5*est_time_left(D), then move this host to D. (This policy may need to be refined a bit to reduce moving hosts between cursors).
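The assignment and rebalancing policy above can be sketched as follows. This is illustrative Python, not scheduler code; cursors are plain dicts with the locality_cursor fields, and an empty team is treated as infinitely far behind.

```python
def est_time_left(cursor):
    """Estimated time for the cursor's team to finish its jobs."""
    if cursor["expavg_credit"] == 0:
        return float("inf")      # empty team: needs hosts most urgently
    return cursor["remaining_credit"] / cursor["expavg_credit"]

def assign_host(host, cursors, host_cursor=None):
    """New hosts join the neediest cursor; existing hosts move only if
    their cursor is far ahead of the neediest one (sketch)."""
    neediest = max(cursors, key=est_time_left)
    if host_cursor is None:
        # new host: join the cursor with greatest est_time_left()
        neediest["expavg_credit"] += host["expavg_credit"]
        return neediest
    if est_time_left(host_cursor) < 0.5 * est_time_left(neediest):
        # C is well ahead of D: move the host to D
        host_cursor["expavg_credit"] -= host["expavg_credit"]
        neediest["expavg_credit"] += host["expavg_credit"]
        return neediest
    return host_cursor
```

The 0.5 factor is the hysteresis mentioned above; widening it reduces how often hosts move between cursors.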

Assigning jobs

For each batch:

Enumerate unsent results for this host's cursor in order of increasing job num (i.e. finish old jobs before starting new ones). Send as many as the host can handle.

Some type of synchronization is needed; possibilities include:

  • use a DB transaction (how much would this lock?)
  • explicitly lock the locality_cursor record (is this possible?)
  • use a semaphore per cursor
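The third option might look like the sketch below. This in-process lock table is only illustrative: scheduler instances are separate processes, so the real mechanism would be a system semaphore or a DB-level lock per cursor, not a Python mutex.

```python
import threading
from collections import defaultdict

# One lock per cursor id (illustrative stand-in for a per-cursor
# semaphore or DB row lock).
_cursor_locks = defaultdict(threading.Lock)

def enumerate_unsent_locked(cursor_id, enumerate_fn):
    """Run the unsent-result enumeration for one cursor under its lock,
    so only one scheduler instance hands out that cursor's jobs at a
    time. enumerate_fn is a hypothetical callback."""
    with _cursor_locks[cursor_id]:
        return enumerate_fn()
```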

Deleting files

If the host has a sticky file that's not used by an unfinished job in its cursor, tell client to delete that file.

Note: names of sticky files should encode the batch and file number.
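A possible encoding, and the resulting deletion check, is sketched below. The exact name format is an assumption; the design only requires that batch and file number be recoverable from the name.

```python
def sticky_file_name(batch_id, file_num):
    """Encode batch and file number in the sticky file's name
    (hypothetical format)."""
    return "batch_%d_file_%d" % (batch_id, file_num)

def parse_sticky_file_name(name):
    """Inverse of sticky_file_name(); returns (batch_id, file_num)."""
    _, batch_id, _, file_num = name.split("_")
    return int(batch_id), int(file_num)

def files_to_delete(sticky_files, batch_id, needed_file_nums):
    """Sticky files of this batch not needed by any unfinished job in
    the host's cursor should be deleted by the client."""
    doomed = []
    for name in sticky_files:
        b, f = parse_sticky_file_name(name)
        if b == batch_id and f not in needed_file_nums:
            doomed.append(name)
    return doomed
```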

Work generator

Loop over batches and cursors. Try to maintain a cushion of N unsent jobs per cursor. Start generating jobs at cursor.first_ungenerated_job_num.
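The per-cursor loop body might look like this sketch; `make_workunit` is a hypothetical callback that creates one workunit record, and `n_unsent` is the current count of unsent jobs for the cursor.

```python
def generate_work(cursor, n_unsent, cushion, make_workunit):
    """Top up one cursor's supply of unsent jobs (illustrative sketch).

    Generates jobs starting at first_ungenerated_job_num until the
    cushion is reached or the cursor's job range is exhausted.
    """
    job = cursor["first_ungenerated_job_num"]
    while n_unsent < cushion and job <= cursor["last_job_num"]:
        make_workunit(cursor, job)
        job += 1
        n_unsent += 1
    cursor["first_ungenerated_job_num"] = job
```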


Handling completed jobs

When a workunit is completed (successfully or not):

    while the job numbered cursor.first_unfinished_job_num is finished:
        advance cursor.first_unfinished_job_num
        update cursor.remaining_credit
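As a concrete sketch of this update, assuming a flat per-job credit estimate (the real system would use per-job estimates):

```python
def on_workunit_completed(cursor, finished_jobs, job_credit):
    """Advance the cursor's low-water mark past contiguously finished
    jobs and decrement remaining_credit (illustrative sketch).

    finished_jobs: set of completed job numbers.
    job_credit: assumed estimated credit per job.
    """
    while cursor["first_unfinished_job_num"] in finished_jobs:
        cursor["first_unfinished_job_num"] += 1
        cursor["remaining_credit"] -= job_credit
```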


Updating credit estimates

This runs periodically (every hour or day) and recomputes locality_cursor.expavg_credit. It doesn't have to be a separate program; it could be added to an existing daemon, such as the feeder.
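The periodic recompute amounts to summing team members' credit rates per cursor, e.g. in this sketch (the mapping arguments are illustrative stand-ins for queries over batch_host and host):

```python
def update_cursor_credit(cursors, team_members, host_expavg):
    """Recompute each locality_cursor's expavg_credit as the sum of its
    team members' expavg_credit (illustrative sketch).

    cursors: cursor_id -> cursor dict
    team_members: cursor_id -> list of host ids (from batch_host)
    host_expavg: host id -> current expavg_credit
    """
    for cursor_id, cursor in cursors.items():
        cursor["expavg_credit"] = sum(
            host_expavg[h] for h in team_members.get(cursor_id, [])
        )
```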

Last modified on 08/14/12 17:17:04