Tired of Copy-Pasting Hive Output? This PySpark Hack Fixes It
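This article describes a way to convert Hive or Impala query results from console output into a CSV file. PySpark cleans and parses the data and produces an exportable CSV file. The approach suits unit testing, data sharing, and debugging. 2025-4-29 09:5:18 Author: hackernoon.com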

As a data engineer, I have lost too many hours on the simple task of turning Hive or Impala console output into a usable CSV file.

Problem :

I have an employee table in Hive (it could equally be Impala or a Spark job log). Running a query against that table prints console output like this:

+-----+------------------+-------+
|empid|empname           |salary|
|    1|    Ram Ghadiyaram| 10000|
+-----+-------+----------+--------+

I want a quick CSV export of the query result printed on the console above.

Why 🙃:

One of the many use cases is unit testing: I need a CSV file with meaningful data from a database or a Spark job log (for example, production data sent in an email by a site reliability engineer, i.e. an SRE, while fixing a production issue) to replicate data or schema issues.

In highly confidential environments such as fintech, banks, and insurance companies, it is not possible to log in to production and look at data in Hive, Impala, or Spark job logs… 😅

Below is the PySpark way to get a CSV from console output (it can be run directly from IntelliJ). I use '|' as the delimiter so Spark can parse the text directly, and I let Spark infer the schema, so the resulting dataset mirrors the console output.

import os
import re
import sys

from pyspark.sql import SparkSession
# Make the Spark driver and workers use the same Python interpreter as this script
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder \
    .appName("String to CSV") \
    .getOrCreate()

# Input data as a string
input_data = """
+-----+------------------+-------+
|empid|empname           |salary|
|    1|    Ram Ghadiyaram| 10000|
+-----+-------+----------+--------+
""".replace("|\n","\n").replace("\n|","\n")

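# Remove the +-----+-------+------+ border lines from the string
input_data = re.sub(r'\n[+-]+\n', '\n', input_data)
# Parse the remaining lines as pipe-delimited CSV. Trimming the padding spaces
# keeps the column names clean and lets inferSchema see plain numbers.
df = (spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", "|")
      .option("ignoreLeadingWhiteSpace", "true")
      .option("ignoreTrailingWhiteSpace", "true")
      .csv(spark.sparkContext.parallelize(input_data.split("\n"))))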
df.printSchema()
df.show()
# Specify the path where you want to save the CSV (Spark creates a directory with this name)
output_path = "./output1.csv"
# Write the DataFrame as CSV; coalesce(1) produces a single part file
df.coalesce(1).write.csv(output_path, header=True)
# Stop the Spark session
spark.stop()

Flow diagram for illustrative purpose

Cleaning Input :

Step 1) Remove the start and end pipes using '.replace("|\n","\n").replace("\n|","\n")'

Step 2) Remove the +-----+-------+------+ border lines from the string using 're.sub(r'\n[+-]+\n' , '\n', input_data)'
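The two cleaning steps can be tried without Spark. The sketch below wraps them in a helper whose name, `clean_console_table`, is an illustrative assumption and not part of the original script. It then keeps only the non-empty lines, which is what the CSV reader ends up seeing.

```python
import re


def clean_console_table(raw: str) -> str:
    """Strip the outer pipes and the +---+ border lines from console table output."""
    # Step 1: drop the pipe that ends each row, then the pipe that starts each row.
    text = raw.replace("|\n", "\n").replace("\n|", "\n")
    # Step 2: collapse every border line (made only of '+' and '-') into one newline.
    return re.sub(r"\n[+-]+\n", "\n", text)


sample = """
+-----+------------------+-------+
|empid|empname           |salary|
|    1|    Ram Ghadiyaram| 10000|
+-----+-------+----------+--------+
"""

cleaned = clean_console_table(sample)
# Keep only the non-empty lines: one header row and one data row remain.
rows = [line for line in cleaned.split("\n") if line]
for row in rows:
    print(repr(row))
```

The inner pipes and the padding spaces are still present at this point. They are handled later by the CSV parser through its delimiter and whitespace options.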

Combining all steps together :

Before :

+-----+------------------+-------+\n
|empid|empname           |salary|\n
|    1|    Ram Ghadiyaram| 10000|\n
+-----+-------+----------+--------+\n

After : The border lines are gone, leaving the header and data rows with only their internal pipes. The extra newlines at the start and end are harmless: input_data.split("\n") turns them into empty strings, and the CSV parser skips empty lines.

\n
empid|empname           |salary\n
    1|    Ram Ghadiyaram| 10000\n
\n

Now convert the cleaned text to CSV using the code below:

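# Trimming the padding spaces keeps the column names clean and lets inferSchema see plain numbers
df = (spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", "|")
      .option("ignoreLeadingWhiteSpace", "true")
      .option("ignoreTrailingWhiteSpace", "true")
      .csv(spark.sparkContext.parallelize(input_data.split("\n"))))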
df.printSchema()
# Show the result CSV data
df.show()

# Specify the path where you want to save the CSV (Spark creates a directory with this name)
output_path = "./output1.csv"

# Write the DataFrame as CSV; coalesce(1) produces a single part file
df.coalesce(1).write.csv(output_path, header=True)
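Spark writes the output path as a directory that holds a part-*.csv file and marker files such as _SUCCESS, not as a single flat file. The sketch below copies that one part file to an ordinary file name using only the standard library. The helper name `collect_single_csv` and the target name in the usage note are assumptions made for illustration.

```python
import glob
import os
import shutil


def collect_single_csv(spark_output_dir: str, target_file: str) -> str:
    """Copy the single part file out of a Spark CSV output directory."""
    # Spark names its data files part-<number>-<id>.csv inside the output directory.
    parts = sorted(glob.glob(os.path.join(spark_output_dir, "part-*.csv")))
    if len(parts) != 1:
        # More than one part file means the DataFrame was not coalesced to one partition.
        raise ValueError(f"expected exactly one part file, found {len(parts)}")
    shutil.copyfile(parts[0], target_file)
    return target_file
```

After the write above has finished, a call such as collect_single_csv("./output1.csv", "./employees.csv") would leave a plain CSV file next to the Spark output directory. The glob pattern assumes uncompressed output; compressed part files carry an extra extension and would need a different pattern.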

Final Output 😀

Generated CSV :

Note : For a large CSV you can compress the output, for example : df.write.mode("overwrite").option("compression", "gzip").csv(output_path)

Why this is important :

  1. Unit Testing: Generate test data from production-like queries (from any database/warehouse or Spark log).
  2. Data Exports: Share query results with non-technical stakeholders such as business users.
  3. Debugging: Capture and analyze console output during development.
  4. Generic Parsing: Works for any table-style console output, such as Spark DataFrame show().
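For small snippets where starting a Spark session is not worth it, the same idea can be sketched with the standard library alone. This is a swapped-in alternative to the PySpark script, not the method shown above, and it assumes that no cell value contains a '|' character. The function name `console_table_to_csv` and the sample table (laid out in show() style, with a border under the header) are illustrative.

```python
import csv
import io


def console_table_to_csv(raw: str) -> str:
    """Convert a pipe-delimited console table into CSV text."""
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    for line in raw.splitlines():
        line = line.strip()
        # Skip blank lines and border lines such as +-----+------+
        if not line or set(line) <= {"+", "-"}:
            continue
        # Drop one outer pipe on each side, if present.
        if line.startswith("|"):
            line = line[1:]
        if line.endswith("|"):
            line = line[:-1]
        # Split on the inner pipes and trim the padding around each cell.
        writer.writerow([cell.strip() for cell in line.split("|")])
    return out.getvalue()


show_output = """
+-----+--------------+------+
|empid|       empname|salary|
+-----+--------------+------+
|    1|Ram Ghadiyaram| 10000|
+-----+--------------+------+
"""

print(console_table_to_csv(show_output))
```

Because border lines are skipped wherever they appear, the separator under the header needs no special handling. All values come out as text, so type inference remains a job for whatever loads the CSV afterwards.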

Using this technique of converting any console output to CSV, I was able to replicate data problems quickly for many datasets. The demo above uses a small dataset for illustration. Happy learning 😃 Happy Problem Solving 😀


Source: https://hackernoon.com/tired-of-copy-pasting-hive-output-this-pyspark-hack-fixes-it?source=rss