diff --git a/Leecode/src/main/java/com/markilue/leecode/hot100/interviewHot/singlestack/LC_496_NextGreaterElement.java b/Leecode/src/main/java/com/markilue/leecode/hot100/interviewHot/singlestack/LC_496_NextGreaterElement.java
new file mode 100644
index 0000000..1b2426b
--- /dev/null
+++ b/Leecode/src/main/java/com/markilue/leecode/hot100/interviewHot/singlestack/LC_496_NextGreaterElement.java
@@ -0,0 +1,50 @@
+package com.markilue.leecode.hot100.interviewHot.singlestack;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+
+/**
+ *@BelongsProject: Leecode
+ *@BelongsPackage: com.markilue.leecode.hot100.interviewHot.singlestack
+ *@Author: markilue
+ *@CreateTime: 2023-05-18 10:17
+ *@Description: TODO LeetCode 496. Next Greater Element I
+ *@Version: 1.0
+ */
+public class LC_496_NextGreaterElement {
+
+    @Test
+    public void test() {
+        int[] nums1 = {4, 1, 2}, nums2 = {1, 3, 4, 2};
+        System.out.println(Arrays.toString(nextGreaterElement(nums1, nums2)));
+    }
+
+
+    /**
+     * Monotonic stack solution: for each value in nums2, record the first
+     * greater element that appears to its right.
+     * @param nums1
+     * @param nums2
+     * @return
+     */
+    public int[] nextGreaterElement(int[] nums1, int[] nums2) {
+        LinkedList<Integer> stack = new LinkedList<>();
+        int[] find = new int[10001];
+
+        // Scan nums2 right to left; the stack keeps only candidates that are
+        // strictly greater than everything scanned so far.
+        for (int i = nums2.length - 1; i >= 0; i--) {
+            while (!stack.isEmpty() && stack.peek() <= nums2[i]) {
+                stack.pop();
+            }
+            find[nums2[i]] = stack.isEmpty() ? -1 : stack.peek();
+            stack.push(nums2[i]);
+        }
+
+        for (int i = 0; i < nums1.length; i++) {
+            nums1[i] = find[nums1[i]];
+        }
+        return nums1;
+    }
+
+
+}
diff --git a/Leecode/src/main/java/com/markilue/leecode/hot100/interviewHot/singlestack/LC_739_DailyTemperatures.java b/Leecode/src/main/java/com/markilue/leecode/hot100/interviewHot/singlestack/LC_739_DailyTemperatures.java
new file mode 100644
index 0000000..4b5a2c9
--- /dev/null
+++ b/Leecode/src/main/java/com/markilue/leecode/hot100/interviewHot/singlestack/LC_739_DailyTemperatures.java
@@ -0,0 +1,46 @@
+package com.markilue.leecode.hot100.interviewHot.singlestack;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+
+/**
+ *@BelongsProject: Leecode
+ *@BelongsPackage: com.markilue.leecode.hot100.interviewHot.singlestack
+ *@Author: markilue
+ *@CreateTime: 2023-05-18 09:59
+ *@Description: TODO LeetCode 739. Daily Temperatures
+ *@Version: 1.0
+ */
+public class LC_739_DailyTemperatures {
+
+    @Test
+    public void test() {
+//        int[] temperatures = {73, 74, 75, 71, 69, 72, 76, 73};
+        int[] temperatures = {89, 62, 70, 58, 47, 47, 46, 76, 100, 70};
+        System.out.println(Arrays.toString(dailyTemperatures(temperatures)));
+    }
+
+
+    /**
+     * Monotonic stack solution: traverse from the end, keeping a stack of
+     * indices whose temperatures are strictly decreasing from bottom to top.
+     * @param temperatures
+     * @return
+     */
+    public int[] dailyTemperatures(int[] temperatures) {
+
+        LinkedList<Integer> stack = new LinkedList<>();
+        int[] result = new int[temperatures.length];
+        for (int i = temperatures.length - 1; i >= 0; i--) {
+            while (!stack.isEmpty() && temperatures[stack.peek()] <= temperatures[i]) {
+                stack.pop();
+            }
+            result[i] = stack.isEmpty() ? 0 : stack.peek() - i;
+            stack.push(i);
+        }
+
+        return result;
+
+    }
+}
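
Note: the two scans above share one pattern — a monotonic stack that discards elements which can never be the answer. For comparison, a minimal forward-scanning sketch of the same daily-temperatures logic (class name illustrative, not part of this change):

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class DailyTemperaturesForward {
        // Indices wait on the stack until a strictly warmer day pops them.
        public static int[] dailyTemperatures(int[] temperatures) {
            int[] result = new int[temperatures.length];
            Deque<Integer> stack = new ArrayDeque<>();
            for (int i = 0; i < temperatures.length; i++) {
                while (!stack.isEmpty() && temperatures[stack.peek()] < temperatures[i]) {
                    int j = stack.pop();
                    result[j] = i - j; // day i is the first warmer day after day j
                }
                stack.push(i);
            }
            return result; // unpopped indices keep the default 0: no warmer day
        }
    }

Either direction visits each index at most twice, so both variants run in O(n).
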
diff --git a/Leecode/src/main/java/com/markilue/leecode/hot100/interviewHot/singlestack/T503_NextGreaterElements.java b/Leecode/src/main/java/com/markilue/leecode/hot100/interviewHot/singlestack/T503_NextGreaterElements.java
new file mode 100644
index 0000000..2203924
--- /dev/null
+++ b/Leecode/src/main/java/com/markilue/leecode/hot100/interviewHot/singlestack/T503_NextGreaterElements.java
@@ -0,0 +1,98 @@
+package com.markilue.leecode.hot100.interviewHot.singlestack;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+
+/**
+ *@BelongsProject: Leecode
+ *@BelongsPackage: com.markilue.leecode.hot100.interviewHot.singlestack
+ *@Author: markilue
+ *@CreateTime: 2023-05-18 10:33
+ *@Description: TODO LeetCode 503. Next Greater Element II
+ *@Version: 1.0
+ */
+public class T503_NextGreaterElements {
+
+    @Test
+    public void test() {
+//        int[] nums = {1, 5, 3, 4, 3};
+//        int[] nums = {1, 2, 3, 4, 3};
+        int[] nums = {1, 2, 3, 2, 1};
+        System.out.println(Arrays.toString(nextGreaterElements1(nums)));
+    }
+
+
+    /**
+     * Monotonic stack attempt: we still need to relate each position to other
+     * positions, so a monotonic stack is the natural tool. Since the array is
+     * circular, both sides matter; here each direction is scanned once and the
+     * two results are combined.
+     * Known broken — counterexample {1, 2, 3, 2, 1}: the left-jump chain for
+     * the last element stops as soon as it reaches the 3.
+     * @param nums
+     * @return
+     */
+    public int[] nextGreaterElements(int[] nums) {
+
+        LinkedList<Integer> stack = new LinkedList<>();
+        int[] result = new int[nums.length];
+
+        int[] left = new int[nums.length];
+        // Left-to-right pass: record the index of the nearest greater element
+        // to the left of each position (-1 if none).
+        for (int i = 0; i < nums.length; i++) {
+            while (!stack.isEmpty() && nums[stack.peek()] <= nums[i]) {
+                stack.pop();
+            }
+            left[i] = stack.isEmpty() ? -1 : stack.peek();
+            stack.push(i);
+        }
+        stack.clear();
+        // Right-to-left pass: determine the actual answer.
+        for (int i = nums.length - 1; i >= 0; i--) {
+            while (!stack.isEmpty() && stack.peek() <= nums[i]) {
+                stack.pop();
+            }
+            // Walk the left-links to look for a previous greater value.
+            if (stack.isEmpty()) {// nothing greater to the right of i
+                int j = i;
+                if (left[j] != -1) {
+                    while (left[j] != -1) {
+                        j = left[j];
+                    }
+                    result[i] = nums[j];
+                } else {
+                    result[i] = -1;
+                }
+
+            } else {
+                result[i] = stack.peek();
+            }
+            stack.push(nums[i]);
+        }
+
+        return result;
+
+    }
+
+
+    /**
+     * Official approach: no need for the awkward backward chaining above. The
+     * simple idea is to "straighten" the circular array: logically append the
+     * first n-1 elements after the original sequence and run one scan.
+     * @param nums
+     * @return
+     */
+    public int[] nextGreaterElements1(int[] nums) {
+        int n = nums.length;
+        int[] result = new int[n];
+        LinkedList<Integer> stack = new LinkedList<>();
+        for (int i = 2 * (n - 1); i >= 0; i--) {
+            while (!stack.isEmpty() && stack.peek() <= nums[i % n]) {
+                stack.pop();
+            }
+            if (i < n) result[i] = stack.isEmpty() ? -1 : stack.peek();
+            stack.push(nums[i % n]);
+        }
+
+        return result;
+
+    }
+}
diff --git a/Leecode/src/main/java/com/markilue/leecode/hot100/second/T59_160_GetIntersectionNode.java b/Leecode/src/main/java/com/markilue/leecode/hot100/second/T59_160_GetIntersectionNode.java
new file mode 100644
index 0000000..c5995a1
--- /dev/null
+++ b/Leecode/src/main/java/com/markilue/leecode/hot100/second/T59_160_GetIntersectionNode.java
@@ -0,0 +1,29 @@
+package com.markilue.leecode.hot100.second;
+
+import com.markilue.leecode.listnode.ListNode;
+
+/**
+ *@BelongsProject: Leecode
+ *@BelongsPackage: com.markilue.leecode.hot100.second
+ *@Author: markilue
+ *@CreateTime: 2023-05-18 11:33
+ *@Description: TODO LeetCode 160. Intersection of Two Linked Lists
+ *@Version: 1.0
+ */
+public class T59_160_GetIntersectionNode {
+
+    public ListNode getIntersectionNode(ListNode headA, ListNode headB) {
+
+        ListNode curA = headA;
+        ListNode curB = headB;
+
+        while (curA != curB) {
+            // Both pointers travel the same total distance (lenA + lenB), so
+            // even if the lists never intersect, both reach null at the same
+            // time and the loop exits.
+            curA = curA == null ? headB : curA.next;
+            curB = curB == null ? headA : curB.next;
+        }
+
+        return curA;
+
+    }
+}
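
The termination argument in getIntersectionNode is worth spelling out: pointer A walks lenA + lenB steps in total and pointer B walks lenB + lenA, so after at most that many steps they are equal — either at the junction node or at null. A self-contained sketch to check it (a local stand-in node type is used, since the repo's ListNode class is not shown in this diff):

    public class IntersectionDemo {
        static class Node { int val; Node next; Node(int val) { this.val = val; } }

        // Same two-pointer walk as getIntersectionNode above.
        static Node intersect(Node a, Node b) {
            Node pa = a, pb = b;
            while (pa != pb) {
                pa = pa == null ? b : pa.next;
                pb = pb == null ? a : pb.next;
            }
            return pa;
        }

        public static void main(String[] args) {
            Node shared = new Node(8);
            shared.next = new Node(4);
            Node a = new Node(4); a.next = new Node(1); a.next.next = shared;
            Node b = new Node(5); b.next = new Node(6); b.next.next = new Node(1); b.next.next.next = shared;
            System.out.println(intersect(a, b).val); // prints 8
        }
    }
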
diff --git a/Leecode/src/main/java/com/markilue/leecode/hot100/second/T60_MajorityElement.java b/Leecode/src/main/java/com/markilue/leecode/hot100/second/T60_MajorityElement.java
new file mode 100644
index 0000000..17d2957
--- /dev/null
+++ b/Leecode/src/main/java/com/markilue/leecode/hot100/second/T60_MajorityElement.java
@@ -0,0 +1,31 @@
+package com.markilue.leecode.hot100.second;
+
+/**
+ *@BelongsProject: Leecode
+ *@BelongsPackage: com.markilue.leecode.hot100.second
+ *@Author: markilue
+ *@CreateTime: 2023-05-18 11:46
+ *@Description: TODO LeetCode 169. Majority Element
+ *@Version: 1.0
+ */
+public class T60_MajorityElement {
+
+    // Boyer-Moore voting: when the count drops to zero, the next number
+    // becomes the new candidate.
+    public int majorityElement(int[] nums) {
+        int count = 1;
+        int maj = nums[0];
+        for (int i = 1; i < nums.length; i++) {
+            if (nums[i] == maj) {
+                count++;
+            } else {
+                count--;
+                if (count == 0) {
+                    // Safe under the problem's strict-majority guarantee: the
+                    // count cannot be zero after the final element, so i + 1
+                    // stays in bounds.
+                    maj = nums[i + 1];
+                }
+            }
+        }
+
+        return maj;
+
+    }
+}
diff --git a/phm_rotate/backend/phm_backend/warehouse/pom.xml b/phm_rotate/backend/phm_backend/warehouse/pom.xml
new file mode 100644
index 0000000..57654e6
--- /dev/null
+++ b/phm_rotate/backend/phm_backend/warehouse/pom.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>phm_parent</artifactId>
+        <groupId>com.cqu</groupId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>warehouse</artifactId>
+    <packaging>pom</packaging>
+    <modules>
+        <module>rt-warehouse</module>
+    </modules>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+</project>
\ No newline at end of file
diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/pom.xml b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/pom.xml
new file mode 100644
index 0000000..a64d000
--- /dev/null
+++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/pom.xml
@@ -0,0 +1,218 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>warehouse</artifactId>
+        <groupId>com.cqu</groupId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>rt-warehouse</artifactId>
+
+    <properties>
+        <java.version>1.8</java.version>
+        <maven.compiler.source>${java.version}</maven.compiler.source>
+        <maven.compiler.target>${java.version}</maven.compiler.target>
+        <flink.version>1.12.0</flink.version>
+        <scala.version>2.12</scala.version>
+        <hadoop.version>3.1.3</hadoop.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_${scala.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cep_${scala.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.68</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.25</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.25</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-to-slf4j</artifactId>
+            <version>2.14.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.14</version>
+        </dependency>
+
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>5.1.47</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba.ververica</groupId>
+            <artifactId>flink-connector-mysql-cdc</artifactId>
+            <version>1.2.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.13.2</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-beanutils</groupId>
+            <artifactId>commons-beanutils</artifactId>
+            <version>1.9.3</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.phoenix</groupId>
+            <artifactId>phoenix-spark</artifactId>
+            <version>5.0.0-HBase-2.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.glassfish</groupId>
+                    <artifactId>javax.el</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
+            <version>3.3.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>ru.yandex.clickhouse</groupId>
+            <artifactId>clickhouse-jdbc</artifactId>
+            <version>0.3.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
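
Returning to T60_MajorityElement above: the nums[i + 1] assignment is only safe because LeetCode 169 guarantees a strict majority. A bounds-safe formulation of the same Boyer-Moore vote, shown for comparison (a sketch, not part of this change):

    public class MajorityVoteSafe {
        // Standard Boyer-Moore vote: a zero count promotes the current
        // element to candidate, so nums[i + 1] is never needed.
        public static int majorityElement(int[] nums) {
            int count = 0;
            int maj = 0;
            for (int num : nums) {
                if (count == 0) {
                    maj = num;
                }
                count += (num == maj) ? 1 : -1;
            }
            return maj; // correct whenever a strict majority exists
        }
    }
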
diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/base/BaseStreamApp.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/base/BaseStreamApp.java
new file mode 100644
index 0000000..539ce13
--- /dev/null
+++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/base/BaseStreamApp.java
@@ -0,0 +1,39 @@
+package com.cqu.warehouse.realtime.app.base;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ *@BelongsProject: phm_parent
+ *@BelongsPackage: com.cqu.warehouse.realtime.app.base
+ *@Author: markilue
+ *@CreateTime: 2023-05-17 18:08
+ *@Description: TODO Template method pattern: builds the common stream-processing environment
+ *@Version: 1.0
+ */
+public abstract class BaseStreamApp {
+
+    public void entry() throws Exception {
+        //TODO 1. Prepare the environment
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(4);
+
+        //TODO 2. Checkpoint settings
+        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
+        env.getCheckpointConfig().setCheckpointTimeout(5000L);
+        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// keep externalized checkpoints when the job is cancelled
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
+        env.setStateBackend(new FsStateBackend("hdfs://Ding202:8020/phm_warehosue/rt/ck"));
+        System.setProperty("HADOOP_USER_NAME", "dingjiawen");
+        // Template hook: subclasses build the job topology
+        execute(env);
+        env.execute();
+
+    }
+
+    public abstract void execute(StreamExecutionEnvironment env);
+
+}
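
BaseStreamApp is a template method: entry() fixes the environment, checkpointing, restart strategy, and the final env.execute() call, while subclasses supply only the topology in execute(env). A minimal hypothetical subclass (names and data illustrative, assuming it sits in the same package as BaseStreamApp):

    import com.cqu.warehouse.realtime.app.base.BaseStreamApp;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class PrintDemoApp extends BaseStreamApp {

        public static void main(String[] args) throws Exception {
            new PrintDemoApp().entry(); // entry() calls execute(env), then env.execute()
        }

        @Override
        public void execute(StreamExecutionEnvironment env) {
            // Job-specific topology only; all environment setup is inherited.
            env.fromElements("a", "b", "c").print();
        }
    }
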
diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwd/BaseDataApp.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwd/BaseDataApp.java
new file mode 100644
index 0000000..dc053da
--- /dev/null
+++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwd/BaseDataApp.java
@@ -0,0 +1,106 @@
+package com.cqu.warehouse.realtime.app.dwd;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.cqu.warehouse.realtime.app.base.BaseStreamApp;
+import com.cqu.warehouse.realtime.utils.MyKafkaUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+/**
+ *@BelongsProject: phm_parent
+ *@BelongsPackage: com.cqu.warehouse.realtime.app.dwd
+ *@Author: markilue
+ *@CreateTime: 2023-05-17 18:04
+ *@Description:
+ * TODO PHM DWD layer:
+ *  1. Basic ETL
+ *  2. Split the stream by source system
+ *@Version: 1.0
+ */
+public class BaseDataApp extends BaseStreamApp {
+
+    public static void main(String[] args) throws Exception {
+
+        new BaseDataApp().entry();
+    }
+
+    @Override
+    public void execute(StreamExecutionEnvironment env) {
+        //TODO 1. Read from Kafka
+        String topic = "ods_base_data";
+        String groupId = "base_data_app_group";
+        DataStreamSource<String> kafkaDS = env.addSource(
+                MyKafkaUtils.getKafkaSource(topic, groupId)
+        );
+
+//        kafkaDS.print(">>>>");
+
+        //TODO 2. Convert to JSONObject
+        SingleOutputStreamOperator<JSONObject> jsonObjectDS = kafkaDS.map(JSON::parseObject);
+
+        //TODO 3. Define the side-output tags
+        OutputTag<String> CMSTag = new OutputTag<String>("CMSTag") {
+        };
+
+        OutputTag<String> GasTag = new OutputTag<String>("GasTag") {
+        };
+
+        OutputTag<String> AeroTag = new OutputTag<String>("AeroTag") {
+        };
+
+        SingleOutputStreamOperator<String> windSCADADS = jsonObjectDS.process(
+                new ProcessFunction<JSONObject, String>() {
+                    @Override
+                    public void processElement(JSONObject jsonObject, Context context, Collector<String> collector) throws Exception {
+                        // Route by which identifying field the record carries:
+                        // tagName -> gas turbine SCADA, x -> wind CMS,
+                        // sub_dataset -> aero engine; everything else is wind SCADA.
+                        String tagName = jsonObject.getString("tagName");
+                        String x = jsonObject.getString("x");
+                        String dataset = jsonObject.getString("sub_dataset");
+                        if (tagName != null && !tagName.isEmpty()) {
+                            context.output(GasTag, jsonObject.toJSONString());
+                        } else if (x != null && !x.isEmpty()) {
+                            context.output(CMSTag, jsonObject.toJSONString());
+                        } else if (dataset != null && !dataset.isEmpty()) {
+                            context.output(AeroTag, jsonObject.toJSONString());
+                        } else {
+                            collector.collect(jsonObject.toJSONString());
+                        }
+                    }
+                }
+        );
+
+        DataStream<String> CMSDS = windSCADADS.getSideOutput(CMSTag);
+        DataStream<String> gasDS = windSCADADS.getSideOutput(GasTag);
+        DataStream<String> aeroDS = windSCADADS.getSideOutput(AeroTag);
+
+        windSCADADS.print(">>>");
+        CMSDS.print("###");
+        gasDS.print("$$$");
+        aeroDS.print("%%%");
+
+        windSCADADS.addSink(
+                MyKafkaUtils.getKafkaSink("dwd_wind_scada")
+        );
+
+        CMSDS.addSink(
+                MyKafkaUtils.getKafkaSink("dwd_wind_cms")
+        );
+
+        gasDS.addSink(
+                MyKafkaUtils.getKafkaSink("dwd_gas_scada")
+        );
+
+        aeroDS.addSink(
+                MyKafkaUtils.getKafkaSink("dwd_aero_scada")
+        );
+
+    }
+}
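
One subtle point in the tag definitions above: the trailing {} creates an anonymous subclass of OutputTag, which is what lets Flink recover the String element type despite erasure — a plain new OutputTag<String>("CMSTag") without the braces fails at construction time with an InvalidTypesException. A tiny sketch:

    import org.apache.flink.util.OutputTag;

    public class OutputTagDemo {
        public static void main(String[] args) {
            // The {} subclass captures <String> in the class's generic signature.
            OutputTag<String> cmsTag = new OutputTag<String>("CMSTag") {
            };
            System.out.println(cmsTag.getTypeInfo()); // String type info, not a generic fallback
        }
    }
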
diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwm/GasWideApp.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwm/GasWideApp.java
new file mode 100644
index 0000000..bf48777
--- /dev/null
+++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/app/dwm/GasWideApp.java
@@ -0,0 +1,63 @@
+package com.cqu.warehouse.realtime.app.dwm;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.cqu.warehouse.realtime.app.base.BaseStreamApp;
+import com.cqu.warehouse.realtime.utils.MyKafkaUtils;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ *@BelongsProject: phm_parent
+ *@BelongsPackage: com.cqu.warehouse.realtime.app.dwm
+ *@Author: markilue
+ *@CreateTime: 2023-05-17 20:34
+ *@Description:
+ * TODO Gas turbine wide table:
+ *  1. Dimension splitting
+ *  2. Data masking
+ *  3. Dimension association
+ *@Version: 1.0
+ */
+public class GasWideApp extends BaseStreamApp {
+
+    public static void main(String[] args) throws Exception {
+        new GasWideApp().entry();
+    }
+
+    @Override
+    public void execute(StreamExecutionEnvironment env) {
+
+        //TODO 1. Read the gas turbine records from Kafka
+        String topic = "dwd_gas_scada";
+        String groupId = "gas_wide_app_group";
+        DataStreamSource<String> kafkaDS = env.addSource(
+                MyKafkaUtils.getKafkaSource(topic, groupId)
+        );
+
+        //TODO 2. Format conversion + dimension splitting: str -> JSONObject -> add area, company, typeId, gt_no
+        SingleOutputStreamOperator<JSONObject> jsonObjectDS = kafkaDS.map(
+                new MapFunction<String, JSONObject>() {
+                    @Override
+                    public JSONObject map(String s) throws Exception {
+                        JSONObject jsonObject = JSON.parseObject(s);
+                        // Tag format: "[area.company.typeId.gt_no].signalName".
+                        // split takes a regex, so both "]" and "." are escaped;
+                        // an unescaped "." would match any character.
+                        String tagName = jsonObject.getString("tagName");
+                        String[] tagArray = tagName.split("\\]\\.");
+                        jsonObject.put("tagName", tagArray[1]);
+                        String[] message = tagArray[0].split("\\.");
+                        jsonObject.put("area", message[0].substring(1));
+                        jsonObject.put("company", message[1]);
+                        jsonObject.put("typeId", message[2]);
+                        jsonObject.put("gt_no", message[3]);
+                        return jsonObject;
+                    }
+                }
+        );
+
+        jsonObjectDS.print(">>>");
+
+
+    }
+}
diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/common/PHMConfig.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/common/PHMConfig.java
new file mode 100644
index 0000000..7e50c15
--- /dev/null
+++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/common/PHMConfig.java
@@ -0,0 +1,16 @@
+package com.cqu.warehouse.realtime.common;
+
+/**
+ *@BelongsProject: phm_parent
+ *@BelongsPackage: com.cqu.warehouse.realtime.common
+ *@Author: markilue
+ *@CreateTime: 2023-05-17 17:31
+ *@Description: TODO Common PHM configuration constants
+ *@Version: 1.0
+ */
+public class PHMConfig {
+
+    public static final String HBASE_SCHEMA = "PHM_REALTIME";// HBase schema name
+    public static final String PHOENIX_SERVER = "jdbc:phoenix:Ding202,Ding203,Ding204:2181";// Phoenix JDBC URL
+    public static final String CLICKHOUSE_URL = "jdbc:clickhouse://Ding202:8123/rt_phm";// ClickHouse JDBC URL
+}
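
Back to the tag parsing in GasWideApp above: since String.split takes a regex, the "]." separator needs both characters escaped. A quick standalone check with a made-up tag (the tag format is inferred from the split logic; the sample value is illustrative):

    public class TagSplitDemo {
        public static void main(String[] args) {
            // Hypothetical tag in the implied "[area.company.typeId.gt_no].signal" format:
            String tagName = "[CQ.CQU.GT01.No3].exhaustTemp";
            String[] tagArray = tagName.split("\\]\\.");
            String[] message = tagArray[0].split("\\.");
            System.out.println(tagArray[1]);              // exhaustTemp
            System.out.println(message[0].substring(1));  // CQ (strips the leading '[')
            System.out.println(message[3]);               // No3
        }
    }
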
diff --git a/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/MyKafkaUtils.java b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/MyKafkaUtils.java
new file mode 100644
index 0000000..aa5f309
--- /dev/null
+++ b/phm_rotate/backend/phm_backend/warehouse/rt-warehouse/src/main/java/com/cqu/warehouse/realtime/utils/MyKafkaUtils.java
@@ -0,0 +1,54 @@
+package com.cqu.warehouse.realtime.utils;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+import java.util.Properties;
+
+/**
+ *@BelongsProject: phm_parent
+ *@BelongsPackage: com.cqu.warehouse.realtime.utils
+ *@Author: markilue
+ *@CreateTime: 2023-05-17 17:33
+ *@Description: TODO Kafka utility class: builds Kafka sources and sinks
+ *@Version: 1.0
+ */
+public class MyKafkaUtils {
+
+    private static final String KAFKA_SERVER = "Ding202:9092,Ding203:9092,Ding204:9092";
+
+    // Build a Kafka consumer (source)
+    public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) {
+        Properties properties = new Properties();
+        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
+        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+
+        return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
+    }
+
+    // Build a Kafka producer (sink)
+    public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
+        Properties properties = new Properties();
+        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
+        properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 15 + "");// 15-minute transaction timeout
+
+
+        return new FlinkKafkaProducer<>(topic, new KafkaSerializationSchema<String>() {
+            @Override
+            public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {
+                return new ProducerRecord<>(topic, s.getBytes());
+            }
+        }, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
+    }
+
+
+}
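
Usage sketch for MyKafkaUtils (topic, group, and job names illustrative). One operational caveat: EXACTLY_ONCE sinks commit Kafka transactions on checkpoints, so checkpointing must be enabled, and the 15-minute TRANSACTION_TIMEOUT_CONFIG set above must not exceed the broker's transaction.max.timeout.ms (whose default is also 15 minutes):

    import com.cqu.warehouse.realtime.utils.MyKafkaUtils;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KafkaPipeDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(5000L); // required for transactional commits
            env.addSource(MyKafkaUtils.getKafkaSource("ods_base_data", "demo_group"))
               .addSink(MyKafkaUtils.getKafkaSink("dwd_demo_out"));
            env.execute("kafka-pipe-demo");
        }
    }
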