"""
A convenient method to execute shell commands and return their output. Note: that this method requires that the
command be completely execute before the output is returned. FOr many activities in cloudmesh this is sufficient.
"""
from __future__ import print_function
import errno
import glob
import os
import shlex
import subprocess
import sys
import zipfile
from distutils.spawn import find_executable
from pipes import quote
from sys import platform
from cloudmesh.common.console import Console
from cloudmesh.common.dotdict import dotdict
from cloudmesh.common.util import path_expand
import subprocess
from functools import partial
class Brew(object):
@classmethod
def install(cls, name):
r = Shell.brew("install", name)
print(r)
if "already satisfied: " + name in r:
print(name, "... already installed")
Console.ok(r)
elif "Successfully installed esptool":
print(name, "... installed")
Console.ok(r)
else:
print(name, "... error")
Console.error(r)
@classmethod
def version(cls, name):
r = Shell.brew("ls", "--version", "name")
print(r)
class Pip(object):
@classmethod
def install(cls, name):
r = Shell.pip("install", "esptool")
if "already satisfied: esptool" in r:
print(name, "... already installed")
Console.ok(r)
elif "Successfully installed esptool":
print(name, "... installed")
Console.ok(r)
else:
print(name, "... error")
Console.error(r)
[docs]class SubprocessError(Exception):
"""
Manages the formatting of the error and stdout.
THis command should not be directly called. Instead use SHell
"""
def __init__(self, cmd, returncode, stderr, stdout):
"""
sets the error
:param cmd: the command executed
:param returncode: the return code
:param stderr: the stderr
:param stdout: the stdout
"""
self.cmd = cmd
self.returncode = returncode
self.stderr = stderr
self.stdout = stdout
def __str__(self):
def indent(lines, amount, ch=' '):
"""
indent the lines by multiples of ch
:param lines:
:param amount:
:param ch:
:return:
"""
padding = amount * ch
return padding + ('\n' + padding).join(lines.split('\n'))
cmd = ' '.join(map(quote, self.cmd))
s = ''
s += 'Command: %s\n' % cmd
s += 'Exit code: %s\n' % self.returncode
if self.stderr:
s += 'Stderr:\n' + indent(self.stderr, 4)
if self.stdout:
s += 'Stdout:\n' + indent(self.stdout, 4)
return s
[docs]class Subprocess(object):
"""
Executes a command. This class should not be directly used, but instead you should use Shell.
"""
def __init__(self, cmd, cwd=None, stderr=subprocess.PIPE,
stdout=subprocess.PIPE, env=None):
"""
execute the given command
:param cmd: the command
:param cwd: the directory in which to execute the command
:param stderr: the pipe for stderror
:param stdout: the pipe for the stdoutput
:param env:
"""
Console.debug_msg('Running cmd: {}'.format(' '.join(map(quote, cmd))))
proc = subprocess.Popen(cmd, stderr=stderr, stdout=stdout, cwd=cwd,
env=env)
stdout, stderr = proc.communicate()
self.returncode = proc.returncode
self.stderr = stderr
self.stdout = stdout
if self.returncode != 0:
raise SubprocessError(cmd, self.returncode, self.stderr,
self.stdout)
[docs]class Shell(object):
"""
The shell class allowing us to conveniently access many operating system commands.
TODO: This works well on Linux and OSX, but has not been tested much on Windows
"""
# TODO: we have not supported cygwin for a while
cygwin_path = 'bin' # i copied fom C:\cygwin\bin
command = {
'windows': {},
'linux': {},
'darwin': {}
}
'''
TODO
how do we now define dynamically functions based on a list that we want to support
what we want is where args are multiple unlimited parameters to the function
def f(args...):
name = get the name from f
a = list of args...
cls.execute(cmd, arguments=a, capture=True, verbose=False)
commands = ['ps', 'ls', ..... ]
for c in commands:
generate this command and add to this class dynamically
or do something more simple
ls = cls.execute('cmd', args...)
'''
@classmethod
def terminal(cls, command='pwd'):
if platform == 'darwin':
os.system(
"osascript -e 'tell application \"Terminal\" to do script \"{command}\"'".format(
**locals())
)
elif platform == "linus": # for ubuntu running gnome
os.system('gnome-terminal --command="{command}"'.format(**locals()))
else:
raise NotImplementedError
@classmethod
def live(cls, command, cwd=None):
if cwd is None:
cwd = os.getcwd()
process = subprocess.Popen(shlex.split(command), cwd=cwd,
stdout=subprocess.PIPE)
result = b''
while True:
output = process.stdout.read(1)
if output == b'' and process.poll() is not None:
break
if output:
result = result + output
sys.stdout.write(output.decode("utf-8"))
sys.stdout.flush()
rc = process.poll()
data = dotdict({
"status": rc,
"text": output.decode("utf-8")
})
return data
[docs] @classmethod
def get_python(cls):
"""
returns the python and pip version
:return: python version, pip version
"""
python_version = sys.version_info[:3]
v_string = [str(i) for i in python_version]
python_version_s = '.'.join(v_string)
# pip_version = pip.__version__
pip_version = Shell.pip("--version").split()[1]
return python_version_s, pip_version
[docs] @classmethod
def check_python(cls):
"""
checks if the python version is supported
:return: True if it is supported
"""
python_version = sys.version_info[:3]
v_string = [str(i) for i in python_version]
if python_version[0] == 2:
python_version_s = '.'.join(v_string)
if (python_version[0] == 2) and (python_version[1] >= 7) and (
python_version[2] >= 9):
print(
"You are running a supported version of python: {:}".format(
python_version_s))
else:
print(
"WARNING: You are running an unsupported version of python: {:}".format(
python_version_s))
print(" We recommend you update your python")
elif python_version[0] == 3:
python_version_s = '.'.join(v_string)
if (python_version[0] == 3) and (python_version[1] >= 7) and (
python_version[2] >= 0):
print(
"You are running a supported version of python: {:}".format(
python_version_s))
else:
print(
"WARNING: You are running an unsupported version of python: {:}".format(
python_version_s))
print(" We recommend you update your python")
# pip_version = pip.__version__
python_version, pip_version = cls.get_python()
if int(pip_version.split(".")[0]) >= 18:
print("You are running a supported version of pip: " + str(
pip_version))
else:
print("WARNING: You are running an old version of pip: " + str(
pip_version))
print(" We recommend you update your pip with \n")
print(" pip install -U pip\n")
[docs] @classmethod
def check_output(cls, *args, **kwargs):
"""Thin wrapper around :func:`subprocess.check_output`
"""
return subprocess.check_output(*args, **kwargs)
[docs] @classmethod
def ls(cls, *args):
"""
executes ls with the given arguments
:param args:
:return:
"""
return cls.execute('ls', args)
[docs] @classmethod
def ps(cls, *args):
"""
executes ps with the given arguments
:param args:
:return:
"""
return cls.execute('ps', args)
[docs] @classmethod
def bash(cls, *args):
"""
executes bash with the given arguments
:param args:
:return:
"""
return cls.execute('bash', args)
[docs] @classmethod
def brew(cls, *args):
"""
executes bash with the given arguments
:param args:
:return:
"""
return cls.execute('brew', args)
[docs] @classmethod
def cat(cls, *args):
"""
executes cat with the given arguments
:param args:
:return:
"""
return cls.execute('cat', args)
[docs] @classmethod
def git(cls, *args):
"""
executes git with the given arguments
:param args:
:return:
"""
return cls.execute('git', args)
# noinspection PyPep8Naming
[docs] @classmethod
def VBoxManage(cls, *args):
"""
executes VboxManage with the given arguments
:param args:
:return:
"""
if platform.system().lower() == "darwin":
command = "/Applications/VirtualBox.app/Contents/MacOS/VBoxManage"
else:
command = 'VBoxManage'
return cls.execute(command, args)
[docs] @classmethod
def blockdiag(cls, *args):
"""
executes blockdiag with the given arguments
:param args:
:return:
"""
return cls.execute('blockdiag', args)
[docs] @classmethod
def cm(cls, *args):
"""
executes cm with the given arguments
:param args:
:return:
"""
return cls.execute('cm', args)
[docs] @classmethod
def fgrep(cls, *args):
"""
executes fgrep with the given arguments
:param args:
:return:
"""
return cls.execute('fgrep', args)
[docs] @classmethod
def grep(cls, *args):
"""
executes grep with the given arguments
:param args:
:return:
"""
return cls.execute('grep', args)
[docs] @classmethod
def head(cls, *args):
"""
executes head with the given arguments
:param args:
:return:
"""
return cls.execute('head', args)
[docs] @classmethod
def keystone(cls, *args):
"""
executes keystone with the given arguments
:param args:
:return:
"""
return cls.execute('keystone', args)
[docs] @classmethod
def kill(cls, *args):
"""
executes kill with the given arguments
:param args:
:return:
"""
return cls.execute('kill', args)
[docs] @classmethod
def nosetests(cls, *args):
"""
executes nosetests with the given arguments
:param args:
:return:
"""
return cls.execute('nosetests', args)
[docs] @classmethod
def nova(cls, *args):
"""
executes nova with the given arguments
:param args:
:return:
"""
return cls.execute('nova', args)
[docs] @classmethod
def ping(cls, host=None, count=1):
"""
execute ping
:param host: the host to ping
:param count: the number of pings
:return:
"""
option = '-n' if platform.system().lower() == 'windows' else '-c'
return cls.execute('ping',
"{option} {count} {host}".format(option=option,
count=count,
host=host))
[docs] @classmethod
def pwd(cls, *args):
"""
executes pwd with the given arguments
:param args:
:return:
"""
return cls.execute('pwd', args)
[docs] @classmethod
def rackdiag(cls, *args):
"""
executes rackdiag with the given arguments
:param args:
:return:
"""
return cls.execute('rackdiag', args)
[docs] @classmethod
def rm(cls, *args):
"""
executes rm with the given arguments
:param args:
:return:
"""
return cls.execute('rm', args)
[docs] @classmethod
def rsync(cls, *args):
"""
executes rsync with the given arguments
:param args:
:return:
"""
return cls.execute('rsync', args)
[docs] @classmethod
def scp(cls, *args):
"""
executes scp with the given arguments
:param args:
:return:
"""
return cls.execute('scp', args)
[docs] @classmethod
def sort(cls, *args):
"""
executes sort with the given arguments
:param args:
:return:
"""
return cls.execute('sort', args)
[docs] @classmethod
def sh(cls, *args):
"""
executes sh with the given arguments
:param args:
:return:
"""
return cls.execute('sh', args)
[docs] @classmethod
def ssh(cls, *args):
"""
executes ssh with the given arguments
:param args:
:return:
"""
return cls.execute('ssh', args)
[docs] @classmethod
def sudo(cls, *args):
"""
executes sudo with the given arguments
:param args:
:return:
"""
return cls.execute('sudo', args)
[docs] @classmethod
def tail(cls, *args):
"""
executes tail with the given arguments
:param args:
:return:
"""
return cls.execute('tail', args)
[docs] @classmethod
def vagrant(cls, *args):
"""
executes vagrant with the given arguments
:param args:
:return:
"""
return cls.execute('vagrant', args, shell=True)
[docs] @classmethod
def pandoc(cls, *args):
"""
executes vagrant with the given arguments
:param args:
:return:
"""
return cls.execute('pandoc', args)
[docs] @classmethod
def mongod(cls, *args):
"""
executes mongod with the given arguments
:param args:
:return:
"""
return cls.execute('mongod', args)
[docs] @classmethod
def dialog(cls, *args):
"""
executes dialof with the given arguments
:param args:
:return:
"""
return cls.execute('dialog', args)
[docs] @classmethod
def pip(cls, *args):
"""
executes pip with the given arguments
:param args:
:return:
"""
return cls.execute('pip', args)
[docs] @classmethod
def remove_line_with(cls, lines, what):
"""
returns all lines that do not contain what
:param lines:
:param what:
:return:
"""
result = []
for line in lines:
if what not in line:
result = result + [line]
return result
[docs] @classmethod
def find_lines_with(cls, lines, what):
"""
returns all lines that contain what
:param lines:
:param what:
:return:
"""
result = []
for line in lines:
if what in line:
result = result + [line]
return result
def __init__(cls):
"""
identifies parameters for the os
"""
if cls.operating_system() == "windows":
cls.find_cygwin_executables()
else:
pass
# implement for cmd, for linux we can just pass as it includes everything
[docs] @classmethod
def find_cygwin_executables(cls):
"""
find the executables in cygwin
"""
exe_paths = glob.glob(cls.cygwin_path + r'\*.exe')
# print cls.cygwin_path
# list all *.exe in cygwin path, use glob
for c in exe_paths:
exe = c.split('\\')
name = exe[1].split('.')[0]
# command['windows'][name] = "{:}\{:}.exe".format(cygwin_path, c)
cls.command['windows'][name] = c
[docs] @classmethod
def terminal_type(cls):
"""
returns darwin, cygwin, cmd, or linux
"""
what = sys.platform
kind = 'UNDEFINED_TERMINAL_TYPE'
if 'linux' in what:
kind = 'linux'
elif 'darwin' in what:
kind = 'darwin'
elif 'cygwin' in what:
kind = 'cygwin'
elif 'windows' in what:
kind = 'windows'
return kind
[docs] @classmethod
def which(cls, command):
"""
returns the path of the command with which
:param command: teh command
:return: the path
"""
return find_executable(command)
[docs] @classmethod
def command_exists(cls, name):
"""
returns True if the command exists
:param name:
:return:
"""
return cls.which(name) is not None
[docs] @classmethod
def operating_system(cls):
"""
the name of the os
:return: the name of the os
"""
return platform.system().lower()
[docs] @classmethod
def execute(cls,
cmd,
arguments="",
shell=False,
cwd=None,
traceflag=True,
witherror=True):
"""Run Shell command
:param witherror: if set to False the error will not be printed
:param traceflag: if set to true the trace is printed in case of an error
:param cwd: the current working directory in whcih the command is supposed to be executed.
:param shell: if set to true the subprocess is called as part of a shell
:param cmd: command to run
:param arguments: we do not know yet
:return:
"""
# print "--------------"
result = None
terminal = cls.terminal_type()
# print cls.command
os_command = [cmd]
if terminal in ['linux', 'windows']:
os_command = [cmd]
elif 'cygwin' in terminal:
if not cls.command_exists(cmd):
print("ERROR: the command could not be found", cmd)
return
else:
os_command = [cls.command[cls.operating_system()][cmd]]
if isinstance(arguments, list):
os_command = os_command + arguments
elif isinstance(arguments, tuple):
os_command = os_command + list(arguments)
elif isinstance(arguments, str):
os_command = os_command + arguments.split()
else:
print("ERROR: Wrong parameter type", type(arguments))
if cwd is None:
cwd = os.getcwd()
try:
if shell:
result = subprocess.check_output(
os_command,
stderr=subprocess.STDOUT,
shell=True,
cwd=cwd)
else:
result = subprocess.check_output(
os_command,
# shell=True,
stderr=subprocess.STDOUT,
cwd=cwd)
except:
if witherror:
Console.error("problem executing subprocess",
traceflag=traceflag)
if result is not None:
result = result.strip().decode()
return result
[docs] @classmethod
def mkdir(cls, directory):
"""
creates a directory with all its parents in ots name
:param directory: the path of the directory
:return:
"""
directory = path_expand(directory)
try:
os.makedirs(directory)
except OSError as e:
# EEXIST (errno 17) occurs under two conditions when the path exists:
# - it is a file
# - it is a directory
#
# if it is a file, this is a valid error, otherwise, all
# is fine.
if e.errno == errno.EEXIST and os.path.isdir(directory):
pass
else:
raise
[docs] def unzip(cls, source_filename, dest_dir):
"""
unzips a file into the destination directory
:param source_filename: the source
:param dest_dir: the destination directory
:return:
"""
with zipfile.ZipFile(source_filename) as zf:
for member in zf.infolist():
# Path traversal defense copied from
# http://hg.python.org/cpython/file/tip/Lib/http/server.py#l789
words = member.filename.split('/')
path = path_expand(dest_dir)
for word in words[:-1]:
drive, word = os.path.splitdrive(word)
head, word = os.path.split(word)
if word in (os.curdir, os.pardir, ''):
continue
path = os.path.join(path, word)
zf.extract(member, path)
[docs]def main():
"""
a test that should actually be added into a nosetest
:return:
"""
shell = Shell()
print(shell.terminal_type())
r = shell.execute('pwd') # copy line replace
print(r)
# shell.list()
# print json.dumps(shell.command, indent=4)
# test some commands without args
"""
for cmd in ['whoami', 'pwd']:
r = shell._execute(cmd)
print ("---------------------")
print ("Command: {:}".format(cmd))
print ("{:}".format(r))
print ("---------------------")
"""
r = shell.execute('ls', ["-l", "-a"])
print(r)
r = shell.execute('ls', "-l -a")
print(r)
r = shell.ls("-aux")
print(r)
r = shell.ls("-a", "-u", "-x")
print(r)
r = shell.pwd()
print(r)
if __name__ == "__main__":
main()