Compare commits

...

10 Commits

Author SHA1 Message Date
seppedl 3341c3922a [update] Documentation for heuristic, fork detection, and playable threats.
Updated Background information.md, Achtergrondinformatie.md, and
README.md to describe the improved AI: playable vs non-playable
threat scoring, fork detection bonus, and the split Phase 1 strategy.
README now lists all three implementations and the AI strategy section.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-27 17:00:10 +01:00
seppedl b27032762e [fix] Add heuristic evaluation, fork detection, and Phase 1 win/block split to AI.
Minimax leaf nodes now return a positional score instead of 0, using
playable-threat detection (±100), non-playable threats (±40), fork
bonus (±200), two-in-a-row (±5), and center control (±3). Phase 1
is split into two passes so the AI never blocks when it can win.
Game sequence is now auto-logged to the browser console on game end.
Applied to all three implementations (C++, JS, Python).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-27 16:59:55 +01:00
seppedl 54bae2faf5 [update] Documentation. 2026-03-27 13:41:47 +01:00
seppedl 1c370f80a6 [fix] Add heuristic to Python code. 2026-03-27 13:29:07 +01:00
seppedl 025f0457c7 [fix] Non heuristic moves... 2026-03-27 12:17:25 +01:00
seppedl 223fc91b19 [add] Javascript version for Matthis and Jef. 2026-03-21 16:41:39 +01:00
seppedl d5345c6cee [add] Python TEXT game for Jef! 2026-03-21 16:27:12 +01:00
seppedl 3257d40722 [update] Background information and Dutch translation with blunder mode section.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-21 16:03:48 +01:00
seppedl f9d100f918 [update] README for blunder mode, WIFI_SSID, and DEMO_RESET_PAUSE build flags.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-21 16:03:46 +01:00
seppedl 0fc20da274 [refactor] Replace hardcoded values with build flags and cleanup duplicates.
- Add #ifndef guards for pin defines duplicated between platformio.ini and main.cpp
- Use DEFAULT_LOOK_AHEAD, DEFAULT_BRIGHTNESS, DEFAULT_IDLE_TIMEOUT, DEMO_RESET_PAUSE
  build flags instead of hardcoded magic numbers
- Add WIFI_SSID build flag for configurable access point name
- Remove unused build flags (BRIGHTNESS, IDLE_TIMEOUT, DEBOUNCE_DELAY)
- Remove progressive difficulty / evolution feature (getDynamicPly)
- Replace goto with structured control flow in performAiMove
- Deduplicate checkGameEnd win/draw branches
- Implement blunder mode: configurable chance (%) to pick a random column,
  preserving instant win/block detection

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-21 16:03:41 +01:00
24 changed files with 2898 additions and 95 deletions
+11
@@ -0,0 +1,11 @@
# AI Settings
LOOK_AHEAD=8
BLUNDER_ENABLED=false
BLUNDER_CHANCE=20
# Demo Settings
DEMO_RESET_PAUSE=5
IDLE_TIMEOUT=60
# Game Log
MAX_GAME_LOG=100
+5
@@ -5,3 +5,8 @@
.vscode/ipch
.vscode/settings.json
CLAUDE.md
.venv/
__pycache__/
*.pyc
.games.txt
uv.lock
+1
@@ -0,0 +1 @@
3.13
+37 -11
@@ -57,22 +57,38 @@ De AI kiest dan de kolom met de hoogste score die overblijft, in dit geval kolom
### Scoren: Hoe de AI een Bord Waardeert
Na het doorspelen van een "wat als?"-scenario, moet de AI beslissen: is dit een goed of slecht resultaat? Hij gebruikt een eenvoudig scoringsysteem met drie mogelijke uitkomsten:
Na het doorspelen van een "wat als?"-scenario, moet de AI beslissen: is dit een goed of slecht resultaat? Hij gebruikt een gelaagd scoringsysteem:
- **+1000 of meer: "Ik win!"** De AI heeft een manier gevonden om vier op een rij te krijgen. Hoe sneller hij kan winnen, hoe hoger de score. Winnen in 2 zetten scoort hoger dan winnen in 6 zetten. Daarom gaat de AI altijd voor de snelste overwinning.
- **-1000 of minder: "Ik verlies!"** De tegenstander krijgt vier op een rij. Hoe sneller hij verliest, hoe slechter de score. Dit zorgt ervoor dat de AI het hardst vecht tegen zetten die dreigen tot een direct verlies.
- **0: "Ik weet het nog niet."** De AI heeft zo ver vooruit gekeken als hij kon (hij is door zijn plies heen) en niemand heeft gewonnen. Hij noemt deze positie "neutraal" — niet goed, niet slecht.
- **Heuristiek-score: "Ik weet het nog niet, maar ik kan zien hoe goed het eruitziet."** Als de AI zo ver vooruit heeft gekeken als hij kon (door zijn plies heen) en niemand heeft gewonnen, beoordeelt hij de positie met een heuristiek — een snelle schatting van wie er sterker voor staat.
Dat is alles — de AI geeft geen extra punten voor drie op een rij, het controleren van het midden, of andere slimme trucs. Hij vertrouwt volledig op het ver vooruit kijken om te bepalen welke zetten tot een overwinning leiden en welke niet. Als hij binnen zijn zoekdiepte geen winst of verlies ziet, ziet elke positie er hetzelfde uit.
### De Heuristiek: Het Bord Lezen
In plaats van elke onbesliste positie "neutraal" te noemen, bekijkt de AI elke mogelijke groep van vier opeenvolgende cellen op het bord (horizontaal, verticaal en beide diagonalen — 69 groepen in totaal). Voor elke groep telt hij de schijfjes:
- **3 AI-schijfjes + 1 leeg (speelbaar):** De lege cel kan nu meteen gevuld worden (hij zit op de onderste rij of er zit een schijfje onder). Dit is een directe dreiging. Score: **+100**.
- **3 AI-schijfjes + 1 leeg (nog niet speelbaar):** De lege cel zweeft in de lucht — de dreiging bestaat maar kan nog niet benut worden. Score: **+40**.
- **2 AI-schijfjes + 2 leeg:** Een veelbelovende opbouw die zich tot een dreiging kan ontwikkelen. Score: **+5**.
- **3 tegenstander-schijfjes + 1 leeg (speelbaar):** Een direct gevaar. Score: **-100**.
- **3 tegenstander-schijfjes + 1 leeg (nog niet speelbaar):** Een toekomstig gevaar. Score: **-40**.
- **2 tegenstander-schijfjes + 2 leeg:** De tegenstander bouwt iets op. Score: **-5**.
- **Gemengde groepen** (beide spelers hebben schijfjes in dezelfde groep): Geblokkeerd — niemand kan hier winnen. Score: **0**.
Daarbovenop gebruikt de AI twee extra scorebonussen:
- **Controle over de middelste kolom:** +3 per AI-schijfje in de middelste kolom, -3 per tegenstander-schijfje. De middelste kolom is betrokken bij meer winnende lijnen dan elke andere kolom, dus het beheersen ervan is waardevol.
- **Vorkdetectie:** Als een speler **twee of meer** drie-op-een-rij dreigingen tegelijk heeft, is dat een vork — de tegenstander kan er maar één per beurt blokkeren, dus de andere wint het spel. De AI geeft een grote bonus (**+200** of **-200**) wanneer hij een vork detecteert, waardoor hij agressief vork-opstellingen najaagt en wanhopig probeert te voorkomen dat de tegenstander er een maakt.
Al deze scores tellen bij elkaar op. De maximale heuristiek-score ligt ruim onder 1000, dus het verstoort nooit de echte winst/verlies-detectie — een gegarandeerde winst wint altijd van de beste heuristiek-positie.
Deze heuristiek betekent dat de AI nu het verschil kan zien tussen een sterke positie (veel dreigingen in opbouw, vooral speelbare) en een zwakke (de tegenstander heeft alle dreigingen), zelfs als hij geen gedwongen winst of verlies kan zien binnen zijn zoekdiepte.
### Waarom de middelste kolom belangrijk is
Ook al geeft de AI geen bonuspunten voor spelen in het midden,
hij controleert altijd eerst de middelste kolom (kolom 3), en werkt dan naar buiten toe (2, 4, 1, 5, 0, 6).
De middelste kolom is betrokken bij meer mogelijke winnende lijnen dan de randen, dus door deze eerst te controleren,
vindt de AI sneller goede zetten en kan hij slechte zetten eerder overslaan (dankzij alpha-beta snoeien).
De AI controleert altijd eerst de middelste kolom (kolom 3), en werkt dan naar buiten toe (2, 4, 1, 5, 0, 6). De middelste kolom is betrokken bij meer mogelijke winnende lijnen dan de randen, dus door deze eerst te controleren, vindt de AI sneller goede zetten en kan hij slechte zetten eerder overslaan (dankzij alpha-beta snoeien). De heuristiek geeft ook een kleine bonus voor controle over het midden, wat dit natuurlijke voordeel versterkt.
---
@@ -114,9 +130,11 @@ Goede zetten zitten vaak in het midden, dus door deze eerst te controleren, leid
De AI doet zijn werk in drie stappen:
1. **Kan ik nu winnen?** De AI probeert in elke kolom een schijfje te leggen. Als hij ergens vier op een rij kan maken, doet hij dat meteen. Geen verdere berekeningen nodig.
2. **Kan de tegenstander volgende beurt winnen?** De AI controleert of jij ergens vier op een rij kunt maken. Zo ja, dan blokkeert hij die kolom. Dit overslaan zou een grote fout zijn.
3. **Diepe zoektocht.** Als er geen directe winst of bedreiging is, voert de AI de volledige minimax-strategie uit met alpha-beta snoeien.
1. **Kan ik nu winnen?** De AI controleert **alle** kolommen op een winnende zet. Als hij ergens vier op een rij kan maken, doet hij dat meteen. Geen verdere berekeningen nodig. Belangrijk: de AI controleert eerst elke kolom op eigen winst voordat hij naar dreigingen kijkt — zo blokkeert hij nooit per ongeluk een dreiging van de tegenstander als hij zelf het spel kan winnen.
2. **Kan de tegenstander volgende beurt winnen?** Pas nadat is bevestigd dat er geen directe winst is, controleert de AI alle kolommen op dreigingen van de tegenstander. Als jij ergens vier op een rij kunt maken, blokkeert hij die kolom. Dit overslaan zou een grote fout zijn.
3. **Diepe zoektocht.** Als er geen directe winst of bedreiging is, voert de AI de volledige minimax-strategie uit met alpha-beta snoeien en de heuristiek-evaluatie.
Deze drie stappen maken de AI zowel snel (directe reacties op duidelijke zetten) als slim (diep nadenken als het nodig is).
@@ -131,7 +149,15 @@ De sterkere speler kan zo winnende zetten vinden die de zwakkere mist. Wie sterk
---
## 7. Snelle Bediening
## 7. Blunder-modus
Normaal speelt de AI altijd de beste zet die hij kan vinden. Maar dat kan frustrerend zijn voor jongere of minder ervaren spelers die nooit winnen. De **blunder-modus** geeft de AI een instelbare kans (bijvoorbeeld 20%) om een willekeurige zet te doen in plaats van diep na te denken. Als er een blunder gebeurt, slaat de AI zijn slimme analyse over en laat hij een schijfje in een willekeurige open kolom vallen. De rest van de tijd speelt hij gewoon op volle kracht — maar af en toe maakt hij een domme fout die een oplettende speler kan afstraffen.
Blunders gaan nooit boven een directe winst of blokkade. Als de AI nu kan winnen, of als de tegenstander op het punt staat te winnen, maakt de AI altijd de juiste zet. Blunders vervangen alleen de diepe zoektocht op beurten waar er geen directe dreiging is.
---
## 8. Snelle Bediening
De ESP32-C3 heeft maar één kern. Als de AI nadenkt, kan hij de bediening een paar seconden blokkeren.
Twee trucs zorgen ervoor dat het spel soepel blijft:
+33 -9
@@ -55,20 +55,38 @@ The AI compares -3, +2, and -1, and picks column 3 because +2 is the best it can
### Scoring: How the AI Rates a Board
After playing out a "what if?" scenario, the AI needs to decide: is this a good result or a bad one? It uses a very simple scoring system with only three possible outcomes:
After playing out a "what if?" scenario, the AI needs to decide: is this a good result or a bad one? It uses a layered scoring system:
- **+1000 or more: "I win!"** The AI found a way to get four in a row. The bonus points above 1000 depend on how quickly it can win. Winning in 2 moves scores higher than winning in 6 moves. This is why the AI always goes for the fastest victory — it never wastes time when it can finish the game.
- **-1000 or less: "I lose!"** The opponent gets four in a row. Losing sooner gets an even worse score. This makes the AI fight hardest against moves that threaten an immediate loss.
- **0: "I don't know yet."** The AI looked as far ahead as it could (it ran out of plies) and nobody won. It simply calls this position "neutral" — not good, not bad.
- **Heuristic score: "I don't know yet, but I can tell how good this looks."** When the AI has looked as far ahead as it can (it ran out of plies) and nobody has won, it evaluates the position using a heuristic — a quick estimate of who is in a stronger position.
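Here is a minimal sketch of the depth-dependent win/loss scores. The function name `terminalScore` and the `heuristic` callback are illustrative, not taken from the project. The sketch follows the `1000 + depth` / `-1000 - depth` returns of the JavaScript `minimax` in this diff, where `depth` is the number of plies still left to search. A win that is found sooner therefore has more plies left and scores higher.

```javascript
// Sketch: value returned by a node where the search stops (someone has won,
// or no plies are left). `depthLeft` counts the plies still available, so a
// win reached sooner keeps more plies and gets a bigger bonus.
function terminalScore(winner, aiP, depthLeft, heuristic) {
  if (winner === aiP) return 1000 + depthLeft; // AI wins: sooner is better
  if (winner !== 0) return -1000 - depthLeft;  // opponent wins: sooner is worse
  return heuristic();                          // nobody won: estimate the position
}

// A win found with 6 plies left beats one found with only 2 plies left.
const fast = terminalScore(1, 1, 6, () => 0);
const slow = terminalScore(1, 1, 2, () => 0);
console.log(fast, slow); // 1006 1002
```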
That's it — the AI does not give extra points for having three in a row, controlling the center, or any other clever trick. It relies entirely on looking many moves ahead to figure out which moves lead to wins and which ones don't. If it can't see a win or loss within its search depth, every position looks the same.
### The Heuristic: Reading the Board
Instead of calling every unsolved position "neutral," the AI examines every possible group of four consecutive cells on the board (horizontal, vertical, and both diagonals — 69 groups in total). For each group, it counts pieces:
- **3 AI pieces + 1 empty (playable):** The empty cell can be filled right now (it's on the bottom row or has a piece below it). This is an immediate threat. Score: **+100**.
- **3 AI pieces + 1 empty (not yet playable):** The empty cell is floating in the air — the threat exists but can't be used yet. Score: **+40**.
- **2 AI pieces + 2 empty:** A promising setup that could develop into a threat. Score: **+5**.
- **3 opponent pieces + 1 empty (playable):** An immediate danger. Score: **-100**.
- **3 opponent pieces + 1 empty (not yet playable):** A future danger. Score: **-40**.
- **2 opponent pieces + 2 empty:** The opponent is building something. Score: **-5**.
- **Mixed groups** (both players have pieces in the same group): Blocked — nobody can win here. Score: **0**.
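The window rules above can be sketched as one small function. The sketch assumes the `board[col][row]` layout used by `connect_four.js` (row 0 at the bottom, 0 for empty). `isPlayable` is an illustrative helper name, not one from the project.

```javascript
// Sketch: score one four-cell window, assuming board[col][row] with row 0 at
// the bottom and 0 meaning empty. `cells` is a list of four [col, row] pairs.
function isPlayable(board, col, row) {
  // An empty cell can be filled right now if it is on the bottom row
  // or the cell directly below it is already occupied.
  return row === 0 || board[col][row - 1] !== 0;
}

function scoreWindow(board, cells, aiP, huP) {
  let ai = 0, hu = 0, empty = null;
  for (const [c, r] of cells) {
    const v = board[c][r];
    if (v === aiP) ai++;
    else if (v === huP) hu++;
    else empty = [c, r];
  }
  if (ai > 0 && hu > 0) return 0;                               // mixed: blocked for both
  if (ai === 3) return isPlayable(board, ...empty) ? 100 : 40;   // AI threat
  if (hu === 3) return isPlayable(board, ...empty) ? -100 : -40; // opponent threat
  if (ai === 2) return 5;
  if (hu === 2) return -5;
  return 0;
}

// 7 columns x 6 rows, all empty; the AI has three on the bottom row.
const board = Array.from({ length: 7 }, () => new Array(6).fill(0));
board[0][0] = board[1][0] = board[2][0] = 1;
const bottomRow = [[0, 0], [1, 0], [2, 0], [3, 0]];
console.log(scoreWindow(board, bottomRow, 1, 2)); // 100 (gap at column 3 is playable)
```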
On top of that, the AI uses two more scoring bonuses:
- **Center column control:** +3 per AI piece in the center column, -3 per opponent piece. The center column is involved in more winning lines than any other column, so controlling it is valuable.
- **Fork detection:** If a player has **two or more** three-in-a-row threats at the same time, that's a fork — the opponent can only block one per turn, so the other wins the game. The AI adds a large bonus (**+200** or **-200**) when it detects a fork, making it aggressively pursue fork setups and desperately avoid letting the opponent create one.
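Below is a hedged sketch of the window enumeration and the two extra bonuses. It assumes the same 7×6 `board[col][row]` layout, and `allWindows` and `centerAndForkBonus` are hypothetical names. The sketch also confirms the 69-window count: 24 horizontal, 21 vertical and 12 per diagonal direction.

```javascript
// Sketch: enumerate every four-cell window on a 7x6 board stored as
// board[col][row] (row 0 at the bottom, 0 = empty).
const COLS = 7, ROWS = 6;

function allWindows() {
  const windows = [];
  const dirs = [[1, 0], [0, 1], [1, 1], [1, -1]]; // horizontal, vertical, two diagonals
  for (const [dc, dr] of dirs) {
    for (let c = 0; c < COLS; c++) {
      for (let r = 0; r < ROWS; r++) {
        const endC = c + 3 * dc, endR = r + 3 * dr;
        if (endC < 0 || endC >= COLS || endR < 0 || endR >= ROWS) continue;
        windows.push([0, 1, 2, 3].map(i => [c + i * dc, r + i * dr]));
      }
    }
  }
  return windows;
}

// Center control (+/-3 per disc in column 3) plus the fork bonus
// (+/-200 when a player has two or more windows with 3 discs + 1 empty).
function centerAndForkBonus(board, aiP, huP) {
  let score = 0;
  for (let r = 0; r < ROWS; r++) {
    if (board[3][r] === aiP) score += 3;
    else if (board[3][r] === huP) score -= 3;
  }
  let aiThreats = 0, huThreats = 0;
  for (const cells of allWindows()) {
    const vals = cells.map(([c, r]) => board[c][r]);
    if (vals.filter(v => v === 0).length !== 1) continue;
    if (vals.filter(v => v === aiP).length === 3) aiThreats++;
    else if (vals.filter(v => v === huP).length === 3) huThreats++;
  }
  if (aiThreats >= 2) score += 200;
  if (huThreats >= 2) score -= 200;
  return score;
}

const board = Array.from({ length: COLS }, () => new Array(ROWS).fill(0));
board[1][0] = board[2][0] = board[3][0] = 1;  // _ X X X _ on the bottom row
console.log(allWindows().length);             // 69
console.log(centerAndForkBonus(board, 1, 2)); // 203 (fork +200, one center disc +3)
```

Like `evaluateBoard` in `connect_four.js`, this sketch counts three-disc windows without checking whether two of them share the same empty cell. A bottom row of `X X X _ X` therefore also earns the +200, even though a single disc blocks both windows.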
All these scores add up. The maximum possible heuristic score is well below 1000, so it never interferes with actual win/loss detection — a guaranteed win always beats the best heuristic position.
This heuristic means the AI can now tell the difference between a strong position (many threats being built, especially playable ones) and a weak one (the opponent has all the threats), even when it can't see a forced win or loss within its search depth.
### Why the center column matters
Even though the AI doesn't give bonus points for playing in the center, it always checks the center column first (column 3), then works outward (2, 4, 1, 5, 0, 6).
The center column is involved in more possible winning lines than the edges, so checking it first helps the AI find good moves faster and skip bad ones sooner (thanks to alpha-beta pruning).
The AI always checks the center column first (column 3), then works outward (2, 4, 1, 5, 0, 6). The center column is involved in more possible winning lines than the edges, so checking it first helps the AI find good moves faster and skip bad ones sooner (thanks to alpha-beta pruning). The heuristic also gives a small bonus for center control, reinforcing this natural advantage.
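In `connect_four.js` the center-out order is hardcoded as `COL_ORDER = [3, 2, 4, 1, 5, 0, 6]`. A hypothetical helper that derives the same order for any board width could look like this:

```javascript
// Sketch: build a center-out column order for a board that is `cols` wide
// (cols >= 1). The left neighbour is tried before the right one at each distance.
function centerOutOrder(cols) {
  const mid = Math.floor(cols / 2);
  const order = [mid];
  for (let d = 1; order.length < cols; d++) {
    if (mid - d >= 0) order.push(mid - d);
    if (mid + d < cols) order.push(mid + d);
  }
  return order;
}

console.log(centerOutOrder(7).join(", ")); // 3, 2, 4, 1, 5, 0, 6
```

With alpha-beta, strong moves examined early tighten the alpha/beta bounds sooner, so more of the later columns can be cut off.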
## 4. Alpha-Beta Pruning: The Smart Shortcut
@@ -100,11 +118,11 @@ In practice, pruning lets the AI skip 50-90% of the positions it would otherwise
Before running the expensive minimax search, the AI takes two quick shortcuts:
1. **Can I win right now?** The AI tries placing its disc in each column. If any column completes four in a row, it takes that move immediately. No need to think further.
1. **Can I win right now?** The AI checks **all** columns for a winning move. If any column completes four in a row, it takes that move immediately. No need to think further. Importantly, the AI scans every column for its own win before checking for threats — this ensures it never accidentally blocks an opponent's threat when it could win the game outright.
2. **Can my opponent win next turn?** The AI checks if the opponent could win by playing in any column. If so, it blocks that column. Missing this would be a fatal mistake.
2. **Can my opponent win next turn?** Only after confirming there is no instant win, the AI checks all columns for opponent threats. If the opponent could win by playing in any column, the AI blocks it. Missing this would be a fatal mistake.
3. **Deep search.** Only if there are no immediate wins or threats does the AI run the full minimax search with alpha-beta pruning.
3. **Deep search.** Only if there are no immediate wins or threats does the AI run the full minimax search with alpha-beta pruning and the heuristic evaluation.
This three-phase approach makes the AI both fast (instant reactions to obvious moves) and smart (deep strategic thinking when needed).
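The split between phases 1 and 2 matters when the AI has its own win and the opponent also has a threat. Below is a minimal sketch that assumes the same `board[col][row]` layout. `hasFour`, `winningColumn` and `instantMove` are illustrative names; the project's own code does this with `scanBoard` inside `performAiMove`.

```javascript
// Sketch: instant win/block check on a 7x6 board stored as board[col][row]
// (row 0 at the bottom, 0 = empty).
const COLS = 7, ROWS = 6;

function firstEmptyRow(board, col) {
  return board[col].indexOf(0); // -1 when the column is full
}

// True if player `p` has four in a row anywhere on the board.
function hasFour(board, p) {
  const dirs = [[1, 0], [0, 1], [1, 1], [1, -1]];
  for (let c = 0; c < COLS; c++) {
    for (let r = 0; r < ROWS; r++) {
      for (const [dc, dr] of dirs) {
        let n = 0;
        while (n < 4) {
          const cc = c + n * dc, rr = r + n * dr;
          if (cc < 0 || cc >= COLS || rr < 0 || rr >= ROWS || board[cc][rr] !== p) break;
          n++;
        }
        if (n === 4) return true;
      }
    }
  }
  return false;
}

// Returns the column where `p` would complete four in a row, or -1.
function winningColumn(board, p) {
  for (let c = 0; c < COLS; c++) {
    const r = firstEmptyRow(board, c);
    if (r === -1) continue;
    board[c][r] = p;
    const wins = hasFour(board, p);
    board[c][r] = 0; // undo the trial move
    if (wins) return c;
  }
  return -1;
}

// Own win is checked over ALL columns before any block is considered.
function instantMove(board, aiP) {
  const huP = aiP === 1 ? 2 : 1;
  const win = winningColumn(board, aiP);
  if (win !== -1) return win;
  return winningColumn(board, huP); // -1 means: fall through to the deep search
}

const board = Array.from({ length: COLS }, () => new Array(ROWS).fill(0));
board[0][0] = board[1][0] = board[2][0] = 2; // opponent threatens column 3
board[6][0] = board[6][1] = board[6][2] = 1; // AI can win in column 6
console.log(winningColumn(board, 2));        // 3
console.log(instantMove(board, 1));          // 6 (own win takes priority over the block)
```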
@@ -112,7 +130,13 @@ This three-phase approach makes the AI both fast (instant reactions to obvious m
In demo mode, two AI players play against each other. To make the games interesting (rather than always ending in a draw), each player is randomly assigned a different search depth. One player might look 5 moves ahead while the other only looks 3 moves ahead. The stronger player can find winning setups that the weaker one misses, leading to exciting games with real winners. Who gets the advantage is randomized each game.
## 7. Responsive Controls
## 7. Blunder Mode
Normally, the AI always plays the best move it can find. But that can be frustrating for younger or casual players who never get to win. **Blunder mode** gives the AI a configurable chance (for example 20%) to make a random move instead of running the deep minimax search. When a blunder happens, the AI simply drops a disc in a random open column. It still plays normally the rest of the time, so the game feels real, but every now and then the AI makes a silly mistake that a sharp player can punish.
Blunders never override an instant win or block. If the AI can win right now, or if the opponent is about to win, the AI always makes the correct move. Blunders only replace the deep search on turns where there is no immediate threat.
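To make the ordering concrete, here is a sketch of where the blunder roll sits. `chooseMove` and its options object are hypothetical. `deepSearch` stands in for the minimax call, and `rng` can be swapped out so the behaviour is testable. In `connect_four.js`, blunders are also skipped in demo mode.

```javascript
// Sketch: move selection order. `instantCol` is the result of the win/block
// phases (-1 if none), `validCols` lists open columns, `deepSearch` stands in
// for minimax, and `rng` defaults to Math.random (returns a value in [0, 1)).
function chooseMove({ instantCol, validCols, blunderEnabled, blunderChance, deepSearch, rng = Math.random }) {
  if (instantCol !== -1) return instantCol;                 // wins and blocks are never skipped
  if (blunderEnabled && rng() * 100 < blunderChance) {
    return validCols[Math.floor(rng() * validCols.length)]; // random open column
  }
  return deepSearch();                                      // normal full-strength play
}

const pick = chooseMove({
  instantCol: -1, validCols: [0, 2, 5], blunderEnabled: true,
  blunderChance: 20, deepSearch: () => 3, rng: () => 0.5,
});
console.log(pick); // 3 (a roll of 50 is not below 20, so the deep search runs)
```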
## 8. Responsive Controls
The ESP32-C3 is a single-core processor. When the AI is thinking, it could block all input for several seconds. Two techniques keep the game responsive:
+31 -6
@@ -53,7 +53,7 @@ When idle (no input for the configured timeout), the board enters demo mode wher
The ESP32 creates a WiFi access point:
- **Network:** `Connect4-Config`
- **Network:** Configured via `WIFI_SSID` build flag (default: `Connect4`)
- **Password:** Configured via `WIFI_PASSWORD` build flag (default: `youlose4`)
- **Admin page:** Connect to the network and open `http://192.168.4.1`
@@ -64,8 +64,7 @@ The ESP32 creates a WiFi access point:
| **Base AI Ply** | Search depth for the AI (1-10). Higher = stronger. |
| **Brightness** | LED brightness (0-255). |
| **Idle Timeout** | Seconds of inactivity before demo mode starts. |
| **Blunders** | Reserved for future use. |
| **Evolution** | Progressive difficulty: AI gets stronger as game goes on.|
| **Blunders** | AI plays a random open column at the configured chance (%). |
Settings are saved to flash (NVS) and persist across reboots.
@@ -100,7 +99,7 @@ pio device monitor
All configurable parameters are defined as `-D` flags in `platformio.ini`:
| Flag | Default | Description |
| :--------------------- | :------ | :--------------------------------------------- |
| :--------------------- | :--------- | :------------------------------------------------- |
| `LED_PIN` | `4` | GPIO pin for NeoPixel data line |
| `ENC_A` | `0` | GPIO pin for encoder CLK |
| `ENC_B` | `1` | GPIO pin for encoder DT |
@@ -110,15 +109,41 @@ All configurable parameters are defined as `-D` flags in `platformio.ini`:
| `DEFAULT_LOOK_AHEAD` | `8` | Default AI search depth (plies) |
| `DEFAULT_BRIGHTNESS` | `25` | Default LED brightness (0-255) |
| `DEFAULT_IDLE_TIMEOUT` | `45` | Seconds before demo mode activates |
| `DEMO_RESET_PAUSE` | `30000` | Milliseconds before finished game enters demo |
| `MAX_GAME_LOG` | `5` | Number of games stored in the game log |
| `WIFI_SSID` | `Connect4` | SSID for the WiFi access point |
| `WIFI_PASSWORD` | `youlose4` | Password for the WiFi access point |
## AI Strategy
The AI uses **minimax with alpha-beta pruning** and a **heuristic evaluation function**. Moves are selected in three phases:
1. **Instant win/block** — scan all columns for an immediate win first, then for an opponent threat to block.
2. **Blunder** (optional) — random move at a configurable chance, skipping the deep search.
3. **Deep minimax search** — full tree search with alpha-beta pruning up to the configured ply depth.
The heuristic evaluates leaf nodes by scoring all 69 possible four-cell windows on the board:
- **Playable threats** (3-in-a-row where the gap can be filled now): ±100
- **Non-playable threats** (gap is floating in the air): ±40
- **Two-in-a-row setups**: ±5
- **Center column control**: ±3 per piece
- **Fork bonus** (2+ simultaneous three-in-a-row threats): ±200
See `Background information.md` / `Achtergrondinformatie.md` for a detailed explanation accessible to all ages.
## Project Structure
```
src/main.cpp Single-file application (all game logic, AI, LED, web server)
src/main.cpp ESP32 application (game logic, AI, LED, web server)
connect_four.js JavaScript browser edition (canvas rendering)
connect_four.html HTML wrapper for the JavaScript version
connect_four.py Python terminal edition (Rich TUI)
platformio.ini Build configuration, pin mappings, and tunable parameters
README.md This file - technical and practical information
Background information.md How the AI works (suitable for all ages)
Background information.md How the AI works (English, suitable for all ages)
Achtergrondinformatie.md How the AI works (Dutch, suitable for all ages)
CLAUDE.md AI assistant project context
```
All three implementations (C++, JavaScript, Python) share the same AI algorithm and heuristic.
+28
@@ -0,0 +1,28 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Connect Four</title>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
background: #1a1a2e;
display: flex;
justify-content: center;
align-items: center;
min-height: 100vh;
font-family: system-ui, -apple-system, sans-serif;
}
canvas {
max-width: 100vw;
max-height: 100vh;
cursor: pointer;
}
</style>
</head>
<body>
<canvas id="gameCanvas"></canvas>
<script src="connect_four.js"></script>
</body>
</html>
+871
@@ -0,0 +1,871 @@
/* ============================================================
* Connect Four — Browser Edition
* A single-file game: AI (minimax + alpha-beta + heuristic), demo mode,
* game log (localStorage), blunder mode, idle timeout.
*
* Include this script in an HTML page that has:
* <canvas id="gameCanvas"></canvas>
*
* Works in Firefox, Chrome, Edge, Safari, and Brave.
* ============================================================ */
// --- Configurable Parameters --------------------------------
const COLS = 7; // board columns
const ROWS = 6; // board rows
const LOOK_AHEAD = 8; // AI search depth (plies)
const BLUNDER_ENABLED = false; // allow random AI mistakes
const BLUNDER_CHANCE = 20; // percent chance of blunder (0-100)
const DEMO_RESET_PAUSE = 5; // seconds before auto-demo after game end
const IDLE_TIMEOUT = 60; // seconds of inactivity before demo starts
const MAX_GAME_LOG = 100; // max stored game entries (localStorage)
// --- Visual Parameters --------------------------------------
const CELL_SIZE = 70; // pixel size of each board cell
const DISC_RADIUS = 28; // radius of a disc
const BOARD_PAD_TOP = 100; // space above the board (cursor + col numbers)
const BOARD_PAD_X = 40; // horizontal padding
const BOARD_PAD_BOTTOM = 40; // space below the board
const ANIM_DROP_SPEED = 1200; // pixels per second for drop animation
const FONT_FAMILY = "system-ui, -apple-system, sans-serif";
// --- Colors -------------------------------------------------
const COLOR_BG = "#1a1a2e";
const COLOR_BOARD = "#16213e";
const COLOR_GRID_LINE = "#0f3460";
const COLOR_EMPTY = "#0a1628";
const COLOR_P1 = "#ffd700"; // Yellow (player 1)
const COLOR_P2 = "#e63946"; // Red (player 2)
const COLOR_P1_DIM = "#8b7500";
const COLOR_P2_DIM = "#7a1f26";
const COLOR_HIGHLIGHT = "#ffffff";
const COLOR_TEXT = "#e0e0e0";
const COLOR_TEXT_DIM = "#666680";
const COLOR_MENU_BG = "#1a1a2e";
const COLOR_MENU_SEL = "#0f3460";
// ------------------------------------------------------------
const COL_ORDER = [3, 2, 4, 1, 5, 0, 6];
const State = Object.freeze({
MENU: 0,
PLAYING: 1,
AI_TURN: 2,
FINISHED_WIN: 3,
FINISHED_DRAW: 4,
DEMO: 5,
});
// --- Canvas setup -------------------------------------------
const canvas = document.getElementById("gameCanvas");
const ctx = canvas.getContext("2d");
const BOARD_W = COLS * CELL_SIZE;
const BOARD_H = ROWS * CELL_SIZE;
const CANVAS_W = BOARD_W + BOARD_PAD_X * 2;
const CANVAS_H = BOARD_PAD_TOP + BOARD_H + BOARD_PAD_BOTTOM;
canvas.width = CANVAS_W;
canvas.height = CANVAS_H;
canvas.style.display = "block";
canvas.style.margin = "0 auto";
canvas.tabIndex = 0;
canvas.focus();
// --- Game state ---------------------------------------------
let board = makeBoard();
let gameState = State.MENU;
let menuMode = 0;
let currentPlayer = 1;
let activeCol = 3;
let winnerPlayer = 0;
let winPositions = [];
let currentMoves = "";
let gameMenuMode = 0;
let gameLevel = LOOK_AHEAD;
let games = loadGameLog();
let demoPly = [4, 4];
let lastActivity = performance.now() / 1000;
let demoResetTimer = 0;
let flashToggle = true;
let lastFlash = 0;
let hoverCol = -1;
// Drop animation state
let dropping = false;
let dropCol = -1;
let dropPlayer = 0;
let dropTargetRow = -1;
let dropY = 0;
let dropTargetY = 0;
// --- Board helpers ------------------------------------------
function makeBoard() {
const b = [];
for (let c = 0; c < COLS; c++) {
b[c] = new Array(ROWS).fill(0);
}
return b;
}
function resetGame() {
board = makeBoard();
winnerPlayer = 0;
winPositions = [];
currentMoves = "";
}
function getFirstEmptyRow(b, col) {
for (let r = 0; r < ROWS; r++) {
if (b[col][r] === 0) return r;
}
return -1;
}
function isBoardFull(b) {
for (let c = 0; c < COLS; c++) {
if (b[c][ROWS - 1] === 0) return false;
}
return true;
}
function scanBoard(b) {
function check(c, r, dc, dr) {
const p = b[c][r];
if (p === 0) return [0, []];
const pos = [];
for (let i = 0; i < 4; i++) {
const cc = c + i * dc;
const rr = r + i * dr;
if (cc < 0 || cc >= COLS || rr < 0 || rr >= ROWS) return [0, []];
if (b[cc][rr] !== p) return [0, []];
pos.push([cc, rr]);
}
return [p, pos];
}
for (let r = 0; r < ROWS; r++)
for (let c = 0; c <= COLS - 4; c++) {
const [w, pos] = check(c, r, 1, 0);
if (w) return [w, pos];
}
for (let r = 0; r <= ROWS - 4; r++)
for (let c = 0; c < COLS; c++) {
const [w, pos] = check(c, r, 0, 1);
if (w) return [w, pos];
}
for (let r = 0; r <= ROWS - 4; r++)
for (let c = 0; c <= COLS - 4; c++) {
const [w, pos] = check(c, r, 1, 1);
if (w) return [w, pos];
}
for (let r = 3; r < ROWS; r++)
for (let c = 0; c <= COLS - 4; c++) {
const [w, pos] = check(c, r, 1, -1);
if (w) return [w, pos];
}
return [0, []];
}
function evaluateBoard(b, aiP, huP) {
let score = 0;
let aiThreats = 0, huThreats = 0;
// Center column bonus
for (let r = 0; r < ROWS; r++) {
if (b[3][r] === aiP) score += 3;
else if (b[3][r] === huP) score -= 3;
}
// Score a window of 4 cells by piece counts
function scoreWindow(c, r, dc, dr) {
let ai = 0, hu = 0, emptyC = -1, emptyR = -1;
for (let i = 0; i < 4; i++) {
const cc = c + i * dc;
const rr = r + i * dr;
const v = b[cc][rr];
if (v === aiP) ai++;
else if (v === huP) hu++;
else { emptyC = cc; emptyR = rr; }
}
if (ai > 0 && hu > 0) return 0;
if (ai === 3) {
aiThreats++;
const playable = emptyR === 0 || b[emptyC][emptyR - 1] !== 0;
return playable ? 100 : 40;
}
if (ai === 2) return 5;
if (hu === 3) {
huThreats++;
const playable = emptyR === 0 || b[emptyC][emptyR - 1] !== 0;
return playable ? -100 : -40;
}
if (hu === 2) return -5;
return 0;
}
// Horizontal
for (let r = 0; r < ROWS; r++)
for (let c = 0; c <= COLS - 4; c++)
score += scoreWindow(c, r, 1, 0);
// Vertical
for (let r = 0; r <= ROWS - 4; r++)
for (let c = 0; c < COLS; c++)
score += scoreWindow(c, r, 0, 1);
// Diagonal up-right
for (let r = 0; r <= ROWS - 4; r++)
for (let c = 0; c <= COLS - 4; c++)
score += scoreWindow(c, r, 1, 1);
// Diagonal down-right
for (let r = 3; r < ROWS; r++)
for (let c = 0; c <= COLS - 4; c++)
score += scoreWindow(c, r, 1, -1);
// Fork bonus: multiple threats are disproportionately dangerous
if (aiThreats >= 2) score += 200;
if (huThreats >= 2) score -= 200;
return score;
}
// --- AI -----------------------------------------------------
function minimax(b, depth, alpha, beta, isMax, aiP, huP) {
const [winner] = scanBoard(b);
if (winner === aiP) return 1000 + depth;
if (winner === huP) return -1000 - depth;
if (depth === 0 || isBoardFull(b)) return evaluateBoard(b, aiP, huP);
let best = isMax ? -10000 : 10000;
for (const c of COL_ORDER) {
const r = getFirstEmptyRow(b, c);
if (r === -1) continue;
b[c][r] = isMax ? aiP : huP;
const score = minimax(b, depth - 1, alpha, beta, !isMax, aiP, huP);
b[c][r] = 0;
if (isMax) {
if (score > best) best = score;
if (best > alpha) alpha = best;
} else {
if (score < best) best = score;
if (best < beta) beta = best;
}
if (beta <= alpha) break;
}
return best;
}
function performAiMove(b, aiP, lookAhead, isDemo = false, dPly = 4) {
const huP = aiP === 1 ? 2 : 1;
const ply = isDemo ? dPly : lookAhead;
// Phase 1a: check ALL columns for instant AI win
for (let c = 0; c < COLS; c++) {
const r = getFirstEmptyRow(b, c);
if (r === -1) continue;
b[c][r] = aiP;
if (scanBoard(b)[0] === aiP) { b[c][r] = 0; return c; }
b[c][r] = 0;
}
// Phase 1b: check ALL columns for opponent block
for (let c = 0; c < COLS; c++) {
const r = getFirstEmptyRow(b, c);
if (r === -1) continue;
b[c][r] = huP;
if (scanBoard(b)[0] === huP) { b[c][r] = 0; return c; }
b[c][r] = 0;
}
// Phase 2: blunder
if (!isDemo && BLUNDER_ENABLED && Math.random() * 100 < BLUNDER_CHANCE) {
const valid = [];
for (let c = 0; c < COLS; c++) if (getFirstEmptyRow(b, c) !== -1) valid.push(c);
return valid[Math.floor(Math.random() * valid.length)];
}
// Phase 3: minimax
let bestScore = -30000;
let bestCol = 3;
for (const c of COL_ORDER) {
const r = getFirstEmptyRow(b, c);
if (r === -1) continue;
b[c][r] = aiP;
const score = minimax(b, ply, -30000, 30000, false, aiP, huP);
b[c][r] = 0;
if (score > bestScore) {
bestScore = score;
bestCol = c;
}
}
return bestCol;
}
function randomizeDemoPlies() {
const strong = 4 + Math.floor(Math.random() * 2);
const weak = 2 + Math.floor(Math.random() * 2);
return Math.random() < 0.5 ? [strong, weak] : [weak, strong];
}
// --- Game log (localStorage) --------------------------------
function loadGameLog() {
try {
const raw = localStorage.getItem("connectFourLog");
if (!raw) return [];
return JSON.parse(raw).slice(-MAX_GAME_LOG);
} catch { return []; }
}
function saveGameLog(g) {
try {
localStorage.setItem("connectFourLog", JSON.stringify(g.slice(-MAX_GAME_LOG)));
} catch { /* storage full or unavailable */ }
}
function logGame(g, gMenuMode, level, winner, moves) {
const type = gMenuMode === 0 ? "Y" : gMenuMode === 1 ? "R" : "2";
const winChar = winner === 1 ? "Y" : winner === 2 ? "R" : "D";
g.push({ type, level: String(level), winner: winChar, moves });
g = g.slice(-MAX_GAME_LOG);
saveGameLog(g);
return g;
}
// --- Check game end -----------------------------------------
function checkGameEnd() {
const [w, pos] = scanBoard(board);
winnerPlayer = w;
winPositions = pos;
const won = w !== 0;
const draw = !won && isBoardFull(board);
if (!won && !draw) return false;
if (gameState !== State.DEMO) {
games = logGame(games, gameMenuMode, gameLevel, won ? w : 0, currentMoves);
console.log(`Game: ${currentMoves} - ${won ? playerName(w) + " wins" : "Draw"}`);
}
gameState = won ? State.FINISHED_WIN : State.FINISHED_DRAW;
demoResetTimer = performance.now() / 1000;
lastActivity = performance.now() / 1000;
return true;
}
// --- Drawing ------------------------------------------------
function playerColor(p) { return p === 1 ? COLOR_P1 : COLOR_P2; }
function playerColorDim(p) { return p === 1 ? COLOR_P1_DIM : COLOR_P2_DIM; }
function playerName(p) { return p === 1 ? "Yellow" : "Red"; }
function cellX(c) { return BOARD_PAD_X + c * CELL_SIZE + CELL_SIZE / 2; }
function cellY(r) { return BOARD_PAD_TOP + (ROWS - 1 - r) * CELL_SIZE + CELL_SIZE / 2; }
function isWinPos(c, r) {
for (const [wc, wr] of winPositions) {
if (wc === c && wr === r) return true;
}
return false;
}
function drawDisc(x, y, radius, color) {
ctx.beginPath();
ctx.arc(x, y, radius, 0, Math.PI * 2);
ctx.fillStyle = color;
ctx.fill();
}
function drawBoard() {
// Board background
ctx.fillStyle = COLOR_BOARD;
const bx = BOARD_PAD_X;
const by = BOARD_PAD_TOP;
ctx.beginPath();
ctx.roundRect(bx - 5, by - 5, BOARD_W + 10, BOARD_H + 10, 12);
ctx.fill();
// Grid lines
ctx.strokeStyle = COLOR_GRID_LINE;
ctx.lineWidth = 1;
for (let c = 1; c < COLS; c++) {
const x = BOARD_PAD_X + c * CELL_SIZE;
ctx.beginPath();
ctx.moveTo(x, BOARD_PAD_TOP);
ctx.lineTo(x, BOARD_PAD_TOP + BOARD_H);
ctx.stroke();
}
for (let r = 1; r < ROWS; r++) {
const y = BOARD_PAD_TOP + r * CELL_SIZE;
ctx.beginPath();
ctx.moveTo(BOARD_PAD_X, y);
ctx.lineTo(BOARD_PAD_X + BOARD_W, y);
ctx.stroke();
}
// Cells
for (let c = 0; c < COLS; c++) {
for (let r = 0; r < ROWS; r++) {
const x = cellX(c);
const y = cellY(r);
const val = board[c][r];
// Skip drawing in cell if we're animating a drop into it
if (dropping && c === dropCol && r === dropTargetRow) continue;
if (val === 0) {
drawDisc(x, y, DISC_RADIUS, COLOR_EMPTY);
} else {
const isWin = isWinPos(c, r);
if (gameState === State.FINISHED_WIN) {
if (isWin && flashToggle) {
drawDisc(x, y, DISC_RADIUS, COLOR_EMPTY);
} else if (!isWin) {
drawDisc(x, y, DISC_RADIUS, playerColorDim(val));
} else {
drawDisc(x, y, DISC_RADIUS, playerColor(val));
}
} else if (gameState === State.FINISHED_DRAW && flashToggle) {
drawDisc(x, y, DISC_RADIUS, COLOR_EMPTY);
} else {
drawDisc(x, y, DISC_RADIUS, playerColor(val));
}
}
}
}
// Drop animation disc
if (dropping) {
drawDisc(cellX(dropCol), dropY, DISC_RADIUS, playerColor(dropPlayer));
}
}
function drawCursor() {
if (gameState === State.PLAYING && !dropping) {
const x = cellX(activeCol);
const y = BOARD_PAD_TOP - 45;
drawDisc(x, y, DISC_RADIUS * 0.8, playerColor(currentPlayer));
}
if (gameState === State.PLAYING && hoverCol >= 0 && hoverCol !== activeCol && !dropping) {
const x = cellX(hoverCol);
const y = BOARD_PAD_TOP - 45;
drawDisc(x, y, DISC_RADIUS * 0.5, playerColorDim(currentPlayer));
}
}
function drawColNumbers() {
ctx.font = `14px ${FONT_FAMILY}`;
ctx.textAlign = "center";
ctx.textBaseline = "middle";
for (let c = 0; c < COLS; c++) {
ctx.fillStyle = (c === activeCol && gameState === State.PLAYING) ? COLOR_TEXT : COLOR_TEXT_DIM;
ctx.fillText(String(c + 1), cellX(c), BOARD_PAD_TOP - 12);
}
}
function drawStatus() {
const y = BOARD_PAD_TOP + BOARD_H + 25;
ctx.font = `bold 18px ${FONT_FAMILY}`;
ctx.textAlign = "center";
ctx.textBaseline = "middle";
if (gameState === State.PLAYING) {
ctx.fillStyle = playerColor(currentPlayer);
const label = gameMenuMode === 2 ? `${playerName(currentPlayer)}'s turn`
: currentPlayer === (gameMenuMode === 0 ? 1 : 2) ? "Your turn" : "AI thinking...";
ctx.fillText(label, CANVAS_W / 2, y);
} else if (gameState === State.AI_TURN) {
const aiP = gameMenuMode === 0 ? 2 : 1;
ctx.fillStyle = playerColor(aiP);
ctx.fillText("AI thinking...", CANVAS_W / 2, y);
} else if (gameState === State.FINISHED_WIN) {
ctx.fillStyle = playerColor(winnerPlayer);
ctx.fillText(`${playerName(winnerPlayer)} wins!`, CANVAS_W / 2, y);
ctx.font = `14px ${FONT_FAMILY}`;
ctx.fillStyle = COLOR_TEXT_DIM;
ctx.fillText("Click or press any key for menu", CANVAS_W / 2, y + 24);
} else if (gameState === State.FINISHED_DRAW) {
ctx.fillStyle = COLOR_TEXT;
ctx.fillText("Draw!", CANVAS_W / 2, y);
ctx.font = `14px ${FONT_FAMILY}`;
ctx.fillStyle = COLOR_TEXT_DIM;
ctx.fillText("Click or press any key for menu", CANVAS_W / 2, y + 24);
} else if (gameState === State.DEMO) {
ctx.fillStyle = COLOR_TEXT_DIM;
ctx.font = `14px ${FONT_FAMILY}`;
ctx.fillText("Demo mode - click or press any key for menu", CANVAS_W / 2, y);
}
}
function drawMenu() {
ctx.fillStyle = COLOR_MENU_BG;
ctx.fillRect(0, 0, CANVAS_W, CANVAS_H);
ctx.font = `bold 36px ${FONT_FAMILY}`;
ctx.textAlign = "center";
ctx.textBaseline = "middle";
ctx.fillStyle = COLOR_P1;
ctx.fillText("Connect", CANVAS_W / 2 - 60, 80);
ctx.fillStyle = COLOR_P2;
ctx.fillText("Four", CANVAS_W / 2 + 70, 80);
const items = [
{ label: "1P Yellow (you start)", color: COLOR_P1 },
{ label: "1P Red (AI starts)", color: COLOR_P2 },
{ label: "Multiplayer", color: "#5dade2" },
];
const startY = 160;
const itemH = 60;
const itemW = 340;
for (let i = 0; i < items.length; i++) {
const y = startY + i * itemH;
const x = (CANVAS_W - itemW) / 2;
const selected = i === menuMode;
// Background
ctx.fillStyle = selected ? COLOR_MENU_SEL : "transparent";
ctx.beginPath();
ctx.roundRect(x, y, itemW, 48, 8);
ctx.fill();
// Border for selected
if (selected) {
ctx.strokeStyle = items[i].color;
ctx.lineWidth = 2;
ctx.beginPath();
ctx.roundRect(x, y, itemW, 48, 8);
ctx.stroke();
}
// Arrow
ctx.font = `bold 20px ${FONT_FAMILY}`;
ctx.textAlign = "left";
ctx.textBaseline = "middle";
ctx.fillStyle = selected ? items[i].color : COLOR_TEXT_DIM;
ctx.fillText(selected ? "\u25b6 " : " ", x + 16, y + 24);
// Label
ctx.font = `${selected ? "bold " : ""}18px ${FONT_FAMILY}`;
ctx.fillText(items[i].label, x + 50, y + 24);
}
// Instructions
ctx.font = `14px ${FONT_FAMILY}`;
ctx.textAlign = "center";
ctx.fillStyle = COLOR_TEXT_DIM;
ctx.fillText("Up/Down or hover to select, click or Enter to start", CANVAS_W / 2, startY + items.length * itemH + 20);
ctx.fillText("During game: Arrow keys or click columns, 1-7 for direct drop", CANVAS_W / 2, startY + items.length * itemH + 44);
}
function render() {
ctx.fillStyle = COLOR_BG;
ctx.fillRect(0, 0, CANVAS_W, CANVAS_H);
if (gameState === State.MENU) {
drawMenu();
} else {
drawBoard();
drawCursor();
drawColNumbers();
drawStatus();
}
}
// --- Drop animation -----------------------------------------
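function animateDrop(col, row, player) {
return new Promise(resolve => {
dropping = true;
dropCol = col;
dropPlayer = player;
dropTargetRow = row;
dropY = BOARD_PAD_TOP - 45;
dropTargetY = cellY(row);
let lastTs = null;
function step(timestamp) {
// Advance by real elapsed time (ANIM_DROP_SPEED treated as px/s) so the drop
// runs at the same speed on 60 Hz and 120 Hz displays; assume 1/60 s on the first frame
const dt = lastTs === null ? 1 / 60 : (timestamp - lastTs) / 1000;
lastTs = timestamp;
dropY += ANIM_DROP_SPEED * dt;
if (dropY >= dropTargetY) {
dropY = dropTargetY;
dropping = false;
resolve();
return;
}
requestAnimationFrame(step);
}
requestAnimationFrame(step);
});
}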
// --- Input: column from mouse / touch -----------------------
function colFromEvent(e) {
const rect = canvas.getBoundingClientRect();
const scaleX = CANVAS_W / rect.width;
const x = (e.clientX - rect.left) * scaleX;
const col = Math.floor((x - BOARD_PAD_X) / CELL_SIZE);
return (col >= 0 && col < COLS) ? col : -1;
}
function menuItemFromEvent(e) {
const rect = canvas.getBoundingClientRect();
const scaleY = CANVAS_H / rect.height;
const scaleX = CANVAS_W / rect.width;
const y = (e.clientY - rect.top) * scaleY;
const x = (e.clientX - rect.left) * scaleX;
const startY = 160;
const itemH = 60;
const itemW = 340;
const mx = (CANVAS_W - itemW) / 2;
for (let i = 0; i < 3; i++) {
const iy = startY + i * itemH;
if (x >= mx && x <= mx + itemW && y >= iy && y <= iy + 48) return i;
}
return -1;
}
// --- Place a disc (with animation) --------------------------
let busy = false; // prevents input during animation / AI
async function placeDisk(col, player) {
const r = getFirstEmptyRow(board, col);
if (r === -1) return false;
currentMoves += String(col);
await animateDrop(col, r, player);
board[col][r] = player;
return true;
}
// --- AI turn (async to not block UI) ------------------------
async function doAiTurn() {
busy = true;
const aiP = gameMenuMode === 0 ? 2 : 1;
gameState = State.AI_TURN;
// Yield a frame so "AI thinking" shows
await new Promise(r => setTimeout(r, 50));
const bestCol = performAiMove(board, aiP, LOOK_AHEAD);
await placeDisk(bestCol, aiP);
activeCol = bestCol;
if (!checkGameEnd()) {
gameState = State.PLAYING;
currentPlayer = aiP === 1 ? 2 : 1;
}
lastActivity = performance.now() / 1000;
busy = false;
}
// --- Demo turn ----------------------------------------------
let demoTimer = null;
function stopDemo() {
if (demoTimer !== null) {
clearTimeout(demoTimer);
demoTimer = null;
}
}
async function demoStep() {
if (gameState !== State.DEMO) return;
busy = true;
const ply = demoPly[currentPlayer - 1];
const bestCol = performAiMove(board, currentPlayer, LOOK_AHEAD, true, ply);
await placeDisk(bestCol, currentPlayer);
if (!checkGameEnd()) {
currentPlayer = currentPlayer === 1 ? 2 : 1;
demoTimer = setTimeout(demoStep, 400);
}
busy = false;
}
function startDemo() {
resetGame();
demoPly = randomizeDemoPlies();
gameState = State.DEMO;
currentPlayer = 1;
lastActivity = performance.now() / 1000;
demoTimer = setTimeout(demoStep, 400);
}
// --- Start game from menu -----------------------------------
function startGame(mode) {
resetGame();
gameMenuMode = mode;
gameLevel = LOOK_AHEAD;
currentPlayer = 1;
activeCol = 3;
hoverCol = -1;
if (mode === 1) {
gameState = State.PLAYING; // briefly, then AI
doAiTurn();
} else {
gameState = State.PLAYING;
}
lastActivity = performance.now() / 1000;
}
function returnToMenu() {
stopDemo();
resetGame();
gameState = State.MENU;
menuMode = 0;
lastActivity = performance.now() / 1000;
}
// --- Mouse events -------------------------------------------
canvas.addEventListener("mousemove", (e) => {
if (gameState === State.MENU) {
const mi = menuItemFromEvent(e);
if (mi >= 0) menuMode = mi;
} else if (gameState === State.PLAYING && !busy) {
hoverCol = colFromEvent(e);
}
});
canvas.addEventListener("click", async (e) => {
if (busy) return;
lastActivity = performance.now() / 1000;
if (gameState === State.MENU) {
const mi = menuItemFromEvent(e);
if (mi >= 0) {
menuMode = mi;
startGame(mi);
}
return;
}
if (gameState === State.FINISHED_WIN || gameState === State.FINISHED_DRAW || gameState === State.DEMO) {
returnToMenu();
return;
}
if (gameState === State.PLAYING) {
const col = colFromEvent(e);
if (col < 0) return;
const r = getFirstEmptyRow(board, col);
if (r === -1) return;
busy = true;
activeCol = col;
await placeDisk(col, currentPlayer);
if (!checkGameEnd()) {
if (gameMenuMode < 2) {
await doAiTurn();
} else {
currentPlayer = currentPlayer === 1 ? 2 : 1;
}
}
busy = false;
}
});
// --- Touch support (mobile) ---------------------------------
canvas.addEventListener("touchend", (e) => {
if (e.changedTouches.length > 0) {
const touch = e.changedTouches[0];
const click = new MouseEvent("click", {
clientX: touch.clientX,
clientY: touch.clientY,
});
canvas.dispatchEvent(click);
}
e.preventDefault();
}, { passive: false });
// --- Keyboard events ----------------------------------------
document.addEventListener("keydown", async (e) => {
if (busy) return;
lastActivity = performance.now() / 1000;
if (e.key === "q" || e.key === "Q") {
if (gameState !== State.MENU) {
returnToMenu();
}
return;
}
if (gameState === State.MENU) {
if (e.key === "ArrowUp") {
menuMode = (menuMode - 1 + 3) % 3;
} else if (e.key === "ArrowDown") {
menuMode = (menuMode + 1) % 3;
} else if (e.key === "Enter" || e.key === " ") {
startGame(menuMode);
}
return;
}
if (gameState === State.FINISHED_WIN || gameState === State.FINISHED_DRAW || gameState === State.DEMO) {
returnToMenu();
return;
}
if (gameState === State.PLAYING) {
if (e.key === "ArrowLeft") {
activeCol = Math.max(0, activeCol - 1);
} else if (e.key === "ArrowRight") {
activeCol = Math.min(COLS - 1, activeCol + 1);
} else if (e.key >= "1" && e.key <= "7") {
const col = parseInt(e.key) - 1;
const r = getFirstEmptyRow(board, col);
if (r === -1) return;
busy = true;
activeCol = col;
await placeDisk(col, currentPlayer);
if (!checkGameEnd()) {
if (gameMenuMode < 2) {
await doAiTurn();
} else {
currentPlayer = currentPlayer === 1 ? 2 : 1;
}
}
busy = false;
} else if (e.key === "Enter" || e.key === " ") {
const r = getFirstEmptyRow(board, activeCol);
if (r === -1) return;
busy = true;
await placeDisk(activeCol, currentPlayer);
if (!checkGameEnd()) {
if (gameMenuMode < 2) {
await doAiTurn();
} else {
currentPlayer = currentPlayer === 1 ? 2 : 1;
}
}
busy = false;
}
}
});
// --- Main loop ----------------------------------------------
let lastTime = 0;
function gameLoop(timestamp) {
const now = timestamp / 1000;
// Flash toggle for win/draw
if (gameState === State.FINISHED_WIN || gameState === State.FINISHED_DRAW) {
if (now - lastFlash > 0.4) {
lastFlash = now;
flashToggle = !flashToggle;
}
// Auto-restart to demo
if (now - demoResetTimer > DEMO_RESET_PAUSE) {
startDemo();
}
}
// Idle timeout -> demo
if (gameState !== State.DEMO && gameState !== State.FINISHED_WIN && gameState !== State.FINISHED_DRAW) {
if (now - lastActivity > IDLE_TIMEOUT) {
startDemo();
}
}
render();
requestAnimationFrame(gameLoop);
}
requestAnimationFrame(gameLoop);
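The Phase 1a/1b split in `performAiMove` above checks every column for an immediate win before it checks any column for a block. The Python sketch below restates that ordering outside the game. `first_empty_row`, `has_four`, `immediate_win_col` and `tactical_move` are hypothetical names. The board uses the game's `board[col][row]` layout, with row 0 at the bottom.

In the sample position, Red threatens column 0 while Yellow can win in column 2. A single loop that tested "win, then block" column by column would reach column 0 first and block instead of winning.

```python
# Hypothetical helper names; board layout follows the game: board[col][row], row 0 = bottom.
COLS, ROWS = 7, 6
DIRECTIONS = ((1, 0), (0, 1), (1, 1), (1, -1))


def first_empty_row(board, col):
    """Lowest empty row in `col`, or -1 if the column is full."""
    for r in range(ROWS):
        if board[col][r] == 0:
            return r
    return -1


def has_four(board, p):
    """True if player `p` has four in a row in any direction."""
    for c in range(COLS):
        for r in range(ROWS):
            for dc, dr in DIRECTIONS:
                end_c, end_r = c + 3 * dc, r + 3 * dr
                if not (0 <= end_c < COLS and 0 <= end_r < ROWS):
                    continue
                if all(board[c + i * dc][r + i * dr] == p for i in range(4)):
                    return True
    return False


def immediate_win_col(board, p):
    """First column where dropping a disc for `p` wins at once, else None."""
    for c in range(COLS):
        r = first_empty_row(board, c)
        if r == -1:
            continue
        board[c][r] = p          # try the move
        won = has_four(board, p)
        board[c][r] = 0          # always undo it
        if won:
            return c
    return None


def tactical_move(board, ai_p, hu_p):
    """Pass 1: win if possible. Pass 2: otherwise block. None: fall through to search."""
    win = immediate_win_col(board, ai_p)
    if win is not None:
        return win
    return immediate_win_col(board, hu_p)


# Red (2) threatens column 0; Yellow (1) can win in column 2 (or 6).
board = [[0] * ROWS for _ in range(COLS)]
board[0][0] = board[0][1] = board[0][2] = 2
board[3][0] = board[4][0] = board[5][0] = 1
print(tactical_move(board, ai_p=1, hu_p=2))  # 2: the win beats the block in column 0
```

Each trial move is undone before returning, so the board is unchanged afterwards. This matches the `b[c][r] = 0` resets in the original loops.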
+657
@@ -0,0 +1,657 @@
"""Connect Four terminal game with AI (minimax + alpha-beta + heuristic), using Rich for display."""
import os
import queue
import random
import threading
import time
from enum import Enum, auto
from pathlib import Path
import readchar
from dotenv import load_dotenv
from rich.console import Console, Group
from rich.live import Live
from rich.text import Text
load_dotenv(Path(__file__).parent / ".env")
# --- Configuration from .env ---
LOOK_AHEAD = int(os.getenv("LOOK_AHEAD", "8"))
BLUNDER_ENABLED = os.getenv("BLUNDER_ENABLED", "false").lower() == "true"
BLUNDER_CHANCE = int(os.getenv("BLUNDER_CHANCE", "20"))
DEMO_RESET_PAUSE = int(os.getenv("DEMO_RESET_PAUSE", "5"))
IDLE_TIMEOUT = int(os.getenv("IDLE_TIMEOUT", "60"))
MAX_GAME_LOG = int(os.getenv("MAX_GAME_LOG", "100"))
GAMES_FILE = Path(__file__).parent / ".games.txt"
COLS = 7
ROWS = 6
COL_ORDER = [3, 2, 4, 1, 5, 0, 6]
# Box-drawing characters for the board frame
DISC = "\u2b24"
EMPTY = "\u25cb"
H_LINE = "\u2500"
V_LINE = "\u2502"
TL = "\u250c"
TR = "\u2510"
BL = "\u2514"
BR = "\u2518"
T_DOWN = "\u252c"
T_UP = "\u2534"
T_RIGHT = "\u251c"
T_LEFT = "\u2524"
CROSS = "\u253c"
console = Console()
# Key constants - readchar uses escape sequences
KEY_LEFT = readchar.key.LEFT if hasattr(readchar.key, "LEFT") else "\x1b[D"
KEY_RIGHT = readchar.key.RIGHT if hasattr(readchar.key, "RIGHT") else "\x1b[C"
KEY_UP = readchar.key.UP if hasattr(readchar.key, "UP") else "\x1b[A"
KEY_DOWN = readchar.key.DOWN if hasattr(readchar.key, "DOWN") else "\x1b[B"
KEY_ENTER = readchar.key.ENTER if hasattr(readchar.key, "ENTER") else "\r"
CONFIRM_KEYS = {KEY_ENTER, " ", "\r", "\n"}
class State(Enum):
    MENU = auto()
    PLAYING = auto()
    AI_TURN = auto()
    FINISHED_WIN = auto()
    FINISHED_DRAW = auto()
    DEMO = auto()
def player_name(player: int) -> str:
    return "Yellow" if player == 1 else "Red"
def player_style(player: int) -> str:
    return "bold yellow" if player == 1 else "bold red"
def dim_player_style(player: int) -> str:
    return "dim yellow" if player == 1 else "dim red"
# --- Board ---
def make_board() -> list[list[int]]:
    return [[0] * ROWS for _ in range(COLS)]
def get_first_empty_row(board: list[list[int]], col: int) -> int:
    for r in range(ROWS):
        if board[col][r] == 0:
            return r
    return -1
def is_board_full(board: list[list[int]]) -> bool:
    return all(board[c][ROWS - 1] != 0 for c in range(COLS))
def scan_board(board: list[list[int]]) -> tuple[int, list[tuple[int, int]]]:
    """Returns (winner, winning_positions). winner=0 if no winner."""
    def check(c, r, dc, dr):
        p = board[c][r]
        if p != 0:
            positions = [(c + i * dc, r + i * dr) for i in range(4)]
            if all(board[cc][rr] == p for cc, rr in positions):
                return p, positions
        return 0, []
    for r in range(ROWS):
        for c in range(COLS - 3):
            w, pos = check(c, r, 1, 0)
            if w:
                return w, pos
    for r in range(ROWS - 3):
        for c in range(COLS):
            w, pos = check(c, r, 0, 1)
            if w:
                return w, pos
    for r in range(ROWS - 3):
        for c in range(COLS - 3):
            w, pos = check(c, r, 1, 1)
            if w:
                return w, pos
    for r in range(3, ROWS):
        for c in range(COLS - 3):
            w, pos = check(c, r, 1, -1)
            if w:
                return w, pos
    return 0, []
# --- Display ---
def render_board(
    board: list[list[int]],
    active_col: int = -1,
    current_player: int = 0,
    win_positions: list[tuple[int, int]] | None = None,
    flash_off: bool = False,
    is_draw_flash: bool = False,
    thinking_col: int = -1,
    thinking_bright: bool = False,
) -> Text:
    cell_w = 4 # width per cell including padding
    lines = Text()
    # Cursor row above the board
    cursor_line = Text(" ")
    for c in range(COLS):
        if thinking_col == c:
            style = player_style(current_player) if thinking_bright else dim_player_style(current_player)
            cursor_line.append(f" {DISC} ", style=style)
        elif c == active_col and current_player > 0:
            cursor_line.append(f" {DISC} ", style=player_style(current_player))
        else:
            cursor_line.append(" ")
    lines.append_text(cursor_line)
    lines.append("\n")
    # Column numbers row
    num_line = Text(" ")
    for c in range(COLS):
        style = "bold white" if c == active_col else "dim"
        num_line.append(f" {c + 1} ", style=style)
    lines.append_text(num_line)
    lines.append("\n")
    # Top border
    top = Text(" ", style="bold blue")
    top.append(TL, style="bold blue")
    for c in range(COLS):
        top.append(H_LINE * (cell_w - 1), style="bold blue")
        top.append(T_DOWN if c < COLS - 1 else TR, style="bold blue")
    lines.append_text(top)
    lines.append("\n")
    # Board rows (top row of board = row 5, displayed first)
    for r in range(ROWS - 1, -1, -1):
        row_line = Text(" ", style="bold blue")
        for c in range(COLS):
            row_line.append(V_LINE, style="bold blue")
            val = board[c][r]
            if val == 0:
                row_line.append(f" {EMPTY} ", style="dim blue")
            else:
                is_win = win_positions and (c, r) in win_positions
                if flash_off and is_win:
                    row_line.append(" ")
                elif is_draw_flash and flash_off:
                    row_line.append(" ")
                elif not is_win and win_positions:
                    row_line.append(f" {DISC} ", style=dim_player_style(val))
                else:
                    row_line.append(f" {DISC} ", style=player_style(val))
        row_line.append(V_LINE, style="bold blue")
        lines.append_text(row_line)
        lines.append("\n")
        # Row separator (the bottom border is drawn after the loop)
        if r > 0:
            sep = Text(" ", style="bold blue")
            sep.append(T_RIGHT, style="bold blue")
            for c in range(COLS):
                sep.append(H_LINE * (cell_w - 1), style="bold blue")
                sep.append(CROSS if c < COLS - 1 else T_LEFT, style="bold blue")
            lines.append_text(sep)
            lines.append("\n")
    # Bottom border
    bot = Text(" ", style="bold blue")
    bot.append(BL, style="bold blue")
    for c in range(COLS):
        bot.append(H_LINE * (cell_w - 1), style="bold blue")
        bot.append(T_UP if c < COLS - 1 else BR, style="bold blue")
    lines.append_text(bot)
    lines.append("\n")
    return lines
def render_menu(menu_mode: int) -> Text:
    items = ["1P Yellow (you start)", "1P Red (AI starts)", "Multiplayer"]
    lines = ["\n [bold blue]Connect Four[/bold blue]\n"]
    for i, item in enumerate(items):
        marker = " \u25b6 " if i == menu_mode else " "
        style = "bold yellow" if i == 0 else "bold red" if i == 1 else "bold blue"
        if i == menu_mode:
            lines.append(f"[{style}]{marker}{item}[/{style}]")
        else:
            lines.append(f"[dim]{marker}{item}[/dim]")
    lines.append("\n [dim]Up/Down to select, Space/Enter to start, Q to quit[/dim]\n")
    return Text.from_markup("\n".join(lines))
# --- Game log ---
def load_game_log() -> list[dict]:
    if not GAMES_FILE.exists():
        return []
    games = []
    for line in GAMES_FILE.read_text().splitlines():
        line = line.strip()
        if not line:
            continue
        parts = line.split(":", 3)
        if len(parts) == 4:
            games.append({
                "type": parts[0],
                "level": parts[1],
                "winner": parts[2],
                "moves": parts[3],
            })
    return games[-MAX_GAME_LOG:]
def save_game_log(games: list[dict]):
    with GAMES_FILE.open("w") as f:
        for g in games:
            f.write(f"{g['type']}:{g['level']}:{g['winner']}:{g['moves']}\n")
def log_game(games: list[dict], game_menu_mode: int, level: int, winner: int, moves: str) -> list[dict]:
    game_type = "Y" if game_menu_mode == 0 else "R" if game_menu_mode == 1 else "2"
    win_char = "Y" if winner == 1 else "R" if winner == 2 else "D"
    entry = {"type": game_type, "level": str(level), "winner": win_char, "moves": moves}
    games.append(entry)
    games = games[-MAX_GAME_LOG:]
    save_game_log(games)
    return games
# --- AI ---
def evaluate_board(board: list[list[int]], ai_p: int, hu_p: int) -> int:
    score = 0
    ai_threats = 0
    hu_threats = 0
    # Center column bonus
    for r in range(ROWS):
        if board[3][r] == ai_p:
            score += 3
        elif board[3][r] == hu_p:
            score -= 3
    # Score a window of 4 cells by piece counts
    def score_window(c: int, r: int, dc: int, dr: int) -> int:
        nonlocal ai_threats, hu_threats
        ai, hu, empty_c, empty_r = 0, 0, -1, -1
        for i in range(4):
            cc = c + i * dc
            rr = r + i * dr
            v = board[cc][rr]
            if v == ai_p:
                ai += 1
            elif v == hu_p:
                hu += 1
            else:
                empty_c, empty_r = cc, rr
        if ai > 0 and hu > 0:
            return 0
        if ai == 3:
            ai_threats += 1
            playable = empty_r == 0 or board[empty_c][empty_r - 1] != 0
            return 100 if playable else 40
        if ai == 2:
            return 5
        if hu == 3:
            hu_threats += 1
            playable = empty_r == 0 or board[empty_c][empty_r - 1] != 0
            return -100 if playable else -40
        if hu == 2:
            return -5
        return 0
    # Horizontal
    for r in range(ROWS):
        for c in range(COLS - 3):
            score += score_window(c, r, 1, 0)
    # Vertical
    for r in range(ROWS - 3):
        for c in range(COLS):
            score += score_window(c, r, 0, 1)
    # Diagonal up-right
    for r in range(ROWS - 3):
        for c in range(COLS - 3):
            score += score_window(c, r, 1, 1)
    # Diagonal down-right
    for r in range(3, ROWS):
        for c in range(COLS - 3):
            score += score_window(c, r, 1, -1)
    # Fork bonus: multiple threats are disproportionately dangerous
    if ai_threats >= 2:
        score += 200
    if hu_threats >= 2:
        score -= 200
    return score
def minimax(
    board: list[list[int]], depth: int, alpha: int, beta: int,
    is_max: bool, ai_p: int, hu_p: int,
) -> int:
    winner, _ = scan_board(board)
    if winner == ai_p:
        return 1000 + depth
    if winner == hu_p:
        return -1000 - depth
    if depth == 0 or is_board_full(board):
        return evaluate_board(board, ai_p, hu_p)
    best = -10000 if is_max else 10000
    for c in COL_ORDER:
        r = get_first_empty_row(board, c)
        if r != -1:
            board[c][r] = ai_p if is_max else hu_p
            score = minimax(board, depth - 1, alpha, beta, not is_max, ai_p, hu_p)
            board[c][r] = 0
            if is_max:
                if score > best:
                    best = score
                if best > alpha:
                    alpha = best
            else:
                if score < best:
                    best = score
                if best < beta:
                    beta = best
            if beta <= alpha:
                break
    return best
def perform_ai_move(
    board: list[list[int]], ai_p: int, look_ahead: int, is_demo: bool = False, demo_ply: int = 4,
) -> int:
    hu_p = 2 if ai_p == 1 else 1
    ply = demo_ply if is_demo else look_ahead
    # Phase 1a: check ALL columns for instant AI win
    for c in range(COLS):
        r = get_first_empty_row(board, c)
        if r != -1:
            board[c][r] = ai_p
            if scan_board(board)[0] == ai_p:
                board[c][r] = 0
                return c
            board[c][r] = 0
    # Phase 1b: check ALL columns for opponent block
    for c in range(COLS):
        r = get_first_empty_row(board, c)
        if r != -1:
            board[c][r] = hu_p
            if scan_board(board)[0] == hu_p:
                board[c][r] = 0
                return c
            board[c][r] = 0
    # Phase 2: blunder
    if not is_demo and BLUNDER_ENABLED and random.randint(0, 99) < BLUNDER_CHANCE:
        valid = [c for c in range(COLS) if get_first_empty_row(board, c) != -1]
        return random.choice(valid)
    # Phase 3: minimax
    best_score = -30000
    best_col = 3
    for c in COL_ORDER:
        r = get_first_empty_row(board, c)
        if r != -1:
            board[c][r] = ai_p
            score = minimax(board, ply, -30000, 30000, False, ai_p, hu_p)
            board[c][r] = 0
            if score > best_score:
                best_score = score
                best_col = c
    return best_col
def randomize_demo_plies() -> tuple[int, int]:
    strong = random.randint(4, 5)
    weak = random.randint(2, 3)
    if random.randint(0, 1) == 0:
        return strong, weak
    return weak, strong
# --- Input (cross-platform, non-blocking via thread) ---
_key_queue: queue.Queue[str] = queue.Queue()
_input_stop = threading.Event()
def _input_thread():
    """Background thread that reads keys and puts them on the queue."""
    while not _input_stop.is_set():
        try:
            key = readchar.readkey()
            _key_queue.put(key)
        except Exception:
            break
def read_key() -> str | None:
    """Non-blocking key read from the queue."""
    try:
        return _key_queue.get_nowait()
    except queue.Empty:
        return None
# --- Main game loop ---
def main():
    console.clear()
    game_state = State.MENU
    board = make_board()
    menu_mode = 0
    current_player = 1
    active_col = 3
    winner_player = 0
    win_positions: list[tuple[int, int]] = []
    current_moves = ""
    game_menu_mode = 0
    game_level = LOOK_AHEAD
    games = load_game_log()
    demo_ply = (4, 4)
    last_activity = time.time()
    demo_reset_timer = 0.0
    flash_toggle = True
    last_flash = 0.0
    def reset():
        nonlocal board, winner_player, win_positions, current_moves
        board = make_board()
        winner_player = 0
        win_positions = []
        current_moves = ""
    def check_game_end() -> bool:
        nonlocal winner_player, win_positions, game_state, games, demo_reset_timer, last_activity
        winner_player, win_positions = scan_board(board)
        won = winner_player != 0
        draw = not won and is_board_full(board)
        if not won and not draw:
            return False
        if game_state != State.DEMO:
            games = log_game(games, game_menu_mode, game_level, winner_player if won else 0, current_moves)
        game_state = State.FINISHED_WIN if won else State.FINISHED_DRAW
        demo_reset_timer = time.time()
        last_activity = time.time()
        return True
    # Start input thread
    input_thread = threading.Thread(target=_input_thread, daemon=True)
    input_thread.start()
    try:
        with Live(render_menu(menu_mode), console=console, refresh_per_second=10, screen=True) as live:
            while True:
                key = read_key()
                # Quit
                if key in ("q", "Q"):
                    break
                # --- MENU ---
                if game_state == State.MENU:
                    if key in (KEY_UP,):
                        menu_mode = (menu_mode - 1) % 3
                        last_activity = time.time()
                    elif key in (KEY_DOWN,):
                        menu_mode = (menu_mode + 1) % 3
                        last_activity = time.time()
                    elif key in CONFIRM_KEYS:
                        reset()
                        game_menu_mode = menu_mode
                        game_level = LOOK_AHEAD
                        current_player = 1
                        active_col = 3
                        if menu_mode == 1:
                            game_state = State.AI_TURN
                        else:
                            game_state = State.PLAYING
                        last_activity = time.time()
                    if game_state == State.MENU:
                        live.update(render_menu(menu_mode))
                    time.sleep(0.05)
                    continue
                # --- Interrupt: return to menu from finished/demo ---
                if game_state in (State.FINISHED_WIN, State.FINISHED_DRAW, State.DEMO) and key is not None:
                    reset()
                    game_state = State.MENU
                    menu_mode = 0
                    last_activity = time.time()
                    live.update(render_menu(menu_mode))
                    time.sleep(0.2)
                    continue
                # --- Idle timeout: enter demo ---
                if game_state not in (State.DEMO, State.FINISHED_WIN, State.FINISHED_DRAW):
                    if time.time() - last_activity > IDLE_TIMEOUT:
                        reset()
                        demo_ply = randomize_demo_plies()
                        game_state = State.DEMO
                        current_player = 1
                # --- PLAYING ---
                if game_state == State.PLAYING:
                    if key in (KEY_LEFT,):
                        active_col = max(0, active_col - 1)
                        last_activity = time.time()
                    elif key in (KEY_RIGHT,):
                        active_col = min(COLS - 1, active_col + 1)
                        last_activity = time.time()
                    elif key in ("1", "2", "3", "4", "5", "6", "7"):
                        col = int(key) - 1
                        r = get_first_empty_row(board, col)
                        if r != -1:
                            active_col = col
                            current_moves += str(col)
                            board[col][r] = current_player
                            if not check_game_end():
                                if game_menu_mode < 2:
                                    game_state = State.AI_TURN
                                else:
                                    current_player = 2 if current_player == 1 else 1
                        last_activity = time.time()
                    elif key in CONFIRM_KEYS:
                        r = get_first_empty_row(board, active_col)
                        if r != -1:
                            current_moves += str(active_col)
                            board[active_col][r] = current_player
                            if not check_game_end():
                                if game_menu_mode < 2:
                                    game_state = State.AI_TURN
                                else:
                                    current_player = 2 if current_player == 1 else 1
                        last_activity = time.time()
                    live.update(render_board(board, active_col, current_player))
                # --- AI_TURN ---
                elif game_state == State.AI_TURN:
                    ai_p = 2 if game_menu_mode == 0 else 1
                    live.update(render_board(board, -1, ai_p, thinking_col=active_col, thinking_bright=True))
                    best_col = perform_ai_move(board, ai_p, LOOK_AHEAD)
                    r = get_first_empty_row(board, best_col)
                    if r != -1:
                        current_moves += str(best_col)
                        board[best_col][r] = ai_p
                        active_col = best_col
                        if not check_game_end():
                            game_state = State.PLAYING
                            current_player = 2 if ai_p == 1 else 1
                    last_activity = time.time()
                    live.update(render_board(board, active_col, current_player, win_positions if winner_player else None))
                # --- DEMO ---
                elif game_state == State.DEMO:
                    ply = demo_ply[current_player - 1]
                    best_col = perform_ai_move(board, current_player, LOOK_AHEAD, is_demo=True, demo_ply=ply)
                    r = get_first_empty_row(board, best_col)
                    if r != -1:
                        board[best_col][r] = current_player
                        if not check_game_end():
                            current_player = 2 if current_player == 1 else 1
                    live.update(render_board(board, -1, 0))
                    time.sleep(0.4)
                # --- FINISHED ---
                elif game_state in (State.FINISHED_WIN, State.FINISHED_DRAW):
                    now = time.time()
                    if now - last_flash > 0.4:
                        last_flash = now
                        flash_toggle = not flash_toggle
                    if game_state == State.FINISHED_WIN:
                        style = player_style(winner_player)
                        status = Text.from_markup(
                            f"\n [{style}]{player_name(winner_player)} wins![/{style}] [dim]Press any key for menu[/dim]\n"
                        )
                        tbl = render_board(board, -1, 0, win_positions, flash_off=flash_toggle)
                    else:
                        status = Text.from_markup(
                            "\n [bold]Draw![/bold] [dim]Press any key for menu[/dim]\n"
                        )
                        tbl = render_board(board, -1, 0, is_draw_flash=True, flash_off=flash_toggle)
                    live.update(Group(tbl, status))
                    # Auto-restart to demo after pause
                    if time.time() - demo_reset_timer > DEMO_RESET_PAUSE:
                        reset()
                        demo_ply = randomize_demo_plies()
                        game_state = State.DEMO
                        current_player = 1
                        last_activity = time.time()
                time.sleep(0.05)
    except KeyboardInterrupt:
        pass
    finally:
        _input_stop.set()
        console.clear()
        console.print("[bold]Thanks for playing![/bold]")
if __name__ == "__main__":
    main()
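`evaluate_board` above scores every four-cell window using these weights:

- A window holding discs of both colours scores 0.
- Three discs of one colour plus one empty cell is a threat. It scores ±100 when the empty cell can be filled on the next move (it is on the bottom row, or the cell below it is occupied), and ±40 otherwise.
- Two discs of one colour with two empty cells scores ±5.
- Each centre-column disc adds ±3.
- Two or more threats for one side add a ±200 fork bonus.

The sketch below reproduces the same weights as a standalone function, assuming the `board[col][row]` layout. `score_window` and `evaluate` are hypothetical names. The sketch enumerates the same windows as the four nested loops in the original, with each window counted once.

```python
# Hypothetical standalone version of the evaluate_board weights; board[col][row], row 0 = bottom.
COLS, ROWS = 7, 6
DIRECTIONS = ((1, 0), (0, 1), (1, 1), (1, -1))


def score_window(board, cells, ai_p, hu_p):
    """Score one 4-cell window; returns (score, ai_threat, hu_threat)."""
    ai = sum(1 for c, r in cells if board[c][r] == ai_p)
    hu = sum(1 for c, r in cells if board[c][r] == hu_p)
    if ai and hu:
        return 0, 0, 0                       # blocked window: worthless to both sides
    if ai == 3 or hu == 3:
        ec, er = next((c, r) for c, r in cells if board[c][r] == 0)
        # Playable: the missing cell is on the bottom row or already supported.
        playable = er == 0 or board[ec][er - 1] != 0
        value = 100 if playable else 40
        return (value, 1, 0) if ai == 3 else (-value, 0, 1)
    if ai == 2:
        return 5, 0, 0
    if hu == 2:
        return -5, 0, 0
    return 0, 0, 0


def evaluate(board, ai_p, hu_p):
    score, ai_threats, hu_threats = 0, 0, 0
    for r in range(ROWS):                    # centre column: +/-3 per disc
        if board[3][r] == ai_p:
            score += 3
        elif board[3][r] == hu_p:
            score -= 3
    for c in range(COLS):                    # every in-bounds window, each counted once
        for r in range(ROWS):
            for dc, dr in DIRECTIONS:
                cells = [(c + i * dc, r + i * dr) for i in range(4)]
                if all(0 <= cc < COLS and 0 <= rr < ROWS for cc, rr in cells):
                    s, a, h = score_window(board, cells, ai_p, hu_p)
                    score += s
                    ai_threats += a
                    hu_threats += h
    if ai_threats >= 2:                      # fork bonus
        score += 200
    if hu_threats >= 2:
        score -= 200
    return score


board = [[0] * ROWS for _ in range(COLS)]
for c in (1, 2, 3):                          # open three on the bottom row
    board[c][0] = 1
print(evaluate(board, 1, 2))                 # prints 408
```

For the open three on the bottom row, both flanking windows are playable threats, so the fork bonus applies. The total is 100 + 100 (threats), + 5 (the two-disc window), + 3 (centre disc), + 200 (fork) = 408.

Like the original, the sketch counts threat windows rather than distinct threat cells. Two windows completed by the same empty cell therefore also trigger the fork bonus.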
+1 -3
@@ -12,14 +12,12 @@ build_flags =
-D ENC_SW=2
-D SENSITIVITY=4
-D SHOW_BORDER=0
-D BRIGHTNESS=25
-D IDLE_TIMEOUT=45000
-D DEMO_RESET_PAUSE=20000
-D DEBOUNCE_DELAY=50
-D DEFAULT_LOOK_AHEAD=8
-D DEFAULT_BRIGHTNESS=25
-D DEFAULT_IDLE_TIMEOUT=45
-D MAX_GAME_LOG=100
-D WIFI_SSID=\"Connect4\"
-D WIFI_PASSWORD=\"youlose4\"
lib_deps =
fastled/FastLED @ 3.9.12
+16
@@ -0,0 +1,16 @@
[project]
name = "connect-four-terminal"
version = "1.0.0"
description = "Connect Four terminal game with AI"
requires-python = ">=3.10,<3.14"
dependencies = [
"rich>=13.0",
"python-dotenv>=1.0",
"readchar>=4.0",
"tensorflow>=2.16",
"numpy>=2.0",
"pygame>=2.5",
]
[project.scripts]
connect-four = "connect_four:main"
+1
@@ -0,0 +1 @@
3.13
+38
@@ -0,0 +1,38 @@
"""Entry point: python -m rl [train|visualize|export|info]"""
import os
import sys
# Quiet TensorFlow's C++ logging, disable oneDNN custom ops and hide GPUs (CPU-only);
# these must be set before TensorFlow is imported.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0"
os.environ["CUDA_VISIBLE_DEVICES"] = ""
def main():
    cmd = sys.argv[1] if len(sys.argv) > 1 else "train"
    if cmd == "train":
        from .train import train
        train()
    elif cmd == "export":
        from .export import export_tflite
        model_path = sys.argv[2] if len(sys.argv) > 2 else "rl/checkpoints/model_final.keras"
        export_tflite(model_path)
    elif cmd == "visualize":
        from .visualize import run_visualized
        run_visualized()
    elif cmd == "info":
        from .model import build_model, print_model_info
        model = build_model()
        print_model_info(model)
    else:
        print(f"Unknown command: {cmd}")
        print("Usage: python -m rl [train|visualize|export|info]")
        sys.exit(1)
if __name__ == "__main__":
    main()
+36
@@ -0,0 +1,36 @@
"""Training hyperparameters — edit these to tune your model."""
# ── Model architecture ──────────────────────────────────────────────
CONV_FILTERS = 32 # filters per conv layer (keep small for ESP32)
NUM_CONV_LAYERS = 3 # number of convolutional blocks
DENSE_UNITS = 64 # units in the dense layer before heads
# ── Training ────────────────────────────────────────────────────────
LEARNING_RATE = 1e-3 # Adam learning rate
BATCH_SIZE = 256 # training batch size
EPOCHS_PER_ITERATION = 4 # epochs per training iteration
REPLAY_BUFFER_SIZE = 50000 # max samples kept in replay buffer
# ── Self-play ───────────────────────────────────────────────────────
NUM_ITERATIONS = 50 # total train iterations (self-play → train cycles)
GAMES_PER_ITERATION = 100 # self-play games generated per iteration
MCTS_SIMULATIONS = 50 # MCTS simulations per move
MCTS_C_PUCT = 1.4 # exploration constant
MCTS_TEMPERATURE = 1.0 # move selection temperature (1 = proportional, →0 = greedy)
TEMP_DROP_MOVE = 10 # switch to greedy after this many moves
# ── Parallelism ────────────────────────────────────────────────────
NUM_WORKERS = 0 # 0 = use all available CPU cores
# ── Reward shaping ──────────────────────────────────────────────────
WIN_REWARD = 1.0
DRAW_REWARD = 0.0
LOSS_REWARD = -1.0
# ── Checkpointing ──────────────────────────────────────────────────
CHECKPOINT_DIR = "rl/checkpoints"
CHECKPOINT_INTERVAL = 5 # save model every N iterations
EXPORT_DIR = "rl/export"
# ── ESP32 export ────────────────────────────────────────────────────
QUANTIZE_INT8 = True # int8 quantization for TFLite (recommended for ESP32)
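`MCTS_TEMPERATURE` and `TEMP_DROP_MOVE` control how MCTS visit counts become a move distribution: π(a) ∝ N(a)^(1/T). The sketch below shows that mapping and copies the `temp < 0.2` greedy cutoff used by the self-play loop. The helper name `visit_policy` is hypothetical.

```python
import numpy as np


def visit_policy(visit_counts, temp):
    """Turn MCTS visit counts into a move distribution (hypothetical helper).

    temp >= 0.2: pi(a) proportional to N(a) ** (1 / temp)
    temp <  0.2: one-hot on the most visited column (greedy), as in self-play
    """
    counts = np.asarray(visit_counts, dtype=np.float64)
    if temp < 0.2:
        policy = np.zeros_like(counts)
        policy[int(np.argmax(counts))] = 1.0
        return policy
    scaled = counts ** (1.0 / temp)
    return scaled / scaled.sum()


visits = [0, 5, 10, 20, 10, 5, 0]
print(visit_policy(visits, 1.0))  # proportional to the visit counts
print(visit_policy(visits, 0.5))  # sharper: squares the counts before normalizing
print(visit_policy(visits, 0.1))  # greedy: all mass on the most visited column
```

With T = 1 the early moves explore in proportion to the search results. After `TEMP_DROP_MOVE` moves the loop switches to T = 0.1, which the cutoff turns into a plain argmax.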
+86
@@ -0,0 +1,86 @@
"""Export trained Keras model to TFLite (optionally int8-quantized) for ESP32."""
import os

import numpy as np

from .game import ConnectFour, ROWS, COLS
from .config import EXPORT_DIR, QUANTIZE_INT8


def representative_dataset():
    """Generate sample inputs for int8 calibration."""
    game = ConnectFour()
    for _ in range(200):
        game.reset()
        # Play random moves to get diverse board states
        moves = np.random.randint(0, min(ROWS * COLS, 20))
        for _ in range(moves):
            legal = game.legal_moves()
            if not legal or game.done:
                break
            game.step(np.random.choice(legal))
        yield [game.get_state()[np.newaxis].astype(np.float32)]


def export_tflite(model_path, quantize=None):
    """Convert a saved Keras model to TFLite.

    Args:
        model_path: Path to the .keras model file.
        quantize: Override quantization setting. If None, uses config.QUANTIZE_INT8.
    """
    import tensorflow as tf

    if quantize is None:
        quantize = QUANTIZE_INT8
    os.makedirs(EXPORT_DIR, exist_ok=True)
    model = tf.keras.models.load_model(model_path)
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    if quantize:
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        converter.representative_dataset = representative_dataset
        converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
        converter.inference_input_type = tf.int8
        converter.inference_output_type = tf.int8
        suffix = "_int8"
    else:
        suffix = "_f32"
    tflite_model = converter.convert()
    out_path = os.path.join(EXPORT_DIR, f"connect4{suffix}.tflite")
    with open(out_path, "wb") as f:
        f.write(tflite_model)
    size_kb = len(tflite_model) / 1024
    print(f"Exported: {out_path} ({size_kb:.1f} KB)")
    # Also export as C header for direct embedding in firmware
    header_path = os.path.join(EXPORT_DIR, f"connect4_model{suffix}.h")
    _write_c_header(tflite_model, header_path)
    print(f"C header: {header_path}")
    return out_path


def _write_c_header(model_bytes, path):
    """Write TFLite model as a C byte array for ESP32 firmware inclusion."""
    with open(path, "w") as f:
        f.write("#pragma once\n\n")
        f.write(f"// Auto-generated — {len(model_bytes)} bytes\n")
        f.write(f"const unsigned int connect4_model_len = {len(model_bytes)};\n")
        f.write("alignas(16) const unsigned char connect4_model[] = {\n")
        for i in range(0, len(model_bytes), 12):
            chunk = model_bytes[i:i + 12]
            f.write("  " + ", ".join(f"0x{b:02x}" for b in chunk) + ",\n")
        f.write("};\n")


if __name__ == "__main__":
    import sys

    model_path = sys.argv[1] if len(sys.argv) > 1 else "rl/checkpoints/model_final.keras"
    export_tflite(model_path)
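With `QUANTIZE_INT8 = True`, the exporter sets `inference_input_type` and `inference_output_type` to `tf.int8`. Whatever runs the model (the ESP32 firmware, or a Python `tf.lite.Interpreter`) therefore has to quantize the (6, 7, 2) board tensor before inference and dequantize the outputs afterwards. TFLite uses the affine mapping real = scale × (q − zero_point). The per-tensor `scale` and `zero_point` come from the converted model; in Python they are in `interpreter.get_input_details()[0]["quantization"]`. The values below are assumed placeholders, not the ones this model will actually produce.

```python
import numpy as np


def quantize(x, scale, zero_point):
    """Map float values to int8 with TFLite's affine scheme: q = round(x / scale) + zero_point."""
    q = np.round(np.asarray(x, dtype=np.float32) / scale) + zero_point
    return np.clip(q, -128, 127).astype(np.int8)


def dequantize(q, scale, zero_point):
    """Inverse mapping: real = scale * (q - zero_point)."""
    return scale * (np.asarray(q, dtype=np.int32) - zero_point)


# Assumed placeholder parameters for a 0/1 input; read the real ones from the exported model
IN_SCALE, IN_ZERO = 1.0 / 255.0, -128

board = np.zeros((6, 7, 2), dtype=np.float32)
board[5, 3, 0] = 1.0  # one disc for the current player in the bottom centre cell
q_board = quantize(board, IN_SCALE, IN_ZERO)
print(q_board[5, 3, 0], q_board[0, 0, 0])
```

The int8 policy and value outputs go through `dequantize` in the same way, using the output tensor's own scale and zero point.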
+102
@@ -0,0 +1,102 @@
"""Connect Four game environment for self-play training."""
import numpy as np

ROWS = 6
COLS = 7
WIN_LENGTH = 4


class ConnectFour:
    """Connect Four game with numpy board representation.

    Board encoding: 0 = empty, 1 = player 1, -1 = player 2.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        self.board = np.zeros((ROWS, COLS), dtype=np.int8)
        self.current_player = 1
        self.done = False
        self.winner = 0  # 0 = no winner / draw, 1 or -1
        self.move_count = 0
        return self.get_state()

    def get_state(self):
        """Return board from current player's perspective as (6,7,2) tensor.

        Channel 0: current player's pieces (1s).
        Channel 1: opponent's pieces (1s).
        """
        state = np.zeros((ROWS, COLS, 2), dtype=np.float32)
        state[:, :, 0] = (self.board == self.current_player).astype(np.float32)
        state[:, :, 1] = (self.board == -self.current_player).astype(np.float32)
        return state

    def legal_moves(self):
        """Return list of columns that are not full."""
        return [c for c in range(COLS) if self.board[0, c] == 0]

    def legal_moves_mask(self):
        """Return binary mask of legal columns."""
        return (self.board[0] == 0).astype(np.float32)

    def step(self, col):
        """Play a move in the given column. Returns (state, reward, done)."""
        if self.done:
            raise ValueError("Game is already over.")
        if col < 0 or col >= COLS or self.board[0, col] != 0:
            raise ValueError(f"Illegal move: column {col}")
        # Drop piece
        row = self._get_drop_row(col)
        self.board[row, col] = self.current_player
        self.move_count += 1
        # Check win
        if self._check_win(row, col):
            self.done = True
            self.winner = self.current_player
            reward = 1.0
        elif self.move_count == ROWS * COLS:
            self.done = True
            self.winner = 0
            reward = 0.0
        else:
            reward = 0.0
        # Switch player
        self.current_player *= -1
        return self.get_state(), reward, self.done

    def _get_drop_row(self, col):
        for r in range(ROWS - 1, -1, -1):
            if self.board[r, col] == 0:
                return r
        raise ValueError(f"Column {col} is full")

    def _check_win(self, row, col):
        player = self.board[row, col]
        directions = [(0, 1), (1, 0), (1, 1), (1, -1)]
        for dr, dc in directions:
            count = 1
            for sign in (1, -1):
                r, c = row + sign * dr, col + sign * dc
                while 0 <= r < ROWS and 0 <= c < COLS and self.board[r, c] == player:
                    count += 1
                    r += sign * dr
                    c += sign * dc
            if count >= WIN_LENGTH:
                return True
        return False

    def clone(self):
        g = ConnectFour()
        g.board = self.board.copy()
        g.current_player = self.current_player
        g.done = self.done
        g.winner = self.winner
        g.move_count = self.move_count
        return g
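`get_state()` always encodes the board from the side to move. Channel 0 holds that player's discs and channel 1 the opponent's, so one set of network weights serves both players. The sketch below re-implements the encoding outside the class for illustration; the function name `encode` is hypothetical. It shows that the two channels swap when the player to move changes.

```python
import numpy as np

ROWS, COLS = 6, 7


def encode(board, current_player):
    """Encode a (ROWS, COLS) board of {0, 1, -1} as a (ROWS, COLS, 2) float tensor.

    Channel 0 marks the current player's discs, channel 1 the opponent's.
    """
    state = np.zeros((ROWS, COLS, 2), dtype=np.float32)
    state[:, :, 0] = board == current_player
    state[:, :, 1] = board == -current_player
    return state


board = np.zeros((ROWS, COLS), dtype=np.int8)
board[ROWS - 1, 3] = 1   # player 1 dropped a disc in the centre column (row 0 is the top)
board[ROWS - 1, 4] = -1  # player 2 answered in column 4

as_p1 = encode(board, 1)
as_p2 = encode(board, -1)
# From player 2's point of view, the two channels are swapped
print(np.array_equal(as_p1[:, :, 0], as_p2[:, :, 1]))
```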
+6
@@ -0,0 +1,6 @@
def main():
    print("Hello from rl!")


if __name__ == "__main__":
    main()
+103
@@ -0,0 +1,103 @@
"""Monte Carlo Tree Search for self-play data generation."""
import math

import numpy as np

from .game import ConnectFour
from .config import MCTS_C_PUCT


class MCTSNode:
    __slots__ = ("parent", "action", "prior", "visit_count", "value_sum", "children", "game")

    def __init__(self, game, parent=None, action=None, prior=0.0):
        self.game = game
        self.parent = parent
        self.action = action
        self.prior = prior
        self.visit_count = 0
        self.value_sum = 0.0
        self.children = {}

    @property
    def q_value(self):
        if self.visit_count == 0:
            return 0.0
        return self.value_sum / self.visit_count

    def ucb_score(self):
        parent_visits = self.parent.visit_count if self.parent else 1
        exploration = MCTS_C_PUCT * self.prior * math.sqrt(parent_visits) / (1 + self.visit_count)
        return self.q_value + exploration

    def is_leaf(self):
        return len(self.children) == 0

    def expand(self, policy_probs):
        """Expand node using network policy output."""
        legal = self.game.legal_moves()
        for col in legal:
            if col not in self.children:
                self.children[col] = MCTSNode(
                    game=None, parent=self, action=col, prior=policy_probs[col]
                )

    def select_child(self):
        return max(self.children.values(), key=lambda c: c.ucb_score())


def run_mcts(game, model, num_simulations):
    """Run MCTS from current game state, return visit-count policy vector.

    Each node's value_sum is kept from the perspective of the player who made
    the move leading to that node, so select_child() can simply maximize it.
    """
    root = MCTSNode(game.clone())
    # Evaluate root
    state = root.game.get_state()
    policy_logits, value = model.predict(state[np.newaxis], verbose=0)
    policy = _mask_and_normalize(policy_logits[0], root.game.legal_moves_mask())
    root.expand(policy)
    for _ in range(num_simulations):
        node = root
        sim_game = root.game.clone()
        # SELECT — walk down tree picking best UCB child
        while not node.is_leaf() and not sim_game.done:
            node = node.select_child()
            sim_game.step(node.action)
        # EVALUATE leaf, from the perspective of the player who just moved into it
        if sim_game.done:
            # Terminal: the game is either drawn or was won by the player who just moved
            leaf_value = 0.0 if sim_game.winner == 0 else 1.0
        else:
            node.game = sim_game.clone()
            state = sim_game.get_state()
            policy_logits, value = model.predict(state[np.newaxis], verbose=0)
            # The network scores the position for the player to move (the opponent), so negate it
            leaf_value = -float(value[0, 0])
            policy = _mask_and_normalize(policy_logits[0], sim_game.legal_moves_mask())
            node.expand(policy)
        # BACKUP — propagate value up, flipping sign each level
        while node is not None:
            node.visit_count += 1
            node.value_sum += leaf_value
            leaf_value = -leaf_value
            node = node.parent
    # Build policy from visit counts
    visits = np.zeros(7, dtype=np.float32)
    for col, child in root.children.items():
        visits[col] = child.visit_count
    return visits


def _mask_and_normalize(logits, mask):
    """Apply legal-move mask and softmax."""
    logits = np.array(logits, dtype=np.float64)
    logits[mask == 0] = -1e9
    exp = np.exp(logits - np.max(logits))
    probs = exp / np.sum(exp)
    return probs.astype(np.float32)
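Selection uses the PUCT score Q + c_puct · P · √N_parent / (1 + N). Backup flips the sign of the value at each level because the players alternate. The sketch below isolates that bookkeeping on a two-child toy tree, using a hypothetical `Node` dataclass rather than `MCTSNode`. It assumes the AlphaZero convention in which each node stores value from the point of view of the player who made the move into it, so a parent can simply pick the child with the highest score.

```python
import math
from dataclasses import dataclass, field
from typing import Optional

C_PUCT = 1.4  # same value as MCTS_C_PUCT in config.py


@dataclass(eq=False)
class Node:
    """Toy tree node (hypothetical); value_sum is from the view of the player who moved into it."""
    prior: float = 0.0
    parent: Optional["Node"] = None
    visit_count: int = 0
    value_sum: float = 0.0
    children: dict = field(default_factory=dict)

    @property
    def q_value(self) -> float:
        return self.value_sum / self.visit_count if self.visit_count else 0.0

    def puct(self) -> float:
        parent_visits = self.parent.visit_count if self.parent else 1
        return self.q_value + C_PUCT * self.prior * math.sqrt(parent_visits) / (1 + self.visit_count)


def backup(leaf: Node, value: float) -> None:
    """Propagate `value` (for the player who moved into `leaf`) to the root, flipping sign per ply."""
    node = leaf
    while node is not None:
        node.visit_count += 1
        node.value_sum += value
        value = -value
        node = node.parent


root = Node()
win = root.children[0] = Node(prior=0.5, parent=root)    # move that wins on the spot
quiet = root.children[1] = Node(prior=0.5, parent=root)  # neutral move
backup(win, 1.0)    # terminal win for the player who just moved
backup(quiet, 0.0)  # e.g. a network evaluation of 0 after negation
best = max(root.children.values(), key=Node.puct)
print(best is win)
```

If the terminal win were instead backed up as -1 at the leaf, the parent would rank the winning move below the neutral one. That is why the stored sign has to match the side that picks the child.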
+54
@@ -0,0 +1,54 @@
"""Compact dual-head neural network (policy + value) sized for ESP32."""
from .config import CONV_FILTERS, NUM_CONV_LAYERS, DENSE_UNITS, LEARNING_RATE


def build_model():
    """Build a small AlphaZero-style network.

    Input:  (6, 7, 2), current player pieces / opponent pieces
    Output: policy (7,), unnormalized logits over columns
            value (1,), board evaluation in [-1, 1]
    """
    from tensorflow import keras
    from tensorflow.keras import layers

    inp = layers.Input(shape=(6, 7, 2), name="board")
    x = inp
    for i in range(NUM_CONV_LAYERS):
        x = layers.Conv2D(
            CONV_FILTERS, 3, padding="same", activation="relu", name=f"conv{i}"
        )(x)
        x = layers.BatchNormalization(name=f"bn{i}")(x)
    flat = layers.Flatten(name="flat")(x)
    shared = layers.Dense(DENSE_UNITS, activation="relu", name="shared_dense")(flat)
    # Policy head (raw logits; the loss applies the softmax via from_logits=True)
    policy = layers.Dense(7, name="policy_logits")(shared)
    # Value head
    value = layers.Dense(1, activation="tanh", name="value")(shared)
    model = keras.Model(inputs=inp, outputs=[policy, value], name="connect4_net")
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=LEARNING_RATE),
        loss={
            "policy_logits": keras.losses.CategoricalCrossentropy(from_logits=True),
            "value": keras.losses.MeanSquaredError(),
        },
        loss_weights={"policy_logits": 1.0, "value": 1.0},
    )
    return model


def print_model_info(model):
    model.summary()
    total_params = model.count_params()
    approx_size_kb = total_params * 4 / 1024  # float32
    approx_int8_kb = total_params / 1024  # int8
    print(f"\nTotal parameters: {total_params:,}")
    print(f"Approx size (float32): {approx_size_kb:.1f} KB")
    print(f"Approx size (int8): {approx_int8_kb:.1f} KB")
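`print_model_info()` estimates storage at 4 bytes per parameter for float32 and 1 byte for int8. For the default config (32 filters, 3 conv blocks, 64 dense units) the count can also be worked out by hand. The sketch below assumes the standard Keras counts: a 3×3 Conv2D has 9·C_in·C_out weights plus C_out biases, and BatchNormalization keeps 4 values per channel (gamma, beta, and the non-trainable moving mean and variance). `count_params` here is a hypothetical helper, not Keras' own method.

```python
def count_params(filters=32, conv_layers=3, dense_units=64, rows=6, cols=7, in_ch=2, actions=7):
    """Hand count of parameters for the network built by build_model() (hypothetical helper)."""
    total = 0
    channels = in_ch
    for _ in range(conv_layers):
        total += 3 * 3 * channels * filters + filters  # Conv2D kernel + bias ("same" padding keeps 6x7)
        total += 4 * filters                           # BatchNormalization: gamma, beta, mean, variance
        channels = filters
    flat = rows * cols * channels                      # Flatten output size
    total += flat * dense_units + dense_units          # shared Dense layer
    total += dense_units * actions + actions           # policy head
    total += dense_units * 1 + 1                       # value head
    return total


n = count_params()
print(f"{n:,} params, ~{n * 4 / 1024:.1f} KB float32, ~{n / 1024:.1f} KB int8")
```

With these defaults the shared Dense layer accounts for 86,080 of the 106,088 parameters, so `DENSE_UNITS` and the flattened 6×7×`CONV_FILTERS` feature map dominate the model size. The exported `.tflite` file will differ from this estimate because it also stores the graph and quantization parameters.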
+143
@@ -0,0 +1,143 @@
"""Self-play training loop with parallel game generation."""
import os
import numpy as np
from collections import deque
from multiprocessing import Pool, cpu_count

from .game import ConnectFour
from .model import build_model, print_model_info
from .mcts import run_mcts
from .config import (
    NUM_ITERATIONS, GAMES_PER_ITERATION, MCTS_SIMULATIONS,
    MCTS_TEMPERATURE, TEMP_DROP_MOVE,
    WIN_REWARD, DRAW_REWARD, LOSS_REWARD,
    BATCH_SIZE, EPOCHS_PER_ITERATION, REPLAY_BUFFER_SIZE,
    CHECKPOINT_DIR, CHECKPOINT_INTERVAL, NUM_WORKERS,
)

# Per-worker global model (loaded once per process)
_worker_model = None


def _init_worker(weights_list):
    """Initialize a worker process with its own model copy."""
    global _worker_model
    os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
    os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0"
    os.environ["CUDA_VISIBLE_DEVICES"] = ""
    _worker_model = build_model()
    _worker_model.set_weights(weights_list)


def _play_one_game(_):
    """Play a single self-play game in a worker process."""
    game = ConnectFour()
    trajectory = []
    while not game.done:
        state = game.get_state()
        visit_counts = run_mcts(game, _worker_model, MCTS_SIMULATIONS)
        if game.move_count < TEMP_DROP_MOVE:
            temp = MCTS_TEMPERATURE
        else:
            temp = 0.1
        if temp < 0.2:
            action = int(np.argmax(visit_counts))
            policy = np.zeros(7, dtype=np.float32)
            policy[action] = 1.0
        else:
            counts = visit_counts ** (1.0 / temp)
            policy = counts / counts.sum()
            action = np.random.choice(7, p=policy)
        trajectory.append((state, policy, game.current_player))
        game.step(action)
    samples = []
    for state, policy, player in trajectory:
        if game.winner == 0:
            value = DRAW_REWARD
        elif game.winner == player:
            value = WIN_REWARD
        else:
            value = LOSS_REWARD
        samples.append((state, policy, value))
    return samples


def train():
    """Main training entry point."""
    model = build_model()
    print_model_info(model)
    num_workers = NUM_WORKERS if NUM_WORKERS > 0 else cpu_count()
    print(f"Using {num_workers} worker processes for self-play")
    replay_buffer = deque(maxlen=REPLAY_BUFFER_SIZE)
    os.makedirs(CHECKPOINT_DIR, exist_ok=True)
    for iteration in range(1, NUM_ITERATIONS + 1):
        print(f"\n{'='*60}")
        print(f"Iteration {iteration}/{NUM_ITERATIONS}")
        print(f"{'='*60}")
        # ── Self-play (parallel) ───────────────────────────────
        weights = model.get_weights()
        with Pool(processes=num_workers, initializer=_init_worker, initargs=(weights,)) as pool:
            results = pool.map(_play_one_game, range(GAMES_PER_ITERATION))
        wins = {1: 0, -1: 0, 0: 0}
        for samples in results:
            replay_buffer.extend(samples)
            if samples:
                last_value = samples[-1][2]
                if last_value == DRAW_REWARD:
                    wins[0] += 1
                else:
                    # The last mover is always the winner; P1 makes the odd-numbered moves
                    wins[1 if len(samples) % 2 == 1 else -1] += 1
        print(f" Self-play: {GAMES_PER_ITERATION} games "
              f"(P1 wins: {wins[1]}, P2 wins: {wins[-1]}, draws: {wins[0]})")
        print(f" Buffer size: {len(replay_buffer)}")
        # ── Train ───────────────────────────────────────────────
        if len(replay_buffer) >= BATCH_SIZE:
            sample_size = min(len(replay_buffer), BATCH_SIZE * EPOCHS_PER_ITERATION)
            indices = np.random.choice(len(replay_buffer), size=sample_size, replace=False)
            batch = [replay_buffer[i] for i in indices]
            states = np.array([s[0] for s in batch])
            policies = np.array([s[1] for s in batch])
            values = np.array([s[2] for s in batch]).reshape(-1, 1)
            history = model.fit(
                states,
                {"policy_logits": policies, "value": values},
                batch_size=BATCH_SIZE,
                epochs=EPOCHS_PER_ITERATION,
                verbose=1,
            )
            policy_loss = history.history["policy_logits_loss"][-1]
            value_loss = history.history["value_loss"][-1]
            print(f" Policy loss: {policy_loss:.4f} Value loss: {value_loss:.4f}")
        # ── Checkpoint ──────────────────────────────────────────
        if iteration % CHECKPOINT_INTERVAL == 0:
            path = os.path.join(CHECKPOINT_DIR, f"model_iter{iteration}.keras")
            model.save(path)
            print(f" Saved checkpoint: {path}")
    final_path = os.path.join(CHECKPOINT_DIR, "model_final.keras")
    model.save(final_path)
    print(f"\nTraining complete. Final model saved to {final_path}")
    return model


if __name__ == "__main__":
    train()
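Each self-play position is recorded together with the player to move. Once the game ends, every sample receives the final result from that player's point of view (`WIN_REWARD`, `LOSS_REWARD` or `DRAW_REWARD`). Because the players alternate, the value targets alternate in sign along a decisive game. The sketch below shows that labelling step on its own; the `label_outcomes` helper is hypothetical and the reward constants are copied from config.py.

```python
WIN_REWARD, DRAW_REWARD, LOSS_REWARD = 1.0, 0.0, -1.0  # as in config.py


def label_outcomes(players, winner):
    """Return one value target per move: the game result as seen by the player who made it.

    players: the player to move (1 or -1) at each recorded position, in order.
    winner:  1, -1, or 0 for a draw.
    """
    targets = []
    for player in players:
        if winner == 0:
            targets.append(DRAW_REWARD)
        elif winner == player:
            targets.append(WIN_REWARD)
        else:
            targets.append(LOSS_REWARD)
    return targets


# A 7-move game: player 1 moves first and also makes the last (winning) move
movers = [1, -1, 1, -1, 1, -1, 1]
print(label_outcomes(movers, winner=1))
```

One consequence when reading samples back: in a decisive game the last target is always `WIN_REWARD`. The winner's identity therefore has to come from who made the last move (player 1 when the move count is odd), not from the value of the last sample.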
+482
@@ -0,0 +1,482 @@
"""Pygame visualization of Connect Four RL training.

Left panel: live self-play game board
Right panel: loss curves + win-rate chart + training stats
"""
import os
import threading
import time
from collections import deque

import numpy as np
import pygame

from .game import ConnectFour, ROWS, COLS
from .model import build_model, print_model_info
from .mcts import run_mcts
from .config import (
    NUM_ITERATIONS, GAMES_PER_ITERATION, MCTS_SIMULATIONS,
    MCTS_TEMPERATURE, TEMP_DROP_MOVE,
    WIN_REWARD, DRAW_REWARD, LOSS_REWARD,
    BATCH_SIZE, EPOCHS_PER_ITERATION, REPLAY_BUFFER_SIZE,
    CHECKPOINT_DIR, CHECKPOINT_INTERVAL, NUM_WORKERS,
)
from multiprocessing import Pool, cpu_count

# ── Layout constants ────────────────────────────────────────────────
CELL = 80
BOARD_W = COLS * CELL
BOARD_H = ROWS * CELL
PANEL_W = 420
MARGIN = 20
WIN_W = BOARD_W + PANEL_W + MARGIN * 3
WIN_H = BOARD_H + MARGIN * 2
FPS = 30

# ── Colors ──────────────────────────────────────────────────────────
BG = (30, 30, 40)
BOARD_BG = (0, 60, 180)
EMPTY = (20, 20, 30)
P1_COLOR = (255, 220, 50)  # yellow
P2_COLOR = (220, 40, 40)  # red
WIN_HIGHLIGHT = (100, 255, 100)
GRID_LINE = (0, 40, 140)
TEXT_COLOR = (220, 220, 220)
CHART_BG = (40, 40, 55)
POLICY_LINE = (80, 200, 255)
VALUE_LINE = (255, 160, 60)
P1_CHART = (255, 220, 50)
P2_CHART = (220, 40, 40)
DRAW_CHART = (140, 140, 140)

# ── Shared state between training thread and pygame loop ────────────
_state = {
    "board": np.zeros((ROWS, COLS), dtype=np.int8),
    "iteration": 0,
    "game_num": 0,
    "phase": "init",  # init / self-play / training / done
    "policy_losses": [],
    "value_losses": [],
    "win_history": [],  # list of (p1_wins, p2_wins, draws) per iteration
    "move_delay": 0.3,
    "status": "Initializing...",
    "winner": 0,
    "running": True,
}
_lock = threading.Lock()

# ── Worker setup (same as train.py) ─────────────────────────────────
_worker_model = None


def _init_worker(weights_list):
    global _worker_model
    os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
    os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0"
    os.environ["CUDA_VISIBLE_DEVICES"] = ""
    _worker_model = build_model()
    _worker_model.set_weights(weights_list)


def _play_one_game(_):
    game = ConnectFour()
    trajectory = []
    while not game.done:
        state = game.get_state()
        visit_counts = run_mcts(game, _worker_model, MCTS_SIMULATIONS)
        if game.move_count < TEMP_DROP_MOVE:
            temp = MCTS_TEMPERATURE
        else:
            temp = 0.1
        if temp < 0.2:
            action = int(np.argmax(visit_counts))
            policy = np.zeros(7, dtype=np.float32)
            policy[action] = 1.0
        else:
            counts = visit_counts ** (1.0 / temp)
            policy = counts / counts.sum()
            action = np.random.choice(7, p=policy)
        trajectory.append((state, policy, game.current_player))
        game.step(action)
    samples = []
    for state, policy, player in trajectory:
        if game.winner == 0:
            value = DRAW_REWARD
        elif game.winner == player:
            value = WIN_REWARD
        else:
            value = LOSS_REWARD
        samples.append((state, policy, value))
    return samples


def _play_showcase_game(model):
    """Play one game slowly on the main training thread, updating shared state."""
    game = ConnectFour()
    trajectory = []
    with _lock:
        _state["board"] = game.board.copy()
        _state["winner"] = 0
    while not game.done and _state["running"]:
        state = game.get_state()
        visit_counts = run_mcts(game, model, MCTS_SIMULATIONS)
        if game.move_count < TEMP_DROP_MOVE:
            temp = MCTS_TEMPERATURE
        else:
            temp = 0.1
        if temp < 0.2:
            action = int(np.argmax(visit_counts))
            policy = np.zeros(7, dtype=np.float32)
            policy[action] = 1.0
        else:
            counts = visit_counts ** (1.0 / temp)
            policy = counts / counts.sum()
            action = np.random.choice(7, p=policy)
        trajectory.append((state, policy, game.current_player))
        game.step(action)
        with _lock:
            _state["board"] = game.board.copy()
        time.sleep(_state["move_delay"])
    with _lock:
        _state["winner"] = game.winner
    samples = []
    for state, policy, player in trajectory:
        if game.winner == 0:
            value = DRAW_REWARD
        elif game.winner == player:
            value = WIN_REWARD
        else:
            value = LOSS_REWARD
        samples.append((state, policy, value))
    return samples


def _training_thread():
    """Run the full training loop, pushing updates to shared state."""
    os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
    model = build_model()
    print_model_info(model)
    num_workers = NUM_WORKERS if NUM_WORKERS > 0 else cpu_count()
    replay_buffer = deque(maxlen=REPLAY_BUFFER_SIZE)
    os.makedirs(CHECKPOINT_DIR, exist_ok=True)
    with _lock:
        _state["status"] = f"Using {num_workers} workers"
    for iteration in range(1, NUM_ITERATIONS + 1):
        if not _state["running"]:
            break
        with _lock:
            _state["iteration"] = iteration
            _state["phase"] = "self-play"
            _state["status"] = f"Iteration {iteration}/{NUM_ITERATIONS} - Self-play"
        # Play one showcase game visually
        with _lock:
            _state["game_num"] = 0
        showcase_samples = _play_showcase_game(model)
        replay_buffer.extend(showcase_samples)
        # Play remaining games in parallel
        remaining = GAMES_PER_ITERATION - 1
        if remaining > 0 and _state["running"]:
            with _lock:
                _state["status"] = f"Iter {iteration} - Playing {remaining} games (parallel)..."
            weights = model.get_weights()
            with Pool(processes=num_workers, initializer=_init_worker, initargs=(weights,)) as pool:
                results = pool.map(_play_one_game, range(remaining))
            for samples in results:
                replay_buffer.extend(samples)
        # Count wins across all games this iteration. The last sample of a
        # decisive game belongs to the winner; P1 makes the odd-numbered moves.
        wins = {1: 0, -1: 0, 0: 0}
        # Showcase game
        if showcase_samples:
            last_val = showcase_samples[-1][2]
            if last_val == DRAW_REWARD:
                wins[0] += 1
            else:
                wins[1 if len(showcase_samples) % 2 == 1 else -1] += 1
        # Parallel games
        if remaining > 0 and _state["running"]:
            for samples in results:
                if samples:
                    last_val = samples[-1][2]
                    if last_val == DRAW_REWARD:
                        wins[0] += 1
                    else:
                        wins[1 if len(samples) % 2 == 1 else -1] += 1
        with _lock:
            _state["win_history"].append((wins[1], wins[-1], wins[0]))
        # Train
        if len(replay_buffer) >= BATCH_SIZE and _state["running"]:
            with _lock:
                _state["phase"] = "training"
                _state["status"] = f"Iter {iteration} - Training..."
            sample_size = min(len(replay_buffer), BATCH_SIZE * EPOCHS_PER_ITERATION)
            indices = np.random.choice(len(replay_buffer), size=sample_size, replace=False)
            batch = [replay_buffer[i] for i in indices]
            states = np.array([s[0] for s in batch])
            policies = np.array([s[1] for s in batch])
            values = np.array([s[2] for s in batch]).reshape(-1, 1)
            history = model.fit(
                states,
                {"policy_logits": policies, "value": values},
                batch_size=BATCH_SIZE,
                epochs=EPOCHS_PER_ITERATION,
                verbose=0,
            )
            with _lock:
                _state["policy_losses"].append(history.history["policy_logits_loss"][-1])
                _state["value_losses"].append(history.history["value_loss"][-1])
        # Checkpoint
        if iteration % CHECKPOINT_INTERVAL == 0:
            path = os.path.join(CHECKPOINT_DIR, f"model_iter{iteration}.keras")
            model.save(path)
    if _state["running"]:
        final_path = os.path.join(CHECKPOINT_DIR, "model_final.keras")
        model.save(final_path)
        with _lock:
            _state["phase"] = "done"
            _state["status"] = "Training complete!"


# ── Drawing helpers ─────────────────────────────────────────────────
def _draw_board(surface, board, x0, y0):
    """Draw the Connect Four board."""
    # Board background
    pygame.draw.rect(surface, BOARD_BG, (x0, y0, BOARD_W, BOARD_H), border_radius=8)
    for r in range(ROWS):
        for c in range(COLS):
            cx = x0 + c * CELL + CELL // 2
            cy = y0 + r * CELL + CELL // 2
            radius = CELL // 2 - 6
            val = board[r, c]
            if val == 1:
                color = P1_COLOR
            elif val == -1:
                color = P2_COLOR
            else:
                color = EMPTY
            pygame.draw.circle(surface, color, (cx, cy), radius)
            pygame.draw.circle(surface, GRID_LINE, (cx, cy), radius, 2)


def _draw_chart(surface, x, y, w, h, series_list, colors, title, font):
    """Draw a simple line chart with multiple series."""
    pygame.draw.rect(surface, CHART_BG, (x, y, w, h), border_radius=6)
    pygame.draw.rect(surface, (60, 60, 75), (x, y, w, h), 1, border_radius=6)
    # Title
    title_surf = font.render(title, True, TEXT_COLOR)
    surface.blit(title_surf, (x + 8, y + 4))
    chart_x = x + 8
    chart_y = y + 24
    chart_w = w - 16
    chart_h = h - 32
    if not any(series_list):
        return
    # Find global min/max
    all_vals = [v for s in series_list if s for v in s]
    if not all_vals:
        return
    min_val = min(all_vals)
    max_val = max(all_vals)
    val_range = max_val - min_val if max_val != min_val else 1.0
    for series, color in zip(series_list, colors):
        if len(series) < 2:
            continue
        points = []
        for i, v in enumerate(series):
            px = chart_x + int(i / (len(series) - 1) * chart_w)
            py = chart_y + chart_h - int((v - min_val) / val_range * chart_h)
            points.append((px, py))
        pygame.draw.lines(surface, color, False, points, 2)


def _draw_stacked_bar(surface, x, y, w, h, win_history, font):
    """Draw stacked bar chart of win rates."""
    pygame.draw.rect(surface, CHART_BG, (x, y, w, h), border_radius=6)
    pygame.draw.rect(surface, (60, 60, 75), (x, y, w, h), 1, border_radius=6)
    title_surf = font.render("Win rates per iteration", True, TEXT_COLOR)
    surface.blit(title_surf, (x + 8, y + 4))
    if not win_history:
        return
    chart_x = x + 8
    chart_y = y + 24
    chart_w = w - 16
    chart_h = h - 48
    n = len(win_history)
    bar_w = max(2, chart_w // max(n, 1))
    for i, (p1, p2, dr) in enumerate(win_history):
        total = p1 + p2 + dr
        if total == 0:
            continue
        bx = chart_x + int(i / max(n, 1) * chart_w)
        # Stack: P1 (bottom), draws (middle), P2 (top)
        h1 = int(p1 / total * chart_h)
        hd = int(dr / total * chart_h)
        h2 = chart_h - h1 - hd
        by = chart_y
        pygame.draw.rect(surface, P2_CHART, (bx, by, bar_w - 1, h2))
        by += h2
        pygame.draw.rect(surface, DRAW_CHART, (bx, by, bar_w - 1, hd))
        by += hd
        pygame.draw.rect(surface, P1_CHART, (bx, by, bar_w - 1, h1))
    # Legend
    ly = y + h - 18
    for label, color, lx in [("P1", P1_CHART, x + 8), ("Draw", DRAW_CHART, x + 70), ("P2", P2_CHART, x + 150)]:
        pygame.draw.rect(surface, color, (lx, ly, 12, 12))
        surface.blit(font.render(label, True, TEXT_COLOR), (lx + 16, ly - 2))


def run_visualized():
    """Launch pygame window and run training with live visualization."""
    pygame.init()
    screen = pygame.display.set_mode((WIN_W, WIN_H))
    pygame.display.set_caption("Connect Four RL Training")
    clock = pygame.time.Clock()
    font = pygame.font.SysFont("monospace", 14)
    font_big = pygame.font.SysFont("monospace", 18, bold=True)
    # Start training in background thread
    train_thread = threading.Thread(target=_training_thread, daemon=True)
    train_thread.start()
    running = True
    while running:
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False
                _state["running"] = False
            elif event.type == pygame.KEYDOWN:
                if event.key == pygame.K_ESCAPE:
                    running = False
                    _state["running"] = False
                elif event.key == pygame.K_UP:
                    _state["move_delay"] = max(0.05, _state["move_delay"] - 0.05)
                elif event.key == pygame.K_DOWN:
                    _state["move_delay"] = min(2.0, _state["move_delay"] + 0.05)
        screen.fill(BG)
        with _lock:
            board = _state["board"].copy()
            iteration = _state["iteration"]
            phase = _state["phase"]
            status = _state["status"]
            policy_losses = list(_state["policy_losses"])
            value_losses = list(_state["value_losses"])
            win_history = list(_state["win_history"])
            winner = _state["winner"]
            delay = _state["move_delay"]
        # ── Left: game board ────────────────────────────────────
        bx, by = MARGIN, MARGIN
        _draw_board(screen, board, bx, by)
        # Winner overlay
        if winner != 0 and phase == "self-play":
            label = f"Player {1 if winner == 1 else 2} wins!"
            color = P1_COLOR if winner == 1 else P2_COLOR
            win_surf = font_big.render(label, True, color)
            wrect = win_surf.get_rect(center=(bx + BOARD_W // 2, by + BOARD_H + 2))
            if wrect.bottom < WIN_H:
                screen.blit(win_surf, wrect)
        # ── Right panel ────────────────────────────────────────
        px = BOARD_W + MARGIN * 2
        py = MARGIN
        # Status
        status_surf = font_big.render(status, True, TEXT_COLOR)
        screen.blit(status_surf, (px, py))
        py += 28
        iter_surf = font.render(f"Iteration: {iteration}/{NUM_ITERATIONS} Phase: {phase}", True, TEXT_COLOR)
        screen.blit(iter_surf, (px, py))
        py += 20
        delay_surf = font.render(f"Move delay: {delay:.2f}s (Up/Down to adjust)", True, (150, 150, 170))
        screen.blit(delay_surf, (px, py))
        py += 28
        # Loss chart
        chart_h = 140
        _draw_chart(
            screen, px, py, PANEL_W, chart_h,
            [policy_losses, value_losses],
            [POLICY_LINE, VALUE_LINE],
            "Loss (blue=policy, orange=value)",
            font,
        )
        py += chart_h + 12
        # Win rate chart
        bar_h = 160
        _draw_stacked_bar(screen, px, py, PANEL_W, bar_h, win_history, font)
        py += bar_h + 12
        # Latest stats
        if policy_losses:
            pl = font.render(f"Policy loss: {policy_losses[-1]:.4f}", True, POLICY_LINE)
            screen.blit(pl, (px, py))
            py += 18
        if value_losses:
            vl = font.render(f"Value loss: {value_losses[-1]:.4f}", True, VALUE_LINE)
            screen.blit(vl, (px, py))
            py += 18
        if win_history:
            p1, p2, dr = win_history[-1]
            ws = font.render(f"Last iter: P1={p1} P2={p2} Draw={dr}", True, TEXT_COLOR)
            screen.blit(ws, (px, py))
        pygame.display.flip()
        clock.tick(FPS)
    pygame.quit()
    _state["running"] = False
    train_thread.join(timeout=5)
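The visualizer shares one `_state` dict between the training thread and the pygame loop. The training thread writes under `_lock`, and each frame copies what it needs while holding the lock, then draws from those copies. The sketch below reduces that pattern to the standard library; `worker` and `snapshot` are hypothetical names standing in for `_training_thread` and the per-frame read.

```python
import threading
import time

state = {"iteration": 0, "losses": [], "running": True}
lock = threading.Lock()


def worker(n_iters):
    """Stand-in for _training_thread: publish progress under the lock."""
    for it in range(1, n_iters + 1):
        if not state["running"]:
            break
        time.sleep(0.01)  # pretend to do some work outside the lock
        with lock:
            state["iteration"] = it
            state["losses"].append(1.0 / it)


def snapshot():
    """Stand-in for the per-frame read: copy mutable values so drawing never sees a half-updated list."""
    with lock:
        return state["iteration"], list(state["losses"])


t = threading.Thread(target=worker, args=(5,), daemon=True)
t.start()
t.join(timeout=5)
iteration, losses = snapshot()
print(iteration, losses)
```

Copying the lists inside the lock keeps the critical section short. The slow rendering work then runs outside it and never blocks the training thread for a whole frame.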
+130 -40
@@ -13,16 +13,48 @@
#define SENSITIVITY 4
#endif
#ifndef LED_PIN
#define LED_PIN 4
#endif
#ifndef ENC_A
#define ENC_A 0
#endif
#ifndef ENC_B
#define ENC_B 1
#endif
#ifndef ENC_SW
#define ENC_SW 2
#endif
#define NUM_LEDS 64
#ifndef MAX_GAME_LOG
#define MAX_GAME_LOG 5
#endif
#ifndef DEFAULT_LOOK_AHEAD
#define DEFAULT_LOOK_AHEAD 8
#endif
#ifndef DEFAULT_BRIGHTNESS
#define DEFAULT_BRIGHTNESS 25
#endif
#ifndef DEFAULT_IDLE_TIMEOUT
#define DEFAULT_IDLE_TIMEOUT 60
#endif
#ifndef DEMO_RESET_PAUSE
#define DEMO_RESET_PAUSE 30000
#endif
#ifndef WIFI_SSID
#define WIFI_SSID "Connect4"
#endif
const int COLS = 7;
const int ROWS = 6;
const int colOrder[] = {3, 2, 4, 1, 5, 0, 6};
@@ -49,11 +81,11 @@ uint8_t demoPly[2] = {4, 4};
bool abortAi = false;
bool lastButtonState = HIGH;
uint8_t currentLookAhead = 6;
uint8_t currentBrightness = 30;
uint32_t currentIdleTimeoutMs = 60000;
uint8_t currentLookAhead = DEFAULT_LOOK_AHEAD;
uint8_t currentBrightness = DEFAULT_BRIGHTNESS;
uint32_t currentIdleTimeoutMs = DEFAULT_IDLE_TIMEOUT * 1000;
bool blunderEnabled = false;
bool progressiveDifficulty = false;
uint8_t blunderChance = 20;
uint8_t aiBrightness = 0;
bool aiFadeUp = true;
@@ -84,12 +116,12 @@ void renderBoard();
void showMenu();
int getFirstEmptyRow(int col);
bool isBoardFull();
int getDynamicPly();
int8_t scanBoard();
bool checkGameEnd();
void updateThinkingVisuals(int8_t pColor, int8_t column);
void animateDrop(int col, int player);
void moveDiscToCol(int startCol, int targetCol, int player, int speed);
int evaluateBoard(int8_t aiP, int8_t huP);
int minimax(int depth, int alpha, int beta, bool isMax, int8_t aiP, int8_t huP, int8_t rootCol);
void performAiMove(int8_t aiP);
void randomizeDemoPlies();
@@ -217,13 +249,6 @@ bool isBoardFull() {
return true;
}
int getDynamicPly() {
if (!progressiveDifficulty && gameState != DEMO) return currentLookAhead;
int count = 0;
for (int c = 0; c < COLS; c++) for (int r = 0; r < ROWS; r++) if (board[c][r] != 0) count++;
return constrain(currentLookAhead + (count / 7), 1, 10);
}
int8_t scanBoard() {
memset(winMask, 0, sizeof(winMask));
auto check = [&](int c, int r, int dc, int dr) {
@@ -241,24 +266,66 @@ int8_t scanBoard() {
return 0;
}
int evaluateBoard(int8_t aiP, int8_t huP) {
int score = 0;
int aiThreats = 0, huThreats = 0;
// Center column bonus
for (int r = 0; r < ROWS; r++) {
if (board[3][r] == aiP) score += 3;
else if (board[3][r] == huP) score -= 3;
}
// Score a window of 4 cells by piece counts
auto scoreWindow = [&](int c, int r, int dc, int dr) -> int {
int ai = 0, hu = 0, emptyC = -1, emptyR = -1;
for (int i = 0; i < 4; i++) {
int cc = c + i * dc;
int rr = r + i * dr;
int8_t v = board[cc][rr];
if (v == aiP) ai++;
else if (v == huP) hu++;
else { emptyC = cc; emptyR = rr; }
}
if (ai > 0 && hu > 0) return 0;
if (ai == 3) {
aiThreats++;
bool playable = emptyR == 0 || board[emptyC][emptyR - 1] != 0;
return playable ? 100 : 40;
}
if (ai == 2) return 5;
if (hu == 3) {
huThreats++;
bool playable = emptyR == 0 || board[emptyC][emptyR - 1] != 0;
return playable ? -100 : -40;
}
if (hu == 2) return -5;
return 0;
};
for (int r = 0; r < 6; r++) for (int c = 0; c < 4; c++) score += scoreWindow(c, r, 1, 0);
for (int r = 0; r < 3; r++) for (int c = 0; c < 7; c++) score += scoreWindow(c, r, 0, 1);
for (int r = 0; r < 3; r++) for (int c = 0; c < 4; c++) score += scoreWindow(c, r, 1, 1);
for (int r = 3; r < 6; r++) for (int c = 0; c < 4; c++) score += scoreWindow(c, r, 1, -1);
// Fork bonus: multiple threats are disproportionately dangerous
if (aiThreats >= 2) score += 200;
if (huThreats >= 2) score -= 200;
return score;
}
bool checkGameEnd() {
winnerPlayer = scanBoard();
if (winnerPlayer != 0) {
if (gameState != DEMO) logGame(winnerPlayer);
gameState = FINISHED_WIN;
bool won = winnerPlayer != 0;
bool draw = !won && isBoardFull();
if (!won && !draw) return false;
if (gameState != DEMO) logGame(won ? winnerPlayer : 0);
gameState = won ? FINISHED_WIN : FINISHED_DRAW;
demoResetTimer = millis();
lastActivityTime = millis();
return true;
}
if (isBoardFull()) {
if (gameState != DEMO) logGame(0);
gameState = FINISHED_DRAW;
demoResetTimer = millis();
lastActivityTime = millis();
return true;
}
return false;
}
// --- Animation ---
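The new `evaluateBoard()` scores every 4-cell window that holds discs of only one player:

* Three discs plus an empty cell is a threat. It scores ±100 if the empty cell can be filled on the next move (bottom row, or a disc directly below it) and ±40 otherwise.
* Two discs score ±5.
* Each centre-column disc scores ±3.
* Two or more threats add a ±200 fork bonus.

The Python transcription below is for experimenting with the weights. It assumes the firmware's `board[col][row]` layout with row 0 at the bottom and 1/2 as player ids; `evaluate` is a hypothetical name. Like the C++ code, it counts threat *windows*, so two windows sharing the same empty cell count as two threats and can trigger the fork bonus for a single winning square.

```python
COLS, ROWS = 7, 6


def evaluate(board, ai, hu):
    """Python transcription of evaluateBoard(); board[c][r], row 0 = bottom, 0 = empty."""
    score = 0
    threats = {ai: 0, hu: 0}

    # Centre column bonus
    for r in range(ROWS):
        if board[3][r] == ai:
            score += 3
        elif board[3][r] == hu:
            score -= 3

    def window(c, r, dc, dr):
        cells = [(c + i * dc, r + i * dr) for i in range(4)]
        values = [board[cc][rr] for cc, rr in cells]
        n_ai, n_hu = values.count(ai), values.count(hu)
        if n_ai and n_hu:
            return 0  # mixed window: nobody can win here
        for player, count, sign in ((ai, n_ai, 1), (hu, n_hu, -1)):
            if count == 3:
                threats[player] += 1
                ec, er = cells[values.index(0)]
                playable = er == 0 or board[ec][er - 1] != 0
                return sign * (100 if playable else 40)
            if count == 2:
                return sign * 5
        return 0

    for r in range(6):
        for c in range(4):
            score += window(c, r, 1, 0)   # horizontal
    for r in range(3):
        for c in range(7):
            score += window(c, r, 0, 1)   # vertical
    for r in range(3):
        for c in range(4):
            score += window(c, r, 1, 1)   # diagonal, up-right
    for r in range(3, 6):
        for c in range(4):
            score += window(c, r, 1, -1)  # diagonal, down-right

    # Fork bonus: two or more threat windows
    if threats[ai] >= 2:
        score += 200
    if threats[hu] >= 2:
        score -= 200
    return score


board = [[0] * ROWS for _ in range(COLS)]  # board[col][row]
for c in (0, 1, 2):
    board[c][0] = 1  # three AI discs on the bottom row, column 3 still open
print(evaluate(board, ai=1, hu=2))
```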
@@ -314,7 +381,7 @@ int minimax(int depth, int alpha, int beta, bool isMax, int8_t aiP, int8_t huP,
int8_t win = scanBoard();
if (win == aiP) return 1000 + depth;
if (win == huP) return -1000 - depth;
if (depth == 0 || isBoardFull()) return 0;
if (depth == 0 || isBoardFull()) return evaluateBoard(aiP, huP);
int best = isMax ? -10000 : 10000;
for (int c : colOrder) {
@@ -337,18 +404,40 @@ void performAiMove(int8_t aiP) {
int huP = (aiP == 1) ? 2 : 1;
int bestScore = -30000; int bestCol = 3;
int originalPly = currentLookAhead;
-currentLookAhead = (gameState == DEMO) ? demoPly[aiP - 1] : getDynamicPly();
if (gameState == DEMO) currentLookAhead = demoPly[aiP - 1];
-for (int c = 0; c < COLS; c++) {
+// Phase 1a: check ALL columns for instant AI win
+bool found = false;
+for (int c = 0; c < COLS && !found; c++) {
int r = getFirstEmptyRow(c);
if (r != -1) {
-board[c][r] = aiP; if (scanBoard() == aiP) { board[c][r]=0; bestCol=c; goto finalizeMove; }
-board[c][r] = huP; if (scanBoard() == huP) { board[c][r]=0; bestCol=c; goto finalizeMove; }
+board[c][r] = aiP;
+if (scanBoard() == aiP) { board[c][r] = 0; bestCol = c; found = true; break; }
board[c][r] = 0;
}
}
+// Phase 1b: check ALL columns for opponent block
+for (int c = 0; c < COLS && !found; c++) {
+int r = getFirstEmptyRow(c);
+if (r != -1) {
+board[c][r] = huP;
+if (scanBoard() == huP) { board[c][r] = 0; bestCol = c; found = true; break; }
+board[c][r] = 0;
+}
+}
+// Phase 2: blunder — pick a random column instead of deep search
+if (!found && blunderEnabled && gameState != DEMO && (random(100) < blunderChance)) {
+int validCols[COLS], count = 0;
+for (int c = 0; c < COLS; c++) if (getFirstEmptyRow(c) != -1) validCols[count++] = c;
+bestCol = validCols[random(count)];
+found = true;
+}
+// Phase 3: deep minimax search
+if (!found) {
for (int c : colOrder) {
-if (abortAi) goto finalizeMove;
+if (abortAi) break;
int r = getFirstEmptyRow(c);
if (r != -1) {
board[c][r] = aiP;
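Splitting Phase 1 fixes an ordering bug. The old loop asked "does the AI win here?" and "does the human win here?" column by column. A human threat in a lower-numbered column was therefore blocked before an AI win further to the right was ever examined. The sketch below reproduces both orderings on a hypothetical abstraction: `aiWins[c]` and `huWins[c]` record whether a drop in column `c` completes four. In the firmware, that answer comes from placing a disc and calling `scanBoard()`.

```cpp
#include <array>

constexpr int COLS = 7;
using Flags = std::array<bool, COLS>;  // per column: does a drop here complete four?

// Old behaviour: one pass, win and block tested per column.
int singlePass(const Flags& aiWins, const Flags& huWins) {
  for (int c = 0; c < COLS; c++) {
    if (aiWins[c]) return c;
    if (huWins[c]) return c;  // can block before a later winning column is seen
  }
  return -1;  // no forced move: fall through to blunder / minimax
}

// New behaviour: Phase 1a scans every column for a win, Phase 1b for a block.
int twoPass(const Flags& aiWins, const Flags& huWins) {
  for (int c = 0; c < COLS; c++) if (aiWins[c]) return c;
  for (int c = 0; c < COLS; c++) if (huWins[c]) return c;
  return -1;
}
```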
@@ -357,7 +446,8 @@ void performAiMove(int8_t aiP) {
if (score > bestScore) { bestScore = score; bestCol = c; }
}
}
-finalizeMove:
+}
currentLookAhead = originalPly;
if (!abortAi) { moveDiscToCol(activeCol, bestCol, aiP, 80); if (!abortAi) { delay(100); animateDrop(bestCol, aiP); } }
}
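Phase 2 only runs when Phase 1 found no immediate win or block, so a blunder never throws away a forced move. When it does run in a non-demo game, it plays a uniformly random legal column with probability `blunderChance`/100, since `random(100)` yields 0 to 99. Below is a desktop sketch that uses `<random>` in place of Arduino's `random()`. `pickBlunderColumn`, the `colFull` array and the -1 "no blunder" return value are assumptions for illustration.

```cpp
#include <array>
#include <random>

constexpr int COLS = 7;

// Returns a random non-full column with probability chancePct/100, else -1.
int pickBlunderColumn(const std::array<bool, COLS>& colFull, int chancePct,
                      std::mt19937& rng) {
  std::uniform_int_distribution<int> pct(0, 99);  // like Arduino random(100)
  if (pct(rng) >= chancePct) return -1;           // no blunder this move
  int validCols[COLS], count = 0;
  for (int c = 0; c < COLS; c++) if (!colFull[c]) validCols[count++] = c;
  if (count == 0) return -1;                      // defensive: board is full
  std::uniform_int_distribution<int> pick(0, count - 1);
  return validCols[pick(rng)];
}
```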
@@ -377,8 +467,8 @@ void handleRoot() {
html += "Base AI Ply:<input type='number' name='ply' value='" + String(currentLookAhead) + "'>";
html += "Brightness:<input type='number' name='br' value='" + String(currentBrightness) + "'>";
html += "Idle Timeout (s):<input type='number' name='idle' value='" + String(currentIdleTimeoutMs / 1000) + "'>";
-html += "Blunders: <input type='checkbox' name='blunder' " + String(blunderEnabled ? "checked" : "") + "><br>";
-html += "Evolution: <input type='checkbox' name='evolve' " + String(progressiveDifficulty ? "checked" : "") + "><br><br>";
+html += "Blunders: <input type='checkbox' name='blunder' " + String(blunderEnabled ? "checked" : "") + ">";
+html += " Chance (%):<input type='number' name='blunderPct' min='1' max='100' value='" + String(blunderChance) + "'><br><br>";
html += "<input type='submit' value='Save Settings' style='background:#28a745;color:white;'>";
html += "</form></div>";
html += "<div class='card' style='margin-top:15px;text-align:left;'><h3 style='text-align:center;'>Game Log</h3>";
@@ -405,7 +495,7 @@ void handleSave() {
if (server.hasArg("br")) { currentBrightness = server.arg("br").toInt(); FastLED.setBrightness(currentBrightness); prefs.putUChar("br", currentBrightness); }
if (server.hasArg("idle")) { currentIdleTimeoutMs = server.arg("idle").toInt() * 1000; prefs.putUInt("idle", currentIdleTimeoutMs / 1000); }
blunderEnabled = server.hasArg("blunder"); prefs.putBool("blunder", blunderEnabled);
-progressiveDifficulty = server.hasArg("evolve"); prefs.putBool("evolve", progressiveDifficulty);
+if (server.hasArg("blunderPct")) { blunderChance = constrain(server.arg("blunderPct").toInt(), 1, 100); prefs.putUChar("blPct", blunderChance); }
server.sendHeader("Location", "/"); server.send(303);
}
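`handleSave` clamps `blunderPct` with `constrain(..., 1, 100)` before calling `prefs.putUChar`. The server-side clamp is what keeps the stored value valid, for three reasons:
- The `min`/`max` attributes on the form input are enforced only by the browser.
- `String::toInt()` yields 0 for non-numeric text.
- A value above 255 would be truncated when stored in a byte.

Below is a portable sketch that assumes `std::atol` as a stand-in for `String::toInt()`. `parseBlunderPct` is a hypothetical name.

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <string>

// Desktop stand-in for: constrain(server.arg("blunderPct").toInt(), 1, 100).
// std::atol is assumed to match String::toInt(): non-numeric input gives 0.
uint8_t parseBlunderPct(const std::string& arg) {
  long v = std::atol(arg.c_str());
  return static_cast<uint8_t>(std::clamp(v, 1L, 100L));  // always fits in a byte
}
```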
@@ -485,7 +575,7 @@ void handleFinished() {
}
FastLED.show();
}
-if (millis() - demoResetTimer > 30000) {
+if (millis() - demoResetTimer > DEMO_RESET_PAUSE) {
resetBoard();
randomizeDemoPlies();
gameState = DEMO;
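The hard-coded 30000 is replaced by the `DEMO_RESET_PAUSE` build flag. Below is a sketch of the `#ifndef` fallback pattern that the refactor commit describes, assuming the default stays at the old 30000 ms. The check itself is safe across the `millis()` rollover (about 49.7 days) because the subtraction is done in unsigned 32-bit arithmetic. `pauseElapsed` is a hypothetical helper, with `millis()` modelled as a `uint32_t`.

```cpp
#include <cstdint>

// Fallback used only when the flag is not passed at build time,
// e.g. build_flags = -D DEMO_RESET_PAUSE=15000 in platformio.ini.
#ifndef DEMO_RESET_PAUSE
#define DEMO_RESET_PAUSE 30000UL  // assumed default: the previous hard-coded value
#endif

// Wrap-safe elapsed-time check: (now - start) is computed modulo 2^32.
bool pauseElapsed(uint32_t now, uint32_t start) {
  return static_cast<uint32_t>(now - start) > DEMO_RESET_PAUSE;
}
```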
@@ -498,16 +588,16 @@ void handleFinished() {
void setup() {
prefs.begin("c4-game", false);
-currentLookAhead = prefs.getUChar("ply", 8);
-currentBrightness = prefs.getUChar("br", 25);
-currentIdleTimeoutMs = prefs.getUInt("idle", 60) * 1000;
+currentLookAhead = prefs.getUChar("ply", DEFAULT_LOOK_AHEAD);
+currentBrightness = prefs.getUChar("br", DEFAULT_BRIGHTNESS);
+currentIdleTimeoutMs = prefs.getUInt("idle", DEFAULT_IDLE_TIMEOUT) * 1000;
blunderEnabled = prefs.getBool("blunder", false);
progressiveDifficulty = prefs.getBool("evolve", false);
blunderChance = prefs.getUChar("blPct", 20);
loadGameLog();
FastLED.addLeds<WS2812B, LED_PIN, GRB>(leds, NUM_LEDS);
FastLED.setBrightness(currentBrightness);
pinMode(ENC_SW, INPUT_PULLUP);
-WiFi.softAP("Connect4-Config", WIFI_PASSWORD);
+WiFi.softAP(WIFI_SSID, WIFI_PASSWORD);
server.on("/", handleRoot);
server.on("/save", HTTP_POST, handleSave);
server.begin();
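`setup()` now takes its fallback values from build flags. The ESP32 `Preferences` getters return their second argument only when the key has never been written. These defaults therefore apply to a fresh device and are overridden by anything saved from the settings page. Below is a sketch of matching `#ifndef` guards, with values assumed from the replaced literals (8, 25, 60 and "Connect4-Config"). The project's actual defaults in `platformio.ini` or `main.cpp` may differ. `FakePrefs` is a hypothetical desktop stand-in for the `Preferences` store.

```cpp
#include <cstdint>
#include <map>
#include <string>

#ifndef DEFAULT_LOOK_AHEAD
#define DEFAULT_LOOK_AHEAD 8      // assumed: old literal in prefs.getUChar("ply", 8)
#endif
#ifndef DEFAULT_BRIGHTNESS
#define DEFAULT_BRIGHTNESS 25     // assumed: old literal for "br"
#endif
#ifndef DEFAULT_IDLE_TIMEOUT
#define DEFAULT_IDLE_TIMEOUT 60   // seconds; setup() multiplies by 1000
#endif
#ifndef WIFI_SSID
#define WIFI_SSID "Connect4-Config"
#endif

// Hypothetical stand-in for Preferences: returns the stored value if the
// key exists, otherwise the supplied default (the getUChar/getUInt contract).
struct FakePrefs {
  std::map<std::string, uint32_t> stored;
  uint32_t get(const std::string& key, uint32_t def) const {
    auto it = stored.find(key);
    return it != stored.end() ? it->second : def;
  }
};
```

A flag can then be overridden per build, for example `-D DEFAULT_BRIGHTNESS=40` in the `build_flags` of `platformio.ini`.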