Source code for cryptopyutils.sshkeypair

# -*- coding: utf-8 -*-
"""sshkeypair.py - generate, save and load keypairs for OpenSSH

Class:
- SSH
"""
import base64
import binascii
import os

from cryptography.hazmat.primitives import hashes

from . import files
from . import utils
from .config import Base
from .config import SSHKeyPairConfig
from .privatekey import PrivateKey
from .publickey import PublicKey


[docs]class SSHKeyPair(Base): """SSH Object - extends Base""" def __init__(self, **kwargs): """SSHKeyPair class constructor Args: config (SSHKeyPairConfig, optional): The configuration. private_key (PrivateKey, optional): The private key. An instance of PrivateKey. public_key (PublicKey, optional): The public key. An instance of PublicKey. alg (str, optional): The key algorithm. RSA, ED25519, ECDSA and DSA (legacy) are supported. Defaults to `RSA`. """ super().__init__(**kwargs) # configuration if not hasattr(self, "config"): self._config = kwargs.pop("config", SSHKeyPairConfig()) # private_key object if not hasattr(self, "private_key"): self._private_key = kwargs.pop("private_key", PrivateKey()) # public_key object if not hasattr(self, "public_key"): self._public_key = kwargs.pop("public_key", PublicKey()) # alg object if not hasattr(self, "alg"): self._alg = kwargs.pop("alg", "RSA").upper() @property def public_key(self): """Get the public_key attribute Returns: PublicKey: An instance of PublicKey. """ return self._public_key @public_key.setter def public_key(self, key): """Set the key with a pre-existing Public Key Args: key (PublicKey): An instance of PublicKey. """ self._public_key = key @property def private_key(self): """Get the private_key attribute Returns: PrivateKey : An instance of PrivateKey. """ return self._private_key @private_key.setter def private_key(self, key): """Set the key with a pre-existing private key Args: key (PrivateKey): An instance of PrivateKey. """ self._private_key = key # Private Key
[docs] def gen_private_key( self, alg="RSA", key_size=None, public_exponent=None, curve=None, ): """Generate a private key for OpenSSH Args: alg (str, optional): The key algorithm. RSA, ED25519, ECDSA and DSA (legacy) are supported. Defaults to `RSA`. key_size (int, optional): Key size. Used in RSA. Defaults to None. public_exponent (int, optional): Public Exponent. Used in RSA. Defaults to None. curve (str, optional): The name of the elliptic curve for ECDSA. Defaults to None. passphrase (str, optional): The passphrase. Defaults to None. """ # handle the algorithm if self._alg is None: alg = alg.upper() # Correct some input mistakes if alg in ["RSA", "ED25519", "DSA", "EC", "ECDSA"]: self._alg = alg else: raise Exception("SSH algorithm not supported by cryptopyutils") # ECDSA if self._alg == "ECDSA": self._alg = "EC" # DSA if self._alg == "DSA": key_size = self._config.dsa_key_size # Generate the private key self._private_key.gen(self._alg, key_size, public_exponent, curve)
[docs] def save_private_key( self, path, passphrase=None, file_mode=None, force=False, ): """Save the SSH private key Args: path (str): The file path where the private key will be saved. passphrase (str, optional): The passphrase. Defaults to None. file_mode (byte, optional): The file mode (chmod). Defaults to None. force (bool, optional): Force to replace file if already exists. Defaults to False. Returns: bool: True if successful. False if already exists and not forced to overwrite. """ # Note: SSH format requires PEM encoding. return self._private_key.save( path, "PEM", "OpenSSH", passphrase, file_mode, force, )
[docs] def load_private_key(self, path, passphrase=None): """Load a SSH Private Key Args: filepath(str): The file path of the private key to be loaded. passphrase (str, optional): The passphrase. Default to None. """ self._private_key.load(path, "OpenSSH", passphrase)
# Public Key
[docs] def gen_public_key(self): """Generate the SSH public key Assumes you have generated the private key first. """ if self._alg in ["RSA", "ED25519", "DSA", "EC"]: self._public_key.gen(self._alg, self.private_key) else: raise Exception("SSH algorithm not supported by cryptopyutils")
[docs] def load_public_key(self, path): """Load a SSH Public Key Args: path(str): The file path of the public key to be loaded. """ data = files.read(path) self.public_key.load(path, "OpenSSH")
[docs] def save_public_key( self, path, file_mode=None, force=False, comment=None, ): """Save the SSH public key Will open the file after saving it to apprend the comment if provided. Args: path (str): The file path where the public key will be saved. file_mode (byte, optional): The file mode (chmod). Defaults to None. force (bool, optional): Force to replace file if already exists. Defaults to False. comment (str, optional): comment. Typically user@host format to be appended at the end of the public key. Defaults to None. Returns: bool: True if successful. False if already exists and not forced to overwrite. """ status = self.public_key.save(path, "OpenSSH", "OpenSSH", file_mode, force) # return False if public key not saved if status is False: return False # if comment is set then open the file and append it if comment != None: # read the file data = files.read(path, istext=True) # modify the content data = "%s %s" % (data, comment) # write the file back files.write(path, data, istext=True) # return the filepath return True
[docs] def hash_fingerprint(self, path, hash_alg=None): """Get the fingerprint based on a hash function equivalent to ssh-keygen -l -f /id_rsa.pub | awk '(print $2)' Args: path (str): path to the public key file hash_alg (str, optional): The hash algorithm. Defaults to None. """ # Defaults if hash_alg is None: hash_alg = self.config.hash_alg # Read the public key data_public_key1 = files.read(path, istext=True) # Pick the hash algorithm digest = hashes.Hash(utils.hash_algorithm(alg=hash_alg)) # obtain the bytes of the public key pubk_bytes = binascii.a2b_base64(data_public_key1.split(" ")[1]) # compute the fingerprint, encode it to base 64, remove equal signs and add the hash alg digest.update(pubk_bytes) fingerp = base64.b64encode(digest.finalize()).decode() fingerp = fingerp.replace("=", "") # remove the dash in the hash algorithm hash = str(hash_alg).replace("-", "") # return the fingerpring return hash + ":" + fingerp
# Key pair
[docs] def key_pair( self, alg="RSA", out_dir=None, passphrase=None, file_mode=None, force=False, key_size=None, public_exponent=None, curve_length=521, comment=None, is_user=True, ): """Generate the SSH key pair using RSA Args: alg (str, optional): The key algorithm. RSA, ED25519, ECDSA and DSA (legacy) are supported. Defaults to `RSA`. out_dir (str, optional): The output directory path. Defaults to None. passphrase (str, optional): The passphrase. Defaults to None. file_mode (byte, optional): The file mode (chmod). Defaults to None. force (bool, optional): Force to replace file if already exists. Defaults to False. key_size (int, optional): Key size. Used in RSA. Defaults to None. public_exponent (int, optional): Public Exponent. Used in RSA. Defaults to None. curve (int): The elliptic curve length for ECDSA. Can be 256, 384 or 521. Defaults to 521. comment (str, optional): comment. Typically user@host format to be appended at the end of the public key. Defaults to None. is_user (bool, optional): Is the key a user key (True) or a system key (False). Defaults to True. Returns: [bool, bool], [str, str]: The first is the private key, the second is the public key. bool: True if successful. False if already exists and not forced to overwrite. str: Private key and public key file pathes """ # algorithm supported if alg not in ["RSA", "ED25519", "ECDSA", "DSA"]: raise Exception("SSH algorithm not supported by cryptopyutils") else: self._alg = alg # output directory if out_dir is None: if is_user: out_dir = self._config.user_dir else: out_dir = self._config.system_dir # private key files # case of elliptic curves if self._alg == "RSA": pkfp = os.path.join(out_dir, "id_rsa") # generate and save the private key self.gen_private_key("RSA", key_size, public_exponent) elif self._alg == "ED25519": pkfp = os.path.join(out_dir, "id_ed25519") self.gen_private_key("ED25519") elif self._alg == "ECDSA": # Choose the proper elliptic curve curves = { "256": "SECP256R1", "384": "SECP384R1", "521": "SECP521R1", } if str(curve_length) in curves.keys(): curve = curves[str(curve_length)] else: raise Exception("ECDSA curve not supported by cryptopyutils") pkfp = os.path.join(out_dir, "id_ecdsa") self.gen_private_key("EC", curve=curve) elif self._alg == "DSA": pkfp = os.path.join(out_dir, "id_dsa") self.gen_private_key("DSA") else: # Not supported return [None, None], [None, None] # save the private key status = self.save_private_key(pkfp, passphrase, file_mode, force) # return False if private key not saved if status is False: return [False, None], [pkfp, None] # generate the public key self.gen_public_key() # generate public key filepath pubkfp = pkfp + ".pub" # save the public key status_pub = self.save_public_key(pubkfp, file_mode, force, comment) # return False if public key not saved if status_pub is False: return [True, False], [pkfp, pubkfp] # else return return [True, True], [pkfp, pubkfp]