Commonly Used Flink Operator Transformations

hivefans
Dec 4, 2020

In Flink, a Transformation operator converts one or more DataStreams into a new DataStream, and multiple transformations can be combined into a complex dataflow topology. A DataStream can be transformed, filtered, and aggregated into different streams by different Transformation operations, thereby fulfilling our business requirements.

1、map

Map: takes one element as input and emits exactly one element as output; it can be used for simple per-record cleaning or conversion.

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

public class FlinkSubmitter {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint every 5 s with exactly-once guarantees
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.83.129:9092");
        props.setProperty("group.id", "con1");
        // zookeeper.connect and the deserializer properties are not required by
        // FlinkKafkaConsumer011 (Flink supplies its own deserializers), but they are harmless
        props.setProperty("zookeeper.connect", "192.168.83.129:2181");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        System.out.println("ready to print");
        FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
                "kafka_flink_mysql",      // topic
                new SimpleStringSchema(),
                props);
        consumer.setStartFromGroupOffsets();

        // parse each JSON string from Kafka into an Entity POJO
        SingleOutputStreamOperator<Entity> streamRecord = env.addSource(consumer)
                .map(string -> JSON.parseObject(string, Entity.class))
                .setParallelism(1);

        // map: one Entity in, one transformed Entity out
        SingleOutputStreamOperator<Entity> result = streamRecord.map(new MapFunction<Entity, Entity>() {
            @Override
            public Entity map(Entity value) throws Exception {
                Entity entity1 = new Entity();
                entity1.city = value.city + ".XPU.Xiax";
                entity1.phoneName = value.phoneName.toUpperCase();
                entity1.loginTime = value.loginTime;
                entity1.os = value.os;
                return entity1;
            }
        });
        result.print().setParallelism(1);
        env.execute("new one");
    }
}

In this example, after converting each JSON string from Kafka into an Entity object, we use the map operator to uppercase every phoneName and append the .XPU.Xiax suffix to city.

2、flatMap

flatMap: a flattening operation. Each input element may produce zero, one, or many output elements, so there is no constraint on the number of results. It is similar to map, but an ordinary Java method can only return a single value, which is why flatMap instead hands the function a Collector through which it can emit any number of records. The example below emits 0 or 1 records per input, behaving like a filter; a one-to-many sketch follows it.

SingleOutputStreamOperator<Entity> result = streamRecord
        .flatMap(new FlatMapFunction<Entity, Entity>() {
            @Override
            public void flatMap(Entity entity, Collector<Entity> out) throws Exception {
                // emits 0 or 1 records per input: only NewYork records pass through
                if (entity.city.equals("NewYork")) {
                    out.collect(entity);
                }
            }
        });
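
To show the one-to-many side of flatMap, here is a minimal sketch, assuming a hypothetical DataStream<String> named lines; it emits one record per word, the same pattern the word-count example in section 5 uses:

// one input line can produce many output records
DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String line, Collector<String> out) throws Exception {
        for (String word : line.split("\\s")) {
            out.collect(word);   // one collect call per word
        }
    }
});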

3、filter

Filter: evaluates a boolean predicate for each element and forwards only the elements for which the predicate returns true.


SingleOutputStreamOperator<Entity> result = streamRecord
        .filter(new FilterFunction<Entity>() {
            @Override
            public boolean filter(Entity entity) throws Exception {
                // keep only HUAWEI records
                return entity.phoneName.equals("HUAWEI");
            }
        });

4、keyBy

keyBy: logically partitions the stream by the specified key; records are distributed to partitions according to the hash of the key.

KeyedStream<Entity, String> result = streamRecord
        .keyBy(new KeySelector<Entity, String>() {
            @Override
            public String getKey(Entity entity) throws Exception {
                return entity.os;
            }
        });

5、reduce

Reduce: an incremental merge operation. Applied to a KeyedStream (the result of keyBy in section 4), it combines each incoming element with the previously reduced value and emits the new result, so every new record produces an updated value for its key. Common aggregations such as min and max can be implemented with the reduce method. Here is a simple socket word count that helps to understand the combination of flatMap/keyBy/reduce:

package com.bigdata.flink.Stream;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception {

        // read the port from --port, falling back to 9000
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port set, using default port 9000");
            port = 9000;
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String hostname = "192.168.83.129";
        String delimiter = "\n";

        DataStreamSource<String> socketWord = env.socketTextStream(hostname, port, delimiter);

        DataStream<WordWithCount> windowcounts = socketWord
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                        // emit one WordWithCount(word, 1) per whitespace-separated token
                        String[] splits = value.split("\\s");
                        for (String word : splits) {
                            out.collect(new WordWithCount(word, 1));
                        }
                    }
                })
                .keyBy("word")   // field-expression key on the POJO
                .reduce(new ReduceFunction<WordWithCount>() {
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
                        // running total per word, emitted on every new record
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });
        windowcounts.print().setParallelism(1);
        env.execute("socketWindow");
    }

    public static class WordWithCount {
        public String word;
        public int count;

        public WordWithCount() {
        }

        public WordWithCount(String word, int count) {
            this.count = count;
            this.word = word;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}

6、aggregations

aggregations: built-in aggregation operations such as sum(), min(), max(), etc., which can be applied to a KeyedStream. Usage is as follows:

KeyedStream.sum(0) or KeyedStream.sum("Key")
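
For example, the handwritten ReduceFunction in the word count of section 5 can be replaced by sum; a minimal sketch, assuming words is a DataStream<WordWithCount> like the flatMap output in that example:

// sum("count") maintains a running total of the count field per word,
// equivalent to the ReduceFunction in section 5
DataStream<WordWithCount> counts = words
        .keyBy("word")        // field-expression key, as in section 5
        .sum("count");        // min("count") / max("count") work the same way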

7、union

Union: merges multiple streams into a single stream so that the merged stream can be processed uniformly, somewhat like merging the output of two upstream Bolts in Storm. Note that the types of the merged streams must be identical.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> textStream9000 = env.socketTextStream("localhost", 9000, "\n");
DataStream<String> textStream9001 = env.socketTextStream("localhost", 9001, "\n");
DataStream<String> textStream9002 = env.socketTextStream("localhost", 9002, "\n");

DataStream<String> mapStream9000 = textStream9000.map(s -> "come from 9000 port: " + s);
DataStream<String> mapStream9001 = textStream9001.map(s -> "come from 9001 port: " + s);
DataStream<String> mapStream9002 = textStream9002.map(s -> "come from 9002 port: " + s);

DataStream<String> result = mapStream9000.union(mapStream9001, mapStream9002);

result.print();
env.execute();

8、connect

connect: similar to union, but it can only connect two streams. The two streams may have different data types, and different processing logic can be applied to each stream.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<Long> text1 = env.addSource(new MyParalleSource()).setParallelism(1);
DataStreamSource<Long> text2 = env.addSource(new MyParalleSource()).setParallelism(1);

SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String>() {
    @Override
    public String map(Long value) throws Exception {
        return "str" + value;
    }
});

// a ConnectedStreams keeps the two element types (Long and String) separate
ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);

SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String, Object>() {
    @Override
    public Object map1(Long value) throws Exception {
        // applied to elements of the first stream
        return value;
    }

    @Override
    public Object map2(String value) throws Exception {
        // applied to elements of the second stream
        return value;
    }
});
result.print().setParallelism(1);
env.execute("StreamingDemoWithMyNoParalleSource");

9、split

Split: splits one data stream into multiple streams according to some rule. In real scenarios, the source stream often mixes several kinds of data that require different processing, so it is useful to split it into separate streams. Note that split/select was deprecated in later Flink releases and removed in Flink 1.12 in favor of side outputs; a side-output sketch follows the example below.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> text = env.addSource(new MyParalleSource()).setParallelism(1);
SplitStream<Long> splitString = text.split(new OutputSelector<Long>() {
    @Override
    public Iterable<String> select(Long value) {
        // tag each record with one or more stream names
        ArrayList<String> output = new ArrayList<>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }
});

DataStream<Long> evenStream = splitString.select("even");
DataStream<Long> oddStream = splitString.select("odd");
DataStream<Long> moreStream = splitString.select("odd", "even"); // selects both tags
evenStream.print().setParallelism(1);
env.execute("StreamingDemoWithMyNoParalleSource");
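
For newer Flink versions where split has been removed, the same even/odd routing can be expressed with side outputs. A minimal sketch using ProcessFunction and OutputTag, the standard replacement; the tag name "odd" is arbitrary and text is the same source as above:

// the main output carries even numbers; odd numbers go to a side output
final OutputTag<Long> oddTag = new OutputTag<Long>("odd") {};

SingleOutputStreamOperator<Long> evenStream = text.process(new ProcessFunction<Long, Long>() {
    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) {
        if (value % 2 == 0) {
            out.collect(value);          // main output: even
        } else {
            ctx.output(oddTag, value);   // side output: odd
        }
    }
});

DataStream<Long> oddStream = evenStream.getSideOutput(oddTag);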

10、window and windowAll

window: groups a KeyedStream by time or by other conditions. In the Flink 1.x API used in this article, usage looks like inputStream.keyBy(0).timeWindow(Time.seconds(10)), which is shorthand for window(TumblingProcessingTimeWindows.of(Time.seconds(10))); note that window() itself takes a WindowAssigner, not a Time. windowAll: groups a regular (non-keyed) data stream. This is usually a non-parallel transformation because it runs on a non-partitioned stream. Usage: inputStream.timeWindowAll(Time.seconds(10)).
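
Putting this together with the word count from section 5, a minimal sketch, assuming words is the DataStream<WordWithCount> produced there, that emits one total per word every 10 seconds:

// keyed 10-second tumbling processing-time window with a built-in aggregation
DataStream<WordWithCount> windowedCounts = words
        .keyBy("word")
        .timeWindow(Time.seconds(10))   // needs org.apache.flink.streaming.api.windowing.time.Time
        .sum("count");

// windowAll on the non-keyed stream: one global window, effectively parallelism 1
DataStream<WordWithCount> globalCounts = words
        .timeWindowAll(Time.seconds(10))
        .sum("count");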
