Modern ActivityPub compliant server, designed for simplicity and accessibility. Includes calendar and sharing economy features to empower your federated community. https://code.freedombone.net/bashrc/epicyon Docs: https://epicyon.net/#install
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

httpsig.py 17KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. __filename__ = "httpsig.py"
  2. __author__ = "Bob Mottram"
  3. __credits__ = ['lamia']
  4. __license__ = "AGPL3+"
  5. __version__ = "1.2.0"
  6. __maintainer__ = "Bob Mottram"
  7. __email__ = "bob@freedombone.net"
  8. __status__ = "Production"
  9. # see https://tools.ietf.org/html/draft-cavage-http-signatures-06
  10. #
  11. # This might change in future
  12. # see https://tools.ietf.org/html/draft-ietf-httpbis-message-signatures-01
  13. from cryptography.hazmat.backends import default_backend
  14. from cryptography.hazmat.primitives.serialization import load_pem_private_key
  15. from cryptography.hazmat.primitives.serialization import load_pem_public_key
  16. from cryptography.hazmat.primitives.asymmetric import padding
  17. from cryptography.hazmat.primitives import hashes
  18. from cryptography.hazmat.primitives.asymmetric import utils as hazutils
  19. import base64
  20. from time import gmtime, strftime
  21. import datetime
  22. from utils import getFullDomain
  23. from utils import getSHA256
  24. def messageContentDigest(messageBodyJsonStr: str) -> str:
  25. msg = messageBodyJsonStr.encode('utf-8')
  26. hashResult = getSHA256(msg)
  27. return base64.b64encode(hashResult).decode('utf-8')
  28. def signPostHeaders(dateStr: str, privateKeyPem: str,
  29. nickname: str,
  30. domain: str, port: int,
  31. toDomain: str, toPort: int,
  32. path: str,
  33. httpPrefix: str,
  34. messageBodyJsonStr: str) -> str:
  35. """Returns a raw signature string that can be plugged into a header and
  36. used to verify the authenticity of an HTTP transmission.
  37. """
  38. domain = getFullDomain(domain, port)
  39. toDomain = getFullDomain(toDomain, toPort)
  40. if not dateStr:
  41. dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
  42. keyID = httpPrefix + '://' + domain + '/users/' + nickname + '#main-key'
  43. if not messageBodyJsonStr:
  44. headers = {
  45. '(request-target)': f'post {path}',
  46. 'host': toDomain,
  47. 'date': dateStr,
  48. 'content-type': 'application/json'
  49. }
  50. else:
  51. bodyDigest = messageContentDigest(messageBodyJsonStr)
  52. contentLength = len(messageBodyJsonStr)
  53. headers = {
  54. '(request-target)': f'post {path}',
  55. 'host': toDomain,
  56. 'date': dateStr,
  57. 'digest': f'SHA-256={bodyDigest}',
  58. 'content-type': 'application/activity+json',
  59. 'content-length': str(contentLength)
  60. }
  61. key = load_pem_private_key(privateKeyPem.encode('utf-8'),
  62. None, backend=default_backend())
  63. # headers.update({
  64. # '(request-target)': f'post {path}',
  65. # })
  66. # build a digest for signing
  67. signedHeaderKeys = headers.keys()
  68. signedHeaderText = ''
  69. for headerKey in signedHeaderKeys:
  70. signedHeaderText += f'{headerKey}: {headers[headerKey]}\n'
  71. signedHeaderText = signedHeaderText.strip()
  72. # signedHeaderText.encode('ascii') matches
  73. headerDigest = getSHA256(signedHeaderText.encode('ascii'))
  74. # print('headerDigest2: ' + str(headerDigest))
  75. # Sign the digest
  76. rawSignature = key.sign(headerDigest,
  77. padding.PKCS1v15(),
  78. hazutils.Prehashed(hashes.SHA256()))
  79. signature = base64.b64encode(rawSignature).decode('ascii')
  80. # Put it into a valid HTTP signature format
  81. signatureDict = {
  82. 'keyId': keyID,
  83. 'algorithm': 'rsa-sha256',
  84. 'headers': ' '.join(signedHeaderKeys),
  85. 'signature': signature
  86. }
  87. signatureHeader = ','.join(
  88. [f'{k}="{v}"' for k, v in signatureDict.items()])
  89. return signatureHeader
  90. def signPostHeadersNew(dateStr: str, privateKeyPem: str,
  91. nickname: str,
  92. domain: str, port: int,
  93. toDomain: str, toPort: int,
  94. path: str,
  95. httpPrefix: str,
  96. messageBodyJsonStr: str,
  97. algorithm: str) -> (str, str):
  98. """Returns a raw signature strings that can be plugged into a header
  99. as "Signature-Input" and "Signature"
  100. used to verify the authenticity of an HTTP transmission.
  101. See https://tools.ietf.org/html/draft-ietf-httpbis-message-signatures-01
  102. """
  103. domain = getFullDomain(domain, port)
  104. toDomain = getFullDomain(toDomain, toPort)
  105. timeFormat = "%a, %d %b %Y %H:%M:%S %Z"
  106. if not dateStr:
  107. currTime = gmtime()
  108. dateStr = strftime(timeFormat, currTime)
  109. else:
  110. currTime = datetime.datetime.strptime(dateStr, timeFormat)
  111. secondsSinceEpoch = \
  112. int((currTime - datetime.datetime(1970, 1, 1)).total_seconds())
  113. keyID = httpPrefix + '://' + domain + '/users/' + nickname + '#main-key'
  114. if not messageBodyJsonStr:
  115. headers = {
  116. '*request-target': f'post {path}',
  117. '*created': str(secondsSinceEpoch),
  118. 'host': toDomain,
  119. 'date': dateStr,
  120. 'content-type': 'application/json'
  121. }
  122. else:
  123. bodyDigest = messageContentDigest(messageBodyJsonStr)
  124. contentLength = len(messageBodyJsonStr)
  125. headers = {
  126. '*request-target': f'post {path}',
  127. '*created': str(secondsSinceEpoch),
  128. 'host': toDomain,
  129. 'date': dateStr,
  130. 'digest': f'SHA-256={bodyDigest}',
  131. 'content-type': 'application/activity+json',
  132. 'content-length': str(contentLength)
  133. }
  134. key = load_pem_private_key(privateKeyPem.encode('utf-8'),
  135. None, backend=default_backend())
  136. # build a digest for signing
  137. signedHeaderKeys = headers.keys()
  138. signedHeaderText = ''
  139. for headerKey in signedHeaderKeys:
  140. signedHeaderText += f'{headerKey}: {headers[headerKey]}\n'
  141. signedHeaderText = signedHeaderText.strip()
  142. headerDigest = getSHA256(signedHeaderText.encode('ascii'))
  143. # Sign the digest. Potentially other signing algorithms can be added here.
  144. signature = ''
  145. if algorithm == 'rsa-sha256':
  146. rawSignature = key.sign(headerDigest,
  147. padding.PKCS1v15(),
  148. hazutils.Prehashed(hashes.SHA256()))
  149. signature = base64.b64encode(rawSignature).decode('ascii')
  150. sigKey = 'sig1'
  151. # Put it into a valid HTTP signature format
  152. signatureInputDict = {
  153. 'keyId': keyID,
  154. }
  155. signatureIndexHeader = '; '.join(
  156. [f'{k}="{v}"' for k, v in signatureInputDict.items()])
  157. signatureIndexHeader += '; alg=hs2019'
  158. signatureIndexHeader += '; created=' + str(secondsSinceEpoch)
  159. signatureIndexHeader += \
  160. '; ' + sigKey + '=(' + ', '.join(signedHeaderKeys) + ')'
  161. signatureDict = {
  162. sigKey: signature
  163. }
  164. signatureHeader = '; '.join(
  165. [f'{k}=:{v}:' for k, v in signatureDict.items()])
  166. return signatureIndexHeader, signatureHeader
  167. def createSignedHeader(privateKeyPem: str, nickname: str,
  168. domain: str, port: int,
  169. toDomain: str, toPort: int,
  170. path: str, httpPrefix: str, withDigest: bool,
  171. messageBodyJsonStr: str) -> {}:
  172. """Note that the domain is the destination, not the sender
  173. """
  174. contentType = 'application/activity+json'
  175. headerDomain = getFullDomain(toDomain, toPort)
  176. dateStr = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime())
  177. if not withDigest:
  178. headers = {
  179. '(request-target)': f'post {path}',
  180. 'host': headerDomain,
  181. 'date': dateStr
  182. }
  183. signatureHeader = \
  184. signPostHeaders(dateStr, privateKeyPem, nickname,
  185. domain, port, toDomain, toPort,
  186. path, httpPrefix, None)
  187. else:
  188. bodyDigest = messageContentDigest(messageBodyJsonStr)
  189. contentLength = len(messageBodyJsonStr)
  190. headers = {
  191. '(request-target)': f'post {path}',
  192. 'host': headerDomain,
  193. 'date': dateStr,
  194. 'digest': f'SHA-256={bodyDigest}',
  195. 'content-length': str(contentLength),
  196. 'content-type': contentType
  197. }
  198. signatureHeader = \
  199. signPostHeaders(dateStr, privateKeyPem, nickname,
  200. domain, port,
  201. toDomain, toPort,
  202. path, httpPrefix, messageBodyJsonStr)
  203. headers['signature'] = signatureHeader
  204. return headers
  205. def _verifyRecentSignature(signedDateStr: str) -> bool:
  206. """Checks whether the given time taken from the header is within
  207. 12 hours of the current time
  208. """
  209. currDate = datetime.datetime.utcnow()
  210. dateFormat = "%a, %d %b %Y %H:%M:%S %Z"
  211. signedDate = datetime.datetime.strptime(signedDateStr, dateFormat)
  212. timeDiffSec = (currDate - signedDate).seconds
  213. # 12 hours tollerance
  214. if timeDiffSec > 43200:
  215. print('WARN: Header signed too long ago: ' + signedDateStr)
  216. print(str(timeDiffSec / (60 * 60)) + ' hours')
  217. return False
  218. if timeDiffSec < 0:
  219. print('WARN: Header signed in the future! ' + signedDateStr)
  220. print(str(timeDiffSec / (60 * 60)) + ' hours')
  221. return False
  222. return True
  223. def verifyPostHeaders(httpPrefix: str, publicKeyPem: str, headers: dict,
  224. path: str, GETmethod: bool,
  225. messageBodyDigest: str,
  226. messageBodyJsonStr: str, debug: bool,
  227. noRecencyCheck=False) -> bool:
  228. """Returns true or false depending on if the key that we plugged in here
  229. validates against the headers, method, and path.
  230. publicKeyPem - the public key from an rsa key pair
  231. headers - should be a dictionary of request headers
  232. path - the relative url that was requested from this site
  233. GETmethod - GET or POST
  234. messageBodyJsonStr - the received request body (used for digest)
  235. """
  236. if GETmethod:
  237. method = 'GET'
  238. else:
  239. method = 'POST'
  240. if debug:
  241. print('DEBUG: verifyPostHeaders ' + method)
  242. print('verifyPostHeaders publicKeyPem: ' + str(publicKeyPem))
  243. print('verifyPostHeaders headers: ' + str(headers))
  244. print('verifyPostHeaders messageBodyJsonStr: ' +
  245. str(messageBodyJsonStr))
  246. pubkey = load_pem_public_key(publicKeyPem.encode('utf-8'),
  247. backend=default_backend())
  248. # Build a dictionary of the signature values
  249. if headers.get('Signature-Input'):
  250. signatureHeader = headers['Signature-Input']
  251. fieldSep2 = ','
  252. # split the signature input into separate fields
  253. signatureDict = {
  254. k.strip(): v.strip()
  255. for k, v in [i.split('=', 1) for i in signatureHeader.split(';')]
  256. }
  257. requestTargetKey = None
  258. requestTargetStr = None
  259. for k, v in signatureDict.items():
  260. if v.startswith('('):
  261. requestTargetKey = k
  262. requestTargetStr = v[1:-1]
  263. break
  264. if not requestTargetKey:
  265. return False
  266. signatureDict[requestTargetKey] = requestTargetStr
  267. else:
  268. requestTargetKey = 'headers'
  269. signatureHeader = headers['signature']
  270. fieldSep2 = ' '
  271. # split the signature input into separate fields
  272. signatureDict = {
  273. k: v[1:-1]
  274. for k, v in [i.split('=', 1) for i in signatureHeader.split(',')]
  275. }
  276. # Unpack the signed headers and set values based on current headers and
  277. # body (if a digest was included)
  278. signedHeaderList = []
  279. for signedHeader in signatureDict[requestTargetKey].split(fieldSep2):
  280. signedHeader = signedHeader.strip()
  281. if debug:
  282. print('DEBUG: verifyPostHeaders signedHeader=' + signedHeader)
  283. if signedHeader == '(request-target)':
  284. # original Mastodon http signature
  285. appendStr = f'(request-target): {method.lower()} {path}'
  286. signedHeaderList.append(appendStr)
  287. elif '*request-target' in signedHeader:
  288. # https://tools.ietf.org/html/
  289. # draft-ietf-httpbis-message-signatures-01
  290. appendStr = f'*request-target: {method.lower()} {path}'
  291. # remove ()
  292. # if appendStr.startswith('('):
  293. # appendStr = appendStr.split('(')[1]
  294. # if ')' in appendStr:
  295. # appendStr = appendStr.split(')')[0]
  296. signedHeaderList.append(appendStr)
  297. elif signedHeader == 'digest':
  298. if messageBodyDigest:
  299. bodyDigest = messageBodyDigest
  300. else:
  301. bodyDigest = messageContentDigest(messageBodyJsonStr)
  302. signedHeaderList.append(f'digest: SHA-256={bodyDigest}')
  303. elif signedHeader == 'content-length':
  304. if headers.get(signedHeader):
  305. appendStr = f'content-length: {headers[signedHeader]}'
  306. signedHeaderList.append(appendStr)
  307. else:
  308. if headers.get('Content-Length'):
  309. contentLength = headers['Content-Length']
  310. signedHeaderList.append(f'content-length: {contentLength}')
  311. else:
  312. if headers.get('Content-length'):
  313. contentLength = headers['Content-length']
  314. appendStr = f'content-length: {contentLength}'
  315. signedHeaderList.append(appendStr)
  316. else:
  317. if debug:
  318. print('DEBUG: verifyPostHeaders ' + signedHeader +
  319. ' not found in ' + str(headers))
  320. else:
  321. if headers.get(signedHeader):
  322. if signedHeader == 'date' and not noRecencyCheck:
  323. if not _verifyRecentSignature(headers[signedHeader]):
  324. if debug:
  325. print('DEBUG: ' +
  326. 'verifyPostHeaders date is not recent ' +
  327. headers[signedHeader])
  328. return False
  329. signedHeaderList.append(
  330. f'{signedHeader}: {headers[signedHeader]}')
  331. else:
  332. if '-' in signedHeader:
  333. # capitalise with dashes
  334. # my-header becomes My-Header
  335. headerParts = signedHeader.split('-')
  336. signedHeaderCap = None
  337. for part in headerParts:
  338. if signedHeaderCap:
  339. signedHeaderCap += '-' + part.capitalize()
  340. else:
  341. signedHeaderCap = part.capitalize()
  342. else:
  343. # header becomes Header
  344. signedHeaderCap = signedHeader.capitalize()
  345. if debug:
  346. print('signedHeaderCap: ' + signedHeaderCap)
  347. # if this is the date header then check it is recent
  348. if signedHeaderCap == 'Date':
  349. if not _verifyRecentSignature(headers[signedHeaderCap]):
  350. if debug:
  351. print('DEBUG: ' +
  352. 'verifyPostHeaders date is not recent ' +
  353. headers[signedHeader])
  354. return False
  355. # add the capitalised header
  356. if headers.get(signedHeaderCap):
  357. signedHeaderList.append(
  358. f'{signedHeader}: {headers[signedHeaderCap]}')
  359. elif '-' in signedHeader:
  360. # my-header becomes My-header
  361. signedHeaderCap = signedHeader.capitalize()
  362. if headers.get(signedHeaderCap):
  363. signedHeaderList.append(
  364. f'{signedHeader}: {headers[signedHeaderCap]}')
  365. if debug:
  366. print('DEBUG: signedHeaderList: ' + str(signedHeaderList))
  367. # Now we have our header data digest
  368. signedHeaderText = '\n'.join(signedHeaderList)
  369. headerDigest = getSHA256(signedHeaderText.encode('ascii'))
  370. # Get the signature, verify with public key, return result
  371. signature = None
  372. if headers.get('Signature-Input') and headers.get('Signature'):
  373. # https://tools.ietf.org/html/
  374. # draft-ietf-httpbis-message-signatures-01
  375. headersSig = headers['Signature']
  376. # remove sig1=:
  377. if requestTargetKey + '=:' in headersSig:
  378. headersSig = headersSig.split(requestTargetKey + '=:')[1]
  379. headersSig = headersSig[:len(headersSig)-1]
  380. signature = base64.b64decode(headersSig)
  381. else:
  382. # Original Mastodon signature
  383. signature = base64.b64decode(signatureDict['signature'])
  384. try:
  385. pubkey.verify(
  386. signature,
  387. headerDigest,
  388. padding.PKCS1v15(),
  389. hazutils.Prehashed(hashes.SHA256()))
  390. return True
  391. except BaseException:
  392. if debug:
  393. print('DEBUG: verifyPostHeaders pkcs1_15 verify failure')
  394. return False