Published: 2019-10-20 21:10:00
By ytwan
In Big Data .
tags: Flink
ParameterTool
Flink--1 .7 以及以后
org .apache.flink.api.java.utils.ParameterTool
1 . 使用 ParameterTool 规避 static 变量初始化
ParameterTool 提供了一系列预定义的静态方法来读取配置信息,
ParameterTool 内部是一个 Map < String , String >, 所以很容易与你自己的配置形式相集成
ParameterTool 是可序列化
使用外部参数
final ParameterTool parameterTool = ParameterTool .fromArgs ( args );
if ( parameterTool .getNumberOfParameters () < 3 ) {
System . out . println ( "Missing parameters!\n" +
"Usage: --numRecords <numRecords> --index <index> --type <type>" );
return ;
}
parameterTool .getInt ( "numRecords" )
使用 parameterTool 来承接 main 函数的参数,通过 env 来设置全局变量来进行分发,
那么在继承了 rich 函数的逻辑中就可以使用这个全局参数
方法:
public static ParameterTool
fromPropertiesFile
fromMap
fromSystemProperties
getNumberOfParameters
getRequired getConfiguration getProperties
get getByte getInt getShort getLong getFloat getDouble getBoolean
has
addToDefaults createPropertiesFile
eg :
// 从 .properties 文件中获取
String propertiesFile = "/home/flink/myJob.properties" ;
ParameterTool parameter = ParameterTool .fromPropertiesFile ( propertiesFile );
// 从命令行参数中获取 像 --outputPath hdfs :/// mydata --elements 42 这种形式的参数
// -- Keys have to start with '-' or '--'
final ParameterTool params = ParameterTool .fromArgs ( args );
final String outputPath = params .getRequired ( "outputPath" );
// 使用的情况 -- 设置环境 -- 使用 env .getConfig () .setGlobalJobParameters 将 ParameterTool 的访问范围设置为 global
ParameterTool parameters = ParameterTool .fromArgs ( args );
final ExecutionEnvironment env = ExecutionEnvironment .getExecutionEnvironment ();
env .getConfig () .setGlobalJobParameters
ParameterTool parameters = ( ParameterTool ) getRuntimeContext () .getExecutionConfig () .getGlobalJobParameters ();
parameters .getRequired ( "input" );
Configuration
Flink 1.3 org.apache.flink.configuration.Configuration
步骤: 在main函数中定义变量
步骤 继承自一个rich的function,这样才可以在open方法中获取相应的参数
// Class in Flink to store parameters
Configuration configuration = new Configuration();
使用 broadcast 变量
其他情况
public static void main(String[] args){
if(args.length != 3) {
logger.error("");
System.exit(1);
}
String stuName = args[0];
String statisDate = args[1];
String gradeName = args[2];
}
参考:
如何在flink中传递参数 https://www.cnblogs.com/029zz010buct/p/10362451.html
他山之玉