Pytest_unificando

############################

IMPORT DAS BIBLIOTECAS

############################
import sys
import time
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StringType, FloatType, DateType, IntegerType
from datetime import date
from itaudatautils.data_utils import DataUtils
from utils.funcoes import Funcoes
import logging

Definindo log

log = logging.getLogger('–> tb_xt3_spec_fato_alocacao_hf: ')
log.setLevel(logging.DEBUG)

log.info(“==== Iniciando a execução do job ====”)

############################

PARAMETROS

############################

Lendo os parametros do Glue Job que serao utilizados

log.info(“Capturando valor dos parametros do job.”)
args = getResolvedOptions(sys.argv, [‘S3_INPUT_PATH’,
‘FILE_FORMAT’,
‘S3_OUTPUT_PATH’,
‘GLUE_OUTPUT_DATABASE’,
‘GLUE_OUTPUT_DATABASE_QUALITY’,
‘S3_INPUT_FILE’,
‘GLUE_OUTPUT_TABLE_QUALITY’,
‘S3_BUCKET_SOR’])

############################

SPARK SESSION

############################

Instanciando a spark session, utilizando o DataUtils

datautils = DataUtils.get_spark()
spark = datautils.engine.spark_session
quality_engine = datautils.get_quality_engine()
conn_catalog = datautils.get_catalog_connector()
s3 = boto3.client(“s3”)
s3_resource = boto3.resource(“s3”)
funcoes = Funcoes(s3, s3_resource)

Configurando variaveis que serão utilizadas

S3_INPUT_PATH = args[“S3_INPUT_PATH”]
S3_INPUT_NPS_FILE = args[“S3_INPUT_FILE”]
S3_FILE_FORMAT = args[“FILE_FORMAT”]
S3_OUTPUT_PATH = args[“S3_OUTPUT_PATH”]
GLUE_OUTPUT_DATABASE = args[“GLUE_OUTPUT_DATABASE”]
GLUE_OUTPUT_DATABASE_QUALITY = args[“GLUE_OUTPUT_DATABASE_QUALITY”]
GLUE_OUTPUT_TABLE_QUALITY = args[“GLUE_OUTPUT_TABLE_QUALITY”]
S3_BUCKET_SOR = args[“S3_BUCKET_SOR”]
PATH_DESTINO = “s3://” + args[
‘S3_OUTPUT_PATH’] + ‘/’ + GLUE_OUTPUT_DATABASE # "s3://itau-corp-spec-sa-east-1-64221676680

JOB_NAME = “tb_xt3_spec_fato_alocacao_hf”

db_destino_quality = “db_corp_repositoriosdedados_sgit_sor_01”
tb_destino_quality = “dataquality_metrics”
today = date.today()

def alocacao_hf():
try:

    if not funcoes.verifica_arquivo_s3(S3_INPUT_PATH, S3_INPUT_NPS_FILE):
        raise NameError(
            f"O arquivo '{S3_INPUT_NPS_FILE}' não existe no Bucket '{S3_INPUT_PATH}'. Favor providencias a cópia dos arquivos pra ele.")

    ################################################################################
    if not funcoes.verifica_diretorio_existe(S3_OUTPUT_PATH, 'tb_xt3_spec_fato_alocacao_hf'):
        log.info(f"Criando novo diretorio do processo no S3: '{PATH_DESTINO}'")
        funcoes.cria_diretorio('tb_xt3_spec_fato_alocacao_hf', S3_OUTPUT_PATH)

    ################################################################################
    arquivo_mais_atual_envio = funcoes.recupera_arquivo_mais_atual_bucket(S3_FILE_FORMAT
                                                                    , S3_INPUT_PATH
                                                                    , S3_INPUT_NPS_FILE)

    arquivo_mais_atual_envio = f"s3://{S3_INPUT_PATH}/{arquivo_mais_atual_envio}"
    print(arquivo_mais_atual_envio)
    


    log.info(f"Importando os dados dos arquivos para DataFrame spark.")

#fazer todo o de para das colunas de envio e de resposta
#procurar caron pros testes de data quality
#salvar observabilyt – usar referencia gustavo

    cgitgluealocacaohf = spark.read.option("inferSchema", True).option("header", True).option("delimiter", "|").option(
        "encoding", "latin1").csv(arquivo_mais_atual_envio)

    if cgitgluealocacaohf.rdd.isEmpty():
        raise NameError("O processo não conseguiu encontrar os dados.")

    log.info("Definindo dtype das colunas do DataFrame Spark")
    
    print(arquivo_mais_atual_envio)
    cgitgluealocacaohf.show()
    
    cgitgluealocacaohf = cgitgluealocacaohf \
        .withColumn("cod_re", cgitgluealocacaohf["cod_re"].cast(StringType())) \
        .withColumn("cod_pre", cgitgluealocacaohf["cod_pre"].cast(StringType())) \
        .withColumn("cod_comunidade", cgitgluealocacaohf["cod_comunidade"].cast(StringType())) \
        .withColumn("cod_releasetrain", cgitgluealocacaohf["cod_releasetrain"].cast(StringType())) \
        .withColumn("cod_squad", cgitgluealocacaohf["cod_squad"].cast(StringType())) \
        .withColumn("des_especialidade", cgitgluealocacaohf["des_especialidade"].cast(StringType())) \
        .withColumn("cod_fornecedor", cgitgluealocacaohf["cod_fornecedor"].cast(StringType())) \
        .withColumn("cod_are", cgitgluealocacaohf["cod_are"].cast(StringType())) \
        .withColumn("des_status_alocacao", cgitgluealocacaohf["des_status_alocacao"].cast(StringType())) \
        .withColumn("qtd_horasplanejadas", cgitgluealocacaohf["qtd_horasplanejadas"].cast(FloatType())) \
        .withColumn("dt_inicio_alocacao", cgitgluealocacaohf["dt_inicio_alocacao"].cast(DateType())) \
        .withColumn("dt_fim_alocacao", cgitgluealocacaohf["dt_fim_alocacao"].cast(DateType())) \
        .withColumn("cod_funcional", cgitgluealocacaohf["cod_funcional"].cast(StringType())) \
        .withColumn("des_nome", cgitgluealocacaohf["des_nome"].cast(StringType())) \
        .withColumn("num_cpf", cgitgluealocacaohf["num_cpf"].cast(StringType())) \
        .withColumn("dt_criacao", cgitgluealocacaohf["dt_criacao"].cast(DateType())) \
        .withColumn("dt_atualizacao", cgitgluealocacaohf["dt_atualizacao"].cast(DateType())) \
        .withColumn("dt_ref", cgitgluealocacaohf["dt_ref"].cast(DateType())) \
        .withColumn("dt_processamento", cgitgluealocacaohf["dt_processamento"].cast(DateType())) \
      
    log.info("Iniciando a validação das regras de QD...")

    # Regras de Qualidade de Dados
    log.info("Aplicando regras de qualidade de dados.")
    rulesets = {
    'rules': [
        'IsPrimaryKey "cod_re"',
        'ColumnExists "cod_funcional"',
        'ColumnExists "cod_pre"',
        'IsComplete "cod_funcional"',
        'ColumnExists "cod_pre"'
        ],
    
    'database_name': GLUE_OUTPUT_DATABASE,
    'table_name': 'tb_xt3_spec_fato_alocacao_hf',
    'partition': 'n/a'
    
    }

    # Valida regra de qualidade a partir do dataframe
    result = quality_engine.run_evaluate(
        dataframe=cgitgluealocacaohf,
        rulesets=rulesets
    )
    
    log.info("Coloca o resultado DQ DataFrame")
    df_result_dq = result.get_df()
    log.info(f"Resultado do DQ: {df_result_dq.show()}")

    # Gravando df na tabela de qualidade
    log.info(f"Grava resultado DQ na tabela de QD '{db_destino_quality + '.' + tb_destino_quality}'.")
    conn_catalog.put_df_to_table(
        dataframe=df_result_dq,
        database=db_destino_quality,
        table=tb_destino_quality,
        write_mode='append'
    )

    if result.is_rules_successful():

        log.info("Teste DQ passou com sucesso!")

        # Montar o dataframe com o resultado do data quality

    else:

        log.info("Teste DQ falhou. Veja os detalhes a seguir.")

    # Escrever DataFrame spark em uma tabela
    log.info("Gravando os dados processados pelo job na tabela do catalogo local.")
    conn_catalog.put_df_to_table(
        dataframe=cgitgluealocacaohf,
        database=GLUE_OUTPUT_DATABASE,
        table='tb_xt3_spec_fato_alocacao_hf',
        write_mode='append',
        format='parquet',
        compression='snappy',
        repartition=1
    )
    log.info("O processamento foi finalizado com sucesso!")

except Exception as e:

    raise e

funcoes.observability(alocacao_hf, “tb_xt3_spec_fato_alocacao_hf”, JOB_NAME)

import pytest
from unittest.mock import MagicMock, patch
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, DateType
from your_module import alocacao_hf # Substitua ‘your_module’ pelo nome do módulo onde a função está

Fixture para criar uma SparkSession

@pytest.fixture(scope=“module”)
def spark():
return SparkSession.builder
.appName(“Test”)
.master(“local”)
.getOrCreate()

Fixture para criar um DataFrame de exemplo

@pytest.fixture
def sample_df(spark):
schema = StructType([
StructField(“cod_re”, StringType(), True),
StructField(“cod_pre”, StringType(), True),
StructField(“cod_funcional”, StringType(), True),
StructField(“qtd_horasplanejadas”, FloatType(), True),
StructField(“dt_inicio_alocacao”, DateType(), True),
# Adicione outros campos conforme necessário
])
data = [
(“001”, “002”, “func1”, 10.0, “2023-01-01”),
# Adicione outras linhas conforme necessário
]
return spark.createDataFrame(data, schema)

Teste para a função alocacao_hf

def test_alocacao_hf(sample_df):
# Arrange
mock_funcoes = MagicMock()
mock_funcoes.verifica_arquivo_s3.return_value = True
mock_funcoes.verifica_diretorio_existe.return_value = True
mock_funcoes.recupera_arquivo_mais_atual_bucket.return_value = “mock_file.csv”

with patch('your_module.funcoes', mock_funcoes):
    with patch('your_module.spark.read.csv', return_value=sample_df):
        with patch('your_module.awsglue.transforms', MagicMock()):
            with patch('your_module.awsglue.utils.getResolvedOptions', return_value={
                'S3_INPUT_PATH': 'mock_path',
                'FILE_FORMAT': 'csv',
                'S3_OUTPUT_PATH': 'mock_output',
                'GLUE_OUTPUT_DATABASE': 'mock_db',
                'GLUE_OUTPUT_DATABASE_QUALITY': 'mock_quality_db',
                'S3_INPUT_FILE': 'mock_file',
                'GLUE_OUTPUT_TABLE_QUALITY': 'mock_table_quality',
                'S3_BUCKET_SOR': 'mock_bucket'
            }):
                # Act
                alocacao_hf()

                # Assert
                mock_funcoes.verifica_arquivo_s3.assert_called_once()
                mock_funcoes.verifica_diretorio_existe.assert_called_once()
                mock_funcoes.recupera_arquivo_mais_atual_bucket.assert_called_once()
                # Adicione outras asserções conforme necessário

#https#://www.inspiredpython.com/article/five-advanced-#pytest-fixture-patterns

como façar para pegar o main.py : C:.
├───.github
│ └───workflows
├───.idea
│ └───inspectionProfiles
├───.pytest_cache
│ └───v
│ └───cache
├───.stk
├───.vscode
├───app
│ ├───.pytest_cache
│ │ └───v
│ │ └───cache
│ ├───libs
│ ├───src
│ │ └───pycache
│ └───utils
│ └───sub_modules
├───infra
│ ├───iamsr
│ │ ├───policy
│ │ └───trust
│ ├───inventories
│ │ ├───dev
│ │ ├───hom
│ │ └───prod
│ └───templates
│ ├───glue-crawler-and-glue-job
│ ├───glue-workflow-with-job
│ └───only-gluejob-with-connection
└───tests
└───pycache