refactor image import and add Alpine docker image

- dynamic import of QR reader
- build docker also for arm64
This commit is contained in:
scito
2022-12-24 01:59:35 +01:00
committed by Roland Kurmann
parent 915efcf192
commit 9d052dc78a
21 changed files with 910 additions and 521 deletions
+183 -127
View File
@@ -43,31 +43,33 @@
import argparse
import base64
import fileinput
import sys
import csv
import fileinput
import importlib
import json
import cv2
from qreader import QReader
from urllib.parse import parse_qs, urlencode, urlparse, quote
from os import path, makedirs
from re import compile as rcompile
import os
import re
import sys
import urllib.parse as urlparse
import protobuf_generated_python.google_auth_pb2
verbose = False
quiet = True
# These dynamic import are below:
# import cv2
# import numpy
# from qreader import QReader
def sys_main():
main(sys.argv[1:])
def main(sys_args):
global verbose, quiet
global verbose, quiet, qreader_available
# allow to use sys.stdout with with (avoid closing)
sys.stdout.close = lambda: None
# sys.stdout.reconfigure(encoding='utf-8')
args = parse_args(sys_args)
verbose = args.verbose if args.verbose else 0
@@ -80,21 +82,21 @@ def main(sys_args):
def parse_args(sys_args):
formatter = lambda prog: argparse.HelpFormatter(prog, max_help_position=52)
arg_parser = argparse.ArgumentParser(formatter_class=formatter)
arg_parser.add_argument('infile',
help="image file containing a QR code from a Google Authenticator export or a text file "
"or - for stdin with \"otpauth-migration://...\" URLs separated by newlines. Lines "
"starting with # are ignored.")
formatter = lambda prog: argparse.RawDescriptionHelpFormatter(prog, max_help_position=52)
example_text = '''examples:
python extract_otp_secret_keys.py example_*.txt
python extract_otp_secret_keys.py - < example_export.txt
python extract_otp_secret_keys.py --csv - example_*.png | tail -n+2
python extract_otp_secret_keys.py = < example_export.png'''
arg_parser = argparse.ArgumentParser(formatter_class=formatter,
epilog=example_text)
arg_parser.add_argument('infile', help='1) file or - for stdin with "otpauth-migration://..." URLs separated by newlines, lines starting with # are ignored; or 2) image file containing a QR code or = for stdin for an image containing a QR code', nargs='+')
arg_parser.add_argument('--json', '-j', help='export json file or - for stdout', metavar=('FILE'))
arg_parser.add_argument('--csv', '-c', help='export csv file or - for stdout', metavar=('FILE'))
arg_parser.add_argument('--keepass', '-k', help='export totp/hotp csv file(s) for KeePass, - for stdout',
metavar=('FILE'))
arg_parser.add_argument('--printqr', '-p', help='print QR code(s) as text to the terminal (requires qrcode module)',
action='store_true')
arg_parser.add_argument('--saveqr', '-s',
help='save QR code(s) as images to the given folder (requires qrcode module)',
metavar=('DIR'))
arg_parser.add_argument('--keepass', '-k', help='export totp/hotp csv file(s) for KeePass, - for stdout', metavar=('FILE'))
arg_parser.add_argument('--printqr', '-p', help='print QR code(s) as text to the terminal (requires qrcode module)', action='store_true')
arg_parser.add_argument('--saveqr', '-s', help='save QR code(s) as images to the given folder (requires qrcode module)', metavar=('DIR'))
output_group = arg_parser.add_mutually_exclusive_group()
output_group.add_argument('--verbose', '-v', help='verbose output', action='count')
output_group.add_argument('--quiet', '-q', help='no stdout output, except output set by -', action='store_true')
@@ -110,112 +112,148 @@ def extract_otps(args):
otps = []
lines = get_lines_from_file(args.infile)
i = j = k = 0
if verbose: print('Input files: {}'.format(args.infile))
for infile in args.infile:
if verbose: print('Processing infile {}'.format(infile))
k += 1
for line in get_lines_from_file(infile):
if verbose: print(line)
if line.startswith('#') or line == '': continue
i += 1
payload = get_payload_from_line(line, i, infile)
i = j = 0
for line in lines:
if verbose:
print(line)
if line.startswith('#') or line == '':
continue
i += 1
payload = get_payload_from_line(line, i, args)
# pylint: disable=no-member
for raw_otp in payload.otp_parameters:
j += 1
if verbose:
print('\n{}. Secret Key'.format(j))
secret = convert_secret_from_bytes_to_base32_str(raw_otp.secret)
otp_type_enum = get_enum_name_by_number(raw_otp, 'type')
otp_type = get_otp_type_str_from_code(raw_otp.type)
otp_url = build_otp_url(secret, raw_otp)
otp = {
"name": raw_otp.name,
"secret": secret,
"issuer": raw_otp.issuer,
"type": otp_type,
"counter": raw_otp.counter if raw_otp.type == 1 else None,
"url": otp_url
}
if not quiet:
print_otp(otp)
if args.printqr:
print_qr(args, otp_url)
if args.saveqr:
save_qr(otp, args, j)
if not quiet:
print()
otps.append(otp)
# pylint: disable=no-member
for raw_otp in payload.otp_parameters:
j += 1
if verbose: print('\n{}. Secret Key'.format(j))
secret = convert_secret_from_bytes_to_base32_str(raw_otp.secret)
otp_type_enum = get_enum_name_by_number(raw_otp, 'type')
otp_type = get_otp_type_str_from_code(raw_otp.type)
otp_url = build_otp_url(secret, raw_otp)
otp = {
"name": raw_otp.name,
"secret": secret,
"issuer": raw_otp.issuer,
"type": otp_type,
"counter": raw_otp.counter if raw_otp.type == 1 else None,
"url": otp_url
}
if not quiet:
print_otp(otp)
if args.printqr:
print_qr(args, otp_url)
if args.saveqr:
save_qr(otp, args, j)
if not quiet:
print()
otps.append(otp)
if verbose: print('{} infile(s) processed'.format(k))
return otps
def get_lines_from_file(filepath):
global verbose
def get_lines_from_file(filename):
global qreader_available
# stdin stream cannot be rewinded, thus distinguish, use - for utf-8 stdin and = for binary image stdin
if filename != '=':
check_file_exists(filename)
lines = read_lines_from_text_file(filename)
if lines or filename == '-':
return lines
# Check if this is an image file
if(path.splitext(filepath)[1][1:].lower() in ('bmp', 'jpg', 'jpeg', 'png', 'tif', 'tiff')):
# It's an image file, so try to read it as a QR Code
try:
decoder = QReader()
# could not process text file, try reading as image
if filename != '-':
return convert_img_to_line(filename)
if not path.isfile(filepath):
eprint('\nERROR: Input file provided is non-existent or not a file.'
'\ninput file: {}'.format(filepath))
return []
image = cv2.imread(filepath)
if image is None:
eprint('\nERROR: Unable to open file for reading. Please ensure that you have read access to the '
'file and that the file is a valid image file.\ninput file: {}'.format(filepath))
return []
decoded_text = decoder.detect_and_decode(image=image)
if decoded_text is None:
eprint('\nERROR: Unable to read QR Code from file.\ninput file: {}'.format(filepath))
return []
return [decoded_text]
except Exception as e:
eprint('\nERROR: Encountered exception "{}".\ninput file: {}'.format(str(e), filepath))
return []
else:
# Not an image file, so assume it's a text file and proceed as usual
def read_lines_from_text_file(filename):
if verbose: print('Reading lines of {}'.format(filename))
finput = fileinput.input(filename)
try:
lines = []
finput = fileinput.input(filepath)
try:
for line in (line.strip() for line in finput):
if verbose:
print(line)
if line.startswith('#') or line == '':
continue
lines.append(line)
finally:
finput.close()
for line in (line.strip() for line in finput):
if verbose: print(line)
if is_binary(line):
abort('\nBinary input was given in stdin, please use = instead of - as infile argument for images.')
# unfortunately yield line leads to random test fails
lines.append(line)
if not lines:
eprint("WARN: {} is empty".format(filename.replace('-', 'stdin')))
return lines
except UnicodeDecodeError:
if filename == '-':
abort('\nERROR: Unable to open text file form stdin. '
'In case you want read an image file from stdin, you must use "=" instead of "-".')
else: # The file is probably an image, process below
return None
finally:
finput.close()
def get_payload_from_line(line, i, args):
def convert_img_to_line(filename):
try:
import cv2
import numpy
except Exception as e:
eprint("WARNING: No cv2 or numpy module installed. Exception: {}".format(str(e)))
return []
if verbose: print('Reading image {}'.format(filename))
try:
if filename != '=':
image = cv2.imread(filename)
else:
try:
stdin = sys.stdin.buffer.read()
except AttributeError:
# Workaround for pytest, since pytest cannot monkeypatch sys.stdin.buffer
stdin = sys.stdin.read()
if not stdin:
eprint("WARN: stdin is empty")
try:
img_array = numpy.frombuffer(stdin, dtype='uint8')
except TypeError as e:
abort('\nERROR: Cannot read binary stdin buffer. Exception: {}'.format(str(e)))
if not img_array.size:
return []
image = cv2.imdecode(img_array, cv2.IMREAD_UNCHANGED)
if image is None:
abort('\nERROR: Unable to open file for reading.\ninput file: {}'.format(filename))
# dynamic import of QReader since this module has a dependency to zbar lib and import it only when necessary
try:
from qreader import QReader
except ImportError as e:
abort('''
ERROR: Cannot import QReader module. This problem is probably due to the missing zbar shared library.
On Linux and macOS libzbar0 must be installed.
See in README.md for the installation of the libzbar0.
Exception: {}'''.format(str(e)))
decoder = QReader()
decoded_text = decoder.detect_and_decode(image=image)
if decoded_text is None:
abort('\nERROR: Unable to read QR Code from file.\ninput file: {}'.format(filename))
return [decoded_text]
except Exception as e:
abort('\nERROR: Encountered exception "{}".\ninput file: {}'.format(str(e), filename))
def get_payload_from_line(line, i, infile):
global verbose
if not line.startswith('otpauth-migration://'):
eprint(
'\nWARN: line is not a otpauth-migration:// URL\ninput file: {}\nline "{}"\nProbably a wrong file was given'.format(
args.infile, line))
parsed_url = urlparse(line)
eprint( '\nWARN: line is not a otpauth-migration:// URL\ninput file: {}\nline "{}"\nProbably a wrong file was given'.format(infile, line))
parsed_url = urlparse.urlparse(line)
if verbose > 1: print('\nDEBUG: parsed_url={}'.format(parsed_url))
try:
params = parse_qs(parsed_url.query, strict_parsing=True)
params = urlparse.parse_qs(parsed_url.query, strict_parsing=True)
except: # Not necessary for Python >= 3.11
params = []
if verbose > 1: print('\nDEBUG: querystring params={}'.format(params))
if 'data' not in params:
eprint(
'\nERROR: no data query parameter in input URL\ninput file: {}\nline "{}"\nProbably a wrong file was given'.format(
args.infile, line))
sys.exit(1)
abort('\nERROR: no data query parameter in input URL\ninput file: {}\nline "{}"\nProbably a wrong file was given'.format(infile, line))
data_base64 = params['data'][0]
if verbose > 1: print('\nDEBUG: data_base64={}'.format(data_base64))
data_base64_fixed = data_base64.replace(' ', '+')
@@ -225,9 +263,8 @@ def get_payload_from_line(line, i, args):
try:
payload.ParseFromString(data)
except:
eprint('\nERROR: Cannot decode otpauth-migration migration payload.')
eprint('data={}'.format(data_base64))
exit(1)
abort('\nERROR: Cannot decode otpauth-migration migration payload.\n'
'data={}'.format(data_base64))
if verbose:
print('\n{}. Payload Line'.format(i), payload, sep='\n')
@@ -252,8 +289,7 @@ def build_otp_url(secret, raw_otp):
url_params = {'secret': secret}
if raw_otp.type == 1: url_params['counter'] = raw_otp.counter
if raw_otp.issuer: url_params['issuer'] = raw_otp.issuer
otp_url = 'otpauth://{}/{}?'.format(get_otp_type_str_from_code(raw_otp.type), quote(raw_otp.name)) + urlencode(
url_params)
otp_url = 'otpauth://{}/{}?'.format(get_otp_type_str_from_code(raw_otp.type), urlparse.quote(raw_otp.name)) + urlparse.urlencode( url_params)
return otp_url
@@ -270,12 +306,11 @@ def print_otp(otp):
def save_qr(otp, args, j):
dir = args.saveqr
if not (path.exists(dir)): makedirs(dir, exist_ok=True)
pattern = rcompile(r'[\W_]+')
if not (os.path.exists(dir)): os.makedirs(dir, exist_ok=True)
pattern = re.compile(r'[\W_]+')
file_otp_name = pattern.sub('', otp['name'])
file_otp_issuer = pattern.sub('', otp['issuer'])
save_qr_file(args, otp['url'],
'{}/{}-{}{}.png'.format(dir, j, file_otp_name, '-' + file_otp_issuer if file_otp_issuer else ''))
save_qr_file(args, otp['url'], '{}/{}-{}{}.png'.format(dir, j, file_otp_name, '-' + file_otp_issuer if file_otp_issuer else ''))
return file_otp_issuer
@@ -330,8 +365,7 @@ def write_keepass_csv(args, otps):
count_totp_entries += 1
if has_hotp:
with open_file_or_stdout_for_csv(otp_filename_hotp) as outfile:
writer = csv.DictWriter(outfile,
["Title", "User Name", "HmacOtp-Secret-Base32", "HmacOtp-Counter", "Group"])
writer = csv.DictWriter(outfile, ["Title", "User Name", "HmacOtp-Secret-Base32", "HmacOtp-Counter", "Group"])
writer.writeheader()
for otp in otps:
if otp['type'] == 'hotp':
@@ -344,10 +378,8 @@ def write_keepass_csv(args, otps):
})
count_hotp_entries += 1
if not quiet:
if count_totp_entries > 0: print(
"Exported {} totp entries to keepass csv file {}".format(count_totp_entries, otp_filename_totp))
if count_hotp_entries > 0: print(
"Exported {} hotp entries to keepass csv file {}".format(count_hotp_entries, otp_filename_hotp))
if count_totp_entries > 0: print( "Exported {} totp entries to keepass csv file {}".format(count_totp_entries, otp_filename_totp))
if count_hotp_entries > 0: print( "Exported {} hotp entries to keepass csv file {}".format(count_hotp_entries, otp_filename_hotp))
def write_json(args, otps):
@@ -367,7 +399,7 @@ def has_otp_type(otps, otp_type):
def add_pre_suffix(file, pre_suffix):
'''filename.ext, pre -> filename.pre.ext'''
name, ext = path.splitext(file)
name, ext = os.path.splitext(file)
return name + "." + pre_suffix + (ext if ext else "")
@@ -386,10 +418,34 @@ def open_file_or_stdout_for_csv(filename):
return open(filename, "w", encoding='utf-8', newline='') if filename != '-' else sys.stdout
def check_file_exists(filename):
if filename != '-' and not os.path.isfile(filename):
abort('\nERROR: Input file provided is non-existent or not a file.'
'\ninput file: {}'.format(filename))
def is_binary(line):
try:
line.startswith('#')
return False
except (UnicodeDecodeError, AttributeError, TypeError):
return True
def check_module_available(module_name):
module_spec = importlib.util.find_spec(module_name)
return module_spec is not None
def eprint(*args, **kwargs):
'''Print to stderr.'''
print(*args, file=sys.stderr, **kwargs)
def abort(*args, **kwargs):
eprint(*args, **kwargs)
sys.exit(1)
if __name__ == '__main__':
sys_main()