Source code for cloudmesh.storage.spec.tests.test_openapi_storage

###############################################################
# pytest -v --capture=no tests/test_openapi_storage.py
# pytest -v  tests/test_openapi_storage.py
###############################################################

from __future__ import print_function

import os
import time
import requests
import json

import pytest
from cloudmesh.common.run.background import run
from cloudmesh.common.util import banner
from cloudmesh.shell.variables import Variables
from cloudmesh.common.parameter import Parameter
from cloudmesh.common.util import path_expand
from  pathlib import Path
from cloudmesh.common.util import writefile

pytest.storage = None
pytest.openapi = None


# noinspection PyPep8
[docs]@pytest.mark.incremental class Test_cloud_storage: """ see: https://github.com/cloudmesh/cloudmesh-common/blob/master/cloudmesh/common/run/background.py the code in thel link has not bean tested make this s function execute the server in the back ground not in a terminal, get the pid and kill it after the test is done UNAME := $(shell uname) ifeq ($(UNAME), Darwin) define terminal osascript -e 'tell application "Terminal" to do script "cd $(PWD); $1"' endef endif ifeq ($(UNAME), Linux) define terminal gnome-terminal --command 'bash -c "cd $(PWD); $1"' endef endif """ def create_file(self, location, content): d = Path(os.path.dirname(path_expand(location))) print() print("TESTDIR:", d) d.mkdir(parents=True, exist_ok=True) writefile(path_expand(location), content) def test_setup(self): self.variables = Variables() self.storages = Parameter.expand(self.variables['storage']) pytest.storage = self.storages[0] command = ['python', 'server.py'] pytest.openapi = run(command) pytest.openapi.execute() print(pytest.openapi.pid) time.sleep(5) def test_install(self): time.sleep(3) def test_01_create_source(self): # create source dir self.create_file("~/openapi/a.txt", "content of a") assert True def test_openapi_storage_create(self): banner('create the directory') storage = pytest.storage response = requests.get( f"http://localhost:8080/cloudmesh/storage/v1/create_dir?service={storage}&directory=%2fapitest") print(response) print() def test_openapi_storage_put(self): banner('Put/Upload the files/blobs') storage = pytest.storage headers = { 'Content-Type': 'application/json', } #data = '{"service": "azureblob", "source": "~/openapi/a.txt", "destination": "/apitest", "recursive": "False"}' data = {} data['service'] = f"{storage}" data['source'] = "~/openapi/a.txt" data['destination'] = "/apitest" data['recursive'] = "False" data_str = json.dumps(data) response = requests.post('http://localhost:8080/cloudmesh/storage/v1/put', headers=headers, data=data_str) print(response) print() def test_openapi_storage_get(self): banner('get the files/blobs') storage = pytest.storage response = requests.get( f"http://localhost:8080/cloudmesh/storage/v1/get?service={storage}&source=%2fapitest%2fa%2etxt&destination=%7e%2fopenapi") print(response) print() def test_openapi_storage_list(self): banner('List the files/blobs') storage = pytest.storage response = requests.get( f"http://localhost:8080/cloudmesh/storage/v1/list?service={storage}&directory=%2fapitest&recursive=True") print(response) print() def test_openapi_storage_search(self): banner('search the files/blobs') storage = pytest.storage response = requests.get( f"http://localhost:8080/cloudmesh/storage/v1/search?service={storage}&directory=%2fapitest&filename=a%2etxt") print(response) print() def test_openapi_storage_del(self): banner('delete the files/blobs') storage = pytest.storage response = requests.get( f"http://localhost:8080/cloudmesh/storage/v1/delete?service={storage}&source=%2fapitest%2fa%2etxt") print(response) print() def test_kill_pid(self): pytest.openapi.kill()