leecode.rt_phm update

This commit is contained in:
markilue 2023-05-18 13:24:02 +08:00
parent 2bb13486cd
commit 4707fd922b
12 changed files with 773 additions and 0 deletions

View File

@ -0,0 +1,50 @@
package com.markilue.leecode.hot100.interviewHot.singlestack;
import org.junit.Test;
import java.util.Arrays;
import java.util.LinkedList;
/**
*@BelongsProject: Leecode
*@BelongsPackage: com.markilue.leecode.hot100.interviewHot.singlestack
*@Author: markilue
*@CreateTime: 2023-05-18 10:17
*@Description: TODO LeetCode 496 Next Greater Element I
*@Version: 1.0
*/
public class LC_496_NextGreaterElement {
@Test
public void test(){
int[] nums1 = {4, 1, 2}, nums2 = {1, 3, 4, 2};
System.out.println(Arrays.toString(nextGreaterElement(nums1,nums2)));
}
/**
* Monotonic stack solution: for each value, record the next greater element that appears to its right
* @param nums1
* @param nums2
* @return
*/
public int[] nextGreaterElement(int[] nums1, int[] nums2) {
LinkedList<Integer> stack = new LinkedList<>();
int[] find = new int[10001];
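//Traverse nums2 from right to left with a decreasing stack; find[v] stores the next greater element to the right of value v (values are distinct and at most 10^4, hence the fixed array size)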
for (int i = nums2.length - 1; i >= 0; i--) {
while (!stack.isEmpty() && stack.peek() <= nums2[i]) {
stack.pop();
}
find[nums2[i]] = stack.isEmpty() ? -1 : stack.peek();
stack.push(nums2[i]);
}
for (int i = 0; i < nums1.length; i++) {
nums1[i] = find[nums1[i]];
}
return nums1;
}
}

View File

@ -0,0 +1,46 @@
package com.markilue.leecode.hot100.interviewHot.singlestack;
import org.junit.Test;
import java.util.Arrays;
import java.util.LinkedList;
/**
*@BelongsProject: Leecode
*@BelongsPackage: com.markilue.leecode.hot100.interviewHot.singlestack
*@Author: markilue
*@CreateTime: 2023-05-18 09:59
*@Description: TODO LeetCode 739 Daily Temperatures
*@Version: 1.0
*/
public class LC_739_DailyTemperatures {
@Test
public void test(){
// int[] temperatures = {73, 74, 75, 71, 69, 72, 76, 73};
int[] temperatures = {89,62,70,58,47,47,46,76,100,70};
System.out.println(Arrays.toString(dailyTemperatures(temperatures)));
}
/**
* Monotonic stack solution: traverse from right to left while maintaining a monotonic stack of indices
* @param temperatures
* @return
*/
public int[] dailyTemperatures(int[] temperatures) {
LinkedList<Integer> stack = new LinkedList<>();
int[] result = new int[temperatures.length];
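//The stack holds indices of days with strictly higher temperatures to the right; the answer is the index distance to the nearest warmer day (0 if none)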
for (int i = temperatures.length - 1; i >= 0; i--) {
while (!stack.isEmpty() && temperatures[stack.peek()] <= temperatures[i]) {
stack.pop();
}
result[i] = stack.isEmpty() ? 0 : stack.peek() - i;
stack.push(i);
}
return result;
}
}

View File

@ -0,0 +1,98 @@
package com.markilue.leecode.hot100.interviewHot.singlestack;
import org.junit.Test;
import java.util.Arrays;
import java.util.LinkedList;
/**
*@BelongsProject: Leecode
*@BelongsPackage: com.markilue.leecode.hot100.interviewHot.singlestack
*@Author: markilue
*@CreateTime: 2023-05-18 10:33
*@Description: TODO LeetCode 503 Next Greater Element II
*@Version: 1.0
*/
public class T503_NextGreaterElements {
@Test
public void test() {
// int[] nums = {1, 5, 3, 4, 3};
// int[] nums = {1, 2, 3, 4, 3};
int[] nums = {1, 2, 3, 2, 1};
System.out.println(Arrays.toString(nextGreaterElements1(nums)));
}
/**
* Monotonic stack solution: we again need information about positions other than the current one, so a monotonic stack is the natural tool.
* Because the array is circular, both directions matter, so traverse once from the left and once from the right.
* Known issue: for the counterexample {1, 2, 3, 2, 1}, the left-jump chain for the last element runs all the way to the position of 3 instead of the nearest greater value, so the result is wrong.
* @param nums
* @return
*/
public int[] nextGreaterElements(int[] nums) {
LinkedList<Integer> stack = new LinkedList<>();
int[] result = new int[nums.length];
int[] left = new int[nums.length];
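//left[i] will hold the index of the nearest element to the left of i that is greater than nums[i] (-1 if none)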
//First pass from the left: record whether each position has a greater element somewhere to its left
for (int i = 0; i < nums.length; i++) {
while (!stack.isEmpty() && nums[stack.peek()] <= nums[i]) {
stack.pop();
}
left[i] = stack.isEmpty() ? -1 : stack.peek();
stack.push(i);
}
stack.clear();
//Second pass from the right to determine the actual answer
for (int i = nums.length - 1; i >= 0; i--) {
while (!stack.isEmpty() && stack.peek() <= nums[i]) {
stack.pop();
}
//Walk the left[] chain to look for a greater element on the left
if (stack.isEmpty()) {//no greater element to the right of the current position
int j = i;
if (left[j] != -1) {
while (left[j] != -1) {
j = left[j];
}
result[i] = nums[j];
} else {
result[i] = -1;
}
} else {
result[i] = stack.peek();
}
stack.push(nums[i]);
}
return result;
}
/**
* The official insight: there is no need for the awkward reverse pass; the simple idea is to flatten the circular array
* by conceptually appending its first n-1 elements to the end of the original sequence.
* @param nums
* @return
*/
public int[] nextGreaterElements1(int[] nums) {
int n = nums.length;
int[] result = new int[n];
LinkedList<Integer> stack = new LinkedList<>();
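//Iterate indices 2*(n-1)..0 and use i % n to simulate walking the circular array twice; only indices < n write results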
for (int i = 2 * (n - 1); i >= 0; i--) {
while (!stack.isEmpty() && stack.peek() <= nums[i % n]) {
stack.pop();
}
if (i < n) result[i] = stack.isEmpty() ? -1 : stack.peek();
stack.push(nums[i%n]);
}
return result;
}
}

View File

@ -0,0 +1,29 @@
package com.markilue.leecode.hot100.second;
import com.markilue.leecode.listnode.ListNode;
/**
*@BelongsProject: Leecode
*@BelongsPackage: com.markilue.leecode.hot100.second
*@Author: markilue
*@CreateTime: 2023-05-18 11:33
*@Description: TODO LeetCode 160 Intersection of Two Linked Lists
*@Version: 1.0
*/
public class T59_160_GetIntersectionNode {
public ListNode getIntersectionNode(ListNode headA, ListNode headB) {
ListNode curA = headA;
ListNode curB = headB;
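//Two-pointer trick: when a pointer reaches the end of its list it jumps to the head of the other list, so both pointers cover lenA + lenB nodes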
while (curA != curB) {
//Both pointers traverse the same total length (lenA + lenB), so even if the lists do not intersect they reach null at the same time and the loop exits
curA = curA == null ? headB : curA.next;
curB = curB == null ? headA : curB.next;
}
return curA;
}
}

View File

@ -0,0 +1,31 @@
package com.markilue.leecode.hot100.second;
/**
*@BelongsProject: Leecode
*@BelongsPackage: com.markilue.leecode.hot100.second
*@Author: markilue
*@CreateTime: 2023-05-18 11:46
*@Description: TODO LeetCode 169 Majority Element
*@Version: 1.0
*/
public class T60_MajorityElement {
//Boyer-Moore voting: keep a candidate and a count; when the count drops to zero, replace the candidate with the upcoming element
public int majorityElement(int[] nums) {
int count = 1;
int maj = nums[0];
for (int i = 1; i < nums.length; i++) {
if (nums[i] == maj) {
count++;
} else {
count--;
if (count == 0) {
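//Relies on the LeetCode 169 guarantee that a majority element exists; without it, nums[i + 1] could go out of bounds when count hits zero at the last element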
maj = nums[i + 1];
}
}
}
return maj;
}
}

View File

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>phm_parent</artifactId>
<groupId>com.cqu</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>warehouse</artifactId>
<packaging>pom</packaging>
<modules>
<module>rt-warehouse</module>
</modules>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
</project>

View File

@ -0,0 +1,218 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>warehouse</artifactId>
<groupId>com.cqu</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rt-warehouse</artifactId>
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<flink.version>1.12.0</flink.version>
<scala.version>2.12</scala.version>
<hadoop.version>3.1.3</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<!--Required if checkpoints are saved to HDFS-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<!--Flink uses slf4j as its logging facade by default; here log4j is used as the concrete logging implementation-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
<!-- <scope>provided</scope>-->
</dependency>
<!--Lombok plugin dependency-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.14</version>
<!-- <scope>provided</scope>-->
</dependency>
<!--Flink MySQL CDC connector dependency-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>compile</scope>
</dependency>
<!--commons-beanutils is an Apache open-source toolkit for working with JavaBeans.
It makes it easy to read and write bean object properties-->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>5.0.0-HBase-2.0</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--Redis client-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.3.0</version>
</dependency>
<!--ClickHouse JDBC driver-->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.0</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--flink sql-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,39 @@
package com.cqu.warehouse.realtime.app.base;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.app.base
*@Author: markilue
*@CreateTime: 2023-05-17 18:08
*@Description: TODO Template method pattern: set up the common stream processing environment
*@Version: 1.0
*/
public abstract class BaseStreamApp {
public void entry() throws Exception {
//TODO 1. Prepare the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
//TODO 2. Checkpoint configuration
env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(5000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//retain externalized checkpoints when the job is cancelled
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
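//Keep checkpoint state on HDFS; the path and user name below are environment-specific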
env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/phm_warehosue/rt/ck"));
System.setProperty("HADOOP_USER_NAME", "dingjiawen");
//Template method hook implemented by subclasses
execute(env);
env.execute();
}
public abstract void execute(StreamExecutionEnvironment env);
}

View File

@ -0,0 +1,106 @@
package com.cqu.warehouse.realtime.app.dwd;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.cqu.warehouse.realtime.app.base.BaseStreamApp;
import com.cqu.warehouse.realtime.utils.MyKafkaUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.app.dwd
*@Author: markilue
*@CreateTime: 2023-05-17 18:04
*@Description:
* TODO DWD layer of the PHM warehouse:
* 1. Simple ETL
* 2. Split the stream by data source
*@Version: 1.0
*/
public class BaseDataApp extends BaseStreamApp {
public static void main(String[] args) throws Exception {
new BaseDataApp().entry();
}
@Override
public void execute(StreamExecutionEnvironment env) {
//TODO 1. Read data from Kafka
String topic = "ods_base_data";
String groupId = "base_data_app_group";
DataStreamSource<String> kafkaDS = env.addSource(
MyKafkaUtils.getKafkaSource(topic, groupId)
);
// kafkaDS.print(">>>>");
//TODO 2. Convert to JSONObject
SingleOutputStreamOperator<JSONObject> jsonObjectDS = kafkaDS.map(JSON::parseObject);
//TODO 3. Define side-output tags
OutputTag<String> CMSTag = new OutputTag<String>("CMSTag") {
};
OutputTag<String> GasTag = new OutputTag<String>("GasTag") {
};
OutputTag<String> AeroTag = new OutputTag<String>("AeroTag") {
};
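//TODO 4. Split the stream: tagName -> gas SCADA side output, x -> CMS side output, sub_dataset -> aero side output; everything else stays in the main stream (wind SCADA)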
SingleOutputStreamOperator<String> windSCADADS = jsonObjectDS.process(
new ProcessFunction<JSONObject, String>() {
@Override
public void processElement(JSONObject jsonObject, ProcessFunction<JSONObject, String>.Context context, Collector<String> collector) throws Exception {
String tagName = jsonObject.getString("tagName");
String x = jsonObject.getString("x");
String dataset = jsonObject.getString("sub_dataset");
if (tagName != null && !tagName.isEmpty()) {
context.output(GasTag, jsonObject.toJSONString());
} else if (x != null && !x.isEmpty()) {
context.output(CMSTag, jsonObject.toJSONString());
} else if (dataset != null && !dataset.isEmpty()) {
context.output(AeroTag, jsonObject.toJSONString());
} else {
collector.collect(jsonObject.toJSONString());
}
}
}
);
DataStream<String> CMSDS = windSCADADS.getSideOutput(CMSTag);
DataStream<String> gasDS = windSCADADS.getSideOutput(GasTag);
DataStream<String> aeroDS = windSCADADS.getSideOutput(AeroTag);
windSCADADS.print(">>>");
CMSDS.print("###");
gasDS.print("$$$");
aeroDS.print("%%%");
windSCADADS.addSink(
MyKafkaUtils.getKafkaSink("dwd_wind_scada")
);
CMSDS.addSink(
MyKafkaUtils.getKafkaSink("dwd_wind_cms")
);
gasDS.addSink(
MyKafkaUtils.getKafkaSink("dwd_gas_scada")
);
aeroDS.addSink(
MyKafkaUtils.getKafkaSink("dwd_aero_scada")
);
}
}

View File

@ -0,0 +1,63 @@
package com.cqu.warehouse.realtime.app.dwm;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.cqu.warehouse.realtime.app.base.BaseStreamApp;
import com.cqu.warehouse.realtime.utils.MyKafkaUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.app.dwm
*@Author: markilue
*@CreateTime: 2023-05-17 20:34
*@Description:
* TODO Gas turbine wide table:
* 1. Dimension splitting
* 2. Data masking
* 3. Dimension association
*@Version: 1.0
*/
public class GasWideApp extends BaseStreamApp {
public static void main(String[] args) throws Exception {
new GasWideApp().entry();
}
@Override
public void execute(StreamExecutionEnvironment env) {
//TODO 1. Read the target topic from Kafka
String topic = "dwd_gas_scada";
String groupId = "gas_wide_app_group";
DataStreamSource<String> kafkaDS = env.addSource(
MyKafkaUtils.getKafkaSource(topic, groupId)
);
//TODO 2. Format conversion + dimension splitting: str -> jsonObject -> add area, company, typeId, gt_no
SingleOutputStreamOperator<JSONObject> jsonObjectDS = kafkaDS.map(
new MapFunction<String, JSONObject>() {
@Override
public JSONObject map(String s) throws Exception {
JSONObject jsonObject = JSON.parseObject(s);
String tagName = jsonObject.getString("tagName");
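//Assumed tag format (hypothetical example): "[area.company.typeId.gt_no].SENSOR" -> sensor name plus four dimension fields
//Note: String.split takes a regex, and "]." matches ']' followed by any character; if the separator is literally "].", the precise pattern would be "\\]\\."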
String[] tagArray = tagName.split("].");
jsonObject.put("tagName",tagArray[1]);
String[] message = tagArray[0].split("\\.");
jsonObject.put("area",message[0].substring(1));
jsonObject.put("company",message[1]);
jsonObject.put("typeId",message[2]);
jsonObject.put("gt_no",message[3]);
return jsonObject;
}
}
);
jsonObjectDS.print(">>>");
}
}

View File

@ -0,0 +1,16 @@
package com.cqu.warehouse.realtime.common;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.common
*@Author: markilue
*@CreateTime: 2023-05-17 17:31
*@Description: TODO Common configuration constants for the PHM real-time warehouse
*@Version: 1.0
*/
public class PHMConfig {
public static final String HBASE_SCHEMA = "PHM_REALTIME";//HBase schema name
public static final String PHOENIX_SERVER = "jdbc:phoenix:Ding202,Ding203,Ding204:2181";//Phoenix connection URL
public static final String CLICKHOUSE_URL = "jdbc:clickhouse://Ding202:8123/rt_phm";//ClickHouse connection URL
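//Usage sketch (assumes the Phoenix and ClickHouse JDBC drivers declared in rt-warehouse/pom.xml are on the classpath):
//  Connection phoenixConn = DriverManager.getConnection(PHMConfig.PHOENIX_SERVER);
//  Connection ckConn = DriverManager.getConnection(PHMConfig.CLICKHOUSE_URL);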
}

View File

@ -0,0 +1,54 @@
package com.cqu.warehouse.realtime.utils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
import java.util.Properties;
/**
*@BelongsProject: phm_parent
*@BelongsPackage: com.cqu.warehouse.realtime.utils
*@Author: markilue
*@CreateTime: 2023-05-17 17:33
*@Description: TODO Kafka utility class: build Kafka sources and sinks
*@Version: 1.0
*/
public class MyKafkaUtils {
private static final String KAFKA_SERVER = "Ding202:9092,Ding203:9092,Ding204:9092";
//Create a Kafka consumer (Flink source)
public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), properties);
}
//Create a Kafka producer (Flink sink)
public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 15 + "");//15-minute transaction timeout (matches the Kafka broker default transaction.max.timeout.ms)
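//EXACTLY_ONCE semantics uses Kafka transactions and requires Flink checkpointing to be enabled (see BaseStreamApp)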
return new FlinkKafkaProducer<String>(topic, new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {
return new ProducerRecord<>(topic,s.getBytes());
}
}, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
}