1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
|
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8 et ts=4 sts=4 sw=4
#
# Copyright © 2022 Maxime “pep” Buquet <pep@bouah.net>
#
# See the LICENSE file for copying permissions.
"""
XEP-0454: OMEMO Media Sharing
"""
from typing import IO, Optional, Tuple
from os import urandom
from pathlib import Path
from io import BytesIO, SEEK_END
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from slixmpp.plugins import BasePlugin
from slixmpp.plugins.base import register_plugin
class InvalidURL(Exception):
"""Raised for URLs that either aren't HTTPS or already contain a fragment."""
class XEP_0454(BasePlugin):
"""
XEP-0454: OMEMO Media Sharing
"""
name = 'xep_0454'
description = 'XEP-0454: OMEMO Media Sharing'
dependencies = {'xep_0363'}
@classmethod
def encrypt(cls, input_file: Optional[IO[bytes]] = None, filename: Optional[Path] = None) -> Tuple[bytes, str]:
"""
Encrypts file as specified in XEP-0454 for use in file sharing
:param input_file: Binary file stream on the file.
:param filename: Path to the file to upload.
One of input_file or filename must be specified. If both are
passed, input_file will be used and filename ignored.
"""
if input_file is None and filename is None:
raise ValueError('Specify either filename or input_file parameter')
aes_gcm_iv = urandom(12)
aes_gcm_key = urandom(32)
aes_gcm = Cipher(
algorithms.AES(aes_gcm_key),
modes.GCM(aes_gcm_iv),
).encryptor()
if input_file is None:
input_file = open(filename, 'rb')
payload = b''
while True:
buf = input_file.read(4096)
if not buf:
break
payload += aes_gcm.update(buf)
payload += aes_gcm.finalize() + aes_gcm.tag
fragment = aes_gcm_iv.hex() + aes_gcm_key.hex()
return (payload, fragment)
@classmethod
def decrypt(cls, input_file: IO[bytes], fragment: str) -> bytes:
"""
Decrypts file-like.
:param input_file: Binary file stream on the file, containing the
tag (16 bytes) at the end.
:param fragment: 88 hex chars string composed of iv (24 chars)
+ key (64 chars).
"""
assert len(fragment) == 88
aes_gcm_iv = bytes.fromhex(fragment[:24])
aes_gcm_key = bytes.fromhex(fragment[24:])
# Find 16 bytes tag
input_file.seek(-16, SEEK_END)
tag = input_file.read()
aes_gcm = Cipher(
algorithms.AES(aes_gcm_key),
modes.GCM(aes_gcm_iv, tag),
).decryptor()
size = input_file.seek(0, SEEK_END)
input_file.seek(0)
count = size - 16
plain = b''
while count >= 0:
buf = input_file.read(4096)
count -= len(buf)
if count <= 0:
buf += input_file.read()
buf = buf[:-16]
plain += aes_gcm.update(buf)
plain += aes_gcm.finalize()
return plain
@classmethod
def format_url(cls, url: str, fragment: str) -> str:
"""Helper to format a HTTPS URL to an AESGCM URI"""
if not url.startswith('https://') or url.find('#') != -1:
raise InvalidURL
return 'aesgcm://' + url.removeprefix('https://') + '#' + fragment
async def upload_file(
self,
filename: Path,
_size: Optional[int] = None,
content_type: Optional[str] = None,
**kwargs,
) -> str:
"""
Wrapper to xep_0363 (HTTP Upload)'s upload_file method.
:param input_file: Binary file stream on the file.
:param filename: Path to the file to upload.
Same as `XEP_0454.encrypt`, one of input_file or filename must be
specified. If both are passed, input_file will be used and
filename ignored.
Other arguments passed in are passed to the actual
`XEP_0363.upload_file` call.
"""
input_file = kwargs.get('input_file')
payload, fragment = self.encrypt(input_file, filename)
# Prepare kwargs for upload_file call
filename = urandom(12).hex() # Random filename to hide user-provided path
kwargs['filename'] = filename
input_enc = BytesIO(payload)
kwargs['input_file'] = input_enc
# Size must also be overriden if provided
size = input_enc.seek(0, SEEK_END)
input_enc.seek(0)
kwargs['size'] = size
kwargs['content_type'] = content_type
url = await self.xmpp['xep_0363'].upload_file(**kwargs)
return self.format_url(url, fragment)
register_plugin(XEP_0454)
|