Path: tests/integration_tests/performance/test_block_performance.py
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Performance benchmark for block device emulation."""

import concurrent.futures
import json
import logging
import os
from enum import Enum
import shutil

import pytest

from conftest import _test_images_s3_bucket
from framework.artifacts import ArtifactCollection, ArtifactSet
from framework.builder import MicrovmBuilder
from framework.matrix import TestContext, TestMatrix
from framework.stats import core
from framework.stats.baseline import Provider as BaselineProvider
from framework.stats.metadata import DictProvider as DictMetadataProvider
from framework.utils import get_cpu_percent, CmdBuilder, DictQuery, run_cmd
from framework.utils_cpuid import get_cpu_model_name
import host_tools.drive as drive_tools
import host_tools.network as net_tools  # pylint: disable=import-error
import framework.stats as st
from integration_tests.performance.configs import defs
from integration_tests.performance.utils import handle_failure, \
    dump_test_result

DEBUG = False
TEST_ID = "block_device_performance"
FIO = "fio"

# Measurement tags.
CPU_UTILIZATION_VMM = "cpu_utilization_vmm"
CPU_UTILIZATION_VMM_SAMPLES_TAG = "cpu_utilization_vmm_samples"
CPU_UTILIZATION_VCPUS_TOTAL = "cpu_utilization_vcpus_total"
CONFIG = json.load(open(defs.CFG_LOCATION /
                        "block_performance_test_config.json"))


# pylint: disable=R0903
class BlockBaselinesProvider(BaselineProvider):
    """Implementation of a baseline provider for the block performance test."""

    def __init__(self, env_id, fio_id):
        """Block baseline provider initialization."""
        cpu_model_name = get_cpu_model_name()
        baselines = list(filter(
            lambda cpu_baseline: cpu_baseline["model"] == cpu_model_name,
            CONFIG["hosts"]["instances"]["m5d.metal"]["cpus"]))

        super().__init__(DictQuery(dict()))
        if len(baselines) > 0:
            super().__init__(DictQuery(baselines[0]))

        self._tag = "baselines/{}/" + env_id + "/{}/" + fio_id

    def get(self, ms_name: str, st_name: str) -> dict:
        """Return the baseline value corresponding to the key."""
        key = self._tag.format(ms_name, st_name)
        baseline = self._baselines.get(key)
        if baseline:
            target = baseline.get("target")
            delta_percentage = baseline.get("delta_percentage")
            return {
                "target": target,
                "delta": delta_percentage * target / 100,
            }
        return None
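

# Illustrative sketch (hypothetical values, not a real baseline entry): given
# a baseline {"target": 10000, "delta_percentage": 5}, get() above returns
# {"target": 10000, "delta": 500}, i.e. a measurement passes when it falls
# within 5% of the target. The helper below is not used by the test; it only
# spells out the pass interval that the returned dict implies.
def _baseline_pass_interval(target: float, delta_percentage: float) -> tuple:
    """Illustrative only: (low, high) pass interval implied by get()."""
    delta = delta_percentage * target / 100
    return target - delta, target + delta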


def run_fio(env_id, basevm, ssh_conn, mode, bs):
    """Run a fio test in the specified mode with block size bs."""
    logs_path = f"{basevm.jailer.chroot_base_with_id()}/{env_id}/{mode}{bs}"

    # Compute the fio command. Pin it to the first guest CPU.
    cmd = CmdBuilder(FIO) \
        .with_arg(f"--name={mode}-{bs}") \
        .with_arg(f"--rw={mode}") \
        .with_arg(f"--bs={bs}") \
        .with_arg("--filename=/dev/vdb") \
        .with_arg("--time_based=1") \
        .with_arg(f"--size={CONFIG['block_device_size']}M") \
        .with_arg("--direct=1") \
        .with_arg("--ioengine=libaio") \
        .with_arg("--iodepth=32") \
        .with_arg(f"--ramp_time={CONFIG['omit']}") \
        .with_arg(f"--numjobs={CONFIG['load_factor'] * basevm.vcpus_count}") \
        .with_arg("--randrepeat=0") \
        .with_arg(f"--runtime={CONFIG['time']}") \
        .with_arg(f"--write_iops_log={mode}{bs}") \
        .with_arg(f"--write_bw_log={mode}{bs}") \
        .with_arg("--log_avg_msec=1000") \
        .with_arg("--output-format=json+") \
        .build()

    rc, _, stderr = ssh_conn.execute_command(
        "echo 'none' > /sys/block/vdb/queue/scheduler")
    assert rc == 0, stderr.read()
    assert stderr.read() == ""

    # First, flush all guest cached data to host, then drop guest FS caches.
    rc, _, stderr = ssh_conn.execute_command("sync")
    assert rc == 0, stderr.read()
    assert stderr.read() == ""
    rc, _, stderr = ssh_conn.execute_command(
        "echo 3 > /proc/sys/vm/drop_caches")
    assert rc == 0, stderr.read()
    assert stderr.read() == ""

    # Then, flush all host cached data to hardware, also drop host FS caches.
    run_cmd("sync")
    run_cmd("echo 3 > /proc/sys/vm/drop_caches")

    # Start the CPU load monitor.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        cpu_load_future = executor.submit(get_cpu_percent,
                                          basevm.jailer_clone_pid,
                                          CONFIG["time"],
                                          omit=CONFIG["omit"])

        # Print the fio command in the log and run it.
        rc, _, stderr = ssh_conn.execute_command(cmd)
        assert rc == 0, stderr.read()
        assert stderr.read() == ""

        if os.path.isdir(logs_path):
            shutil.rmtree(logs_path)

        os.makedirs(logs_path)

        ssh_conn.scp_get_file("*.log", logs_path)
        rc, _, stderr = ssh_conn.execute_command("rm *.log")
        assert rc == 0, stderr.read()

        result = dict()
        cpu_load = cpu_load_future.result()
        tag = "firecracker"
        assert tag in cpu_load and len(cpu_load[tag]) == 1

        data = list(cpu_load[tag].values())[0]
        data_len = len(data)
        assert data_len == CONFIG["time"]

        result[CPU_UTILIZATION_VMM] = sum(data) / data_len
        if DEBUG:
            result[CPU_UTILIZATION_VMM_SAMPLES_TAG] = data

        vcpus_util = 0
        for vcpu in range(basevm.vcpus_count):
            # We expect a single fc_vcpu thread tagged with
            # f"fc_vcpu {vcpu}".
            tag = f"fc_vcpu {vcpu}"
            assert tag in cpu_load and len(cpu_load[tag]) == 1
            data = list(cpu_load[tag].values())[0]
            data_len = len(data)

            assert data_len == CONFIG["time"]
            if DEBUG:
                samples_tag = f"cpu_utilization_fc_vcpu_{vcpu}_samples"
                result[samples_tag] = data
            vcpus_util += sum(data) / data_len

        result[CPU_UTILIZATION_VCPUS_TOTAL] = vcpus_util
        return result


class DataDirection(Enum):
    """Operation type."""

    READ = 0
    WRITE = 1
    TRIM = 2

    def __str__(self):
        """Representation as string."""
        # pylint: disable=W0143
        if self.value == 0:
            return "read"
        # pylint: disable=W0143
        if self.value == 1:
            return "write"
        # pylint: disable=W0143
        if self.value == 2:
            return "trim"
        return ""


def read_values(cons, numjobs, env_id, mode, bs, measurement, logs_path):
    """Read the values for each measurement.

    The values are logged once every second. The time resolution is in msec.
    The log file format documentation can be found here:
    https://fio.readthedocs.io/en/latest/fio_doc.html#log-file-formats
    """
    values = dict()

    for job_id in range(numjobs):
        file_path = f"{logs_path}/{env_id}/{mode}{bs}/{mode}" \
                    f"{bs}_{measurement}.{job_id + 1}.log"
        file = open(file_path)
        lines = file.readlines()

        direction_count = 1
        if mode.endswith("readwrite") or mode.endswith("rw"):
            direction_count = 2

        for idx in range(0, len(lines), direction_count):
            value_idx = idx // direction_count
            for direction in range(direction_count):
                data = lines[idx + direction].split(sep=",")
                data_dir = DataDirection(int(data[2].strip()))

                measurement_id = f"{measurement}_{str(data_dir)}"
                if measurement_id not in values:
                    values[measurement_id] = dict()

                if value_idx not in values[measurement_id]:
                    values[measurement_id][value_idx] = list()
                values[measurement_id][value_idx].append(int(data[1].strip()))

    for measurement_id in values:
        for idx in values[measurement_id]:
            # Discard data points which were not measured by all jobs.
            if len(values[measurement_id][idx]) != numjobs:
                continue

            value = sum(values[measurement_id][idx])
            if DEBUG:
                cons.consume_custom(measurement_id, value)
            cons.consume_data(measurement_id, value)
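

# Illustrative sketch of the log lines read_values() parses (see the fio docs
# link in its docstring): with --log_avg_msec=1000, each line is one averaged
# one-second sample of the form "msec, value, data_direction, block_size,
# offset"; e.g. "1000, 51200, 0, 4096, 0" would be a read (direction 0) IOPS
# sample. The values here are hypothetical and the helper is not called by
# the test.
def _parse_fio_log_line(line: str) -> tuple:
    """Illustrative only: (value, direction) from a single fio log line."""
    fields = line.split(",")
    return int(fields[1].strip()), DataDirection(int(fields[2].strip()))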


def consume_fio_output(cons, result, numjobs, mode, bs, env_id, logs_path):
    """Consumer function."""
    cpu_utilization_vmm = result[CPU_UTILIZATION_VMM]
    cpu_utilization_vcpus = result[CPU_UTILIZATION_VCPUS_TOTAL]

    cons.consume_stat("Avg", CPU_UTILIZATION_VMM, cpu_utilization_vmm)
    cons.consume_stat("Avg",
                      CPU_UTILIZATION_VCPUS_TOTAL,
                      cpu_utilization_vcpus)

    read_values(cons, numjobs, env_id, mode, bs, "iops", logs_path)
    read_values(cons, numjobs, env_id, mode, bs, "bw", logs_path)


@pytest.mark.nonci
@pytest.mark.timeout(CONFIG["time"] * 1000)  # 1.40 hours
def test_block_performance(bin_cloner_path, results_file_dumper):
    """Test block device performance for multiple artifacts."""
    logger = logging.getLogger(TEST_ID)
    artifacts = ArtifactCollection(_test_images_s3_bucket())
    microvm_artifacts = ArtifactSet(artifacts.microvms(keyword="2vcpu_1024mb"))
    microvm_artifacts.insert(artifacts.microvms(keyword="1vcpu_1024mb"))
    kernel_artifacts = ArtifactSet(artifacts.kernels())
    disk_artifacts = ArtifactSet(artifacts.disks(keyword="ubuntu"))

    # Create a test context and add builder, logger, name and results dumper.
    test_context = TestContext()
    test_context.custom = {
        'builder': MicrovmBuilder(bin_cloner_path),
        'logger': logger,
        'name': TEST_ID,
        'results_file_dumper': results_file_dumper
    }

    test_matrix = TestMatrix(context=test_context,
                             artifact_sets=[
                                 microvm_artifacts,
                                 kernel_artifacts,
                                 disk_artifacts
                             ])
    test_matrix.run_test(fio_workload)
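

# Sketch of the matrix expansion above (an assumption about the framework's
# behavior, illustrated with plain lists rather than real artifact sets):
# run_test() invokes the workload once per (microvm, kernel, disk)
# combination. The helper below is illustrative only.
def _matrix_combinations(microvms: list, kernels: list, disks: list) -> list:
    """Illustrative only: the cartesian product a TestMatrix run covers."""
    import itertools
    return list(itertools.product(microvms, kernels, disks))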


def fio_workload(context):
    """Execute block device emulation benchmarking scenarios."""
    vm_builder = context.custom['builder']
    logger = context.custom["logger"]
    file_dumper = context.custom["results_file_dumper"]

    # Create a rw copy artifact.
    rw_disk = context.disk.copy()
    # Get ssh key from read-only artifact.
    ssh_key = context.disk.ssh_key()
    # Create a fresh microvm from artifacts.
    vm_instance = vm_builder.build(kernel=context.kernel,
                                   disks=[rw_disk],
                                   ssh_key=ssh_key,
                                   config=context.microvm)
    basevm = vm_instance.vm

    # Add a secondary block device for benchmark tests.
    fs = drive_tools.FilesystemFile(
        os.path.join(basevm.fsfiles, 'scratch'),
        CONFIG["block_device_size"]
    )
    basevm.add_drive('scratch', fs.path)
    basevm.start()

    # Pin the VMM, API and vCPU threads to dedicated host CPUs.
    current_cpu_id = 0
    basevm.pin_vmm(current_cpu_id)
    current_cpu_id += 1
    basevm.pin_api(current_cpu_id)
    for vcpu_id in range(basevm.vcpus_count):
        current_cpu_id += 1
        basevm.pin_vcpu(vcpu_id, current_cpu_id)

    st_core = core.Core(name=TEST_ID,
                        iterations=1,
                        custom={"microvm": context.microvm.name(),
                                "kernel": context.kernel.name(),
                                "disk": context.disk.name(),
                                "cpu_model_name": get_cpu_model_name()})

    logger.info("Testing with microvm: \"{}\", kernel {}, disk {}"
                .format(context.microvm.name(),
                        context.kernel.name(),
                        context.disk.name()))

    ssh_connection = net_tools.SSHConnection(basevm.ssh_config)
    env_id = f"{context.kernel.name()}/{context.disk.name()}/" \
             f"{context.microvm.name()}"

    for mode in CONFIG["fio_modes"]:
        for bs in CONFIG["fio_blk_sizes"]:
            fio_id = f"{mode}-bs{bs}"
            st_prod = st.producer.LambdaProducer(
                func=run_fio,
                func_kwargs={"env_id": env_id, "basevm": basevm,
                             "ssh_conn": ssh_connection, "mode": mode,
                             "bs": bs})
            st_cons = st.consumer.LambdaConsumer(
                metadata_provider=DictMetadataProvider(
                    CONFIG["measurements"],
                    BlockBaselinesProvider(env_id,
                                           fio_id)),
                func=consume_fio_output,
                func_kwargs={"numjobs": basevm.vcpus_count, "mode": mode,
                             "bs": bs, "env_id": env_id,
                             "logs_path": basevm.jailer.chroot_base_with_id()})
            st_core.add_pipe(st_prod, st_cons, tag=f"{env_id}/{fio_id}")

    # Gather results and verify pass criteria.
    try:
        result = st_core.run_exercise()
    except core.CoreException as err:
        handle_failure(file_dumper, err)

    dump_test_result(file_dumper, result)
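

# Illustrative sketch (not used by the test): the pinning loop in
# fio_workload() produces the host CPU layout below for an N-vCPU guest:
# CPU 0 -> VMM thread, CPU 1 -> API thread, CPU 2 + i -> thread "fc_vcpu i".
def _pinning_layout(vcpus_count: int) -> dict:
    """Illustrative only: thread name -> host CPU id map implied above."""
    layout = {"vmm": 0, "api": 1}
    for vcpu_id in range(vcpus_count):
        layout[f"fc_vcpu {vcpu_id}"] = 2 + vcpu_id
    return layout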