码迷,mamicode.com
首页 > 其他好文 > 详细

【Flink源码】一、客户端任务提交源码

时间:2021-06-18 19:53:27      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:rect   name   eric   内核   内存模型   win   amp   tco   parameter   

一、Flink内核解析,针对版本1.12.0,四大块:任务的提交流程,组件通信,任务的调度,内存管理。

任务的提交流程:以命令行的提交命令开始追踪类,20多个步骤,几十个类,数千行代码量,最后画一个PPT动图

组件通信:actor的模型,akka基本原理和实现,5大关键角色:代理转发,处理细节,PPT动图

任务调度:streamGraph-jobGraph-ext,四个图的讲解,在什么步骤还是生成图,调用位置,如何转换,

     task任务调度:调度器,调度模型,调度策略

     task执行:以map算子为例,详细展示

内存管理:1.10之后的内存模型做了改进,jobmanage,taskmanage,

     如何jvm内存不足的处理,内存的分配过程,数据结构,特有的组件,网络传输的内存管理如何实现

     反压过程的讲解

 

二、yarn-per-job的提交流程讲解,企业常用的模式

前提启动hadoop,安装了flink,不需要启动flink集群

nc -lk 9999启动端口

flink run -t yarn-per-job /opt/module/flink-1.11.3/examples/streaming/SocketWindowWordCount.jar   --port 9999

  

启动完成之后会多出这几个类

2466 NodeManager
4596 YarnTaskExecutorRunner
4426 YarnJobClusterEntrypoint
2124 DataNode
4749 Jps
1983 NameNode
3679 CliFrontend

  执行客户端的入口类

org.apache.flink.client.cli.CliFrontend

  进入main方法看主要逻辑,123,细枝末节太多,看主要逻辑,不然容易混乱

                // 1. find the configuration directory
          // 拿路径 final String configurationDirectory = getConfigurationDirectoryFromEnv(); // 2. load the global configuration
          // 拿配置 final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory); // 3. load the custom command lines
// 加载自定义命令行 三种顺序:1.generic,2.yarn,3.default final List<CustomCommandLine> customCommandLines = loadCustomCommandLines( configuration, configurationDirectory); try { final CliFrontend cli = new CliFrontend( configuration, customCommandLines); SecurityUtils.install(new SecurityConfiguration(cli.configuration));
// 核心逻辑run方法 int retCode = SecurityUtils.getInstalledContext() .runSecured(() -> cli.parseAndRun(args)); System.exit(retCode); }

  

进入cli.parseAndRun(args));

run -t yarn-per-job /opt/module/flink-1.11.3/examples/streaming/SocketWindowWordCount.jar   --port 9999

  

               // get action   这个拿到命令行参数的第一个,run
		String action = args[0];

		// remove action from parameters  除开run 后面的参数
		final String[] params = Arrays.copyOfRange(args, 1, args.length); 

                switch (action) {
				case ACTION_RUN:
                                        //进入					
run(params); return 0; case ACTION_RUN_APPLICATION: runApplication(params); return 0; case ACTION_LIST: list(params); return 0; case ACTION_INFO: info(params); return 0; case ACTION_CANCEL: cancel(params); return 0; case ACTION_STOP: stop(params); return 0; case ACTION_SAVEPOINT: savepoint(params); return 0; case "-h": case "--help": CliFrontendParser.printHelp(customCommandLines); return 0; case "-v": case "--version": String version = EnvironmentInformation.getVersion(); String commitID = EnvironmentInformation.getRevisionInformation().commitId; System.out.print("Version: " + version); System.out.println(commitID.equals(EnvironmentInformation.UNKNOWN) ? "" : ", Commit ID: " + commitID); return 0; default: System.out.printf("\"%s\" is not a valid action.\n", action); System.out.println(); System.out.println("Valid actions are \"run\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\"."); System.out.println(); System.out.println("Specify the version option (-v or --version) to print Flink version."); System.out.println(); System.out.println("Specify the help option (-h or --help) to get help on the command."); return 1; }

 进入 run(params);

                //默认配置
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
//获取命令行 final CommandLine commandLine = getCommandLine(commandOptions, args, true); // evaluate help flag if (commandLine.hasOption(HELP_OPTION.getOpt())) { CliFrontendParser.printHelpForRun(customCommandLines); return; } final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(checkNotNull(commandLine)); final ProgramOptions programOptions = ProgramOptions.create(commandLine); final List<URL> jobJars = getJobJarAndDependencies(programOptions); final Configuration effectiveConfiguration = getEffectiveConfiguration( activeCommandLine, commandLine, programOptions, jobJars); LOG.debug("Effective executor configuration: {}", effectiveConfiguration); final PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration); try {
//执行 executeProgram(effectiveConfiguration, program); }

  这个就是最终执行:

executeProgram(effectiveConfiguration, program);

 



【Flink源码】一、客户端任务提交源码

标签:rect   name   eric   内核   内存模型   win   amp   tco   parameter   

原文地址:https://www.cnblogs.com/fi0108/p/14899055.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!