top of page
  • Foto do escritorDiogo Vidal

Insert CSV no BigQuery via bucket GCS + Python

Atualizado: 7 de dez. de 2020

Abaixo desenvolvi um processo automatizado para inserir dados em uma tabela do BigQuery a partir de um arquivo .csv. Especificamente usando uma cloud function que será executada automaticamente sempre que um novo arquivo csv for carregado no bucket do Google Cloud Storage.


Para fazer essa POC eu gerei alguns dados fictícios.


Gerei um arquivo CSV com 1000 linhas de dados fictícios pelo site https://www.mockaroo.com/ tente usar o esquema padrão para que se pareça com:


$ head -5 csv_dados_mock.csv
id,first_name,last_name,email,gender,ip_address
1,Austine,Cormode,acormode0@paypal.com,Female,35.80.41.166
2,Gipsy,Bloor,gbloor1@google.ca,Female,217.37.240.238
3,Nichol,Ellissen,nellissen2@purevolume.com,Female,201.214.92.177
4,Amargo,Griffitts,agriffitts3@goo.gl,Female,37.175.107.194
5,Zack,Schneidau,zschneidau4@youtube.com,Male,126.44.217.80

Crie um bucket no Google Cloud Storage

$ gsutil mb gs://bucket-csv-test

Se necessário instale:

$ pip3 install google-cloud-bigquery --upgrade

Crie um dataset no BigQuery

$ bq mk --dataset diogo-dev-projeto:dataset_csv_test

Crie um tabela dentro do dataset correspondente ao esquema definido.

$ bq mk -t dataset_csv_test.tb_csv_test \
 
id:INTEGER,
first_name:STRING,
last_name:STRING,
email:STRING,
gender:STRING,
ip_address:STRING

  • a tabela criada no dataset do Bigquery deve estar limpa:

  • agora chegou a hora de escrever o código python:

import os
from google.cloud import bigquery

def csv_loader(data, context):
        client = bigquery.Client()
        dataset_id = os.environ['DATASET']
        dataset_ref = client.dataset(dataset_id)
        job_config = bigquery.LoadJobConfig()
        job_config.schema = [
                bigquery.SchemaField('id', 'INTEGER'),
                bigquery.SchemaField('first_name', 'STRING'),
                bigquery.SchemaField('last_name', 'STRING'),
                bigquery.SchemaField('email', 'STRING'),
                bigquery.SchemaField('gender', 'STRING'),
                bigquery.SchemaField('ip_address', 'STRING')
                ]
        job_config.skip_leading_rows = 1
        job_config.source_format = bigquery.SourceFormat.CSV

        # para carregar o CSV no Storage 
        uri = 'gs://' + os.environ['BUCKET'] + '/' + data['name']

        # Vamos carregar
        load_job = client.load_table_from_uri(
                uri,
                dataset_ref.table(os.environ['TABLE']),
                job_config=job_config)

        print('Starting job {}'.format(load_job.job_id))
        print('Function=cf_csv_test, Version=' + os.environ['VERSION'])
        print('File: {}'.format(data['name']))
        
        # Aguarde a conclusão de carga na tabela.
        load_job.result()  
        print('Job finished.')

        destination_table = client.get_table(dataset_ref.table(os.environ['TABLE']))
        print('Loaded {} rows.'.format(destination_table.num_rows))

Agora vamos criar um arquivo env.yaml que armazenará a configuração das variáveis ​​de ambiente, será armazenado o nome bucket | dataset | tabelas.

BUCKET: bucket-csv-test
DATASET: dataset_csv_test
TABLE: tb_csv_test
VERSION: v14

Crie um arquivo de requisitos.txt para as importações necessárias.

google-cloud
google-cloud-bigquery

Crie um arquivo .gcloudignore para que seus arquivos yaml ou CSV não entrem no deploy no GCP

*csv
*yaml

Até aqui a sua folder deve estar assim:

env.yaml  main.py  requisitos.txt  csv_dados_mock.csv

Agora estamos prontos para fazer o deploy da Cloud Function, vamos adicionar uma trigger no bucket GCS que será acionado sempre que um novo arquivo for adicionado ao bucket. Vou criar a cloud function e chama la de cf_csv_test.

$ gcloud beta functions deploy cf_csv_test \
 
--runtime=python37 \
--trigger-resource=gs://bucket-csv-test \
--trigger-event=google.storage.object.finalize \
--entry-point=cf_csv_test \
--env-vars-file=env.yaml

Agora que fizemos o deploy da function, vamos copiar o arquivo .csv para o bucket.

$ gsutil cp csv_dados_mock.csv gs://bucket-csv-test/
 
Copying file://csv_dados_mock.csv [Content-Type=text/csv]...
- [1 files][ 72.4 KiB/ 72.4 KiB] 
Operation completed over 1 objects/72.4 KiB.

Agora que copiamos o arquivo csv para o bucket, a function deve ser acionada automaticamente, vejo o log.

$ gcloud functions logs read
 
[ ... snipped for brevity ... ]
 
cf_csv_test  274732139359754  2020-10-07 20:12:27.852  Function execution started
cf_csv_test  274732139359754  2020-10-07 20:12:28.492  Starting job 9ca2f39c-539f-454d-aa8e-3299bc9f7287
cf_csv_test  274732139359754  2020-10-07 20:12:28.492  Function=cf_csv_test, Version=v14
cf_csv_test  274732139359754  2020-10-07 20:12:28.492  File: csv_dados_mock.csv
cf_csv_test  274732139359754  2020-10-07 20:12:31.022  Job finished.
cf_csv_test  274732139359754  2020-10-07 20:12:31.136  Loaded 1000 rows.
cf_csv_test  274732139359754  2020-10-07 20:12:31.139  Function execution took 3288 ms, finished with status: 'ok'

Boa, function executada com sucesso!


Vamos dar uma olhada na tabela do BigQuery e checar a contagem de linhas.

$ bq show dataset_csv_test.tb_csv_test
 
Table diogo-dev-projeto:dataset_csv_test.tb_csv_test

Last modified           Schema           Total Rows   Total Bytes   Expiration   Time Partitioning   Labels
----------------- ----------------------- ------------ ------------- ------------ ------------------- --------
07 Dec 20:19:29   |- id: integer          1000         70950
 
|- first_name: string
|- last_name: string
|- email: string
|- gender: string
|- ip_address: string

Ótimo, 1000 linhas inseridas.


Conclusão final: cloud functions funcionam muito bem rsrs


Espero ter ajudado!!

833 visualizações0 comentário

Posts recentes

Ver tudo
bottom of page