Commit 73573cf3 by Peter Weidenbach

interactive shell command functionality added

parent be553af7
from .fail_safe_subprocess import execute_shell_command
from .fail_safe_subprocess import execute_shell_command, execute_shell_command_get_return_code, execute_interactive_shell_command
__all__ = [
'execute_shell_command',
'execute_shell_command_get_return_code',
'execute_interactive_shell_command'
]
from subprocess import Popen, PIPE, STDOUT, TimeoutExpired
from signal import SIGKILL
import logging
import pexpect
from time import sleep
def execute_shell_command(shell_command, timeout=None):
......@@ -39,3 +42,47 @@ def execute_shell_command_get_return_code(shell_command, timeout=None):
output += "\n\nERROR: execution timed out!"
rc = 1
return output, rc
def execute_interactive_shell_command(shell_command, timeout=60, inputs={}):
"""
Execute an interactive shell command and return a tuple (program output, return code)
This function shall not raise any errors
:param shell_command: command to execute
:type shell_command: str
:param timeout: kill process after timeout seconds
:type timeout: int
:param inputs: dictionary {'EXPECTED_CONSOLE_OUTPUT': 'DESIRED_INPUT'}
:type inputs: dict
:return: str, int
"""
trigger, inputs = _parse_inputs(inputs)
output = b''
child = pexpect.spawn(shell_command)
while True:
try:
i = child.expect(trigger, timeout=timeout)
sleep(0.1)
child.sendline(inputs[i])
output += child.before + child.after
except pexpect.TIMEOUT:
child.kill(SIGKILL)
output += child.before
break
except pexpect.EOF:
output += child.before
break
child.close()
output = output.decode('utf-8', errors='ignore')
return_code = child.exitstatus if child.exitstatus is not None else 1
return output, return_code
def _parse_inputs(input_dict):
trigger = []
inputs = []
for item in sorted(input_dict.keys()):
trigger.append(item)
inputs.append(input_dict[item])
return trigger, inputs
[pytest]
addopts = --pep8 -v
pep8ignore =
*.py E501
from setuptools import setup, find_packages
VERSION = 0.2
VERSION = 0.3
setup(
name="common_helper_process",
version=VERSION,
packages=find_packages(),
install_requires=[
'pexpect',
'common_helper_files'
],
dependency_links=[
'git+https://github.com/fkie-cad/common_helper_files.git#common_helper_files'
],
description="Helper functions for handling processes.",
author="Fraunhofer FKIE",
author_email="peter.weidenbach@fkie.fraunhofer.de",
......
#!/usr/bin/env bash
echo "give me some input:"
{ read first_input; echo first=$first_input; }
echo "go on"
echo "give me more:"
{ read second_input; echo second=$second_input; }
echo "done"
import unittest
from time import time
import os
from common_helper_files import get_dir_of_file
from common_helper_process import execute_shell_command
from common_helper_process.fail_safe_subprocess import execute_shell_command_get_return_code
from common_helper_process import execute_shell_command, execute_shell_command_get_return_code, execute_interactive_shell_command
from common_helper_process.fail_safe_subprocess import _parse_inputs
class TestProcessHelper(unittest.TestCase):
......@@ -27,8 +29,29 @@ class TestProcessHelper(unittest.TestCase):
def test_execute_shell_command_time_out(self):
start_time = time()
output, rc = execute_shell_command_get_return_code("echo 'test 123' && for i in {1..10}; do sleep 1; done", timeout=1)
output, rc = execute_shell_command_get_return_code('echo \'test 123\' && for i in {1..10}; do sleep 1; done', timeout=1)
run_time = time() - start_time
self.assertEqual(output, 'test 123\n\n\nERROR: execution timed out!', 'timeout message not added')
self.assertEqual(rc, 1, 'return code not correct')
self.assertGreater(5, run_time, "command not aborted")
def test_parse_inputs(self):
test_dict = {'a': 'a_out', 'b': 'b_out'}
trigger, inputs = _parse_inputs(test_dict)
self.assertEqual(trigger, ['a', 'b'])
self.assertEqual(inputs, ['a_out', 'b_out'])
def test_interactive_shell_command(self):
script_path = os.path.join(get_dir_of_file(__file__), 'data/interactive.sh')
expected_inputs = {'give me some input:\r\n': 'test_input_1', 'give me more:\r\n': 'test_input_2'}
output, ret_code = execute_interactive_shell_command(script_path, inputs=expected_inputs, timeout=5)
assert 'first=test_input_1' in output
assert 'second=test_input_2' in output
assert ret_code == 0
def test_interactive_shell_command_none_correct_input(self):
script_path = os.path.join(get_dir_of_file(__file__), 'data/interactive.sh')
output, ret_code = execute_interactive_shell_command(script_path, timeout=2)
assert 'give me some input' in output
assert 'Error: Execution timed out!'
assert ret_code > 0
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment