# RedSSH
# Copyright (C) 2018 - 2022 Red_M ( http://bitbucket.com/Red_M )
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import os
import re
import time
import socket
from . import exceptions
from . import enums
from . import clients
from . import utils
[docs]class RedSSH(object):
'''
Instances the start of an SSH connection.
Extra options are available after :func:`redssh.RedSSH.connect` is called.
:param encoding: Set the encoding to something other than the default of ``'utf8'`` when your target SSH server doesn't return UTF-8.
:type encoding: ``str``
'''
def __init__(self,encoding='utf8',terminal='vt100',known_hosts=None,ssh_host_key_verification=enums.SSHHostKeyVerify.warn,
ssh_keepalive_interval=0.0,set_flags={},method_preferences={},callbacks={},auto_terminate_tunnels=False,tcp_nodelay=False):
self.debug = False
self.client = self.pick_client()(encoding=encoding,terminal=terminal,known_hosts=known_hosts,ssh_host_key_verification=ssh_host_key_verification,
ssh_keepalive_interval=ssh_keepalive_interval,set_flags=set_flags,method_preferences=method_preferences,callbacks=callbacks,
auto_terminate_tunnels=auto_terminate_tunnels,tcp_nodelay=tcp_nodelay)
self.enums = self.client.enums
def pick_client(self,ssh_client=None,custom_ssh_clients={}):
default_client = clients.default_client
if ssh_client==None:
ssh_client = default_client
client_pool = {}
client_pool.update(clients.enabled_clients)
client_pool.update(custom_ssh_clients)
return(client_pool.get(ssh_client,clients.enabled_clients[default_client]))
[docs] def eof(self):
'''
Returns ``True`` or ``False`` when the main channel has recieved an ``EOF``.
'''
return(self.client.eof())
[docs] def methods(self, method):
'''
Returns what value was settled on during session negotiation.
'''
return(self.client.methods(method))
[docs] def setenv(self, varname, value):
'''
Set an environment variable on the channel.
:param varname: Name of environment variable to set on the remote channel.
:type varname: ``str``
:param value: Value to set ``varname`` to.
:type value: ``str``
:return: ``None``
'''
self.client.setenv(varname, value)
[docs] def connect(self,hostname,port=22,username='',password=None,
allow_agent=False,host_based=None,key_filepath=None,passphrase=None,
look_for_keys=False,sock=None,timeout=None):
'''
.. warning::
Some authentication methods are not yet supported!
:param hostname: Hostname to connect to.
:type hostname: ``str``
:param port: SSH port to connect to.
:type port: ``int``
:param username: Username to connect as to the remote server.
:type username: ``str``
:param password: Password to offer to the remote server for authentication.
:type password: ``str``
:param allow_agent: Allow the local SSH key agent to offer the keys held in it for authentication.
:type allow_agent: ``bool``
:param host_based: Allow the local SSH host keys to be used for authentication. NOT IMPLEMENTED!
:type host_based: ``bool``
:param key_filepath: Array of filenames to offer to the remote server. Can be a string for a single key.
:type key_filepath: ``array``/``str``
:param passphrase: Passphrase to decrypt any keys offered to the remote server.
:type passphrase: ``str``
:param look_for_keys: Enable offering keys in ``~/.ssh`` automatically. NOT IMPLEMENTED!
:type look_for_keys: ``bool``
:param sock: A pre-connected socket to the remote server. Useful if you have strange network requirements.
:type sock: :func:`socket.socket`
:param timeout: Timeout for the socket connection to the remote server.
:type timeout: ``float``
'''
self.client.before_connect_options()
self.client.connect(hostname,port,username,password,allow_agent,host_based,key_filepath,passphrase,look_for_keys,sock,timeout)
self.client.after_connect_options()
[docs] def read(self,block=False):
'''
Recieve data from the remote session.
Only works if the current session has made it past the login process.
:param block: Block until data is received from the remote server. ``True``
will block until data is recieved and ``False`` may return ``b''`` if no data is available from the remote server.
:type block: ``bool``
:return: ``generator`` - A generator of byte strings that has been recieved in the time given.
'''
return(self.client.read(block))
[docs] def send(self,string):
'''
Send data to the remote session.
Only works if the current session has made it past the login process.
:param string: String to send to the remote session.
:type string: ``str``
:return: ``int`` - Amount of bytes sent to remote machine.
'''
return(self.client.send(string))
[docs] def write(self,string):
'''
See :func:`redssh.RedSSH.send`
'''
return(self.send(string))
[docs] def flush(self):
'''
Flush all data on the primary channel's stdin to the remote connection.
Only works if connected, otherwise returns ``0``.
:return: ``int`` - Amount of bytes sent to remote machine.
'''
if self.client.past_login==True:
return(self.client.flush())
return(0)
[docs] def last_error(self):
'''
Get the last error from the client session.
:return: ``str``
'''
return(self.client.last_error())
[docs] def execute_command(self,command,env=None,channel=None,pty=False):
'''
Run a command. This will block as the command executes.
:param command: Command to execute.
:type command: ``str``
:param env: Environment variables to set during ``command``.
:type env: ``dict``
:param channel: Use an existing SSH channel instead of spawning a new one.
:type channel: ``redssh.RedSSH.channel``
:param pty: Request a pty for the command to be executed via.
:type pty: ``bool``
:return: ``tuple (int, str)`` - of ``(return_code, command_output)``
'''
return(self.client.execute_command(command,env=env,channel=channel,pty=pty))
[docs] def start_sftp(self):
'''
Start the SFTP client.
The client will be available at `self.sftp` and will be an instance of `redssh.sftp.RedSFTP`
:return: ``None``
'''
self.client.start_sftp()
if self.client.past_login and self.client.__check_for_attr__('sftp')==True:
self.sftp = utils.ObjectProxy(self.client,'sftp')
[docs] def start_scp(self):
'''
Start the SCP client.
If the client or server doesn't support SCP, SFTP will be started instead, this is due to SCP being deprecated.
:return: ``None``
'''
self.client.start_scp()
if self.client.past_login and self.client.__check_for_attr__('scp')==True:
self.scp = utils.ObjectProxy(self.client,'scp')
[docs] def forward_x11(self):
'''
Start forwarding an X11 display.
:return: ``None``
'''
self.client.forward_x11()
[docs] def local_tunnel(self,local_port,remote_host,remote_port,bind_addr='127.0.0.1',error_level=enums.TunnelErrorLevel.debug):
'''
Forwards a port on the remote machine the same way the ``-L`` option does for the OpenSSH client.
Providing a ``0`` for the local port will mean the OS will assign an unbound port for you.
This port number will be provided to you by this function.
:param local_port: The local port on the local machine to bind to.
:type local_port: ``int``
:param remote_host: The remote host to connect to via the remote machine.
:type remote_host: ``str``
:param remote_port: The remote host's port to connect to via the remote machine.
:type remote_port: ``int``
:param bind_addr: The bind address on this machine to bind to for the local port.
:type bind_addr: ``str``
:param error_level: The level of verbosity that errors in tunnel threads will use.
:type error_level: :class:`redssh.enums.TunnelErrorLevel`
:return: ``int`` The local port that has been bound.
'''
return(self.client.local_tunnel(local_port,remote_host,remote_port,bind_addr,error_level))
[docs] def remote_tunnel(self,local_port,remote_host,remote_port,bind_addr='127.0.0.1',error_level=enums.TunnelErrorLevel.warn):
'''
Forwards a port to the remote machine via the local machine the same way the ``-R`` option does for the OpenSSH client.
:param local_port: The local port on the remote side for clients to connect to.
:type local_port: ``int``
:param remote_host: The remote host to connect to via the local machine.
:type remote_host: ``str``
:param remote_port: The remote host's port to connect to via the local machine.
:type remote_port: ``int``
:param error_level: The level of verbosity that errors in tunnel threads will use.
:type error_level: :class:`redssh.enums.TunnelErrorLevel`
:return: ``None``
'''
return(self.client.remote_tunnel(local_port,remote_host,remote_port,bind_addr,error_level))
[docs] def dynamic_tunnel(self,local_port,bind_addr='127.0.0.1',error_level=enums.TunnelErrorLevel.warn):
'''
Opens a SOCKS proxy AKA gateway or dynamic port the same way the ``-D`` option does for the OpenSSH client.
Providing a ``0`` for the local port will mean the OS will assign an unbound port for you.
This port number will be provided to you by this function.
:param local_port: The local port on the local machine to bind to.
:type local_port: ``int``
:param bind_addr: The bind address on this machine to bind to for the local port.
:type bind_addr: ``str``
:param error_level: The level of verbosity that errors in tunnel threads will use.
:type error_level: :class:`redssh.enums.TunnelErrorLevel`
:return: ``int`` The local port that has been bound.
'''
return(self.client.dynamic_tunnel(local_port,bind_addr,error_level))
[docs] def tunnel_is_alive(self,tunnel_type,sport,rhost=None,rport=None,bind_addr='127.0.0.1'):
'''
Checks if a tunnel is alive.
Provide the same arguments to this that was given for openning the tunnel.
Examples:
`local_tunnel(9999,'localhost',8888)` would be `tunnel_is_alive(redssh.enums.TunnelType.local,9999,'localhost',8888)`
`remote_tunnel(7777,'localhost',8888)` would be `tunnel_is_alive(redssh.enums.TunnelType.remote,7777,'localhost',8888)`
`dynamic_tunnel(9999)` would be `tunnel_is_alive(redssh.enums.TunnelType.dynamic,9999)`
`dynamic_tunnel(9999,'10.0.0.1')` would be `tunnel_is_alive(redssh.enums.TunnelType.dynamic,9999,bind_addr='10.0.0.1')`
:param tunnel_type: The tunnel type to shutdown.
:type tunnel_type: :class:`redssh.enums.TunnelType`
:param sport: The bound port for local and dynamic tunnels or the local port on the remote side for remote tunnels.
:type sport: ``str``
:param rhost: The remote host for local and remote tunnels.
:type rhost: ``str``
:param rport: The remote port for local and remote tunnels.
:type rport: ``int``
:param bind_addr: The bind address used for local and dynamic tunnels.
:type bind_addr: ``str``
:return: ``bool``, if bad tunnel type provided returns ``None``
'''
return(self.client.tunnel_is_alive(tunnel_type,sport,rhost,rport,bind_addr))
[docs] def shutdown_tunnel(self,tunnel_type,sport,rhost=None,rport=None,bind_addr='127.0.0.1'):
'''
Closes an open tunnel.
Provide the same arguments to this that was given for openning the tunnel.
Examples:
`local_tunnel(9999,'localhost',8888)` would be `shutdown_tunnel(redssh.enums.TunnelType.local,9999,'localhost',8888)`
`remote_tunnel(7777,'localhost',8888)` would be `shutdown_tunnel(redssh.enums.TunnelType.remote,7777,'localhost',8888)`
`dynamic_tunnel(9999)` would be `shutdown_tunnel(redssh.enums.TunnelType.dynamic,9999)`
`dynamic_tunnel(9999,'10.0.0.1')` would be `shutdown_tunnel(redssh.enums.TunnelType.dynamic,9999,bind_addr='10.0.0.1')`
:param tunnel_type: The tunnel type to shutdown.
:type tunnel_type: :class:`redssh.enums.TunnelType`
:param sport: The bound port for local and dynamic tunnels or the local port on the remote side for remote tunnels.
:type sport: ``str``
:param rhost: The remote host for local and remote tunnels.
:type rhost: ``str``
:param rport: The remote port for local and remote tunnels.
:type rport: ``int``
:param bind_addr: The bind address used for local and dynamic tunnels.
:type bind_addr: ``str``
:return: ``None``
'''
self.client.shutdown_tunnel(tunnel_type,sport,rhost,rport,bind_addr)
[docs] def close_tunnels(self):
'''
Closes all SSH tunnels if any are open.
'''
self.client.close_tunnels()
[docs] def exit(self):
'''
Kill the current session if connected.
'''
self.client.exit()