Files
2026-06-22 17:58:42 -07:00

148 lines
5.3 KiB
Python

from typing import List
import requests
import zipfile
from pathlib import Path
import polars as pl
from datetime import datetime, timedelta
from tqdm import tqdm
import research
MAKER_FEE = 0.000450
TAKER_FEE = 0.000450
def download_and_unzip(symbol: str, date: str | datetime,
download_dir: str = "data", cache_dir: str = "cache") -> pl.DataFrame:
"""
Download and unzip Binance futures trade data for a given symbol and date.
Caches results as parquet files to avoid repeated downloads.
"""
# Normalize date to string
date_str = date.strftime('%Y-%m-%d') if isinstance(date, datetime) else date
cache_dir = Path(cache_dir)
cache_dir.mkdir(exist_ok=True)
cache_path = cache_dir / f"{symbol}-trades-{date_str}.parquet"
if cache_path.exists():
return pl.read_parquet(cache_path)
url = f"https://data.binance.vision/data/futures/um/daily/trades/{symbol}/{symbol}-trades-{date_str}.zip"
download_dir = Path(download_dir)
download_dir.mkdir(exist_ok=True)
zip_path = download_dir / f"{symbol}-trades-{date_str}.zip"
# Download zip
response = requests.get(url, stream=True)
response.raise_for_status()
with open(zip_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
# Extract
with zipfile.ZipFile(zip_path, 'r') as zf:
zf.extractall(download_dir)
csv_path = download_dir / f"{symbol}-trades-{date_str}.csv"
# Load into Polars
df = pl.read_csv(
csv_path,
schema={
"id": pl.Int64,
"price": pl.Float64,
"qty": pl.Float64,
"quoteQty": pl.Float64,
"time": pl.Int64,
"isBuyerMaker": pl.Boolean,
}
).with_columns(
pl.from_epoch("time", time_unit="ms").alias("datetime")
)
# Cache and clean
df.write_parquet(cache_path)
zip_path.unlink(missing_ok=True)
csv_path.unlink(missing_ok=True)
return df
def download_date_range(symbol: str, start_date: str | datetime, end_date: str | datetime,
download_dir: str = "data", cache_dir: str = "cache") -> list[pl.DataFrame]:
"""
Download trade data for a range of dates with a progress bar.
"""
if isinstance(start_date, str):
start_date = datetime.strptime(start_date, '%Y-%m-%d')
if isinstance(end_date, str):
end_date = datetime.strptime(end_date, '%Y-%m-%d')
num_days = (end_date - start_date).days + 1
for i in tqdm(range(num_days), desc=f"Downloading {symbol}"):
current_date = start_date + timedelta(days=i)
try:
download_and_unzip(symbol, current_date, download_dir, cache_dir)
except Exception as e:
tqdm.write(f"[ERROR] {symbol} {current_date.date()}: {e}")
def download_trades(symbol: str, no_days: int,
download_dir: str = "data", cache_dir: str = "cache", return_trades=False) -> pl.DataFrame:
"""
Download trades for the last N days up to yesterday with a progress bar.
"""
yesterday = datetime.now() - timedelta(days=1)
start_date = yesterday - timedelta(days=no_days - 1)
dfs = []
for i in tqdm(range(no_days), desc=f"Downloading {symbol}"):
current_date = start_date + timedelta(days=i)
try:
if return_trades:
dfs.append(download_and_unzip(symbol, current_date, download_dir, cache_dir))
else:
download_and_unzip(symbol, current_date, download_dir, cache_dir)
except Exception as e:
tqdm.write(f"[ERROR] {symbol} {current_date.date()}: {e}")
return pl.concat(dfs) if return_trades else None
def download_ohlc_timeseries(symbol: str, no_days: int, time_interval: str, download_dir: str = "data", cache_dir: str = "cache") -> pl.DataFrame:
"""
Download trades for the last N days up to yesterday with a progress bar.
"""
yesterday = datetime.now() - timedelta(days=1)
start_date = yesterday - timedelta(days=no_days - 1)
time_series = []
for i in tqdm(range(no_days), desc=f"Downloading {symbol}"):
current_date = start_date + timedelta(days=i)
try:
trades = download_and_unzip(symbol, current_date, download_dir, cache_dir)
time_series.append(research.timeseries(trades, time_interval, research.OHLC_AGGS))
except Exception as e:
tqdm.write(f"[ERROR] {symbol} {current_date.date()}: {e}")
return pl.concat(time_series)
def download_timeseries(symbol: str, no_days: int, time_interval: str, aggs: List[pl.Expr], download_dir: str = "data", cache_dir: str = "cache") -> pl.DataFrame:
"""
Download trades for the last N days up to yesterday with a progress bar.
"""
yesterday = datetime.now() - timedelta(days=1)
start_date = yesterday - timedelta(days=no_days - 1)
time_series = []
for i in tqdm(range(no_days), desc=f"Downloading {symbol}"):
current_date = start_date + timedelta(days=i)
try:
trades = download_and_unzip(symbol, current_date, download_dir, cache_dir)
time_series.append(research.timeseries(trades, time_interval, aggs))
except Exception as e:
tqdm.write(f"[ERROR] {symbol} {current_date.date()}: {e}")
return pl.concat(time_series)