Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
TASTE
asn1-value-editor
Commits
8d62bb9b
Commit
8d62bb9b
authored
Aug 04, 2016
by
Maxime Perrotin
Browse files
Implement breadth first search
parent
35674081
Changes
2
Hide whitespace changes
Inline
Side-by-side
asn1_value_editor/ValueGenerator.py
View file @
8d62bb9b
...
...
@@ -152,17 +152,34 @@ def compute_choice_combinations(asn1_ty, pool):
yield
'{}: {}'
.
format
(
discr
,
each
)
def
myproduct
(
*
iterables
):
''' Custom implementation of itertools.product - standard product
has to consume the iterables completely before making the product,
meaning it is useless with large iterable due to memory usage '''
if
len
(
iterables
)
==
0
:
raise
StopIteration
else
:
for
item
in
iterables
[
0
]():
for
items
in
myproduct
(
*
iterables
[
1
:]):
yield
(
item
,
)
+
items
def
compute_sequence_combinations
(
asn1_ty
,
pool
):
''' Generator returning all combinations of SEQUENCE types '''
# Prepare generators to compute combinations of each field
elems
=
(
compute_combinations
(
sort
,
pool
)
for
sort
in
asn1_ty
.
Children
.
viewvalues
()
)
elems
=
[
lambda
:
compute_combinations
(
sort
,
pool
)
for
sort
in
asn1_ty
.
Children
.
viewvalues
()
]
# Combine all field generators to get the complete set of values
for
each
in
itertools
.
product
(
*
elems
):
elems_lambda
=
[
lambda
:
elem
()
for
elem
in
elems
]
# This is not quite right. XXX
#for each in itertools.product(*elems):
for
each
in
myproduct
(
*
elems_lambda
):
# "each" contains a generator per field in the SEQUENCE
# each is a tuple with values for the sequence, join with fieldnames
pairs
=
itertools
.
izip
(
asn1_ty
.
Children
.
viewkeys
(),
each
)
res
=
(
'{} {}'
.
format
(
name
,
value
)
for
name
,
value
in
pairs
)
yield
'{{ {} }}'
.
format
(
', '
.
join
(
res
))
res_yield
=
'{{ {} }}'
.
format
(
', '
.
join
(
res
))
yield
res_yield
def
compute_sequenceof_combinations
(
asn1_ty
,
pool
):
...
...
@@ -172,4 +189,5 @@ def compute_sequenceof_combinations(asn1_ty, pool):
for
_
in
xrange
(
size
):
elems
.
append
(
compute_combinations
(
asn1_ty
,
pool
))
for
each
in
itertools
.
product
(
*
elems
):
yield
'{{ {} }}'
.
format
(
', '
.
join
(
each
))
res
=
'{{ {} }}'
.
format
(
', '
.
join
(
each
))
yield
res
asn1_value_editor/sdlHandler.py
View file @
8d62bb9b
...
...
@@ -25,6 +25,7 @@ import ctypes
import
random
from
functools
import
partial
from
itertools
import
chain
from
collections
import
deque
from
PySide.QtGui
import
(
QDockWidget
,
QPushButton
,
QGridLayout
,
QListWidget
,
QUndoStack
,
QUndoCommand
,
QToolButton
,
QTableWidget
,
...
...
@@ -535,9 +536,8 @@ class sdlHandler(QObject):
self
.
log_area
.
addItem
(
'Sent {}({})'
.
format
(
tc_name
,
param
or
''
))
if
check_ppty
:
# When doing undo, dont check propertis again
# When doing undo, dont check properti
e
s again
self
.
check_properties
(
new_hash
)
#self.check_properties(new_hash)
return
new_hash
def
update_button_state
(
self
,
tc_name
=
None
):
...
...
@@ -673,7 +673,9 @@ class sdlHandler(QObject):
''' For all combinations of a given interface parameter, execute
the PI, save the resulting state, undo, and yield the state '''
if
asn1_ty
:
print
'Exhausting'
,
name
for
arg
in
compute_combinations
(
asn1_ty
,
self
.
proc
.
dataview
):
#print name, arg
self
.
click_tc
(
name
,
arg
)
new_hash
=
self
.
current_hash
self
.
undo
()
...
...
@@ -691,56 +693,87 @@ class sdlHandler(QObject):
sort
=
inp
.
get
(
'type'
,
None
)
if
sort
:
asn1_ty
=
self
.
proc
.
dataview
[
sort
.
ReferencedTypeName
]
exhaust_interface
(
name
,
ty
)
for
each
in
exhaust_interface
(
name
,
asn1_ty
):
yield
each
def
breadth_first
(
self
,
max_states
=
100
):
''' Create and explore the state graph with breadth first '''
# Fifo contains the hash, and a tuple (PI, Param) to reach it
fifo
=
deque
([(
self
.
current_hash
,
(
None
,
None
))])
visited
=
deque
(
fifo
)
scenario
=
deque
()
nb_states
=
0
while
fifo
:
state
,
(
trans_name
,
trans_param
)
=
fifo
.
popleft
()
if
trans_name
:
scenario
.
append
((
trans_name
,
trans_param
))
self
.
restore_global_state
(
state
)
for
sibling
,
(
trans_name
,
trans_param
)
in
self
.
children_states
():
if
sibling
not
in
visited
:
visited
.
append
(
sibling
)
if
not
self
.
stop_conditions
:
# Cut branch if there are stop conditions
fifo
.
append
((
sibling
,
(
trans_name
,
trans_param
)))
else
:
scenario
.
append
((
trans_name
,
trans_param
))
print
'Stop conditions met:'
,
self
.
stop_conditions
print
'Scenario:'
for
(
pi
,
arg
)
in
scenario
:
print
(
' {} ({})'
.
format
(
pi
,
arg
))
nb_states
+=
1
if
nb_states
>=
max_states
:
return
def
exhaustive_simulation
(
self
):
''' Model checker - try all combinations of all inputs in all
possible states, and verify properties on the fly '''
# DEPRECATED FUNCTION
print
'Exhaustive simulation (Breadth first)'
next_level
=
[]
self
.
sim_param
[
'state'
]
=
'exhaustive'
total_err
=
0
def
exhaust_interface
(
name
,
asn1_ty
):
''' Send all combinations of an input signal and return
a list of new states (should be doing this in a worker thread) '''
error
=
0
new_hashes
=
[]
if
asn1_ty
:
for
arg
in
compute_combinations
(
asn1_ty
,
self
.
proc
.
dataview
):
QApplication
.
processEvents
()
if
self
.
sim_param
[
'state'
]
!=
'exhaustive'
:
return
new_hashes
,
error
self
.
click_tc
(
name
,
arg
)
if
self
.
new_state_created
:
# Create new state only if no stop conditions
if
not
self
.
stop_conditions
:
new_hashes
.
append
((
name
,
arg
,
self
.
current_hash
))
else
:
# TODO: generate scenario
error
+=
1
self
.
undo
()
else
:
self
.
click_tc
(
name
)
if
self
.
new_state_created
:
new_hashes
.
append
((
name
,
None
,
self
.
current_hash
))
self
.
undo
()
return
new_hashes
,
error
for
name
in
self
.
active_tc
:
ty
=
None
for
inp
in
self
.
proc
.
input_signals
:
if
inp
[
'name'
].
lower
()
==
name
.
lower
():
sort
=
inp
.
get
(
'type'
,
None
)
if
sort
:
typename
=
sort
.
ReferencedTypeName
.
replace
(
'-'
,
'_'
)
ty
=
self
.
proc
.
dataview
[
sort
.
ReferencedTypeName
]
next_states
,
error_count
=
exhaust_interface
(
name
,
ty
)
total_err
+=
error_count
next_level
.
extend
(
next_states
)
print
'length of next level: '
,
len
(
next_level
)
print
'Number of stop conditions reached:'
,
total_err
self
.
breadth_first
()
# next_level = []
# self.sim_param['state'] = 'exhaustive'
# total_err = 0
#
# def exhaust_interface(name, asn1_ty):
# ''' Send all combinations of an input signal and return
# a list of new states (should be doing this in a worker thread) '''
# error = 0
# new_hashes = []
# if asn1_ty:
# for arg in compute_combinations(asn1_ty, self.proc.dataview):
# QApplication.processEvents()
# if self.sim_param['state'] != 'exhaustive':
# return new_hashes, error
# self.click_tc(name, arg)
# if self.new_state_created:
# # Create new state only if no stop conditions
# if not self.stop_conditions:
# new_hashes.append((name, arg, self.current_hash))
# else:
# # TODO: generate scenario
# error += 1
# self.undo()
# else:
# self.click_tc(name)
# if self.new_state_created:
# new_hashes.append((name, None, self.current_hash))
# self.undo()
# return new_hashes, error
# for name in self.active_tc:
# ty = None
# for inp in self.proc.input_signals:
# if inp['name'].lower() == name.lower():
# sort = inp.get('type', None)
# if sort:
# typename = sort.ReferencedTypeName.replace('-', '_')
# ty = self.proc.dataview[sort.ReferencedTypeName]
# next_states, error_count = exhaust_interface(name, ty)
# total_err += error_count
# next_level.extend(next_states)
# print 'length of next level: ', len(next_level)
#
# print 'Number of stop conditions reached:', total_err
def
random_simulation
(
self
):
''' Random simulator - read the config from the checker_table and
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment