PyCat A small implementation of netcat in Python3.x


netcat is an all-round tool used with many applicable features

I was playing around with sockets and openssl, and decided I should build my own.

It works as expected, but my code feels a bit smelly.


  • ssl
  • persistent shell
  • cd
  • exit



$   python -lvp 8080 --ssl [*] Incoming connection from username@hostame PyCat C:\dev\Pycat > echo hooooooi hooooooi username@hostame PyCat C:\dev\PyCat > cd ../  username@hostame PyCat C:\dev > exit 


python -i localhost -p 8080 --ssl 


import subprocess import tempfile import datetime import socket import ssl import sys import os import re  import argparse from cryptography import x509 from cryptography.x509.oid import NameOID from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.backends import default_backend  class PyCat():     def __init__(self, host, port, listen, verbose, _ssl):         self.buffer = b""         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)         self.port = port = host if host else ''         self.listen = listen         self.verbose = verbose         self.ssl = _ssl         if self.ssl:             self.context = ssl.create_default_context()             if self.listen:                 self.key_file, self.cert_file = self.generate_temp_cert()         self.main_func = self.nc_listen if self.listen else self.nc_connect         self.main()      def generate_temp_cert(self):         key, key_path = tempfile.mkstemp()         cert, cert_path = tempfile.mkstemp()         name_attributes = [             x509.NameAttribute(NameOID.COUNTRY_NAME, "OK"),             x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "OK"),             x509.NameAttribute(NameOID.LOCALITY_NAME, "OK"),             x509.NameAttribute(NameOID.ORGANIZATION_NAME, "OK"),             x509.NameAttribute(NameOID.COMMON_NAME, "PyCat")         ]          key = rsa.generate_private_key(             public_exponent=65537,             key_size=2048,             backend=default_backend()         )          with open(key_path, "wb") as f:             f.write(                 key.private_bytes(                     encoding=serialization.Encoding.PEM,                     format=serialization.PrivateFormat.TraditionalOpenSSL,                     encryption_algorithm=serialization.NoEncryption()                 )             )          subject = issuer = x509.Name(name_attributes)          cert = x509.CertificateBuilder()\                     .subject_name(subject)\                     .issuer_name(issuer)\                     .public_key(key.public_key())\                     .serial_number(x509.random_serial_number())\                     .not_valid_before(datetime.datetime.utcnow())\                     .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=365))          cert = cert.sign(key, hashes.SHA256(), default_backend())         with open(cert_path, "wb") as f:             f.write(                 cert.public_bytes(serialization.Encoding.PEM)             )          return key_path, cert_path      def main(self):         self.main_func()      def exit(self):         self.socket.close()         sys.exit(0)      def read(self, socket_conn, length=1024):         data, response = "starting", b""         while data:             data = socket_conn.recv(length)             response += data             if len(data) < length:                 break         return response.decode("utf-8").rstrip("\n")      def handle_command(self, cmd):         response = b" "         cd = re.match(r'cd(?:\s+|$  )(.*)', cmd)         if cmd == "exit":             self.exit()         elif cd and             try:                 os.chdir(             except FileNotFoundError:                 pass         else:             response = self.exec_command(cmd)         return response      def exec_command(self, command):         try:             output = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)         except Exception as e:             output = str(e).encode("utf-8")         return output      def nc_connect(self):         self.socket.connect((, self.port))          if self.ssl:             self.context.check_hostname = False             self.context.verify_mode = ssl.CERT_NONE             self.socket = self.context.wrap_socket(self.socket)          while True:             cmd =             response = self.handle_command(cmd)             self.socket.send(response)      def create_prompt_string(self, client_socket):         client_socket.send(b"cd")         pwd =         client_socket.send(b"whoami")         whoami =         client_socket.send(b"hostname")         hostname =         return f"{whoami}@{hostname} PyCat {pwd}\n> "      def client_handler(self, client_socket):         while True:             prompt_string = self.create_prompt_string(client_socket)             buf = input(f"{prompt_string}")             client_socket.send(buf.encode("utf-8"))              if buf == "exit":                 self.exit()                 if self.ssl:                     os.remove(self.cert_file)                     os.remove(self.key_file)              print(      def nc_listen(self):         self.socket.bind((, self.port))         self.socket.listen(0)          if self.ssl:             self.socket = ssl.wrap_socket(                 self.socket,                 server_side=True,                  certfile=self.cert_file,                  keyfile=self.key_file             )          client_socket, addr = self.socket.accept()         if self.verbose:             ip, port = addr             print(f"[*] Incoming connection from {ip}:{port}")          self.client_handler(client_socket)  def parse_arguments():     parser = argparse.ArgumentParser(usage='%(prog)s [options]',                                      description='PyCat @Ludisposed',                                      formatter_class=argparse.RawDescriptionHelpFormatter,                                      epilog='Examples:\npython3 -lvp 443\npython3 -i localhost -p 443')     parser.add_argument('-l', '--listen', action="store_true", help='Listen')     parser.add_argument('-v', '--verbose', action="store_true", help='Verbose output')     parser.add_argument('-s', '--ssl', action="store_true", help='Encrypt connection')     parser.add_argument('-p', '--port', type=int, help='Port to listen on')     parser.add_argument('-i', '--ip', type=str, help='Ip to connect to')     args = parser.parse_args()      if (args.listen or args.ip) and not args.port:         parser.error('Specify which port to connect to')     elif not args.listen and not args.ip:         parser.error('Specify --listen or --ip')     return args.ip, args.port, args.listen, args.verbose, args.ssl  if __name__ == '__main__':     PyCat(*parse_arguments())