import csv
import os
import time
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp


def get_spark():
    spark = SparkSession \
        .builder \
        .appName("insert_data_into_hive") \
        .config("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", "true") \
        .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
        .config("spark.sql.session.timeZone", "Asia/Shanghai") \
        .config('spark.driver.memory', '8g') \
        .config('spark.executor.memory', '8g') \
        .config('spark.executor.instances', '4') \
        .config('spark.executor.cores', '3') \
        .config("hive.metastore.uris", "thrift://localhost:9083") \
        .enableHiveSupport() \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    return spark


# Parse the local waveform files and load them into Hive.
def load_data_to_hive():
    spark = get_spark()
    sensor_map = {"低速轴径向": 1,
                  "第一级内齿圈径向": 2,
                  "高速轴径向": 3,
                  "非驱动端径向": 4,
                  "驱动端径向": 5,
                  "驱动端轴向": 6,
                  "后轴承径向": 7,
                  "前轴承径向": 8,
                  }
    batch = 500  # number of records written per pass; too large a batch causes errors
    pick_fj = ['052', '053']  # only load these two turbines
    base_folder = r'D:\lab\data_set\hz_osdn\苏步#2期201703\时域波形数据'
    all_files = os.listdir(base_folder)
    pick_files = list(filter(lambda x: x.split('_')[3] in pick_fj, all_files))
    records = 0
    total_times = len(pick_files) // batch
    for i in range(total_times + 1):
        data_dict = {}
        if i != total_times:
            batch_files = pick_files[i * batch:(i + 1) * batch]
        else:
            if len(pick_files) % batch != 0:
                batch_files = pick_files[i * batch:]
            else:
                return
        # Parse metadata fields out of the file names.
        split_names = list(map(lambda x: x.split("_"), batch_files))
        data_dict["farm_id"] = list(map(lambda x: x[0] + "_" + x[1], split_names))
        data_dict["turbine_id"] = list(map(lambda x: x[3], split_names))
        data_dict["sensor_id"] = list(map(lambda x: sensor_map[x[5]], split_names))
        data_dict["data_time"] = list(map(lambda x: x[-2], split_names))
        data_dict["rpm"] = list(map(lambda x: float(x[-1][:-7]), split_names))
        data_dict["frequency"] = list(map(lambda x: int(x[-4][:-2]), split_names))
        data_dict["year_id"] = list(map(lambda x: x[-2][:4], split_names))
        data_dict["month_id"] = list(map(lambda x: x[-2][4:6], split_names))
        data_dict["day_id"] = list(map(lambda x: x[-2][6:8], split_names))
        tmp = []
        for file in batch_files:
            file_path = os.path.join(base_folder, file)
            with open(file_path, 'r', encoding='UTF-8') as oldf:
                data = list(csv.reader(oldf))[0]
            data = list(filter(lambda x: x != '', data))  # drop empty values
            data = [float(item) for item in data]
            tmp.append(data)
        data_dict["vib"] = tmp
        tmp_df = pd.DataFrame(data_dict)
        df = spark.createDataFrame(tmp_df)
        df1 = df.withColumn("data_time", to_timestamp("data_time", 'yyyyMMddHHmmss'))
        spark.sql("set hive.exec.dynamic.partition=true")
        spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
        # mode can be 'append' or 'overwrite': 'overwrite' drops the existing table and
        # recreates it, while 'append' adds rows to the existing table.
        # If the table already exists, format should match the format it was created with
        # (orc, parquet, etc.); if that format is unknown, "Hive" lets Spark pick it up automatically.
        df1.repartition(5).write \
            .format("Hive") \
            .mode('append') \
            .option("header", "false") \
            .option("delimiter", "\t") \
            .saveAsTable("flask_data.sds", partitionBy=["year_id", "month_id", "day_id"])
        records += len(batch_files)
        print("Loaded {}/{} records".format(records, len(pick_files)))


# load_data_to_hive()


def ods_to_dwd():
    spark = get_spark()


def dwd_to_dws():
    pass


def query_dwd(start_time, end_time=None):
    pass


def query_dws():
    pass


def query_ada():
    pass


def query_dim():
    pass
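
# Verification sketch (assumption: load_data_to_hive() has already run and
# flask_data.sds holds 2017-03 partitions; the filter values below are illustrative,
# not taken from the original script). Counts rows per turbine and day, then lists
# the dynamic partitions Hive registered for the table.
def verify_load():
    from pyspark.sql.functions import col
    spark = get_spark()
    (spark.table("flask_data.sds")
     .where((col("year_id") == "2017") & (col("month_id") == "03"))
     .groupBy("turbine_id", "day_id")
     .count()
     .orderBy("turbine_id", "day_id")
     .show(truncate=False))
    spark.sql("SHOW PARTITIONS flask_data.sds").show(truncate=False)


# verify_load()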