Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
C
common_helper_process
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
fact-gitdep
common_helper_process
Commits
2db6987c
Commit
2db6987c
authored
May 27, 2019
by
Jörg Stucke
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
added check option
parent
42f97ed0
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
25 additions
and
14 deletions
+25
-14
fail_safe_subprocess.py
common_helper_process/fail_safe_subprocess.py
+19
-11
setup.py
setup.py
+1
-1
test_fail_safe_subprocess.py
test/test_fail_safe_subprocess.py
+5
-2
No files found.
common_helper_process/fail_safe_subprocess.py
View file @
2db6987c
from
subprocess
import
Popen
,
PIPE
,
STDOUT
,
TimeoutExpired
from
signal
import
SIGKILL
import
logging
import
pexpect
from
signal
import
SIGKILL
from
subprocess
import
Popen
,
PIPE
,
STDOUT
,
TimeoutExpired
,
CalledProcessError
from
time
import
sleep
import
pexpect
def
execute_shell_command
(
shell_command
,
timeout
=
None
):
def
execute_shell_command
(
shell_command
,
timeout
=
None
,
check
=
False
):
"""
Execute a shell command and return STDOUT and STDERR in one combined result string.
This function shall not raise any errors
:param shell_command: command to execute
:type shell_command: str
:param timeout: kill process after timeout seconds
:type: timeout: int, optional
:param check: raise CalledProcessError if the return code is != 0
:type: timeout: bool
:return: str
"""
return
execute_shell_command_get_return_code
(
shell_command
,
timeout
=
timeout
)[
0
]
output
,
return_code
=
execute_shell_command_get_return_code
(
shell_command
,
timeout
=
timeout
)
if
check
and
return_code
!=
0
:
raise
CalledProcessError
(
return_code
,
shell_command
)
return
output
def
execute_shell_command_get_return_code
(
shell_command
,
timeout
=
None
):
"""
Execute a shell command and return a tuple (program output, return code)
Program ouput includes STDOUT and STDERR.
Program ou
t
put includes STDOUT and STDERR.
This function shall not raise any errors
:param shell_command: command to execute
:type shell_command: str
:param timeout: kill process after timeout seconds
:type: timeout: int
:type: timeout: int
, optional
:return: str, int
"""
output
=
""
return_code
=
1
pl
=
Popen
(
shell_command
,
shell
=
True
,
stdout
=
PIPE
,
stderr
=
STDOUT
)
try
:
output
=
pl
.
communicate
(
timeout
=
timeout
)[
0
]
.
decode
(
'utf-8'
,
errors
=
'replace'
)
...
...
@@ -44,7 +50,7 @@ def execute_shell_command_get_return_code(shell_command, timeout=None):
return
output
,
return_code
def
execute_interactive_shell_command
(
shell_command
,
timeout
=
60
,
inputs
=
{}
):
def
execute_interactive_shell_command
(
shell_command
,
timeout
=
60
,
inputs
=
None
):
"""
Execute an interactive shell command and return a tuple (program output, return code)
This function shall not raise any errors
...
...
@@ -54,9 +60,11 @@ def execute_interactive_shell_command(shell_command, timeout=60, inputs={}):
:param timeout: kill process after timeout seconds
:type timeout: int
:param inputs: dictionary {'EXPECTED_CONSOLE_OUTPUT': 'DESIRED_INPUT'}
:type inputs: dict
:type inputs: dict
, optional
:return: str, int
"""
if
inputs
is
None
:
inputs
=
{}
trigger
,
inputs
=
_parse_inputs
(
inputs
)
output
=
b
''
child
=
pexpect
.
spawn
(
shell_command
)
...
...
setup.py
View file @
2db6987c
from
setuptools
import
setup
,
find_packages
VERSION
=
0.
3
VERSION
=
0.
4
setup
(
name
=
"common_helper_process"
,
...
...
test/test_fail_safe_subprocess.py
View file @
2db6987c
import
os
import
unittest
from
time
import
time
import
os
from
common_helper_files
import
get_dir_of_file
from
common_helper_process
import
execute_shell_command
,
execute_shell_command_get_return_code
,
execute_interactive_shell_command
from
common_helper_process
import
(
execute_interactive_shell_command
,
execute_shell_command
,
execute_shell_command_get_return_code
)
from
common_helper_process.fail_safe_subprocess
import
_parse_inputs
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment