# Submit job
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YarnClientProtocolProvider.java
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/target/generated-sources/proto/org/apache/hadoop/yarn/proto/ClientRMProtocol.java
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
CLI
run
Cluster cluster = new Cluster(getConf());
Job job = Job.getInstance(cluster, new JobConf(submitJobFile));
job.submit();
Cluster
Job
submit
final JobSubmitter submitter = new JobSubmitter(cluster.getFileSystem(), cluster.getClient());
submitter.submitJobInternal
JobSubmitter
submitJobInternal
copyAndConfigureFiles
writeSplits
conf.setInt(MRJobConfig.NUM_MAPS, maps);
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials()); // submitClient is a ClientProtocol.
copyAndConfigureFiles
copyJar
JobSubmissionFiles
YarnClientProtocolProvider
YARNRunner
submitJob
ApplicationSubmissionContext appContext = // the application's first container, i.e. application master
createApplicationSubmissionContext(conf, jobSubmitDir, ts);
ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);
ApplicationReport appMaster = resMgrDelegate.getApplicationReport(applicationId);
createApplicationSubmissionContext
Vector vargs = new Vector(8);
vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS); // the main class of application master that is defined in MRJobConfig
MRJobConfig
public static final String APPLICATION_MASTER_CLASS =
"org.apache.hadoop.mapreduce.v2.app.MRAppMaster"
ResourceMgrDelegate
ResourceMgrDelegate
YarnRPC rpc = YarnRPC.create(conf); // default is HadoopYarnProtoRPC
applicationsManager =
(ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, appsManagerServerConf);
submitApplication
applicationsManager.submitApplication(request); // applicationsManager is a ClientRMProtocol.
HadoopYarnProtoRPC
getProxy
RpcFactoryProvider.getClientFactory(myConf).getClient(protocol, 1, addr, myConf); // getClient returns ClientRMProtocolPBClientImpl
RpcFactoryProvider
getClientFactory
String clientFactoryClassName = conf.get(YarnConfiguration.IPC_CLIENT_FACTORY);
if (clientFactoryClassName == null)
return RpcClientFactoryPBImpl.get();
else
return(RpcClientFactory) getFactoryClassInstance(clientFactoryClassName);
RpcClientFactoryPBImpl
get
return RpcClientFactoryPBImpl.self;
getClient
ClientRMService // The client interface to the Resource Manager.
submitApplication(SubmitApplicationRequest request)
rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext)); // rmAppManager is a RMAppManager
RMAppManager
handle
submitApplication(submissionContext);
submitApplication(ApplicationSubmissionContext submissionContext)
application = new RMAppImpl(applicationId, rmContext,
this.conf, submissionContext.getApplicationName(), user,
submissionContext.getQueue(), submissionContext, clientTokenStr,
appStore, rmContext.getAMLivelinessMonitor(), this.scheduler,
this.masterService);
if (rmContext.getRMApps().putIfAbsent(applicationId, application) != null) // duplicate application
application.handle(new RMAppRejectedEvent(applicationId,
"Application with this id is already present! Cannot add a duplicate!"));
else
this.rmContext.getDispatcher().getEventHandler().handle( // GenericEventHandler
new RMAppEvent(applicationId, RMAppEventType.START));
RMAppImpl // maintains a state machine, like JobImpl
StartAppAttemptTransition // triggered by RMAppEventType.START
transition
app.createNewAttempt();
createNewAttempt
RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId,
clientTokenStr, rmContext, scheduler, masterService,
submissionContext);
dispatcher.getEventHandler().handle(
new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START)); // see RMAppAttemptImpl
RMAppAttemptImpl
AttemptStartedTransition // triggered by RMAppAttemptEventType.START
appAttempt.eventHandler.handle(
new AppAddedSchedulerEvent(appAttempt.applicationAttemptId, // it is a SchedulerEvent handled by ResourceScheduler, see APP_ADDED case
appAttempt.submissionContext.getQueue(),
appAttempt.submissionContext.getUser()));
ScheduleTransition // triggered by RMAppAttemptEventType.APP_ACCEPTED
transition
appAttempt.eventHandler.handle(new RMAppEvent(event // send the acceptance to the app
.getApplicationAttemptId().getApplicationId(),
RMAppEventType.APP_ACCEPTED));
ResourceRequest request = BuilderUtils.newResourceRequest( // request a container for the AM
AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext
.getMasterCapability(), 1);
appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST);
AMContainerAllocatedTransition // triggered by RMAppAttemptEventType.CONTAINER_ALLOCATED
transition
Allocation amContainerAllocation = appAttempt.scheduler.allocate(
appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST,
EMPTY_CONTAINER_RELEASE_LIST);
appAttempt.eventHandler.handle(new AMLauncherEvent( // sends AMLauncherEventType.LAUNCH event
AMLauncherEventType.LAUNCH, appAttempt));
ContainerAcquiredTransition // triggered by RMAppAttemptEventType.CONTAINER_ACQUIRED
AMLaunchedTransition // triggered by RMAppAttemptEventType.LAUNCHED
transition
appAttempt.rmContext.getAMLivelinessMonitor().register(
appAttempt.applicationAttemptId);
AMRegisteredTransition // triggered by RMAppAttemptEventType.REGISTERED
// sends RMAppEventType.ATTEMPT_REGISTERED
AMLivelinessMonitor
ApplicationMasterLauncher << EventHandler // handles AMLauncherEvent (LAUNCH, CLEANUP)
handle
RMAppAttempt application = appEvent.getAppAttempt();
case LAUNCH:
launch(application);
case CLEANUP:
cleanup(application);
createRunnableLauncher
Runnable launcher = new AMLauncher(context, application, event,
applicationTokenSecretManager, clientToAMSecretManager, getConfig());
launch
Runnable launcher = createRunnableLauncher(application,
AMLauncherEventType.LAUNCH);
masterEvents.add(launcher);
LauncherThread
run
toLaunch = masterEvents.take();
launcherPool.execute(toLaunch);
AMLauncher
run
case LAUNCH: // handles AMLauncherEventType.LAUNCH
launch();
handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(), // sends RMAppAttemptEventType.LAUNCHED handled by RMAppAttemptImpl
RMAppAttemptEventType.LAUNCHED));
connect
containerMgrProxy = getContainerMgrProxy(masterContainerID.getApplicationAttemptId().getApplicationId());
getContainerMgrProxy
return currentUser.doAs(new PrivilegedAction<ContainerManager>() {
@Override
public ContainerManager run() {
return (ContainerManager) rpc.getProxy(ContainerManager.class,
NetUtils.createSocketAddr(containerManagerBindAddress), conf);
}
});
launch
connect();
ContainerLaunchContext launchContext =
createAMContainerLaunchContext(applicationContext, masterContainerID);
StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);
containerMgrProxy.startContainer(request);
ContainerManagerImpl // created in NodeManager
startContainer(StartContainerRequest request) // sends ApplicationInitEvent event handled by ApplicationEventDispatcher
Container container = new ContainerImpl(this.dispatcher, launchContext, credentials, metrics);
Application application = new ApplicationImpl(dispatcher, launchContext.getUser(), applicationID, credentials);
dispatcher.getEventHandler().handle(new ApplicationInitEvent(container));
ApplicationEventDispatcher
handle
Application app = ContainerManagerImpl.this.context.getApplications().get(event.getApplicationID());
app.handle(event);
ContainerEventDispatcher
Map<ContainerId, Container> containers = ContainerManagerImpl.this.context.getContainers();
Container c = containers.get(event.getContainerID());
c.handle(event);
NodeManager
init
ContainerExecutor exec = ReflectionUtils.newInstance( // DefaultContainerExecutor or LinuxContainerExecutor
conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
DefaultContainerExecutor.class, ContainerExecutor.class), conf);
ApplicationImpl
AppInitTransition // triggered by ApplicationInitEvent
app.containers.put(container.getContainerID(), container);
app.dispatcher.getEventHandler().handle(
new ApplicationLocalizationEvent(LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); // sends LocalizationEventType.INIT_APPLICATION_RESOURCES event handled by ResourceLocalizationService
AppInitDoneTransition // triggered by ApplicationEventType.APPLICATION_INITED
for (Container container : app.containers.values()) { // sends ContainerEventType.INIT_CONTAINER event
app.dispatcher.getEventHandler().handle(new ContainerInitEvent(container.getContainerID()));
}
ResourceLocalizationService << EventHandler // created in ContainerManagerImpl; handles LocalizationEvent
handle
case INIT_APPLICATION_RESOURCES:
dispatcher.getEventHandler().handle(new ApplicationInitedEvent(app.getAppId())); // sends ApplicationEventType.APPLICATION_INITED event
case INIT_CONTAINER_RESOURCES:
tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
c.getContainerID().getApplicationAttemptId().getApplicationId());
for (LocalResourceRequest req : e.getValue()) { // sends ResourceEventType.REQUEST event handled by LocalResourcesTrackerImpl
tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt));
}
LocalizerTracker << EventHandler // handles LocalizerEvent (REQUEST_RESOURCE_LOCALIZATION)
handle
case REQUEST_RESOURCE_LOCALIZATION:
case PUBLIC:
publicLocalizer.addResource(req); break; // see PublicLocalizer
case PRIVATE:
case APPLICATION:
LocalizerRunner localizer = privLocalizers.get(locId);
localizer.addResource(req);
PublicLocalizer
addResource
run
assoc.getResource().handle( // sends ResourceEventType.LOCALIZED event
new ResourceLocalizedEvent(key,
local, FileUtil.getDU(new File(local.toUri()))));
LocalizerRunner
addResource
update
assoc.getResource().handle( // sends ResourceEventType.LOCALIZED event
new ResourceLocalizedEvent(req,
ConverterUtils.getPathFromYarnURL(stat.getLocalPath()),
stat.getLocalSize()));
run
exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress, // exec is a ContainerExecutor
context.getUser(),
ConverterUtils.toString(
context.getContainerId().
getApplicationAttemptId().getApplicationId()),
localizerId, localDirs);
DefaultContainerExecutor
LinuxContainerExecutor
startLocalizer
// builds system command
File jvm = new File(new File(System.getProperty("java.home"), "bin"), "java")
command.add(jvm.toString());
launchContainer
List<String> command = new ArrayList<String>(
Arrays.asList(containerExecutorExe, // refers to hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.c
user,
Integer.toString(Commands.LAUNCH_CONTAINER.getValue()),
appId,
containerIdStr,
containerWorkDir.toString(),
nmPrivateCotainerScriptPath.toUri().getPath().toString(),
nmPrivateTokensPath.toUri().getPath().toString()));
ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
shExec.execute();
ContainerLaunch // created in ContainersLauncher
writeLaunchEnv
call
writeLaunchEnv(containerScriptOutStream, env, localResources,
launchContext.getCommands(), appDirs);
dispatcher.getEventHandler().handle(new ContainerEvent(
container.getContainerID(),
ContainerEventType.CONTAINER_LAUNCHED));
exec.launchContainer(container, nmPrivateContainerScriptPath,
nmPrivateTokensPath, user, appIdStr, containerWorkDir);
ContainersLauncher
handle // handles ContainersLauncherEvent event sent from ContainerImpl
case LAUNCH_CONTAINER:
Application app =
context.getApplications().get(
containerId.getApplicationAttemptId().getApplicationId());
ContainerLaunch launch =
new ContainerLaunch(getConfig(), dispatcher, exec, app,
event.getContainer());
running.put(containerId,
new RunningContainer(userName,
containerLauncher.submit(launch)));
FSDownload // Download a single URL to the local disk
LocalResourcesTrackerImpl
handle
case REQUEST:
case LOCALIZED:
rsrc = new LocalizedResource(req, dispatcher);
localrsrc.put(req, rsrc);
rsrc.handle(event);
LocalizedResource << EventHandler // handles ResourceEvent (REQUEST, LOCALIZED)
FetchResourceTransition // triggered by ResourceEventType.REQUEST
rsrc.dispatcher.getEventHandler().handle( // sends LocalizerEventType.REQUEST_RESOURCE_LOCALIZATION event
new LocalizerResourceRequestEvent(rsrc, req.getVisibility(), ctxt));
FetchSuccessTransition // triggered by ResourceEventType.LOCALIZED (ResourceLocalizedEvent), sends ContainerResourceLocalizedEvent
LocalizedResourceTransition // triggered by ResourceEventType.REQUEST, sends ContainerResourceLocalizedEvent
ContainerImpl // created in ContainerManagerImpl
RequestResourcesTransition // triggered by ContainerEventType.INIT_CONTAINER
Map cntrRsrc = ctxt.getLocalResources();
if (!cntrRsrc.isEmpty())
container.dispatcher.getEventHandler().handle( // sends LocalizationEventType.INIT_CONTAINER_RESOURCES event
new ContainerLocalizationRequestEvent(container, req));
else
container.dispatcher.getEventHandler().handle( // sends ContainersLauncherEventType.LAUNCH_CONTAINER event
new ContainersLauncherEvent(container,
ContainersLauncherEventType.LAUNCH_CONTAINER));
LocalizedTransition // triggered by ContainerEventType.RESOURCE_LOCALIZED (ContainerResourceLocalizedEvent)
container.dispatcher.getEventHandler().handle( // sends ContainersLauncherEventType.LAUNCH_CONTAINER event handled by ContainersLauncher
new ContainersLauncherEvent(container,
ContainersLauncherEventType.LAUNCH_CONTAINER));
BuilderUtils
newResourceRequest
ResourceTrackerService
nodeHeartbeat
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
remoteNodeStatus.getContainersStatuses(), latestResponse));
registerNodeManager
RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort,
httpPort, resolve(host), capability);
NodeStatusUpdaterImpl
start
registerWithRM
startStatusUpdater
registerWithRM
this.resourceTracker = getRMClient();
LOG.info("Connected to ResourceManager at " + this.rmAddress);
startStatusUpdater
run()
HeartbeatResponse response =
resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
RMNodeImpl
StatusUpdateWhenHealthyTransition // triggered by RMNodeEventType.STATUS_UPDATE (RMNodeStatusEvent), see ResourceTrackerService
transition
rmNode.context.getDispatcher().getEventHandler().handle( // see scheduler's NODE_UPDATE
new NodeUpdateSchedulerEvent(rmNode, newlyLaunchedContainers,
completedContainers));
FifoScheduler
handle
case NODE_UPDATE:
nodeUpdate(nodeUpdatedEvent.getRMNode(),
nodeUpdatedEvent.getNewlyLaunchedContainers(),
nodeUpdatedEvent.getCompletedContainers());
case APP_ADDED:
addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
.getQueue(), appAddedEvent.getUser());
addApplication
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(appAttemptId,
RMAppAttemptEventType.APP_ACCEPTED))
allocate // called in ScheduleTransition (to request the AM container) and AMContainerAllocatedTransition (to pull it)
getRMContainer
application.updateResourceRequests(ask);
return new Allocation(
application.pullNewlyAllocatedContainers(),
application.getHeadroom());
getRMContainer
application.getRMContainer(containerId);
nodeUpdate // called from handle's NODE_UPDATE section
assignContainers
assignContainers // heart of the scheduler
assignContainersOnNode
assignContainersOnNode
assignNodeLocalContainers // calls assignContainer
assignRackLocalContainers // calls assignContainer
assignOffSwitchContainers // calls assignContainer
assignContainer
Container container = // create container
BuilderUtils.newContainer(recordFactory,
application.getApplicationAttemptId(),
application.getNewContainerId(),
node.getRMNode().getNodeID(),
node.getRMNode().getHttpAddress(), capability);
RMContainer rmContainer =
application.allocate(type, node, priority, request, container); // application is SchedulerApp
SchedulerApp
updateResourceRequests
this.appSchedulingInfo.updateResourceRequests(requests);
allocate
RMContainer rmContainer = new RMContainerImpl(container, this
.getApplicationAttemptId(), node.getNodeID(), this.rmContext
.getDispatcher().getEventHandler(), this.rmContext
.getContainerAllocationExpirer());
updateResourceRequests
this.appSchedulingInfo.updateResourceRequests(requests);
rmContainer.handle(new RMContainerEvent(container.getId(), RMContainerEventType.START)); // see RMContainerImpl
appSchedulingInfo.allocate(type, node, priority, request, container);
pullNewlyAllocatedContainers
rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
RMContainerEventType.ACQUIRED)); // sends RMContainerEventType.ACQUIRED handled by RMContainerImpl
getHeadroom
AppSchedulingInfo
updateResourceRequests
allocate
allocateNodeLocal // calls allocate
allocateRackLocal // calls allocate
allocateOffSwitch // calls allocate
allocate(Container container)
RMContainerImpl
ContainerStartedTransition // triggered by RMContainerEventType.START
transition // sends RMAppAttemptEventType.CONTAINER_ALLOCATED, see RMAppAttemptImpl
container.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent(
container.appAttemptId, container.container));
AcquiredTransition
transition // sends RMAppAttemptEventType.CONTAINER_ACQUIRED event
container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent(
container.getApplicationAttemptId(), container.getContainer()));
RMContextImpl
ConcurrentMap<ApplicationId, RMApp> applications = new ConcurrentHashMap<ApplicationId, RMApp>();
getRMApps
return this.applications;
ResourceManager
init
this.rmDispatcher = new AsyncDispatcher();
this.rmContext = new RMContextImpl(this.store, this.rmDispatcher,
this.containerAllocationExpirer, amLivelinessMonitor);
this.rmDispatcher.register(RMAppEventType.class, // Register event handler for RmAppEvents
new ApplicationEventDispatcher(this.rmContext));
createRMAppManager
return new RMAppManager(this.rmContext, this.clientToAMSecretManager,
this.scheduler, this.masterService, this.conf);
createScheduler
return
ReflectionUtils.newInstance( // default is FifoScheduler.class
conf.getClass(YarnConfiguration.RM_SCHEDULER,
FifoScheduler.class, ResourceScheduler.class),
this.conf);
ApplicationEventDispatcher
handle
RMApp rmApp = this.rmContext.getRMApps().get(appID);
rmApp.handle(event);
AsyncDispatcher
dispatch
ApplicationAttemptEventDispatcher
handle
RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptID);
rmAppAttempt.handle(event);
ClientRMProtocolPBClientImpl
private ClientRMProtocolService.BlockingInterface proxy;
ClientRMProtocolPBClientImpl
RPC.setProtocolEngine(conf, ClientRMProtocolService.BlockingInterface.class, ProtoOverHadoopRpcEngine.class);
proxy = (ClientRMProtocolService.BlockingInterface)RPC.getProxy(
ClientRMProtocolService.BlockingInterface.class, clientVersion, addr, conf);
submitApplication(SubmitApplicationRequest request)
SubmitApplicationRequestProto requestProto = ((SubmitApplicationRequestPBImpl)request).getProto();
return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null, requestProto));
ClientRMProtocolPBServiceImpl
private ClientRMProtocol real;
submitApplication(RpcController arg0, SubmitApplicationRequestProto proto)
SubmitApplicationRequestPBImpl request = new SubmitApplicationRequestPBImpl(proto);
SubmitApplicationResponse response = real.submitApplication(request); // request = SubmitApplicationRequestPBImpl
RPC
getProxy
return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy();
getProtocolProxy
return getProtocolEngine(protocol,conf).getProxy(protocol,
clientVersion, addr, ticket, conf, factory, rpcTimeout);
getProtocolEngine
RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
if (engine == null)
Class impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(), // ENGINE_PROP = "rpc.engine"
WritableRpcEngine.class);
engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
PROTOCOL_ENGINES.put(protocol, engine);
return engine;
ProtoOverHadoopRpcEngine
getProxy
return new ProtocolProxy(protocol, (T) Proxy.newProxyInstance(protocol
.getClassLoader(), new Class[] { protocol }, new Invoker(protocol,
addr, ticket, conf, factory, rpcTimeout)), false);
ProtocolProxy
---
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/impl
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YarnClientProtocolProvider.java
hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobQueueClient.java
---
JobImpl << EventHandler // maintain a state machine (doTransition) of Job.
InitTransition // JobState.NEW -> JobState.INITED or JobState.FAILED by JobEventType.JOB_INIT
transition
TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
job.numMapTasks = taskSplitMetaInfo.length;
createMapTasks
TaskImpl task = new MapTaskImpl
addTask
createReduceTasks
TaskImpl task = new ReduceTaskImpl
addTask
createSplits
allTaskSplitMetaInfo = SplitMetaInfoReader.readSplitMetaInfo(
job.oldJobId, job.fs,
job.conf,
job.remoteJobSubmitDir);
StartTransition // JobState.INITED -> JobState.RUNNING by JobEventType.JOB_START
// Triggered in MRAppMaster's startJobs() method
job.scheduleTasks(job.mapTasks); // schedule (i.e., start) the maps
job.scheduleTasks(job.reduceTasks);
scheduleTasks // send TaskEventType.T_SCHEDULE event
abstract TaskImpl << EventHandler // maintain a state machine (doTransition) of Task.
InitialScheduleTransition // TaskState.NEW -> TaskState.SCHEDULED by TaskEventType.T_SCHEDULE
transition
task.addAndScheduleAttempt
addAndScheduleAttempt
TaskAttempt attempt = createAttempt();
// send TaskAttemptEventType.TA_RESCHEDULE or TaskAttemptEventType.TA_SCHEDULE event
LaunchTransition // TaskState.SCHEDULED -> TaskState.RUNNING by TaskEventType.T_ATTEMPT_LAUNCHED
MapTaskImpl
createAttempt
return new MapTaskAttemptImpl
ReduceTaskImpl
createAttempt
return new ReduceTaskAttemptImpl
abstract TaskAttemptImpl << EventHandler // maintain a state machine (doTransition) of TaskAttempt.
RequestContainerTransition // TaskAttemptState.NEW -> TaskAttemptState.UNASSIGNED by TaskAttemptEventType.TA_SCHEDULE
transition // send ContainerRequestEvent (type: ContainerAllocator.EventType.CONTAINER_REQ) event to job.eventHandler that is handled by RMContainerAllocator or LocalContainerAllocator
// dataLocalHosts come from taskSplitMetaInfo of JobImpl
ContainerAssignedTransition // TaskAttemptState.UNASSIGNED -> TaskAttemptState.ASSIGNED by TaskAttemptEventType.TA_ASSIGNED
transition
taskAttempt.remoteTask = taskAttempt.createRemoteTask(); // real Task (classic Hadoop mapred flavor)
// launch the container
taskAttempt.eventHandler.handle(
new ContainerRemoteLaunchEvent(taskAttempt.attemptId, // handled by ContainerLauncherImpl
taskAttempt.containerID,
taskAttempt.containerMgrAddress, taskAttempt.containerToken) {
@Override
public ContainerLaunchContext getContainer() {
return taskAttempt.createContainerLaunchContext(); // create ContainerLaunchContext for map/reduce containers
}
@Override
public Task getRemoteTask() { // classic mapred Task, not YARN version
return taskAttempt.remoteTask;
}
});
LaunchedContainerTransition // TaskAttemptState.ASSIGNED -> TaskAttemptState.RUNNING by TaskAttemptEventType.TA_CONTAINER_LAUNCHED
transition //send TaskEventType.T_ATTEMPT_LAUNCHED event
StatusUpdater // TaskAttemptState.RUNNING -> TaskAttemptState.RUNNING by TaskAttemptEventType.TA_UPDATE
CLEANUP_CONTAINER_TRANSITION // TaskAttemptState.RUNNING -> TaskAttemptState.SUCCESS_CONTAINER_CLEANUP by TaskAttemptEventType.TA_DONE
createContainerLaunchContext // create ContainerLaunchContext for map/reduce containers
List commands = MapReduceChildJVM.getVMCommand(
taskAttemptListener.getAddress(), remoteTask,
jvmID);
ContainerLaunchContext container =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
container.setCommands(commands);
ContainerLauncherImpl
EventProcessor
run
case CONTAINER_REMOTE_LAUNCH:
ContainerManager proxy =
getCMProxy(containerID, containerManagerBindAddr, containerToken);
ContainerLaunchContext containerLaunchContext =
launchEv.getContainer();
// Now launch the actual container
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
StartContainerResponse response = proxy.startContainer(startRequest);
MapReduceChildJVM
getVMCommand
Vector vargs = new Vector(8);
vargs.add("exec");
vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
// Add main class and its arguments
vargs.add(YarnChild.class.getName()); // main class of the child (map/reduce task) JVM
YarnChild // The main() for MapReduce task processes
main
JvmTask myTask = null;
// poll for new task from TaskAttemptListener's address, TaskAttemptListener created in MRAppMaster
for (int idle = 0; null == myTask; ++idle) {
long sleepTimeMilliSecs = Math.min(idle * 500, 1500);
LOG.info("Sleeping for " + sleepTimeMilliSecs
+ "ms before retrying again. Got null now.");
MILLISECONDS.sleep(sleepTimeMilliSecs);
myTask = umbilical.getTask(context); // get task from remote TaskAttemptListener
}
final Task taskFinal = task;
childUGI.doAs(new PrivilegedExceptionAction