Scoring Manager - Detailed Design Document
Architecture Version: 3.0 | Last Updated: January 2026 Pattern References: Apache Kafka/Flink Stream Processing, Redis Sorted Sets, Dream11 Real-Time Scoring
1. Executive Overview
The Scoring Manager is the real-time intelligence engine of the Fantasy Service, responsible for processing live match events, calculating player scores, updating team totals, and maintaining live leaderboards. Drawing from production patterns used by Dream11 and MPL, this design implements a stream processing architecture using Apache Kafka and Apache Flink for sub-second score updates at massive scale.
Think of the Scoring Manager like a live sports broadcast scoring system: Every ball bowled, every run scored, every wicket taken is instantly processed and reflected across millions of fantasy teams, with leaderboards updating in real-time.
Key Capabilities:
- Sub-second score updates using stream processing
- Redis Sorted Sets for O(log N) leaderboard operations
- Event-driven architecture with Apache Kafka
- Configurable scoring rules per sport and contest type
- Historical score tracking for analytics and disputes
- Horizontal scaling to handle peak match loads
2. Core Architecture
2.1 High-Level System Design
What this diagram shows: Live match data flows into Kafka, is processed by Apache Flink for stream processing, and the Scoring Engine calculates scores in real-time. Redis Sorted Sets maintain live leaderboards with O(log N) updates, while PostgreSQL stores historical data. Downstream services consume score updates via Kafka.
3. Stream Processing Architecture
3.1 Apache Flink Pipeline
3.2 Flink Job Implementation
public class ScoringPipeline {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// Configure checkpointing for fault tolerance
env.enableCheckpointing(10000);
env.getCheckpointConfig().setCheckpointingMode(
CheckpointingMode.EXACTLY_ONCE
);
// Source: Kafka match events
KafkaSource<MatchEvent> source = KafkaSource.<MatchEvent>builder()
.setBootstrapServers("kafka:9092")
.setTopics("match.events")
.setGroupId("scoring-manager")
.setValueOnlyDeserializer(new MatchEventDeserializer())
.build();
DataStream<MatchEvent> events = env.fromSource(
source,
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)),
"Match Events"
);
// Process events
DataStream<PlayerScore> scores = events
.filter(e -> e.isScoreable())
.keyBy(MatchEvent::getPlayerId)
.process(new ScoreCalculator())
.name("Calculate Player Scores");
// Aggregate by team with 1-second tumbling window
DataStream<TeamScore> teamScores = scores
.keyBy(PlayerScore::getTeamId)
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.aggregate(new TeamScoreAggregator())
.name("Aggregate Team Scores");
// Sink to Kafka
teamScores.sinkTo(
KafkaSink.<TeamScore>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(new TeamScoreSerializer())
.build()
);
// Sink to Redis for leaderboards
teamScores.addSink(new RedisSink<>(redisConfig, new LeaderboardMapper()));
env.execute("Fantasy Scoring Pipeline");
}
}
4. Scoring Rules Engine
4.1 Rule Configuration
4.2 Rule Engine Implementation
interface ScoringRule {
eventType: MatchEventType;
points: number;
conditions?: RuleCondition[];
bonuses?: BonusRule[];
}
interface ScoringRuleSet {
id: string;
sportType: SportType;
contestType: ContestType;
rules: ScoringRule[];
multipliers: MultiplierConfig;
}
class ScoringRuleEngine {
private ruleCache: Map<string, ScoringRuleSet> = new Map();
async calculatePoints(
event: MatchEvent,
context: ScoringContext
): Promise<PointsResult> {
const ruleSet = await this.getRuleSet(context.contestType);
const rule = ruleSet.rules.find(r => r.eventType === event.type);
if (!rule) return { points: 0, breakdown: [] };
let points = rule.points;
const breakdown: PointBreakdown[] = [{
event: event.type,
basePoints: points,
}];
// Apply conditions
if (rule.conditions) {
for (const condition of rule.conditions) {
if (this.evaluateCondition(condition, event, context)) {
points += condition.bonusPoints;
breakdown.push({
event: condition.name,
basePoints: condition.bonusPoints,
});
}
}
}
// Apply bonuses (milestones)
if (rule.bonuses) {
for (const bonus of rule.bonuses) {
if (this.checkBonus(bonus, event, context)) {
points += bonus.points;
breakdown.push({
event: bonus.name,
basePoints: bonus.points,
});
}
}
}
// Apply captain/vice-captain multiplier
const multiplier = this.getMultiplier(event.playerId, context);
const finalPoints = points * multiplier;
return {
points: finalPoints,
breakdown,
multiplier,
};
}
}
5. Redis Sorted Sets for Leaderboards
5.1 Leaderboard Architecture
5.2 Redis Operations
class LeaderboardManager {
private redis: Redis;
// Update score - O(log N)
async updateScore(
contestId: string,
teamId: string,
score: number
): Promise<void> {
const key = `contest:${contestId}:leaderboard`;
// ZADD with score update
await this.redis.zadd(key, score, teamId);
// Store detailed breakdown
await this.redis.hset(
`contest:${contestId}:scores`,
teamId,
JSON.stringify({ score, updatedAt: Date.now() })
);
}
// Get top N - O(log N + M) where M is count
async getTopN(contestId: string, count: number): Promise<LeaderboardEntry[]> {
const key = `contest:${contestId}:leaderboard`;
// ZREVRANGE with scores (highest first)
const results = await this.redis.zrevrange(key, 0, count - 1, 'WITHSCORES');
return this.parseResults(results);
}
// Get user rank - O(log N)
async getUserRank(contestId: string, teamId: string): Promise<RankInfo> {
const key = `contest:${contestId}:leaderboard`;
// ZREVRANK for 0-based rank (highest first)
const rank = await this.redis.zrevrank(key, teamId);
const score = await this.redis.zscore(key, teamId);
const total = await this.redis.zcard(key);
return {
rank: rank !== null ? rank + 1 : null,
score: score ? parseFloat(score) : 0,
totalParticipants: total,
percentile: rank !== null ? ((total - rank) / total) * 100 : 0,
};
}
// Get entries around user - O(log N + M)
async getAroundUser(
contestId: string,
teamId: string,
range: number = 5
): Promise<LeaderboardEntry[]> {
const key = `contest:${contestId}:leaderboard`;
const rank = await this.redis.zrevrank(key, teamId);
if (rank === null) return [];
const start = Math.max(0, rank - range);
const end = rank + range;
const results = await this.redis.zrevrange(key, start, end, 'WITHSCORES');
return this.parseResults(results);
}
// Batch update for efficiency
async batchUpdateScores(
contestId: string,
updates: Array<{ teamId: string; score: number }>
): Promise<void> {
const key = `contest:${contestId}:leaderboard`;
const pipeline = this.redis.pipeline();
for (const { teamId, score } of updates) {
pipeline.zadd(key, score, teamId);
}
await pipeline.exec();
}
}
6. Score Calculation Flow
6.1 Event Processing Sequence
6.2 Score Aggregation
class ScoreAggregator {
async aggregateTeamScore(
matchId: string,
teamId: string
): Promise<TeamScoreAggregate> {
// Get all player scores for this team
const team = await this.teamRepository.getById(teamId);
const playerScores = await this.getPlayerScores(matchId, team.playerIds);
let totalScore = 0;
const playerBreakdowns: PlayerScoreBreakdown[] = [];
for (const player of team.players) {
const score = playerScores.get(player.id) || 0;
// Apply multiplier
let multiplier = 1;
if (player.id === team.captainId) multiplier = 2;
else if (player.id === team.viceCaptainId) multiplier = 1.5;
const adjustedScore = score * multiplier;
totalScore += adjustedScore;
playerBreakdowns.push({
playerId: player.id,
playerName: player.name,
baseScore: score,
multiplier,
adjustedScore,
isCaptain: player.id === team.captainId,
isViceCaptain: player.id === team.viceCaptainId,
});
}
return {
teamId,
totalScore,
playerBreakdowns,
lastUpdated: new Date(),
};
}
}
7. Real-Time Updates
7.1 WebSocket Broadcasting
7.2 Update Message Format
interface ScoreUpdateMessage {
type: 'SCORE_UPDATE';
contestId: string;
matchId: string;
timestamp: Date;
// Event that triggered update
event: {
type: MatchEventType;
playerId: string;
playerName: string;
description: string;
points: number;
};
// Updated leaderboard snapshot
leaderboard: {
topN: LeaderboardEntry[];
userPosition?: LeaderboardEntry;
aroundUser?: LeaderboardEntry[];
};
// User's team update (if applicable)
userTeam?: {
teamId: string;
totalScore: number;
rank: number;
previousRank: number;
playerScores: PlayerScoreUpdate[];
};
}
8. Score Corrections & Replays
8.1 Correction Flow
8.2 Correction Implementation
class ScoreCorrector {
async applyCorrection(correction: ScoreCorrection): Promise<CorrectionResult> {
const { matchId, playerId, eventId, oldPoints, newPoints } = correction;
// Get all teams with this player
const affectedTeams = await this.teamRepository.findByPlayer(
matchId,
playerId
);
const pointsDelta = newPoints - oldPoints;
const updates: TeamScoreUpdate[] = [];
for (const team of affectedTeams) {
// Calculate multiplier for this player in this team
const multiplier = this.getMultiplier(playerId, team);
const adjustedDelta = pointsDelta * multiplier;
// Update team score
const newScore = team.currentScore + adjustedDelta;
updates.push({
teamId: team.id,
oldScore: team.currentScore,
newScore,
delta: adjustedDelta,
});
// Update Redis leaderboard
await this.leaderboardManager.updateScore(
team.contestId,
team.id,
newScore
);
}
// Record correction in audit log
await this.auditLog.recordCorrection({
correctionId: correction.id,
matchId,
playerId,
eventId,
oldPoints,
newPoints,
affectedTeams: updates.length,
timestamp: new Date(),
adminId: correction.adminId,
});
// Publish correction event
await this.kafka.produce('score.corrected', {
correction,
affectedTeams: updates,
});
return {
success: true,
affectedTeams: updates.length,
updates,
};
}
}
9. Performance Optimization
9.1 Caching Strategy
9.2 Batch Processing
class BatchScoreProcessor {
private batchSize = 1000;
private flushInterval = 100; // ms
private buffer: ScoreUpdate[] = [];
private timer: NodeJS.Timer | null = null;
async addUpdate(update: ScoreUpdate): Promise<void> {
this.buffer.push(update);
if (this.buffer.length >= this.batchSize) {
await this.flush();
} else if (!this.timer) {
this.timer = setTimeout(() => this.flush(), this.flushInterval);
}
}
private async flush(): Promise<void> {
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
if (this.buffer.length === 0) return;
const batch = this.buffer.splice(0, this.batchSize);
// Group by contest for efficient Redis updates
const byContest = this.groupByContest(batch);
// Parallel updates to Redis
await Promise.all(
Object.entries(byContest).map(([contestId, updates]) =>
this.leaderboardManager.batchUpdateScores(contestId, updates)
)
);
// Publish batch update event
await this.kafka.produce('scores.batch.updated', {
count: batch.length,
contests: Object.keys(byContest),
timestamp: Date.now(),
});
}
}
10. Data Models
10.1 Entity Relationship Diagram
11. API Specifications
11.1 REST API Endpoints
| Method | Endpoint | Description | Auth |
|---|---|---|---|
| GET | /api/v3/scores/match/:matchId | Get match scores | Public |
| GET | /api/v3/scores/player/:playerId | Get player score | Public |
| GET | /api/v3/scores/team/:teamId | Get team score | User |
| GET | /api/v3/leaderboard/:contestId | Get leaderboard | Public |
| GET | /api/v3/leaderboard/:contestId/rank/:teamId | Get team rank | User |
| POST | /api/v3/scores/correct | Submit correction | Admin |
11.2 gRPC Service Definition
service ScoringService {
// Score Queries
rpc GetPlayerScore(GetPlayerScoreRequest) returns (PlayerScore);
rpc GetTeamScore(GetTeamScoreRequest) returns (TeamScore);
rpc GetMatchScores(GetMatchScoresRequest) returns (MatchScores);
// Leaderboard
rpc GetLeaderboard(GetLeaderboardRequest) returns (Leaderboard);
rpc GetUserRank(GetUserRankRequest) returns (RankInfo);
rpc StreamLeaderboard(StreamLeaderboardRequest) returns (stream LeaderboardUpdate);
// Admin
rpc SubmitCorrection(CorrectionRequest) returns (CorrectionResponse);
rpc ReplayScores(ReplayRequest) returns (ReplayResponse);
}
message GetLeaderboardRequest {
string contest_id = 1;
int32 limit = 2;
int32 offset = 3;
string around_team_id = 4; // Optional: get entries around this team
}
message LeaderboardUpdate {
string contest_id = 1;
repeated LeaderboardEntry entries = 2;
int64 timestamp = 3;
MatchEvent triggering_event = 4;
}
12. Monitoring & Observability
12.1 Key Metrics
const scoringMetrics = {
// Processing Metrics
eventsProcessed: new Counter({
name: 'scoring_events_processed_total',
help: 'Total match events processed',
labelNames: ['event_type', 'match_id'],
}),
processingLatency: new Histogram({
name: 'scoring_processing_latency_seconds',
help: 'Event processing latency',
labelNames: ['event_type'],
buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25],
}),
// Leaderboard Metrics
leaderboardUpdates: new Counter({
name: 'scoring_leaderboard_updates_total',
help: 'Leaderboard update operations',
labelNames: ['contest_id'],
}),
leaderboardLatency: new Histogram({
name: 'scoring_leaderboard_latency_seconds',
help: 'Leaderboard query latency',
labelNames: ['operation'],
buckets: [0.001, 0.005, 0.01, 0.025, 0.05],
}),
// Flink Metrics
flinkLag: new Gauge({
name: 'scoring_flink_consumer_lag',
help: 'Kafka consumer lag in Flink',
labelNames: ['topic', 'partition'],
}),
checkpointDuration: new Histogram({
name: 'scoring_flink_checkpoint_duration_seconds',
help: 'Flink checkpoint duration',
buckets: [1, 5, 10, 30, 60],
}),
};
12.2 Alert Rules
groups:
- name: scoring-manager-alerts
rules:
- alert: HighProcessingLatency
expr: histogram_quantile(0.99, scoring_processing_latency_seconds) > 0.1
for: 2m
labels:
severity: warning
annotations:
summary: "Score processing p99 latency > 100ms"
- alert: FlinkConsumerLag
expr: scoring_flink_consumer_lag > 10000
for: 1m
labels:
severity: critical
annotations:
summary: "Flink consumer lag > 10k messages"
- alert: LeaderboardUpdateFailures
expr: rate(scoring_leaderboard_errors_total[5m]) > 0.01
for: 2m
labels:
severity: critical
annotations:
summary: "Leaderboard update failure rate > 1%"
13. Scalability
13.1 Horizontal Scaling Architecture
13.2 Performance Targets
| Metric | Target | Measurement |
|---|---|---|
| Event Processing Latency (p50) | < 10ms | Kafka to Redis |
| Event Processing Latency (p99) | < 50ms | Worst case |
| Leaderboard Query (p50) | < 5ms | Redis ZREVRANGE |
| Leaderboard Query (p99) | < 20ms | Worst case |
| Events per Second | 100,000+ | Peak match load |
| Concurrent Leaderboards | 10,000+ | Active contests |
14. Testing Strategy
14.1 Load Testing
describe('Scoring Manager Load Tests', () => {
it('should process 10k events per second', async () => {
const events = generateMatchEvents(10000);
const startTime = Date.now();
await Promise.all(events.map(e => scoringPipeline.process(e)));
const duration = Date.now() - startTime;
expect(duration).toBeLessThan(1000);
});
it('should maintain leaderboard consistency under load', async () => {
const contest = await createTestContest({ entries: 10000 });
// Simulate rapid score updates
const updates = Array.from({ length: 1000 }, () => ({
teamId: randomTeamId(contest),
scoreDelta: Math.random() * 10,
}));
await Promise.all(updates.map(u =>
leaderboardManager.updateScore(contest.id, u.teamId, u.scoreDelta)
));
// Verify leaderboard is sorted correctly
const leaderboard = await leaderboardManager.getTopN(contest.id, 100);
for (let i = 1; i < leaderboard.length; i++) {
expect(leaderboard[i - 1].score).toBeGreaterThanOrEqual(leaderboard[i].score);
}
});
});
15. Future Enhancements
15.1 Roadmap
| Phase | Feature | Description |
|---|---|---|
| Q1 2026 | ML Score Predictions | Predict final scores during match |
| Q2 2026 | Real-time Analytics | Live player performance insights |
| Q3 2026 | Multi-Sport Scoring | Unified scoring for all sports |
| Q4 2026 | Edge Computing | Score calculation at edge nodes |
16. Appendix
16.1 Glossary
| Term | Definition |
|---|---|
| Sorted Set | Redis data structure for ranked data |
| Tumbling Window | Fixed-size, non-overlapping time window |
| Watermark | Mechanism for handling late events |
| Checkpoint | Flink's fault tolerance mechanism |
16.2 References
Document Version: 3.0.0 | Last Updated: January 2026