Carregar muitos dados a partir dos arquivos zipados da Bovespa disponibilizados pela B3 é oneroso, pelo fato de acessar todos os 38 campos para cada linha de cotação, além de acessar uma mídia relativa lenta que são os discos rígidos, mesmo com tecnologia SSD ou mais avançada.
Importamos os pacotes que necessitaremos.
import pandas as pd
import os
from pathlib import Path
from zipfile import ZipFile
import sqlite3
from datetime import datetime
from pytz import timezone
Iniciamos a zona de horário e guardamos o horário atual.
tz = timezone('America/Fortaleza')
data_e_hora_atuais = datetime.now()
data_e_hora_atuais_tz = data_e_hora_atuais.astimezone(tz)
A função cieda_b3_conectar_bd retorna o objeto da classe sqlite3.Connection com a conexão ao banco de dados sqlite no diretório do argumento pasta com o nome de arquivo do argumento nomebd.
def cieda_b3_conectar_bd(pasta,nomebd):
try:
dt = detect_types=sqlite3.PARSE_DECLTYPES
os.makedirs(pasta, exist_ok=True)
pathnomebd = os.path.join(pasta,nomebd)
return sqlite3.connect(pathnomebd,dt)
except:
return None
A função cieda_b3_cursor_bd retorna o objeto sqlite3.Cursor com o cursor retornado pela conexão passada como argumento.
def cieda_b3_cursor_bd(conn):
try:
return conn.cursor()
except:
return None
A função cieda_b3_preparar_bd ....
def cieda_b3_preparar_bd():
conn = cieda_b3_conectar_bd(
"./cotacoes/database",
"b3_01.db")
if conn == None: return -1, None, None
cursor = cieda_b3_cursor_bd(conn)
if cursor == None: return -2, None, None
if not cieda_b3_existe_tabela_cotacoes(conn,cursor):
cieda_b3_criar_tabela_cotacoes(cursor)
return 1, conn, cursor
A função cieda_b3_existe_tabela_cotacoes ....
def cieda_b3_existe_tabela_cotacoes(conn,cursor):
cursor.execute("SELECT count(name) FROM sqlite_master WHERE type='table' AND name='cotacoes'")
num = cursor.fetchone()[0]
#conn.commit()
return 1 if num == 1 else 0
A função cieda_b3_criar_tabela_cotacoes cria a tabela cotacoes no banco de dados.
def cieda_b3_criar_tabela_cotacoes(cursor):
try:
cursor.execute("""
create table cotacoes (
id integer not null primary key autoincrement,
linha text,
datpre date not null,
codbid text,
codneg text not null,
tpmerc integer,
nomres text,
especi text,
prazot integer,
modref text,
preabe real,
premax real,
premin real,
premed real,
preult real,
preofc real,
preofv real,
totneg integer,
quatot integer,
voltot real,
preexe real,
indopc integer,
datven date,
fatcot integer,
ptoexe real,
codisi text,
dismes integer)
""")
except sqlite3.OperationalError as er:
return 0
cieda_b3_criar_indices_tabcot(cursor)
return 1
A função cieda_b3_criar_indice usa o cursor de cursor para criar na tabela de nome_tabela o índice com o nome em nome_tabela e os campos em campos_indice .
def cieda_b3_criar_indice(cursor,nome_tabela,nome_indice,campos_indice):
cursor.execute(
"create index idx_{} on {} ({})".format(
nome_indice,
nome_tabela,
campos_indice))
A função cieda_b3_criar_indices_tabcot cria os diversos índices para a tabela cotações, usando o cursor passado como parâmetro.
Apenas o índice idx_data_codneg_prazot é composto, com os campos data, codneg e prazot.
São criado também os índices de campos.
def cieda_b3_criar_indices_tabcot(cursor):
cieda_b3_criar_indice(cursor,"cotacoes","linha","linha")
#cieda_b3_criar_indice(cursor,"cotacoes","data_codneg_prazot","datpre,codneg,prazot")
cieda_b3_criar_indice(cursor,"cotacoes","datpre","datpre")
cieda_b3_criar_indice(cursor,"cotacoes","codbid","codbid")
cieda_b3_criar_indice(cursor,"cotacoes","codneg","codneg")
cieda_b3_criar_indice(cursor,"cotacoes","tpmerc","tpmerc")
cieda_b3_criar_indice(cursor,"cotacoes","nomres","nomres")
cieda_b3_criar_indice(cursor,"cotacoes","especi","especi")
cieda_b3_criar_indice(cursor,"cotacoes","prazot","prazot")
cieda_b3_criar_indice(cursor,"cotacoes","modref","modref")
cieda_b3_criar_indice(cursor,"cotacoes","preabe","preabe")
cieda_b3_criar_indice(cursor,"cotacoes","premax","premax")
cieda_b3_criar_indice(cursor,"cotacoes","premin","premin")
cieda_b3_criar_indice(cursor,"cotacoes","premed","premed")
cieda_b3_criar_indice(cursor,"cotacoes","preult","preult")
cieda_b3_criar_indice(cursor,"cotacoes","preofc","preofc")
cieda_b3_criar_indice(cursor,"cotacoes","preofv","preofv")
cieda_b3_criar_indice(cursor,"cotacoes","totneg","totneg")
cieda_b3_criar_indice(cursor,"cotacoes","quatot","quatot")
cieda_b3_criar_indice(cursor,"cotacoes","voltot","voltot")
cieda_b3_criar_indice(cursor,"cotacoes","preexe","preexe")
cieda_b3_criar_indice(cursor,"cotacoes","indopc","indopc")
cieda_b3_criar_indice(cursor,"cotacoes","datven","datven")
cieda_b3_criar_indice(cursor,"cotacoes","fatcot","fatcot")
cieda_b3_criar_indice(cursor,"cotacoes","ptoexe","ptoexe")
cieda_b3_criar_indice(cursor,"cotacoes","codisi","codisi")
cieda_b3_criar_indice(cursor,"cotacoes","dismes","dismes")
A função cieda_b3_existe_cotacao retorna verdadeiro para uma cotação já existente.
#str_select = """
# select count(*)
# from cotacoes
# indexed by idx_dt_cn_pt
# where datpre = ? and codneg = ? and prazot = ?"""
str_select = """
select count(*)
from cotacoes
indexed by idx_linha
where linha = '{}'"""
#def cieda_b3_existe_cotacao(conn,cursor,datpre,codneg,prazot):
# global str_select
# cursor.execute(str_select,(datpre,codneg,prazot))
# num = cursor.fetchone()[0]
# return 1 if num >= 1 else 0
def cieda_b3_existe_cotacao(conn,cursor,linha,linha_chave):
global str_select
cursor.execute(str_select.format(linha_chave))
num = cursor.fetchone()[0]
return 1 if num >= 1 else 0
A variável ___ é global para ser declarada dentro da função inserir_cotacao_tabcot, evitando ser redeclarada dentro da função sempre que for evocada.
str_insert_tabcot = """
insert into cotacoes (
linha,
datpre, codbid, codneg, tpmerc, nomres,
especi, prazot, modref, preabe, premax,
premin, premed, preult, preofc, preofv,
totneg, quatot, voltot, preexe, indopc,
datven, fatcot, ptoexe, codisi, dismes)
values (
?,
?, ?, ?, ?, ?,
?, ?, ?, ?, ?,
?, ?, ?, ?, ?,
?, ?, ?, ?, ?,
?, ?, ?, ?, ?)
"""
A função cieda_b3_decompor_valores_linha retorna a tupla com os valores decompostos a partir da linha vinda do arquivo zipado da B3.
def cieda_b3_decompor_valores_linha(linha,linha_chave):
#
controle = {"ini": 2}
#
def s(tam,linha=linha,controle=controle):
ini = controle["ini"]
fim = ini + tam
controle["ini"] = fim
return linha[ini:fim].strip()
#
def i(tam,linha=linha,controle=controle):
aux = s(tam).lstrip("0")
try:
return int(aux)
except:
return 0
#
def f(tam,linha=linha,controle=controle):
aux = s(tam).lstrip("0")
try:
return float(aux)/100
except:
return 0
#
def d(linha=linha,controle=controle):
return s(4) + "-" + s(2) + "-" + s(2)
#
return (
linha_chave,
d(), s(2), s(12), i(3), s(12),
s(10), i(3), s(4), s(13), f(13),
f(13), f(13), f(13), f(13), f(13),
i(5), i(18), f(18), f(13), i(1),
d(), i(7), f(13), s(12), i(3)
)
A função inserir_cotacao_tabcot insere uma nova linha de cotações.
def inserir_cotacao_tabcot(conn,cursor,linha,linha_chave):
global str_insert_tabcot
tupla_vals = cieda_b3_decompor_valores_linha(linha,linha_chave)
datpre = tupla_vals[0]
codneg = tupla_vals[2]
prazot = tupla_vals[6]
if cieda_b3_existe_cotacao(conn,cursor,linha) # datpre,codneg,prazot): return 0
try:
cursor.execute(str_insert_tabcot,tupla_vals)
#conn.commit()
except:
return -1
return 1
A função cieda_b3_arqcot_to_tabcot transfere o conteúdo da linha do aqrquivo zipado para o banco de dados.
A função retorna um dicionário com as seguintes chaves:
{
"data": d,
"esperados": e,
"processados": p,
"incluidos": i
}
onde:
CHAVE | VALOR | DESCRIÇÃO |
---|---|---|
data | d | data do arquivo no formato "aaaa-mm-dd' |
esperados | e | número de registros esperados |
processados | p | número de registros processados |
incluidos | i | número de registros incluidos |
def cieda_b3_arqcot_to_tabcot(conn,cursor,arqcot):
linha = arqcot.readline() # primeira linha é lida do arquivo
res = {"data": None, "esperados": -1, "incluidos": -1, "processados": -1}
processados = incluidos = esperados = 0
while linha: # enquanto houver linhas
if isinstance(linha,bytes): # se a linha vier do arquivo-zip estará como uma sequência de bytes
linha = linha.decode('iso-8859-1').strip() # a sequência de bytes é convertida para texto
tipo = linha[:2]
if tipo == '00': # registro de metadados
res["data"] = datetime.strptime(linha[23:31],'%Y%m%d')
elif tipo == "01":
linha_chave = "".join([c if c.isalpha() or c.isnumeric() else "_" for c in linha])
incluidos += inserir_cotacao_tabcot(conn,cursor,linha,linha_chave)
processados += 1
elif tipo == '99': # registro de metadados
res["esperados"] = int(linha[31:42].lstrip("0"))
linha = arqcot.readline()
res["processados"] = processados
res["incluidos"] = incluidos
return res
A função cieda_b3_processar_cotacoes ....
def cieda_b3_processar_cotacoes(pasta, ano, mes=None, dia=None):
res, conn, cursor = cieda_b3_preparar_bd()
nome_arq_zip = cieda_b3_obter_nome_arq_zip_cotacoes(ano,mes,dia)
pasta_nome_arq_zip = os.path.join(pasta,nome_arq_zip)
if not os.path.isfile(pasta_nome_arq_zip): return -1
arq_zip = open(pasta_nome_arq_zip,"rb")
meu_zip = ZipFile(BytesIO(arq_zip.read()))
for nome_arq_txt in meu_zip.namelist():
res = cieda_b3_arqcot_to_tabcot(conn,cursor,meu_zip.open(nome_arq_txt,"r"))
print(res)
break
arq_zip.close()
conn.commit()
cursor.close()
conn.close()
cieda_b3_processar_cotacoes("./cotacoes",1986)