-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfjsp_naderi.py
273 lines (220 loc) · 8.05 KB
/
fjsp_naderi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
"""
One-off script to solve FJSP instances using Naderi's OR-Tools model
and benchmark the results.
Usage:
```
uv run fjsp_naderi.py \
--time_limit 900 \
--num_parallel_instances 8 \
--num_workers 8 \
>> results.txt
```
"""
from dataclasses import dataclass
from pathlib import Path
from functools import partial
from collections import namedtuple, defaultdict
from itertools import product
from argparse import ArgumentParser
from ortools.sat.python.cp_model import CpModel, CpSolver
from tqdm.contrib.concurrent import process_map
import numpy as np
@dataclass
class Instance:
    """
    A flexible job shop scheduling (FJSP) problem instance.
    """
    # Number of jobs in the instance.
    num_jobs: int
    # Number of machines available for processing tasks.
    num_machines: int
    # Number of tasks of each job, indexed by job.
    num_tasks_by_job: list[int]
    # durations[job][task][machine] is the processing time of the task on
    # that machine; a value of 0 marks the machine as ineligible for the task.
    durations: list[list[list[int]]]
def fjsp_naderi(instance: Instance, horizon: int = 100_000):
    """
    Refactored FJSP model from Naderi et al. (2023).

    Parameters
    ----------
    instance
        The instance to solve.
    horizon
        Upper bound on all start and end times (default: 100,000).

    Returns
    -------
    CpModel
        The CP-SAT model with a makespan minimization objective. Note that
        this function only *builds* the model; it does not solve it.
    """
    model = CpModel()

    interval_type = namedtuple("task_type", "start end is_present interval")
    mode_vars = {}
    machine_to_intervals = defaultdict(list)

    for job in range(instance.num_jobs):
        num_tasks = instance.num_tasks_by_job[job]

        for task in range(num_tasks):
            for machine in range(instance.num_machines):
                duration = instance.durations[job][task][machine]
                if duration == 0:
                    # Zero duration means the machine cannot process this
                    # task, so no mode (optional interval) is created.
                    continue

                # BUG FIX: the original suffix was f"_{job}_%{task}_%{machine}"
                # with stray "%" signs (leftover %-style formatting) leaking
                # into the variable names.
                suffix = f"_{job}_{task}_{machine}"
                start = model.new_int_var(0, horizon, "start" + suffix)
                end = model.new_int_var(0, horizon, "end" + suffix)
                is_present = model.new_bool_var("is_present" + suffix)
                interval = model.new_optional_interval_var(
                    start, duration, end, is_present, "interval" + suffix
                )

                mode_vars[job, task, machine] = interval_type(
                    start=start,
                    end=end,
                    is_present=is_present,
                    interval=interval,
                )
                machine_to_intervals[machine].append(interval)

    # No overlap on machines: present intervals on the same machine may not
    # run at the same time.
    for machine in range(instance.num_machines):
        model.add_no_overlap(machine_to_intervals[machine])

    # Select exactly one mode (machine assignment) per task.
    for job in range(instance.num_jobs):
        for task in range(instance.num_tasks_by_job[job]):
            mode_presence = [
                mode_vars[job, task, machine].is_present
                for machine in range(instance.num_machines)
                if instance.durations[job][task][machine] > 0
            ]
            model.add(sum(mode_presence) == 1)

    # Precedence constraints (linear routing): each task of a job must end
    # before the next task of the same job starts.
    for job in range(instance.num_jobs):
        num_tasks = instance.num_tasks_by_job[job]

        for task in range(num_tasks - 1):
            for m1, m2 in product(range(instance.num_machines), repeat=2):
                duration_pred = instance.durations[job][task][m1]
                duration_succ = instance.durations[job][task + 1][m2]

                if duration_pred > 0 and duration_succ > 0:
                    # Only add precedence between (pred, succ) if both are
                    # actual modes.
                    pred = mode_vars[job, task, m1]
                    succ = mode_vars[job, task + 1, m2]
                    model.add(pred.end <= succ.start)

    # Makespan objective: the maximum end time over the last task of every
    # job, considering every candidate machine for that last task.
    makespan = model.new_int_var(0, horizon, "makespan")
    last_tasks = []
    for job in range(instance.num_jobs):
        for machine in range(instance.num_machines):
            last = instance.num_tasks_by_job[job] - 1
            duration = instance.durations[job][last][machine]
            if duration > 0:
                last_tasks.append(mode_vars[job, last, machine].end)

    model.add_max_equality(makespan, last_tasks)
    model.minimize(makespan)
    return model
def _read(instance_loc):
    """
    Parse an FJSP instance file into an ``Instance`` object.
    """
    with open(instance_loc, "r") as fh:
        # Every line of the file is a sequence of whitespace-separated ints.
        rows = [[int(tok) for tok in line.split()] for line in fh]

    it = iter(rows)
    num_jobs = next(it)[0]
    num_machines = next(it)[0]
    tasks_per_job = next(it)
    # One duration row per task, grouped job by job in file order.
    durations = [[next(it) for _ in range(count)] for count in tasks_per_job]

    return Instance(
        num_jobs=num_jobs,
        num_machines=num_machines,
        num_tasks_by_job=tasks_per_job,
        durations=durations,
    )
def solve(instance_loc: Path, time_limit: int, display: bool, num_workers: int):
    """
    Solves an FJSP instance using Naderi's OR-Tools model.

    Returns a tuple of (instance name, solver status, objective value,
    wall time in seconds rounded to 3 decimals).
    """
    model = fjsp_naderi(_read(instance_loc))

    solver = CpSolver()
    solver.parameters.max_time_in_seconds = time_limit
    solver.parameters.log_search_progress = display
    # 0 means using all available CPU cores.
    solver.parameters.num_workers = 0 if num_workers is None else num_workers

    status = solver.status_name(solver.solve(model)).capitalize()
    return (
        instance_loc.name,
        status,
        solver.objective_value,
        round(solver.wall_time, 3),
    )
def benchmark(instances: list[Path], num_parallel_instances: int, **kwargs):
    """
    Solves the list of instances and prints a table of the results.

    Parameters
    ----------
    instances
        Paths of the instance files to solve.
    num_parallel_instances
        Number of instances to solve concurrently via ``process_map``.
    **kwargs
        Forwarded to :func:`solve` (time_limit, display, num_workers).
    """
    args = sorted(instances)
    func = partial(solve, **kwargs)

    if len(instances) == 1:
        # Skip the process-pool overhead when there is only one instance.
        results = [func(args[0])]
    else:
        results = process_map(
            func,
            args,
            max_workers=num_parallel_instances,
            unit="instance",
        )
    results = [res for res in results if res is not None]

    dtypes = [
        ("inst", "U37"),
        ("feas", "U37"),
        ("obj", float),
        ("time", float),
    ]
    data = np.asarray(results, dtype=dtypes)

    headers = ["Instance", "Status", "Obj.", "Time (s)"]
    avg_objective = data["obj"].mean()
    avg_runtime = data["time"].mean()
    num_optimal = np.count_nonzero(data["feas"] == "Optimal")
    num_feas = np.count_nonzero(data["feas"] == "Feasible") + num_optimal
    # BUG FIX: the original wrapped this difference in np.count_nonzero,
    # which collapses the infeasible count to 0 or 1 instead of reporting
    # the actual number of instances without a feasible solution.
    num_infeas = data["feas"].size - num_feas

    print("\n", tabulate(headers, data), "\n", sep="")
    print(f" Avg. objective: {avg_objective:.2f}")
    print(f" Avg. run-time: {avg_runtime:.2f}s")
    print(f" Total optimal: {num_optimal}")
    print(f" Total infeas: {num_infeas}")
def tabulate(headers: list[str], rows: np.ndarray) -> str:
    """
    Creates a simple table from the given header and row data.
    """
    # Column width: the widest of the header and every cell in that column.
    widths = [len(hdr) for hdr in headers]
    for row in rows:
        widths = [max(w, len(str(cell))) for w, cell in zip(widths, row)]

    head_line = " ".join(f"{hdr:<{w}s}" for w, hdr in zip(widths, headers))
    rule_line = " ".join("-" * w for w in widths)
    body_lines = [
        " ".join(f"{cell!s:>{w}s}" for w, cell in zip(widths, row))
        for row in rows
    ]
    return "\n".join([head_line, rule_line, *body_lines])
def parse_args():
    """
    Defines and parses the command-line options of this script.
    """
    parser = ArgumentParser()

    # All integer-valued options share the same shape: (flag, default, help).
    int_options = [
        ("--time_limit", 900, "Time limit for the solver."),
        (
            "--num_parallel_instances",
            25,
            "Number of instances to solve in parallel.",
        ),
        (
            "--num_workers",
            8,
            "Number of workers to use in parallel processing.",
        ),
    ]
    for flag, default, help_text in int_options:
        parser.add_argument(flag, type=int, default=default, help=help_text)

    parser.add_argument(
        "--display",
        action="store_true",
        help="Whether to display the solver output.",
    )
    return parser.parse_args()
if __name__ == "__main__":
    # Benchmark every FJSP instance file found in the instance directory.
    INSTANCE_DIR = Path("data/instances/FJSP")
    benchmark(list(INSTANCE_DIR.glob("*.txt")), **vars(parse_args()))