Agunua is a Python library for the development of Gemini clients, by Stephane Bortzmeyer - stephane+frama@bortzmeyer.org https://framagit.org/bortzmeyer
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

321 lines
12 KiB

#!/usr/bin/env python3
"""A command-line utility to retrieve a complete Gemini
<https://gemini.circumlunar.space/> capsule recursively. It can be
used, for instance, to backup an existing capsule."""
# https://framagit.org/bortzmeyer/agunua
import Agunua
import Agunua.urltinkering
import Agunua.status
import sys
import getopt
import signal
import random
import time
import re
import pathlib
import urllib.parse
import io
import zipfile
import tempfile
import os
# Defaults (but configurable)
# Programmer: if you change the defaults, change also the documentation in geminitrack.md
# Each of these module-level values can be overridden by a command-line
# option handled in the getopt loop further down.
# -v/--verbose: print every URL before retrieving it and report every
# failure (without it, only a failure on the first URL is reported).
verbose = False
# -t/--maximum-time: handed to signal.alarm(); when the alarm fires the
# alarm() handler prints a message and exits with status 1.
maximum_time = 30 # Seconds
# -n/--maximum-files: the crawl loop stops once this many retrieval
# attempts have been made (successful or not).
maximum_files = 20
# -s/--sleep: pause made after each retrieval attempt.
sleep_duration = 1 # Seconds
# -d/--directory: root under which the retrieved files are written.
base_directory = "."
# When True, files go under base_directory/<host>/<path of the start URL>;
# -r/--raw-directory sets it to False so they go directly in base_directory.
prepend_host_and_path = True
# -e/--exclude: compiled with re.compile(); URLs in which the pattern is
# found (search, not full match) are not queued for retrieval.
exclude = None # Regular expression
# Passed as insecure= to Agunua.GeminiUri; --secure sets it to False.
insecure = True
# Passed as accept_expired= to Agunua.GeminiUri
# (--accept-expired-certificate sets it to True).
accept_expired_cert = False
# Passed as tofu= to Agunua.GeminiUri; --no-tofu replaces it with "".
tofu = Agunua.TOFU
# -p/--patch-links: rewrite the link lines of text/gemini files through
# Agunua.urltinkering.pathmerge(). Forced to True by --gempub.
patch_links = False
# -i/--index-file: file name used by sanitize() for URLs that are empty or
# end with "/".
index = "index.gmi"
# -g/--gempub: also pack the retrieved files into a "<name>.gpub" zip archive.
gempub = False
# --license / --author: written to the gempub metadata file; rejected on
# the command line when --gempub is not set.
license = None
author = None
# Cannot be changed
# Name of the metadata file added to the gempub archive.
metadata = "metadata.txt" # Gempub specification
def usage(msg=None):
    """Print the one-line usage summary on standard error.

    When *msg* is not None it is printed on its own line after the
    summary. *msg* may be any object; it is converted with str().
    """
    lines = ["Usage: %s url" % sys.argv[0]]
    if msg is not None:
        lines.append(str(msg))
    for text in lines:
        print(text, file=sys.stderr)
def alarm(*_):
    """Signal handler: report that the time budget is spent and exit.

    All positional arguments are ignored. The message, which includes the
    module-level ``maximum_time``, goes to standard error and the process
    then exits with status 1.
    """
    message = "Maximum time (%i seconds) elapsed, stopping (use --maximum-time to increase it)" % maximum_time
    print(message, file=sys.stderr)
    sys.exit(1)
def remove_leading_slashes(s):
    """Return *s* without any of its leading "/" characters.

    Slashes elsewhere in the string are left untouched; a string made
    only of slashes yields "".

    str.lstrip() does this in one pass. The previous implementation
    recursed once per leading slash (copying the string each time), so a
    URL starting with about a thousand slashes raised RecursionError.
    """
    return s.lstrip("/")
def sanitize(s):
    """Turn a path from the URL into a safe file name (removing dangerous things).

    Steps, in order:

    * if the URL has a query or a fragment and its path ends with ".gmi",
      ".gmi" is appended again so that the resulting file name still ends
      with ".gmi";
    * every leading "/" is removed;
    * a name ending with "/" gets the module-level ``index`` appended, and
      an empty name becomes ``index``;
    * every character other than a word character, "/", "-" or "." is
      replaced by "_";
    * every run of two or more dots is replaced by a single "_", so that
      no ".." path component survives.

    Returns the sanitized, relative file name as a string.
    """
    components = urllib.parse.urlparse(s)
    has_query_or_fragment = components.query != "" or components.fragment != ""
    if has_query_or_fragment and components.path.endswith(".gmi"):
        # Ugly hack but this is to be sure that local Gemini clients get
        # a proper media type (which, in remote access, is given by the
        # media type).
        s += ".gmi"
    # Strip all leading slashes (iteratively, so no recursion limit applies).
    s = s.lstrip("/")
    if s.endswith("/"):
        # Note that it fails if the link to a directory does not end
        # with / and the gemtext author relied on redirections
        # (/foo/bar -> /foo/bar/). No obvious solution besides doing two
        # passes, one to register the redirections and one to patch.
        return sanitize(s + index)
    if s == "":
        return index
    # Raw strings: "\w" and "\." in a plain literal are invalid escape
    # sequences, which recent Python versions warn about.
    cleaned = re.sub(r"[^\w/\-.]", "_", s)
    return re.sub(r"\.\.+", "_", cleaned)
# Parse the command line. Each recognized option overrides one of the
# module-level defaults; what remains in args must be the start URL
# (checked right after this block).
try:
    optlist, args = getopt.getopt(sys.argv[1:], "d:e:ghi:n:prs:t:v",
                                  ["help", "verbose", "directory=",
                                   "exclude=", "gempub",
                                   "maximum-time=", "index-file=",
                                   "secure", "no-tofu",
                                   "accept-expired-certificate",
                                   "maximum-files=", "sleep=",
                                   "raw-directory", "patch-links",
                                   "license=", "author="])
    for option, value in optlist:
        if option in ("--help", "-h"):
            usage()
            sys.exit(0)
        elif option in ("--verbose", "-v"):
            verbose = True
        elif option == "--secure":
            insecure = False
        elif option == "--no-tofu":
            tofu = ""
        elif option == "--accept-expired-certificate":
            accept_expired_cert = True
        elif option in ("--raw-directory", "-r"):
            prepend_host_and_path = False
        elif option in ("--patch-links", "-p"):
            patch_links = True
        elif option in ("--gempub", "-g"):
            gempub = True
        elif option in ("--directory", "-d"):
            base_directory = value
        elif option in ("--index-file", "-i"):
            index = value
        elif option in ("--exclude", "-e"):
            exclude = re.compile(value)
        elif option in ("--maximum-time", "-t"):
            maximum_time = int(value)
        elif option in ("--maximum-files", "-n"):
            maximum_files = int(value)
        elif option in ("--sleep", "-s"):
            # The crawl loop sleeps for sleep_duration; this used to be
            # stored in an unused variable named "sleep", so the option
            # was silently ignored.
            sleep_duration = int(value)
        elif option == "--license":
            license = value
        elif option == "--author":
            author = value
        else:
            # Should never occur, it is trapped by getopt
            usage("Unknown option %s" % option)
except getopt.error as reason:
    usage(reason)
    sys.exit(1)
except (ValueError, re.error) as reason:
    # int() got a non-numeric value, or --exclude is not a valid regular
    # expression: report it as a usage error instead of a traceback.
    usage("Invalid value for option %s: %s" % (option, reason))
    sys.exit(1)
# Exactly one positional argument is expected: the start URL.
if len(args) != 1:
    usage()
    sys.exit(1)
if gempub:
    # Building a gempub always implies patching the links.
    patch_links = True
elif license is not None or author is not None:
    usage("--license and --author makes sense only for Gempubs")
    sys.exit(1)
# Global time budget: alarm() is called (and exits) after maximum_time seconds.
signal.signal(signal.SIGALRM, alarm)
signal.alarm(maximum_time)
url = args[0]
start_url = url
components = urllib.parse.urlparse(start_url)
if components.scheme != "gemini":
    usage("%s is not a Gemini URL" % start_url)
    sys.exit(1)
# Directory that will receive the files: either
# base_directory/<host>/<path of the start URL> or base_directory itself.
if prepend_host_and_path:
    path = pathlib.Path(remove_leading_slashes(components.path))
    directory = pathlib.Path(base_directory).joinpath(components.netloc, path)
else:
    directory = pathlib.Path(base_directory)
if gempub:
    # The archive is named after the host and the path of the start URL
    # (without a trailing slash), every non-word character being
    # replaced by "-".
    pubname = components.netloc
    if components.path != "/":
        pubname += components.path
    if pubname.endswith("/"):
        pubname = pubname[:-1]
    # Raw string: "\W" in a plain literal is an invalid escape sequence.
    pubname = re.sub(r"\W", "-", pubname)
    # The archive is opened before the chdir below, so a relative name is
    # resolved against the directory the program was started from.
    pubfile = zipfile.ZipFile("%s.gpub" % pubname, 'w')
    # From now on the current directory is a temporary one, so relative
    # output paths are created inside it.
    tmpdir = tempfile.TemporaryDirectory()
    os.chdir(tmpdir.name)
# Crawl state. URLs and file names are kept as dictionary keys; the
# values (True) are not used.
to_retrieve = {url: True}  # URLs still to fetch
retrieved = {}             # URLs already attempted
filenames = {}             # Local files already written
total_attempts = 0
total_retrieved = 0
generator = random.Random()
first = True               # True only during the first iteration
langtags = {}              # Language tag -> number of responses announcing it
charsets = {}              # Charset -> number of responses announcing it
# Maybe we should canonicalize URLs using the canonicalize() routine
# in lupa.utils?
while total_attempts < maximum_files and len(to_retrieve) > 0:
    # The next URL is picked at random among the pending ones.
    url = generator.choice(list(to_retrieve))
    if verbose:
        print("Retrieving %s" % url)
    # "Insecure" by default. See ticket #36.
    # We cannot use follow_redirect because it may go to other capsules.
    g = Agunua.GeminiUri(url, insecure=insecure,
                         accept_expired=accept_expired_cert, tofu=tofu,
                         get_content=True, parse_content=True,
                         follow_redirect=False, maxlines=None,
                         maxsize=None)
    retrieved[url] = True
    del to_retrieve[url]
    total_attempts += 1
    if g.network_success:
        if g.status_code == "20":
            total_retrieved += 1
            if g.links is not None:  # It is None, for instance in non-gemtext files
                # Queue the links that stay under the start URL, are not
                # excluded and have not been seen yet.
                for linked_url in g.links:
                    if linked_url.startswith(start_url) and \
                       (exclude is None or not exclude.search(linked_url)):
                        if linked_url not in retrieved and linked_url not in to_retrieve:
                            to_retrieve[linked_url] = True
            # Every queued URL starts with start_url, so the local name is
            # derived from what follows that prefix. (Slicing, unlike
            # url.split(start_url), cannot fail when the start URL appears
            # a second time inside the URL.)
            oldsuffix = url[len(start_url):]
            suffix = sanitize(oldsuffix)
            filename = directory.joinpath(suffix)
            pathlib.Path.mkdir(filename.parent, parents=True, exist_ok=True)
            if g.binary or not g.mediatype.startswith("text/"):
                mode = "wb"
            else:
                mode = "w"
            # Statistics used later for the gempub metadata.
            if g.lang is not None and g.lang != "":
                langtags[g.lang] = langtags.get(g.lang, 0) + 1
            if g.charset is not None and g.charset != "":
                charsets[g.charset] = charsets.get(g.charset, 0) + 1
            if filename in filenames:
                # File writing may be done twice if there is the directory
                # and index (by default index.gmi). Harmless for ordinary
                # retrieval but triggers a warning for gempub, hence this
                # test.
                # NOTE(review): this also skips the sleep at the end of
                # the loop body.
                continue
            with open(filename, mode) as f:
                filenames[filename] = True
                if not patch_links or g.mediatype != "text/gemini":
                    f.write(g.payload)
                else:
                    # Rewrite the link lines, leaving alone everything
                    # that is between two ``` toggle lines.
                    content = []
                    in_prefor = False
                    for line in io.StringIO(g.payload):
                        if line[0:2] == "=>" and not in_prefor:
                            sline = re.sub(r"^\s*", "", line[2:])  # Strip leading spaces.
                            s = re.split("[ \t]+", sline, maxsplit=1)
                            if len(s) == 2:
                                (link, text) = s
                            else:
                                link = s[0]  # Link without a text
                                text = ""
                            old_link = link
                            link = Agunua.urltinkering.pathmerge(str(directory), str(filename), old_link)
                            if link != old_link:
                                line = "=> %s %s" % (link, text)
                        elif line[0:3] == "```":
                            # This test must look at the current line; it
                            # used to look at "l", the leftover variable
                            # of the links loop, so toggles were missed.
                            in_prefor = not in_prefor
                        content.append(line)
                    f.write("".join(content))
            if gempub:
                pubfile.write(filename)
        elif g.status_code in ("30", "31"):
            # Redirection: followed by hand, and only inside the capsule.
            target = Agunua.uri_to_iri(Agunua.urltinkering.urlmerge(url, g.meta))
            if target.startswith(start_url) and (exclude is None or not exclude.search(target)):
                if target not in retrieved and target not in to_retrieve:
                    to_retrieve[target] = True
        else:
            if verbose or first:
                if g.status_code in Agunua.status.codes:
                    status = Agunua.status.codes[g.status_code]
                elif g.status_code[0] in Agunua.status.categories:
                    status = "illegal status code, category \"%s\"" % \
                             Agunua.status.categories[g.status_code[0]]
                else:
                    status = "completely illegal status code \"%s\"" % g.status_code
                print("Wrong status code for %s: %s" % (url, status), file=sys.stderr)
    else:
        if verbose or first:
            print("Network error retrieving %s: %s" % (url, g.error), file=sys.stderr)
    time.sleep(sleep_duration)
    first = False
if len(to_retrieve) > 0 and total_attempts >= maximum_files:
    print("Warning, maximum number of %i files reached (use --maximum-files to increase it)" % maximum_files,
          file=sys.stderr)
if gempub:
    # Write the metadata file, add it to the archive and close the
    # archive. The "with" block closes the file even if a write fails.
    with open(metadata, "w") as m:
        m.write("title: %s\n" % ("Download of %s" % start_url))
        m.write("gpubVersion: 1.0.0\n")
        m.write("index: %s/%s\n" % (directory, index))
        # A language or a charset is recorded only when every response
        # that announced one announced the same one.
        if len(langtags) == 1:
            m.write("language: %s\n" % next(iter(langtags)))
        elif len(langtags) > 1:
            print("Warning: several language tags in the capsule %s, not indicating a langtag in the gempub file" % start_url,
                  file=sys.stderr)
        if len(charsets) == 1:
            m.write("charset: %s\n" % next(iter(charsets)))
        elif len(charsets) > 1:
            print("Warning: several charsets in the capsule %s, not indicating a charset in the gempub file" % start_url,
                  file=sys.stderr)
        # When no charset was seen, nothing is written. (Or write UTF-8,
        # since it is the default charset?)
        m.write("version: geminitrack %s\n" % Agunua.VERSION)
        m.write("revisionDate: %s\n" % time.strftime("%Y-%m-%d", time.gmtime(time.time())))
        if license is not None:
            m.write("license: %s\n" % license)
        if author is not None:
            m.write("author: %s\n" % author)
    pubfile.write(metadata)
    pubfile.close()
    # Dropping this reference lets the TemporaryDirectory object clean up
    # the temporary directory.
    del tmpdir
# Exit status: 1 when no file at all was retrieved, 0 otherwise.
sys.exit(1 if total_retrieved == 0 else 0)