PyCat A small implementation of netcat in Python3.x


Intro

netcat is an all-round tool used with many applicable features

I was playing around with sockets and openssl, and decided I should build my own.

It works as expected, but my code feels a bit smelly.

Features

  • ssl
  • persistent shell
  • cd
  • exit

Example

server

$   python pycat.py -lvp 8080 --ssl [*] Incoming connection from 127.0.0.1:53391 username@hostame PyCat C:\dev\Pycat > echo hooooooi hooooooi username@hostame PyCat C:\dev\PyCat > cd ../  username@hostame PyCat C:\dev > exit 

client

python pycat.py -i localhost -p 8080 --ssl 

Code

import subprocess import tempfile import datetime import socket import ssl import sys import os import re  import argparse from cryptography import x509 from cryptography.x509.oid import NameOID from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.backends import default_backend  class PyCat():     def __init__(self, host, port, listen, verbose, _ssl):         self.buffer = b""         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)         self.port = port         self.host = host if host else '0.0.0.0'         self.listen = listen         self.verbose = verbose         self.ssl = _ssl         if self.ssl:             self.context = ssl.create_default_context()             if self.listen:                 self.key_file, self.cert_file = self.generate_temp_cert()         self.main_func = self.nc_listen if self.listen else self.nc_connect         self.main()      def generate_temp_cert(self):         key, key_path = tempfile.mkstemp()         cert, cert_path = tempfile.mkstemp()         name_attributes = [             x509.NameAttribute(NameOID.COUNTRY_NAME, "OK"),             x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "OK"),             x509.NameAttribute(NameOID.LOCALITY_NAME, "OK"),             x509.NameAttribute(NameOID.ORGANIZATION_NAME, "OK"),             x509.NameAttribute(NameOID.COMMON_NAME, "PyCat")         ]          key = rsa.generate_private_key(             public_exponent=65537,             key_size=2048,             backend=default_backend()         )          with open(key_path, "wb") as f:             f.write(                 key.private_bytes(                     encoding=serialization.Encoding.PEM,                     format=serialization.PrivateFormat.TraditionalOpenSSL,                     encryption_algorithm=serialization.NoEncryption()                 )             )          subject = issuer = x509.Name(name_attributes)          cert = x509.CertificateBuilder()\                     .subject_name(subject)\                     .issuer_name(issuer)\                     .public_key(key.public_key())\                     .serial_number(x509.random_serial_number())\                     .not_valid_before(datetime.datetime.utcnow())\                     .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=365))          cert = cert.sign(key, hashes.SHA256(), default_backend())         with open(cert_path, "wb") as f:             f.write(                 cert.public_bytes(serialization.Encoding.PEM)             )          return key_path, cert_path      def main(self):         self.main_func()      def exit(self):         self.socket.close()         sys.exit(0)      def read(self, socket_conn, length=1024):         data, response = "starting", b""         while data:             data = socket_conn.recv(length)             response += data             if len(data) < length:                 break         return response.decode("utf-8").rstrip("\n")      def handle_command(self, cmd):         response = b" "         cd = re.match(r'cd(?:\s+|$  )(.*)', cmd)         if cmd == "exit":             self.exit()         elif cd and cd.group(1):             try:                 os.chdir(cd.group(1))             except FileNotFoundError:                 pass         else:             response = self.exec_command(cmd)         return response      def exec_command(self, command):         try:             output = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)         except Exception as e:             output = str(e).encode("utf-8")         return output      def nc_connect(self):         self.socket.connect((self.host, self.port))          if self.ssl:             self.context.check_hostname = False             self.context.verify_mode = ssl.CERT_NONE             self.socket = self.context.wrap_socket(self.socket)          while True:             cmd = self.read(self.socket)             response = self.handle_command(cmd)             self.socket.send(response)      def create_prompt_string(self, client_socket):         client_socket.send(b"cd")         pwd = self.read(client_socket)         client_socket.send(b"whoami")         whoami = self.read(client_socket)         client_socket.send(b"hostname")         hostname = self.read(client_socket)         return f"{whoami}@{hostname} PyCat {pwd}\n> "      def client_handler(self, client_socket):         while True:             prompt_string = self.create_prompt_string(client_socket)             buf = input(f"{prompt_string}")             client_socket.send(buf.encode("utf-8"))              if buf == "exit":                 self.exit()                 if self.ssl:                     os.remove(self.cert_file)                     os.remove(self.key_file)              print(self.read(client_socket))      def nc_listen(self):         self.socket.bind((self.host, self.port))         self.socket.listen(0)          if self.ssl:             self.socket = ssl.wrap_socket(                 self.socket,                 server_side=True,                  certfile=self.cert_file,                  keyfile=self.key_file             )          client_socket, addr = self.socket.accept()         if self.verbose:             ip, port = addr             print(f"[*] Incoming connection from {ip}:{port}")          self.client_handler(client_socket)  def parse_arguments():     parser = argparse.ArgumentParser(usage='%(prog)s [options]',                                      description='PyCat @Ludisposed',                                      formatter_class=argparse.RawDescriptionHelpFormatter,                                      epilog='Examples:\npython3 pycat.py -lvp 443\npython3 pycat.py -i localhost -p 443')     parser.add_argument('-l', '--listen', action="store_true", help='Listen')     parser.add_argument('-v', '--verbose', action="store_true", help='Verbose output')     parser.add_argument('-s', '--ssl', action="store_true", help='Encrypt connection')     parser.add_argument('-p', '--port', type=int, help='Port to listen on')     parser.add_argument('-i', '--ip', type=str, help='Ip to connect to')     args = parser.parse_args()      if (args.listen or args.ip) and not args.port:         parser.error('Specify which port to connect to')     elif not args.listen and not args.ip:         parser.error('Specify --listen or --ip')     return args.ip, args.port, args.listen, args.verbose, args.ssl  if __name__ == '__main__':     PyCat(*parse_arguments())