This is the first Python script I've written. It's used to create a local backup of Git repos from BitBucket. It works, but I'd like to make it more Pythonic if possible.
"""
Script to check BitBucket for all repos accessible by the supplied user and clone,
tar and gzip each one (skipping unmodified and those explicitly told to).
"""
import sys
import argparse
import getpass
import glob
import os
import requests
import tarfile
from datetime import datetime
from os import path
from pprint import pprint
from shutil import rmtree
from subprocess import call
from time import sleep
from urllib import quote
# Directory that receives every archive; the temporary clone of each repo
# is also created underneath it:
backup_dir = "/back/git"
# Upper bound on how many archived versions of a repo are retained:
version_count_limit = 5
# Get the options we were/should have been called with:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("user", help="your bitbucket username")
parser.add_argument(
    "-p", "--password",
    help="your bitbucket password; you will be prompted if this is not provided")
parser.add_argument(
    "-s", "--skipfile", type=argparse.FileType('r'),
    help="the location of a file containing a list of repo names to skip")
parser.add_argument(
    "-v", "--verbose", action='store_true',
    help="increase output verbosity")
args = parser.parse_args()

# Required:
user = args.user

# Find out how much to display.  A store_true option is always a bool
# (False when the flag is absent), so it can be used directly.
verbose = args.verbose

# Get the names of the repos not to back up, if any.  Surrounding whitespace
# and blank lines are dropped so a stray space or trailing newline in the file
# cannot stop a name from matching.  A set is used because the names are only
# ever tested for membership.
skip_repos = set()
if args.skipfile is not None:
    # The with-statement guarantees the file is closed once it has been read.
    with args.skipfile as skipfile:
        skip_repos = {line.strip() for line in skipfile if line.strip()}

# If we haven't been given a (non-empty) password, keep asking until we get one:
password = args.password
while not password:
    password = getpass.getpass()
# Try to get a list of repos from bitbucket.  The credentials are passed via
# `auth` (HTTP basic auth) instead of being pasted into the URL, so a user
# name or password containing characters such as '@', '/' or ':' cannot
# corrupt the request URL.  The timeout stops the script hanging forever on
# an unresponsive server.
try:
    r = requests.get(
        'https://api.bitbucket.org/1.0/user/repositories',
        auth=(user, password),
        timeout=60)
except requests.RequestException as e:
    print("Failed trying to fetch repos list: {error}".format(error=e))
    sys.exit(-1)
if r.status_code != 200:
    print("Failed trying to fetch repos list: {code} - {error}".format(code=r.status_code, error=r.reason))
    sys.exit(-1)

# Parse the response.  The result is bound to the name `json` because the
# backup loop further down iterates over that name.
try:
    json = r.json()
except ValueError as e:
    # ValueError carries no errno/strerror, so report the exception itself.
    print("Failed to decode JSON: {error}".format(error=e))
    pprint(r.text)
    sys.exit(-1)
# Backup each repo:
first = True
for repo in json:
    # Archives and the working directory are named "<owner>_<name>", with
    # spaces in the repo name replaced by underscores.
    name = "{owner}_{name}".format(owner=repo['owner'], name=repo['name'].replace(' ', '_'))

    # Don't back up things we've been told to skip:
    if name in skip_repos:
        if verbose:
            print("{name} in skip list; skipping".format(name=name))
        continue

    # Don't backup if the backup already exists.  The archive name embeds the
    # repo's last-updated time, so an unchanged repo maps to an existing file.
    # NOTE(review): "%s" is not one of Python's documented strftime directives
    # and relies on the platform C library -- confirm on the target system.
    # NOTE(review): assumes 'last_updated' always includes fractional seconds;
    # strptime raises ValueError otherwise.
    modified = datetime.strptime(repo['last_updated'], "%Y-%m-%dT%H:%M:%S.%f").strftime("%s")
    archive = "{dir}/{name}_{modified}.tar.gz".format(dir=backup_dir, name=name, modified=modified)
    if path.isfile(archive):
        if verbose:
            print("{archive} already exists; skipping".format(archive=archive))
        continue

    # We don't want to hammer bitbucket so, on every loop except the first,
    # wait thirty seconds before starting
    if first:
        first = False
    else:
        if verbose:
            print("...sleeping...")
        sleep(30)

    if verbose:
        print("Pulling {name} and archiving to {archive}".format(name=name, archive=archive))

    # Create a working directory.  An already existing *empty* directory is
    # accepted; anything else is treated as an error and left untouched.
    tmp_dir = "{dir}/{newdir}".format(dir=backup_dir, newdir=name)
    try:
        os.makedirs(tmp_dir)
    except OSError as e:
        if not path.isdir(tmp_dir):
            print("Failed to create working directory ({code}): {error}".format(code=e.errno, error=e.strerror))
            sys.exit(-1)
    if os.listdir(tmp_dir):
        print("Working directory {dir} exists but is not empty???".format(dir=tmp_dir))
        sys.exit(-1)

    # From here on the working directory is ours, so it is removed again no
    # matter how this iteration ends; a leftover directory would otherwise make
    # every later run abort with "exists but is not empty".
    try:
        # Clone the repo into the working directory.  safe='' makes quote()
        # escape '/' as well, which would otherwise break the user:password
        # part of the URL.
        owner = quote(repo['owner'])
        slug = quote(repo['slug'])
        url = "https://{user}:{password}@bitbucket.org/{owner}/{slug}".format(
            user=quote(user, safe=''), password=quote(password, safe=''),
            owner=owner, slug=slug)
        quiet = "--verbose" if verbose else "--quiet"
        if call(['git', 'clone', quiet, url, tmp_dir]) != 0:
            print("Clone failed; aborting")
            sys.exit(-1)

        # git records the clone URL -- including the password -- as the
        # remote's URL in the clone's configuration.  Point the remote at the
        # credential-free URL so the password does not end up in the archive.
        clean_url = "https://bitbucket.org/{owner}/{slug}".format(owner=owner, slug=slug)
        if call(['git', 'remote', 'set-url', 'origin', clean_url], cwd=tmp_dir) != 0:
            print("Failed to remove credentials from the clone; aborting")
            sys.exit(-1)

        # Tar and zip the working directory.  The archive is written under a
        # temporary name and renamed only once complete, so an interrupted or
        # failed run can never leave a truncated file that a later run would
        # mistake for a finished backup.
        partial = archive + ".partial"
        try:
            with tarfile.open(partial, "w:gz") as tar:
                tar.add(tmp_dir, arcname=name)
            os.rename(partial, archive)
        except (OSError, IOError, tarfile.TarError) as e:
            # Best effort only: the partial file may not exist at all.
            try:
                os.remove(partial)
            except OSError:
                pass
            print("Tar {dir} failed: {error}".format(dir=tmp_dir, error=e))
            sys.exit(-1)
    finally:
        # Clean up:
        try:
            rmtree(tmp_dir)
        except OSError as e:
            print("Failed to remove working directory ({code}): {error})".format(code=e.errno, error=e.strerror))
            sys.exit(-1)

    # Keep no more than version_count_limit backups including the one we just
    # made, removing oldest first.  Only files named exactly
    # "<name>_<digits>.tar.gz" are considered, so the backups of another repo
    # whose name merely starts with this one are never touched.
    prefix = name + "_"
    suffix = ".tar.gz"
    backups = []
    for entry in os.listdir(backup_dir):
        if not (entry.startswith(prefix) and entry.endswith(suffix)):
            continue
        if entry[len(prefix):-len(suffix)].isdigit():
            backups.append(path.join(backup_dir, entry))
    backups.sort(key=path.getmtime)
    # Computed explicitly rather than sliced with a negative index so that a
    # limit of 0 removes everything instead of nothing.
    excess = max(len(backups) - version_count_limit, 0)
    for stale in backups[:excess]:
        try:
            os.remove(stale)
        except OSError as e:
            print("Failed to remove {file} ({code}): {error}".format(file=stale, code=e.errno, error=e.strerror))
            sys.exit(-1)

    if verbose:
        print("Cloned, archived, and cleaned!\n")