PySpark - Entenda a Engine do Spark para Rodar Python

Entenda como funciona a engine do Apache Spark para rodar Python e como obter o máximo de performance dos seu pipelines de processamentos de dados no PySpark

 


Introdução

Muitos cientistas de dados e engenheiro de dados que utilizam o Apache Spark preferem se utilizar do PySpark para criar seus pipelines de dados, é uma interface bastante conveniente e sem dúvida um dos motivos da popularidade do Apache Spark, muitas vezes parece “magia“ como spark escrito em Scala que roda na JVM interage com Python transparentemente.

No entanto, passado algum tempo utilizando a plataforma você entende os gargalos e começa perceber que a interação não é tão transparente assim, apesar de funcionar muito bem.

Existe muito pouco na documentação principal do Apache Spark sobre o assunto de como o PySpark é implementado e quais são suas estratégias, infelizmente este material é escasso e eu acredito ser importante para todos que se utilizam do PySpark, pois entender o funcionamento do mesmo me ajudou bastante ao debugar o código e tomar melhores decisões ao criar pipeline de dados com Apache Spark.

 

Funções no PySpark


Existem diferentes tipos de funções no PySpark e a forma como interagimos com elas podem interferir na performance do nosso programa Spark, vamos listar aqui quais são os tipos de funções que podemos utilizar no PySpark.

 

Funções Nativas


Estas são função que são implementadas no core do Spark e estão disponíveis para serem usadas via API. Um exemplo de função nativa é quando chamamos funções SQL disponibilizadas na API do PySpark Doc. Para Funções SQL.
df.agg(countDistinct(df.age, df.name).alias('c')).collect()

No exemplo acima a função de agregação “countDistinct“ é uma função nativa do Spark.
Contudo nem sempre podemos encontrar tudo que queremos fazer através das funções nativas é aí então que entra o conceito de funções definidas pelo usuário ou UDFs

 

UDFs Não Vetorizadas

 

Este tipo de função permite o usuário implementar funções em Python para processar os dados do dataframe

@udf
def plus_one(v):
    return v + 1


É um conceito simples onde basicamente o Spark pega a função do usuário e aplica linha por linha no seu dataframe, contudo devido a arquitetura do PySpark como veremos em detalhes mais à frente, este tipo de implementação não é tão performática, foi então que com a intenção de resolver estes problemas foram introduzidas as UDFs vetorizadas

 

UDFs Vetorizadas


Também conhecidas como Pandas UDFs devido ao uso da biblioteca Pandas para implementação das mesmas, para executar este tipo de função o Spark irá dividir as as colunas do dataframes em batches e irá chamar a função passando este subgrupo dos dados em forma de uma “Panda Series”

@pandas_udf("double", PandasUDFType.SCALAR)
def pandas_plus_one(v):
    return v + 1

Existem outros tipos de UDFs vetorizadas se quiser saber um pouco mais sugiro dar uma olhada na documentação do Spark
Tendo em vista estes conceitos vamos agora nos aprofundar na arquitetura do PySpark

 

PySpark

 

O Pyspark é construído em cima da API Java, ele é apenas um fina camada de software Python que repassa as chamadas de funções para o core Java, é bastante interessante e acredito que este seja um dos motivos de sucesso da plataforma que o código do PySpark seja tão simplório, se der uma conferida no repositório oficial do spark você pode ver por exemplo que o código de um DataFrame em Python que é a principal abstração do Apache Spark tem apenas ~2000 linhas e diria que pelo menos 50% são comentários para geração de documentação.

Investigando mais profundamente encontrei em uma antiga documentação do Spark de 2016 um excelente diagrama do funcionamento interno e sua arquitetura e apesar de que alguns anos atrás percebi que esta página foi marcada como obsoleta, pode ser verificado no código do PySpark que esta arquitetura não foi drasticamente alterada desde de então, e que este diagrama nos dá uma excelente visão do funcionamento interno do Spark e sua interação com o Python

 

Fonte: Wiki do Apache Spark

 

Na figura acima notamos que o Spark Context que é um processo Python que na verdade é apenas usado para comunicação local, este processo então inicia uma JVM através da biblioteca Py4J que contém um Spark Context java que na verdade é o elemento que controla o cluster e inicia os spark executors, estes então iniciam subprocessos Python para que sejam processados os dados, os executors JVM e Python se comunicam através de um pipe para troca dos dados que serão processados.

Toda comunicação entre nós do clusters, entradas e saídas de arquivos são processadas na JVM, também são processados na JVM as demais operações necessárias para operação do cluster como agendamento, “broadcast”, tolerância a falhas, etc…

Os processos Python dos nós apenas existem para rodar as funções definidas pelo usuário em Python. Os dados são serializados utilizando a biblioteca padrão do Python cPickle e o código Python que é definido pelo usuário que também precisa ser serializado utiliza-se da biblioteca Cloud Pickle para ser enviado aos nós do cluster.

 

Entendendo Performance e Limitações

 

Vendo o diagrama anterior fica mais simples de entender o maior gargalo do PySpark que é a execução mais lenta das função definidas pelo usuário, que se dá em sua maioria ao processo de serialização e deserialização dos dados para que seja possível a comunicação entre a JVM e o processo Python iniciado nos nós, neste ponto é que o Apache Arrow e os Pandas UDFs ganharam espaço otimizando esta comunicação, embora posso falar sobre isso com mais profundidade em um outro artigo…

Outra interessante pergunta que podemos fazer, é o porque os códigos que mesmo rodando em Python mas que não utilizam Python UDFs não tem queda de performance? O que ocorre é que como código não precisa executar funções Python para processar o conjunto de dados, a criação dos subprocessos Python e a comunicação com os mesmos nos nós do cluster não é necessária, o que permite que todo processamento seja executado pela JVM assim eliminando o gargalo de comunicação com Python.

 

Considerações de Performance

 

Tendo em vista esta arquitetura do PySpark como alguém que deseja obter o máximo de performance do Apache Spark deveria proceder?

  • Inicialmente se a operação pode ser expressada com chamadas de funções nativas esta seria melhor opção
  • Caso seja necessário implementar funções customizadas então as funções definidas pelo usuários vetorizadas ou pandas UDFs é a opção mais performática de acordo um benchmark do blog da databricks estas podem ser de 3x até 100x mais rápidas que as função definidas pelo usuário que não são vetorizadas em Python
  • Por último funções definidas pelo usuário não vetorizadas devem ser evitadas e utilizadas apenas como último recurso

 

Uma consideração final de performance é se você realmente “hardcore” e gostaria de extrair o máximo da plataforma, existe a possibilidade de se implementar funções em Scala ou Java e invoca-las via PySpark. Neste caso a performance desta função seria muito parecida com as funções nativas do Spark e são bastante performáticas, contudo se levarmos em considerações não apenas a performance e considerarmos por exemplo, o custos de gerenciamento de uma base de código de com diferentes linguagens de programação muitas vezes esta pode não ser a melhor opção.

 

Conclusão

 

O PySpark é um importante peça e grande parte do que fez o Apache Spark uma das ferramentas mais bem sucedidas do mercado de Big Data.

Contudo para se obter a melhor performance principalmente quando se utilizando de funções customizadas, é necessário ser bastante cauteloso e entender as diferenças de implementação de cada uma das opções disponíveis, além disso é importante pesar outros fatores não relativos a performance e que podem ser importantes para o seu projeto

Se você leu até aqui muito obrigado! Espero que este artigo tenha sido relevante

Te vejo no próximo artigo

Referências:

  1. Spark Wiki
  2. Python API For Spark
  3. Spark Github

Se Inscreva Na Nossa Newsletter Tenha Acesso Aos Melhores Artigos