lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <68395ea66ebd4ee02b3a192ee913312af65f5d64.1567047549.git.brandonbonaby94@gmail.com>
Date:   Thu, 29 Aug 2019 00:24:28 -0400
From:   Branden Bonaby <brandonbonaby94@...il.com>
To:     kys@...rosoft.com, haiyangz@...rosoft.com, sthemmin@...rosoft.com,
        sashal@...nel.org
Cc:     Branden Bonaby <brandonbonaby94@...il.com>,
        linux-hyperv@...r.kernel.org, linux-kernel@...r.kernel.org
Subject: [PATCH v4 2/2] tools: hv: add vmbus testing tool

This is a userspace tool to drive the testing. Currently it supports
introducing user specified delay in the host to guest communication
path on a per-channel basis.

Signed-off-by: Branden Bonaby <brandonbonaby94@...il.com>
---
Changes in v4:
- Based on Harrys comments, made the tool more
  user friendly and added more error checking.

Changes in v3:
- Align python tool to match Linux coding style.

Changes in v2:
 - Move testing location to new location in debugfs.
 
 tools/hv/vmbus_testing | 376 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 376 insertions(+)
 create mode 100644 tools/hv/vmbus_testing

diff --git a/tools/hv/vmbus_testing b/tools/hv/vmbus_testing
new file mode 100644
index 000000000000..e7212903dd1d
--- /dev/null
+++ b/tools/hv/vmbus_testing
@@ -0,0 +1,376 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0
+#
+# Program to allow users to fuzz test Hyper-V drivers
+# by interfacing with Hyper-V debugfs attributes.
+# Current test methods available:
+#       1. delay testing
+#
+# Current file/directory structure of hyper-V debugfs:
+#       /sys/kernel/debug/hyperv/UUID
+#       /sys/kernel/debug/hyperv/UUID/<test-state filename>
+#       /sys/kernel/debug/hyperv/UUID/<test-method sub-directory>
+#
+# author: Branden Bonaby <brandonbonaby94@...il.com>
+
+import os
+import cmd
+import argparse
+import glob
+from argparse import RawDescriptionHelpFormatter
+from argparse import RawTextHelpFormatter
+from enum import Enum
+
+# Do not change unless, you change the debugfs attributes
+# in /drivers/hv/debugfs.c. All fuzz testing
+# attributes will start with "fuzz_test".
+
+# debugfs path for hyperv must exist before proceeding
+debugfs_hyperv_path = "/sys/kernel/debug/hyperv"
+if not os.path.isdir(debugfs_hyperv_path):
+        print("{} doesn't exist/check permissions".format(debugfs_hyperv_path))
+        exit(-1)
+
+class dev_state(Enum):
+        off = 0
+        on = 1
+
+# File names, that correspond to the files created in
+# /drivers/hv/debugfs.c
+class f_names(Enum):
+        state_f = "fuzz_test_state"
+        buff_f =  "fuzz_test_buffer_interrupt_delay"
+        mess_f =  "fuzz_test_message_delay"
+
+# Both single_actions and all_actions are used
+# for error checking and to allow for some subparser
+# names to be abbreviated. Do not abbreviate the
+# test method names, as it will become less intuitive
+# as to what the user can do. If you do decide to
+# abbreviate the test method name, make sure the main
+# function reflects this change.
+
+all_actions = [
+        "disable_all",
+        "D",
+        "enable_all",
+        "view_all",
+        "V"
+]
+
+single_actions = [
+        "disable_single",
+        "d",
+        "enable_single",
+        "view_single",
+        "v"
+]
+
+def main():
+
+        file_map = recursive_file_lookup(debugfs_hyperv_path, dict())
+        args = parse_args()
+        if (not args.action):
+                print ("Error, no options selected...exiting")
+                exit(-1)
+        arg_set = { k for (k,v) in vars(args).items() if v and k != "action" }
+        arg_set.add(args.action)
+        path = args.path if "path" in arg_set else None
+        if (path and path[-1] == "/"):
+                path = path[:-1]
+        validate_args_path(path, arg_set, file_map)
+        if (path and "enable_single" in arg_set):
+            state_path = locate_state(path, file_map)
+            set_test_state(state_path, dev_state.on.value, args.quiet)
+
+        # Use subparsers as the key for different actions
+        if ("delay" in arg_set):
+                validate_delay_values(args.delay_time)
+                if (args.enable_all):
+                        set_delay_all_devices(file_map, args.delay_time,
+                                              args.quiet)
+                else:
+                        set_delay_values(path, file_map, args.delay_time,
+                                         args.quiet)
+        elif ("disable_all" in arg_set or "D" in arg_set):
+                disable_all_testing(file_map)
+        elif ("disable_single" in arg_set or "d" in arg_set):
+                disable_testing_single_device(path, file_map)
+        elif ("view_all" in arg_set or "V" in arg_set):
+                get_all_devices_test_status(file_map)
+        elif ("view_single" in arg_set or  "v" in arg_set):
+                get_device_test_values(path, file_map)
+
+# Get the state location
+def locate_state(device, file_map):
+        return file_map[device][f_names.state_f.value]
+
+# Validate delay values to make sure they are acceptable to
+# enable delays on a device
+def validate_delay_values(delay):
+
+        if (delay[0]  == -1 and delay[1] == -1):
+                print("\nError, At least 1 value must be greater than 0")
+                exit(-1)
+        for i in delay:
+                if (i < -1 or i == 0 or i > 1000):
+                        print("\nError, Values must be  equal to -1 "
+                              "or be > 0 and <= 1000")
+                        exit(-1)
+
+# Validate argument path
+def validate_args_path(path, arg_set, file_map):
+
+        if (not path and any(element in arg_set for element in single_actions)):
+                print("Error, path (-p) REQUIRED for the specified option. "
+                      "Use (-h) to check usage.")
+                exit(-1)
+        elif (path and any(item in arg_set for item in all_actions)):
+                print("Error, path (-p) NOT REQUIRED for the specified option. "
+                      "Use (-h) to check usage." )
+                exit(-1)
+        elif (path not in file_map and any(item in arg_set
+                                           for item in single_actions)):
+                print("Error, path '{}' not a valid vmbus device".format(path))
+                exit(-1)
+
+# display Testing status of single device
+def get_device_test_values(path, file_map):
+
+        for name in file_map[path]:
+                file_location = file_map[path][name]
+                print( name + " = " + str(read_test_files(file_location)))
+
+# Create a map of the vmbus devices and their associated files
+# [key=device, value = [key = filename, value = file path]]
+def recursive_file_lookup(path, file_map):
+
+        for f_path in glob.iglob(path + '**/*'):
+                if (os.path.isfile(f_path)):
+                        if (f_path.rsplit("/",2)[0] == debugfs_hyperv_path):
+                                directory = f_path.rsplit("/",1)[0]
+                        else:
+                                directory = f_path.rsplit("/",2)[0]
+                        f_name = f_path.split("/")[-1]
+                        if (file_map.get(directory)):
+                                file_map[directory].update({f_name:f_path})
+                        else:
+                                file_map[directory] = {f_name:f_path}
+                elif (os.path.isdir(f_path)):
+                        recursive_file_lookup(f_path,file_map)
+        return file_map
+
+# display Testing state of devices
+def get_all_devices_test_status(file_map):
+
+        for device in file_map:
+                if (get_test_state(locate_state(device, file_map)) is 1):
+                        print("Testing = ON for: {}"
+                              .format(device.split("/")[5]))
+                else:
+                        print("Testing = OFF for: {}"
+                              .format(device.split("/")[5]))
+
+# read the vmbus device files, path must be absolute path before calling
+def read_test_files(path):
+        try:
+                with open(path,"r") as f:
+                        file_value = f.readline().strip()
+                return int(file_value)
+
+        except IOError as e:
+                errno, strerror = e.args
+                print("I/O error({0}): {1} on file {2}"
+                      .format(errno, strerror, path))
+                exit(-1)
+        except ValueError:
+                print ("Element to int conversion error in: \n{}".format(path))
+                exit(-1)
+
+# writing to vmbus device files, path must be absolute path before calling
+def write_test_files(path, value):
+
+        try:
+                with open(path,"w") as f:
+                        f.write("{}".format(value))
+        except IOError as e:
+                errno, strerror = e.args
+                print("I/O error({0}): {1} on file {2}"
+                      .format(errno, strerror, path))
+                exit(-1)
+
+# set testing state of device
+def set_test_state(state_path, state_value, quiet):
+
+        write_test_files(state_path, state_value)
+        if (get_test_state(state_path) is 1):
+                if (not quiet):
+                        print("Testing = ON for device: {}"
+                              .format(state_path.split("/")[5]))
+        else:
+                if (not quiet):
+                        print("Testing = OFF for device: {}"
+                              .format(state_path.split("/")[5]))
+
+# get testing state of device
+def get_test_state(state_path):
+        #state == 1 - test = ON
+        #state == 0 - test = OFF
+        return  read_test_files(state_path)
+
+# write 1 - 1000 microseconds, into a single device using the
+# fuzz_test_buffer_interrupt_delay and fuzz_test_message_delay
+# debugfs attributes
+def set_delay_values(device, file_map, delay_length, quiet):
+
+        try:
+                interrupt = file_map[device][f_names.buff_f.value]
+                message = file_map[device][f_names.mess_f.value]
+
+                # delay[0]- buffer interrupt delay, delay[1]- message delay
+                if (delay_length[0] >= 0 and delay_length[0] <= 1000):
+                        write_test_files(interrupt, delay_length[0])
+                if (delay_length[1] >= 0 and delay_length[1] <= 1000):
+                        write_test_files(message, delay_length[1])
+                if (not quiet):
+                        print("Buffer delay testing = {} for: {}"
+                              .format(read_test_files(interrupt),
+                                      interrupt.split("/")[5]))
+                        print("Message delay testing = {} for: {}"
+                              .format(read_test_files(message),
+                                      message.split("/")[5]))
+        except IOError as e:
+                errno, strerror = e.args
+                print("I/O error({0}): {1} on files {2}{3}"
+                      .format(errno, strerror, interrupt, message))
+                exit(-1)
+
+# enabling delay testing on all devices
+def set_delay_all_devices(file_map, delay, quiet):
+
+        for device in (file_map):
+                set_test_state(locate_state(device, file_map),
+                               dev_state.on.value,
+                               quiet)
+                set_delay_values(device, file_map, delay, quiet)
+
+# disable all testing on a SINGLE device.
+def disable_testing_single_device(device, file_map):
+
+        for name in file_map[device]:
+                file_location = file_map[device][name]
+                write_test_files(file_location, dev_state.off.value)
+        print("ALL testing now OFF for {}".format(device.split("/")[-1]))
+
+# disable all testing on ALL devices
+def disable_all_testing(file_map):
+
+        for device in file_map:
+                disable_testing_single_device(device, file_map)
+
+def parse_args():
+        parser = argparse.ArgumentParser(prog = "vmbus_testing",usage ="\n"
+                "%(prog)s [delay]   [-h] [-e|-E] -t [-p]\n"
+                "%(prog)s [view_all       | V]      [-h]\n"
+                "%(prog)s [disable_all    | D]      [-h]\n"
+                "%(prog)s [disable_single | d]      [-h|-p]\n"
+                "%(prog)s [view_single    | v]      [-h|-p]\n"
+                "%(prog)s --version\n",
+                description = "\nUse lsvmbus to get vmbus device type "
+                "information.\n" "\nThe debugfs root path is "
+                "/sys/kernel/debug/hyperv",
+                formatter_class = RawDescriptionHelpFormatter)
+        subparsers = parser.add_subparsers(dest = "action")
+        parser.add_argument("--version", action = "version",
+                version = '%(prog)s 0.1.0')
+        parser.add_argument("-q","--quiet", action = "store_true",
+                help = "silence none important test messages."
+                       " This will only work when enabling testing"
+                       " on a device.")
+        # Use the path parser to hold the --path attribute so it can
+        # be shared between subparsers. Also do the same for the state
+        # parser, as all testing methods will use --enable_all and
+        # enable_single.
+        path_parser = argparse.ArgumentParser(add_help=False)
+        path_parser.add_argument("-p","--path", metavar = "",
+                help = "Debugfs path to a vmbus device. The path "
+                "must be the absolute path to the device.")
+        state_parser = argparse.ArgumentParser(add_help=False)
+        state_group = state_parser.add_mutually_exclusive_group(required = True)
+        state_group.add_argument("-E", "--enable_all", action = "store_const",
+                                 const = "enable_all",
+                                 help = "Enable the specified test type "
+                                 "on ALL vmbus devices.")
+        state_group.add_argument("-e", "--enable_single",
+                                 action = "store_const",
+                                 const = "enable_single",
+                                 help = "Enable the specified test type on a "
+                                 "SINGLE vmbus device.")
+        parser_delay = subparsers.add_parser("delay",
+                        parents = [state_parser, path_parser],
+                        help = "Delay the ring buffer interrupt or the "
+                        "ring buffer message reads in microseconds.",
+                        prog = "vmbus_testing",
+                        usage = "%(prog)s [-h]\n"
+                        "%(prog)s -E -t [value] [value]\n"
+                        "%(prog)s -e -t [value] [value] -p",
+                        description = "Delay the ring buffer interrupt for "
+                        "vmbus devices, or delay the ring buffer message "
+                        "reads for vmbus devices (both in microseconds). This "
+                        "is only on the host to guest channel.")
+        parser_delay.add_argument("-t", "--delay_time", metavar = "", nargs = 2,
+                        type = check_range, default =[0,0], required = (True),
+                        help = "Set [buffer] & [message] delay time. "
+                        "Value constraints: -1 == value "
+                        "or 0 < value <= 1000.\n"
+                        "Use -1 to keep the previous value for that delay "
+                        "type, or a value > 0 <= 1000 to change the delay "
+                        "time.")
+        parser_dis_all = subparsers.add_parser("disable_all",
+                        aliases = ['D'], prog = "vmbus_testing",
+                        usage = "%(prog)s [disable_all | D] -h\n"
+                        "%(prog)s [disable_all | D]\n",
+                        help = "Disable ALL testing on ALL vmbus devices.",
+                        description = "Disable ALL testing on ALL vmbus "
+                        "devices.")
+        parser_dis_single = subparsers.add_parser("disable_single",
+                        aliases = ['d'],
+                        parents = [path_parser], prog = "vmbus_testing",
+                        usage = "%(prog)s [disable_single | d] -h\n"
+                        "%(prog)s [disable_single | d] -p\n",
+                        help = "Disable ALL testing on a SINGLE vmbus device.",
+                        description = "Disable ALL testing on a SINGLE vmbus "
+                        "device.")
+        parser_view_all = subparsers.add_parser("view_all", aliases = ['V'],
+                        help = "View the test state for ALL vmbus devices.",
+                        prog = "vmbus_testing",
+                        usage = "%(prog)s [view_all | V] -h\n"
+                        "%(prog)s [view_all | V]\n",
+                        description = "This shows the test state for ALL the "
+                        "vmbus devices.")
+        parser_view_single = subparsers.add_parser("view_single",
+                        aliases = ['v'],parents = [path_parser],
+                        help = "View the test values for a SINGLE vmbus "
+                        "device.",
+                        description = "This shows the test values for a SINGLE "
+                        "vmbus device.", prog = "vmbus_testing",
+                        usage = "%(prog)s [view_single | v] -h\n"
+                        "%(prog)s [view_single | v] -p")
+
+        return  parser.parse_args()
+
+# value checking for range checking input in parser
+def check_range(arg1):
+
+        try:
+                val = int(arg1)
+        except ValueError as err:
+                raise argparse.ArgumentTypeError(str(err))
+        if val < -1 or val > 1000:
+                message = ("\n\nvalue must be -1 or  0 < value <= 1000. "
+                           "Value program received: {}\n").format(val)
+                raise argparse.ArgumentTypeError(message)
+        return val
+
+if __name__ == "__main__":
+        main()
-- 
2.17.1

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ