mirror of https://framagit.org/bortzmeyer/agunua/
Agunua is a Python library for the development of Gemini clients, by Stephane Bortzmeyer - stephane+frama@bortzmeyer.org
https://framagit.org/bortzmeyer
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
321 lines
12 KiB
321 lines
12 KiB
#!/usr/bin/env python3
|
|
|
|
"""A command-line utility to retrieve a complete Gemini
|
|
<https://gemini.circumlunar.space/> capsule recursively. It can be
|
|
used, for instance, to backup an existing capsule."""
|
|
|
|
# https://framagit.org/bortzmeyer/agunua
|
|
import Agunua
|
|
import Agunua.urltinkering
|
|
import Agunua.status
|
|
|
|
import sys
|
|
import getopt
|
|
import signal
|
|
import random
|
|
import time
|
|
import re
|
|
import pathlib
|
|
import urllib.parse
|
|
import io
|
|
import zipfile
|
|
import tempfile
|
|
import os
|
|
|
|
# Defaults (but configurable)
# Programmer: if you change the defaults, change also the documentation in geminitrack.md

# When true (--verbose / -v), each retrieved URL and every error is reported.
verbose = False
# Overall time budget: passed to signal.alarm(), whose handler stops the program.
maximum_time = 30 # Seconds
# Upper bound on the number of retrieval attempts made by the main loop.
maximum_files = 20
# Pause made after each retrieval attempt.
sleep_duration = 1 # Seconds
# Root of the local tree where retrieved files are written (--directory / -d).
base_directory = "."
# When true, files go under <base_directory>/<host>/<path of the start URL>;
# --raw-directory / -r writes directly under base_directory instead.
prepend_host_and_path = True
# Links matching this pattern are not queued for retrieval (--exclude / -e).
exclude = None # Regular expression
# Passed as insecure= to Agunua.GeminiUri; --secure sets it to False.
insecure = True
# Passed as accept_expired= to Agunua.GeminiUri.
accept_expired_cert = False
# Passed as tofu= to Agunua.GeminiUri; --no-tofu replaces it with "".
tofu = Agunua.TOFU
# When true (--patch-links / -p, or implied by --gempub), link lines of
# text/gemini files are rewritten with Agunua.urltinkering.pathmerge().
patch_links = False
# File name appended to paths ending with "/" (--index-file / -i).
index = "index.gmi"
# When true (--gempub / -g), the retrieved files are also stored in a .gpub zip archive.
gempub = False
# Optional values written to the Gempub metadata file; rejected without --gempub.
license = None
author = None

# Cannot be changed
# Name of the metadata file written into the Gempub archive.
metadata = "metadata.txt" # Gempub specification
|
|
|
|
def usage(msg=None):
    """Print the usage summary on standard error, followed by msg when one is given."""
    lines = ["Usage: %s url" % sys.argv[0]]
    if msg is not None:
        # msg may be a string or an exception object; print() stringifies it.
        lines.append(msg)
    for line in lines:
        print(line, file=sys.stderr)
|
|
|
|
def alarm(*_):
    """Signal handler: report that the time budget is exhausted and exit with status 1.

    The positional arguments supplied by the signal machinery are ignored.
    """
    message = "Maximum time (%i seconds) elapsed, stopping (use --maximum-time to increase it)" % maximum_time
    print(message, file=sys.stderr)
    raise SystemExit(1)
|
|
|
|
def remove_leading_slashes(s):
    """Return s without any of its leading "/" characters.

    Slashes elsewhere in the string are kept. Implemented with str.lstrip()
    rather than one recursive call per slash, so a long run of slashes
    cannot exhaust the recursion limit.
    """
    return s.lstrip("/")
|
|
|
|
def sanitize(s):
    """Turn a path from the URL into a safe file name (removing dangerous things).

    - None or an empty path yields the index file name.
    - A path ending with "/" gets the index file name appended.
    - Leading slashes are removed, so the result is always relative.
    - Every character that is not a word character, "/", "-" or "." becomes "_".
    - Any run of two or more dots becomes a single "_" (no ".." components).
    """
    if s is None:
        return index
    components = urllib.parse.urlparse(s)
    if (components.query != "" or components.fragment != "") \
       and components.path.endswith(".gmi"):
        # Ugly hack but this is to be sure that local Gemini clients
        # get a proper media type (which, in remote access, is given
        # by the server and not by the file name).
        s += ".gmi"
    s = remove_leading_slashes(s)
    if s.endswith("/"):
        # Note that it fails if the link to a directory does not end
        # with / and the gemtext author relied on redirections
        # (/foo/bar -> /foo/bar/). No obvious solution besides doing
        # two passes, one to register the redirections and one to
        # patch.
        return sanitize(s + index)
    if s == "":
        return index
    # Raw strings: "\w" and "\." in ordinary literals are invalid escape
    # sequences for the Python compiler.
    cleaned = re.sub(r"[^\w/\-.]", "_", s)
    return re.sub(r"\.\.+", "_", cleaned)
|
|
|
|
# Command-line parsing. Each recognized option overrides one of the
# module-level defaults; the remaining arguments end up in args.
try:
    optlist, args = getopt.getopt(sys.argv[1:], "d:e:ghi:n:prs:t:v",
                                  ["help", "verbose", "directory=",
                                   "exclude=", "gempub",
                                   "maximum-time=", "index-file=",
                                   "secure", "no-tofu",
                                   "accept-expired-certificate",
                                   "maximum-files=", "sleep=",
                                   "raw-directory", "patch-links",
                                   "license=", "author="])
    for option, value in optlist:
        if option == "--help" or option == "-h":
            usage()
            sys.exit(0)
        elif option == "--verbose" or option == "-v":
            verbose = True
        elif option == "--secure":
            insecure = False
        elif option == "--no-tofu":
            tofu = ""
        elif option == "--accept-expired-certificate":
            accept_expired_cert = True
        elif option == "--raw-directory" or option == "-r":
            prepend_host_and_path = False
        elif option == "--patch-links" or option == "-p":
            patch_links = True
        elif option == "--gempub" or option == "-g":
            gempub = True
        elif option == "--directory" or option == "-d":
            base_directory = value
        elif option == "--index-file" or option == "-i":
            index = value
        elif option == "--exclude" or option == "-e":
            exclude = re.compile(value)
        elif option == "--maximum-time" or option == "-t":
            maximum_time = int(value)
        elif option == "--maximum-files" or option == "-n":
            maximum_files = int(value)
        elif option == "--sleep" or option == "-s":
            # Must set sleep_duration, the variable given to time.sleep()
            # by the retrieval loop; a differently named variable would
            # leave the option without any effect.
            sleep_duration = int(value)
        elif option == "--license":
            license = value
        elif option == "--author":
            author = value
        else:
            # Should never occur, it is trapped by getopt
            usage("Unknown option %s" % option)
except getopt.error as reason:
    usage(reason)
    sys.exit(1)
except (ValueError, re.error) as reason:
    # Non-numeric value for a numeric option, or invalid regular
    # expression for --exclude: report it instead of dumping a traceback.
    usage("Invalid value for option %s: %s" % (option, reason))
    sys.exit(1)
|
|
# Exactly one positional argument (the start URL) is expected.
if len(args) != 1:
    usage()
    sys.exit(1)

if gempub:
    # Producing a Gempub always turns link patching on.
    patch_links = True
elif not (license is None and author is None):
    usage("--license and --author makes sense only for Gempubs")
    sys.exit(1)

# Arm the watchdog: alarm() runs when SIGALRM arrives after maximum_time seconds.
signal.signal(signal.SIGALRM, alarm)
signal.alarm(maximum_time)
|
|
|
|
# The start URL delimits the capsule: only URLs beginning with it are
# retrieved by the main loop.
url = args[0]
start_url = url
components = urllib.parse.urlparse(start_url)
if components.scheme != "gemini":
    usage("%s is not a Gemini URL" % start_url)
    sys.exit(1)

# Local directory receiving the files, optionally prefixed with the
# host name and the path of the start URL.
if prepend_host_and_path:
    path = pathlib.Path(remove_leading_slashes(components.path))
    directory = pathlib.Path(base_directory).joinpath(components.netloc, path)
else:
    directory = pathlib.Path(base_directory)

if gempub:
    # Archive name derived from host and path, every non-word
    # character being replaced by a dash.
    pubname = components.netloc
    if components.path != "/":
        pubname += components.path
    if pubname.endswith("/"):
        pubname = pubname[:-1]
    # Raw string: "\W" in an ordinary literal is an invalid escape sequence.
    pubname = re.sub(r"\W", "-", pubname)
    # The archive is opened before the chdir() below, so it is created
    # in the directory the program was started from, while the files
    # themselves are written under the temporary directory.
    pubfile = zipfile.ZipFile("%s.gpub" % pubname, 'w')
    tmpdir = tempfile.TemporaryDirectory()
    os.chdir(tmpdir.name)

# Crawl state
to_retrieve = {url: True}   # URLs still to be fetched
retrieved = {}              # URLs already attempted
filenames = {}              # Local files already written
total_attempts = 0
total_retrieved = 0
generator = random.Random()
first = True                # True only during the first retrieval
langtags = {}               # Language tag -> number of files using it
charsets = {}               # Charset -> number of files using it
|
|
|
|
# Maybe we should canonicalize URLs using the canonicalize() routine
# in lupa.utils?
|
|
# Main loop: pick a pending URL at random, fetch it, store the content
# locally and queue the links (or the redirection target) that stay
# under start_url. Stops when nothing is pending or when maximum_files
# attempts have been made.
while total_attempts < maximum_files and len(to_retrieve) > 0:
    url = generator.choice(list(to_retrieve))
    if verbose:
        print("Retrieving %s…" % url)
    # "Insecure" by default. See ticket #36.
    # We cannot use follow_redirect because it may go to other capsules.
    g = Agunua.GeminiUri(url, insecure=insecure,
                         accept_expired=accept_expired_cert, tofu=tofu,
                         get_content=True, parse_content=True,
                         follow_redirect=False, maxlines=None,
                         maxsize=None)
    retrieved[url] = True
    del to_retrieve[url]
    total_attempts += 1
    if g.network_success:
        if g.status_code == "20":
            total_retrieved += 1
            if g.links is not None: # It is None, for instance in non-gemtext files
                for found in g.links:
                    if found.startswith(start_url) and (exclude is None or not exclude.search(found)):
                        if found not in retrieved and found not in to_retrieve:
                            to_retrieve[found] = True
            # Every queued URL starts with start_url, so the local path
            # is simply what follows that prefix. Slicing (unlike
            # splitting on start_url) also works when start_url occurs
            # a second time inside the URL, e.g. in a query.
            oldsuffix = url[len(start_url):]
            suffix = sanitize(oldsuffix)
            filename = directory.joinpath(suffix)
            pathlib.Path.mkdir(filename.parent, parents=True, exist_ok=True)
            if g.binary or not g.mediatype.startswith("text/"):
                mode = "wb"
            else:
                mode = "w"
            # Statistics used later for the Gempub metadata.
            if g.lang is not None and g.lang != "":
                langtags[g.lang] = langtags.get(g.lang, 0) + 1
            if g.charset is not None and g.charset != "":
                charsets[g.charset] = charsets.get(g.charset, 0) + 1
            if filename in filenames:
                continue # File writing may be done twice if there is
                         # the directory and index (by default
                         # index.gmi). Harmless for ordinary retrieval
                         # but triggers a warning for gempub, hence
                         # this test. Note that it also skips the
                         # sleep at the end of the loop body.
            filenames[filename] = True
            # The context manager closes the file even if a write fails.
            with open(filename, mode) as f:
                if not patch_links or g.mediatype != "text/gemini":
                    f.write(g.payload)
                else:
                    # Rewrite the link lines, leaving alone everything
                    # found inside preformatted blocks.
                    payload = io.StringIO(g.payload)
                    content = []
                    in_prefor = False
                    for line in payload.readlines():
                        if line[0:2] == "=>" and not in_prefor:
                            sline = re.sub(r"^\s*", "", line[2:]) # Strip leading spaces.
                            s = re.split("[ \t]+", sline, maxsplit=1)
                            if len(s) == 2:
                                (link, text) = s
                            else:
                                link = s[0] # Link without a text
                                text = ""
                            old_link = link
                            link = Agunua.urltinkering.pathmerge(str(directory), str(filename), old_link)
                            if link != old_link:
                                line = "=> %s %s" % (link, text)
                        elif line[0:3] == "```":
                            # The toggle must be tested on the current
                            # line of the payload, not on a leftover
                            # variable of the link loop above.
                            in_prefor = not in_prefor
                        content.append(line)
                    f.write("".join(content))
            if gempub:
                pubfile.write(filename)
        elif g.status_code == "30" or g.status_code == "31":
            # Redirection: queue the target if it stays inside the capsule.
            target = Agunua.uri_to_iri(Agunua.urltinkering.urlmerge(url, g.meta))
            if target.startswith(start_url) and (exclude is None or not exclude.search(target)):
                if target not in retrieved and target not in to_retrieve:
                    to_retrieve[target] = True
        else:
            # Errors are always reported for the first URL, and for the
            # following ones only in verbose mode.
            if verbose or first:
                if g.status_code in Agunua.status.codes:
                    status = Agunua.status.codes[g.status_code]
                elif g.status_code[0] in Agunua.status.categories:
                    status = "illegal status code, category \"%s\"" % \
                        Agunua.status.categories[g.status_code[0]]
                else:
                    status = "completely illegal status code \"%s\"" % g.status_code
                print("Wrong status code for %s: %s" % (url, status), file=sys.stderr)
    else:
        if verbose or first:
            print("Network error retrieving %s: %s" % (url, g.error), file=sys.stderr)
    time.sleep(sleep_duration)
    first = False
|
|
# Tell the user when the crawl stopped because of the limit on the
# number of files rather than because everything was retrieved.
if len(to_retrieve) > 0 and total_attempts >= maximum_files:
    print("Warning, maximum number of %i files reached (use --maximum-files to increase it)" % maximum_files,
          file=sys.stderr)

if gempub:
    # Write the metadata file, add it to the archive and finish the
    # archive. The context manager closes the file even if a write fails.
    with open(metadata, "w") as m:
        m.write("title: %s\n" % ("Download of %s" % start_url))
        m.write("gpubVersion: 1.0.0\n")
        m.write("index: %s/%s\n" % (directory, index))
        # A language or a charset is declared only when the whole
        # capsule uses a single one.
        if len(langtags) == 1:
            m.write("language: %s\n" % next(iter(langtags)))
        elif len(langtags) > 1:
            print("Warning: several language tags in the capsule %s, not indicating a langtag in the gempub file" % start_url,
                  file=sys.stderr)
        if len(charsets) == 1:
            m.write("charset: %s\n" % next(iter(charsets)))
        elif len(charsets) > 1:
            print("Warning: several charsets in the capsule %s, not indicating a charset in the gempub file" % start_url,
                  file=sys.stderr)
        # When there is no charset at all, nothing is written. (Or
        # write UTF-8, since it is the default charset?)
        m.write("version: geminitrack %s\n" % Agunua.VERSION)
        m.write("revisionDate: %s\n" % time.strftime("%Y-%m-%d", time.gmtime(time.time())))
        if license is not None:
            m.write("license: %s\n" % license)
        if author is not None:
            m.write("author: %s\n" % author)
    pubfile.write(metadata)
    pubfile.close()
    # Explicit removal of the temporary directory, instead of relying
    # on the finalizer triggered by deleting the object.
    tmpdir.cleanup()

# The exit status is 0 only if at least one file was retrieved.
sys.exit(1 if total_retrieved == 0 else 0)
|
|
|