"""Utility tools that extracts DWARF information encoded in a wasm output
produced by the LLVM tools, and encodes it as a wasm source map. Additionally,
it can collect original sources, change files prefixes, and strip debug
sections from a wasm file.
"""
import argparse
import json
import logging
import os
import re
import sys
from math import floor, log
__scriptdir__ = os.path.dirname(os.path.abspath(__file__))
__rootdir__ = os.path.dirname(__scriptdir__)
sys.path.insert(0, __rootdir__)
from tools import shared, utils
from tools.system_libs import DETERMINISTIC_PREFIX
LLVM_CXXFILT = shared.llvm_tool_path('llvm-cxxfilt')
EMSCRIPTEN_PREFIX = utils.normalize_path(utils.path_from_root())
logger = logging.getLogger('wasm-sourcemap')
generate_scopes = False
def parse_args(args):
parser = argparse.ArgumentParser(prog='wasm-sourcemap.py', description=__doc__)
parser.add_argument('wasm', help='wasm file')
parser.add_argument('-o', '--output', help='output source map')
  parser.add_argument('-p', '--prefix', nargs='*', help='replace source filename prefixes in the emitted source map (use PREFIX=REPLACEMENT, or just PREFIX to strip it)', default=[])
parser.add_argument('-s', '--sources', action='store_true', help='read and embed source files from file system into source map')
  parser.add_argument('-l', '--load-prefix', nargs='*', help='replace source filename prefixes when reading sources from the file system (see also --sources)', default=[])
parser.add_argument('-w', nargs='?', help='set output wasm file')
  parser.add_argument('-x', '--strip', action='store_true', help='remove debug and linking sections')
  parser.add_argument('-u', '--source-map-url', nargs='?', help='specify the sourceMappingURL section content')
parser.add_argument('--dwarfdump', help="path to llvm-dwarfdump executable")
parser.add_argument('--dwarfdump-output', nargs='?', help=argparse.SUPPRESS)
  parser.add_argument('--basepath', help='base path for source files; emitted source paths are made relative to it')
return parser.parse_args(args)
class Prefixes:
def __init__(self, args, base_path=None, preserve_deterministic_prefix=True):
prefixes = []
for p in args:
if '=' in p:
        prefix, replacement = p.split('=', 1)
prefixes.append({'prefix': utils.normalize_path(prefix), 'replacement': replacement})
else:
prefixes.append({'prefix': utils.normalize_path(p), 'replacement': ''})
self.base_path = utils.normalize_path(base_path) if base_path is not None else None
self.preserve_deterministic_prefix = preserve_deterministic_prefix
self.prefixes = prefixes
self.cache = {}
def resolve(self, name):
if name in self.cache:
return self.cache[name]
source = name
if not self.preserve_deterministic_prefix and name.startswith(DETERMINISTIC_PREFIX):
source = EMSCRIPTEN_PREFIX + name.removeprefix(DETERMINISTIC_PREFIX)
provided = False
for p in self.prefixes:
if source.startswith(p['prefix']):
source = p['replacement'] + source.removeprefix(p['prefix'])
provided = True
break
if not (source.startswith(DETERMINISTIC_PREFIX) or provided or self.base_path is None):
try:
source = os.path.relpath(source, self.base_path)
except ValueError:
source = os.path.abspath(source)
source = utils.normalize_path(source)
self.cache[name] = source
return source
class SourceMapPrefixes:
def __init__(self, sources, load, base_path):
self.sources = Prefixes(sources, base_path=base_path)
self.load = Prefixes(load, preserve_deterministic_prefix=False)
def encode_vlq(n):
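  # Source-map base64 VLQ: the value is shifted left one bit with the sign in
  # the low bit, then emitted 5 bits at a time (least-significant first) with
  # bit 6 used as a continuation flag. For example, encode_vlq(16) == 'gB' and
  # encode_vlq(-1) == 'D'.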
VLQ_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
x = (n << 1) if n >= 0 else ((-n << 1) + 1)
result = ""
while x > 31:
result = result + VLQ_CHARS[32 + (x & 31)]
x = x >> 5
return result + VLQ_CHARS[x]
def read_var_uint(wasm, pos):
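  # Decode an unsigned LEB128 (varuint) starting at `pos`; returns the value
  # and the offset of the first byte after it.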
n = 0
shift = 0
b = ord(wasm[pos:pos + 1])
pos = pos + 1
while b >= 128:
n = n | ((b - 128) << shift)
b = ord(wasm[pos:pos + 1])
pos = pos + 1
shift += 7
return n + (b << shift), pos
def strip_debug_sections(wasm):
logger.debug('Strip debug sections')
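  # Copy the 8-byte module header (4-byte magic + 4-byte version) verbatim,
  # then re-emit every section except debug- and linking-related custom sections.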
pos = 8
stripped = wasm[:pos]
while pos < len(wasm):
section_start = pos
section_id, pos_ = read_var_uint(wasm, pos)
section_size, section_body = read_var_uint(wasm, pos_)
pos = section_body + section_size
if section_id == 0:
name_len, name_pos = read_var_uint(wasm, section_body)
name_end = name_pos + name_len
      name = wasm[name_pos:name_end].decode('utf-8')
      if name in {'linking', 'sourceMappingURL'} or name.startswith(('reloc..debug_', '.debug_')):
        continue  # skip debug- and linking-related custom sections
stripped = stripped + wasm[section_start:pos]
return stripped
def encode_uint_var(n):
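  # Encode n as an unsigned LEB128 (varuint), e.g. encode_uint_var(300) == b'\xac\x02'.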
result = bytearray()
while n > 127:
result.append(128 | (n & 127))
n = n >> 7
result.append(n)
return bytes(result)
def append_source_mapping(wasm, url):
logger.debug('Append sourceMappingURL section')
section_name = "sourceMappingURL"
section_content = encode_uint_var(len(section_name)) + section_name.encode() + encode_uint_var(len(url)) + url.encode()
return wasm + encode_uint_var(0) + encode_uint_var(len(section_content)) + section_content
def get_code_section_offset(wasm):
logger.debug('Read sections index')
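  # The DWARF line-table addresses are offset by the start of the code section
  # (id 10) payload, so the source map refers to byte offsets in the wasm file.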
pos = 8
while pos < len(wasm):
section_id, pos_ = read_var_uint(wasm, pos)
section_size, pos = read_var_uint(wasm, pos_)
if section_id == 10:
return pos
pos = pos + section_size
def remove_dead_entries(entries):
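  # Heuristic: line entries belonging to functions removed by the linker end up
  # with addresses at or near zero; a live function body cannot start closer to
  # the code section start than its own LEB128 size field, so such blocks are dropped.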
block_start = 0
cur_entry = 0
while cur_entry < len(entries):
if not entries[cur_entry]['eos']:
cur_entry += 1
continue
fn_start = entries[block_start]['address']
fn_size_length = floor(log(entries[cur_entry]['address'] - fn_start + 1, 128)) + 1
min_live_offset = 1 + fn_size_length
if fn_start < min_live_offset:
del entries[block_start:cur_entry + 1]
cur_entry = block_start
continue
cur_entry += 1
block_start = cur_entry
def decode_octal_encoded_utf8(s):
  # llvm-dwarfdump prints non-ASCII bytes as octal escapes (e.g. "\303\251");
  # fold them back into bytes and decode the result as UTF-8.
  out = bytearray(len(s))
  i = 0
  o = 0
  final_length = len(s)
  in_escape = False
  while i < len(s):
    if not in_escape and s[i] == '\\' and (s[i + 1] == '2' or s[i + 1] == '3'):
      out[o] = int(s[i + 1:i + 4], 8)
      i += 4
      final_length -= 3
      in_escape = False
    else:
      out[o] = ord(s[i])
      in_escape = False if in_escape else (s[i] == '\\')
      i += 1
    o += 1
  return out[:final_length].decode('utf-8')
def extract_comp_dir_map(text):
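  # Map each compile unit's DW_AT_stmt_list offset to its DW_AT_comp_dir so that
  # relative include directories in the line table can be resolved per unit.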
compile_unit_pattern = re.compile(r"0x[0-9a-f]*: DW_TAG_compile_unit")
stmt_list_pattern = re.compile(r"DW_AT_stmt_list\s+\((0x[0-9a-f]*)\)")
comp_dir_pattern = re.compile(r"DW_AT_comp_dir\s+\(\"([^\"]+)\"\)")
map_stmt_list_to_comp_dir = {}
iterator = compile_unit_pattern.finditer(text)
current_match = next(iterator, None)
while current_match:
next_match = next(iterator, None)
start = current_match.end()
end = next_match.start() if next_match else len(text)
stmt_list_match = stmt_list_pattern.search(text, start, end)
if stmt_list_match is not None:
stmt_list = stmt_list_match.group(1)
comp_dir_match = comp_dir_pattern.search(text, start, end)
comp_dir = decode_octal_encoded_utf8(comp_dir_match.group(1)) if comp_dir_match is not None else ''
map_stmt_list_to_comp_dir[stmt_list] = comp_dir
current_match = next_match
return map_stmt_list_to_comp_dir
def demangle_names(names):
mangled_names = sorted({n for n in names if n.startswith('_Z')})
if not mangled_names:
return {}
if not os.path.exists(LLVM_CXXFILT):
    logger.warning('llvm-cxxfilt not found at %s; skipping name demangling' % LLVM_CXXFILT)
return {}
input_str = '\n'.join(mangled_names)
proc = shared.check_call([LLVM_CXXFILT], input=input_str, stdout=shared.PIPE, stderr=shared.PIPE, text=True)
if proc.returncode != 0:
logger.warning('llvm-cxxfilt failed: %s' % proc.stderr)
return {}
demangled_list = proc.stdout.splitlines()
if len(demangled_list) != len(mangled_names):
logger.warning('llvm-cxxfilt output length mismatch')
return {}
return dict(zip(mangled_names, demangled_list, strict=True))
class FuncRange:
def __init__(self, name, low_pc, high_pc):
self.name = name
self.low_pc = low_pc
self.high_pc = high_pc
def extract_func_ranges(text):
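  # Collect DW_TAG_subprogram and DW_TAG_inlined_subroutine entries from the
  # dwarfdump output as named [low_pc, high_pc) ranges; these feed the optional
  # "names" field of the source map.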
next_tag_pattern = re.compile(r'\n0x[0-9a-f]+:')
func_pattern = re.compile(r'DW_TAG_(?:subprogram|inlined_subroutine)')
low_pc_pattern = re.compile(r'DW_AT_low_pc\s+\(0x([0-9a-f]+)\)')
high_pc_pattern = re.compile(r'DW_AT_high_pc\s+\(0x([0-9a-f]+)\)')
abstract_origin_pattern = re.compile(r'DW_AT_abstract_origin\s+\(0x[0-9a-f]+\s+"([^"]+)"\)')
linkage_name_pattern = re.compile(r'DW_AT_linkage_name\s+\("([^"]+)"\)')
name_pattern = re.compile(r'DW_AT_name\s+\("([^"]+)"\)')
specification_pattern = re.compile(r'DW_AT_specification\s+\(0x[0-9a-f]+\s+"([^"]+)"\)')
def get_name_from_tag(start, end):
m = linkage_name_pattern.search(text, start, end)
if m:
return m.group(1)
m = name_pattern.search(text, start, end)
if m:
return m.group(1)
m = specification_pattern.search(text, start, end)
if m:
return m.group(1)
return None
func_ranges = []
for match in func_pattern.finditer(text):
search_start = match.end()
m_next = next_tag_pattern.search(text, search_start)
search_end = m_next.start() if m_next else len(text)
name = None
low_pc = None
high_pc = None
m = low_pc_pattern.search(text, search_start, search_end)
if m:
low_pc = int(m.group(1), 16)
m = high_pc_pattern.search(text, search_start, search_end)
if m:
high_pc = int(m.group(1), 16)
if 'DW_TAG_subprogram' in match.group(0):
name = get_name_from_tag(search_start, search_end)
else:
m = abstract_origin_pattern.search(text, search_start, search_end)
if m:
name = m.group(1)
if name and low_pc is not None and high_pc is not None:
func_ranges.append(FuncRange(name, low_pc, high_pc))
all_names = [item.name for item in func_ranges]
demangled_map = demangle_names(all_names)
for func_range in func_ranges:
if func_range.name in demangled_map:
func_range.name = demangled_map[func_range.name]
func_ranges.sort(key=lambda item: (item.low_pc, -item.high_pc))
return func_ranges
def read_dwarf_info(wasm, options):
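  # Run llvm-dwarfdump (or read a pre-captured dump) and parse its debug_line
  # output into address/file/line/column entries, plus function ranges when
  # scope information is requested.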
if options.dwarfdump_output:
output = utils.read_file(options.dwarfdump_output)
elif options.dwarfdump:
logger.debug('Reading DWARF information from %s' % wasm)
if not os.path.exists(options.dwarfdump):
utils.exit_with_error('llvm-dwarfdump not found: ' + options.dwarfdump)
dwarfdump_cmd = [options.dwarfdump, '-debug-info', '-debug-line', wasm]
if generate_scopes:
dwarfdump_cmd += ['-t', 'DW_TAG_compile_unit', '-t', 'DW_TAG_subprogram',
'-t', 'DW_TAG_inlined_subroutine']
else:
dwarfdump_cmd += ['--recurse-depth=0']
proc = shared.check_call(dwarfdump_cmd, stdout=shared.PIPE)
output = proc.stdout
else:
utils.exit_with_error('Please specify either --dwarfdump or --dwarfdump-output')
debug_line_pattern = re.compile(r"debug_line\[(0x[0-9a-f]*)\]")
include_dir_pattern = re.compile(r"include_directories\[\s*(\d+)\] = \"([^\"]*)")
file_pattern = re.compile(r"file_names\[\s*(\d+)\]:\s+name: \"([^\"]*)\"\s+dir_index: (\d+)")
line_pattern = re.compile(r"\n0x([0-9a-f]+)\s+(\d+)\s+(\d+)\s+(\d+)(.*?end_sequence)?")
entries = []
iterator = debug_line_pattern.finditer(output)
current_match = None
try:
current_match = next(iterator)
debug_info_end = current_match.start()
except StopIteration:
debug_info_end = len(output)
debug_info = output[:debug_info_end]
map_stmt_list_to_comp_dir = extract_comp_dir_map(debug_info)
while current_match:
next_match = next(iterator, None)
stmt_list = current_match.group(1)
start = current_match.end()
end = next_match.start() if next_match else len(output)
comp_dir = map_stmt_list_to_comp_dir.get(stmt_list, '')
include_directories = {'0': comp_dir}
for dir in include_dir_pattern.finditer(output, start, end):
include_directories[dir.group(1)] = os.path.join(comp_dir, decode_octal_encoded_utf8(dir.group(2)))
files = {}
for file in file_pattern.finditer(output, start, end):
dir = include_directories[file.group(3)]
file_path = os.path.join(dir, decode_octal_encoded_utf8(file.group(2)))
files[file.group(1)] = file_path
for line in line_pattern.finditer(output, start, end):
entry = {'address': int(line.group(1), 16), 'line': int(line.group(2)), 'column': int(line.group(3)), 'file': files[line.group(4)], 'eos': line.group(5) is not None}
if not entry['eos']:
entries.append(entry)
else:
entry['address'] -= 1
if entries[-1]['address'] == entry['address']:
entries[-1]['eos'] = True
else:
entries.append(entry)
current_match = next_match
remove_dead_entries(entries)
entries = sorted(entries, key=lambda entry: entry['address'])
if generate_scopes:
func_ranges = extract_func_ranges(debug_info)
else:
func_ranges = []
return entries, func_ranges
def build_sourcemap(entries, func_ranges, code_section_offset, options):
base_path = options.basepath
collect_sources = options.sources
prefixes = SourceMapPrefixes(options.prefix, options.load_prefix, base_path)
for func_range in func_ranges:
func_range.low_pc += code_section_offset
func_range.high_pc += code_section_offset
sources = []
sources_content = []
names = sorted({item.name for item in func_ranges})
name_to_id = {name: i for i, name in enumerate(names)}
mappings = []
sources_map = {}
last_address = 0
last_source_id = 0
last_line = 1
last_column = 1
last_func_id = 0
active_funcs = []
next_func_range_id = 0
def get_function_id(address):
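    # entries are visited in address order and func_ranges is sorted by low_pc,
    # so a single forward sweep maintains the currently-open ranges; the most
    # recently opened live range (the innermost inline frame) wins.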
nonlocal active_funcs
nonlocal next_func_range_id
while next_func_range_id < len(func_ranges) and func_ranges[next_func_range_id].low_pc <= address:
active_funcs.append((func_ranges[next_func_range_id].high_pc, next_func_range_id))
next_func_range_id += 1
active_funcs = [f for f in active_funcs if f[0] > address]
if active_funcs:
func_range_id = active_funcs[-1][1]
name = func_ranges[func_range_id].name
return name_to_id[name]
return None
for entry in entries:
line = entry['line']
column = entry['column']
if line == 0:
continue
if column == 0:
column = 1
address = entry['address'] + code_section_offset
file_name = utils.normalize_path(entry['file'])
source_name = prefixes.sources.resolve(file_name)
if source_name not in sources_map:
source_id = len(sources)
sources_map[source_name] = source_id
sources.append(source_name)
if collect_sources:
load_name = prefixes.load.resolve(file_name)
try:
with open(load_name) as infile:
source_content = infile.read()
sources_content.append(source_content)
except OSError:
          logger.warning('Failed to read source: %s' % load_name)
sources_content.append(None)
else:
source_id = sources_map[source_name]
func_id = get_function_id(address)
address_delta = address - last_address
source_id_delta = source_id - last_source_id
line_delta = line - last_line
column_delta = column - last_column
last_address = address
last_source_id = source_id
last_line = line
last_column = column
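    # A source-map v3 segment is [generated column, source index, source line,
    # source column] plus an optional name index, each base64-VLQ encoded as a
    # delta from the previous segment; here the generated column is the wasm byte offset.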
mapping = encode_vlq(address_delta) + encode_vlq(source_id_delta) + encode_vlq(line_delta) + encode_vlq(column_delta)
if func_id is not None:
func_id_delta = func_id - last_func_id
last_func_id = func_id
mapping += encode_vlq(func_id_delta)
mappings.append(mapping)
return {'version': 3,
'sources': sources,
'sourcesContent': sources_content,
'names': names,
'mappings': ','.join(mappings)}
def main(args):
options = parse_args(args)
wasm_input = options.wasm
with open(wasm_input, 'rb') as infile:
wasm = infile.read()
entries, func_ranges = read_dwarf_info(wasm_input, options)
code_section_offset = get_code_section_offset(wasm)
logger.debug('Saving to %s' % options.output)
  source_map = build_sourcemap(entries, func_ranges, code_section_offset, options)
  with open(options.output, 'w', encoding='utf-8') as outfile:
    json.dump(source_map, outfile, separators=(',', ':'), ensure_ascii=False)
if options.strip:
wasm = strip_debug_sections(wasm)
if options.source_map_url:
wasm = append_source_mapping(wasm, options.source_map_url)
if options.w:
logger.debug('Saving wasm to %s' % options.w)
with open(options.w, 'wb') as outfile:
outfile.write(wasm)
logger.debug('Done')
return 0
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG if os.environ.get('EMCC_DEBUG') else logging.INFO)
sys.exit(main(sys.argv[1:]))