A Look at the KeySelector of Flink's KeyedStream
Published: 2019-06-22


This article takes a look at the KeySelector used by Flink's KeyedStream.

KeyedStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

```java
@Public
public class KeyedStream<T, KEY> extends DataStream<T> {

	/**
	 * The key selector that can get the key by which the stream if partitioned from the elements.
	 */
	private final KeySelector<T, KEY> keySelector;

	/** The type of the key by which the stream is partitioned. */
	private final TypeInformation<KEY> keyType;

	/**
	 * Creates a new {@link KeyedStream} using the given {@link KeySelector}
	 * to partition operator state by key.
	 *
	 * @param dataStream
	 *            Base stream of data
	 * @param keySelector
	 *            Function for determining state partitions
	 */
	public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
		this(dataStream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType()));
	}

	/**
	 * Creates a new {@link KeyedStream} using the given {@link KeySelector}
	 * to partition operator state by key.
	 *
	 * @param dataStream
	 *            Base stream of data
	 * @param keySelector
	 *            Function for determining state partitions
	 */
	public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
		this(
			dataStream,
			new PartitionTransformation<>(
				dataStream.getTransformation(),
				new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
			keySelector,
			keyType);
	}

	/**
	 * Creates a new {@link KeyedStream} using the given {@link KeySelector} and {@link TypeInformation}
	 * to partition operator state by key, where the partitioning is defined by a {@link PartitionTransformation}.
	 *
	 * @param stream
	 *            Base stream of data
	 * @param partitionTransformation
	 *            Function that determines how the keys are distributed to downstream operator(s)
	 * @param keySelector
	 *            Function to extract keys from the base stream
	 * @param keyType
	 *            Defines the type of the extracted keys
	 */
	@Internal
	KeyedStream(
		DataStream<T> stream,
		PartitionTransformation<T> partitionTransformation,
		KeySelector<T, KEY> keySelector,
		TypeInformation<KEY> keyType) {

		super(stream.getExecutionEnvironment(), partitionTransformation);
		this.keySelector = clean(keySelector);
		this.keyType = validateKeyType(keyType);
	}

	//......
}
```
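  • Every KeyedStream constructor takes a KeySelector parameter. The two-argument constructor infers the key type with TypeExtractor.getKeySelectorTypes. The three-argument constructor passes the selector to a KeyGroupStreamPartitioner inside a PartitionTransformation.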
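The second constructor shows where the KeySelector is actually used for routing. It is handed to a KeyGroupStreamPartitioner together with StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM. The following Flink-free sketch shows that routing in rough form: a key is first hashed into a key group, and the key group is then mapped onto a subtask.

It rests on these assumptions:

- KeyGroupSketch and its methods are hypothetical names.
- The default of 128 key groups and the `keyGroup * parallelism / maxParallelism` mapping reflect my understanding of Flink's KeyGroupRangeAssignment.
- Flink additionally runs key.hashCode() through MathUtils.murmurHash. This sketch replaces that step with a plain non-negative modulo.

```java
import java.util.Objects;

// Hypothetical, simplified sketch of key -> key group -> subtask routing.
// Assumption: Flink applies MathUtils.murmurHash to key.hashCode() first; here a plain
// non-negative modulo of hashCode() stands in for that step.
final class KeyGroupSketch {

    // Assumed to match the default lower bound for max parallelism (1 << 7 = 128 key groups).
    static final int DEFAULT_MAX_PARALLELISM = 128;

    // Maps a key to a key group in [0, maxParallelism).
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(Objects.hashCode(key), maxParallelism);
    }

    // Maps a key group to a subtask index in [0, parallelism); contiguous key groups share a subtask.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Full routing for one key.
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return operatorIndexFor(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        for (String word : new String[] {"flink", "keyBy", "flink"}) {
            System.out.println(word + " -> subtask " + subtaskFor(word, DEFAULT_MAX_PARALLELISM, 4));
        }
    }
}
```

The detour through key groups means the number of key groups stays fixed even when the job's parallelism changes. Only the key-group-to-subtask mapping is recomputed, while the routing remains deterministic for a given key.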

KeySelector

flink-core-1.7.0-sources.jar!/org/apache/flink/api/java/functions/KeySelector.java

```java
@Public
@FunctionalInterface
public interface KeySelector<IN, KEY> extends Function, Serializable {

	/**
	 * User-defined function that deterministically extracts the key from an object.
	 *
	 * <p>For example for a class:
	 * <pre>
	 * 	public class Word {
	 * 		String word;
	 * 		int count;
	 * 	}
	 * </pre>
	 * The key extractor could return the word as
	 * a key to group all Word objects by the String they contain.
	 *
	 * <p>The code would look like this
	 * <pre>
	 * 	public String getKey(Word w) {
	 * 		return w.word;
	 * 	}
	 * </pre>
	 *
	 * @param value The object to get the key from.
	 * @return The extracted key.
	 *
	 * @throws Exception Throwing an exception will cause the execution of the respective task to fail,
	 *                   and trigger recovery or cancellation of the program.
	 */
	KEY getKey(IN value) throws Exception;
}
```
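  • The KeySelector interface extends the Function interface (and Serializable) and defines a getKey method that extracts a KEY from an IN value. Because it is annotated @FunctionalInterface, it can be implemented with a lambda.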
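Because KeySelector is a functional interface, the Word example from its Javadoc fits in a lambda. The sketch below keeps that example runnable without Flink on the classpath, so it makes two substitutions:

- It uses a hypothetical local interface, SimpleKeySelector, with the same getKey signature instead of the real org.apache.flink.api.java.functions.KeySelector.
- It uses a hypothetical groupBy helper to show the grouping that keyBy achieves logically. It says nothing about how Flink physically partitions the stream.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in with the same getKey signature as Flink's KeySelector<IN, KEY>,
// so this compiles without Flink (the real interface also extends Flink's Function).
@FunctionalInterface
interface SimpleKeySelector<IN, KEY> extends Serializable {
    KEY getKey(IN value) throws Exception;
}

// The Word class from the KeySelector Javadoc.
class Word {
    final String word;
    final int count;

    Word(String word, int count) {
        this.word = word;
        this.count = count;
    }
}

final class KeySelectorDemo {

    // The key extractor from the Javadoc, written as a lambda: group Words by their string.
    static final SimpleKeySelector<Word, String> BY_WORD = w -> w.word;

    // Hypothetical helper: collects elements into groups by the extracted key.
    static <IN, KEY> Map<KEY, List<IN>> groupBy(List<IN> input, SimpleKeySelector<IN, KEY> selector) throws Exception {
        Map<KEY, List<IN>> groups = new LinkedHashMap<>();
        for (IN value : input) {
            KEY key = selector.getKey(value);
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
        return groups;
    }

    public static void main(String[] args) throws Exception {
        List<Word> words = Arrays.asList(new Word("hello", 1), new Word("flink", 1), new Word("hello", 1));
        groupBy(words, BY_WORD).forEach((k, v) -> System.out.println(k + " -> " + v.size()));
    }
}
```

The requirement that getKey be deterministic matters here. The same element must always yield the same key, otherwise its state would land in different partitions.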

DataStream.keyBy

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

```java
/**
 * It creates a new {@link KeyedStream} that uses the provided key for partitioning
 * its operator states.
 *
 * @param key
 *            The KeySelector to be used for extracting the key for partitioning
 * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
 */
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
	Preconditions.checkNotNull(key);
	return new KeyedStream<>(this, clean(key));
}

/**
 * It creates a new {@link KeyedStream} that uses the provided key with explicit type information
 * for partitioning its operator states.
 *
 * @param key The KeySelector to be used for extracting the key for partitioning.
 * @param keyType The type information describing the key type.
 * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
 */
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key, TypeInformation<K> keyType) {
	Preconditions.checkNotNull(key);
	Preconditions.checkNotNull(keyType);
	return new KeyedStream<>(this, clean(key), keyType);
}

/**
 * Partitions the operator state of a {@link DataStream} by the given key positions.
 *
 * @param fields
 *            The position of the fields on which the {@link DataStream}
 *            will be grouped.
 * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
 */
public KeyedStream<T, Tuple> keyBy(int... fields) {
	if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
		return keyBy(KeySelectorUtil.getSelectorForArray(fields, getType()));
	} else {
		return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
	}
}

/**
 * Partitions the operator state of a {@link DataStream} using field expressions.
 * A field expression is either the name of a public field or a getter method with parentheses
 * of the {@link DataStream}'s underlying type. A dot can be used to drill
 * down into objects, as in {@code "field1.getInnerField2()" }.
 *
 * @param fields
 *            One or more field expressions on which the state of the {@link DataStream} operators will be
 *            partitioned.
 * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
 **/
public KeyedStream<T, Tuple> keyBy(String... fields) {
	return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
}

private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
	return new KeyedStream<>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
			getType(), getExecutionConfig())));
}
```
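  • DataStream's keyBy method converts a DataStream into a KeyedStream and comes in several overloads.
  • One overload takes a varargs int array and is typically used for simple tuple types. Each int is a zero-based field index into the tuple, and passing several ints defines a composite key. For example, keyBy(0, 1) uses the tuple's first and second fields together as the key.
  • One overload takes a varargs String array and is typically used for more complex tuple types and for POJOs.
    - For a POJO, each String names a field.
    - Nested properties of objects or tuples are supported too, e.g. user.zip.
    - For a tuple, f0 refers to its first field.
  • One overload takes a KeySelector, which lets you define the key freely, for example by extracting a value from the object and then transforming it.
  • keyBy(int... fields) (for non-array element types) and keyBy(String... fields) both end up calling the private keyBy(Keys<T> keys) method. Every KeyedStream constructor requires a KeySelector, so that method uses KeySelectorUtil.getSelectorForKeys to convert the Keys into a KeySelector. When the element type is a basic or primitive array, keyBy(int... fields) instead builds a selector with KeySelectorUtil.getSelectorForArray and calls the public keyBy(KeySelector) directly.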
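To make the String-based overload concrete, the following sketch shows what an expression such as user.zip addresses inside a POJO. It is only an illustration under these assumptions:

- FieldExpressionSketch, Order and User are hypothetical names.
- The lookup uses plain Java reflection on public fields.
- Flink does not work this way. It resolves field expressions once against the stream's TypeInformation when keyBy is declared, rather than reflecting on every record.

```java
import java.lang.reflect.Field;

// Hypothetical illustration: follows a dotted field expression such as "user.zip"
// through public fields using reflection. Flink itself resolves expressions against
// TypeInformation (CompositeType.getFlatFields) when keyBy is called, not per record.
final class FieldExpressionSketch {

    static Object resolve(Object root, String expression) throws ReflectiveOperationException {
        Object current = root;
        for (String name : expression.trim().split("\\.")) {
            Field field = current.getClass().getField(name); // public fields only
            current = field.get(current);
        }
        return current;
    }

    // Hypothetical POJOs for the example.
    public static class User {
        public String zip;

        public User(String zip) {
            this.zip = zip;
        }
    }

    public static class Order {
        public User user;
        public long amount;

        public Order(User user, long amount) {
            this.user = user;
            this.amount = amount;
        }
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        Order order = new Order(new User("10001"), 42L);
        System.out.println(resolve(order, "user.zip"));
    }
}
```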

Keys.ExpressionKeys

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/operators/Keys.java

```java
/**
 * Represents (nested) field access through string and integer-based keys
 */
public static class ExpressionKeys<T> extends Keys<T> {

	public static final String SELECT_ALL_CHAR = "*";
	public static final String SELECT_ALL_CHAR_SCALA = "_";
	private static final Pattern WILD_CARD_REGEX = Pattern.compile("[\\.]?("
			+ "\\" + SELECT_ALL_CHAR + "|"
			+ "\\" + SELECT_ALL_CHAR_SCALA + ")$");

	// Flattened fields representing keys fields
	private List<FlatFieldDescriptor> keyFields;
	private TypeInformation<?>[] originalKeyTypes;

	//......

	/**
	 * Create String-based (nested) field expression keys on a composite type.
	 */
	public ExpressionKeys(String[] keyExpressions, TypeInformation<T> type) {
		checkNotNull(keyExpressions, "Field expression cannot be null.");

		this.keyFields = new ArrayList<>(keyExpressions.length);

		if (type instanceof CompositeType){
			CompositeType<T> cType = (CompositeType<T>) type;
			this.originalKeyTypes = new TypeInformation<?>[keyExpressions.length];

			// extract the keys on their flat position
			for (int i = 0; i < keyExpressions.length; i++) {
				String keyExpr = keyExpressions[i];

				if (keyExpr == null) {
					throw new InvalidProgramException("Expression key may not be null.");
				}
				// strip off whitespace
				keyExpr = keyExpr.trim();

				List<FlatFieldDescriptor> flatFields = cType.getFlatFields(keyExpr);

				if (flatFields.size() == 0) {
					throw new InvalidProgramException("Unable to extract key from expression '" + keyExpr + "' on key " + cType);
				}
				// check if all nested fields can be used as keys
				for (FlatFieldDescriptor field : flatFields) {
					if (!field.getType().isKeyType()) {
						throw new InvalidProgramException("This type (" + field.getType() + ") cannot be used as key.");
					}
				}
				// add flat fields to key fields
				keyFields.addAll(flatFields);

				String strippedKeyExpr = WILD_CARD_REGEX.matcher(keyExpr).replaceAll("");
				if (strippedKeyExpr.isEmpty()) {
					this.originalKeyTypes[i] = type;
				} else {
					this.originalKeyTypes[i] = cType.getTypeAt(strippedKeyExpr);
				}
			}
		}
		else {
			if (!type.isKeyType()) {
				throw new InvalidProgramException("This type (" + type + ") cannot be used as key.");
			}

			// check that all key expressions are valid
			for (String keyExpr : keyExpressions) {
				if (keyExpr == null) {
					throw new InvalidProgramException("Expression key may not be null.");
				}
				// strip off whitespace
				keyExpr = keyExpr.trim();
				// check that full type is addressed
				if (!(SELECT_ALL_CHAR.equals(keyExpr) || SELECT_ALL_CHAR_SCALA.equals(keyExpr))) {
					throw new InvalidProgramException(
						"Field expression must be equal to '" + SELECT_ALL_CHAR + "' or '" + SELECT_ALL_CHAR_SCALA + "' for non-composite types.");
				}

				// add full type as key
				keyFields.add(new FlatFieldDescriptor(0, type));
			}
			this.originalKeyTypes = new TypeInformation[] {type};
		}
	}

	//......
}
```
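  • ExpressionKeys is a static nested class of Keys and extends Keys. keyBy(int... fields) (in its non-array branch) and keyBy(String... fields) both wrap the fields in a new Keys.ExpressionKeys and then call the private keyBy(Keys<T> keys) method.
  • For a composite type, the String-based constructor shown above flattens each expression with getFlatFields and checks that every resulting field is a valid key type. For a non-composite type it accepts only "*" or "_", which select the whole value as the key.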
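The wildcard handling in ExpressionKeys is easy to check in isolation. The sketch below copies two rules from the constructor above into a standalone class:

- the WILD_CARD_REGEX construction, which strips a trailing wildcard;
- the rule that a non-composite type accepts only "*" or "_".

ExpressionKeysSketch, stripWildcard and isValidAtomicExpression are hypothetical names. Everything else ExpressionKeys does, such as flattening via getFlatFields and the isKeyType checks, is left out.

```java
import java.util.regex.Pattern;

// Hypothetical standalone copy of two small rules from Keys.ExpressionKeys.
final class ExpressionKeysSketch {

    static final String SELECT_ALL_CHAR = "*";
    static final String SELECT_ALL_CHAR_SCALA = "_";

    // Same construction as WILD_CARD_REGEX in the Flink source: an optional '.'
    // followed by '*' or '_' at the very end of the expression.
    static final Pattern WILD_CARD_REGEX = Pattern.compile("[\\.]?("
            + "\\" + SELECT_ALL_CHAR + "|"
            + "\\" + SELECT_ALL_CHAR_SCALA + ")$");

    // Removes a trailing wildcard, e.g. "user.*" becomes "user" and "*" becomes "".
    static String stripWildcard(String keyExpr) {
        return WILD_CARD_REGEX.matcher(keyExpr.trim()).replaceAll("");
    }

    // For non-composite (atomic) types, only "*" or "_" is accepted as an expression.
    static boolean isValidAtomicExpression(String keyExpr) {
        String trimmed = keyExpr.trim();
        return SELECT_ALL_CHAR.equals(trimmed) || SELECT_ALL_CHAR_SCALA.equals(trimmed);
    }

    public static void main(String[] args) {
        for (String expr : new String[] {"user.*", "f0._", "*", "user.zip"}) {
            System.out.println("'" + expr + "' -> '" + stripWildcard(expr) + "'");
        }
    }
}
```

Stripping the trailing wildcard is what lets an expression like "user.*" record the type of user as its original key type. A bare "*" is empty after stripping, so the source records the whole input type instead.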

KeySelectorUtil.getSelectorForKeys

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/keys/KeySelectorUtil.java

```java
@Internal
public final class KeySelectorUtil {

	public static <X> KeySelector<X, Tuple> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo, ExecutionConfig executionConfig) {
		if (!(typeInfo instanceof CompositeType)) {
			throw new InvalidTypesException(
					"This key operation requires a composite type such as Tuples, POJOs, or Case Classes.");
		}

		CompositeType<X> compositeType = (CompositeType<X>) typeInfo;

		int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
		int numKeyFields = logicalKeyPositions.length;

		TypeInformation<?>[] typeInfos = keys.getKeyFieldTypes();
		// use ascending order here, the code paths for that are usually a slight bit faster
		boolean[] orders = new boolean[numKeyFields];
		for (int i = 0; i < numKeyFields; i++) {
			orders[i] = true;
		}

		TypeComparator<X> comparator = compositeType.createComparator(logicalKeyPositions, orders, 0, executionConfig);
		return new ComparableKeySelector<>(comparator, numKeyFields, new TupleTypeInfo<>(typeInfos));
	}

	//......
}
```
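  • KeySelectorUtil.getSelectorForKeys converts Keys into a KeySelector in four steps:
    - It requires a composite type (Tuple, POJO or case class).
    - It computes the logical key positions.
    - It builds an ascending TypeComparator over those positions.
    - It returns a ComparableKeySelector whose key is a Tuple typed by the key field types.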
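At its core, getSelectorForKeys is an adapter: a description of which fields form the key (the logical key positions) becomes a selector that bundles those fields into one composite key. The following Flink-free sketch illustrates that idea under these assumptions:

- An Object[] stands in for a tuple.
- An immutable List stands in for Flink's Tuple key.
- java.util.function.Function stands in for KeySelector.
- PositionalKeySketch and selectorForPositions are hypothetical names.

The real ComparableKeySelector extracts the fields through a TypeComparator instead.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: turn logical key positions into a selector whose key bundles
// the fields at those positions. Object[] plays the role of a tuple, List the role of Tuple.
final class PositionalKeySketch {

    static Function<Object[], List<Object>> selectorForPositions(int... logicalKeyPositions) {
        int[] positions = logicalKeyPositions.clone(); // defensive copy of the positions
        return record -> {
            List<Object> key = new ArrayList<>(positions.length);
            for (int pos : positions) {
                key.add(record[pos]);
            }
            // List equality is element-wise, so equal field values give equal keys.
            return Collections.unmodifiableList(key);
        };
    }

    public static void main(String[] args) {
        // Roughly what keyBy(0, 1) means: fields 0 and 1 together form the key.
        Function<Object[], List<Object>> byFirstTwo = selectorForPositions(0, 1);
        System.out.println(byFirstTwo.apply(new Object[] {"alice", 2019, 10L}));
    }
}
```

Two records that agree on fields 0 and 1 produce equal keys, which is the behaviour keyBy(0, 1) relies on.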

Summary

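  • Every KeyedStream constructor requires a KeySelector argument.
  • DataStream's keyBy method has several overloads, accepting a varargs int array, a varargs String array, or a KeySelector.
  • keyBy(int... fields) (for non-array element types) and keyBy(String... fields) both wrap the fields in a new Keys.ExpressionKeys and then call the private keyBy(Keys<T> keys) method. That method converts the Keys into a KeySelector via KeySelectorUtil.getSelectorForKeys.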


Reposted from: http://etcta.baihongyu.com/
