Antes de começar, vamos configurar o ClickHouse:
# Instalar ClickHouse
sudo apt-get update
sudo apt-get install -y apt-transport-https ca-certificates dirmngr
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 8919F6BD2B48D754
echo "deb <https://packages.clickhouse.com/deb> stable main" | sudo tee /etc/apt/sources.list.d/clickhouse.list
sudo apt-get update
sudo apt-get install -y clickhouse-server clickhouse-client
# Iniciar o servidor
sudo service clickhouse-server start
# Verificar status
sudo service clickhouse-server status
# Acessar o cliente
clickhouse-client
ClickHouse é um SGBD colunar de código aberto para análise de dados em tempo real (OLAP) com:
Diferenças: https://clickhouse-com.translate.goog/docs/faq/general/columnar-database?_x_tr_sl=en&_x_tr_tl=pt&_x_tr_hl=pt&_x_tr_pto=tc
-- Criação do banco de dados
CREATE DATABASE IF NOT EXISTS ecommerce;
-- Tabela de eventos de usuários
CREATE TABLE ecommerce.user_events (
event_time DateTime,
event_type Enum8('view' = 1, 'add_to_cart' = 2, 'purchase' = 3),
user_id UInt64,
product_id UInt32,
price Decimal(10, 2),
quantity UInt8,
device_type Enum8('desktop' = 1, 'mobile' = 2, 'tablet' = 3)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, user_id);
-- View materializada para estatísticas diárias
CREATE MATERIALIZED VIEW ecommerce.daily_stats
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(day)
ORDER BY (day, product_id, event_type)
AS SELECT
toDate(event_time) AS day,
product_id,
event_type,
count() AS events_count,
sum(quantity) AS total_quantity,
sum(price * quantity) AS total_value
FROM ecommerce.user_events
GROUP BY day, product_id, event_type;
Agora vamos instalar o pip do clickhouse: pip install clickhouse-server
e rodar este arquivo em python:
import clickhouse_driver
import random
from datetime import datetime, timedelta
import time
def generate_data(num_rows=1000000):
"""Gera dados aleatórios para inserção no ClickHouse"""
# Configurações para geração de dados
event_types = ['view', 'add_to_cart', 'purchase']
event_type_weights = [0.7, 0.2, 0.1] # 70% views, 20% add_to_cart, 10% purchase
device_types = ['desktop', 'mobile', 'tablet']
device_type_weights = [0.4, 0.5, 0.1] # 40% desktop, 50% mobile, 10% tablet
# Intervalo de datas para os eventos (últimos 90 dias)
end_date = datetime.now()
start_date = end_date - timedelta(days=90)
date_range_seconds = (end_date - start_date).total_seconds()
# Número de usuários e produtos
num_users = 50000 # 50 mil usuários
num_products = 10000 # 10 mil produtos
# Intervalo de preços e quantidades
min_price, max_price = 5.99, 499.99
min_quantity, max_quantity = 1, 5
data = []
for _ in range(num_rows):
# Gerar timestamp aleatório nos últimos 90 dias
random_seconds = random.uniform(0, date_range_seconds)
event_time = start_date + timedelta(seconds=random_seconds)
# Selecionar tipo de evento baseado nas proporções definidas
event_type = random.choices(event_types, weights=event_type_weights)[0]
# Gerar dados restantes
user_id = random.randint(1, num_users)
product_id = random.randint(1, num_products)
price = round(random.uniform(min_price, max_price), 2)
# Para compras, usar quantidades menores na média
if event_type == 'purchase':
quantity = random.randint(min_quantity, 3)
else:
quantity = random.randint(min_quantity, max_quantity)
device_type = random.choices(device_types, weights=device_type_weights)[0]
data.append((
event_time,
event_type,
user_id,
product_id,
price,
quantity,
device_type
))
# Enviar em lotes para melhorar performance
if len(data) >= 100000:
yield data
data = []
if data:
yield data
def insert_data_to_clickhouse(batch_size=100000, total_rows=1000000):
"""Insere dados no ClickHouse"""
# Configurar conexão com o ClickHouse
client = clickhouse_driver.Client(
host='localhost',
user='default',
password='123456', # Se necessário, adicione a senha
database='ecommerce'
)
# Verificar se a tabela existe
try:
client.execute('SELECT 1 FROM ecommerce.user_events LIMIT 1')
print("Tabela user_events encontrada.")
except Exception as e:
print(f"Erro ao verificar tabela: {e}")
# Criar o banco de dados e tabela se não existirem
create_database = "CREATE DATABASE IF NOT EXISTS ecommerce"
create_table = """
CREATE TABLE IF NOT EXISTS ecommerce.user_events (
event_time DateTime,
event_type Enum8('view' = 1, 'add_to_cart' = 2, 'purchase' = 3),
user_id UInt64,
product_id UInt32,
price Decimal(10, 2),
quantity UInt8,
device_type Enum8('desktop' = 1, 'mobile' = 2, 'tablet' = 3)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, user_id)
"""
create_view = """
CREATE MATERIALIZED VIEW IF NOT EXISTS ecommerce.daily_stats
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(day)
ORDER BY (day, product_id, event_type)
AS SELECT
toDate(event_time) AS day,
product_id,
event_type,
count() AS events_count,
sum(quantity) AS total_quantity,
sum(price * quantity) AS total_value
FROM ecommerce.user_events
GROUP BY day, product_id, event_type
"""
client.execute(create_database)
client.execute(create_table)
client.execute(create_view)
print("Banco de dados, tabela e view criados.")
# Inserir dados em lotes
start_time = time.time()
total_inserted = 0
print(f"Iniciando inserção de {total_rows} linhas...")
for i, batch in enumerate(generate_data(total_rows)):
batch_start = time.time()
# Inserir lote de dados
client.execute(
'INSERT INTO ecommerce.user_events (event_time, event_type, user_id, product_id, price, quantity, device_type) VALUES',
batch
)
total_inserted += len(batch)
batch_end = time.time()
batch_time = batch_end - batch_start
print(f"Lote {i+1}: Inseridas {len(batch)} linhas em {batch_time:.2f} segundos. ({len(batch)/batch_time:.0f} linhas/s)")
print(f"Progresso: {total_inserted}/{total_rows} ({total_inserted/total_rows*100:.1f}%)")
end_time = time.time()
total_time = end_time - start_time
print(f"\\nConcluído! {total_inserted} linhas inseridas em {total_time:.2f} segundos.")
print(f"Taxa média: {total_inserted/total_time:.0f} linhas por segundo")
if __name__ == "__main__":
# Verifique se o pacote clickhouse-driver está instalado
try:
import clickhouse_driver
except ImportError:
print("O pacote clickhouse-driver não está instalado.")
print("Instalando via pip...")
import subprocess
subprocess.call(["pip", "install", "clickhouse-driver"])
import clickhouse_driver
# Executar a inserção de dados
insert_data_to_clickhouse(batch_size=100000, total_rows=1000000)
SELECT count() FROM ecommerce.daily_stats;
-- Total de vendas por dia
SELECT
day,
sum(total_value) AS daily_sales
FROM ecommerce.daily_stats
WHERE event_type = 'purchase'
GROUP BY day
ORDER BY day DESC
LIMIT 7;
-- Taxa de conversão
SELECT
toDate(event_time) AS day,
uniqExact(user_id) FILTER (WHERE event_type = 'view') AS viewers,
uniqExact(user_id) FILTER (WHERE event_type = 'purchase') AS buyers,
round(buyers / viewers * 100, 2) AS conversion_rate
FROM ecommerce.user_events
GROUP BY day
ORDER BY day DESC;