Scoring Manager - Detailed Design Document

Architecture Version: 3.0 | Last Updated: January 2026 | Pattern References: Apache Kafka/Flink Stream Processing, Redis Sorted Sets, Dream11 Real-Time Scoring


1. Executive Overview

The Scoring Manager is the real-time intelligence engine of the Fantasy Service, responsible for processing live match events, calculating player scores, updating team totals, and maintaining live leaderboards. Drawing from production patterns used by Dream11 and MPL, this design implements a stream processing architecture using Apache Kafka and Apache Flink for sub-second score updates at massive scale.

Think of the Scoring Manager like a live sports broadcast scoring system: Every ball bowled, every run scored, every wicket taken is instantly processed and reflected across millions of fantasy teams, with leaderboards updating in real-time.

Key Capabilities:

  • Sub-second score updates using stream processing
  • Redis Sorted Sets for O(log N) leaderboard operations
  • Event-driven architecture with Apache Kafka
  • Configurable scoring rules per sport and contest type
  • Historical score tracking for analytics and disputes
  • Horizontal scaling to handle peak match loads

2. Core Architecture

2.1 High-Level System Design

What this diagram shows: Live match data flows into Kafka, is processed by Apache Flink for stream processing, and the Scoring Engine calculates scores in real-time. Redis Sorted Sets maintain live leaderboards with O(log N) updates, while PostgreSQL stores historical data. Downstream services consume score updates via Kafka.


3. Stream Processing Architecture

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class ScoringPipeline {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 10 seconds for fault tolerance
        env.enableCheckpointing(10_000);
        env.getCheckpointConfig().setCheckpointingMode(
                CheckpointingMode.EXACTLY_ONCE
        );

        // Source: Kafka match events
        KafkaSource<MatchEvent> source = KafkaSource.<MatchEvent>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("match.events")
                .setGroupId("scoring-manager")
                .setValueOnlyDeserializer(new MatchEventDeserializer())
                .build();

        // Tolerate up to 5 seconds of out-of-order events
        DataStream<MatchEvent> events = env.fromSource(
                source,
                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)),
                "Match Events"
        );

        // Process events: one keyed stream per player
        DataStream<PlayerScore> scores = events
                .filter(e -> e.isScoreable())
                .keyBy(MatchEvent::getPlayerId)
                .process(new ScoreCalculator())
                .name("Calculate Player Scores");

        // Aggregate by team with a 1-second tumbling window
        DataStream<TeamScore> teamScores = scores
                .keyBy(PlayerScore::getTeamId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
                .aggregate(new TeamScoreAggregator())
                .name("Aggregate Team Scores");

        // Sink: publish team scores to Kafka for downstream consumers
        teamScores.sinkTo(
                KafkaSink.<TeamScore>builder()
                        .setBootstrapServers("kafka:9092")
                        .setRecordSerializer(new TeamScoreSerializer())
                        .build()
        );

        // Sink: push updates into Redis leaderboards
        // (redisConfig and the custom operators are defined elsewhere in the service)
        teamScores.addSink(new RedisSink<>(redisConfig, new LeaderboardMapper()));

        env.execute("Fantasy Scoring Pipeline");
    }
}
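The pipeline above assumes a match-event schema roughly like the following. Field names are illustrative, not taken from a real data-feed provider; `isScoreable` mirrors the `events.filter(e -> e.isScoreable())` step in the Flink job.

```typescript
// Hypothetical shape of the events consumed from the match.events topic.
type MatchEventType = 'RUN' | 'FOUR' | 'SIX' | 'WICKET' | 'CATCH' | 'COMMENTARY';

interface MatchEvent {
  eventId: string;
  matchId: string;
  playerId: string;
  type: MatchEventType;
  timestamp: number;   // epoch millis, used for watermarking
  value?: number;      // e.g. runs scored on this ball
}

// Only some event types contribute points; commentary-style events are filtered out.
const SCOREABLE: ReadonlySet<MatchEventType> =
  new Set(['RUN', 'FOUR', 'SIX', 'WICKET', 'CATCH']);

function isScoreable(e: MatchEvent): boolean {
  return SCOREABLE.has(e.type);
}
```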

4. Scoring Rules Engine

4.1 Rule Configuration
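Rule sets are stored as data, not code, so scoring can vary per sport and contest type without redeploying. A hypothetical T20 cricket configuration is sketched below; point values are illustrative only, and the types are minimal local copies of the interfaces defined in 4.2 so the sample stands alone.

```typescript
// Minimal local copies of the rule types (see 4.2).
interface RuleCondition { name: string; bonusPoints: number; }
interface BonusRule { name: string; points: number; }
interface ScoringRule {
  eventType: string;
  points: number;
  conditions?: RuleCondition[];
  bonuses?: BonusRule[];
}

// Hypothetical T20 rule set; point values are illustrative, not any provider's real rules.
const t20Rules: ScoringRule[] = [
  { eventType: 'RUN', points: 1 },
  { eventType: 'FOUR', points: 1, bonuses: [{ name: 'boundary-bonus', points: 1 }] },
  { eventType: 'SIX', points: 2, bonuses: [{ name: 'six-bonus', points: 2 }] },
  {
    eventType: 'WICKET',
    points: 25,
    conditions: [{ name: 'lbw-or-bowled', bonusPoints: 8 }],
  },
];

// Convenience lookup mirroring ruleSet.rules.find(...) in the engine.
function baseFor(eventType: string): number {
  return t20Rules.find(r => r.eventType === eventType)?.points ?? 0;
}
```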

4.2 Rule Engine Implementation

interface ScoringRule {
  eventType: MatchEventType;
  points: number;
  conditions?: RuleCondition[];
  bonuses?: BonusRule[];
}

interface ScoringRuleSet {
  id: string;
  sportType: SportType;
  contestType: ContestType;
  rules: ScoringRule[];
  multipliers: MultiplierConfig;
}

class ScoringRuleEngine {
  private ruleCache: Map<string, ScoringRuleSet> = new Map();

  async calculatePoints(
    event: MatchEvent,
    context: ScoringContext
  ): Promise<PointsResult> {
    const ruleSet = await this.getRuleSet(context.contestType);
    const rule = ruleSet.rules.find(r => r.eventType === event.type);

    if (!rule) return { points: 0, breakdown: [] };

    let points = rule.points;
    const breakdown: PointBreakdown[] = [{
      event: event.type,
      basePoints: points,
    }];

    // Apply conditions
    if (rule.conditions) {
      for (const condition of rule.conditions) {
        if (this.evaluateCondition(condition, event, context)) {
          points += condition.bonusPoints;
          breakdown.push({
            event: condition.name,
            basePoints: condition.bonusPoints,
          });
        }
      }
    }

    // Apply bonuses (milestones)
    if (rule.bonuses) {
      for (const bonus of rule.bonuses) {
        if (this.checkBonus(bonus, event, context)) {
          points += bonus.points;
          breakdown.push({
            event: bonus.name,
            basePoints: bonus.points,
          });
        }
      }
    }

    // Apply captain/vice-captain multiplier
    const multiplier = this.getMultiplier(event.playerId, context);
    const finalPoints = points * multiplier;

    return {
      points: finalPoints,
      breakdown,
      multiplier,
    };
  }
}
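`evaluateCondition` and `checkBonus` are left abstract above. One plausible implementation of `evaluateCondition` is a field/operator/value predicate evaluated against the event payload; the condition shape and the two-argument signature here are assumptions for illustration (the engine above also passes a `context`, omitted for brevity).

```typescript
// Assumed condition shape: a predicate over one event field.
interface RuleCondition {
  name: string;
  bonusPoints: number;
  field: string;                    // e.g. 'dismissalType'
  operator: 'eq' | 'gte' | 'lte';
  value: string | number;
}

function evaluateCondition(
  condition: RuleCondition,
  event: Record<string, unknown>
): boolean {
  const actual = event[condition.field];
  switch (condition.operator) {
    case 'eq':
      return actual === condition.value;
    case 'gte':
      return typeof actual === 'number' && actual >= (condition.value as number);
    case 'lte':
      return typeof actual === 'number' && actual <= (condition.value as number);
    default:
      return false;
  }
}
```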

5. Redis Sorted Sets for Leaderboards

5.1 Leaderboard Architecture

5.2 Redis Operations

class LeaderboardManager {
  private redis: Redis;

  // Update score - O(log N)
  async updateScore(
    contestId: string,
    teamId: string,
    score: number
  ): Promise<void> {
    const key = `contest:${contestId}:leaderboard`;

    // ZADD with score update
    await this.redis.zadd(key, score, teamId);

    // Store detailed breakdown
    await this.redis.hset(
      `contest:${contestId}:scores`,
      teamId,
      JSON.stringify({ score, updatedAt: Date.now() })
    );
  }

  // Get top N - O(log N + M) where M is count
  async getTopN(contestId: string, count: number): Promise<LeaderboardEntry[]> {
    const key = `contest:${contestId}:leaderboard`;

    // ZREVRANGE with scores (highest first)
    const results = await this.redis.zrevrange(key, 0, count - 1, 'WITHSCORES');

    return this.parseResults(results);
  }

  // Get user rank - O(log N)
  async getUserRank(contestId: string, teamId: string): Promise<RankInfo> {
    const key = `contest:${contestId}:leaderboard`;

    // ZREVRANK for 0-based rank (highest first)
    const rank = await this.redis.zrevrank(key, teamId);
    const score = await this.redis.zscore(key, teamId);
    const total = await this.redis.zcard(key);

    return {
      rank: rank !== null ? rank + 1 : null,
      score: score ? parseFloat(score) : 0,
      totalParticipants: total,
      percentile: rank !== null ? ((total - rank) / total) * 100 : 0,
    };
  }

  // Get entries around user - O(log N + M)
  async getAroundUser(
    contestId: string,
    teamId: string,
    range: number = 5
  ): Promise<LeaderboardEntry[]> {
    const key = `contest:${contestId}:leaderboard`;

    const rank = await this.redis.zrevrank(key, teamId);
    if (rank === null) return [];

    const start = Math.max(0, rank - range);
    const end = rank + range;

    const results = await this.redis.zrevrange(key, start, end, 'WITHSCORES');
    return this.parseResults(results);
  }

  // Batch update for efficiency
  async batchUpdateScores(
    contestId: string,
    updates: Array<{ teamId: string; score: number }>
  ): Promise<void> {
    const key = `contest:${contestId}:leaderboard`;

    const pipeline = this.redis.pipeline();

    for (const { teamId, score } of updates) {
      pipeline.zadd(key, score, teamId);
    }

    await pipeline.exec();
  }
}
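The `parseResults` helper above is assumed rather than defined. ioredis returns a `WITHSCORES` reply as a flat string array alternating member and score (`[member1, score1, member2, score2, ...]`), so a minimal implementation could look like this; the `offset` parameter is an assumption that keeps 1-based ranks correct for pages that do not start at the top.

```typescript
interface LeaderboardEntry {
  rank: number;     // 1-based
  teamId: string;
  score: number;
}

// Parses an ioredis ZREVRANGE ... WITHSCORES reply:
// a flat array [member1, score1, member2, score2, ...].
function parseResults(flat: string[], offset = 0): LeaderboardEntry[] {
  const entries: LeaderboardEntry[] = [];
  for (let i = 0; i < flat.length; i += 2) {
    entries.push({
      rank: offset + i / 2 + 1,
      teamId: flat[i],
      score: parseFloat(flat[i + 1]),
    });
  }
  return entries;
}
```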

6. Score Calculation Flow

6.1 Event Processing Sequence

6.2 Score Aggregation

class ScoreAggregator {
  async aggregateTeamScore(
    matchId: string,
    teamId: string
  ): Promise<TeamScoreAggregate> {
    // Get all player scores for this team
    const team = await this.teamRepository.getById(teamId);
    const playerIds = team.players.map(p => p.id);
    const playerScores = await this.getPlayerScores(matchId, playerIds);

    let totalScore = 0;
    const playerBreakdowns: PlayerScoreBreakdown[] = [];

    for (const player of team.players) {
      const score = playerScores.get(player.id) || 0;

      // Captain scores 2x, vice-captain 1.5x
      let multiplier = 1;
      if (player.id === team.captainId) multiplier = 2;
      else if (player.id === team.viceCaptainId) multiplier = 1.5;

      const adjustedScore = score * multiplier;
      totalScore += adjustedScore;

      playerBreakdowns.push({
        playerId: player.id,
        playerName: player.name,
        baseScore: score,
        multiplier,
        adjustedScore,
        isCaptain: player.id === team.captainId,
        isViceCaptain: player.id === team.viceCaptainId,
      });
    }

    return {
      teamId,
      totalScore,
      playerBreakdowns,
      lastUpdated: new Date(),
    };
  }
}
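The multiplier step can be spot-checked with a small worked example (all numbers invented): a captain on 50 base points contributes 100, a vice-captain on 30 contributes 45, and a regular player on 20 contributes 20, for a team total of 165. The standalone function below isolates just that arithmetic.

```typescript
// Standalone version of the multiplier step in aggregateTeamScore,
// useful for spot-checking team totals. Inputs are invented.
interface PlayerLine { playerId: string; baseScore: number; }

function teamTotal(
  players: PlayerLine[],
  captainId: string,
  viceCaptainId: string
): number {
  return players.reduce((sum, p) => {
    const multiplier = p.playerId === captainId ? 2
      : p.playerId === viceCaptainId ? 1.5
      : 1;
    return sum + p.baseScore * multiplier;
  }, 0);
}
```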

7. Real-Time Updates

7.1 WebSocket Broadcasting

7.2 Update Message Format

interface ScoreUpdateMessage {
  type: 'SCORE_UPDATE';
  contestId: string;
  matchId: string;
  timestamp: Date;

  // Event that triggered update
  event: {
    type: MatchEventType;
    playerId: string;
    playerName: string;
    description: string;
    points: number;
  };

  // Updated leaderboard snapshot
  leaderboard: {
    topN: LeaderboardEntry[];
    userPosition?: LeaderboardEntry;
    aroundUser?: LeaderboardEntry[];
  };

  // User's team update (if applicable)
  userTeam?: {
    teamId: string;
    totalScore: number;
    rank: number;
    previousRank: number;
    playerScores: PlayerScoreUpdate[];
  };
}
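Clients typically derive a rank-movement indicator from the `rank`/`previousRank` pair carried in `userTeam`. A minimal client-side helper (the `RankMovement` convention is an assumption, not part of the wire format):

```typescript
type RankMovement = 'UP' | 'DOWN' | 'SAME';

// Lower rank number = better position, so a decrease in rank means the team moved up.
function rankMovement(rank: number, previousRank: number): RankMovement {
  if (rank < previousRank) return 'UP';
  if (rank > previousRank) return 'DOWN';
  return 'SAME';
}
```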

8. Score Corrections & Replays

8.1 Correction Flow

8.2 Correction Implementation

class ScoreCorrector {
  async applyCorrection(correction: ScoreCorrection): Promise<CorrectionResult> {
    const { matchId, playerId, eventId, oldPoints, newPoints } = correction;

    // Get all teams with this player
    const affectedTeams = await this.teamRepository.findByPlayer(
      matchId,
      playerId
    );

    const pointsDelta = newPoints - oldPoints;
    const updates: TeamScoreUpdate[] = [];

    for (const team of affectedTeams) {
      // Calculate multiplier for this player in this team
      const multiplier = this.getMultiplier(playerId, team);
      const adjustedDelta = pointsDelta * multiplier;

      // Update team score
      const newScore = team.currentScore + adjustedDelta;

      updates.push({
        teamId: team.id,
        oldScore: team.currentScore,
        newScore,
        delta: adjustedDelta,
      });

      // Update Redis leaderboard
      await this.leaderboardManager.updateScore(
        team.contestId,
        team.id,
        newScore
      );
    }

    // Record correction in audit log
    await this.auditLog.recordCorrection({
      correctionId: correction.id,
      matchId,
      playerId,
      eventId,
      oldPoints,
      newPoints,
      affectedTeams: updates.length,
      timestamp: new Date(),
      adminId: correction.adminId,
    });

    // Publish correction event
    await this.kafka.produce('score.corrected', {
      correction,
      affectedTeams: updates,
    });

    return {
      success: true,
      affectedTeams: updates.length,
      updates,
    };
  }
}
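Because the raw points delta is scaled by the role the corrected player holds in each team, a single correction fans out differently per team. A worked sketch with invented numbers: correcting an event from 10 points down to 6 produces a raw delta of -4, which becomes -8 for teams where the player is captain and -6 where they are vice-captain.

```typescript
// Per-team effect of a score correction: raw delta scaled by the
// corrected player's multiplier in that team. Numbers are invented.
function correctionDelta(
  oldPoints: number,
  newPoints: number,
  multiplier: number
): number {
  return (newPoints - oldPoints) * multiplier;
}
```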

9. Performance Optimization

9.1 Caching Strategy

9.2 Batch Processing

class BatchScoreProcessor {
  private batchSize = 1000;
  private flushInterval = 100; // ms
  private buffer: ScoreUpdate[] = [];
  private timer: NodeJS.Timeout | null = null;

  async addUpdate(update: ScoreUpdate): Promise<void> {
    this.buffer.push(update);

    if (this.buffer.length >= this.batchSize) {
      await this.flush();
    } else if (!this.timer) {
      this.timer = setTimeout(() => this.flush(), this.flushInterval);
    }
  }

  private async flush(): Promise<void> {
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }

    if (this.buffer.length === 0) return;

    const batch = this.buffer.splice(0, this.batchSize);

    // Group by contest for efficient Redis updates
    const byContest = this.groupByContest(batch);

    // Parallel updates to Redis
    await Promise.all(
      Object.entries(byContest).map(([contestId, updates]) =>
        this.leaderboardManager.batchUpdateScores(contestId, updates)
      )
    );

    // Publish batch update event
    await this.kafka.produce('scores.batch.updated', {
      count: batch.length,
      contests: Object.keys(byContest),
      timestamp: Date.now(),
    });

    // Re-arm the timer if updates remained (or arrived) during the flush,
    // so leftover items are not stranded in the buffer
    if (this.buffer.length > 0 && !this.timer) {
      this.timer = setTimeout(() => this.flush(), this.flushInterval);
    }
  }
}
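The `groupByContest` helper used in `flush()` is assumed above. A minimal implementation buckets a mixed batch into per-contest arrays so each contest receives a single pipelined Redis call:

```typescript
interface ScoreUpdate { contestId: string; teamId: string; score: number; }

// Assumed implementation of the groupByContest helper: one bucket per
// contest, so each contest gets exactly one batchUpdateScores call.
function groupByContest(
  batch: ScoreUpdate[]
): Record<string, Array<{ teamId: string; score: number }>> {
  const grouped: Record<string, Array<{ teamId: string; score: number }>> = {};
  for (const { contestId, teamId, score } of batch) {
    (grouped[contestId] ??= []).push({ teamId, score });
  }
  return grouped;
}
```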

10. Data Models

10.1 Entity Relationship Diagram


11. API Specifications

11.1 REST API Endpoints

Method | Endpoint                                    | Description       | Auth
GET    | /api/v3/scores/match/:matchId               | Get match scores  | Public
GET    | /api/v3/scores/player/:playerId             | Get player score  | Public
GET    | /api/v3/scores/team/:teamId                 | Get team score    | User
GET    | /api/v3/leaderboard/:contestId              | Get leaderboard   | Public
GET    | /api/v3/leaderboard/:contestId/rank/:teamId | Get team rank     | User
POST   | /api/v3/scores/correct                      | Submit correction | Admin

11.2 gRPC Service Definition

service ScoringService {
  // Score Queries
  rpc GetPlayerScore(GetPlayerScoreRequest) returns (PlayerScore);
  rpc GetTeamScore(GetTeamScoreRequest) returns (TeamScore);
  rpc GetMatchScores(GetMatchScoresRequest) returns (MatchScores);

  // Leaderboard
  rpc GetLeaderboard(GetLeaderboardRequest) returns (Leaderboard);
  rpc GetUserRank(GetUserRankRequest) returns (RankInfo);
  rpc StreamLeaderboard(StreamLeaderboardRequest) returns (stream LeaderboardUpdate);

  // Admin
  rpc SubmitCorrection(CorrectionRequest) returns (CorrectionResponse);
  rpc ReplayScores(ReplayRequest) returns (ReplayResponse);
}

message GetLeaderboardRequest {
  string contest_id = 1;
  int32 limit = 2;
  int32 offset = 3;
  string around_team_id = 4; // Optional: get entries around this team
}

message LeaderboardUpdate {
  string contest_id = 1;
  repeated LeaderboardEntry entries = 2;
  int64 timestamp = 3;
  MatchEvent triggering_event = 4;
}

12. Monitoring & Observability

12.1 Key Metrics

import { Counter, Gauge, Histogram } from 'prom-client';

const scoringMetrics = {
  // Processing Metrics
  eventsProcessed: new Counter({
    name: 'scoring_events_processed_total',
    help: 'Total match events processed',
    labelNames: ['event_type', 'match_id'],
  }),

  processingLatency: new Histogram({
    name: 'scoring_processing_latency_seconds',
    help: 'Event processing latency',
    labelNames: ['event_type'],
    buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25],
  }),

  // Leaderboard Metrics
  leaderboardUpdates: new Counter({
    name: 'scoring_leaderboard_updates_total',
    help: 'Leaderboard update operations',
    labelNames: ['contest_id'],
  }),

  leaderboardLatency: new Histogram({
    name: 'scoring_leaderboard_latency_seconds',
    help: 'Leaderboard query latency',
    labelNames: ['operation'],
    buckets: [0.001, 0.005, 0.01, 0.025, 0.05],
  }),

  // Flink Metrics
  flinkLag: new Gauge({
    name: 'scoring_flink_consumer_lag',
    help: 'Kafka consumer lag in Flink',
    labelNames: ['topic', 'partition'],
  }),

  checkpointDuration: new Histogram({
    name: 'scoring_flink_checkpoint_duration_seconds',
    help: 'Flink checkpoint duration',
    buckets: [1, 5, 10, 30, 60],
  }),
};

12.2 Alert Rules

groups:
  - name: scoring-manager-alerts
    rules:
      - alert: HighProcessingLatency
        expr: histogram_quantile(0.99, rate(scoring_processing_latency_seconds_bucket[5m])) > 0.1
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "Score processing p99 latency > 100ms"

      - alert: FlinkConsumerLag
        expr: scoring_flink_consumer_lag > 10000
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Flink consumer lag > 10k messages"

      - alert: LeaderboardUpdateFailures
        expr: rate(scoring_leaderboard_errors_total[5m]) > 0.01
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Leaderboard update failure rate > 1%"

13. Scalability

13.1 Horizontal Scaling Architecture

13.2 Performance Targets

Metric                         | Target   | Measurement
Event Processing Latency (p50) | < 10ms   | Kafka to Redis
Event Processing Latency (p99) | < 50ms   | Worst case
Leaderboard Query (p50)        | < 5ms    | Redis ZREVRANGE
Leaderboard Query (p99)        | < 20ms   | Worst case
Events per Second              | 100,000+ | Peak match load
Concurrent Leaderboards        | 10,000+  | Active contests

14. Testing Strategy

14.1 Load Testing

describe('Scoring Manager Load Tests', () => {
  it('should process 10k events per second', async () => {
    const events = generateMatchEvents(10000);
    const startTime = Date.now();

    await Promise.all(events.map(e => scoringPipeline.process(e)));

    const duration = Date.now() - startTime;
    expect(duration).toBeLessThan(1000);
  });

  it('should maintain leaderboard consistency under load', async () => {
    const contest = await createTestContest({ entries: 10000 });

    // Simulate rapid score updates (updateScore sets the absolute score via ZADD)
    const updates = Array.from({ length: 1000 }, () => ({
      teamId: randomTeamId(contest),
      score: Math.random() * 100,
    }));

    await Promise.all(updates.map(u =>
      leaderboardManager.updateScore(contest.id, u.teamId, u.score)
    ));

    // Verify leaderboard is sorted correctly
    const leaderboard = await leaderboardManager.getTopN(contest.id, 100);

    for (let i = 1; i < leaderboard.length; i++) {
      expect(leaderboard[i - 1].score).toBeGreaterThanOrEqual(leaderboard[i].score);
    }
  });
});

15. Future Enhancements

15.1 Roadmap

Phase   | Feature              | Description
Q1 2026 | ML Score Predictions | Predict final scores during match
Q2 2026 | Real-time Analytics  | Live player performance insights
Q3 2026 | Multi-Sport Scoring  | Unified scoring for all sports
Q4 2026 | Edge Computing       | Score calculation at edge nodes

16. Appendix

16.1 Glossary

Term            | Definition
Sorted Set      | Redis data structure for ranked data
Tumbling Window | Fixed-size, non-overlapping time window
Watermark       | Mechanism for handling late events
Checkpoint      | Flink's fault tolerance mechanism

16.2 References


Document Version: 3.0.0 | Last Updated: January 2026