Locality scheduling redesign

Assumptions

  • A batch consists of a sequence of files F1 ... Fn and a set of jobs J1 ... Jm operating on these files.
  • Each job uses a contiguous set of files.
  • A given file may be used by many jobs.
  • The density of jobs in the file sequence may be variable.
  • Several batches may be in progress concurrently, for the same or different applications.

Goals

  • To complete batches quickly.
  • To minimize the amount of data transfer to hosts.

Scheduling policy

Dispatching the jobs simply in order sends essentially every file to every host, so it fails to achieve the second goal.

The ideal policy would start each host at a different point in the job space, spaced according to host speed; this would potentially send each file to only a single host. However, it's impractical for several reasons: jobs must be replicated for validation, hosts are unreliable, and their speeds are unpredictable.

Instead, we use a policy in which the set of hosts is divided into teams, and each team works on a different area of the job space. Teams should have these properties:

  • The fastest host in a team should be no faster than the total of the other hosts. Otherwise, it could get unboundedly far ahead of the others.
  • Subject to the above, teams should be as small as possible; a good size might be 10 or 20.
  • The hosts in a team should belong to different users (for validation purposes).

Because of host churn, team membership is dynamic; e.g. a team may be empty for a period.
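The first property is easy to check directly. Here's a minimal C++ sketch, assuming each host's speed is summarized by a single number such as its expavg_credit; the Host struct and team_is_valid() are illustrative, not existing BOINC code:

    #include <algorithm>
    #include <vector>

    // Illustrative per-host record; "speed" could be expavg_credit.
    struct Host {
        double speed;
    };

    // A team satisfies the first property if its fastest host is no
    // faster than the combined speed of the remaining hosts, so no
    // host can get unboundedly far ahead of its teammates.
    bool team_is_valid(const std::vector<Host>& team) {
        if (team.size() < 2) return false;
        double total = 0, fastest = 0;
        for (const Host& h: team) {
            total += h.speed;
            fastest = std::max(fastest, h.speed);
        }
        return fastest <= total - fastest;
    }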

Design

A cursor consists of

  • a team of hosts
  • a range of jobs
  • status information (see below)

Note: we discussed having a separate notion of "job range", allowing cursors to move from one job range to another, and allowing job ranges to be subdivided. I think this is needlessly complex.

Database

New tables:

batch
	// this table already exists; we may need to add fields to it

batch_host              // batch/host association table
	host_id integer
	batch_id integer
	cursor_id integer

locality_cursor
	batch_id integer
	expavg_credit double
		// sum of expavg_credit of hosts in the team
	first_job_num integer
	last_job_num integer
	first_unfinished_job_num integer
		// all jobs before this have been completed
	first_ungenerated_job_num integer
		// we've generated workunit records for all jobs before this
	index on (batch_id, expavg_credit)

workunit (new fields)
	cursor_id integer
	job_num integer
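For reference, here are hypothetical C++ mirrors of the new tables, in the style of BOINC's database structs. An integer primary key on locality_cursor is an assumption here, implied by the cursor_id references in batch_host and workunit:

    // Sketch only; field names follow the schema above.
    struct BATCH_HOST {
        int host_id;
        int batch_id;
        int cursor_id;
    };

    struct LOCALITY_CURSOR {
        int id;                          // assumed primary key
        int batch_id;
        double expavg_credit;            // sum over hosts in the team
        int first_job_num;
        int last_job_num;
        int first_unfinished_job_num;    // all jobs before this are done
        int first_ungenerated_job_num;   // workunits exist for all jobs before this
    };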

Initialization

To initialize a batch:

  • create batch record
  • based on the number of hosts, the number of jobs, and the job density, create locality_cursor records (one way to pick the job ranges is sketched below)
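The simplest scheme splits the job space into contiguous, roughly equal ranges, one per cursor; a real implementation might instead weight the ranges by job density. make_cursor_ranges() is a hypothetical helper:

    #include <vector>

    struct CursorRange {
        int first_job_num;
        int last_job_num;    // inclusive
    };

    // Partition jobs 0..njobs-1 into ncursors contiguous ranges,
    // assuming uniform job density.
    std::vector<CursorRange> make_cursor_ranges(int njobs, int ncursors) {
        std::vector<CursorRange> ranges;
        int base = njobs / ncursors;
        int extra = njobs % ncursors;
        int next = 0;
        for (int i = 0; i < ncursors; i++) {
            int size = base + (i < extra ? 1 : 0);
            if (size == 0) continue;
            ranges.push_back({next, next + size - 1});
            next += size;
        }
        return ranges;
    }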

Scheduler

Assign host to cursors

For each batch:

If this is a new host (i.e. no batch_host record) then

  • assign host to cursor for this batch with least expavg_credit
  • create batch_host record
  • add host's expavg_credit to cursor's expavg_credit

Otherwise, consider moving the host to a different cursor. Let C be the host's current cursor. If C.expavg_credit is more than twice the lowest expavg_credit among the batch's cursors, move the host to that least-credit cursor, updating both cursors' expavg_credit. (This policy may need to be refined.)
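Here's a sketch of both cases, operating on in-memory cursor records; in the scheduler this would read and update locality_cursor rows, with the index on (batch_id, expavg_credit) supporting the least-credit lookup:

    #include <vector>

    struct Cursor {
        int id;
        double expavg_credit;    // sum over hosts currently in the team
    };

    Cursor* least_loaded(std::vector<Cursor>& cursors) {
        Cursor* best = nullptr;
        for (Cursor& c: cursors) {
            if (!best || c.expavg_credit < best->expavg_credit) best = &c;
        }
        return best;
    }

    // New host (current == nullptr): join the least-loaded cursor.
    // Existing host: move only if its cursor has more than twice the
    // credit of the least-loaded one, which limits thrashing.
    Cursor* choose_cursor(std::vector<Cursor>& cursors, Cursor* current,
                          double host_credit) {
        Cursor* low = least_loaded(cursors);
        if (!low) return current;
        if (!current) {
            low->expavg_credit += host_credit;
            return low;
        }
        if (current != low && current->expavg_credit > 2*low->expavg_credit) {
            current->expavg_credit -= host_credit;
            low->expavg_credit += host_credit;
            return low;
        }
        return current;
    }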

Assigning jobs

For each batch:

Enumerate unsent results for this host's cursor in order of increasing job num (i.e. finish old jobs before starting new ones). Send as many as the host can handle.

Some type of synchronization is needed; possibilities include:

  • use a DB transaction (how much would this lock?)
  • explicitly lock the locality_cursor record (is this possible?)
  • use a semaphore per cursor
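For illustration, here's a sketch of the enumeration using an in-process mutex as a stand-in for the per-cursor semaphore option; in the real scheduler the unsent results would come from a DB query on the cursor's workunits, ordered by job_num:

    #include <algorithm>
    #include <mutex>
    #include <vector>

    struct UnsentResult {
        int result_id;
        int job_num;
    };

    std::mutex cursor_mutex;    // stand-in for a per-cursor semaphore

    // Return up to `capacity` unsent results, oldest jobs first,
    // so old jobs finish before new ones start.
    std::vector<UnsentResult> pick_results(std::vector<UnsentResult>& unsent,
                                           int capacity) {
        std::lock_guard<std::mutex> lock(cursor_mutex);
        std::sort(unsent.begin(), unsent.end(),
            [](const UnsentResult& a, const UnsentResult& b) {
                return a.job_num < b.job_num;
            });
        std::vector<UnsentResult> picked;
        int n = std::min((int)unsent.size(), capacity);
        picked.assign(unsent.begin(), unsent.begin() + n);
        unsent.erase(unsent.begin(), unsent.begin() + n);
        return picked;
    }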

Deleting files

If the host has a file that's used only by finished jobs, tell the client to delete it.

Note: names of sticky files should encode the batch and file number.
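One hypothetical encoding, plus the deletion test. Because each job uses a contiguous range of files, a file is used only by finished jobs once the last job referencing it precedes the cursor's first_unfinished_job_num; this sketch ignores files shared across cursor boundaries:

    #include <cstdio>
    #include <string>

    // Hypothetical naming scheme, e.g. "batch_7_file_123".
    std::string sticky_file_name(int batch_id, int file_num) {
        char buf[64];
        snprintf(buf, sizeof(buf), "batch_%d_file_%d", batch_id, file_num);
        return buf;
    }

    bool parse_sticky_file_name(const std::string& name, int& batch_id,
                                int& file_num) {
        return sscanf(name.c_str(), "batch_%d_file_%d",
                      &batch_id, &file_num) == 2;
    }

    // Deletable once every job that uses the file has finished.
    bool file_deletable(int last_job_using_file, int first_unfinished_job_num) {
        return last_job_using_file < first_unfinished_job_num;
    }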

Work generator

Loop over batches and cursors. Try to maintain a cushion of N unsent jobs per cursor. Start generating jobs at cursor.first_ungenerated_job_num.
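One pass for a single cursor might look like the following sketch; create_job() is a hypothetical stand-in for creating the workunit record for one job:

    #include <cstdio>

    struct LocalityCursor {
        int id;
        int batch_id;
        int last_job_num;
        int first_ungenerated_job_num;
    };

    // Hypothetical: create the workunit record for one job.
    void create_job(int batch_id, int cursor_id, int job_num) {
        printf("batch %d cursor %d: generate job %d\n",
               batch_id, cursor_id, job_num);
    }

    // Top up the cursor's supply of unsent jobs to `cushion`.
    void top_up_cursor(LocalityCursor& c, int nunsent, int cushion) {
        while (nunsent < cushion &&
               c.first_ungenerated_job_num <= c.last_job_num) {
            create_job(c.batch_id, c.id, c.first_ungenerated_job_num);
            c.first_ungenerated_job_num++;
            nunsent++;
        }
    }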

Transitioner

When a workunit is completed (successfully or not):

if workunit.job_num == cursor.first_unfinished_job_num
	// jobs may finish out of order; skip all consecutive finished jobs
	while job_finished(cursor.first_unfinished_job_num)
		cursor.first_unfinished_job_num++
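The same step in C++, assuming a hypothetical job_finished() predicate (e.g. a query over this cursor's workunits) and that the just-completed job is already marked finished:

    #include <functional>

    // Advance the cursor's completion frontier.  Jobs may finish out
    // of order, so skip over any later jobs that already finished.
    void on_job_completed(int job_num, int& first_unfinished_job_num,
                          int last_job_num,
                          const std::function<bool(int)>& job_finished) {
        if (job_num != first_unfinished_job_num) return;
        while (first_unfinished_job_num <= last_job_num &&
               job_finished(first_unfinished_job_num)) {
            first_unfinished_job_num++;
        }
    }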

locality_daemon

This runs periodically (e.g. every hour or day) and recomputes locality_cursor.expavg_credit. It doesn't have to be a separate program; it could be added to an existing daemon, such as the feeder.
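A sketch of the recomputation as an in-memory aggregation; in practice it would likely be a grouped query over batch_host joined with the host table:

    #include <map>
    #include <vector>

    struct HostRec {
        int cursor_id;
        double expavg_credit;
    };

    // Recompute each cursor's expavg_credit as the sum over its team.
    // Host credit decays over time, so cached per-cursor sums drift
    // and need periodic refresh.
    std::map<int, double> recompute_cursor_credit(const std::vector<HostRec>& hosts) {
        std::map<int, double> totals;
        for (const HostRec& h: hosts) {
            totals[h.cursor_id] += h.expavg_credit;
        }
        return totals;
    }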