self_example/phm_rotate/PHMWarehouse/dataChange/crud_hdfs.py


import csv
import os
import time
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp


def get_spark():
    spark = SparkSession \
        .builder \
        .appName("insert_data_into_hive") \
        .config("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", "true") \
        .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
        .config("spark.sql.session.timeZone", "Asia/Shanghai") \
        .config('spark.driver.memory', '8g') \
        .config('spark.executor.memory', '8g') \
        .config('spark.executor.instances', '4') \
        .config('spark.executor.cores', '3') \
        .config("hive.metastore.uris", "thrift://localhost:9083") \
        .enableHiveSupport() \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    return spark
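
# A minimal usage sketch (assumption: the Hive metastore at thrift://localhost:9083 is reachable):
#   spark = get_spark()
#   spark.sql("show databases").show()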


# Parse local waveform files and load them into Hive
def load_data_to_hive():
    spark = get_spark()
    sensor_map = {"低速轴径向": 1,
                  "第一级内齿圈径向": 2,
                  "高速轴径向": 3,
                  "非驱动端径向": 4,
                  "驱动端径向": 5,
                  "驱动端轴向": 6,
                  "后轴承径向": 7,
                  "前轴承径向": 8,
                  }
    batch = 500  # number of files written per pass; too large a batch raises an error
    pick_fj = ['052', '053']  # only load these two turbines
    base_folder = r'D:\lab\data_set\hz_osdn\苏步2期201703\时域波形数据'
    all_files = os.listdir(base_folder)
    pick_files = list(filter(lambda x: x.split('_')[3] in pick_fj, all_files))
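    # Filename layout assumed by the parsing below (inferred from the index-based
    # splits, not confirmed by the original author): fields joined with "_", with
    # the turbine id at index 3, the sensor name at index 5, the sampling frequency
    # ("...Hz") at index -4, the timestamp (yyyyMMddHHmmss) at index -2, and the
    # rpm plus file suffix in the last field.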
    records = 0
    total_times = int(len(pick_files) / batch)
    for i in range(total_times + 1):
        data_dict = {}
        if i != total_times:
            batch_files = pick_files[i * batch:(i + 1) * batch]
        else:
            if len(pick_files) % batch != 0:
                batch_files = pick_files[i * batch:]
            else:
                return
        split_names = list(map(lambda x: x.split("_"), batch_files))
        data_dict["farm_id"] = list(map(lambda x: x[0] + "_" + x[1], split_names))
        data_dict["turbine_id"] = list(map(lambda x: x[3], split_names))
        data_dict["sensor_id"] = list(map(lambda x: sensor_map[x[5]], split_names))
        data_dict["data_time"] = list(map(lambda x: x[-2], split_names))
        data_dict["rpm"] = list(map(lambda x: float(x[-1][:-7]), split_names))
        data_dict["frequency"] = list(map(lambda x: int(x[-4][:-2]), split_names))
        data_dict["year_id"] = list(map(lambda x: x[-2][:4], split_names))
        data_dict["month_id"] = list(map(lambda x: x[-2][4:6], split_names))
        data_dict["day_id"] = list(map(lambda x: x[-2][6:8], split_names))
        tmp = []
        for file in batch_files:
            file_path = os.path.join(base_folder, file)
            with open(file_path, 'r', encoding='UTF-8') as oldf:
                data = list(csv.reader(oldf))[0]
            data = list(filter(lambda x: x != '', data))  # drop empty values
            data = [float(item) for item in data]
            tmp.append(data)
        data_dict["vib"] = tmp
        tmp_df = pd.DataFrame(data_dict)
        df = spark.createDataFrame(tmp_df)
        df1 = df.withColumn("data_time", to_timestamp("data_time", 'yyyyMMddHHmmss'))
        spark.sql("set hive.exec.dynamic.partition=true")
        spark.sql("set hive.exec.dynamic.partition.mode = nonstrict")
        df1.repartition(5).write \
            .format("Hive") \
            .mode('append') \
            .option("header", "false") \
            .option("delimiter", "\t") \
            .saveAsTable("flask_data.sds", partitionBy=["year_id", "month_id", "day_id"])
        records += len(batch_files)
        print("Loaded {}/{} records".format(records, len(pick_files)))
# mode can be 'append' or 'overwrite': 'overwrite' drops the existing table and recreates it,
# while 'append' adds data to an already existing table.
# If the table already exists, format should match the format used when it was created
# (orc, parquet, etc.); if that format is unknown, use "Hive" to let Spark pick it automatically.
# load_data_to_hive()
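
# A hedged verification sketch (assumes flask_data.sds exists after a load run):
#   spark = get_spark()
#   spark.sql("show partitions flask_data.sds").show(truncate=False)
#   spark.sql("select count(*) from flask_data.sds").show()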


def ods_to_dwd():
    spark = get_spark()
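    # A hedged sketch of an ODS -> DWD step; the target table name flask_data.dwd_sds
    # and the deduplication rule are assumptions, not part of the original code:
    #   df = spark.table("flask_data.sds") \
    #       .dropDuplicates(["farm_id", "turbine_id", "sensor_id", "data_time"])
    #   df.write.format("Hive").mode("append") \
    #       .saveAsTable("flask_data.dwd_sds", partitionBy=["year_id", "month_id", "day_id"])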


def dwd_to_dws():
    pass


def query_dwd(start_time, end_time=None):
    pass
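    # A hedged sketch (assumes the hypothetical DWD table flask_data.dwd_sds and a
    # timestamp column data_time; end_time is optional):
    #   q = "select * from flask_data.dwd_sds where data_time >= '{}'".format(start_time)
    #   if end_time is not None:
    #       q += " and data_time <= '{}'".format(end_time)
    #   return get_spark().sql(q)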


def query_dws():
    pass


def query_ada():
    pass


def query_dim():
    pass