Source code for cloudmesh.shell.shell

from __future__ import print_function

import importlib
import inspect
import os
import pkgutil
import pydoc
import shelve
import sys
import textwrap
import subprocess
import requests
import yaml

from cmd import Cmd

from cloudmesh.common.Printer import Printer
from cloudmesh.common.Shell import Shell
from cloudmesh.common.StopWatch import StopWatch
from cloudmesh.common.dotdict import dotdict
from cloudmesh.common.util import path_expand
from cloudmesh.common.default import Default
from cloudmesh.common.error import Error
from cloudmesh.common.console import Console
from cloudmesh.common.util import readfile

import cloudmesh
import cloudmesh.common
from cloudmesh.shell.command import PluginCommand
from cloudmesh.shell.command import command, basecommand
from cloudmesh.shell.plugin import PluginManager
from cloudmesh.variables import Variables





[docs]class Plugin(object): """ Some simple methods to manage dynamic namespace plugins for cloudmesh. """
[docs] @classmethod def modules(cls): """ list of cloudmesh modules in the cloudmesh namespace :return: list of modules """ module_list = [] package = cloudmesh for importer, modname, ispkg in pkgutil.walk_packages( path=package.__path__, prefix=package.__name__ + '.', onerror=lambda x: None): module_list.append(modname) return module_list
[docs] @classmethod def classes(cls): """ list of the commands in the cloudmesh namespace under cloudmesh.ext.command :return: list of the commands """ module_list = cls.modules() commands = [] for module_name in module_list: if module_name.startswith( 'cloudmesh.') and '.command.' in module_name: commands.append(module_name) return commands
[docs] @classmethod def name(cls, command): """ creates a name for a modules starting with do_ :param command: returns a tuple with the module location and tge do_function :return: """ command_name = "do_" + command class_name = "cloudmesh." + command + ".command." + command + "." \ + command.capitalize() + "Command" return class_name, command_name
[docs] @classmethod def class_name(cls, command): """ creates the default filename in which the module is defined :param command: the name of the command :return: cloudmesh.ext.command.<command>+command.<Command> """ return "cloudmesh." + command + ".command." + command + "." \ + command.capitalize() + "Command"
[docs] @classmethod def load(cls, commands=None): """ :param commands: If None the commands will be found from import cloudmesh Otherwise the commands can be explicitly specified with commands = [ 'cloudmesh.ext.command.bar.BarCommand', 'cloudmesh.ext.command.foo.FooCommand', ] A namespace package must exists. Foo and Bar ar just examples :return: the classes of the command """ if commands is None: commands = [c.split('.')[-1] for c in cls.classes()] # print_list(commands) class_commands = [cls.class_name(c) for c in commands] commands = [getattr(importlib.import_module(mod), cls) for (mod, cls) in (commands.rsplit(".", 1) for commands in class_commands)] return commands
Plugin.load() PluginCommandClasses = type( 'CommandProxyClass', tuple(PluginCommand.__subclasses__()), {})
[docs]class CMShell(Cmd, PluginCommandClasses): """ The command shell that inherits all commands from PluginCommand """ prompt = 'cms> ' banner = textwrap.dedent(""" +-------------------------------------------------------+ | ____ _ _ _ | | / ___| | ___ _ _ __| |_ __ ___ ___ ___| |__ | | | | | |/ _ \| | | |/ _` | '_ ` _ \ / _ \/ __| '_ \ | | | |___| | (_) | |_| | (_| | | | | | | __/\__ \ | | | | | \____|_|\___/ \__,_|\__,_|_| |_| |_|\___||___/_| |_| | +-------------------------------------------------------+ | Cloudmesh CMD5 Shell | +-------------------------------------------------------+ """)
[docs] def precmd(self, line): StopWatch.start("command") return line
[docs] def postcmd(self, stop, line): StopWatch.stop("command") try: variable = Variables() if "timer" not in variable: variable["timer"] = "off" if variable["timer"].lower() in ['on', 'true']: print("Timer: {:.4f}s ({})".format(StopWatch.get("command"), line.strip())) variable.close() except Exception as e: Error.traceback(error=e) return stop
# noinspection PyMethodMayBeStatic def replace_vars(self, line): # self.update_time() variable = Variables() newline = line if len(variable) is not None: for name in variable.data: value = str(variable[name]) newline = newline.replace("$" + name, value) newline = newline.replace("var." + name, value) for v in os.environ: name = v.replace('os.', '') if name in newline: value = os.environ[name] newline = newline.replace("os." + v, value) default = Default() if default is not None: for v in default.data: name = "default." + v.replace(",", ".") value = default.data[v] if name in newline: newline = newline.replace(name, value) # replace if global is missing global_default = default["global"] if global_default is not None: for v in global_default: name = "default." + v value = global_default[v] if name in newline: newline = newline.replace(name, value) default.close() variable.close() newline = path_expand(newline) return line, newline
[docs] def onecmd(self, line): """Interpret the argument as though it had been typed in response to the prompt. This may be overridden, but should not normally need to be; see the precmd() and postcmd() methods for useful execution hooks. The return value is a flag indicating whether interpretation of commands by the interpreter should stop. """ oldline, line = self.replace_vars(line) # ----------------------------- # print comment lines, but do not execute them # ----------------------------- if line.startswith('#') \ or line.startswith('//') \ or line.startswith('/*'): print(line) return "" if line.startswith('!'): os.system(line[1:]) return "" # if line is None: # return "" # if line.startswith("!"): # line.replace("!", "! ") # line = self.var_replacer(line) # if line != "hist" and line: # self._hist += [line.strip()] # if line.startswith("!") or line.startswith("shell"): # self.do_shell_exec(line[1:]) # return "" cmd, arg, line = self.parseline(line) if line.startswith("$") or line.startswith('var.'): line = line.replace("$", "", 1) line = line.replace("var.", "", 1) print("FIND>", line, "<", sep='') variable = Variables() print(variable[line]) variable.close() return "" # ----------------------------- # handle empty line # ----------------------------- if not line: return self.emptyline() # ----------------------------- # handle file execution # ----------------------------- # # this does not yet work # # if os.path.isfile(line): # print ("... execute", line) # self.do_exec(line) # return "" if cmd != '': try: func = getattr(self, 'do_' + cmd) return func(arg) except AttributeError as e: variables = Variables() trace = "T" in variables['trace'] debug = "T" in variables['debug'] command_missing = "'CMShell' object has no attribute 'do_{cmd}'".format( cmd=cmd) if e.args[0] == command_missing: Console.error( "this command does not exist: '{cmd}'".format(cmd=cmd), traceflag=False) else: Error.traceback(error=e, debug=debug, trace=trace) # noinspection PyUnusedLocal cmd = None line = oldline return ""
# noinspection PyUnusedLocal @command def do_shell(self, args, arguments): """ :: Usage: shell COMMAND Arguments: COMMAND the command to be executed Description: shell COMMAND executes the command """ # print ("Executing>", args, "<", sep='') # os.system(args) os.system(str(args)) return "" ''' # # List all commands that start with do # # noinspection PyMethodOverriding @command def do_help(self, args, arguments): """ :: Usage: help Description: help - List of all registered commands """ print("Help") print("====") method_list = [n for n, v in inspect.getmembers(self, inspect.ismethod)] function_list = [n for n, v in inspect.getmembers(self, inspect.isfunction)] commands = method_list + function_list for c in sorted(commands): if c.startswith("do_"): print(c.replace("do_", ""), end=' ') print() return "" '''
[docs] def do_help(self, arg): """ :: Usage: help help COMMAND Description: List available commands with "help" or detailed help with "help COMMAND". """ if arg: try: func = getattr(self, 'help_' + arg) except AttributeError: try: doc = getattr(self, 'do_' + arg).__doc__ if doc: self.stdout.write("Command {arg}\n".format(arg=arg)) self.stdout.write(len("Command " + arg) * "=") self.stdout.write("\n") # doc = doc.replace("::\n\n", "") self.stdout.write("%s\n" % str(doc)) return except AttributeError: pass self.stdout.write("%s\n" % str(self.nohelp % (arg,))) return func() else: names = self.get_names() cmds_doc = [] cmds_undoc = [] help_page = {} for name in names: if name[:5] == 'help_': help_page[name[5:]] = 1 names.sort() # There can be duplicates if routines overridden prevname = '' for name in names: if name[:3] == 'do_': if name == prevname: continue prevname = name cmd = name[3:] if cmd in help_page: cmds_doc.append(cmd) del help_page[cmd] elif getattr(self, name).__doc__: cmds_doc.append(cmd) else: cmds_undoc.append(cmd) self.stdout.write("%s\n" % str(self.doc_leader)) self.print_topics(self.doc_header, cmds_doc, 15, 80) self.print_topics(self.misc_header, list(help_page.keys()), 15, 80) self.print_topics(self.undoc_header, cmds_undoc, 15, 80)
[docs] def help_help(self): """ :: Usage: help help COMMAND Description: List available commands with "help" or detailed help with "help COMMAND". """ print(textwrap.dedent(self.help_help.__doc__))
# noinspection PyUnusedLocal @command def do_info(self, args, arguments): """ :: Usage: info [path|commands|files|cloudmesh] Description: info provides internal info about the shell and its packages """ arguments = dotdict(arguments) module_list = Plugin.modules() if arguments.commands: commands = Plugin.classes() print_list(commands) elif arguments.path: path_list = cloudmesh.__path__ print_list(path_list) elif arguments.files: commands = Plugin.modules() for command in commands: try: r = inspect.getfile(command) print("*", type(command)) except Exception as e: print(e) elif arguments.help: for name in module_list: p = "cloudmesh." + name help_string = p + " not found." # noinspection PyBroadException try: help_string = pydoc.render_doc(p, "Help on %s" + "\n" + 79 * "=") except Exception as e: pass print(help_string) else: print_list(module_list)
[docs] def preloop(self): """adds the banner to the preloop""" lines = textwrap.dedent(self.banner).split("\n") for line in lines: # Console.cprint("BLUE", "", line) print(line)
# noinspection PyUnusedLocal,PyPep8Naming,PyMethodMayBeStatic
[docs] def do_EOF(self, args): """ :: Usage: EOF Description: Command to the shell to terminate reading a script. """ return True
# noinspection PyUnusedLocal,PyMethodMayBeStatic
[docs] def do_quit(self, args): """ :: Usage: quit Description: Action to be performed when quit is typed """ return True
do_q = do_quit
[docs] def emptyline(self): return
# noinspection PyUnusedLocal @command def do_plugin(self, args, arguments): """ :: Usage: plugin install PLUGIN [-s] plugin uninstall PLUGIN plugin list plugin ? [--format=FORMAT] Arguments: PLUGIN the name of the plugin Description: plugin available lists the available plugins plugin list lists the plugin plugin install installs the given plugin plugin uninstall uninstalls the given plugin """ if arguments['--format'] is None: arguments['--format'] = 'table' # print (arguments) if arguments.install: plugins = PluginManager() plugins.load() if arguments["-s"]: plugins.source_install(arguments.PLUGIN) else: plugins.pip_install(arguments.PLUGIN) elif arguments.uninstall: plugins = PluginManager() plugins.load() plugins.uninstall(arguments.PLUGIN) elif '?' in arguments: plugins = PluginManager() plugins.load() print(Printer.write(plugins.data['plugins'], output=arguments["--format"], order=["name", "status", "description"], sort_keys="name")) # noinspection PyUnusedLocal,PyPep8 @basecommand def do_version(self, args, arguments): """ :: Usage: version pip [PACKAGE] version [--format=FORMAT] [--check=CHECK] Options: --format=FORMAT the format to print the versions in [default: table] --check=CHECK boolean tp conduct an additional check [default: True] Description: version Prints out the version number version pip Prints the contents of pip list Limitations: Package names must not have a . in them instead you need to use - Thus to query for cloudmesh-cmd5 use cms version pip cloudmesh-cmd5 """ # print (arguments) # print (">", args, "<") if arguments["pip"]: # noinspection PyBroadException try: package = arguments["PACKAGE"] if package is None: result = Shell.execute('pip', ['list', '--format=columns'], traceflag=False, witherror=False) print(result) else: if "." in package: package = package.replace(".", "-") result = Shell.execute('pip', ['show', package], traceflag=False, witherror=False) print(result) except Exception as e: result = 'N/A' return "" python_version, pip_version = Shell.get_python() # noinspection PyBroadException try: git_hash_version = Shell.execute('git', 'log -1 --format=%h', traceflag=False, witherror=False) except: git_hash_version = 'N/A' versions = { "python": { "name": "python", "version": str(python_version) }, "pip": { "name": "pip", "version": str(pip_version) }, "git": { "name": "git hash", "version": str(git_hash_version) } } # dynamically check all installed cloudmesh packages and versions pipcheck = subprocess.Popen(('pip', 'freeze'), stdout=subprocess.PIPE) try: # python 3 returns byte sequence so the decode is necessary output = subprocess.check_output(('grep', "cloudmesh"), stdin=pipcheck.stdout).decode( "utf-8") pkglines = output.strip().split("\n") for pkgline in pkglines: if "==" in pkgline: values = pkgline.split("==") pkg = values[0] version = values[1].strip() if pkg != 'cloudmesh-installer': pname = pkg.replace("cloudmesh-", "cloudmesh.") i = importlib.import_module(pname) location = i.__file__ try: vlocation = path_expand(os.path.join(os.path.dirname(location), "__version__.py")) v = readfile(vlocation).split('"')[1].strip() except: v = "not found" versions[pkg] = { "name": pkg, "package": pname, "version": version, "source": location, "VERSION": v } elif "git" in pkgline: pkgline = pkgline.replace( "-e git+git@github.com:cloudmesh-community/", "") pkgline = pkgline.replace( "-e git+https://github.com/cloudmesh/", "") pkgline = pkgline.replace("egg=", "") version, pkg = pkgline.split("#") pname = pkg.replace("cloudmesh-", "cloudmesh.") # # The next line needed to be added as for some reason the is an _ here # pname = pkg.replace("cloudmesh_", "cloudmesh.") i = importlib.import_module(pname) location = i.__file__ try: vlocation = path_expand(os.path.join(os.path.dirname(location), "__version__.py")) v = readfile(vlocation).split('"')[1].strip() except: v = "not found" versions[pkg] = { "name": pkg, "package": pname, "version": version, "source": location, "VERSION": v } except subprocess.CalledProcessError as e: pass pipcheck.wait() # installedpkgs = [] # # for a preset set of named packages ''' #pkgs = ['cloudmesh-common', 'cloudmesh-cmd5', 'cloudmesh.comet'] for package in pkgs: # check version from pip pipcheck = subprocess.Popen(('pip', 'freeze'), stdout=subprocess.PIPE) try: output = subprocess.check_output(('grep', package), stdin=pipcheck.stdout) version = output.split("==")[1].strip() versions[package] = {"name": package, "version": version } except subprocess.CalledProcessError as e: pass pipcheck.wait() ''' # __version__ not maintained in package file so this won't work ''' try: print ("trying package |%s|" % package) try_module = __import__(package) print ("added one package into the list...") installedpkgs.append(package) except ImportError as e: print ("error importing |%s|" % package) pass #print (installedpkgs) for package in installedpkgs: versions[package] = {package: {"name": package, "version": str(package.__version__) } } print (versions) ''' print(Printer.write(versions, output=arguments["--format"], order=["name", "package", "VERSION", "version", "source"], sort_keys="name")) if arguments["--check"] in ["True"]: Shell.check_python()
# def main(): # CMShell().cmdloop() def inheritors(klass): subclasses = set() work = [klass] while work: parent = work.pop() for child in parent.__subclasses__(): if child not in subclasses: subclasses.add(child) work.append(child) return subclasses # noinspection PyBroadException,PyUnusedLocal
[docs]def main(): """cms. Usage: cms --help cms [--echo] [--debug] [--nosplash] [-i] [COMMAND ...] Arguments: COMMAND A command to be executed Options: --file=SCRIPT -f SCRIPT Executes the script -i After start keep the shell interactive, otherwise quit [default: False] --nosplash do not show the banner [default: False] """ def manual(): print(main.__doc__) args = sys.argv[1:] arguments = { '--echo': '--echo' in args, '--help': '--help' in args, '--debug': '--debug' in args, '--nosplash': '--nosplash' in args, '-i': '-i' in args} echo = arguments["--echo"] if arguments['--help']: manual() sys.exit() for a in args: if a in arguments: args.remove(a) arguments['COMMAND'] = [' '.join(args)] commands = arguments["COMMAND"] # commands = list(arguments["COMMAND"]) if len(commands) > 0: if ".cm" in commands[0]: arguments["SCRIPT"] = commands[0] commands = commands[1:] else: arguments["SCRIPT"] = None arguments["COMMAND"] = ' '.join(commands) if arguments["COMMAND"] == '': arguments["COMMAND"] = None # noinspection PySimplifyBooleanCheck if arguments['COMMAND'] == []: arguments['COMMAND'] = None splash = not arguments['--nosplash'] debug = arguments['--debug'] interactive = arguments['-i'] script = arguments["SCRIPT"] command = arguments["COMMAND"] # context = CloudmeshContext( # interactive=interactive, # debug=debug, # echo=echo, # splash=splash) cmd = CMShell() # if script is not None: # cmd.do_exec(script) try: if echo: print(cmd.prompt, command) if command is not None: cmd.precmd(command) stop = cmd.onecmd(command) cmd.postcmd(stop, command) except Exception as e: print("ERROR: executing command '{0}'".format(command)) print(70 * "=") print(e) d = Default() trace = d["global", "trace"] == "True" trace = True Error.traceback(error=e, debug=True, trace=trace) d.close() print(70 * "=") if interactive or (command is None and script is None): cmd.cmdloop()
if __name__ == '__main__': main()