Airflow和PySPARK实现带多组参数和标签的Amazon Redshift数据仓库批量数据导出程序

news/2025/2/27 5:53:03

设计一个基于多个带标签SQL模板作为配置文件和多组参数的PySPARK代码程序,实现根据不同的输入参数,用Airflow进行调度,自动批量地将Amazon Redshift数据仓库的数据导出为Parquet、CSV和Excel文件到S3上,标签和多个参数(以“_”分割)为组成导出数据文件名,文件已经存在则覆盖原始文件。PySpark程序需要异常处理,输出带时间戳和每个运行批次和每个导出文件作业运行状态的日志文件,每天单独一个带日期的和.log扩展名日志文件,放在logs子目录中,参数全部设置在json配置文件中。

PySpark解决方案,包含代码结构、配置文件和日志处理:

注意事项:

  1. 确保Spark集群有访问Redshift和S3的权限

  2. 根据实际环境调整Redshift JDBC驱动版本

  3. 测试不同文件格式的导出需求

  4. 监控S3临时目录的存储使用情况

  5. 定期清理日志文件和临时数据

  6. 根据数据量调整Spark资源配置

  7. 项目结构

redshift_exporter/
├── config/
│   └── config.json
├── sql_templates/
│   └── sales_report.sql
├── logs/
├── jobs/
│   └── redshift_exporter.py
└── airflow/
    └── dags/
        └── redshift_export_dag.py
  1. config.json 示例
{
  "redshift_conn": {
    "url": "jdbc:redshift://cluster:5439/db",
    "user": "user",
    "password": "password",
    "tempdir": "s3a://temp-bucket/redshift_temp"
  },
  "s3_output": "s3a://output-bucket/reports",
  "tasks": [
    {
      "label": "sales_report",
      "sql_template": "sql_templates/sales_report.sql",
      "parameters": ["${date}", "region1"],
      "formats": ["parquet", "csv", "xlsx"]
    }
  ]
}
  1. PySpark程序 (redshift_exporter.py)
python">import json
import logging
import os
import sys
from datetime import datetime
from pyspark.sql import SparkSession

class RedshiftExporter:
    def __init__(self, config_path):
        self.spark = SparkSession.builder \
            .appName("RedshiftExporter") \
            .config("spark.jars.packages", "com.amazon.redshift:redshift-jdbc42:2.1.0.9,org.apache.hadoop:hadoop-aws:3.3.1") \
            .getOrCreate()
        
        with open(config_path) as f:
            self.config = json.load(f)
        
        self.setup_logging()
        
    def setup_logging(self):
        log_dir = "logs"
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)
        
        log_file = f"{log_dir}/{datetime.now().strftime('%Y-%m-%d')}.log"
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def read_sql_template(self, template_path):
        try:
            with open(template_path) as f:
                return f.read()
        except Exception as e:
            self.logger.error(f"Error reading SQL template: {str(e)}")
            raise

    def export_data(self, df, output_path, format_type):
        try:
            writer = df.write.mode("overwrite")
            if format_type == "parquet":
                writer.parquet(f"{output_path}.parquet")
            elif format_type == "csv":
                writer.option("header", "true").csv(f"{output_path}.csv")
            elif format_type == "xlsx":
                df.write.format("com.crealytics.spark.excel") \
                    .option("header", "true") \
                    .mode("overwrite") \
                    .save(f"{output_path}.xlsx")
            self.logger.info(f"Successfully exported {format_type} to {output_path}")
        except Exception as e:
            self.logger.error(f"Export failed for {format_type}: {str(e)}")
            raise

    def process_task(self, task, params):
        try:
            sql = self.read_sql_template(task["sql_template"])
            formatted_sql = sql.replace("${date}", params[0])
            
            df = self.spark.read \
                .format("com.databricks.spark.redshift") \
                .option("url", self.config["redshift_conn"]["url"]) \
                .option("query", formatted_sql) \
                .option("user", self.config["redshift_conn"]["user"]) \
                .option("password", self.config["redshift_conn"]["password"]) \
                .option("tempdir", self.config["redshift_conn"]["tempdir"]) \
                .load()
            
            filename = f"{task['label']}_{'_'.join(params)}"
            output_base = f"{self.config['s3_output']}/{filename}"
            
            for fmt in task["formats"]:
                self.export_data(df, output_base, fmt)
                
        except Exception as e:
            self.logger.error(f"Task {task['label']} failed: {str(e)}")
            raise

    def run(self, parameters):
        try:
            for task in self.config["tasks"]:
                self.logger.info(f"Processing task: {task['label']}")
                self.process_task(task, parameters)
        finally:
            self.spark.stop()

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: redshift_exporter.py <config_path> <parameters>")
        sys.exit(1)
    
    config_path = sys.argv[1]
    parameters = sys.argv[2].split("_") if len(sys.argv) > 2 else []
    
    exporter = RedshiftExporter(config_path)
    exporter.run(parameters)
  1. Airflow DAG 示例 (redshift_export_dag.py)
python">from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'data_team',
    'start_date': datetime(2023, 1, 1),
    'retries': 1
}

with DAG('redshift_export',
         default_args=default_args,
         schedule_interval='@daily',
         catchup=False) as dag:

    export_task = BashOperator(
        task_id='run_redshift_export',
        bash_command='spark-submit --packages com.amazon.redshift:redshift-jdbc42:2.1.0.9,org.apache.hadoop:hadoop-aws:3.3.1,com.crealytics:spark-excel_2.12:3.3.1_0.18.5 /path/to/jobs/redshift_exporter.py /path/to/config/config.json "{{ ds_nodash }}_region1"'
    )
  1. 实现说明
  • 配置文件管理:使用JSON配置文件管理Redshift连接参数、输出位置和任务配置
  • 参数化处理:支持动态参数替换SQL模板中的变量
  • 多格式导出:支持Parquet、CSV和Excel格式导出
  • 日志记录:每天生成独立的日志文件,包含详细的时间戳和操作状态
  • 异常处理:完善的错误捕获和日志记录机制
  • Spark优化:使用Redshift的Spark连接器进行高效数据传输
  • 文件覆盖:使用Spark的overwrite模式处理已存在文件
  • 依赖管理:通过–packages参数管理Spark依赖

运行方式:

spark-submit --packages com.amazon.redshift:redshift-jdbc42:2.1.0.9,org.apache.hadoop:hadoop-aws:3.3.1,com.crealytics:spark-excel_2.12:3.3.1_0.18.5 jobs/redshift_exporter.py config/config.json "20231001_region1"

http://www.niftyadmin.cn/n/5869575.html

相关文章

Docker下ARM64架构的源码编译Qt5.15.1,并移植到开发板上

Docker下ARM64架构的源码编译Qt5.15.1,并移植到开发板上 1、环境介绍 QT版本&#xff1a;5.15.1 待移植环境&#xff1a; jetson nano 系列开发板 aarch64架构&#xff08;arm64&#xff09; 编译环境&#xff1a; 虚拟机Ubuntu18.04&#xff08;x86_64&#xff09; 2、…

--- spring MVC ---

引言 所谓MVC是一种软件的设计模型&#xff0c;他把软件系统分为三部分&#xff0c;View&#xff08;视图&#xff09;&#xff0c;Controller&#xff08;控制器&#xff09;&#xff0c;Model&#xff08;模型&#xff09;&#xff0c;他们之间的关系是 spring mvc全称为spr…

深入解析 Spring 中的 BeanDefinition 和 BeanDefinitionRegistry

在 Spring 框架中&#xff0c;BeanDefinition 和 BeanDefinitionRegistry 是两个非常重要的概念&#xff0c;它们共同构成了 Spring IoC 容器的核心机制。本文将详细介绍这两个组件的作用、实现以及它们之间的关系。 一、BeanDefinition&#xff1a;Bean 的配置描述 1.1 什么…

线性回归(一)基于Scikit-Learn的简单线性回归

主要参考学习资料&#xff1a; 《机器学习算法的数学解析与Python实现》莫凡 著 前置知识&#xff1a;线性代数-Python 目录 问题背景数学模型假设函数损失函数优化方法训练步骤 代码实现 问题背景 回归问题是一类预测连续值的问题&#xff0c;满足这样要求的数学模型称作回归…

仿真环境下实现场景切换、定位物体和导航行走

1. 代码&#xff08;以微波炉为例&#xff09; from ai2thor.controller import Controller import math import randomdef distance_2d(pos1, pos2):"""计算两点之间的二维欧几里得距离&#xff08;忽略Z轴&#xff09;"""return math.sqrt((p…

Python在实际工作中的运用-通用格式CSV文件自动转换XLSX

继续上篇《Python在实际工作中的运用-CSV无损转XLSX的几个方法》我们对特定的CSV实现了快速转换XLSX的目标&#xff0c;但是运行Py脚本前还是需要编辑表格创建脚本和数据插入脚本&#xff0c;自动化程度很低&#xff0c;实用性不强&#xff0c;为提供工作效率&#xff0c;实现输…

kafka consumer 手动 ack

在消费 Kafka 消息时&#xff0c;手动确认&#xff08;acknowledge&#xff09;消息的消费&#xff0c;可以通过使用 KafkaConsumer 类中的 commitSync() 或 commitAsync() 方法来实现。这些方法将提交当前偏移量&#xff0c;确保在消费者崩溃时不会重新消费已处理的消息。 以…

HBase与MapReduce结合(二)——对HBase表中字段进行WordCount

目录 1. 数据文本2. pom.xml中依赖配置3. 工具类Util4. 导入数据ImportData5. 对HBase表进行WordCount6. 配置Job7. 结果参考 1. 数据文本 1_song1_2016-1-11 song1 singer1 man slow pc 2_song2_2016-1-11 song2 singer2 woman slow ios 3_song3_2016-1-11 song3 singer3 man…