Estudos BOVESPA
Carregando, aguarde alguns segundos.

5 - SGBD

Carregar muitos dados a partir dos arquivos zipados da Bovespa disponibilizados pela B3 é oneroso, pelo fato de acessar todos os 38 campos para cada linha de cotação, além de acessar uma mídia relativa lenta que são os discos rígidos, mesmo com tecnologia SSD ou mais avançada.

Importamos os pacotes que necessitaremos.

import pandas as pd
import os
from pathlib import Path
from zipfile import ZipFile
import sqlite3
from datetime import datetime
from pytz import timezone

Iniciamos a zona de horário e guardamos o horário atual.

tz = timezone('America/Fortaleza')
data_e_hora_atuais = datetime.now()
data_e_hora_atuais_tz = data_e_hora_atuais.astimezone(tz)

5.1 - Função cieda_b3_conectar_bd()

A função cieda_b3_conectar_bd retorna o objeto da classe sqlite3.Connection com a conexão ao banco de dados sqlite no diretório do argumento pasta com o nome de arquivo do argumento nomebd.

def cieda_b3_conectar_bd(pasta,nomebd):
    try:
        dt = detect_types=sqlite3.PARSE_DECLTYPES
        os.makedirs(pasta, exist_ok=True)
        pathnomebd = os.path.join(pasta,nomebd)
        return sqlite3.connect(pathnomebd,dt)
    except:
        return None

5.2 - Função cieda_b3_cursor_bd()

A função cieda_b3_cursor_bd retorna o objeto sqlite3.Cursor com o cursor retornado pela conexão passada como argumento.

def cieda_b3_cursor_bd(conn):
    try:
        return conn.cursor()
    except:
        return None

5.3 - Função cieda_b3_preparar_bd()

A função cieda_b3_preparar_bd ....

def cieda_b3_preparar_bd():
    conn = cieda_b3_conectar_bd(
        "./cotacoes/database",
        "b3_01.db")
    if conn == None: return -1, None, None
    cursor = cieda_b3_cursor_bd(conn)
    if cursor == None: return -2, None, None
    if not cieda_b3_existe_tabela_cotacoes(conn,cursor):
        cieda_b3_criar_tabela_cotacoes(cursor)
    return 1, conn, cursor

5.4 - Função cieda_b3_existe_tabela_cotacoes()

A função cieda_b3_existe_tabela_cotacoes ....

def cieda_b3_existe_tabela_cotacoes(conn,cursor):
    cursor.execute("SELECT count(name) FROM sqlite_master WHERE type='table' AND name='cotacoes'")
    num = cursor.fetchone()[0]
    #conn.commit()
    return 1 if num == 1 else 0

5.5 - Função cieda_b3_criar_tabela_cotacoes()

A função cieda_b3_criar_tabela_cotacoes cria a tabela cotacoes no banco de dados.

def cieda_b3_criar_tabela_cotacoes(cursor):
    try:
        cursor.execute("""
            create table cotacoes (
                id integer not null primary key autoincrement,
                linha  text,
                datpre date not null,
                codbid text,
                codneg text not null,
                tpmerc integer,
                nomres text,
                especi text,
                prazot integer,
                modref text,
                preabe real,
                premax real,
                premin real,
                premed real,
                preult real,
                preofc real,
                preofv real,
                totneg integer,
                quatot integer,
                voltot real,
                preexe real,
                indopc integer,
                datven date,
                fatcot integer,
                ptoexe real,
                codisi text,
                dismes integer)
        """)
    except sqlite3.OperationalError as er:
        return 0
    cieda_b3_criar_indices_tabcot(cursor)
    return 1

5.6 - Função cieda_b3_criar_indice()

A função cieda_b3_criar_indice usa o cursor de cursor para criar na tabela de nome_tabela o índice com o nome em nome_tabela e os campos em campos_indice .

def cieda_b3_criar_indice(cursor,nome_tabela,nome_indice,campos_indice):
        cursor.execute(
            "create index idx_{} on {} ({})".format(
                nome_indice,
                nome_tabela,
                campos_indice))

5.7 - Função cieda_b3_criar_indices_tabcot()

A função cieda_b3_criar_indices_tabcot cria os diversos índices para a tabela cotações, usando o cursor passado como parâmetro.

Apenas o índice idx_data_codneg_prazot é composto, com os campos data, codneg e prazot.

São criado também os índices de campos.

def cieda_b3_criar_indices_tabcot(cursor):
    cieda_b3_criar_indice(cursor,"cotacoes","linha","linha")
    #cieda_b3_criar_indice(cursor,"cotacoes","data_codneg_prazot","datpre,codneg,prazot")
    cieda_b3_criar_indice(cursor,"cotacoes","datpre","datpre")
    cieda_b3_criar_indice(cursor,"cotacoes","codbid","codbid")
    cieda_b3_criar_indice(cursor,"cotacoes","codneg","codneg")
    cieda_b3_criar_indice(cursor,"cotacoes","tpmerc","tpmerc")
    cieda_b3_criar_indice(cursor,"cotacoes","nomres","nomres")
    cieda_b3_criar_indice(cursor,"cotacoes","especi","especi")
    cieda_b3_criar_indice(cursor,"cotacoes","prazot","prazot")
    cieda_b3_criar_indice(cursor,"cotacoes","modref","modref")
    cieda_b3_criar_indice(cursor,"cotacoes","preabe","preabe")
    cieda_b3_criar_indice(cursor,"cotacoes","premax","premax")
    cieda_b3_criar_indice(cursor,"cotacoes","premin","premin")
    cieda_b3_criar_indice(cursor,"cotacoes","premed","premed")
    cieda_b3_criar_indice(cursor,"cotacoes","preult","preult")
    cieda_b3_criar_indice(cursor,"cotacoes","preofc","preofc")
    cieda_b3_criar_indice(cursor,"cotacoes","preofv","preofv")
    cieda_b3_criar_indice(cursor,"cotacoes","totneg","totneg")
    cieda_b3_criar_indice(cursor,"cotacoes","quatot","quatot")
    cieda_b3_criar_indice(cursor,"cotacoes","voltot","voltot")
    cieda_b3_criar_indice(cursor,"cotacoes","preexe","preexe")
    cieda_b3_criar_indice(cursor,"cotacoes","indopc","indopc")
    cieda_b3_criar_indice(cursor,"cotacoes","datven","datven")
    cieda_b3_criar_indice(cursor,"cotacoes","fatcot","fatcot")
    cieda_b3_criar_indice(cursor,"cotacoes","ptoexe","ptoexe")
    cieda_b3_criar_indice(cursor,"cotacoes","codisi","codisi")
    cieda_b3_criar_indice(cursor,"cotacoes","dismes","dismes")

5.8 - Função cieda_b3_existe_cotacao()

A função cieda_b3_existe_cotacao retorna verdadeiro para uma cotação já existente.

#str_select = """
#    select count(*)
#    from cotacoes
#    indexed by idx_dt_cn_pt
#    where datpre = ? and codneg = ? and prazot = ?"""

str_select = """
    select count(*)
    from cotacoes
    indexed by idx_linha
    where linha = '{}'"""

#def cieda_b3_existe_cotacao(conn,cursor,datpre,codneg,prazot):
#    global str_select
#    cursor.execute(str_select,(datpre,codneg,prazot))
#    num = cursor.fetchone()[0]
#    return 1 if num >= 1 else 0

def cieda_b3_existe_cotacao(conn,cursor,linha,linha_chave):
    global str_select
    cursor.execute(str_select.format(linha_chave))
    num = cursor.fetchone()[0]
    return 1 if num >= 1 else 0

5.9 - Variável global str_insert_tabcot()

A variável ___ é global para ser declarada dentro da função inserir_cotacao_tabcot, evitando ser redeclarada dentro da função sempre que for evocada.

str_insert_tabcot = """
    insert into cotacoes (
        linha,
        datpre, codbid, codneg, tpmerc, nomres,
        especi, prazot, modref, preabe, premax,
        premin, premed, preult, preofc, preofv,
        totneg, quatot, voltot, preexe, indopc,
        datven, fatcot, ptoexe, codisi, dismes)
    values (
        ?,
        ?, ?, ?, ?, ?,
        ?, ?, ?, ?, ?,
        ?, ?, ?, ?, ?,
        ?, ?, ?, ?, ?,
        ?, ?, ?, ?, ?)
"""

5.10 - Função cieda_b3_decompor_valores_linha()

A função cieda_b3_decompor_valores_linha retorna a tupla com os valores decompostos a partir da linha vinda do arquivo zipado da B3.

def cieda_b3_decompor_valores_linha(linha,linha_chave):
    #
    controle = {"ini": 2}
    #
    def s(tam,linha=linha,controle=controle):
        ini = controle["ini"]
        fim = ini + tam
        controle["ini"] = fim
        return linha[ini:fim].strip()
    #
    def i(tam,linha=linha,controle=controle):
        aux = s(tam).lstrip("0")
        try:
            return int(aux)
        except:
            return 0
    #
    def f(tam,linha=linha,controle=controle):
        aux = s(tam).lstrip("0")
        try:
            return float(aux)/100
        except:
            return 0
    #
    def d(linha=linha,controle=controle):
        return s(4) + "-" + s(2)  + "-" + s(2)
    #
    return (
        linha_chave,
        d(),   s(2),  s(12), i(3),  s(12),
        s(10), i(3),  s(4),  s(13), f(13),
        f(13), f(13), f(13), f(13), f(13),
        i(5),  i(18), f(18), f(13), i(1),
        d(),   i(7),  f(13), s(12), i(3)
    )

5.11 - Função inserir_cotacao_tabcot()

A função inserir_cotacao_tabcot insere uma nova linha de cotações.

def inserir_cotacao_tabcot(conn,cursor,linha,linha_chave):
    global str_insert_tabcot
    tupla_vals = cieda_b3_decompor_valores_linha(linha,linha_chave)
    datpre = tupla_vals[0]
    codneg = tupla_vals[2]
    prazot = tupla_vals[6]
    if cieda_b3_existe_cotacao(conn,cursor,linha) # datpre,codneg,prazot): return 0
    try:
        cursor.execute(str_insert_tabcot,tupla_vals)
        #conn.commit()
    except:
        return -1
    return 1

5.12 - Função cieda_b3_arqcot_to_tabcot()

A função cieda_b3_arqcot_to_tabcot transfere o conteúdo da linha do aqrquivo zipado para o banco de dados.

A função retorna um dicionário com as seguintes chaves:

{

"data": d,

"esperados": e,

"processados": p,

"incluidos": i

}

onde:

CHAVE VALOR DESCRIÇÃO
data d data do arquivo no formato "aaaa-mm-dd'
esperados e número de registros esperados
processados p número de registros processados
incluidos i número de registros incluidos

def cieda_b3_arqcot_to_tabcot(conn,cursor,arqcot):
    linha = arqcot.readline() # primeira linha é lida do arquivo
    res = {"data": None, "esperados": -1, "incluidos": -1, "processados": -1}
    processados = incluidos = esperados = 0
    while linha: # enquanto houver linhas
        if isinstance(linha,bytes): # se a linha vier do arquivo-zip estará como uma sequência de bytes
            linha = linha.decode('iso-8859-1').strip() # a sequência de bytes é convertida para texto
        tipo = linha[:2]
        if tipo == '00': # registro de metadados
            res["data"] = datetime.strptime(linha[23:31],'%Y%m%d')
        elif tipo == "01":
            linha_chave = "".join([c if c.isalpha() or c.isnumeric() else "_" for c in linha])
            incluidos += inserir_cotacao_tabcot(conn,cursor,linha,linha_chave)
            processados += 1
        elif tipo == '99': # registro de metadados
            res["esperados"] = int(linha[31:42].lstrip("0"))
        linha = arqcot.readline()
    res["processados"] = processados
    res["incluidos"] = incluidos
    return res

5.13 - Função cieda_b3_processar_cotacoes()

A função cieda_b3_processar_cotacoes ....

def cieda_b3_processar_cotacoes(pasta, ano, mes=None, dia=None):
    res, conn, cursor = cieda_b3_preparar_bd()
    nome_arq_zip = cieda_b3_obter_nome_arq_zip_cotacoes(ano,mes,dia)
    pasta_nome_arq_zip = os.path.join(pasta,nome_arq_zip)
    if not os.path.isfile(pasta_nome_arq_zip): return -1
    arq_zip = open(pasta_nome_arq_zip,"rb")
    meu_zip = ZipFile(BytesIO(arq_zip.read()))
    for nome_arq_txt in meu_zip.namelist():
        res = cieda_b3_arqcot_to_tabcot(conn,cursor,meu_zip.open(nome_arq_txt,"r"))
        print(res)
        break
    arq_zip.close()
    conn.commit()
    cursor.close()
    conn.close()
cieda_b3_processar_cotacoes("./cotacoes",1986)
Arduino
Coautor
Betobyte
Autor
Autores
||| Áreas ||| Estatística ||| Python ||| Projetos ||| Dicas & Truques ||| Quantum ||| Estudos BOVESPA || Estudos BOVESPA || Aulas | Introdução (Apresentação do contexto) | ATB-1 (gráficos de linha, bollinger, histogramas) | ATB-2 (3D e correlações) | ATI-1 (Análise de padrões e consulta direta na B3) | ATI-2 (Banco de dados SQLite de cotações) |