Apache Spark 3.0 e o Adaptive Query Execution

Saiba mais sobre as novas funcionalidades do Apache Spark 3.0 e as principais mudanças como o novo Adaptive Query Execution


 

Apache Spark Versão 3.0

Comunidade de big data espera  ansiosamente o lançamento da nova release do apache spark versão 3.0 desde novembro quando foi disponibilizado sua primeira versão beta no repositório do apache spark, muito se fala desta release que promete trazer mudanças significativas para plataforma mais famosa de processamento de dados em larga escala, desde o lançamento da versão 2.0 a comunidade do apache spark não via mudanças tão significativas e importantes para evolução da plataforma, neste artigo vou falar do adaptive query execution engine (motor de execução de queries adaptativo) que é uma das maiores novidades do spark na versão 3.0.

 

Adaptive Query Execution

A ideia básica do adaptive query execution é simples otimizar a estratégia de execução da query a medida que se obtêm mais informações dos seus dados. um desafio importante que a engine do apache spark encontra ao tentar escolher a melhor estratégia para uma query é de se obter informações precisas dos conjuntos de dados envolvidos, como o spark trabalha principalmente com arquivos brutos (raw) existe então uma dificuldade em se obter métricas mais precisas das tabelas que irão estar envolvidas nas queries sem que seja feita a prévia leitura do arquivo, anteriormente a principal métrica utilizada para construção da estratégia de query (query plan) era o tamanho do arquivo em disco, contudo, esta abordagem não é ótima para todos os casos e ocasionalmente pode gerar estratégias de queries não são tão eficientes. 

Outro ponto onde a estratégia query anterior falhava é que que os filtros e resultados intermediários destes não podiam ser considerados para criação do query plan, um exemplo rápido seria o join de dois dataframes com 1 milhão de registros que não cabem na memória de uma máquina suponhamos que nesta mesma query antes do JOIN existe um WHERE que filtre uma das tabelas reduzindo seu tamanho para 1.000 registros o que nos permitiria fazer um broadcast desta tabela e executar o JOIN de maneira muito mais eficiente contudo a engine do spark neste ponto acabaria selecionando a estratégia Sort-Merge Join pois é mais segura quando não se consegue estimar se uma das tabelas pode ser carregada na memória de cada executor, (caso não esteja familiarizado com as estratégias de JOIN do spark me informe nos comentários e farei um artigo mais detalhado sobre o assunto).

 

Alterando Estratégias de JOIN Dinamicamente

Com o adaptive query execution o apache spark então altera o plano de execução de query a medida que mais dados estatísticos são obtidos do nosso conjunto de dados, assim ele consegue eficientemente replanejar a sua estratégia obtendo melhores resultados. Abaixo temos um exemplo onde a databricks mostra que um plano de execução que inicialmente usaria o sort merge join como estratégia de execução, contudo dado as mudanças geradas no conjunto de dados decidiu-se então que usaria o Broadcast Hash Join ao perceber que uma das tabelas foi filtrada suficientemente para ser “broadcasted” transmitida para os executores completamente 

 

 

 

Coalescing Partições Dinamicamente

Um dos principais desafios de performance tuning no apache spark era de se chegar a um número eficiente para o parâmetro: “spark.sql.shuffle.partitions” que tinha como padrão o valor “200”  este parâmetro determina o valor de partições que o seu conjuntos de dados será dividido depois de uma operação shuffle, um valor não ótimo neste parâmetro pode causar diversos problemas de performance como:

  • Muito Pequeno: Pressão no Garbage Collector  e Uso de disco para salvar partições (disk spilling)
  • Muito Grande: I/O Ineficiente e pressão no scheduler para gerenciar um grande número de pequenas partições

Ao gerenciar workloads no apache spark achar um valor ótimo para esse número sempre foi um obstáculo muito grande e normalmente iria envolver algumas execuções de uma query até se chegar um um valor aceitável, outro problema é que mesmo que sua query não mude e continue sendo a mesma mas dados foram atualizados existe um risco de que este valor tenha que ser atualizado, outro aspecto que pode influenciar em uma possível alteração de performance é se os recursos que seus executores têm disponíveis memória/CPU  e a quantidade de executores mudarem isso possivelmente pode gerar uma necessidade de reajustar o valor do “spark.sql.shuffle.partitions”.

Como o novo dinâmico "coalesce shuffle partitions" a databricks recomenda que você escolha um valor mais alto para este parâmetro capaz de acomodar o maior conjunto de dados que é gerado durante a execução da sua e query, e o apache spark irá cuidar internamente de reduzir o número de partições a medida que for necessário como visto na imagem abaixo:

 

 

Este novo atributo do apache spark sem dúvida deixa a vida do engenheiro de dados que lida com otimização deste tipo de pipeline muito mais simples e prática, além de ser um mecanismo muito mais eficiente e dinâmico do que o pragmático ajuste manual deste "valor mágico”.

 

Otimizando Dinamicamente Skew Joins

Um desafio de performance do apache spark e de diversas outras plataformas de big data baseadas em mapreduce são dados desbalanceados (data skew) este tipo de problema ocorre quando em uma query join que normalmente se utiliza das chaves de join para otimizar a operação no seu conjunto de dados traz um número muito maior de registros que possuem uma chave quando comparado a outros, quando um join é executado normalmente a estratégia de execução agrupa registros com a mesma chave na mesma partição e assim é divido o trabalho dos executores, desta forma alguns executores tem uma quantidade de trabalho muito maior que os outros e vão executar a maior parte do trabalho, o que resulta em uma query ineficiente, se você trabalha com spark a algum tempo já deve ter notado que muitas vezes em uma query demorada você pode ter uma grande quantidade de executores que finalizaram e 1 ou 2 que demoram muito mais que os outros como no exemplo abaixo:

 

 

Já existiam na comunidade spark algumas técnicas para se resolver este problemas contudo o spark ainda ficava devendo uma solução padrão.

O novo adaptive query execution do apache spark 3.0 será capaz de otimizar as queries em skew data joins automaticamente, com a divisão automática das partições que possuem um tamanho muito grande desproporcional as outros partições.

 

 

A imagem acima mostra um exemplo onde a partição A0 por representar uma quantidade de dados muito maior que as demais foi subdivididas em 3 subpartições para que seja processada mais eficientemente o que resulta em uma execução final mais rápida, esta era sem com certeza uma característica bastante esperada na engine do spark tendo em vista que diversos membros da comunidade relatam este problema frequentemente.

 

Limitações

Existem algumas limitações com relação a nova engine de execução de query do spark, a primeira é que esta engine ainda não vem habilitada por padrão e deve ser habilitada mudando o parâmetro “spark.sql.adaptive.enabled” para “True”, além disso a engine adaptive query execution não irá funcionar em queries de streaming  de dados,  o que é um dos principais pontos negativos tendo em vista o crescimento deste tipo de processamento de dados nos data lakes.

Finalmente é necessário que seu plano de execução a ser otimizado tenha pelo menos um shuffle de dados o motivo disso é segundo Xiao Li gerente de engenharia na databricks a engine de execução de query adaptive utiliza-se dos dados coletados durante o shuffle para planejamentos dos próximos estágios de execução e naturalmente se a sua query não requer esta operação a engine ņão conseguiria otimizá-la.

 

Conclusão

O apache spark 3.0 traz uma maneira extremamente mais sofisticada de dinamicamente gerar plano de execuções eficientes, com a diminuição da dependência de estatísticas estáticas, o apache spark entra um outro nível de geração de planos de execução de queries. Balanceando os custos de coletas de estatísticas em grandes base de dados e a geração de um melhor plano de execução de queries. 

A comunidade espera ansiosamente pelo lançamento da versão 3.0 devido a to seu potencial, além do adaptive query execution a nova versão vem carregada de novidades que sem dúvida irão facilitar a vida de seus usuários e são bastante esperadas.

Referências:

  1. https://www.slideshare.net/databricks/whats-new-in-the-upcoming-apache-spark-30
  2. https://databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html
  3. https://www.youtube.com/watch?v=OLJKIogf2nU

Se Inscreva Na Nossa Newsletter Tenha Acesso Aos Melhores Artigos