Concepts of UDF, UDAF, and UDTF

[TOC]

Flink Table & SQL supports three kinds of user-defined functions: UDF, UDAF, and UDTF.

  1. UDF: User Defined Scalar Function. One row in, one row out.

  2. UDAF: User Defined Aggregate Function. Multiple rows in, one row out.

  3. UDTF: User Defined Table Function. One row in, multiple rows out (or one column in, multiple columns out).
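
In the Table API these map to three base classes. The skeletons below are a minimal sketch (class and method names here are illustrative, not part of the case study that follows) of what each kind extends and implements in Flink 1.10:

```java
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;

// UDF: one row in, one row out -- extend ScalarFunction and provide an eval() method.
class MyScalar extends ScalarFunction {
    public String eval(String s) {
        return s.trim();
    }
}

// UDTF: one row in, many rows out -- extend TableFunction and call collect() once per output row.
class MySplit extends TableFunction<String> {
    public void eval(String s) {
        for (String part : s.split(" ")) {
            collect(part); // each collect() emits one output row
        }
    }
}

// UDAF: many rows in, one value out -- extend AggregateFunction with an accumulator type.
class MyCount extends AggregateFunction<Long, MyCount.Acc> {
    public static class Acc { public long cnt; }

    @Override
    public Acc createAccumulator() { return new Acc(); }

    @Override
    public Long getValue(Acc acc) { return acc.cnt; }

    // Called once per input row; updates the accumulator.
    public void accumulate(Acc acc, Object value) { acc.cnt++; }
}
```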

Requirements

Goals for today:
(1) Be able to use the Flink Java API.
(2) Be able to write UDFs, UDAFs, and UDTFs for Flink SQL.

Case study:

1. First set up a local Flink development environment (Maven + Flink 1.10.0).
2. Write a demo on open-source Flink:
(1) Use the Flink Java API to write events to Kafka. Each event means: a user clicked or browsed a page at some point in time.
The data written to Kafka is a JSON string:
{"uid": "user ID, a random number from 1-10", "name": "user_<random number from 1-10>", "event_time": "Beijing time, formatted as yyyy-MM-dd HH:mm:ss", "event_name": "click/browse", "page": random number from 1-10}
For example: {"uid": "1", "name": "user_2", "event_time": "2021-01-06 11:22:00", "event_name": "click", "page": 5}

(2) Use Flink SQL to consume the Kafka data:
(a) Custom UDF: add a uuid field to every record, where uuid = uid + time (event_time) + page.
(b) Custom UDTF: split the event_time of every record into two fields, a date dt and a time time.
(c) Custom UDAF: count the total number of clicks per user per day.
(The sketch right after this list shows roughly how each of these three tasks maps to a SQL statement.)
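
Roughly, the three tasks map to SQL like the following. This is a sketch only: it assumes a source table named source_kafka with an event-time column d, and functions registered under the hypothetical names add_uuid, split_time, and count_clicks (the code later in this post registers each of them simply as "f"):

```java
// (a) UDF: derive uuid = uid + event time + page for every row.
String udfSql  = "select uid, name, d, event_name, page, add_uuid(uid, d, page) as uuid from source_kafka";

// (b) UDTF: split the event time into a date column and a time column via a lateral join.
String udtfSql = "select uid, name, d, event_name, page, dt, tm " +
        "from source_kafka, lateral table(split_time(d)) as T(dt, tm)";

// (c) UDAF: count events per user (per day once the date part is added to the grouping key).
String udafSql = "select uid, count_clicks(uid) from source_kafka group by uid";
```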
Sample output from the producer below (the same event repeats because it is generated once and then re-sent in a loop):

```
Sending data: {"d":"2021-01-08 18:09:37","event_name":"click/browse","name":"user_10","page":10,"uid":5}
Sending data: {"d":"2021-01-08 18:09:37","event_name":"click/browse","name":"user_10","page":10,"uid":5}
Sending data: {"d":"2021-01-08 18:09:37","event_name":"click/browse","name":"user_10","page":10,"uid":5}
Sending data: {"d":"2021-01-08 18:09:37","event_name":"click/browse","name":"user_10","page":10,"uid":5}
Sending data: {"d":"2021-01-08 18:09:37","event_name":"click/browse","name":"user_10","page":10,"uid":5}
Sending data: {"d":"2021-01-08 18:09:37","event_name":"click/browse","name":"user_10","page":10,"uid":5}
Sending data: {"d":"2021-01-08 18:09:37","event_name":"click/browse","name":"user_10","page":10,"uid":5}
Sending data: {"d":"2021-01-08 18:09:37","event_name":"click/browse","name":"user_10","page":10,"uid":5}
```

Producer

```java
package com.imooc.flink.java.course02;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;

public class Producer {
    private static final String broker_list = "localhost:9092";
    // The Kafka topic must be the same one the Flink job reads from.
    private static final String topic = "test";

    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Build one random event. Note: because this sits outside the loop, the same
        // event is re-sent every second (as in the sample output above); move this
        // block into the loop to generate a new event per send.
        Data data = new Data();
        Random random = new Random();
        data.setUid(random.nextInt(10) + 1);
        data.setName("user_" + (random.nextInt(10) + 1));
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        data.setD(sdf.format(new Date()));
        // Per the requirements this should be either "click" or "browse";
        // kept as the literal "click/browse" to match the sample output.
        data.setEvent_name("click/browse");
        data.setPage(random.nextInt(10) + 1);
        String message = JSON.toJSONString(data);

        while (true) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
            producer.send(record);
            producer.flush();
            System.out.println("Sending data: " + message);
            Thread.sleep(1000);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        writeToKafka();
    }

    // Alternative: build a DataStream and write it with Flink's Kafka producer sink
    // (see the sketch after this class).
}
```
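
The producer above uses the plain Kafka client. The write can also be done through Flink itself, by wrapping the JSON strings in a DataStream and attaching Flink's Kafka producer sink. The following is a minimal sketch of that idea, using the same flink-connector-kafka-0.10 dependency as the consumers below; the class name and the single hard-coded message are placeholders only:

```java
package com.imooc.flink.java.course02;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

import java.util.Properties;

public class FlinkProducerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A one-element stream standing in for the generated JSON events.
        DataStream<String> stream = env.fromElements(
                "{\"uid\":1,\"name\":\"user_2\",\"d\":\"2021-01-06 11:22:00\",\"event_name\":\"click\",\"page\":5}");

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        // Write each record of the stream to the "test" topic as a plain string.
        FlinkKafkaProducer010<String> kafkaSink =
                new FlinkKafkaProducer010<>("test", new SimpleStringSchema(), properties);
        stream.addSink(kafkaSink);

        env.execute("write-to-kafka");
    }
}
```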

Data

```java
package com.imooc.flink.java.course02;

/**
 * POJO for one click/browse event.
 * d holds the event time (the "event_time" of the requirements), formatted as yyyy-MM-dd HH:mm:ss.
 * uuid is left null by the producer and is derived later by the UDF.
 */
public class Data {
    private int uid;
    private String name;
    private String d;
    private String event_name;
    private int page;
    private String uuid;

    public Data() {
    }

    @Override
    public String toString() {
        return "Data{" +
                "uid=" + uid +
                ", name='" + name + '\'' +
                ", d='" + d + '\'' +
                ", event_name='" + event_name + '\'' +
                ", page=" + page +
                ", uuid='" + uuid + '\'' +
                '}';
    }

    public String getUuid() {
        return uuid;
    }

    public void setUuid(String uuid) {
        this.uuid = uuid;
    }

    public int getUid() {
        return uid;
    }

    public void setUid(int uid) {
        this.uid = uid;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getD() {
        return d;
    }

    public void setD(String d) {
        this.d = d;
    }

    public String getEvent_name() {
        return event_name;
    }

    public void setEvent_name(String event_name) {
        this.event_name = event_name;
    }

    public int getPage() {
        return page;
    }

    public void setPage(int page) {
        this.page = page;
    }
}
```

UDF

```java
package com.imooc.flink.java.course02;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class UDFConsumer1 {
    public static void main(String[] args) throws Exception {
        // 1. Set up the streaming environment with the Blink planner.
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
        streamEnv.setParallelism(1);

        // 2. Consume the Kafka topic and parse each JSON string into a Data POJO.
        Properties browseProperties = new Properties();
        browseProperties.put("bootstrap.servers", "localhost:9092");
        browseProperties.put("group.id", "test-consumer-group");

        DataStream<Data> browseStream = streamEnv
                .addSource(new FlinkKafkaConsumer010<>("test", new SimpleStringSchema(), browseProperties))
                .process(new BrowseKafkaProcessFunction());

        // 3. Register the stream as a table and register the scalar function.
        // Rows in source_kafka look like:
        // Data{uid=5, name='user_5', d='2021-01-08 13:01:20', event_name='click/browse', page=1, uuid='null'}
        tableEnv.registerDataStream("source_kafka", browseStream, "uid,name,d,event_name,page,uuid");
        tableEnv.registerFunction("f", new helper());

        // 4. Call the UDF in SQL to derive the uuid column (uid + event time + page).
        String sql = "select uid, name, d, event_name, page, f(uid, d, page) as uuid from source_kafka";

        Table table = tableEnv.sqlQuery(sql);
        tableEnv.toAppendStream(table, Row.class).print();
        tableEnv.execute("test");
    }

    // UDF: a ScalarFunction whose eval() concatenates uid, event time, and page into one string.
    public static class helper extends ScalarFunction {
        public String eval(int uid, String date, int page) {
            return uid + date + page;
        }
    }

    // Parses the raw JSON string from Kafka into the Data POJO.
    static class BrowseKafkaProcessFunction extends ProcessFunction<String, Data> {
        @Override
        public void processElement(String value, Context ctx, Collector<Data> out) throws Exception {
            out.collect(JSON.parseObject(value, Data.class));
        }
    }
}
```
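
With the sample row shown in the comment above (uid=5, d='2021-01-08 13:01:20', page=1), f(uid, d, page) is plain Java string concatenation, so the derived uuid is "52021-01-08 13:01:201"; any separator between the parts would have to be added inside eval().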

UDTF

```java
package com.imooc.flink.java.course02;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class UDTFConsumer1 {
    public static void main(String[] args) throws Exception {
        // 1. Set up the streaming environment with the Blink planner.
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
        streamEnv.setParallelism(1);

        // 2. Consume the Kafka topic and parse each JSON string into a Data POJO.
        Properties browseProperties = new Properties();
        browseProperties.put("bootstrap.servers", "localhost:9092");
        browseProperties.put("group.id", "test-consumer-group");

        DataStream<Data> browseStream = streamEnv
                .addSource(new FlinkKafkaConsumer010<>("test", new SimpleStringSchema(), browseProperties))
                .process(new BrowseKafkaProcessFunction());

        // 3. Register the stream as a table and register the table function.
        tableEnv.registerDataStream("source_kafka", browseStream, "uid,name,d,event_name,page,uuid");
        tableEnv.registerFunction("f", new helper());

        // 4. Join each row with the rows produced by the UDTF via LATERAL TABLE,
        //    splitting d into a date column (date1) and a time column (time1).
        String sql = "select uid, name, d, event_name, page, date1, time1 " +
                "from source_kafka, " +
                "lateral table(f(d)) as T(date1, time1)";

        Table table = tableEnv.sqlQuery(sql);
        tableEnv.toAppendStream(table, Row.class).print();
        tableEnv.execute("test");
    }

    // Parses the raw JSON string from Kafka into the Data POJO.
    static class BrowseKafkaProcessFunction extends ProcessFunction<String, Data> {
        @Override
        public void processElement(String value, Context ctx, Collector<Data> out) throws Exception {
            out.collect(JSON.parseObject(value, Data.class));
        }
    }

    // UDTF: splits "yyyy-MM-dd HH:mm:ss" on the space and emits one row with two columns.
    public static class helper extends TableFunction<Row> {
        public void eval(String value) {
            String[] valueSplits = value.split(" ");

            // One output row with two columns: the date part and the time part.
            Row row = new Row(2);
            row.setField(0, valueSplits[0]);
            row.setField(1, valueSplits[1]);
            collect(row);
        }

        @Override
        public TypeInformation<Row> getResultType() {
            return new RowTypeInfo(Types.STRING, Types.STRING);
        }
    }
}
```
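
For a row with d = '2021-01-08 13:01:20', eval() splits on the space and emits one row, so the lateral join adds date1 = '2021-01-08' and time1 = '13:01:20' to that record, which is the dt/time split asked for in requirement (b).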

UDAF

```java
package com.imooc.flink.java.course02;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class UDAFConsumer1 {
    public static void main(String[] args) throws Exception {
        // 1. Set up the streaming environment with the Blink planner.
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
        streamEnv.setParallelism(1);

        // 2. Consume the Kafka topic and parse each JSON string into a Data POJO.
        Properties browseProperties = new Properties();
        browseProperties.put("bootstrap.servers", "localhost:9092");
        browseProperties.put("group.id", "test-consumer-group");

        DataStream<Data> browseStream = streamEnv
                .addSource(new FlinkKafkaConsumer010<>("test", new SimpleStringSchema(), browseProperties))
                .process(new BrowseKafkaProcessFunction());

        // 3. Register the stream as a table and register the aggregate function.
        tableEnv.registerDataStream("source_kafka", browseStream, "uid,name,d,event_name,page,uuid");
        tableEnv.registerFunction("f", new helper());

        // 4. Count events per user. This groups by uid only; see the note after
        //    this listing for the per-user-per-day variant from requirement (c).
        String sql = "select uid, f(uid) " +
                "from source_kafka " +
                "group by uid";

        Table table = tableEnv.sqlQuery(sql);
        // Grouped aggregations update their results over time, so a retract stream is needed here.
        tableEnv.toRetractStream(table, Row.class).print();
        tableEnv.execute("test");
    }

    // UDAF: counts how many rows have been accumulated per group.
    public static class helper extends AggregateFunction<Integer, helper.Account> {

        public static class Account {
            public Integer cnt;
        }

        @Override
        public Account createAccumulator() {
            Account account = new Account();
            account.cnt = 0;
            return account;
        }

        @Override
        public Integer getValue(Account account) {
            return account.cnt;
        }

        // Called once per input row of the group; increments the running count.
        public void accumulate(Account accumulator, Object iValue) {
            accumulator.cnt++;
        }
    }

    // Parses the raw JSON string from Kafka into the Data POJO.
    static class BrowseKafkaProcessFunction extends ProcessFunction<String, Data> {
        @Override
        public void processElement(String value, Context ctx, Collector<Data> out) throws Exception {
            out.collect(JSON.parseObject(value, Data.class));
        }
    }
}
```
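
Requirement (c) asks for clicks per user per day, while the query above groups by uid only. One way to get the per-day count is to also group on the date part of d, as in the sketch below (assuming d always carries the yyyy-MM-dd HH:mm:ss format; filtering on event_name would additionally restrict the count to clicks only):

```java
// Hypothetical variant of the query above: group by user and by the date part of d.
String sqlPerDay = "select uid, substring(d, 1, 10) as dt, f(uid) as clicks " +
        "from source_kafka " +
        "group by uid, substring(d, 1, 10)";
```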