Commonly Used Flink Operator Transformations

/**
 * Demonstrates the {@code map} operator: reads JSON strings from the Kafka topic
 * {@code kafka_flink_mysql}, parses each one into an {@code Entity}, and prints a
 * transformed copy of every record.
 */
public class FlinkSubmitter {

    /**
     * Builds the job graph and runs it.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job cannot be submitted or fails while running
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing: every 5000 ms, exactly-once mode, at least 500 ms between
        // checkpoints, 60000 ms timeout, and never more than one checkpoint in flight.
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // Only the broker list and the consumer group are configured here.
        // "zookeeper.connect" is not a setting of the Kafka 0.11 consumer, and the
        // Flink Kafka consumer installs its own byte-array key/value deserializers
        // (records are decoded by the SimpleStringSchema below), so those entries
        // would have no effect.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.83.129:9092");
        props.setProperty("group.id", "con1");

        System.out.println("ready to print");
        FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
                "kafka_flink_mysql",
                new SimpleStringSchema(),
                props);
        // Resume from the offsets committed for the consumer group.
        consumer.setStartFromGroupOffsets();

        // Parse every Kafka message (a JSON string) into an Entity.
        SingleOutputStreamOperator<Entity> StreamRecord = env.addSource(consumer)
                .map(string -> JSON.parseObject(string, Entity.class))
                .setParallelism(1);

        // map: exactly one output record per input record.
        SingleOutputStreamOperator<Entity> result = StreamRecord.map(new MapFunction<Entity, Entity>() {
            @Override
            public Entity map(Entity value) throws Exception {
                Entity entity1 = new Entity();
                entity1.city = value.city + ".XPU.Xiax";
                // A record without a phone name must not fail the whole job with an NPE.
                entity1.phoneName = value.phoneName == null ? null : value.phoneName.toUpperCase();
                entity1.loginTime = value.loginTime;
                entity1.os = value.os;
                return entity1;
            }
        });
        result.print().setParallelism(1);
        env.execute("new one");
    }
}
// flatMap: emits zero or more records per input. Here a record is forwarded only
// when its city is "NewYork".
SingleOutputStreamOperator<Entity> result = StreamRecord
        .flatMap(new FlatMapFunction<Entity, Entity>() {
            @Override
            public void flatMap(Entity entity, Collector<Entity> out) throws Exception {
                // Constant-first equals: a record whose city is null is dropped
                // instead of throwing a NullPointerException.
                if ("NewYork".equals(entity.city)) {
                    out.collect(entity);
                }
            }
        });

// filter: keeps only the records for which the predicate returns true.
SingleOutputStreamOperator<Entity> result = StreamRecord
        .filter(new FilterFunction<Entity>() {
            @Override
            public boolean filter(Entity entity) throws Exception {
                // Constant-first equals: a record whose phoneName is null is
                // filtered out instead of throwing a NullPointerException.
                return "HUAWEI".equals(entity.phoneName);
            }
        });
// keyBy: partitions the stream by the key returned from the selector; here the
// key is the record's os field.
KeySelector<Entity, String> osKey = new KeySelector<Entity, String>() {
    @Override
    public String getKey(Entity record) throws Exception {
        return record.os;
    }
};
KeyedStream<Entity, String> result = StreamRecord.keyBy(osKey);
package com.bigdata.flink.Stream;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {
public static void main(String[] args) throws Exception{

int port;
try{
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port");
}catch(Exception e){
System.err.println("No port Set, use default port---java");
port = 9000;
}

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String hostname = "192.168.83.129";
String delimiter = "\n";

DataStreamSource<String> socketWord = env.socketTextStream(hostname, port, delimiter);

DataStream<WordWithCount> windowcounts = socketWord.flatMap(new FlatMapFunction<String, WordWithCount>() {
public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
String[] splits = value.split("\\s");
for (String word : splits) {
out.collect(new WordWithCount(word, 1));
}
}
}).keyBy("word")
.reduce(new ReduceFunction<WordWithCount>() {
public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
return new WordWithCount(a.word, a.count + b.count);
}
});
windowcounts.print().setParallelism(1);
env.execute("socketWindow");
}

public static class WordWithCount{
public String word;
public int count;

public WordWithCount(){

}

public WordWithCount(String word, int count){
this.count = count;
this.word = word;
}

@Override
public String toString() {
return "WordWithCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
// union: merges several streams of the same element type into a single stream.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// One text source per local port.
DataStream<String> source9000 = env.socketTextStream("localhost", 9000, "\n");
DataStream<String> source9001 = env.socketTextStream("localhost", 9001, "\n");
DataStream<String> source9002 = env.socketTextStream("localhost", 9002, "\n");

// Prefix every line with the port it arrived on.
DataStream<String> tagged9000 = source9000.map(line -> "come from 9000 port:" + line);
DataStream<String> tagged9001 = source9001.map(line -> "come from 9001 port:" + line);
DataStream<String> tagged9002 = source9002.map(line -> "come from 9002 port:" + line);

DataStream<String> result = tagged9000.union(tagged9001, tagged9002);

result.print();
env.execute();
        // connect: pairs two streams whose element types may differ; a CoMapFunction
        // then handles each side with its own method.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Long> text1 = env.addSource(new MyParalleSource()).setParallelism(1);
        DataStreamSource<Long> text2 = env.addSource(new MyParalleSource()).setParallelism(1);

        // Turn the second stream into strings so the two sides have different types.
        MapFunction<Long, String> toPrefixedString = new MapFunction<Long, String>() {
            @Override
            public String map(Long number) throws Exception {
                return "str" + number;
            }
        };
        SingleOutputStreamOperator<String> text2_str = text2.map(toPrefixedString);

        ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);

        // Both sides are passed through unchanged; only the static type widens to Object.
        CoMapFunction<Long, String, Object> passThrough = new CoMapFunction<Long, String, Object>() {
            @Override
            public Object map1(Long fromFirst) throws Exception {
                return fromFirst;
            }

            @Override
            public Object map2(String fromSecond) throws Exception {
                return fromSecond;
            }
        };
        SingleOutputStreamOperator<Object> result = connectStream.map(passThrough);

        result.print().setParallelism(1);
        env.execute("StreamingDemoWithMyNoParalleSource");
        // split/select: tags every element with one or more names, then picks
        // sub-streams by tag.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> text = env.addSource(new MyParalleSource()).setParallelism(1);

        // Tags each number as "even" or "odd".
        OutputSelector<Long> parityTagger = new OutputSelector<Long>() {
            @Override
            public Iterable<String> select(Long number) {
                ArrayList<String> tags = new ArrayList<>();
                tags.add(number % 2 == 0 ? "even" : "odd");
                return tags;
            }
        };
        SplitStream<Long> splitString = text.split(parityTagger);

        DataStream<Long> evenStream = splitString.select("even");
        DataStream<Long> oddStream = splitString.select("odd");
        // Selecting several tags yields the elements carrying any of them.
        DataStream<Long> moreStream = splitString.select("odd", "even");

        evenStream.print().setParallelism(1);
        env.execute("StreamingDemoWithMyNoParalleSource");

--

--

--

Spark, Kafka, and Flink development

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

My First Live Website As A Flatiron Student

JavaScript Asynchronous Programming

Using Typescript with Node.js

Are You Afraid To Solve These 3 JavaScript Questions?

9 Free and Useful jQuery Plugins Ever Web Developer Should Know

Introduction to the DOM

9 Tools That Will Make You a Productive and Effective Developer

Setting Up a Redux Project With Create-React-App

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
hivefans

hivefans

Spark, Kafka, and Flink development

More from Medium

Datasource enabling indexing and sampling directly on the storage

Practical Tips for Dask, vol3: Limited-fan Repartitions

Distributed Sync SGD

Databases 101 : What are UUIDs? should we care?