Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sudo-project
GitHub Repository: sudo-project/sudo
Path: blob/main/plugins/python/example_audit_plugin.py
1532 views
1
import sudo
2
3
import os
4
5
6
VERSION = 1.0
7
8
9
class SudoAuditPlugin(sudo.Plugin):
10
def __init__(self, plugin_options, user_info, **kwargs):
11
# For loading multiple times, an optional "Id" can be specified
12
# as argument to identify the log lines
13
plugin_id = sudo.options_as_dict(plugin_options).get("Id", "")
14
self._log_line_prefix = "(AUDIT{}) ".format(plugin_id)
15
16
user_info_dict = sudo.options_as_dict(user_info)
17
user = user_info_dict.get("user", "???")
18
uid = user_info_dict.get("uid", "???")
19
self._log("-- Started by user {} ({}) --".format(user, uid))
20
21
def __del__(self):
22
self._log("-- Finished --")
23
24
def open(self, submit_optind: int, submit_argv: tuple) -> int:
25
# To cut out the sudo options, use "submit_optind":
26
program_args = submit_argv[submit_optind:]
27
if program_args:
28
self._log("Requested command: " + " ".join(program_args))
29
30
def accept(self, plugin_name, plugin_type,
31
command_info, run_argv, run_envp) -> int:
32
info = sudo.options_as_dict(command_info)
33
cmd = list(run_argv)
34
cmd[0] = info.get("command")
35
self._log("Accepted command: {}".format(" ".join(cmd)))
36
self._log(" By the plugin: {} (type={})".format(
37
plugin_name, self.__plugin_type_str(plugin_type)))
38
39
self._log(" Environment: " + " ".join(run_envp))
40
41
def reject(self, plugin_name, plugin_type, audit_msg, command_info) -> int:
42
self._log("Rejected by plugin {} (type={}): {}".format(
43
plugin_name, self.__plugin_type_str(plugin_type), audit_msg))
44
45
def error(self, plugin_name, plugin_type, audit_msg, command_info) -> int:
46
self._log("Plugin {} (type={}) got an error: {}".format(
47
plugin_name, self.__plugin_type_str(plugin_type), audit_msg))
48
49
def close(self, status_kind: int, status: int) -> None:
50
if status_kind == sudo.EXIT_REASON.NO_STATUS:
51
self._log("The command was not executed")
52
53
elif status_kind == sudo.EXIT_REASON.WAIT_STATUS:
54
if os.WIFEXITED(status):
55
self._log("Command returned with exit code "
56
"{}".format(os.WEXITSTATUS(status)))
57
elif os.WIFSIGNALED(status):
58
self._log("Command exited due to signal "
59
"{}".format(os.WTERMSIG(status)))
60
else:
61
raise sudo.PluginError("Failed to understand wait exit status")
62
63
elif status_kind == sudo.EXIT_REASON.EXEC_ERROR:
64
self._log("Sudo has failed to execute the command, "
65
"execve returned {}".format(status))
66
67
elif status_kind == sudo.EXIT_REASON.SUDO_ERROR:
68
self._log("Sudo has run into an error: {}".format(status))
69
70
else:
71
raise Exception("Command returned unknown status kind {}".format(
72
status_kind))
73
74
def show_version(self, is_verbose: bool) -> int:
75
version_str = " (version=1.0)" if is_verbose else ""
76
sudo.log_info("Python Example Audit Plugin" + version_str)
77
78
def _log(self, string):
79
# For the example, we just log to output (this could be a file)
80
sudo.log_info(self._log_line_prefix, string)
81
82
@staticmethod
83
def __plugin_type_str(plugin_type):
84
return sudo.PLUGIN_TYPE(plugin_type).name
85
86