What I am essentially doing is writing a snake game that everybody could play on the internet, broadcast their game if they like and so on. Now it looks like this, just websocket-based snake, no user authentication or everything. Pure and simple as possible.
Well, does my code follows best practices of coding both on server and client side? I am not sure adding extra fields to the app
object is a good idea.
# Server.py
import time
from collections import deque
from flask import Flask, render_template, session, request
from flask.ext.socketio import SocketIO, emit, join_room
from flask.ext.socketio import leave_room, close_room, rooms
from flask.ext.socketio import disconnect
async_mode = None
app = Flask(__name__)
app.config["SECRET_KEY"] = "secret"
socketio = SocketIO(app, async_mode=async_mode)
thread = None
from snake import GSnake
from snake import DIRECTIONS
from point import GPoint
b = deque()
b.appendleft(GPoint(x=0, y=0))
b.appendleft(GPoint(y=1))
b.appendleft(GPoint(y=2))
snake = GSnake(bodey=b)
tick = 1
app.paused = False
app.game_over = False
def restart_game():
b = deque()
b.appendleft(GPoint(x=0, y=0))
b.appendleft(GPoint(y=1))
b.appendleft(GPoint(y=2))
app.snake = GSnake(bodey=b)
app.paused = False
app.game_over = False
def background_thread():
count = 0
cols, rows = 8, 6
x, y = 0,0
restart_game()
while True:
socketio.sleep(tick)
if not app.paused:
# print("f")
app.snake.MakeStep()
response = {}
response["snake"] = app.snake.GetAsDict()
response["game_over"] = not app.snake.is_alive
response["apple"] = {"x":app.snake.apple.x, "y":app.snake.apple.y}
socketio.emit(
"field_change",
response,
namespace="/test"
)
if not app.snake.is_alive:
restart_game()
socketio.sleep(2)
@app.route("/")
def index():
return render_template("index.html")
@socketio.on('my event', namespace='/test')
def test_message(message):
session['receive_count'] = session.get('receive_count', 0) + 1
emit('my response',
{'data': message['data'], 'count': session['receive_count']})
@socketio.on("turn up", namespace="/test")
def turn_up(message):
app.snake.ChangeDirection(DIRECTIONS.UP)
@socketio.on("turn right", namespace="/test")
def turn_right(message):
app.snake.ChangeDirection(DIRECTIONS.RIGHT)
@socketio.on("turn left", namespace="/test")
def turn_left(message):
app.snake.ChangeDirection(DIRECTIONS.LEFT)
@socketio.on("turn down", namespace="/test")
def turn_down(message):
app.snake.ChangeDirection(DIRECTIONS.DOWN)
@socketio.on("toggle pause", namespace="/test")
def pause(message):
print("poop")
app.paused = not app.paused
@socketio.on("restart_game", namespace="/test")
def restart(message):
restart_game()
@socketio.on('my ping', namespace='/test')
def ping_pong():
emit('my pong')
@socketio.on('connect', namespace='/test')
def test_connect():
global thread
if thread is None:
thread = socketio.start_background_task(target=background_thread)
emit('my response', {'data': 'Connected', 'count': 0})
@socketio.on('disconnect', namespace='/test')
def test_disconnect():
print('Client disconnected', request.sid)
if __name__ == "__main__":
socketio.run(app, debug=True)
Client-side script. HTML contains just canvas and divs to render ping.
function DrawSnakeBody(canvas, aray) {
for (var i = 0; i < aray.length; i++) {
DrawSquare(canvas, aray[i], "rgb(50, 200, 50)");
}
}
function DrawSquare(canvas, point, colour) {
var context = canvas.getContext("2d");
context.fillStyle = colour;
context.fillRect(point.x * 50, point.y * 50, 50, 50);
}
function DrawApple(canvas, apple) {
DrawSquare(canvas, apple, "rgb(50, 50, 200)")
}
function DrawGameOver(canvas) {
var context = canvas.getContext("2d");
context.clearRect(0, 0, canvas.width, canvas.height);
context.font = "20px Arial";
context.fillText("Game Over", canvas.width / 2, canvas.height / 2);
}
function RenderEverything(snake, apple, game_over) {
var canvas = document.getElementById("snake_field");
var ccontext = canvas.getContext("2d");
ccontext.clearRect(0, 0, canvas.width, canvas.height);
DrawSnakeBody(canvas, snake);
DrawApple(canvas, apple);
if (game_over) {
DrawGameOver(canvas);
}
}
namespace = '/test';
var socket = io.connect('http://' + document.domain + ':' + location.port + namespace);
document.addEventListener("keydown", (event) => {
if (event.key === "ArrowRight") {
socket.emit("turn right", {key: event.key});
}
if (event.key === "ArrowLeft") {
socket.emit("turn left", {key: event.key});
}
if (event.key === "ArrowUp") {
socket.emit("turn up", {key: event.key});
}
if (event.key === "ArrowDown") {
socket.emit("turn down", {key: event.key});
}
if (event.key === " ") {
socket.emit("toggle pause", {key:event.key});
}
if (event.key === "R") {
console.log("Rfff");
socket.emit("restart_game", {key:event.key});
}
}, false);
document.body.onload = Main;
function Main() {
socket.on("field_change", function(msg) {
console.log(msg.game_over);
RenderEverything(msg.snake.bodey, msg.apple, msg.game_over);
});
socket.on('connect', function() {
socket.emit('my event', {data: 'I\'m connected!'});
});
socket.on('my response', function(msg) {
$('#log').append('<br>' + $('<div/>').text('Received #' + msg.count + ': ' + msg.data).html());
});
// Interval function that tests message latency by sending a "ping"
// message. The server then responds with a "pong" message and the
// round trip time is measured.
var ping_pong_times = [];
var start_time;
window.setInterval(function() {
start_time = (new Date).getTime();
socket.emit('my ping');
}, 1000);
// Handler for the "pong" message. When the pong is received, the
// time from the ping is stored, and the average of the last 30
// samples is average and displayed.
socket.on('my pong', function() {
var latency = (new Date).getTime() - start_time;
ping_pong_times.push(latency);
ping_pong_times = ping_pong_times.slice(-30); // keep last 30 samples
var sum = 0;
for (var i = 0; i < ping_pong_times.length; i++)
sum += ping_pong_times[i];
$('#ping-pong').text(Math.round(10 * sum / ping_pong_times.length) / 10);
});
}
Though snake itself is simple and not essential, I prefer to put it here.
# snake.py
from collections import deque
from random import randint
from point import GPoint
class GStruct:
pass
DIRECTIONS = GStruct()
DIRECTIONS.DOWN = GPoint(x=0, y=1)
DIRECTIONS.UP = GPoint(x=0, y=-1)
DIRECTIONS.LEFT = GPoint(x=-1, y=0)
DIRECTIONS.RIGHT = GPoint(x=1, y=0)
class GSnake(object):
"""docstring for GSnake"""
def __init__(self, bodey=deque(), max_x=8, max_y=6):
super(GSnake, self).__init__()
self._current_direction = DIRECTIONS.DOWN
self._previous_direction = DIRECTIONS.DOWN
self._bodey = bodey
self._apple = GPoint(x=randint(0, max_x), y=randint(0, max_y))
self._max_x = max_x
self._max_y = max_y
self._bite_self = False
self._GenerateApple()
@property
def head(self):
return self._bodey[0]
@property
def apple(self):
return self._apple
@property
def is_alive(self):
a = 0 <= self.head.x < self._max_x
b = 0 <= self.head.y < self._max_y
c = not self._bite_self
print(a, b, c)
return a and b and c
def MakeStep(self):
head = self._bodey[0]
new_head = head + self._current_direction
if new_head in self._bodey:
self._bite_self = True
self._bodey.appendleft(new_head)
if new_head == self.apple:
self._GenerateApple()
else:
self._bodey.pop()
self._previous_direction = self._current_direction
def ChangeDirection(self, new_direction=DIRECTIONS.UP):
dd = self._previous_direction + new_direction
if dd.x == 0 and dd.y == 0:
pass
else:
self._current_direction = new_direction
def GetAsDict(self):
res = {"bodey": []}
for point in self._bodey:
res["bodey"].append({"x": point.x, "y": point.y})
return res
def _GenerateApple(self):
apple = GPoint(x=randint(0, self._max_x - 1), y=randint(0, self._max_y - 1))
while apple in self._bodey:
apple = GPoint(x=randint(0, self._max_x - 1), y=randint(0, self._max_y - 1))
self._apple = apple