1081 lines
38 KiB
Python
1081 lines
38 KiB
Python
# -*- coding: utf-8 -*-
"""
Device Tree Blob Parser

Copyright 2014 Neil 'superna' Armstrong <superna9999@gmail.com>

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

@author: Neil 'superna' Armstrong <superna9999@gmail.com>
"""
|
|
|
|
import string
|
|
import os
|
|
import json
|
|
from copy import deepcopy, copy
|
|
from struct import Struct, unpack, pack
|
|
|
|
# Magic number expected in (and written to) the first header cell of a blob.
FDT_MAGIC = 0xD00DFEED

# Structure block tokens, written and parsed as big-endian 32-bit cells.
FDT_BEGIN_NODE = 0x00000001   # opens a node, followed by its name
FDT_END_NODE = 0x00000002     # closes the current node
FDT_PROP = 0x00000003         # introduces a property
FDT_NOP = 0x00000004          # no-operation filler token
FDT_END = 0x00000009          # terminates the structure block

# One level of indentation used by the text (DTS / JSON) exporters.
INDENT = 4 * ' '

# Highest blob version the parser accepts.
FDT_MAX_VERSION = 17
|
|
|
|
|
|
class FdtProperty(object):
    """Property that has a name but carries no value.

    Also serves as the base class of the valued property types and hosts
    the factory that picks a property type from a raw value.
    """

    @staticmethod
    def __validate_dt_name(name):
        """Tell whether every character of name is printable"""
        return all(char in string.printable for char in name)

    def __init__(self, name):
        """Store the name; raises Exception on non-printable characters"""
        self.name = name
        if not FdtProperty.__validate_dt_name(name):
            raise Exception("Invalid name '%s'" % name)

    def get_name(self):
        """Return the property name"""
        return self.name

    def __str__(self):
        """Human readable representation"""
        return "Property(%s)" % self.name

    def dts_represent(self, depth=0):
        """Return the DTS line for this property, indented by depth levels"""
        return '%s%s;' % (INDENT * depth, self.name)

    def dtb_represent(self, string_store, pos=0, version=17):
        """Return the blob encoding of this property.

        The result is a tuple (blob, string_store, pos): the encoded
        property, the string store (extended with the name when it was not
        already present) and the position advanced past the property.
        """
        entry = self.name + '\0'
        name_offset = string_store.find(entry)
        if name_offset < 0:
            # Name not stored yet: it goes at the end of the store.
            name_offset = len(string_store)
            string_store += entry
        header = pack('>III', FDT_PROP, 0, name_offset)
        return (header, string_store, pos + 12)

    def json_represent(self, depth=0):
        """Return the JSON member for this property (value is null)"""
        return '%s: null' % json.dumps(self.name)

    def to_raw(self):
        """Return the raw value, which is empty for this property type"""
        return ''

    def __getitem__(self, value):
        """There is no item to return: always None"""
        return None

    def __ne__(self, node):
        """Negation of __eq__"""
        return not self.__eq__(node)

    def __eq__(self, node):
        """Compare by name.

        Raises Exception when node is not a FdtProperty.
        """
        if not isinstance(node, FdtProperty):
            raise Exception("Invalid object type")
        return self.name == node.get_name()

    @staticmethod
    def __check_prop_strings(value):
        """Tell whether a raw value looks like a list of strings.

        Python version of util_is_printable_string from dtc.
        Returns True when value is a sequence of non-empty, NUL-terminated
        runs of printable characters (carriage return and line feed
        excluded), None otherwise.
        """
        if not len(value):
            return None

        # Needed for python 3 support: a bytes object is decoded with the
        # ascii codec; a decoding failure means it is not a string value.
        try:
            text = value.decode('ascii')
        except ValueError:
            return None

        if text[-1] != '\0':
            return None

        # Dropping the final NUL and splitting on NUL gives the candidate
        # strings; an empty one means two consecutive NULs (or a lone NUL).
        for piece in text[:-1].split('\0'):
            if not piece:
                return None
            for char in piece:
                if char not in string.printable or char in ('\r', '\n'):
                    return None

        return True

    @staticmethod
    def new_raw_property(name, raw_value):
        """Build the property type matching raw_value.

        Strings are tried first, then words (length multiple of 4), then
        bytes; an empty value gives a plain FdtProperty.
        """
        if FdtProperty.__check_prop_strings(raw_value):
            return FdtPropertyStrings.init_raw(name, raw_value)
        if not len(raw_value):
            return FdtProperty(name)
        if len(raw_value) % 4 == 0:
            return FdtPropertyWords.init_raw(name, raw_value)
        return FdtPropertyBytes.init_raw(name, raw_value)
|
|
|
|
|
|
class FdtPropertyStrings(FdtProperty):
    """Property with strings as value"""

    @classmethod
    def __extract_prop_strings(cls, value):
        """Split a raw NUL-separated value into its non-empty strings"""
        return [st for st in value.decode('ascii').split('\0') if len(st)]

    def __init__(self, name, strings):
        """Init with a non-empty list of non-empty printable strings.

        Raises Exception when the list or one of its strings is empty, or
        when a string holds a non-printable character, a carriage return
        or a line feed.
        """
        FdtProperty.__init__(self, name)
        if not strings:
            raise Exception("Invalid strings")
        for stri in strings:
            if len(stri) == 0:
                raise Exception("Invalid strings")
            if any(char not in string.printable or char in ('\r', '\n')
                   for char in stri):
                raise Exception("Invalid chars in strings")
        self.strings = strings

    @classmethod
    def init_raw(cls, name, raw_value):
        """Init from a raw (bytes) value"""
        return cls(name, cls.__extract_prop_strings(raw_value))

    def dts_represent(self, depth=0):
        """Get dts string representation"""
        return INDENT*depth + self.name + ' = "' + \
            '", "'.join(self.strings) + '";'

    def dtb_represent(self, string_store, pos=0, version=17):
        """Get blob representation.

        Returns a tuple (blob, string_store, pos): the encoded property,
        the string store (extended with the name when needed) and the
        position advanced by the blob length.
        """
        blob = b''.join([chars.encode('ascii') + b'\0'
                         for chars in self.strings])
        blob_len = len(blob)
        # Before version 16 the value is aligned on 8 bytes, but only when
        # it is at least 8 bytes long: this is the rule the blob parser of
        # this module applies, so padding shorter values would make them
        # unreadable.
        if version < 16 and blob_len >= 8 and (pos+12) % 8 != 0:
            blob = b'\0' * (8-((pos+12) % 8)) + blob
        # The value is padded so that the next token is 4-byte aligned.
        if blob_len % 4:
            blob += b'\0' * (4-(blob_len % 4))
        strpos = string_store.find(self.name+'\0')
        if strpos < 0:
            strpos = len(string_store)
            string_store += self.name+'\0'
        blob = pack('>III', FDT_PROP, blob_len, strpos) + blob
        pos += len(blob)
        return (blob, string_store, pos)

    def json_represent(self, depth=0):
        """Output JSON: a list starting with the "strings" type marker"""
        result = '%s: ["strings", ' % json.dumps(self.name)
        result += ', '.join([json.dumps(stri) for stri in self.strings])
        result += ']'
        return result

    def to_raw(self):
        """Return RAW value representation (each string NUL-terminated)"""
        return ''.join([chars+'\0' for chars in self.strings])

    def __str__(self):
        """String representation"""
        return "Property(%s,Strings:%s)" % (self.name, self.strings)

    def __getitem__(self, index):
        """Get strings, returns a string"""
        return self.strings[index]

    def __len__(self):
        """Get strings count"""
        return len(self.strings)

    def __eq__(self, node):
        """Check property equality: same name and same values.

        Raises Exception when node is not a FdtProperty.
        """
        if not FdtProperty.__eq__(self, node):
            return False
        try:
            other_len = len(node)
        except TypeError:
            # A valueless FdtProperty has no length: it cannot be equal to
            # a property holding strings.
            return False
        if len(self.strings) != other_len:
            return False
        for index, stri in enumerate(self.strings):
            if stri != node[index]:
                return False
        return True
|
|
|
|
class FdtPropertyWords(FdtProperty):
    """Property with words as value"""

    def __init__(self, name, words):
        """Init with a non-empty sequence of unsigned 32-bit integers.

        Raises Exception when a word is out of range or when the sequence
        is empty.
        """
        FdtProperty.__init__(self, name)
        for word in words:
            if not 0 <= word <= 4294967295:
                raise Exception("Invalid word value %d, requires 0 <= number <= 4294967295" % word)
        if not len(words):
            raise Exception("Invalid Words")
        self.words = words

    @classmethod
    def init_raw(cls, name, raw_value):
        """Init from a raw value whose length is a multiple of 4.

        Raises Exception otherwise.
        """
        if len(raw_value) % 4 != 0:
            raise Exception("Invalid raw Words")
        words = [unpack(">I", raw_value[i:i+4])[0]
                 for i in range(0, len(raw_value), 4)]
        return cls(name, words)

    def dts_represent(self, depth=0):
        """Get dts string representation"""
        return INDENT*depth + self.name + ' = <' + \
            ' '.join(["0x%08x" % word for word in self.words]) + ">;"

    def dtb_represent(self, string_store, pos=0, version=17):
        """Get blob representation.

        Returns a tuple (blob, string_store, pos): the encoded property,
        the string store (extended with the name when needed) and the
        position advanced by the blob length.
        """
        strpos = string_store.find(self.name+'\0')
        if strpos < 0:
            strpos = len(string_store)
            string_store += self.name+'\0'
        value = b''.join([pack('>I', word) for word in self.words])
        value_len = len(value)
        # Before version 16, values of 8 bytes or more start on an 8-byte
        # boundary: the blob parser of this module realigns them that way,
        # so the writer has to insert the matching padding.
        if version < 16 and value_len >= 8 and (pos+12) % 8 != 0:
            value = b'\0' * (8-((pos+12) % 8)) + value
        blob = pack('>III', FDT_PROP, value_len, strpos) + value
        pos += len(blob)
        return (blob, string_store, pos)

    def json_represent(self, depth=0):
        """Output JSON: a list starting with the "words" type marker"""
        result = '%s: ["words", "' % json.dumps(self.name)
        result += '", "'.join(["0x%08x" % word for word in self.words])
        result += '"]'
        return result

    def to_raw(self):
        """Return RAW value representation (big-endian 32-bit words)"""
        # pack() returns bytes, which cannot be joined with a text
        # separator on python 3.
        return b''.join([pack('>I', word) for word in self.words])

    def __str__(self):
        """String representation"""
        return "Property(%s,Words:%s)" % (self.name, self.words)

    def __getitem__(self, index):
        """Get words, returns a word integer"""
        return self.words[index]

    def __len__(self):
        """Get words count"""
        return len(self.words)

    def __eq__(self, node):
        """Check property equality: same name and same values.

        Raises Exception when node is not a FdtProperty.
        """
        if not FdtProperty.__eq__(self, node):
            return False
        try:
            other_len = len(node)
        except TypeError:
            # A valueless FdtProperty has no length: it cannot be equal to
            # a property holding words.
            return False
        if len(self.words) != other_len:
            return False
        for index, word in enumerate(self.words):
            if word != node[index]:
                return False
        return True
|
|
|
|
|
|
class FdtPropertyBytes(FdtProperty):
    """Property with signed bytes as value"""

    def __init__(self, name, bytez):
        """Init with a non-empty sequence of signed 8-bit integers.

        Raises Exception when a byte is out of range or when the sequence
        is empty.
        """
        FdtProperty.__init__(self, name)
        for byte in bytez:
            if not -128 <= byte <= 127:
                raise Exception("Invalid value for byte %d, requires -128 <= number <= 127" % byte)
        if not bytez:
            raise Exception("Invalid Bytes")
        self.bytes = bytez

    @classmethod
    def init_raw(cls, name, raw_value):
        """Init from raw: every byte is read as a signed value"""
        return cls(name, unpack('b' * len(raw_value), raw_value))

    def dts_represent(self, depth=0):
        """Get dts string representation"""
        # Bytes are stored signed; masking to 8 bits prints a negative
        # value as its two-digit unsigned form (-1 gives "ff").
        return INDENT*depth + self.name + ' = [' + \
            ' '.join(["%02x" % (byte & 0xff)
                      for byte in self.bytes]) + "];"

    def dtb_represent(self, string_store, pos=0, version=17):
        """Get blob representation.

        Returns a tuple (blob, string_store, pos): the encoded property,
        the string store (extended with the name when needed) and the
        position advanced by the blob length.
        """
        strpos = string_store.find(self.name+'\0')
        if strpos < 0:
            strpos = len(string_store)
            string_store += self.name+'\0'
        value = b''.join([pack('>b', byte) for byte in self.bytes])
        # Before version 16, values of 8 bytes or more start on an 8-byte
        # boundary: the blob parser of this module realigns them that way,
        # so the writer has to insert the matching padding.
        if version < 16 and len(value) >= 8 and (pos+12) % 8 != 0:
            value = b'\0' * (8-((pos+12) % 8)) + value
        blob = pack('>III', FDT_PROP, len(self.bytes), strpos) + value
        # Pad so that the next token is 4-byte aligned.
        if len(blob) % 4:
            blob += b'\0' * (4-(len(blob) % 4))
        pos += len(blob)
        return (blob, string_store, pos)

    def json_represent(self, depth=0):
        """Output JSON: a list starting with the "bytes" type marker"""
        result = '%s: ["bytes", "' % json.dumps(self.name)
        result += '", "'.join(["%02x" % byte
                               for byte in self.bytes])
        result += '"]'
        return result

    def to_raw(self):
        """Return RAW value representation (one signed byte per value)"""
        # pack() returns bytes, which cannot be joined with a text
        # separator on python 3.
        return b''.join([pack('>b', byte) for byte in self.bytes])

    def __str__(self):
        """String representation"""
        return "Property(%s,Bytes:%s)" % (self.name, self.bytes)

    def __getitem__(self, index):
        """Get bytes, returns a byte"""
        return self.bytes[index]

    def __len__(self):
        """Get bytes count"""
        return len(self.bytes)

    def __eq__(self, node):
        """Check property equality: same name and same values.

        Raises Exception when node is not a FdtProperty.
        """
        if not FdtProperty.__eq__(self, node):
            return False
        try:
            other_len = len(node)
        except TypeError:
            # A valueless FdtProperty has no length: it cannot be equal to
            # a property holding bytes.
            return False
        if len(self.bytes) != other_len:
            return False
        for index, byte in enumerate(self.bytes):
            if byte != node[index]:
                return False
        return True
|
|
|
|
|
|
class FdtNop(object):  # pylint: disable-msg=R0903
    """Child entry standing for a NOP token"""

    def __init__(self):
        """Nothing to initialise"""

    def get_name(self):  # pylint: disable-msg=R0201
        """A NOP has no name: always None"""
        return None

    def __str__(self):
        """Empty string representation"""
        return ''

    def dts_represent(self, depth=0):  # pylint: disable-msg=R0201
        """Return the DTS comment line marking the NOP"""
        return '%s// [NOP]' % (INDENT * depth)

    def dtb_represent(self, string_store, pos=0, version=17):
        """Return (blob, string_store, pos) for the single NOP cell.

        The string store is returned untouched and pos is advanced by the
        4 bytes of the token.
        """
        token = pack('>I', FDT_NOP)
        return (token, string_store, pos + 4)
|
|
|
|
|
|
class FdtNode(object):
    """Node representation: a named, ordered list of sub-nodes,
    properties and NOPs, with an optional parent node.
    """

    @staticmethod
    def __validate_dt_name(name):
        """Checks the name validity (printable characters only)"""
        return all(char in string.printable for char in name)

    @staticmethod
    def __check_subnode_type(subnode):
        """Raise Exception unless subnode is a node, a property or a NOP"""
        if not isinstance(subnode, (FdtNode, FdtProperty, FdtNop)):
            raise Exception("Invalid object type")

    def __init__(self, name):
        """Init node with name; raises Exception on an invalid name"""
        self.name = name
        self.subdata = []
        self.parent = None
        if not FdtNode.__validate_dt_name(self.name):
            raise Exception("Invalid name '%s'" % self.name)

    def get_name(self):
        """Get node name"""
        return self.name

    def __check_name_duplicate(self, name):
        """Tell whether a child (NOPs excluded) already uses this name"""
        for data in self.subdata:
            if not isinstance(data, FdtNop) \
               and data.get_name() == name:
                return True
        return False

    def add_subnode(self, node):
        """Add child, deprecated use append()"""
        self.append(node)

    def add_raw_attribute(self, name, raw_value):
        """Construct a property from a raw value and add it as child"""
        self.append(FdtProperty.new_raw_property(name, raw_value))

    def set_parent_node(self, node):
        """Set parent node, None and FdtNode accepted.

        Raises Exception for any other object.
        """
        if node is not None and \
           not isinstance(node, FdtNode):
            raise Exception("Invalid object type")
        self.parent = node

    def get_parent_node(self):
        """Get parent node"""
        return self.parent

    def __str__(self):
        """String representation"""
        return "Node(%s)" % self.name

    def dts_represent(self, depth=0):
        """Get dts string representation"""
        result = ('\n').join([sub.dts_represent(depth+1)
                              for sub in self.subdata])
        if len(result) > 0:
            result += '\n'
        return INDENT*depth + self.name + ' {\n' + \
            result + INDENT*depth + "};"

    def dtb_represent(self, strings_store, pos=0, version=17):
        """Get blob representation
           Pass string storage as strings_store, pos for current node start
           and version as current dtb version

           Returns a tuple (blob, strings, pos) with the encoded node, the
           updated string storage and the position after the node.
        """
        strings = strings_store
        if self.get_name() == '/':
            # The root node is written with an empty name.
            blob = pack('>II', FDT_BEGIN_NODE, 0)
        else:
            blob = pack('>I', FDT_BEGIN_NODE)
            blob += self.get_name().encode('ascii') + pack('b', 0)
        # The NUL-terminated name is padded to a multiple of 4 bytes.
        if len(blob) % 4:
            blob += pack('b', 0) * (4-(len(blob) % 4))
        pos += len(blob)
        for sub in self.subdata:
            (data, strings, pos) = sub.dtb_represent(strings, pos, version)
            blob += data
        pos += 4
        blob += pack('>I', FDT_END_NODE)
        return (blob, strings, pos)

    def json_represent(self, depth=0):
        """Get JSON string representation (NOPs are left out)"""
        result = (',\n' +
                  INDENT*(depth+1)).join([sub.json_represent(depth+1)
                                          for sub in self.subdata
                                          if not isinstance(sub, FdtNop)])
        if len(result) > 0:
            result = INDENT + result + '\n'+INDENT*depth
        if self.get_name() == '/':
            # The root node is the anonymous top-level JSON object.
            return "{\n" + INDENT*(depth) + result + "}"
        else:
            return json.dumps(self.name) + ': {\n' + \
                INDENT*(depth) + result + "}"

    def __getitem__(self, index):
        """Get subnodes, returns either a Node, a Property or a Nop"""
        return self.subdata[index]

    def __setitem__(self, index, subnode):
        """Set node at index, replacing previous subnode,
           must not be a duplicate name

           Raises Exception on an invalid object type or a duplicate name.
        """
        # The type is checked first: get_name() below is only known to
        # exist on the accepted types.
        FdtNode.__check_subnode_type(subnode)
        if self.subdata[index].get_name() != subnode.get_name() and \
           self.__check_name_duplicate(subnode.get_name()):
            raise Exception("%s : %s subnode already exists" %
                            (self, subnode))
        self.subdata[index] = subnode

    def __len__(self):
        """Get children count (sub-nodes, properties and NOPs)"""
        return len(self.subdata)

    def __ne__(self, node):
        """Check node inequality
           i.e. is subnodes are the same, in either order
           and properties are the same (same values)
           The FdtNop is excluded from the check
        """
        return not self.__eq__(node)

    def __eq__(self, node):
        """Check node equality
           i.e. is subnodes are the same, in either order
           and properties are the same (same values)
           The FdtNop is excluded from the check

           Raises Exception when node is not a FdtNode.
        """
        if not isinstance(node, FdtNode):
            raise Exception("Invalid object type")
        if self.name != node.get_name():
            return False
        curnames = set([subnode.get_name() for subnode in self.subdata
                        if not isinstance(subnode, FdtNop)])
        cmpnames = set([subnode.get_name() for subnode in node
                        if not isinstance(subnode, FdtNop)])
        if curnames != cmpnames:
            return False
        for subnode in self.subdata:
            if isinstance(subnode, FdtNop):
                continue
            index = node.index(subnode.get_name())
            if subnode != node[index]:
                return False
        return True

    def append(self, subnode):
        """Append subnode, same as add_subnode

           Raises Exception on an invalid object type or a duplicate name.
        """
        # The type is checked first: get_name() below is only known to
        # exist on the accepted types.
        FdtNode.__check_subnode_type(subnode)
        if self.__check_name_duplicate(subnode.get_name()):
            raise Exception("%s : %s subnode already exists" %
                            (self, subnode))
        self.subdata.append(subnode)

    def pop(self, index=-1):
        """Remove and returns subnode at index, default the last"""
        return self.subdata.pop(index)

    def insert(self, index, subnode):
        """Insert subnode before index, must not be a duplicate name

           Raises Exception on an invalid object type or a duplicate name.
        """
        # The type is checked first: get_name() below is only known to
        # exist on the accepted types.
        FdtNode.__check_subnode_type(subnode)
        if self.__check_name_duplicate(subnode.get_name()):
            raise Exception("%s : %s subnode already exists" %
                            (self, subnode))
        self.subdata.insert(index, subnode)

    def _find(self, name):
        """Find name in subnodes, returns its position or None"""
        for i in range(0, len(self.subdata)):
            if not isinstance(self.subdata[i], FdtNop) and \
               name == self.subdata[i].get_name():
                return i
        return None

    def remove(self, name):
        """Remove subnode with the name
           Raises ValueError is not present
        """
        index = self._find(name)
        if index is None:
            raise ValueError("Not present")
        return self.subdata.pop(index)

    def index(self, name):
        """Returns position of subnode with the name
           Raises ValueError is not present
        """
        index = self._find(name)
        if index is None:
            raise ValueError("Not present")
        return index

    def merge(self, node):
        """Merge two nodes and subnodes
           Replace current properties with the given properties

           Raises Exception when node is not a FdtNode.
        """
        if not isinstance(node, FdtNode):
            raise Exception("Can only merge with a FdtNode")
        for subnode in [obj for obj in node
                        if isinstance(obj, (FdtNode, FdtProperty))]:
            index = self._find(subnode.get_name())
            if index is None:
                # Unknown name: a deep copy is added so both trees stay
                # independent.
                dup = deepcopy(subnode)
                if isinstance(subnode, FdtNode):
                    dup.set_parent_node(self)
                self.append(dup)
            elif isinstance(subnode, FdtNode):
                self.subdata[index].merge(subnode)
            else:
                self.subdata[index] = copy(subnode)

    def walk(self):
        """Walk into subnodes and yield paths and objects
           Yields tuples (path string, node object), depth first, for
           every sub-node and property; NOPs are skipped.
           Paths start with '/' and are relative to this node.
        """
        # Each entry is (node, index of its next child to visit); the
        # entry being processed is the last one.
        pending = [(self, 0)]
        curpath = []

        while pending:
            node, start = pending.pop()
            for index in range(start, len(node)):
                child = node[index]
                is_node = isinstance(child, FdtNode)
                if is_node or isinstance(child, FdtProperty):
                    yield ('/' + '/'.join(curpath+[child.get_name()]),
                           child)
                if is_node and len(child):
                    # Descend now and resume this node after the child.
                    pending.append((node, index+1))
                    pending.append((child, 0))
                    curpath.append(child.get_name())
                    break
            else:
                # Node exhausted (or empty): step back to its parent.
                # curpath is empty only when the start node is done.
                if curpath:
                    curpath.pop()
|
|
|
|
|
|
class Fdt(object):
    """Flattened Device Tree representation"""

    def __init__(self, version=17, last_comp_version=16, boot_cpuid_phys=0):
        """Init FDT object with version and boot values.

        Sizes and offsets of the header start at 0; to_dtb() fills them.
        """
        self.header = {'magic': FDT_MAGIC,
                       'totalsize': 0,
                       'off_dt_struct': 0,
                       'off_dt_strings': 0,
                       'off_mem_rsvmap': 0,
                       'version': version,
                       'last_comp_version': last_comp_version,
                       'boot_cpuid_phys': boot_cpuid_phys,
                       'size_dt_strings': 0,
                       'size_dt_struct': 0}
        self.rootnode = None
        self.prenops = None
        self.postnops = None
        self.reserve_entries = None

    def add_rootnode(self, rootnode, prenops=None, postnops=None):
        """Add root node, with the optional NOPs found before and after it"""
        self.rootnode = rootnode
        self.prenops = prenops
        self.postnops = postnops

    def get_rootnode(self):
        """Get root node"""
        return self.rootnode

    def add_reserve_entries(self, reserve_entries):
        """Add reserved entries as list of dict with
           'address' and 'size' keys"""
        self.reserve_entries = reserve_entries

    def to_dts(self):
        """Export to DTS representation in string format"""
        result = "/dts-v1/;\n"
        result += "// version:\t\t%d\n" % self.header['version']
        result += "// last_comp_version:\t%d\n" % \
                  self.header['last_comp_version']
        if self.header['version'] >= 2:
            result += "// boot_cpuid_phys:\t0x%x\n" % \
                self.header['boot_cpuid_phys']
        result += '\n'
        if self.reserve_entries is not None:
            for entry in self.reserve_entries:
                result += "/memreserve/ "
                # Zero is written as a bare "0", other values in hex.
                if entry['address']:
                    result += "%#x " % entry['address']
                else:
                    result += "0 "
                if entry['size']:
                    result += "%#x" % entry['size']
                else:
                    result += "0"
                result += ";\n"
        if self.prenops:
            result += '\n'.join([nop.dts_represent() for nop in self.prenops])
            result += '\n'
        if self.rootnode is not None:
            result += self.rootnode.dts_represent()
        if self.postnops:
            result += '\n'
            result += '\n'.join([nop.dts_represent() for nop in self.postnops])
        return result

    def to_dtb(self):
        """Export to Blob format.

        Returns the blob, or None when no root node was set. The sizes and
        offsets stored in the header are updated as a side effect.
        """
        if self.rootnode is None:
            return None
        version = self.header['version']

        # Reserved memory map, closed by an all-zero entry.
        blob_reserve_entries = b''
        if self.reserve_entries is not None:
            for entry in self.reserve_entries:
                blob_reserve_entries += pack('>QQ',
                                             entry['address'],
                                             entry['size'])
        blob_reserve_entries += pack('>QQ', 0, 0)

        # The header has 7 cells, plus one more from versions 2, 3 and 17.
        header_size = 7 * 4
        if version >= 2:
            header_size += 4
        if version >= 3:
            header_size += 4
        if version >= 17:
            header_size += 4
        # Pad the header so that the reserved map starts 8-byte aligned.
        header_adjust = b''
        if header_size % 8 != 0:
            header_adjust = b'\0' * (8 - (header_size % 8))
            header_size += len(header_adjust)
        dt_start = header_size + len(blob_reserve_entries)

        prenops_blob = b''
        if self.prenops is not None:
            prenops_blob = b''.join([nop.dtb_represent('')[0]
                                     for nop in self.prenops])
        # The root node starts after the leading NOPs: its position has to
        # account for them, since properties use it for value alignment.
        (blob_dt, blob_strings, _) = \
            self.rootnode.dtb_represent('', dt_start + len(prenops_blob),
                                        version)
        blob_dt = prenops_blob + blob_dt
        if self.postnops is not None:
            blob_dt += b''.join([nop.dtb_represent('')[0]
                                 for nop in self.postnops])
        blob_dt += pack('>I', FDT_END)

        self.header['size_dt_strings'] = len(blob_strings)
        self.header['size_dt_struct'] = len(blob_dt)
        self.header['off_mem_rsvmap'] = header_size
        self.header['off_dt_struct'] = dt_start
        self.header['off_dt_strings'] = dt_start + len(blob_dt)
        self.header['totalsize'] = dt_start + len(blob_dt) + len(blob_strings)
        blob_header = pack('>IIIIIII', self.header['magic'],
                           self.header['totalsize'],
                           self.header['off_dt_struct'],
                           self.header['off_dt_strings'],
                           self.header['off_mem_rsvmap'],
                           self.header['version'],
                           self.header['last_comp_version'])
        if version >= 2:
            blob_header += pack('>I', self.header['boot_cpuid_phys'])
        if version >= 3:
            blob_header += pack('>I', self.header['size_dt_strings'])
        if version >= 17:
            blob_header += pack('>I', self.header['size_dt_struct'])
        return blob_header + header_adjust + blob_reserve_entries + \
            blob_dt + blob_strings.encode('ascii')

    def to_json(self):
        """Output JSON, or None when no root node was set"""
        if self.rootnode is None:
            return None
        return self.rootnode.json_represent()

    def resolve_path(self, path):
        """Resolve path like /memory/reg and return either a FdtNode,
            a FdtProperty or None"""
        if self.rootnode is None:
            return None
        if not path.startswith('/'):
            return None
        # A single trailing slash is ignored.
        if len(path) > 1 and path.endswith('/'):
            path = path[:-1]
        if path == '/':
            return self.rootnode
        curnode = self.rootnode
        for subpath in path[1:].split('/'):
            found = None
            # Only nodes have children to look into.
            if not isinstance(curnode, FdtNode):
                return None
            for node in curnode:
                if subpath == node.get_name():
                    found = node
                    break
            if found is None:
                return None
            curnode = found
        return curnode
|
|
|
|
def _add_json_to_fdtnode(node, subjson):
|
|
"""Populate FdtNode with JSON dict items"""
|
|
for (key, value) in subjson.items():
|
|
if isinstance(value, dict):
|
|
subnode = FdtNode(key)
|
|
subnode.set_parent_node(node)
|
|
node.append(subnode)
|
|
_add_json_to_fdtnode(subnode, value)
|
|
elif isinstance(value, list):
|
|
if len(value) < 2:
|
|
raise Exception("Invalid list for %s" % key)
|
|
if value[0] == "words":
|
|
words = [int(word, 16) for word in value[1:]]
|
|
node.append(FdtPropertyWords(key, words))
|
|
elif value[0] == "bytes":
|
|
bytez = [int(byte, 16) for byte in value[1:]]
|
|
node.append(FdtPropertyBytes(key, bytez))
|
|
elif value[0] == "strings":
|
|
node.append(FdtPropertyStrings(key, \
|
|
[s for s in value[1:]]))
|
|
else:
|
|
raise Exception("Invalid list for %s" % key)
|
|
elif value is None:
|
|
node.append(FdtProperty(key))
|
|
else:
|
|
raise Exception("Invalid value for %s" % key)
|
|
|
|
def FdtJsonParse(buf):
    """Build a Fdt object from its JSON representation.

    See JSONDeviceTree.md for structure and encoding. The decoded JSON
    object becomes the content of a root node named '/', which is set on
    a Fdt created with default values.
    Returns the Fdt object.
    """
    decoded = json.loads(buf)

    rootnode = FdtNode('/')
    _add_json_to_fdtnode(rootnode, decoded)

    result = Fdt()
    result.add_rootnode(rootnode)
    return result
|
|
|
|
def FdtFsParse(path):
    """Build a Fdt instance from a device tree filesystem.

    Should be /proc/device-tree on a device, or the fusemount.py
    mount point. Directories become nodes and files become properties
    whose type is picked from the file content.
    Returns the Fdt object.
    """
    rootnode = FdtNode("/")

    if path.endswith('/'):
        path = path[:-1]

    # Maps every directory path met so far to the node built for it.
    known = {path: rootnode}

    for dirpath, dirnames, filenames in os.walk(path):
        current = known.get(dirpath)
        if current is None:
            raise Exception("os.walk error")
        for filename in filenames:
            with open(dirpath+'/'+filename, 'rb') as content_file:
                raw = content_file.read()
            current.add_subnode(FdtProperty.new_raw_property(filename, raw))
        for dirname in dirnames:
            child = FdtNode(dirname)
            current.add_subnode(child)
            child.set_parent_node(current)
            known[dirpath+'/'+dirname] = child

    result = Fdt()
    result.add_rootnode(rootnode)
    return result
|
|
|
|
class FdtBlobParse(object):  # pylint: disable-msg=R0903
    """Parse a device tree blob from a seekable binary file object"""

    # First seven header cells; later cells depend on the version.
    __fdt_header_format = ">IIIIIII"
    __fdt_header_names = ('magic', 'totalsize', 'off_dt_struct',
                          'off_dt_strings', 'off_mem_rsvmap', 'version',
                          'last_comp_version')

    __fdt_reserve_entry_format = ">QQ"
    __fdt_reserve_entry_names = ('address', 'size')

    __fdt_dt_cell_format = ">I"
    # Value size and name offset following a property token.
    __fdt_dt_prop_format = ">II"
    __fdt_dt_tag_name = {FDT_BEGIN_NODE: 'node_begin',
                         FDT_END_NODE: 'node_end',
                         FDT_PROP: 'prop',
                         FDT_NOP: 'nop',
                         FDT_END: 'end'}

    def __extract_fdt_header(self):
        """Extract DTB header as a dict, read from the current position"""
        header = Struct(self.__fdt_header_format)
        header_entry = Struct(">I")
        data = self.infile.read(header.size)
        result = dict(zip(self.__fdt_header_names, header.unpack_from(data)))
        if result['version'] >= 2:
            data = self.infile.read(header_entry.size)
            result['boot_cpuid_phys'] = header_entry.unpack_from(data)[0]
        if result['version'] >= 3:
            data = self.infile.read(header_entry.size)
            result['size_dt_strings'] = header_entry.unpack_from(data)[0]
        if result['version'] >= 17:
            data = self.infile.read(header_entry.size)
            result['size_dt_struct'] = header_entry.unpack_from(data)[0]
        return result

    def __extract_fdt_reserve_entries(self):
        """Extract reserved memory entries, up to the all-zero entry"""
        header = Struct(self.__fdt_reserve_entry_format)
        entries = []
        self.infile.seek(self.fdt_header['off_mem_rsvmap'])
        while True:
            data = self.infile.read(header.size)
            result = dict(zip(self.__fdt_reserve_entry_names,
                              header.unpack_from(data)))
            if result['address'] == 0 and result['size'] == 0:
                return entries
            entries.append(result)

    def __extract_fdt_nodename(self):
        """Extract the NUL-terminated node name at the current position.

        The file is left on the next 4-byte boundary after the name.
        """
        data = ''
        pos = self.infile.tell()
        while True:
            byte = self.infile.read(1)
            if ord(byte) == 0:
                break
            data += byte.decode('ascii')
        align_pos = pos + len(data) + 1
        align_pos = (((align_pos) + ((4) - 1)) & ~((4) - 1))
        self.infile.seek(align_pos)
        return data

    def __extract_fdt_string(self, prop_string_pos):
        """Extract string from string pool; the file position is restored"""
        data = ''
        pos = self.infile.tell()
        self.infile.seek(self.fdt_header['off_dt_strings']+prop_string_pos)
        while True:
            byte = self.infile.read(1)
            if ord(byte) == 0:
                break
            data += byte.decode('ascii')
        self.infile.seek(pos)
        return data

    def __extract_fdt_prop(self):
        """Extract property, returns a tuple (name, raw value)"""
        prop = Struct(self.__fdt_dt_prop_format)
        pos = self.infile.tell()
        data = self.infile.read(prop.size)
        (prop_size, prop_string_pos,) = prop.unpack_from(data)

        prop_start = pos + prop.size
        # Before version 16, values of 8 bytes or more are 8-byte aligned.
        if self.fdt_header['version'] < 16 and prop_size >= 8:
            prop_start = (((prop_start) + ((8) - 1)) & ~((8) - 1))

        self.infile.seek(prop_start)
        value = self.infile.read(prop_size)

        # The next token starts on a 4-byte boundary.
        align_pos = self.infile.tell()
        align_pos = (((align_pos) + ((4) - 1)) & ~((4) - 1))
        self.infile.seek(align_pos)

        return (self.__extract_fdt_string(prop_string_pos), value)

    def __extract_fdt_dt(self):
        """Extract tags as a list of (tag, data) tuples.

        data is the node name for a node start, a (name, raw value) tuple
        for a property and '' otherwise. Unknown tags are reported on the
        standard output and skipped.
        """
        cell = Struct(self.__fdt_dt_cell_format)
        tags = []
        self.infile.seek(self.fdt_header['off_dt_struct'])
        while True:
            data = self.infile.read(cell.size)
            if len(data) < cell.size:
                break
            tag, = cell.unpack_from(data)
            # Names are compared with ==: a substring test would accept
            # the '' returned for an unknown tag as any of the names.
            tag_name = self.__fdt_dt_tag_name.get(tag, '')
            if tag_name == 'node_begin':
                name = self.__extract_fdt_nodename()
                if len(name) == 0:
                    # The root node is stored with an empty name.
                    name = '/'
                tags.append((tag, name))
            elif tag_name in ('node_end', 'nop'):
                tags.append((tag, ''))
            elif tag_name == 'end':
                tags.append((tag, ''))
                break
            elif tag_name == 'prop':
                propdata = self.__extract_fdt_prop()
                tags.append((tag, propdata))
            else:
                print("Unknown Tag %d" % tag)
        return tags

    def __init__(self, infile):
        """Init with file input and parse it.

        Raises Exception on a wrong magic number or on a version (or last
        compatible version) higher than what this parser supports.
        """
        self.infile = infile
        self.fdt_header = self.__extract_fdt_header()
        if self.fdt_header['magic'] != FDT_MAGIC:
            raise Exception('Invalid Magic')
        if self.fdt_header['version'] > FDT_MAX_VERSION:
            raise Exception('Invalid Version %d' % self.fdt_header['version'])
        if self.fdt_header['last_comp_version'] > FDT_MAX_VERSION-1:
            raise Exception('Invalid last compatible Version %d' %
                            self.fdt_header['last_comp_version'])
        self.fdt_reserve_entries = self.__extract_fdt_reserve_entries()
        self.fdt_dt_tags = self.__extract_fdt_dt()

    def __to_nodes(self):
        """Represent fdt as Node and properties structure
           Returns a tuple with the pre-node Nops, the Root Node,
           and the post-node Nops.
        """
        prenops = []
        postnops = []
        rootnode = None
        curnode = None
        for tag in self.fdt_dt_tags:
            tag_name = self.__fdt_dt_tag_name.get(tag[0], '')
            if tag_name == 'node_begin':
                newnode = FdtNode(tag[1])
                if rootnode is None:
                    rootnode = newnode
                if curnode is not None:
                    curnode.add_subnode(newnode)
                    newnode.set_parent_node(curnode)
                curnode = newnode
            elif tag_name == 'node_end':
                if curnode is not None:
                    curnode = curnode.get_parent_node()
            elif tag_name == 'nop':
                # A NOP belongs to the open node if any, else it is kept
                # apart as found before or after the root node.
                if curnode is not None:
                    curnode.add_subnode(FdtNop())
                elif rootnode is not None:
                    postnops.append(FdtNop())
                else:
                    prenops.append(FdtNop())
            elif tag_name == 'prop':
                if curnode is not None:
                    curnode.add_raw_attribute(tag[1][0], tag[1][1])
            elif tag_name == 'end':
                continue
        return (prenops, rootnode, postnops)

    def to_fdt(self):
        """Create a fdt object
            Returns a Fdt object
        """
        # boot_cpuid_phys is only read from the header from version 2.
        if self.fdt_header['version'] >= 2:
            boot_cpuid_phys = self.fdt_header['boot_cpuid_phys']
        else:
            boot_cpuid_phys = 0
        fdt = Fdt(version=self.fdt_header['version'],
                  last_comp_version=self.fdt_header['last_comp_version'],
                  boot_cpuid_phys=boot_cpuid_phys)
        (prenops, rootnode, postnops) = self.__to_nodes()
        fdt.add_rootnode(rootnode, prenops=prenops, postnops=postnops)
        fdt.add_reserve_entries(self.fdt_reserve_entries)
        return fdt
|