viirya's blog

[Note] Hadoop

# Submit job

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YarnClientProtocolProvider.java
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java

hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/target/generated-sources/proto/org/apache/hadoop/yarn/proto/ClientRMProtocol.java

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
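
Cluster discovers ClientProtocolProvider implementations through java.util.ServiceLoader, which is what this META-INF/services file feeds. A minimal sketch of the mechanism, with a hypothetical Greeter interface standing in for ClientProtocolProvider (the provider file is named after the interface's binary name and lists one implementation class per line):

```java
import java.util.ServiceLoader;

// Sketch only: Greeter stands in for ClientProtocolProvider. A provider
// registers itself by shipping META-INF/services/<binary name of Greeter>
// containing the implementation's fully-qualified class name.
public class ServiceLoaderDemo {
  public interface Greeter { String greet(); }

  public static void main(String[] args) {
    for (Greeter g : ServiceLoader.load(Greeter.class)) {
      System.out.println(g.greet());  // each registered provider, instantiated lazily
    }
  }
}
```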


CLI
	run
		Cluster cluster = new Cluster(getConf());
		Job job = Job.getInstance(cluster, new JobConf(submitJobFile));
		job.submit();
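
For context, the same path is reachable from user code. A minimal sketch against the org.apache.hadoop.mapreduce API, using the Hadoop 2.x-style Job.getInstance(Configuration) entry point (the CLI snippet above uses the older Cluster-based variant); input/output paths come from argv:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SubmitDemo {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "submit-demo");
    job.setJarByClass(SubmitDemo.class);
    // identity Mapper/Reducer by default; set real classes for actual work
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // waitForCompletion() calls submit() -- the JobSubmitter path traced below --
    // then polls until the job finishes
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```
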
Cluster


Job
	submit
		final JobSubmitter submitter = new JobSubmitter(cluster.getFileSystem(), cluster.getClient());		
		submitter.submitJobInternal

JobSubmitter
	submitJobInternal
		copyAndConfigureFiles
		writeSplits
		conf.setInt(MRJobConfig.NUM_MAPS, maps);
		status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials()); // submitClient is a ClientProtocol.

	copyAndConfigureFiles
	copyJar

JobSubmissionFiles


YarnClientProtocolProvider


YARNRunner
	submitJob
		ApplicationSubmissionContext appContext =  // the application's first container, i.e. application master
		  createApplicationSubmissionContext(conf, jobSubmitDir, ts);

		ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);
		ApplicationReport appMaster = resMgrDelegate.getApplicationReport(applicationId);

	createApplicationSubmissionContext
		Vector vargs = new Vector(8);
		vargs.add(Environment.JAVA_HOME.$() + "/bin/java");

		vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS); // the application master's main class, defined in MRJobConfig

MRJobConfig
	public static final String APPLICATION_MASTER_CLASS =
	  "org.apache.hadoop.mapreduce.v2.app.MRAppMaster"

ResourceMgrDelegate
	ResourceMgrDelegate
		YarnRPC rpc = YarnRPC.create(conf); // default is HadoopYarnProtoRPC
		applicationsManager =
		  (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, appsManagerServerConf);
	
	submitApplication
		applicationsManager.submitApplication(request); // applicationsManager is a ClientRMProtocol.

HadoopYarnProtoRPC
	getProxy
		RpcFactoryProvider.getClientFactory(myConf).getClient(protocol, 1, addr, myConf); // getClient returns ClientRMProtocolPBClientImpl

RpcFactoryProvider
	getClientFactory
		String clientFactoryClassName = conf.get(YarnConfiguration.IPC_CLIENT_FACTORY);
		if (clientFactoryClassName == null)
			return RpcClientFactoryPBImpl.get();		
		else
			return(RpcClientFactory) getFactoryClassInstance(clientFactoryClassName);

RpcClientFactoryPBImpl
	get
		return RpcClientFactoryPBImpl.self;
	getClient


ClientRMService // The client interface to the Resource Manager.

	submitApplication(SubmitApplicationRequest request)
		rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext)); // rmAppManager is a RMAppManager

RMAppManager
	handle
		submitApplication(submissionContext);

	submitApplication(ApplicationSubmissionContext submissionContext)
		application = new RMAppImpl(applicationId, rmContext,
		  this.conf, submissionContext.getApplicationName(), user,
		  submissionContext.getQueue(), submissionContext, clientTokenStr,
		  appStore, rmContext.getAMLivelinessMonitor(), this.scheduler,
		  this.masterService);
		if (rmContext.getRMApps().putIfAbsent(applicationId, application) != null)  // duplicate application
			application.handle(new RMAppRejectedEvent(applicationId,
			  "Application with this id is already present! Cannot add a duplicate!"));
		else
			this.rmContext.getDispatcher().getEventHandler().handle(  // GenericEventHandler
			  new RMAppEvent(applicationId, RMAppEventType.START));
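
The duplicate check works because ConcurrentMap.putIfAbsent returns the previous value when the key is already mapped. The idiom in isolation (keys/values are strings here instead of ApplicationId/RMApp):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Standalone sketch of the duplicate-application check.
public class PutIfAbsentDemo {
  public static void main(String[] args) {
    ConcurrentMap<String, String> apps = new ConcurrentHashMap<>();
    submit(apps, "application_1");  // accepted
    submit(apps, "application_1");  // rejected: id already present
  }

  static void submit(ConcurrentMap<String, String> apps, String id) {
    // putIfAbsent returns the existing value (non-null) on a duplicate key
    if (apps.putIfAbsent(id, "app-object") != null) {
      System.out.println(id + " rejected: duplicate");
    } else {
      System.out.println(id + " accepted");
    }
  }
}
```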

RMAppImpl // has a state machine, like JobImpl

	StartAppAttemptTransition // triggered by RMAppEventType.START
		transition
			app.createNewAttempt();

	createNewAttempt
		RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId,
		  clientTokenStr, rmContext, scheduler, masterService,
		  submissionContext);
		dispatcher.getEventHandler().handle(
		  new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));  // see RMAppAttemptImpl
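
RMAppImpl, like JobImpl and TaskImpl later in these notes, drives its lifecycle through a transition table keyed on (current state, event type); the real classes build theirs with YARN's StateMachineFactory. A toy doTransition with made-up states and events:

```java
import java.util.HashMap;
import java.util.Map;

// Toy version of the doTransition idea behind RMAppImpl/JobImpl/TaskImpl.
// States, events, and the class name are made up; the real state machines
// also run a hook on each transition and support multi-target transitions.
public class TinyStateMachine {
  enum State { NEW, SUBMITTED, RUNNING }
  enum EventType { START, ATTEMPT_REGISTERED }

  private final Map<State, Map<EventType, State>> table = new HashMap<>();
  private State current = State.NEW;

  void addTransition(State from, EventType on, State to) {
    table.computeIfAbsent(from, s -> new HashMap<>()).put(on, to);
  }

  void doTransition(EventType event) {  // look up (current, event) and move
    State next = table.getOrDefault(current, Map.of()).get(event);
    if (next == null)
      throw new IllegalStateException(current + " cannot handle " + event);
    current = next;
  }

  public static void main(String[] args) {
    TinyStateMachine m = new TinyStateMachine();
    m.addTransition(State.NEW, EventType.START, State.SUBMITTED);
    m.addTransition(State.SUBMITTED, EventType.ATTEMPT_REGISTERED, State.RUNNING);
    m.doTransition(EventType.START);
    m.doTransition(EventType.ATTEMPT_REGISTERED);
    System.out.println(m.current);  // RUNNING
  }
}
```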

RMAppAttemptImpl
	AttemptStartedTransition // triggered by RMAppAttemptEventType.START
		appAttempt.eventHandler.handle(
		  new AppAddedSchedulerEvent(appAttempt.applicationAttemptId, // it is a SchedulerEvent handled by ResourceScheduler, see APP_ADDED case
		  appAttempt.submissionContext.getQueue(),
		  appAttempt.submissionContext.getUser()));

	ScheduleTransition // triggered by RMAppAttemptEventType.APP_ACCEPTED
		transition
			appAttempt.eventHandler.handle(new RMAppEvent(event // send the acceptance to the app
			  .getApplicationAttemptId().getApplicationId(),
			  RMAppEventType.APP_ACCEPTED));

			ResourceRequest request = BuilderUtils.newResourceRequest( //  request a container for the AM
			  AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext
			  .getMasterCapability(), 1);

			appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
			  Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST);

	AMContainerAllocatedTransition // triggered by RMAppAttemptEventType.CONTAINER_ALLOCATED
		transition
			Allocation amContainerAllocation = appAttempt.scheduler.allocate(
			  appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST,
			  EMPTY_CONTAINER_RELEASE_LIST);

			appAttempt.eventHandler.handle(new AMLauncherEvent( // sends AMLauncherEventType.LAUNCH event
			  AMLauncherEventType.LAUNCH, appAttempt));

	ContainerAcquiredTransition // triggered by RMAppAttemptEventType.CONTAINER_ACQUIRED
		

	AMLaunchedTransition // triggered by RMAppAttemptEventType.LAUNCHED
		transition
			appAttempt.rmContext.getAMLivelinessMonitor().register(
			  appAttempt.applicationAttemptId);
		
	AMRegisteredTransition // triggered by RMAppAttemptEventType.REGISTERED
			       // sends RMAppEventType.ATTEMPT_REGISTERED


AMLivelinessMonitor

ApplicationMasterLauncher

	handle
		RMAppAttempt application = appEvent.getAppAttempt();
		case LAUNCH:
			launch(application);
		case CLEANUP:
			cleanup(application);

	createRunnableLauncher
		Runnable launcher = new AMLauncher(context, application, event,
		  applicationTokenSecretManager, clientToAMSecretManager, getConfig());

	launch
		Runnable launcher = createRunnableLauncher(application,
		  AMLauncherEventType.LAUNCH);
		masterEvents.add(launcher);

	LauncherThread
		run
			toLaunch = masterEvents.take();
			launcherPool.execute(toLaunch);
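
So launch() only enqueues; LauncherThread drains the queue and hands each Runnable to a pool. The same producer/consumer shape in plain Java (class names are illustrative, not the YARN ones):

```java
import java.util.concurrent.*;

// Sketch of the ApplicationMasterLauncher pattern: a queue-draining thread
// hands Runnables to a thread pool.
public class LauncherQueueDemo {
  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Runnable> masterEvents = new LinkedBlockingQueue<>();
    ExecutorService launcherPool = Executors.newFixedThreadPool(4);

    Thread launcherThread = new Thread(() -> {
      try {
        while (!Thread.currentThread().isInterrupted()) {
          Runnable toLaunch = masterEvents.take();  // blocks until work arrives
          launcherPool.execute(toLaunch);
        }
      } catch (InterruptedException ignored) { }
    });
    launcherThread.start();

    masterEvents.add(() -> System.out.println("launching AM container"));
    Thread.sleep(100);
    launcherThread.interrupt();
    launcherPool.shutdown();
  }
}
```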

AMLauncher
	run
		case LAUNCH: // handles AMLauncherEventType.LAUNCH
			launch();
			handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),  // sends RMAppAttemptEventType.LAUNCHED handled by RMAppAttemptImpl
			  RMAppAttemptEventType.LAUNCHED));


	connect
		containerMgrProxy = getContainerMgrProxy(masterContainerID.getApplicationAttemptId().getApplicationId());

	getContainerMgrProxy
		return currentUser.doAs(new PrivilegedAction() {
		  @Override
		  public ContainerManager run() {
		    return (ContainerManager) rpc.getProxy(ContainerManager.class,
		      NetUtils.createSocketAddr(containerManagerBindAddress), conf);
		  }
		});
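
This is the standard UserGroupInformation.doAs idiom: the proxy is created while impersonating the remote user, so the RPC carries that user's credentials. A runnable sketch with a stand-in action body:

```java
import java.security.PrivilegedAction;
import org.apache.hadoop.security.UserGroupInformation;

// Sketch of the doAs pattern in getContainerMgrProxy; the body here is a
// stand-in for the rpc.getProxy(ContainerManager.class, ...) call.
public class DoAsDemo {
  public static void main(String[] args) throws Exception {
    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
    String who = currentUser.doAs(new PrivilegedAction<String>() {
      @Override
      public String run() {
        // everything in here executes under currentUser's security context
        return System.getProperty("user.name");
      }
    });
    System.out.println("ran as: " + who);
  }
}
```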


	launch
		connect();
		ContainerLaunchContext launchContext =
		  createAMContainerLaunchContext(applicationContext, masterContainerID);
		StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);
		containerMgrProxy.startContainer(request);
		

ContainerManagerImpl // created in NodeManager

	startContainer(StartContainerRequest request) // sends ApplicationInitEvent event handled by ApplicationEventDispatcher
		Container container = new ContainerImpl(this.dispatcher, launchContext, credentials, metrics);

		Application application = new ApplicationImpl(dispatcher, launchContext.getUser(), applicationID, credentials);

		dispatcher.getEventHandler().handle(new ApplicationInitEvent(container));

	ApplicationEventDispatcher
		handle
			Application app = ContainerManagerImpl.this.context.getApplications().get(event.getApplicationID());
			app.handle(event);

	ContainerEventDispatcher
		Map containers = ContainerManagerImpl.this.context.getContainers();
		Container c = containers.get(event.getContainerID());
		c.handle(event);

NodeManager

	init
		ContainerExecutor exec = ReflectionUtils.newInstance(  // DefaultContainerExecutor or LinuxContainerExecutor
		  conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
		  DefaultContainerExecutor.class, ContainerExecutor.class), conf);
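
conf.getClass plus ReflectionUtils.newInstance is Hadoop's general plug-point idiom: a config key names a class, which is resolved against an expected interface and instantiated reflectively. A self-contained sketch, where the key name and the Runnable type are stand-ins for NM_CONTAINER_EXECUTOR and ContainerExecutor:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;

// Sketch of the pluggable-implementation idiom; "demo.executor.class" and
// Runnable are stand-ins for the real YARN key and type.
public class PluggableDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setClass("demo.executor.class", MyTask.class, Runnable.class);
    Runnable exec = ReflectionUtils.newInstance(
        conf.getClass("demo.executor.class", MyTask.class, Runnable.class), conf);
    exec.run();
  }

  public static class MyTask implements Runnable {
    public void run() { System.out.println("default executor"); }
  }
}
```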



		


ApplicationImpl

	AppInitTransition // triggered by ApplicationInitEvent	
		app.containers.put(container.getContainerID(), container);

		app.dispatcher.getEventHandler().handle(
		  new ApplicationLocalizationEvent(LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); // sends LocalizationEventType.INIT_APPLICATION_RESOURCES event handled by ResourceLocalizationService


	AppInitDoneTransition // triggered by ApplicationEventType.APPLICATION_INITED

		for (Container container : app.containers.values()) { // sends ContainerEventType.INIT_CONTAINER event
		  app.dispatcher.getEventHandler().handle(new ContainerInitEvent(container.getContainerID()));
		}
		


ResourceLocalizationService // created in ContainerManagerImpl

	handle
		case INIT_APPLICATION_RESOURCES:
			dispatcher.getEventHandler().handle(new ApplicationInitedEvent(app.getAppId())); // sends ApplicationEventType.APPLICATION_INITED event

		case INIT_CONTAINER_RESOURCES:
			tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
			  c.getContainerID().getApplicationAttemptId().getApplicationId());
			for (LocalResourceRequest req : e.getValue()) {  // sends ResourceEventType.REQUEST event handled by LocalResourcesTrackerImpl
			  tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt));
			}


	LocalizerTracker
		handle
			case REQUEST_RESOURCE_LOCALIZATION:
				case PUBLIC:
					publicLocalizer.addResource(req); break; // see PublicLocalizer
				case PRIVATE:
				case APPLICATION:
					LocalizerRunner localizer = privLocalizers.get(locId);
					localizer.addResource(req);

	PublicLocalizer
		addResource

		run
			assoc.getResource().handle(  // sends ResourceEventType.LOCALIZED event
			  new ResourceLocalizedEvent(key,
			  local, FileUtil.getDU(new File(local.toUri()))));


	LocalizerRunner
		addResource

		update

			assoc.getResource().handle(  // sends ResourceEventType.LOCALIZED event
			  new ResourceLocalizedEvent(req,
			  ConverterUtils.getPathFromYarnURL(stat.getLocalPath()),
			  stat.getLocalSize()));

		run
			exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress, // exec is a ContainerExecutor
			  context.getUser(),
			  ConverterUtils.toString(
			    context.getContainerId().
			    getApplicationAttemptId().getApplicationId()),
			  localizerId, localDirs);


DefaultContainerExecutor

LinuxContainerExecutor

	startLocalizer
		// builds system command
		File jvm = new File(new File(System.getProperty("java.home"), "bin"), "java");
		command.add(jvm.toString());

	launchContainer
		List command = new ArrayList(
		  Arrays.asList(containerExecutorExe,  // refers to hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.c
                  user,
                  Integer.toString(Commands.LAUNCH_CONTAINER.getValue()),
                  appId,
                  containerIdStr,
                  containerWorkDir.toString(),
                  nmPrivateCotainerScriptPath.toUri().getPath().toString(),
                  nmPrivateTokensPath.toUri().getPath().toString()));

		ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
		shExec.execute();
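
ShellCommandExecutor ultimately forks the argv assembled above. A rough standalone equivalent with ProcessBuilder, using /bin/echo as a stand-in for the setuid container-executor binary:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Sketch of what ShellCommandExecutor does underneath: assemble argv and fork.
// "/bin/echo" and the argument values stand in for the real binary and args.
public class ShellDemo {
  public static void main(String[] args) throws IOException, InterruptedException {
    List<String> command = new ArrayList<>(List.of(
        "/bin/echo", "user", "1", "appId", "containerId", "/work/dir"));
    Process p = new ProcessBuilder(command).inheritIO().start();
    int exitCode = p.waitFor();   // LinuxContainerExecutor inspects this code
    System.out.println("exit: " + exitCode);
  }
}
```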



ContainerLaunch // created in ContainersLauncher

	writeLaunchEnv

	call

		writeLaunchEnv(containerScriptOutStream, env, localResources,
		  launchContext.getCommands(), appDirs);

		dispatcher.getEventHandler().handle(new ContainerEvent(
		  container.getContainerID(),
		  ContainerEventType.CONTAINER_LAUNCHED));

		exec.launchContainer(container, nmPrivateContainerScriptPath,
		  nmPrivateTokensPath, user, appIdStr, containerWorkDir);

		


ContainersLauncher
	handle  // handles ContainersLauncherEvent event sent from ContainerImpl
		case LAUNCH_CONTAINER:
			Application app =
			  context.getApplications().get(
			  containerId.getApplicationAttemptId().getApplicationId());
	
			ContainerLaunch launch =
			  new ContainerLaunch(getConfig(), dispatcher, exec, app,
			  event.getContainer());

			running.put(containerId,
			  new RunningContainer(userName,
			  containerLauncher.submit(launch)));


FSDownload // Download a single URL to the local disk

	

LocalResourcesTrackerImpl

	handle
		case REQUEST:
		case LOCALIZED:
			rsrc = new LocalizedResource(req, dispatcher);
			localrsrc.put(req, rsrc);
		rsrc.handle(event);

LocalizedResource

	FetchResourceTransition // triggered by ResourceEventType.REQUEST
		rsrc.dispatcher.getEventHandler().handle(  // sends LocalizerEventType.REQUEST_RESOURCE_LOCALIZATION event
		  new LocalizerResourceRequestEvent(rsrc, req.getVisibility(), ctxt));
	

	FetchSuccessTransition // triggered by ResourceEventType.LOCALIZED (ResourceLocalizedEvent), sends ContainerResourceLocalizedEvent


	LocalizedResourceTransition // triggered by ResourceEventType.REQUEST, sends ContainerResourceLocalizedEvent

ContainerImpl // created in ContainerManagerImpl

	RequestResourcesTransition // triggered by ContainerEventType.INIT_CONTAINER
		Map cntrRsrc = ctxt.getLocalResources();
		if (!cntrRsrc.isEmpty())
			container.dispatcher.getEventHandler().handle( // sends LocalizationEventType.INIT_CONTAINER_RESOURCES event
			  new ContainerLocalizationRequestEvent(container, req)); 
		else
			container.dispatcher.getEventHandler().handle(  // sends ContainersLauncherEventType.LAUNCH_CONTAINER event
			  new ContainersLauncherEvent(container,
			  ContainersLauncherEventType.LAUNCH_CONTAINER));

	LocalizedTransition // triggered by ContainerEventType.RESOURCE_LOCALIZED (ContainerResourceLocalizedEvent)
		container.dispatcher.getEventHandler().handle(  // sends ContainersLauncherEventType.LAUNCH_CONTAINER event handled by ContainersLauncher
		  new ContainersLauncherEvent(container,
		  ContainersLauncherEventType.LAUNCH_CONTAINER));


BuilderUtils
	newResourceRequest


ResourceTrackerService

	nodeHeartbeat
		this.rmContext.getDispatcher().getEventHandler().handle(
		  new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
		  remoteNodeStatus.getContainersStatuses(), latestResponse));

	registerNodeManager
		RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort,
		  httpPort, resolve(host), capability);

NodeStatusUpdaterImpl
	start
		registerWithRM
		startStatusUpdater

	registerWithRM
		this.resourceTracker = getRMClient();
		LOG.info("Connected to ResourceManager at " + this.rmAddress);

	startStatusUpdater
		run()
			HeartbeatResponse response =
			  resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();

RMNodeImpl
	StatusUpdateWhenHealthyTransition // triggered by RMNodeEventType.STATUS_UPDATE (RMNodeStatusEvent), see ResourceTrackerService 
		transition
			rmNode.context.getDispatcher().getEventHandler().handle(  // see scheduler's NODE_UPDATE
			  new NodeUpdateSchedulerEvent(rmNode, newlyLaunchedContainers,
			  completedContainers));

FifoScheduler
	handle
		case NODE_UPDATE:
			nodeUpdate(nodeUpdatedEvent.getRMNode(),
			  nodeUpdatedEvent.getNewlyLaunchedContainers(),
			  nodeUpdatedEvent.getCompletedContainers());

		case APP_ADDED:
			addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
			  .getQueue(), appAddedEvent.getUser());
	addApplication
		rmContext.getDispatcher().getEventHandler().handle(
		  new RMAppAttemptEvent(appAttemptId,
		  RMAppAttemptEventType.APP_ACCEPTED))

	allocate // called in AMContainerAllocatedTransition
		getRMContainer
		application.updateResourceRequests(ask);
		return new Allocation(
		  application.pullNewlyAllocatedContainers(),
		  application.getHeadroom());

	getRMContainer
		application.getRMContainer(containerId);


	nodeUpdate // called from handle's NODE_UPDATE section
		assignContainers

	assignContainers // heart of the scheduler
		assignContainersOnNode

	assignContainersOnNode
		assignNodeLocalContainers // calls assignContainer
		assignRackLocalContainers // calls assignContainer
		assignOffSwitchContainers // calls assignContainer
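
The fallback order above (node-local, then rack-local, then off-switch) can be sketched with the matching reduced to list lookups; hostnames and rack paths here are made up:

```java
import java.util.List;

// Sketch of the locality fallback in assignContainersOnNode.
public class LocalityDemo {
  static String assign(String node, String rack, List<String> requestedLocations) {
    if (requestedLocations.contains(node)) return "NODE_LOCAL";
    if (requestedLocations.contains(rack)) return "RACK_LOCAL";
    return "OFF_SWITCH";  // the "*" ResourceRequest matches any node
  }

  public static void main(String[] args) {
    // the task asked for host2 or anything on /rack1; this node is host1 on /rack1
    System.out.println(assign("host1", "/rack1", List.of("host2", "/rack1")));  // RACK_LOCAL
  }
}
```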

	assignContainer
		Container container =  // create container
		  BuilderUtils.newContainer(recordFactory,
		  application.getApplicationAttemptId(),
		  application.getNewContainerId(),
		  node.getRMNode().getNodeID(),
		  node.getRMNode().getHttpAddress(), capability);
		RMContainer rmContainer =
		  application.allocate(type, node, priority, request, container); // application is SchedulerApp
		
SchedulerApp
	updateResourceRequests
		this.appSchedulingInfo.updateResourceRequests(requests);

	allocate
		RMContainer rmContainer = new RMContainerImpl(container, this
		  .getApplicationAttemptId(), node.getNodeID(), this.rmContext
		  .getDispatcher().getEventHandler(), this.rmContext
		  .getContainerAllocationExpirer());
		rmContainer.handle(new RMContainerEvent(container.getId(), RMContainerEventType.START)); // see RMContainerImpl
		appSchedulingInfo.allocate(type, node, priority, request, container);

	pullNewlyAllocatedContainers
		rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
		  RMContainerEventType.ACQUIRED));  // sends RMContainerEventType.ACQUIRED handled by RMContainerImpl

	getHeadroom

AppSchedulingInfo
	updateResourceRequests

	allocate
		allocateNodeLocal // calls allocate
		allocateRackLocal // calls allocate
		allocateOffSwitch // calls allocate

	allocate(Container container)


RMContainerImpl
	ContainerStartedTransition // triggered by RMContainerEventType.START
		transition  // sends RMAppAttemptEventType.CONTAINER_ALLOCATED, see RMAppAttemptImpl
			container.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent( 
			  container.appAttemptId, container.container));


	AcquiredTransition
		transition  // sends RMAppAttemptEventType.CONTAINER_ACQUIRED event
			container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent(
			  container.getApplicationAttemptId(), container.getContainer()));


RMContextImpl

	ConcurrentMap applications = new ConcurrentHashMap();

	getRMApps
		return this.applications;

	
ResourceManager
	init
		this.rmDispatcher = new AsyncDispatcher();
		this.rmContext = new RMContextImpl(this.store, this.rmDispatcher,
		  this.containerAllocationExpirer, amLivelinessMonitor);
		this.rmDispatcher.register(RMAppEventType.class,   // Register event handler for RmAppEvents
		  new ApplicationEventDispatcher(this.rmContext));

	createRMAppManager
		return new RMAppManager(this.rmContext, this.clientToAMSecretManager,
		  this.scheduler, this.masterService, this.conf);

	createScheduler
		return
		  ReflectionUtils.newInstance(  // default is FifoScheduler.class
		  conf.getClass(YarnConfiguration.RM_SCHEDULER,
		  FifoScheduler.class, ResourceScheduler.class),
		  this.conf);

ApplicationEventDispatcher
	handle
		RMApp rmApp = this.rmContext.getRMApps().get(appID);
		rmApp.handle(event);


AsyncDispatcher
	dispatch
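
AsyncDispatcher is the hub for all the event traffic in these notes: handle() only enqueues, and one dispatcher thread routes each event to the handler register()ed for its type. A simplified sketch (the real dispatcher keys handlers on the event's enum type class, not on the concrete event class as done here):

```java
import java.util.Map;
import java.util.concurrent.*;

// Simplified sketch of the AsyncDispatcher idea.
public class MiniDispatcher {
  interface Event { }
  interface EventHandler<E extends Event> { void handle(E event); }

  private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();
  private final Map<Class<?>, EventHandler<Event>> handlers = new ConcurrentHashMap<>();

  @SuppressWarnings("unchecked")
  <E extends Event> void register(Class<E> type, EventHandler<E> handler) {
    handlers.put(type, (EventHandler<Event>) handler);
  }

  // the GenericEventHandler side: producers only enqueue
  void handle(Event event) { eventQueue.add(event); }

  void start() {
    Thread t = new Thread(() -> {
      try {
        while (true) {
          Event e = eventQueue.take();                        // dispatch loop
          EventHandler<Event> h = handlers.get(e.getClass()); // route by type
          if (h != null) h.handle(e);
        }
      } catch (InterruptedException ignored) { }
    });
    t.setDaemon(true);
    t.start();
  }

  record AppEvent(String id) implements Event { }

  public static void main(String[] args) throws InterruptedException {
    MiniDispatcher d = new MiniDispatcher();
    d.register(AppEvent.class, e -> System.out.println("app event: " + e.id()));
    d.start();
    d.handle(new AppEvent("application_1"));
    Thread.sleep(100);
  }
}
```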


ApplicationAttemptEventDispatcher
	handle
		RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptID);
		rmAppAttempt.handle(event);


ClientRMProtocolPBClientImpl

	private ClientRMProtocolService.BlockingInterface proxy;

	ClientRMProtocolPBClientImpl
		RPC.setProtocolEngine(conf, ClientRMProtocolService.BlockingInterface.class, ProtoOverHadoopRpcEngine.class);
		proxy = (ClientRMProtocolService.BlockingInterface)RPC.getProxy(
                  ClientRMProtocolService.BlockingInterface.class, clientVersion, addr, conf);

	submitApplication(SubmitApplicationRequest request)
		SubmitApplicationRequestProto requestProto = ((SubmitApplicationRequestPBImpl)request).getProto();
		return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null, requestProto));


ClientRMProtocolPBServiceImpl

	private ClientRMProtocol real;

	submitApplication(RpcController arg0, SubmitApplicationRequestProto proto)
		SubmitApplicationRequestPBImpl request = new SubmitApplicationRequestPBImpl(proto);
		SubmitApplicationResponse response = real.submitApplication(request); // request = SubmitApplicationRequestPBImpl
		
RPC
	getProxy
		return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy();

	getProtocolProxy
		return getProtocolEngine(protocol,conf).getProxy(protocol,
                  clientVersion, addr, ticket, conf, factory, rpcTimeout);

	getProtocolEngine
		RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
		if (engine == null)
			Class impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(), // ENGINE_PROP = "rpc.engine"
                                    WritableRpcEngine.class);
			engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
			PROTOCOL_ENGINES.put(protocol, engine);
		return engine;

ProtoOverHadoopRpcEngine
	getProxy
		return new ProtocolProxy(protocol, (T) Proxy.newProxyInstance(protocol
	          .getClassLoader(), new Class[] { protocol }, new Invoker(protocol,
	          addr, ticket, conf, factory, rpcTimeout)), false);
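
Proxy.newProxyInstance is the JDK dynamic-proxy API: every call on the returned interface funnels into a single invoke(), where the real Invoker marshals the call into a protobuf request. A minimal sketch with a made-up protocol:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Minimal dynamic-proxy sketch. GreeterProtocol is made up; in the real code
// the handler is Invoker, which serializes the call and sends it over the wire.
public class ProxyDemo {
  interface GreeterProtocol { String greet(String name); }

  public static void main(String[] args) {
    InvocationHandler invoker = (proxy, method, a) ->
        "remote result of " + method.getName() + "(" + a[0] + ")";
    GreeterProtocol p = (GreeterProtocol) Proxy.newProxyInstance(
        GreeterProtocol.class.getClassLoader(),
        new Class<?>[] { GreeterProtocol.class }, invoker);
    System.out.println(p.greet("yarn"));  // every call funnels through invoke()
  }
}
```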

ProtocolProxy



---

hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/impl
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YarnClientProtocolProvider.java
hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobQueueClient.java

---

JobImpl << EventHandler // maintains a state machine (doTransition) for the Job.
	InitTransition  // JobState.NEW -> JobState.INITED or JobState.FAILED by JobEventType.JOB_INIT
		transition

			TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
			job.numMapTasks = taskSplitMetaInfo.length;


			createMapTasks
				TaskImpl task = new MapTaskImpl
				addTask
			createReduceTasks
				TaskImpl task = new ReduceTaskImpl
				addTask

		createSplits
			allTaskSplitMetaInfo = SplitMetaInfoReader.readSplitMetaInfo(
		            job.oldJobId, job.fs,
		            job.conf,
		            job.remoteJobSubmitDir);
			

	StartTransition // JobState.INITED -> JobState.RUNNING by JobEventType.JOB_START
			// Triggered in MRAppMaster's startJobs() method

		job.scheduleTasks(job.mapTasks);  // schedule (i.e., start) the maps
		job.scheduleTasks(job.reduceTasks);


	scheduleTasks // send TaskEventType.T_SCHEDULE event
		

abstract TaskImpl << EventHandler // maintains a state machine (doTransition) for the Task.

	InitialScheduleTransition // TaskState.NEW -> TaskState.SCHEDULED by TaskEventType.T_SCHEDULE
		transition
			task.addAndScheduleAttempt

	addAndScheduleAttempt
		TaskAttempt attempt = createAttempt();
		// send TaskAttemptEventType.TA_RESCHEDULE or TaskAttemptEventType.TA_SCHEDULE event


	LaunchTransition // TaskState.SCHEDULED -> TaskState.RUNNING by TaskEventType.T_ATTEMPT_LAUNCHED

MapTaskImpl
	createAttempt
		return new MapTaskAttemptImpl

ReduceTaskImpl
	createAttempt
		return new ReduceTaskAttemptImpl


abstract TaskAttemptImpl << EventHandler // maintains a state machine (doTransition) for the TaskAttempt.


	RequestContainerTransition // TaskAttemptState.NEW -> TaskAttemptState.UNASSIGNED by TaskAttemptEventType.TA_SCHEDULE      
		transition // send ContainerRequestEvent (type: ContainerAllocator.EventType.CONTAINER_REQ) event to job.eventHandler that is handled by RMContainerAllocator or LocalContainerAllocator
			   // dataLocalHosts come from taskSplitMetaInfo of JobImpl

	ContainerAssignedTransition // TaskAttemptState.UNASSIGNED -> TaskAttemptState.ASSIGNED by TaskAttemptEventType.TA_ASSIGNED
		transition
			taskAttempt.remoteTask = taskAttempt.createRemoteTask(); // real Task (classic Hadoop mapred flavor)

			// launch the container
			taskAttempt.eventHandler.handle(
			  new ContainerRemoteLaunchEvent(taskAttempt.attemptId, // handled by ContainerLauncherImpl
			  taskAttempt.containerID,
			  taskAttempt.containerMgrAddress, taskAttempt.containerToken) {
			    @Override
			    public ContainerLaunchContext getContainer() {
			      return taskAttempt.createContainerLaunchContext(); // create ContainerLaunchContext for map/reduce containers
		 	    }
			    @Override
			    public Task getRemoteTask() {  // classic mapred Task, not YARN version
			      return taskAttempt.remoteTask;
			    }
			});
	


	LaunchedContainerTransition // TaskAttemptState.ASSIGNED -> TaskAttemptState.RUNNING by TaskAttemptEventType.TA_CONTAINER_LAUNCHED
		transition //send TaskEventType.T_ATTEMPT_LAUNCHED event

	StatusUpdater // TaskAttemptState.RUNNING -> TaskAttemptState.RUNNING by TaskAttemptEventType.TA_UPDATE

	CLEANUP_CONTAINER_TRANSITION // TaskAttemptState.RUNNING -> TaskAttemptState.SUCCESS_CONTAINER_CLEANUP by TaskAttemptEventType.TA_DONE


	createContainerLaunchContext // create ContainerLaunchContext for map/reduce containers

		List commands = MapReduceChildJVM.getVMCommand(
		  taskAttemptListener.getAddress(), remoteTask,
		  jvmID);
		
		ContainerLaunchContext container =
		  recordFactory.newRecordInstance(ContainerLaunchContext.class);
		container.setCommands(commands);


ContainerLauncherImpl
	EventProcessor
		run
			case CONTAINER_REMOTE_LAUNCH:
				ContainerManager proxy =
				  getCMProxy(containerID, containerManagerBindAddr, containerToken);

				ContainerLaunchContext containerLaunchContext =
				  launchEv.getContainer();

				// Now launch the actual container
				StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
				startRequest.setContainerLaunchContext(containerLaunchContext);
				StartContainerResponse response = proxy.startContainer(startRequest);


MapReduceChildJVM
	getVMCommand
		Vector vargs = new Vector(8);
		vargs.add("exec");
		vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
		// Add main class and its arguments 
		vargs.add(YarnChild.class.getName());  // main of Child


YarnChild // The main() for MapReduce task processes
	main

		JvmTask myTask = null;
		// poll for new task from TaskAttemptListener's address, TaskAttemptListener created in MRAppMaster
		for (int idle = 0; null == myTask; ++idle) {
		  long sleepTimeMilliSecs = Math.min(idle * 500, 1500);
		  LOG.info("Sleeping for " + sleepTimeMilliSecs
		    + "ms before retrying again. Got null now.");
		  MILLISECONDS.sleep(sleepTimeMilliSecs);
		  myTask = umbilical.getTask(context);  // get task from remote TaskAttemptListener
		}
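
The loop backs off 500 ms per idle round, capped at 1.5 s. A self-contained version with the umbilical replaced by a stand-in supplier:

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Sketch of YarnChild's capped-backoff poll; the supplier stands in for
// umbilical.getTask(context) and returns a task on the third call.
public class PollDemo {
  public static void main(String[] args) throws InterruptedException {
    int[] calls = {0};
    Supplier<String> umbilical = () -> ++calls[0] < 3 ? null : "task_0001";

    String myTask = null;
    for (int idle = 0; myTask == null; ++idle) {
      long sleepMs = Math.min(idle * 500L, 1500L);  // 0, 500, 1000, then capped
      TimeUnit.MILLISECONDS.sleep(sleepMs);
      myTask = umbilical.get();
    }
    System.out.println("got " + myTask);
  }
}
```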


		final Task taskFinal = task;
		childUGI.doAs(new PrivilegedExceptionAction() {
		  @Override
		  public Object run() throws Exception {
		    // use job-specified working directory
		    FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
		    taskFinal.run(job, umbilical); // run the task
		    return null;
		  }
		});



MapTaskAttemptImpl
	createRemoteTask
		MapTask mapTask =
		  new MapTask("", TypeConverter.fromYarn(getID()), partition,
		  splitInfo.getSplitIndex(), 1); // YARN has no notion of slots per task, so it is set to 1.
		mapTask.setUser(conf.get(MRJobConfig.USER_NAME));
		mapTask.setConf(conf);


MapTask
	run
		if (useNewApi) {
			runNewMapper(job, splitMetaInfo, umbilical, reporter);
		} else {
			runOldMapper(job, splitMetaInfo, umbilical, reporter);
		}
		
MapRunnable

JobConf
	getUseNewMapper
	setUseNewMapper


RMContainerAllocator // allocates containers from the ResourceManager scheduler
	handle
		if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ)  // if it is a container request
			if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) // Map
				scheduledRequests.addMap(reqEvent); // maps are immediately scheduled
			else // Reduce
				if (reqEvent.getEarlierAttemptFailed())  // the request has failed before
          				//add to the front of queue for fail fast
					pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE));

				else
					pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE)); // reduces are added to pending and slowly ramped up
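
The fail-fast behavior is plain Deque ordering: retried requests jump the queue via addFirst, fresh ones go to the back. In isolation:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of the pendingReduces policy; entry names are made up.
public class PendingReducesDemo {
  public static void main(String[] args) {
    Deque<String> pendingReduces = new ArrayDeque<>();
    pendingReduces.add("reduce_0");            // normal ramp-up order
    pendingReduces.add("reduce_1");
    pendingReduces.addFirst("reduce_2-retry"); // failed earlier: schedule first
    System.out.println(pendingReduces);        // [reduce_2-retry, reduce_0, reduce_1]
  }
}
```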

	heartbeat
		List allocatedContainers = getResources();
		scheduledRequests.assign(allocatedContainers);

	getResources
		AMResponse response = makeRemoteRequest(); // in RMContainerRequestor
		


ScheduledRequests
	addMap		
		request = new ContainerRequest(event, PRIORITY_MAP);   
		maps.put(event.getAttemptID(), request);  // put into map that is a LinkedHashMap
		addContainerReq(request);  // in RMContainerRequestor that puts request into ask, a TreeSet of ResourceRequest
	addReduce
	assign  // sends TaskAttemptContainerAssignedEvent event (TaskAttemptEventType.TA_ASSIGNED)
		eventHandler.handle(new TaskAttemptContainerAssignedEvent(  // trigger ContainerAssignedTransition in TaskAttemptImpl
			assigned.attemptID, allocated));

				
abstract RMContainerRequestor
	addContainerReq
		addResourceRequest
	addResourceRequest		
		ask.add(remoteRequest);  // ask = new TreeSet();
	makeRemoteRequest
		allocateRequest.addAllAsks(new ArrayList(ask));
		AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
		AMResponse response = allocateResponse.getAMResponse();
	heartbeat


RMCommunicator	// registers with / unregisters from the RM and sends heartbeats to it
	start
		scheduler = createSchedulerProxy(); // scheduler is an AMRMProtocolPBClientImpl
		register();
		startAllocatorThread();

	startAllocatorThread
		run
			heartbeat

	heartbeat
		AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
		AMResponse response = allocateResponse.getAMResponse();

	createSchedulerProxy
		final YarnRPC rpc = YarnRPC.create(getConfig());  // default is HadoopYarnProtoRPC
		return currentUser.doAs(new PrivilegedAction() {
			@Override
			public AMRMProtocol run() {
				return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
				NetUtils.createSocketAddr(serviceAddr), conf);
			}
		});

	register
		RegisterApplicationMasterRequest request =
		  recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
		  request.setApplicationAttemptId(applicationAttemptId);

		RegisterApplicationMasterResponse response = scheduler.registerApplicationMaster(request);


AMRMProtocol
	allocate // the main interface between ApplicationMaster and ResourceManager
		 // ResourceManager returns with a list of allocated Container via AllocateResponse

LocalContainerAllocator


ScheduledRequests
	


MRAppMaster // a CompositeService: services are added via addService(), and start() invokes start() on each of them.

	init // creates the job and registers the event dispatchers
		job = createJob(config, fsTokens, currentUser.getUserName());
		dispatcher.register(JobEventType.class, synchronousJobEventDispatcher);
		dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
		dispatcher.register(TaskAttemptEventType.class,
			new TaskAttemptEventDispatcher());
		dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
		dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
			historyService);
			
	createJob
		// getEventHandler returns a GenericEventHandler that only enqueues the event; the dispatcher registered for that event type later takes it off the queue and processes it
		Job newJob = new JobImpl(appID, conf, dispatcher.getEventHandler(),
			taskAttemptListener, jobTokenSecretManager, fsTokens, clock, startCount,
			completedTasksFromPreviousRun, metrics, user);
			((RunningAppContext) context).jobs.put(newJob.getID(), newJob);

	ContainerLauncherRouter // a service added into MRAppMaster
		start

			if (job.isUber()) {
			  this.containerLauncher = new LocalContainerLauncher(context,
			    (TaskUmbilicalProtocol) taskAttemptListener);
			} else {
			  this.containerLauncher = new ContainerLauncherImpl(context);
			}
			((Service)this.containerLauncher).init(getConfig());
			((Service)this.containerLauncher).start();


	ContainerAllocatorRouter // a service added into MRAppMaster 
		start
			if (job.isUber()) {
			  this.containerAllocator = new LocalContainerAllocator(
			  this.clientService, this.context);
			} else {
			  this.containerAllocator = new RMContainerAllocator(  // it extends RMCommunicator that registers to RM
			  this.clientService, this.context);
			}
	
			((Service)this.containerAllocator).init(getConfig());
			((Service)this.containerAllocator).start(); 

	start
		JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
		jobEventDispatcher.handle(initJobEvent);

		super.start(); // invokes all services' start() functions

		startJobs // send JobEventType.JOB_START event
			
	main
		appMaster.init(conf);
		appMaster.start();


== hadoop common ==

UserGroupInformation
	HadoopLoginModule // login module for Kerberos, Unix, or Windows principals