#!/usr/bin/env python
"""Core operators of Logic Tensor Networks (LTN) for TensorFlow 1.x."""
import tensorflow as tf

# Strength of the regulariser that pushes predicate outputs away from 0.5
# (see predicate()); 0.0 disables it.
BIAS_factor = 0.0
BIAS = 0.0
# Default number of layers in the tensor-network grounding of predicates.
LAYERS = 4

# Fuzzy operators behind the logical connectives and quantifiers; they are
# bound by set_tnorm(), set_universal_aggreg() and
# set_existential_aggregator() below.
F_And = None
F_Or = None
F_Implies = None
F_Equiv = None
F_Not = None
F_Forall = None
F_Exists = None
def set_tnorm(tnorm):
    # Bind the connectives to one family of fuzzy operators; the empty
    # string leaves the current binding unchanged.
    assert tnorm in ['min','luk','prod','mean','']
    global F_And,F_Or,F_Implies,F_Not,F_Equiv
    if tnorm == "min":
        def F_And(wffs):
            return tf.reduce_min(wffs,axis=-1,keepdims=True)
        def F_Or(wffs):
            return tf.reduce_max(wffs,axis=-1,keepdims=True)
        def F_Implies(wff1, wff2):
            return tf.maximum(tf.to_float(tf.less_equal(wff1,wff2)),wff2)
        def F_Not(wff):
            return 1 - wff
        def F_Equiv(wff1,wff2):
            return tf.maximum(tf.to_float(tf.equal(wff1,wff2)),tf.minimum(wff1,wff2))
    if tnorm == "prod":
        def F_And(wffs):
            return tf.reduce_prod(wffs,axis=-1,keepdims=True)
        def F_Or(wffs):
            return 1-tf.reduce_prod(1-wffs,axis=-1,keepdims=True)
        def F_Implies(wff1, wff2):
            # Goguen residuum: 1 where wff1 <= wff2, wff2/wff1 elsewhere;
            # the tf.cond guards against dividing by a zero antecedent.
            le_wff1_wff2 = tf.to_float(tf.less_equal(wff1,wff2))
            gt_wff1_wff2 = tf.to_float(tf.greater(wff1,wff2))
            return tf.cond(tf.equal(wff1[0],0),
                           lambda:tf.constant([1.0]),
                           lambda:le_wff1_wff2 + gt_wff1_wff2*wff2/wff1)
        def F_Not(wff):
            # Strict negation in Goedel/product logic would be
            # tf.to_float(tf.equal(wff,0)); 1 - wff is used instead.
            return 1-wff
        def F_Equiv(wff1,wff2):
            # Note: undefined when either operand is 0.
            return tf.minimum(wff1/wff2,wff2/wff1)
    if tnorm == "mean":
        def F_And(wffs):
            return tf.reduce_mean(wffs,axis=-1,keepdims=True)
        def F_Or(wffs):
            return tf.reduce_max(wffs,axis=-1,keepdims=True)
        def F_Implies(wff1, wff2):
            return tf.clip_by_value(2*wff2-wff1,0,1)
        def F_Not(wff):
            return 1 - wff
        def F_Equiv(wff1,wff2):
            return 1 - tf.abs(wff1-wff2)
    if tnorm == "luk":
        def F_And(wffs):
            return tf.maximum(0.0,tf.reduce_sum(wffs,axis=-1,keepdims=True)+1-tf.to_float(tf.shape(wffs)[-1]))
        def F_Or(wffs):
            return tf.minimum(tf.reduce_sum(wffs,axis=-1,keepdims=True),1.0)
        def F_Implies(wff1, wff2):
            return tf.minimum(1.,1 - wff1 + wff2)
        def F_Not(wff):
            return 1 - wff
        def F_Equiv(wff1,wff2):
            return 1 - tf.abs(wff1-wff2)
def set_universal_aggreg(aggreg):
    assert aggreg in ['hmean','min','mean']
    global F_Forall
    if aggreg == "hmean":
        def F_Forall(axis,wff):
            return 1/tf.reduce_mean(1/(wff+1e-10),axis=axis)
    if aggreg == "min":
        def F_Forall(axis,wff):
            return tf.reduce_min(wff,axis=axis)
    if aggreg == "mean":
        def F_Forall(axis,wff):
            return tf.reduce_mean(wff, axis=axis)

def set_existential_aggregator(aggreg):
    assert aggreg in ['max']
    global F_Exists
    if aggreg == "max":
        def F_Exists(axis, wff):
            return tf.reduce_max(wff, axis=axis)

set_tnorm("luk")
set_universal_aggreg("hmean")
set_existential_aggregator("max")
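
# Illustrative sketch (not part of the original module): the semantics can
# be rebound before any formula is built, e.g. to the product t-norm with
# a mean universal aggregator:
#
#   set_tnorm("prod")
#   set_universal_aggreg("mean")
#   set_existential_aggregator("max")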
def And(*wffs):
    if len(wffs) == 0:
        result = tf.constant(1.0)
        result.doms = []
    else:
        cross_wffs,_ = cross_args(wffs)
        label = "_AND_".join([wff.name.split(":")[0] for wff in wffs])
        result = tf.identity(F_And(cross_wffs),name=label)
        result.doms = cross_wffs.doms
    return result

def Or(*wffs):
    if len(wffs) == 0:
        result = tf.constant(0.0)
        result.doms = []
    else:
        cross_wffs,_ = cross_args(wffs)
        label = "_OR_".join([wff.name.split(":")[0] for wff in wffs])
        result = tf.identity(F_Or(cross_wffs),name=label)
        result.doms = cross_wffs.doms
    return result

def Implies(wff1, wff2):
    _, cross_wffs = cross_2args(wff1,wff2)
    label = wff1.name.split(":")[0] + "_IMP_" + wff2.name.split(":")[0]
    result = F_Implies(cross_wffs[0],cross_wffs[1])
    result = tf.identity(result,name=label)
    result.doms = cross_wffs[0].doms
    return result

def Not(wff):
    result = F_Not(wff)
    label = "NOT_" + wff.name.split(":")[0]
    result = tf.identity(result,name=label)
    result.doms = wff.doms
    return result

def Equiv(wff1,wff2):
    _, cross_wffs = cross_2args(wff1,wff2)
    label = wff1.name.split(":")[0] + "_IFF_" + wff2.name.split(":")[0]
    result = F_Equiv(cross_wffs[0],cross_wffs[1])
    result = tf.identity(result,name=label)
    result.doms = cross_wffs[0].doms
    return result

def Forall(vars,wff):
    if type(vars) is not tuple:
        vars = (vars,)
    result_doms = [x for x in wff.doms if x not in [var.doms[0] for var in vars]]
    quantif_axis = [wff.doms.index(var.doms[0]) for var in vars]
    # Quantifying over an empty domain yields a vacuously true formula.
    not_empty_vars = tf.cast(tf.reduce_prod(tf.stack([tf.size(var) for var in vars])),tf.bool)
    ones = tf.ones((1,)*(len(result_doms)+1))
    result = tf.cond(not_empty_vars,lambda:F_Forall(quantif_axis,wff),lambda:ones)
    result.doms = result_doms
    return result

def Exists(vars,wff):
    if type(vars) is not tuple:
        vars = (vars,)
    result_doms = [x for x in wff.doms if x not in [var.doms[0] for var in vars]]
    quantif_axis = [wff.doms.index(var.doms[0]) for var in vars]
    # Quantifying over an empty domain yields a vacuously false formula.
    not_empty_vars = tf.cast(tf.reduce_prod(tf.stack([tf.size(var) for var in vars])),tf.bool)
    zeros = tf.zeros((1,)*(len(result_doms)+1))
    result = tf.cond(not_empty_vars,lambda:F_Exists(quantif_axis,wff),lambda:zeros)
    result.doms = result_doms
    return result
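
# Illustrative sketch (hypothetical names, using predicate() and variable()
# defined below): a symmetry rule over a binary predicate P on pairs of
# 2-feature terms.
#
#   x = variable("x", 2)
#   y = variable("y", 2)
#   P = predicate("P", 4)
#   rule = Forall((x, y), Implies(P(x, y), P(y, x)))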
def variable(label,number_of_features_or_feed):
    # A logical variable ranges over the domain named by its label; it can
    # be a placeholder (fed at run time), an existing tensor, or raw data.
    if type(number_of_features_or_feed) is int:
        result = tf.placeholder(dtype=tf.float32,shape=(None,number_of_features_or_feed),name=label)
    elif isinstance(number_of_features_or_feed,tf.Tensor):
        result = tf.identity(number_of_features_or_feed,name=label)
    else:
        result = tf.constant(number_of_features_or_feed,name=label)
    result.doms = [label]
    return result

def constant(label,value=None,
             min_value=None,
             max_value=None):
    # A logical constant: fixed when value is given, otherwise a trainable
    # point initialised uniformly inside [min_value, max_value].
    label = "ltn_constant_"+label
    if value is not None:
        result = tf.constant(value,name=label)
    else:
        result = tf.Variable(tf.random_uniform(
            shape=(1,len(min_value)),
            minval=min_value,
            maxval=max_value),name=label)
    result.doms = []
    return result
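
# Illustrative sketch: a variable with 2 features fed through feed_dict,
# and a trainable constant constrained to the unit square.
#
#   x = variable("x", 2)
#   c = constant("c", min_value=[0., 0.], max_value=[1., 1.])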
def function(label, input_shape_spec, output_shape_spec=1,fun_definition=None):
    # Ground a logical function symbol. Without fun_definition, the
    # grounding is a learnable affine map on the concatenated arguments.
    if type(input_shape_spec) is list:
        number_of_features = sum([int(v.shape[1]) for v in input_shape_spec])
    elif type(input_shape_spec) is tf.Tensor:
        number_of_features = int(input_shape_spec.shape[1])
    else:
        number_of_features = input_shape_spec
    if fun_definition is None:
        W = tf.Variable(
            tf.random_normal(
                [number_of_features + 1,output_shape_spec],mean=0,stddev=1), name="W" + label)
        def apply_fun(*args):
            tensor_args = tf.concat(args,axis=1)
            # Prepend a column of ones so W also carries the bias.
            X = tf.concat([tf.ones((tf.shape(tensor_args)[0], 1)),
                           tensor_args], 1)
            result = tf.matmul(X,W)
            return result
        pars = [W]
    else:
        def apply_fun(*args):
            return fun_definition(*args)
        pars = []
    def fun(*args):
        crossed_args, list_of_args_in_crossed_args = cross_args(args)
        result = apply_fun(*list_of_args_in_crossed_args)
        if crossed_args.doms != []:
            result = tf.reshape(result, tf.concat([tf.shape(crossed_args)[:-1],
                                                   tf.shape(result)[-1:]],axis=0))
        else:
            result = tf.reshape(result, (output_shape_spec,))
        result.doms = crossed_args.doms
        return result
    fun.pars = pars
    fun.label = label
    return fun
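
# Illustrative sketch: a learned function from R^2 to R^2 and a fixed one
# supplied through fun_definition (the lambda is a hypothetical example).
#
#   f = function("f", 2, output_shape_spec=2)
#   shift = function("shift", 2, output_shape_spec=2,
#                    fun_definition=lambda t: t + 1.0)
#   fx = f(x)   # fx.doms == x.doms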
def proposition(label,initial_value=None,value=None):
    # A 0-ary predicate: fixed when value is given, trainable otherwise.
    if value is not None:
        assert 0 <= value <= 1
        result = tf.constant([value],name=label)
    elif initial_value is not None:
        assert 0 <= initial_value <= 1
        result = tf.Variable(initial_value=[initial_value],name=label)
    else:
        result = tf.expand_dims(tf.clip_by_value(tf.Variable(tf.random_normal(shape=(),mean=.5,stddev=.5)),0.,1.),axis=0)
    result.doms = []
    return result
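
# Illustrative sketch: a fixed and a trainable proposition.
#
#   p = proposition("p", value=0.9)
#   q = proposition("q", initial_value=0.5)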
def predicate(label,number_of_features_or_vars,pred_definition=None,layers=None):
    # Ground a predicate symbol. Without pred_definition the grounding is
    # a neural tensor network, sigmoid(u . tanh(x W x)), with `layers`
    # slices of the weight tensor W.
    layers = layers or LAYERS
    global BIAS
    if type(number_of_features_or_vars) is list:
        number_of_features = sum([int(v.shape[1]) for v in number_of_features_or_vars])
    elif type(number_of_features_or_vars) is tf.Tensor:
        number_of_features = int(number_of_features_or_vars.shape[1])
    else:
        number_of_features = number_of_features_or_vars
    if pred_definition is None:
        # Keep only the upper-triangular part of each slice of W.
        W = tf.matrix_band_part(
            tf.Variable(
                tf.random_normal(
                    [layers,
                     number_of_features + 1,
                     number_of_features + 1],mean=0,stddev=1), name="W" + label), 0, -1)
        u = tf.Variable(tf.ones([layers, 1]),
                        name="u" + label)
        def apply_pred(*args):
            app_label = label + "/" + "_".join([arg.name.split(":")[0] for arg in args]) + "/"
            tensor_args = tf.concat(args,axis=1)
            # Prepend a column of ones so W also carries the bias terms.
            X = tf.concat([tf.ones((tf.shape(tensor_args)[0], 1)),
                           tensor_args], 1)
            XW = tf.matmul(tf.tile(tf.expand_dims(X, 0), [layers, 1, 1]), W)
            XWX = tf.squeeze(tf.matmul(tf.expand_dims(X, 1), tf.transpose(XW, [1, 2, 0])), axis=[1])
            gX = tf.matmul(tf.tanh(XWX), u)
            result = tf.sigmoid(gX, name=app_label)
            return result
        pars = [W,u]
    else:
        def apply_pred(*args):
            return pred_definition(*args)
        pars = []
    def pred(*args):
        global BIAS
        crossed_args, list_of_args_in_crossed_args = cross_args(args)
        result = apply_pred(*list_of_args_in_crossed_args)
        if crossed_args.doms != []:
            result = tf.reshape(result, tf.concat([tf.shape(crossed_args)[:-1],[1]],axis=0))
        else:
            result = tf.reshape(result, (1,))
        result.doms = crossed_args.doms
        # Running regulariser pulling outputs away from 0.5; it is inert
        # while BIAS_factor is 0.
        BIAS = tf.divide(BIAS + .5 - tf.reduce_mean(result),2)*BIAS_factor
        return result
    pred.pars = pars
    pred.label = label
    return pred
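
# Illustrative sketch: a learned binary predicate on pairs of 2-feature
# terms, and one grounded by an explicit similarity measure (the lambda is
# a hypothetical example).
#
#   Friends = predicate("Friends", 4)
#   Close = predicate("Close", 4,
#                     pred_definition=lambda u, v: tf.exp(
#                         -tf.reduce_sum(tf.square(u - v), axis=1, keepdims=True)))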
def cross_args(args):
    # Broadcast all arguments onto the cross product of their domains and
    # concatenate them along the feature axis; also return the arguments
    # split back out of the flattened result.
    result = args[0]
    for arg in args[1:]:
        result,_ = cross_2args(result,arg)
    result_flat = tf.reshape(result,
                             (tf.reduce_prod(tf.shape(result)[:-1]),
                              tf.shape(result)[-1]))
    result_args = tf.split(result_flat,[tf.shape(arg)[-1] for arg in args],1)
    return result, result_args

def cross_2args(X,Y):
    # Align X and Y on the union of their domains: expand each tensor with
    # the domains it lacks, transpose Y into X's domain order, then tile
    # both to a common shape.
    if X.doms == [] and Y.doms == []:
        result = tf.concat([X,Y],axis=-1)
        result.doms = []
        return result,[X,Y]
    X_Y = set(X.doms) - set(Y.doms)
    Y_X = set(Y.doms) - set(X.doms)
    eX = X
    eX_doms = [x for x in X.doms]
    for y in Y_X:
        eX = tf.expand_dims(eX,0)
        eX_doms = [y] + eX_doms
    eY = Y
    eY_doms = [y for y in Y.doms]
    for x in X_Y:
        eY = tf.expand_dims(eY,-2)
        eY_doms.append(x)
    perm_eY = []
    for y in eY_doms:
        perm_eY.append(eX_doms.index(y))
    eY = tf.transpose(eY,perm=perm_eY + [len(perm_eY)])
    mult_eX = [1]*(len(eX_doms)+1)
    mult_eY = [1]*(len(eY_doms)+1)
    for i in range(len(mult_eX)-1):
        mult_eX[i] = tf.maximum(1,tf.floor_div(tf.shape(eY)[i],tf.shape(eX)[i]))
        mult_eY[i] = tf.maximum(1,tf.floor_div(tf.shape(eX)[i],tf.shape(eY)[i]))
    result1 = tf.tile(eX,mult_eX)
    result2 = tf.tile(eY,mult_eY)
    result = tf.concat([result1,result2],axis=-1)
    result1.doms = eX_doms
    result2.doms = eX_doms
    result.doms = eX_doms
    return result,[result1,result2]
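
if __name__ == "__main__":
    # Minimal smoke test (a sketch assuming TensorFlow 1.x and numpy):
    # ground a variable with explicit data, apply a learned predicate,
    # and evaluate a universally quantified formula.
    import numpy as np
    data = np.random.uniform(0., 1., (5, 2)).astype(np.float32)
    x = variable("x", data)
    P = predicate("P", 2)
    wff = Forall(x, P(x))
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        print(sess.run(wff))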