Data Source
The DataSource class manages data flow for TradeX charts in an agnostic manner, implementing a pull-based architecture where charts request data rather than having data pushed to them.
Overview
DataSource handles:
- Symbol and timeframe management
- Real-time ticker streams
- Historical data fetching
- State management across different chart configurations
- Data source registry and lifecycle management
Static Properties
Time Constants
export const SECOND_MS = 1000
export const MINUTE_MS = SECOND_MS * 60
export const HOUR_MS = MINUTE_MS * 60
export const DAY_MS = HOUR_MS * 24
export const WEEK_MS = DAY_MS * 7
export const MONTHR_MS = DAY_MS * 30
export const YEAR_MS = DAY_MS * 365
Default Time Frames
static get defaultTimeFrames()
Returns the default supported timeframes:
- Minutes: 1m, 2m, 3m, 5m, 10m, 15m, 30m
- Hours: 1h, 2h, 3h, 4h
- Days: 1d
- Weeks: 1w
- Months: 1M, 3M, 6M
- Years: 1y
Static Methods
create(cfg, state)
Creates a new DataSource instance and registers it.
Parameters:
cfg(Object) - Configuration objectsymbol(String) - Trading pair symbol (e.g., “BTC/USDT”)timeFrames(Array) - Supported timeframes in milliseconds timeFrameInit(Number) - Initial timeframeticker(Object) - Ticker stream configurationhistory(Object) - Historical data configuration
state(State) - Chart state instance
Returns: DataSource instance or undefined if creation fails
delete(key), has(key), get(key)
Registry management methods for DataSource instances.
list(chart)
Lists all registered data sources for a given chart.
Parameters:
chart(TradeXchart) - Target chart instance
Returns: Array<DataSource> or undefined
Constructor
constructor(cfg, state)
Creates a DataSource instance for a specific symbol and state.
Parameters:
cfg(Object) - Configuration objectsource(Object) - Exchange or data provider configurationsymbol(String) - Trading pair symbolsymbols(Object) - List of symbols and their datatimeFrames(Array) - Supported timeframes timeFrameInit(Number) - Initial timeframeticker(Ticker) - Ticker stream handlerhistory(History) - Historical data handlerinitialRange(Object) - Initial chart range configuration
state(State) - Chart state instance
Properties
Getters
id- Unique DataSource identifiercfg- Configuration objectstate- Associated chart statesource- Data source configurationstream- Stream handler instancesymbol- Current trading symboltimeFrame/timeFrameMS- Current timeframe in millisecondstimeFrameStr- Current timeframe as string (e.g., “1h”)timeFrames- Available timeframesrange- Chart range handler
Setters
timeFrame- Sets the current timeframe
Core Methods
Symbol Management
symbolSet(symbol)
Sets the trading symbol for the data source.
symbolsAdd(symbols)
Adds multiple symbols to the data source.
Time Frame Management
timeFrameUse(tf)
Sets the chart timeframe and manages state transitions.
Parameters:
tf(Number|String) - Timeframe in milliseconds or string format (e.g., “1h”)
timeFrameValidate(tf)
Validates a timeframe value.
Parameters:
tf(Number|String) - Timeframe to validate
Returns: Number - Validated timeframe in milliseconds, or undefined if invalid
timeFrameExists(tf)
Checks if a timeframe is supported.
Parameters:
tf(Number|String) - Timeframe to check
Returns: Number - Timeframe in milliseconds if exists, undefined otherwise
Ticker Stream Management
tickerAdd(ticker, begin)
Adds a ticker stream to the chart.
Parameters:
ticker(Object) - Ticker stream definitionstart(Function) - Function to start the streamstop(Function) - Function to stop the stream
begin(Object) - Optional auto-start configurationsymbol(String) - Ticker symboltf(Number) - Timeframe in milliseconds
tickerStart(symbol, tf)
Starts the ticker stream for a specific symbol and timeframe.
Parameters:
symbol(String) - Trading symboltf(Number|String) - Timeframe
Returns: Boolean - Success status
tickerStop()
Stops the current ticker stream.
Historical Data Management
historyAdd(history)
Adds historical data fetching functions.
Parameters:
history(Object) - History configurationrangeLimitPast(Function) - Fetch function for past datarangeLimitFuture(Function) - Fetch function for future data
Both functions must accept 4 parameters and return a Promise.
startTickerHistory(config)
Starts both ticker stream and loads initial historical data.
Parameters:
config(Object) - Combined configurationrangeLimitPast(Function) - Historical data fetch functionrangeLimitFuture(Function) - Future data fetch function (optional)start(Function) - Ticker start functionstop(Function) - Ticker stop functionsymbol(String) - Trading symboltf(Number) - Timeframe in milliseconds
Returns: Boolean - Success status
onRangeLimit(event, fn, timestamp)
Executes data fetching when chart range limits are reached.
Parameters:
event- Range limit eventfn(Function) - Fetch function to executetimestamp(Number) - Unix timestamp in milliseconds
State Management
findMatchingState(source, symbol, timeFrame)
Finds existing states that match the given criteria.
Parameters:
source(String) - Data source name (optional, defaults to current)symbol(String) - Trading symbol (optional, defaults to current)timeFrame(Number) - Timeframe (optional, defaults to current)
Returns: State|Object|undefined - Exact match, closest matches, or undefined
Usage Example
import DataSource from './dataSource.js';
// Create a data source
const config = {
symbol: 'BTC/USDT',
timeFrameInit: 3600000, // 1 hour
source: {
name: 'binance',
rangeLimitPast: async (event, symbol, tf, timestamp) => {
// Fetch historical data
return await fetchHistoricalData(symbol, tf, timestamp);
}
}
};
const dataSource = DataSource.create(config, chartState);
// Start real-time data
dataSource.startTickerHistory({
symbol: 'BTC/USDT',
tf: 3600000,
rangeLimitPast: fetchHistoricalData,
start: startTickerStream,
stop: stopTickerStream
});
Error Handling
The DataSource class includes comprehensive error handling:
- Parameter validation for all public methods
- Promise rejection handling for async operations
- Graceful fallbacks for missing configurations
- Detailed error messages with context information
Events
DataSource integrates with the chart’s event system:
range_limitPast- Triggered when more historical data is neededrange_limitFuture- Triggered when future data is requestedstream_candleFirst- Triggered when first real-time candle arrives
Internal Architecture
Private Properties
The DataSource class uses private fields to encapsulate internal state:
#cnt- Instance counter#id- Unique identifier#core- Reference to TradeXchart core#cfg- Configuration object#source- Data source configuration#symbol- Current trading symbol#state- Associated chart state#range- Range management instance#stream- Stream handler#timeFrames- Available timeframes map#timeFrameCurr- Current active timeframe#waiting- Flag for pending data requests#fetching- Flag for active fetch operations
Static Registry
DataSource maintains static registries for tracking instances:
static #chartList = new xMap() // Chart to DataSource mapping
static #sourceList = new xMap() // DataSource to State mapping
static #sourceCnt = 0 // Global instance counter
Advanced Features
Multi-State Management
DataSource can manage multiple chart states for the same symbol with different timeframes:
// Switch between timeframes
dataSource.timeFrame = '5m'; // Automatically manages state transitions
dataSource.timeFrame = '1h'; // Creates new state or switches to existing one
Stream Integration
The DataSource integrates with the Stream class for real-time data:
// Stream lifecycle management
this.#stream.start(); // Start processing real-time data
this.#stream.onTick(tickData); // Process incoming tick data
this.#stream.stop(); // Stop stream processing
Range-Based Data Loading
Automatic data loading based on chart viewport:
// Past data loading
core.on('range_limitPast', (event) => {
// Automatically triggered when user scrolls to chart beginning
return fetchHistoricalData(symbol, timeframe, event.startTS);
});
// Future data loading
core.on('range_limitFuture', (event) => {
// Triggered when scrolling beyond available data
return fetchFutureData(symbol, timeframe, event.endTS);
});
Data Flow Architecture
Pull-Based Design
Unlike traditional push-based systems, DataSource implements a pull architecture:
- Chart Request: Chart requests data for specific range/timeframe
- DataSource Evaluation: DataSource checks cache and determines data needs
- Data Fetching: External data sources are queried as needed
- State Update: Retrieved data is merged into appropriate chart state
- Chart Notification: Chart is notified of data availability
State Synchronization
// Automatic state management
const matchingState = dataSource.findMatchingState('binance', 'BTC/USDT', '1h');
if (matchingState instanceof State) {
// Switch to existing state
this.#state.use(matchingState.key);
} else {
// Create new state with current configuration
const newStateDef = {
dataSource: doStructuredClone(this.#state.dataSource)
};
this.#state.use(newStateDef);
}
Configuration Options
Source Configuration
const sourceConfig = {
name: 'binance', // Data provider identifier
tickerStream: {
start: startStreamFunction, // WebSocket connection starter
stop: stopStreamFunction // WebSocket connection closer
},
rangeLimitPast: fetchPastData, // Historical data fetcher
rangeLimitFuture: fetchFutureData // Future data fetcher (optional)
};
Symbol Configuration
const symbolConfig = {
symbol: 'BTC/USDT', // Primary trading pair
symbols: { // Additional symbol data
'BTC/USDT': { precision: 8 },
'ETH/USDT': { precision: 6 }
}
};
TimeFrame Configuration
const timeFrameConfig = {
timeFrames: [60000, 300000, 3600000], // Custom timeframes in ms
timeFrameInit: 3600000 // Initial timeframe
};
Error Scenarios and Handling
Common Error Cases
-
Invalid Symbol:
// Throws error if symbol is invalid or empty throwError(this.#core.ID, this.#state.key, 'symbol invalid'); -
Invalid TimeFrame:
// Validates timeframe before use if (!isInteger(tf)) { throwError(this.#core.ID, this.#state.key, 'time frame invalid'); } -
Function Parameter Mismatch:
// Validates function signatures if (rangeLimitPast.length !== 4) { consoleError(this.#core, this.#state.key, 'rangeLimitPast function requires 4 parameters'); } -
Promise Rejection Handling:
promise.catch(error => { this.#waiting = false; this.#core.progress.stop(); this.#core.error(error); });
Recovery Mechanisms
- Automatic Retry: Failed data requests can be automatically retried
- Fallback States: System falls back to previous working state on errors
- Progress Indicators: Loading states are properly managed and cleaned up
- Resource Cleanup: Streams and event listeners are properly disposed
Performance Considerations
Memory Management
// Proper cleanup on DataSource deletion
static delete(key) {
if (key instanceof DataSource) {
key.historyRemove(); // Remove event listeners
key.tickerStop(); // Stop streams
DataSource.#sourceList.delete(key);
}
}
Efficient State Switching
- States are reused when possible rather than recreated
- Data is shared between compatible states
- Minimal data copying through structured cloning only when necessary
Stream Optimization
- Single stream per symbol/timeframe combination
- Automatic stream lifecycle management
- Efficient tick data processing through Stream class
Integration Examples
Basic Setup
import DataSource from './dataSource.js';
import TradeXChart from './core.js';
const chart = new TradeXChart(container, config);
const state = chart.state;
const dataSource = DataSource.create({
symbol: 'BTC/USDT',
timeFrameInit: '1h',
source: {
name: 'myExchange'
}
}, state);
Advanced Setup with Custom Data Provider
class CustomDataProvider {
async fetchHistoricalData(symbol, timeframe, timestamp) {
const response = await fetch(`/api/history/${symbol}/${timeframe}/${timestamp}`);
return response.json();
}
startWebSocket(symbol, timeframe, onTick) {
const ws = new WebSocket(`wss://api.example.com/stream/${symbol}/${timeframe}`);
ws.onmessage = (event) => onTick(JSON.parse(event.data));
return ws;
}
}
const provider = new CustomDataProvider();
const dataSource = DataSource.create({
symbol: 'BTC/USDT',
source: {
name: 'custom',
rangeLimitPast: provider.fetchHistoricalData.bind(provider),
tickerStream: {
start: provider.startWebSocket.bind(provider),
stop: (ws) => ws.close()
}
}
}, state);
Multiple TimeFrame Management
// Create data source with multiple timeframes
const dataSource = DataSource.create({
symbol: 'BTC/USDT',
timeFrames: ['1m', '5m', '15m', '1h', '4h', '1d'],
timeFrameInit: '1h'
}, state);
// Switch between timeframes
dataSource.timeFrame = '5m'; // Switches to 5-minute chart
dataSource.timeFrame = '1d'; // Switches to daily chart
// Each timeframe maintains its own state and data
Testing and Debugging
Debug Information
// Enable detailed logging
dataSource.identifyState(); // Logs current state information
// Access internal state for debugging
console.log({
symbol: dataSource.symbol,
timeFrame: dataSource.timeFrameStr,
isWaiting: dataSource.#waiting, // Note: Private fields not accessible
stateKey: dataSource.state.key
});
Mock Data Provider for Testing
const mockProvider = {
rangeLimitPast: async (event, symbol, tf, timestamp) => {
// Return mock historical data
return {
chart: {
data: generateMockCandles(symbol, tf, timestamp, 100)
}
};
},
tickerStream: {
start: (symbol, tf, onTick) => {
// Simulate real-time ticks
const interval = setInterval(() => {
onTick(generateMockTick(symbol));
}, 1000);
return interval;
},
stop: (interval) => clearInterval(interval)
}
};
Migration and Compatibility
Upgrading from Previous Versions
When upgrading DataSource implementations:
- Function Signatures: Ensure fetch functions accept exactly 4 parameters
- Promise Returns: All async functions must return Promises
- Error Handling: Implement proper error handling in custom providers
- State Management: Update state access patterns for new API
Backward Compatibility
DataSource maintains backward compatibility through:
- Default value fallbacks for missing configuration
- Automatic timeframe detection from existing data
- Graceful handling of legacy data formats
Best Practices
1. Resource Management
// Always clean up resources
dataSource.historyRemove(); // Remove event listeners
dataSource.tickerStop(); // Stop streams
2. Error Handling
// Implement comprehensive error handling
const fetchData = async (event, symbol, tf, timestamp) => {
try {
const data = await apiCall(symbol, tf, timestamp);
return data;
} catch (error) {
console.error('Data fetch failed:', error);
return {}; // Return empty object on failure
}
};
3. Performance Optimization
// Use appropriate timeframes for data density
const appropriateTimeframes = {
intraday: ['1m', '5m', '15m', '1h'],
daily: ['4h', '1d', '1w'],
longTerm: ['1w', '1M', '3M']
};
4. State Management
// Check for existing states before creating new ones
const existing = dataSource.findMatchingState();
if (existing instanceof State) {
// Use existing state
state.use(existing.key);
} else {
// Create new state only when necessary
state.use(newStateDefinition);
}
API Reference
Utility Functions
consoleError(core, key, error)
Internal utility for consistent error logging.
Parameters:
core- TradeXChart core instancekey- State key for contexterror- Error message string
throwError(id, key, error)
Internal utility for throwing formatted errors.
Parameters:
id- Chart IDkey- State keyerror- Error message string
buildTimeFrames(timeframes)
Converts array of timeframe milliseconds to timeframe object.
Parameters:
timeframes(Array) - Array of timeframes in milliseconds
Returns: Object - Timeframe mapping object
timeFrame2MS(timeframe)
Converts timeframe string or number to milliseconds.
Parameters:
timeframe(String|Number) - Timeframe (e.g., “1h” or 3600000)
Returns: Number - Timeframe in milliseconds
Event System Integration
Core Events
DataSource integrates with the chart’s event system to provide seamless data loading:
// Range limit events
core.on('range_limitPast', handler); // Triggered when scrolling to past
core.on('range_limitFuture', handler); // Triggered when scrolling to future
// Stream events
core.on('stream_candleFirst', handler); // First real-time candle received
core.on('stream_candleUpdate', handler);// Real-time candle updates
Custom Event Handlers
// Example: Custom range limit handler
const customRangeLimitHandler = async (event, symbol, timeframe, timestamp) => {
// Log the request
console.log(`Fetching ${symbol} data for ${timeframe}ms from ${timestamp}`);
// Fetch data with retry logic
let retries = 3;
while (retries > 0) {
try {
const data = await fetchFromAPI(symbol, timeframe, timestamp);
return {
chart: { data: data.candles },
volume: { data: data.volume },
trades: { data: data.trades }
};
} catch (error) {
retries--;
if (retries === 0) throw error;
await new Promise(resolve => setTimeout(resolve, 1000)); // Wait 1s before retry
}
}
};
Data Formats
Expected Data Structure
DataSource expects data in specific formats for different chart components:
Candle Data Format
const candleData = {
chart: {
data: [
[timestamp, open, high, low, close, volume], // OHLCV array format
[1640995200000, 47000, 48000, 46500, 47500, 1234.56],
// ... more candles
]
}
};
Tick Data Format
const tickData = {
timestamp: 1640995200000,
price: 47500.00,
volume: 0.5,
side: 'buy' // or 'sell'
};
Volume Data Format
const volumeData = {
volume: {
data: [
[timestamp, volume, buyVolume, sellVolume],
[1640995200000, 1234.56, 800.00, 434.56],
// ... more volume data
]
}
};
Data Validation
DataSource performs automatic validation on incoming data:
// Validates candle data structure
const isValidCandle = (candle) => {
return Array.isArray(candle) &&
candle.length >= 6 &&
candle.every(val => typeof val === 'number');
};
// Validates timestamp ordering
const isChronological = (data) => {
for (let i = 1; i < data.length; i++) {
if (data[i][0] <= data[i-1][0]) return false;
}
return true;
};
WebSocket Integration
Real-time Data Streaming
DataSource provides a standardized interface for WebSocket connections:
class WebSocketProvider {
constructor(baseUrl) {
this.baseUrl = baseUrl;
this.connections = new Map();
}
start(symbol, timeframe, onTick) {
const key = `${symbol}_${timeframe}`;
if (this.connections.has(key)) {
this.connections.get(key).close();
}
const ws = new WebSocket(`${this.baseUrl}/stream/${symbol}/${timeframe}`);
ws.onopen = () => {
console.log(`WebSocket connected for ${symbol} ${timeframe}`);
};
ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
onTick(this.formatTick(data));
} catch (error) {
console.error('Failed to parse WebSocket message:', error);
}
};
ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
ws.onclose = () => {
console.log(`WebSocket closed for ${symbol} ${timeframe}`);
this.connections.delete(key);
};
this.connections.set(key, ws);
return ws;
}
stop(ws) {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.close();
}
}
formatTick(rawData) {
return {
timestamp: rawData.t,
price: parseFloat(rawData.p),
volume: parseFloat(rawData.v),
side: rawData.s
};
}
}
Connection Management
// Automatic reconnection logic
class ReconnectingWebSocket {
constructor(url, options = {}) {
this.url = url;
this.options = {
maxReconnectAttempts: 5,
reconnectInterval: 1000,
...options
};
this.reconnectAttempts = 0;
}
connect(onMessage) {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
this.reconnectAttempts = 0;
console.log('WebSocket connected');
};
this.ws.onmessage = onMessage;
this.ws.onclose = () => {
if (this.reconnectAttempts < this.options.maxReconnectAttempts) {
setTimeout(() => {
this.reconnectAttempts++;
this.connect(onMessage);
}, this.options.reconnectInterval);
}
};
}
}
Caching Strategies
Memory-Based Caching
class DataCache {
constructor(maxSize = 1000) {
this.cache = new Map();
this.maxSize = maxSize;
}
get(key) {
if (this.cache.has(key)) {
// Move to end (LRU)
const value = this.cache.get(key);
this.cache.delete(key);
this.cache.set(key, value);
return value;
}
return null;
}
set(key, value) {
if (this.cache.has(key)) {
this.cache.delete(key);
} else if (this.cache.size >= this.maxSize) {
// Remove oldest entry
const firstKey = this.cache.keys().next().value;
this.cache.delete(firstKey);
}
this.cache.set(key, value);
}
generateKey(symbol, timeframe, timestamp) {
return `${symbol}_${timeframe}_${Math.floor(timestamp / timeframe)}`;
}
}
Persistent Caching
class PersistentCache {
constructor(dbName = 'TradeXCache') {
this.dbName = dbName;
this.initDB();
}
async initDB() {
return new Promise((resolve, reject) => {
const request = indexedDB.open(this.dbName, 1);
request.onerror = () => reject(request.error);
request.onsuccess = () => {
this.db = request.result;
resolve();
};
request.onupgradeneeded = (event) => {
const db = event.target.result;
const store = db.createObjectStore('candles', { keyPath: 'key' });
store.createIndex('symbol', 'symbol', { unique: false });
store.createIndex('timeframe', 'timeframe', { unique: false });
};
});
}
async store(symbol, timeframe, timestamp, data) {
const transaction = this.db.transaction(['candles'], 'readwrite');
const store = transaction.objectStore('candles');
const key = `${symbol}_${timeframe}_${timestamp}`;
await store.put({
key,
symbol,
timeframe,
timestamp,
data,
cached: Date.now()
});
}
async retrieve(symbol, timeframe, startTime, endTime) {
const transaction = this.db.transaction(['candles'], 'readonly');
const store = transaction.objectStore('candles');
const results = [];
const range = IDBKeyRange.bound(
`${symbol}_${timeframe}_${startTime}`,
`${symbol}_${timeframe}_${endTime}`
);
return new Promise((resolve) => {
store.openCursor(range).onsuccess = (event) => {
const cursor = event.target.result;
if (cursor) {
results.push(cursor.value.data);
cursor.continue();
} else {
resolve(results);
}
};
});
}
}
Advanced Configuration
Multi-Exchange Setup
class MultiExchangeDataSource {
constructor() {
this.exchanges = new Map();
this.primaryExchange = null;
}
addExchange(name, config) {
const dataSource = DataSource.create({
...config,
source: { name, ...config.source }
}, config.state);
this.exchanges.set(name, dataSource);
if (!this.primaryExchange) {
this.primaryExchange = name;
}
}
async fetchFromBestSource(symbol, timeframe, timestamp) {
const promises = Array.from(this.exchanges.values()).map(ds =>
ds.source.rangeLimitPast(null, symbol, timeframe, timestamp)
.catch(error => ({ error, exchange: ds.source.name }))
);
const results = await Promise.allSettled(promises);
// Return first successful result
for (const result of results) {
if (result.status === 'fulfilled' && !result.value.error) {
return result.value;
}
}
throw new Error('All exchanges failed to provide data');
}
}
Aggregated Data Sources
class AggregatedDataSource extends DataSource {
constructor(sources, aggregationMethod = 'average') {
super(...arguments);
this.sources = sources;
this.aggregationMethod = aggregationMethod;
}
async fetchAggregatedData(symbol, timeframe, timestamp) {
const promises = this.sources.map(source =>
source.rangeLimitPast(null, symbol, timeframe, timestamp)
);
const results = await Promise.allSettled(promises);
const validResults = results
.filter(r => r.status === 'fulfilled')
.map(r => r.value);
if (validResults.length === 0) {
throw new Error('No valid data sources available');
}
return this.aggregateData(validResults);
}
aggregateData(datasets) {
switch (this.aggregationMethod) {
case 'average':
return this.averageData(datasets);
case 'median':
return this.medianData(datasets);
case 'weighted':
return this.weightedData(datasets);
default:
return datasets[0]; // Return first dataset as fallback
}
}
averageData(datasets) {
// Implementation for averaging OHLCV data across sources
const aggregated = { chart: { data: [] } };
const maxLength = Math.max(...datasets.map(d => d.chart.data.length));
for (let i = 0; i < maxLength; i++) {
const candles = datasets
.map(d => d.chart.data[i])
.filter(Boolean);
if (candles.length > 0) {
const avgCandle = [
candles[0][0], // timestamp (same for all)
this.average(candles.map(c => c[1])), // open
Math.max(...candles.map(c => c[2])), // high
Math.min(...candles.map(c => c[3])), // low
this.average(candles.map(c => c[4])), // close
this.sum(candles.map(c => c[5])) // volume
];
aggregated.chart.data.push(avgCandle);
}
}
return aggregated;
}
average(values) {
return values.reduce((sum, val) => sum + val, 0) / values.length;
}
sum(values) {
return values.reduce((sum, val) => sum + val, 0);
}
}
Performance Monitoring
Metrics Collection
class DataSourceMetrics {
constructor(dataSource) {
this.dataSource = dataSource;
this.metrics = {
fetchCount: 0,
fetchTime: [],
errorCount: 0,
cacheHits: 0,
cacheMisses: 0
};
}
wrapFetchFunction(originalFetch) {
return async (...args) => {
const startTime = performance.now();
this.metrics.fetchCount++;
try {
const result = await originalFetch.apply(this.dataSource, args);
const endTime = performance.now();
this.metrics.fetchTime.push(endTime - startTime);
return result;
} catch (error) {
this.metrics.errorCount++;
throw error;
}
};
}
getAverageResponseTime() {
if (this.metrics.fetchTime.length === 0) return 0;
return this.metrics.fetchTime.reduce((sum, time) => sum + time, 0) /
this.metrics.fetchTime.length;
}
getErrorRate() {
if (this.metrics.fetchCount === 0) return 0;
return this.metrics.errorCount / this.metrics.fetchCount;
}
getCacheHitRate() {
const total = this.metrics.cacheHits + this.metrics.cacheMisses;
if (total === 0) return 0;
return this.metrics.cacheHits / total;
}
generateReport() {
return {
totalFetches: this.metrics.fetchCount,
averageResponseTime: this.getAverageResponseTime(),
errorRate: this.getErrorRate(),
cacheHitRate: this.getCacheHitRate(),
totalErrors: this.metrics.errorCount
};
}
reset() {
this.metrics = {
fetchCount: 0,
fetchTime: [],
errorCount: 0,
cacheHits: 0,
cacheMisses: 0
};
}
}
```
### Performance Optimization Techniques
```javascript
class OptimizedDataSource extends DataSource {
constructor(cfg, state) {
super(cfg, state);
this.requestQueue = [];
this.processingQueue = false;
this.batchSize = 10;
this.batchTimeout = 100; // ms
}
// Batch multiple requests together
async batchRequest(symbol, timeframe, timestamp) {
return new Promise((resolve, reject) => {
this.requestQueue.push({ symbol, timeframe, timestamp, resolve, reject });
if (!this.processingQueue) {
this.processingQueue = true;
setTimeout(() => this.processBatch(), this.batchTimeout);
}
});
}
async processBatch() {
const batch = this.requestQueue.splice(0, this.batchSize);
this.processingQueue = false;
if (batch.length === 0) return;
try {
// Group requests by symbol and timeframe
const grouped = batch.reduce((acc, req) => {
const key = `${req.symbol}_${req.timeframe}`;
if (!acc[key]) acc[key] = [];
acc[key].push(req);
return acc;
}, {});
// Process each group
for (const [key, requests] of Object.entries(grouped)) {
const [symbol, timeframe] = key.split('_');
const timestamps = requests.map(r => r.timestamp);
const minTime = Math.min(...timestamps);
const maxTime = Math.max(...timestamps);
// Fetch data for the entire range
const data = await this.fetchRange(symbol, parseInt(timeframe), minTime, maxTime);
// Distribute results to individual requests
requests.forEach(req => {
const relevantData = this.extractRelevantData(data, req.timestamp);
req.resolve(relevantData);
});
}
} catch (error) {
// Reject all requests in the batch
batch.forEach(req => req.reject(error));
}
// Process remaining queue if any
if (this.requestQueue.length > 0) {
this.processingQueue = true;
setTimeout(() => this.processBatch(), this.batchTimeout);
}
}
// Prefetch data based on user behavior
async prefetchData(symbol, timeframe, currentTimestamp) {
const prefetchAmount = 100; // Number of candles to prefetch
const timeframeDuration = parseInt(timeframe);
const futureTimestamp = currentTimestamp + (prefetchAmount * timeframeDuration);
// Prefetch in background without blocking
this.fetchRange(symbol, timeframe, currentTimestamp, futureTimestamp)
.catch(error => console.warn('Prefetch failed:', error));
}
}
```
## Security Considerations
### API Key Management
```javascript
class SecureDataSource extends DataSource {
constructor(cfg, state) {
super(cfg, state);
this.apiKeys = new Map();
this.rateLimiter = new RateLimiter();
}
setApiKey(exchange, apiKey, secret) {
// Store encrypted credentials
this.apiKeys.set(exchange, {
key: this.encrypt(apiKey),
secret: this.encrypt(secret),
timestamp: Date.now()
});
}
encrypt(data) {
// Use Web Crypto API for encryption
// Implementation depends on your security requirements
return btoa(data); // Simple base64 for example
}
decrypt(encryptedData) {
return atob(encryptedData);
}
async authenticatedRequest(exchange, endpoint, params) {
const credentials = this.apiKeys.get(exchange);
if (!credentials) {
throw new Error(`No credentials found for exchange: ${exchange}`);
}
// Check rate limits
if (!this.rateLimiter.canMakeRequest(exchange)) {
throw new Error(`Rate limit exceeded for ${exchange}`);
}
const apiKey = this.decrypt(credentials.key);
const secret = this.decrypt(credentials.secret);
// Create authenticated request
const signature = this.createSignature(params, secret);
return fetch(endpoint, {
headers: {
'X-API-Key': apiKey,
'X-Signature': signature,
'Content-Type': 'application/json'
},
body: JSON.stringify(params)
});
}
createSignature(params, secret) {
// Implementation depends on exchange requirements
const queryString = Object.keys(params)
.sort()
.map(key => `${key}=${params[key]}`)
.join('&');
// Use HMAC-SHA256 or similar
return this.hmacSha256(queryString, secret);
}
}
```
### Rate Limiting
```javascript
class RateLimiter {
constructor() {
this.limits = new Map();
this.requests = new Map();
}
setLimit(exchange, requestsPerSecond) {
this.limits.set(exchange, {
requests: requestsPerSecond,
window: 1000 // 1 second
});
}
canMakeRequest(exchange) {
const limit = this.limits.get(exchange);
if (!limit) return true;
const now = Date.now();
const requests = this.requests.get(exchange) || [];
// Remove old requests outside the window
const validRequests = requests.filter(time => now - time < limit.window);
if (validRequests.length >= limit.requests) {
return false;
}
// Add current request
validRequests.push(now);
this.requests.set(exchange, validRequests);
return true;
}
getWaitTime(exchange) {
const limit = this.limits.get(exchange);
const requests = this.requests.get(exchange) || [];
if (requests.length === 0) return 0;
const oldestRequest = Math.min(...requests);
const waitTime = limit.window - (Date.now() - oldestRequest);
return Math.max(0, waitTime);
}
}
```
## Testing Framework
### Unit Tests
```javascript
// Example test suite for DataSource
describe('DataSource', () => {
let mockCore, mockState, dataSource;
beforeEach(() => {
mockCore = {
id: 'test-chart',
config: { symbol: 'BTC/USDT' },
on: jest.fn(),
off: jest.fn(),
error: jest.fn(),
log: jest.fn(),
progress: { start: jest.fn(), stop: jest.fn() }
};
mockState = {
core: mockCore,
key: 'test-state',
isEmpty: false,
isActive: true,
mergeData: jest.fn(),
use: jest.fn(),
list: jest.fn(() => [])
};
dataSource = DataSource.create({
symbol: 'BTC/USDT',
timeFrameInit: 3600000
}, mockState);
});
afterEach(() => {
if (dataSource) {
DataSource.delete(dataSource);
}
});
describe('constructor', () => {
it('should create a valid DataSource instance', () => {
expect(dataSource).toBeInstanceOf(DataSource);
expect(dataSource.symbol).toBe('BTC/USDT');
expect(dataSource.timeFrameMS).toBe(3600000);
});
it('should generate unique ID', () => {
const dataSource2 = DataSource.create({
symbol: 'ETH/USDT',
timeFrameInit: 3600000
}, mockState);
expect(dataSource.id).not.toBe(dataSource2.id);
DataSource.delete(dataSource2);
});
});
describe('timeFrame management', () => {
it('should validate timeframes correctly', () => {
expect(dataSource.timeFrameValidate(3600000)).toBe(3600000);
expect(dataSource.timeFrameValidate('1h')).toBe(3600000);
expect(() => dataSource.timeFrameValidate('invalid')).toThrow();
});
it('should switch timeframes', () => {
dataSource.timeFrame = '5m';
expect(dataSource.timeFrameMS).toBe(300000);
expect(dataSource.timeFrameStr).toBe('5m');
});
});
describe('ticker stream', () => {
it('should start ticker stream successfully', () => {
const mockStart = jest.fn();
const mockStop = jest.fn();
dataSource.tickerAdd({
start: mockStart,
stop: mockStop
});
const result = dataSource.tickerStart('BTC/USDT', 3600000);
expect(result).toBe(true);
expect(mockStart).toHaveBeenCalledWith('BTC/USDT', 3600000, expect.any(Function));
});
it('should reject mismatched symbols', () => {
const result = dataSource.tickerStart('ETH/USDT', 3600000);
expect(result).toBe(false);
});
});
describe('history management', () => {
it('should add history functions', () => {
const mockRangeLimitPast = jest.fn(() => Promise.resolve({}));
dataSource.historyAdd({
rangeLimitPast: mockRangeLimitPast
});
expect(mockCore.on).toHaveBeenCalledWith(
'range_limitPast',
expect.any(Function),
dataSource
);
});
it('should validate function parameters', () => {
const invalidFunction = () => {}; // 0 parameters instead of 4
dataSource.historyAdd({
rangeLimitPast: invalidFunction
});
expect(mockCore.error).toHaveBeenCalled();
});
});
});
```
### Integration Tests
```javascript
describe('DataSource Integration', () => {
let chart, dataSource;
beforeEach(async () => {
// Create actual chart instance
chart = new TradeXChart(document.createElement('div'), {
symbol: 'BTC/USDT'
});
await chart.ready;
dataSource = DataSource.create({
symbol: 'BTC/USDT',
timeFrameInit: '1h',
source: {
name: 'test-exchange',
rangeLimitPast: mockFetchHistoricalData,
tickerStream: {
start: mockStartTicker,
stop: mockStopTicker
}
}
}, chart.state);
});
afterEach(() => {
chart.destroy();
});
it('should load historical data on range limit', async () => {
// Simulate scrolling to trigger range limit
chart.emit('range_limitPast', {
startTS: Date.now() - 86400000 // 1 day ago
});
// Wait for data to load
await new Promise(resolve => setTimeout(resolve, 100));
expect(chart.state.data.chart.data.length).toBeGreaterThan(0);
});
it('should handle real-time updates', async () => {
// Start ticker
dataSource.tickerStart('BTC/USDT', '1h');
// Simulate tick data
const mockTick = {
timestamp: Date.now(),
price: 50000,
volume: 1.5
};
dataSource.stream.onTick(mockTick);
// Verify data was processed
expect(chart.state.data.chart.data.length).toBeGreaterThan(0);
});
});
// Mock functions for testing
async function mockFetchHistoricalData(event, symbol, timeframe, timestamp) {
return {
chart: {
data: generateMockCandles(100, timestamp, timeframe)
}
};
}
function mockStartTicker(symbol, timeframe, onTick) {
// Simulate periodic ticks
const interval = setInterval(() => {
onTick({
timestamp: Date.now(),
price: 50000 + Math.random() * 1000,
volume: Math.random() * 10
});
}, 1000);
return interval;
}
function mockStopTicker(interval) {
clearInterval(interval);
}
function generateMockCandles(count, startTime, timeframe) {
const candles = [];
let time = startTime;
let price = 50000;
for (let i = 0; i < count; i++) {
const open = price;
const change = (Math.random() - 0.5) * 1000;
const close = open + change;
const high = Math.max(open, close) + Math.random() * 500;
const low = Math.min(open, close) - Math.random() * 500;
const volume = Math.random() * 100;
candles.push([time, open, high, low, close, volume]);
time += timeframe;
price = close;
}
return candles;
}
```
## Troubleshooting Guide
### Common Issues
#### 1. Data Not Loading
**Symptoms:**
- Chart appears empty
- No error messages
- Network requests not being made
**Diagnosis:**
```javascript
// Check DataSource configuration
console.log('DataSource config:', dataSource.cfg);
console.log('Symbol:', dataSource.symbol);
console.log('TimeFrame:', dataSource.timeFrameStr);
console.log('State active:', dataSource.state.isActive);
// Check if history functions are registered
console.log('Range limit handlers:', {
past: typeof dataSource.source.rangeLimitPast,
future: typeof dataSource.source.rangeLimitFuture
});
```
**Solutions:**
- Verify symbol and timeframe are valid
- Ensure history fetch functions are properly registered
- Check network connectivity and API endpoints
- Verify state is active and not empty
#### 2. Real-time Updates Not Working
**Symptoms:**
- Historical data loads but no live updates
- WebSocket connection issues
- Ticker stream errors
**Diagnosis:**
```javascript
// Check stream status
console.log('Stream active:', dataSource.stream.isActive);
console.log('Ticker functions:', {
start: typeof dataSource.source.tickerStream.start,
stop: typeof dataSource.source.tickerStream.stop
});
// Monitor WebSocket connection
dataSource.source.tickerStream.start = (symbol, tf, onTick) => {
console.log(`Starting ticker for ${symbol} ${tf}`);
const ws = new WebSocket(`wss://api.example.com/stream/${symbol}/${tf}`);
ws.onopen = () => console.log('WebSocket connected');
ws.onclose = () => console.log('WebSocket disconnected');
ws.onerror = (error) => console.error('WebSocket error:', error);
ws.onmessage = (event) => {
console.log('Received tick:', event.data);
onTick(JSON.parse(event.data));
};
return ws;
};
```
**Solutions:**
- Verify WebSocket URL and connection parameters
- Check if ticker start/stop functions are properly implemented
- Ensure onTick callback is being called with correct data format
- Verify firewall/proxy settings allow WebSocket connections
#### 3. Memory Leaks
**Symptoms:**
- Browser memory usage increases over time
- Performance degradation
- Browser becomes unresponsive
**Diagnosis:**
```javascript
// Monitor DataSource instances
console.log('Active DataSources:', DataSource.list().length);
console.log('Memory usage:', performance.memory);
// Check for unclosed streams
const activeStreams = Array.from(DataSource.#sourceList.values())
.filter(ds => ds.stream && ds.stream.isActive);
console.log('Active streams:', activeStreams.length);
```
**Solutions:**
- Always call `DataSource.delete()` when removing charts
- Ensure WebSocket connections are properly closed
- Remove event listeners when switching timeframes
- Implement proper cleanup in custom data providers
#### 4. Performance Issues
**Symptoms:**
- Slow chart rendering
- Delayed data updates
- High CPU usage
**Diagnosis:**
```javascript
// Monitor fetch performance
const metrics = new DataSourceMetrics(dataSource);
console.log('Performance metrics:', metrics.generateReport());
// Check data volume
console.log('Data points:', dataSource.state.data.chart.data.length);
console.log('Update frequency:', dataSource.stream.tickRate);
```
**Solutions:**
- Implement data pagination for large datasets
- Use appropriate timeframes for chart resolution
- Enable data compression for network requests
- Implement efficient caching strategies
### Debug Mode
Enable comprehensive debugging:
```javascript
class DebugDataSource extends DataSource {
constructor(cfg, state) {
super(cfg, state);
this.debug = true;
this.debugLog = [];
}
log(message, data = null) {
if (this.debug) {
const entry = {
timestamp: Date.now(),
message,
data: data ? JSON.stringify(data) : null
};
this.debugLog.push(entry);
console.log(`[DataSource ${this.id}] ${message}`, data);
}
}
timeFrameUse(tf) {
this.log('Switching timeframe', { from: this.timeFrameStr, to: tf });
return super.timeFrameUse(tf);
}
tickerStart(symbol, tf) {
this.log('Starting ticker', { symbol, tf });
return super.tickerStart(symbol, tf);
}
onRangeLimit(event, fn, timestamp) {
this.log('Range limit triggered', { event: event.type, timestamp });
return super.onRangeLimit(event, fn, timestamp);
}
getDebugInfo() {
return {
id: this.id,
symbol: this.symbol,
timeFrame: this.timeFrameStr,
state: this.state.key,
streamActive: this.stream?.isActive || false,
dataPoints: this.state.data?.chart?.data?.length || 0,
debugLog: this.debugLog.slice(-50) // Last 50 entries
};
}
}
```
## Migration Guide
### From Version 1.x to 2.x
#### Breaking Changes
1. **Constructor Parameters**
```javascript
// Old (v1.x)
const dataSource = new DataSource(symbol, timeframe, config);
// New (v2.x)
const dataSource = DataSource.create({
symbol: symbol,
timeFrameInit: timeframe,
source: config
}, state);
```
2. **Event Handler Signatures**
```javascript
// Old (v1.x)
rangeLimitPast: (timestamp) => fetchData(timestamp)
// New (v2.x)
rangeLimitPast: (event, symbol, timeframe, timestamp) => fetchData(symbol, timeframe, timestamp)
```
3. **TimeFrame Format**
```javascript
// Old (v1.x) - String only
dataSource.setTimeFrame('1h');
// New (v2.x) - String or milliseconds
dataSource.timeFrame = '1h';
dataSource.timeFrame = 3600000;
```
#### Migration Steps
1. **Update DataSource Creation**
```javascript
// Before
const dataSource = new DataSource('BTC/USDT', '1h', {
fetchHistory: fetchFunction
});
// After
const dataSource = DataSource.create({
symbol: 'BTC/USDT',
timeFrameInit: '1h',
source: {
name: 'myExchange',
rangeLimitPast: fetchFunction
}
}, chartState);
```
2. **Update Event Handlers**
```javascript
// Before
const fetchHistory = async (timestamp) => {
return await api.getCandles(timestamp);
};
// After
const fetchHistory = async (event, symbol, timeframe, timestamp) => {
return await api.getCandles(symbol, timeframe, timestamp);
};
```
3. **Update Stream Handlers**
```javascript
// Before
dataSource.setStream(startFunction, stopFunction);
// After
dataSource.tickerAdd({
start: startFunction,
stop: stopFunction
});
```
### Compatibility Layer
For gradual migration, use this compatibility wrapper:
```javascript
class LegacyDataSource {
constructor(symbol, timeframe, config) {
console.warn('LegacyDataSource is deprecated. Use DataSource.create() instead.');
// Convert old format to new format
const newConfig = {
symbol,
timeFrameInit: timeframe,
source: {
name: config.name || 'legacy',
rangeLimitPast: this.wrapLegacyFunction(config.fetchHistory),
tickerStream: config.stream ? {
start: config.stream.start,
stop: config.stream.stop
} : undefined
}
};
return DataSource.create(newConfig, config.state);
}
wrapLegacyFunction(legacyFn) {
if (!legacyFn) return undefined;
return (event, symbol, timeframe, timestamp) => {
// Call legacy function with old signature
return legacyFn(timestamp);
};
}
}
```
## Conclusion
The DataSource class provides a comprehensive solution for managing financial data in TradeX charts. Its pull-based architecture, flexible configuration options, and robust error handling make it suitable for a wide range of trading applications.
Key benefits:
- **Agnostic Design**: Works with any data provider or exchange
- **Efficient State Management**: Automatic state switching and data sharing
- **Real-time Capabilities**: Seamless integration of live data streams
- **Performance Optimized**: Built-in caching, batching, and prefetching
- **Developer Friendly**: Comprehensive error handling and debugging tools
For additional support or advanced use cases, refer to the TradeX documentation or community resources.
## Related Documentation
- [TradeXChart Core API](./TradeXChart.md)
- [State Management](./State.md)
- [Stream Processing](./Stream.md)
- [Range Management](./Range.md)
- [Event System](./Events.md)
## Version History
- **v2.1.0**: Added multi-exchange support and performance monitoring
- **v2.0.0**: Major refactor with pull-based architecture
- **v1.5.0**: Added WebSocket streaming support
- **v1.0.0**: Initial release with basic data management
---
*Last updated: 2024*
*Documentation version: 2.1.0*
```