public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
    throws IOException {

  ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  int numTaskTrackers = clusterStatus.getTaskTrackers();

  Collection<JobInProgress> jobQueue =
    jobQueueJobInProgressListener.getJobQueue();

  //
  // Get map + reduce counts for the current tracker.
  //
  int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
  int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
  int numMaps = taskTracker.countMapTasks();
  int numReduces = taskTracker.countReduceTasks();

  //
  // Compute average map and reduce task numbers across pool
  //
  int remainingReduceLoad = 0;
  int remainingMapLoad = 0;
  synchronized (jobQueue) {
    for (JobInProgress job : jobQueue) {
      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
        int totalMapTasks = job.desiredMaps();
        int totalReduceTasks = job.desiredReduces();
        remainingMapLoad += (totalMapTasks - job.finishedMaps());
        remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
      }
    }
  }

  // find out the maximum number of maps or reduces that we are willing
  // to run on any node.
  int maxMapLoad = 0;
  int maxReduceLoad = 0;
  if (numTaskTrackers > 0) {
    maxMapLoad = Math.min(maxCurrentMapTasks,
                          (int) Math.ceil((double) remainingMapLoad /
                                          numTaskTrackers));
    maxReduceLoad = Math.min(maxCurrentReduceTasks,
                             (int) Math.ceil((double) remainingReduceLoad /
                                             numTaskTrackers));
  }

  int totalMaps = clusterStatus.getMapTasks();
  int totalMapTaskCapacity = clusterStatus.getMaxMapTasks();
  int totalReduces = clusterStatus.getReduceTasks();
  int totalReduceTaskCapacity = clusterStatus.getMaxReduceTasks();

  //
  // In the below steps, we allocate first a map task (if appropriate),
  // and then a reduce task if appropriate. We go through all jobs
  // in order of job arrival; jobs only get serviced if their
  // predecessors are serviced, too.
  //

  //
  // We hand a task to the current taskTracker if the given machine
  // has a workload that's less than the maximum load of that kind of
  // task.
  //
  if (numMaps < maxMapLoad) {
    int totalNeededMaps = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING) {
          continue;
        }

        // This is where the Task is actually obtained; we have to go into
        // the job (JobInProgress) to get it.
        Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
                                      taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          return Collections.singletonList(t);
        }

        //
        // Beyond the highest-priority task, reserve a little
        // room for failures and speculative executions; don't
        // schedule tasks to the hilt.
        //
        totalNeededMaps += job.desiredMaps();
        int padding = 0;
        if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
          padding = Math.min(maxCurrentMapTasks,
                             (int) (totalNeededMaps * padFraction));
        }
        if (totalMaps + padding >= totalMapTaskCapacity) {
          break;
        }
      }
    }
  }

  //
  // Same thing, but for reduce tasks
  //
  if (numReduces < maxReduceLoad) {
    int totalNeededReduces = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING ||
            job.numReduceTasks == 0) {
          continue;
        }

        Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
                                         taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          return Collections.singletonList(t);
        }

        //
        // Beyond the highest-priority task, reserve a little
        // room for failures and speculative executions; don't
        // schedule tasks to the hilt.
        //
        totalNeededReduces += job.desiredReduces();
        int padding = 0;
        if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
          padding = Math.min(maxCurrentReduceTasks,
                             (int) (totalNeededReduces * padFraction));
        }
        if (totalReduces + padding >= totalReduceTaskCapacity) {
          break;
        }
      }
    }
  }
  return null;
}
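To make the load-balancing step above concrete, here is a small, self-contained sketch of the maxMapLoad computation with made-up numbers (the cluster figures are hypothetical, chosen only to illustrate the formula):

public class MapLoadExample {
  public static void main(String[] args) {
    // Hypothetical cluster state, for illustration only.
    int maxCurrentMapTasks = 2;   // map slots on this TaskTracker
    int remainingMapLoad   = 10;  // unfinished maps across all running jobs
    int numTaskTrackers    = 4;   // trackers currently in the cluster

    // Even share of the remaining load, rounded up: ceil(10 / 4) = 3,
    // then capped by this tracker's own slot count: min(2, 3) = 2.
    int maxMapLoad = Math.min(maxCurrentMapTasks,
        (int) Math.ceil((double) remainingMapLoad / numTaskTrackers));

    System.out.println("maxMapLoad = " + maxMapLoad);   // prints: maxMapLoad = 2
  }
}

A tracker that is already running maxMapLoad map tasks gets no new map on this heartbeat, which is how the FIFO scheduler spreads work across nodes instead of filling one node first.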
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
                                                boolean initialContact,
                                                boolean acceptNewTasks,
                                                short responseId) throws IOException {
  .............
  // If the tracker is accepting new tasks, let the JobTracker do the
  // scheduling; this is where taskScheduler.assignTasks gets called.
  if (acceptNewTasks) {
    TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
    if (taskTrackerStatus == null) {
      LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
    } else {
      List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
      // This is where tasks are about to be assigned; the configured
      // scheduler decides how to schedule them.
      if (tasks == null) {
        tasks = taskScheduler.assignTasks(taskTrackerStatus);
      }
      if (tasks != null) {
        for (Task task : tasks) {
          expireLaunchingTasks.addNewTask(task.getTaskID());
          LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
          actions.add(new LaunchTaskAction(task));
        }
      }
    }
  }
Bookmarking these snippets as reference material for scheduler work.
Reposted from: http://jiwenke.javaeye.com/blog/334146 and http://jiwenke.javaeye.com/blog/335093
After spending quite a bit of effort walking through Hadoop's MapReduce implementation, the basic thread of execution is now clear:
1. Tasks to run are obtained by the TaskTracker through the heartbeat.
2. Once the TaskTracker receives the HeartbeatResponse, it decides what to do based on the actions packaged inside the response.
3. If it is a launch action, the TaskLauncher comes into play: inside startNewTasks, localizeJob calls launchTaskForJob, and launchTask in TaskInProgress then ends with runner.start(). Note that startNewTasks sits inside the launcher thread's run method, and it is the notifyAll in TaskLauncher that wakes this thread up:
public void addToTaskQueue(LaunchTaskAction action) {
  synchronized (tasksToLaunch) {
    TaskInProgress tip = registerTask(action, this);
    tasksToLaunch.add(tip);
    tasksToLaunch.notifyAll();
  }
}
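For contrast with the producer side above, here is a simplified sketch of the consuming side of the same queue. This is not the verbatim Hadoop TaskLauncher (the real run method also checks free slots before launching), just the wait/notifyAll pattern the text describes, with TaskInProgress stubbed out so the sketch compiles on its own:

import java.util.LinkedList;
import java.util.List;

// Simplified sketch of the consumer thread; not the real Hadoop class.
class TaskLauncherSketch extends Thread {

  // Stand-in for the real TaskTracker.TaskInProgress inner class.
  static class TaskInProgress { }

  private final List<TaskInProgress> tasksToLaunch = new LinkedList<TaskInProgress>();

  // Producer side, mirroring addToTaskQueue above.
  public void add(TaskInProgress tip) {
    synchronized (tasksToLaunch) {
      tasksToLaunch.add(tip);
      tasksToLaunch.notifyAll();   // wakes the run() loop below
    }
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      TaskInProgress tip;
      synchronized (tasksToLaunch) {
        // Block until the producer calls notifyAll on the same monitor.
        while (tasksToLaunch.isEmpty()) {
          try {
            tasksToLaunch.wait();
          } catch (InterruptedException e) {
            return;
          }
        }
        tip = tasksToLaunch.remove(0);
      }
      // Outside the lock: this is where the real TaskTracker localizes the
      // job and eventually reaches runner.start() via launchTask.
      startNewTask(tip);
    }
  }

  private void startNewTask(TaskInProgress tip) {
    // placeholder for localizeJob / launchTaskForJob / launchTask
  }
}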
This addToTaskQueue method is called from offerService, so the whole chain is now connected:
if (action instanceof LaunchTaskAction) {
  addToTaskQueue((LaunchTaskAction) action);
} else if (action instanceof CommitTaskAction)
4. Control then comes back to TaskRunner.run, which launches a JVM; naturally the JVM's command-line arguments have to be built first. This is the JVM in which we actually see the map task run.
5. JvmRunner then spawns that JVM, which is done by shell-executing the Child class; Child's main method is the entry point of the newly started JVM (a rough illustration follows below).
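As a rough illustration of what spawning the child JVM amounts to, the sketch below starts a second java process whose main class stands in for Child. The class name and the argument list are placeholders, not the exact command line the real JvmRunner builds:

import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: build and exec a child JVM the way JvmRunner does
// conceptually. The main class and its arguments are placeholders, not the
// real org.apache.hadoop.mapred.Child protocol.
public class SpawnChildJvm {

  // Plays the role of Child: the entry point of the freshly started JVM.
  public static class FakeChild {
    public static void main(String[] args) {
      System.out.println("child JVM started with args: "
          + java.util.Arrays.toString(args));
    }
  }

  public static void main(String[] args) throws Exception {
    List<String> cmd = new ArrayList<String>();
    cmd.add(new File(System.getProperty("java.home"), "bin/java").getPath());
    cmd.add("-Xmx200m");                                // heap for the task JVM
    cmd.add("-cp");
    cmd.add(System.getProperty("java.class.path"));
    cmd.add(FakeChild.class.getName());                 // stand-in for Child
    cmd.add("localhost");                               // placeholder args:
    cmd.add("12345");                                   // host, port, task id
    cmd.add("attempt_200901010000_0001_m_000000_0");

    // In Hadoop the child's stdout/stderr are redirected to its own log
    // files; here we just inherit the parent's console for simplicity.
    Process child = new ProcessBuilder(cmd).inheritIO().start();
    System.out.println("child JVM exited with " + child.waitFor());
  }
}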
6. In Child.main the task type is examined and the corresponding MapTask.run or ReduceTask.run is invoked.
7. At this point the entry into mapper.map is finally visible, and the user-defined mapper starts executing (a minimal example is given after this list).
This is why the log output after Child starts up does not land in the same file as the TaskTracker output seen earlier: they are no longer the same JVM.
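To round out step 7, here is a minimal user-defined mapper in the old org.apache.hadoop.mapred API, the classic word-count mapper; this is the kind of class whose map method ends up being driven from MapTask.run:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Classic word-count mapper in the old mapred API; map() is called once
// per input record inside the child JVM.
public class WordCountMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  private final static IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, IntWritable> output,
                  Reporter reporter) throws IOException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      output.collect(word, ONE);
    }
  }
}

Everything described in steps 1 through 7 exists to get this map method running inside the child JVM.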
--------------------------------------
The previous part looked at how the TaskTracker starts new tasks; here we continue with how the JobTracker responds and schedules. In Hadoop, as we can see, tasks are obtained in a pull fashion.
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted, askForNewTask, heartbeatResponseId);
This is where the TaskTracker sends its heartbeat to the JobTracker, over RPC, and that brings us to the JobTracker:
The taskScheduler used here is the default one:
taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
It is specified in the configuration file under "mapred.jobtracker.taskScheduler"; usually this is JobQueueTaskScheduler, Hadoop's own FIFO-style implementation. Let's look at how this scheduler does assignTasks:
Obtaining a task then descends into JobInProgress, via obtainNewMapTask/obtainNewReduceTask, where the state of the cluster has to be queried and taken into account.
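Since the scheduler is pluggable through that property, swapping in a different policy means extending TaskScheduler and pointing mapred.jobtracker.taskScheduler at the new class. The skeleton below is only a sketch of that shape, a trivial policy that never assigns anything; the exact set of abstract methods varies a little between Hadoop versions, so treat it as an outline rather than a drop-in class:

// Custom schedulers for classic MapReduce usually live in this package,
// because several of the types involved are package-private.
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Sketch only: a scheduler that hands out no tasks at all.
public class NoOpTaskScheduler extends TaskScheduler {

  @Override
  public List<Task> assignTasks(TaskTrackerStatus taskTracker) throws IOException {
    return null;                       // assign nothing on this heartbeat
  }

  @Override
  public Collection<JobInProgress> getJobs(String queueName) {
    return Collections.emptyList();
  }
}

It would then be selected by setting mapred.jobtracker.taskScheduler to org.apache.hadoop.mapred.NoOpTaskScheduler in mapred-site.xml (the property name comes from the text above; the class itself is of course hypothetical).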
On key-based ssh login to multiple hosts: you can simply use a for loop. Say you have three machines, servername-{1,2,3}; run the commands below (put them in a script) on the three machines, and you can skip the tedious manual steps of fixing the permissions on authorized_keys, creating the .ssh directory, and so on:

#!/usr/bin/env bash
# Description: Auto-deploy the ssh key to provide passwordless auto-login.
echo 'StrictHostKeyChecking no' >> $HOME/.ssh/config
ssh-keygen -q -t rsa -N '' -f $HOME/.ssh/id_rsa
for node in servername-{1,2,3} ; do
  ssh-copy-id -i $HOME/.ssh/id_rsa.pub root@$node
done
# Done

The echo line is there because, as the ssh client, the first connection to each server would otherwise still prompt you to type "yes" while populating known_hosts, which is a nuisance.