diff --git a/plot.py b/plot.py new file mode 100644 index 0000000..b872307 --- /dev/null +++ b/plot.py @@ -0,0 +1,130 @@ +from rocket_twin.systems import Station, ControllerFMU +from cosapp.drivers import RungeKutta, NonLinearSolver +from cosapp.recorders import DataFrameRecorder +from cosapp.utils import swap_system +import numpy as np +import matplotlib.pyplot as plt + +model_path = r"systems\control\controller.mo" +model_name = "controller" + +model_path_r = r"systems\control\rocket_controller.mo" +model_name_r = "rocket_controller" + +sys = Station("sys") +swap_system( + sys.controller, + ControllerFMU("controller", model_path=model_path, model_name=model_name), +) +swap_system( + sys.rocket.controller, + ControllerFMU("controller", model_path=model_path_r, model_name=model_name_r), +) + +sys.connect(sys.controller.inwards, sys.rocket.inwards, ["weight_max", "weight_p"]) +sys.rocket.connect( + sys.rocket.controller.inwards, sys.rocket.tank.inwards, ["weight_max", "weight_p"] +) + +driver = sys.add_driver(RungeKutta(order=4, time_interval=[0, 25], dt=0.01)) +solver = driver.add_child(NonLinearSolver('solver')) +init = {"g_tank.weight_p": 10.0, "rocket.tank.weight_p": 0.0} +values = { + "g_tank.w_out_max": 1.0, + "rocket.tank.w_out_max": 0.5, + "controller.time_int": 10., + "rocket.controller.time_int": 10., +} + +includes = ["rocket.tank.weight_p", "g_tank.weight_p", "rocket.a"] + +driver.set_scenario(init=init, values=values) +driver.add_recorder(DataFrameRecorder(includes=includes), period=1.0) + +sys.run_drivers() +data = driver.recorder.export_data() + +time = np.asarray(data['time']) +g_mass = np.asarray(data["g_tank.weight_p"]) +r_mass = np.asarray(data["rocket.tank.weight_p"]) +acel = np.asarray(data['rocket.a']) + +plt.plot(time, g_mass, label="Ground tank fuel mass") +plt.plot(time, r_mass, label="Rocket tank fuel mass") +plt.title("Ground and rocket fuel masses over time") +plt.xlabel("Time (s)") +plt.ylabel("Mass (kg)") +plt.legend() +plt.show() + +plt.plot(time, acel, label="Rocket acceleration") +plt.title("Rocket acceleration over time") +plt.xlabel("Time (s)") +plt.ylabel("Acceleration (m/s²)") +plt.legend() +plt.show() + +from cosapp.base import System + +from rocket_twin.systems import Dynamics, Stage + + +class Rocket(System): + """A simple model of a rocket. + + Inputs + ------ + n_stages: int, + how many stages the rocket has + flying: boolean, + whether the rocket is already flying or still on ground + + Outputs + ------ + """ + + def setup(self): + + + self.add_inward("n_stages", 1, desc="Number of rocket stages", unit='') + + forces, weights, centers = ([] for i in range(self.n_stages)) + + for i in range(self.n_stages): + self.add_child(Stage(f"stage_{i}")) + forces[i] = f"thrust_{i}" + weights[i] = f"weight_{i}" + centers[i] = f"center_{i}" + + self.add_child( + Dynamics( + "dyn", + forces=forces, + weights=weights, + centers=centers, + ), + pulling=["a"], + ) + + for stage in self.children: + if stage != "dyn": + self.connect( + self.stage.outwards, + self.dyn.inwards, + {"force": f"thrust_{i}", "weight": f"weight_{i}", "center": f"center_{i}"}, + ) + + + self.add_inward_modevar( + "flying", False, desc="Whether the rocket is flying or not", unit="" + ) + + self.add_event("Takeoff", trigger="dyn.a > 0") + + def compute(self): + self.a *= self.flying + + def transition(self): + + if self.Takeoff.present: + self.flying = True \ No newline at end of file diff --git a/rocket_twin/systems/engine/engine.py b/rocket_twin/systems/engine/engine.py index 1ed69d1..01c1e83 100644 --- a/rocket_twin/systems/engine/engine.py +++ b/rocket_twin/systems/engine/engine.py @@ -21,7 +21,7 @@ class Engine(System): def setup(self): - self.add_inward("isp", 20.0, desc="Specific impulsion in vacuum", unit="s") + self.add_inward("isp", 200.0, desc="Specific impulsion in vacuum", unit="s") self.add_inward("w_out", 0.0, desc="Fuel consumption rate", unit="kg/s") self.add_inward("g_0", 10.0, desc="Gravity at Earth's surface", unit="m/s**2") diff --git a/rocket_twin/systems/rocket/__init__.py b/rocket_twin/systems/rocket/__init__.py index a8e3e08..3f580b1 100644 --- a/rocket_twin/systems/rocket/__init__.py +++ b/rocket_twin/systems/rocket/__init__.py @@ -1,4 +1,4 @@ +from rocket_twin.systems.rocket.stage import Stage # isort: skip from rocket_twin.systems.rocket.rocket import Rocket -from rocket_twin.systems.rocket.stage import Stage __all__ = ["Stage", "Rocket"] diff --git a/rocket_twin/systems/rocket/rocket.py b/rocket_twin/systems/rocket/rocket.py index a80ea47..a36fd8f 100644 --- a/rocket_twin/systems/rocket/rocket.py +++ b/rocket_twin/systems/rocket/rocket.py @@ -1,6 +1,8 @@ from cosapp.base import System +from cosapp.utils import get_state -from rocket_twin.systems import ControllerCoSApp, Dynamics, Engine, Tank +from rocket_twin.systems import Dynamics +from rocket_twin.systems.rocket import Stage class Rocket(System): @@ -8,6 +10,8 @@ class Rocket(System): Inputs ------ + n_stages: int, + how many stages the rocket has flying: boolean, whether the rocket is already flying or still on ground @@ -15,40 +19,69 @@ class Rocket(System): ------ """ - def setup(self): - self.add_child(ControllerCoSApp("controller")) - self.add_child(Tank("tank"), pulling=["w_in", "weight_max", "weight_p"]) - self.add_child(Engine("engine")) + def setup(self, n_stages=1): + + forces, weights, centers = ([None] * n_stages for i in range(3)) + + for i in range(1, n_stages + 1): + self.add_child(Stage(f"stage_{i}")) + forces[i - 1] = f"thrust_{i}" + weights[i - 1] = f"weight_{i}" + centers[i - 1] = f"center_{i}" + self.add_child( Dynamics( "dyn", - forces=["thrust"], - weights=["weight_eng", "weight_tank"], - centers=["engine", "tank"], + forces=forces, + weights=weights, + centers=centers, ), pulling=["a"], ) - self.connect(self.controller.outwards, self.tank.inwards, {"w": "w_command"}) - self.connect(self.tank.outwards, self.engine.inwards, {"w_out": "w_out"}) + self.stage_1.controller.w_temp = 1. - self.connect( - self.engine.outwards, - self.dyn.inwards, - {"force": "thrust", "weight": "weight_eng", "cg": "engine"}, - ) - self.connect(self.tank.outwards, self.dyn.inwards, {"weight": "weight_tank", "cg": "tank"}) + stages = list(self.children.values()) + + for i in range(0, len(stages) - 1): + self.connect( + stages[i].outwards, + self.dyn.inwards, + {"force": f"thrust_{i + 1}", "weight": f"weight_{i + 1}", "cg": f"center_{i + 1}"}, + ) + self.add_inward_modevar( "flying", False, desc="Whether the rocket is flying or not", unit="" ) + self.add_outward_modevar("stage", 1, desc="Rocket's current stage", unit='') + + self.add_event("Takeoff", trigger="dyn.a > 0") + self.add_event("Removal", trigger="stage_1.weight_p == 0.") - self.add_event("Takeoff", trigger="engine.force > 0") def compute(self): self.a *= self.flying + print('massa 1: ', self.stage_1.weight_p) + print('tempo: ', self.time) def transition(self): if self.Takeoff.present: self.flying = True + if self.Removal.present: + print(f"REMOVAL {self.stage}") + + self.dyn.inwards[f'thrust_{self.stage}'] = 0. + self.dyn.inwards[f'weight_{self.stage}'] = 0. + self.dyn.inwards[f'center_{self.stage}'] = 0. + + self.stage += 1 + cur_stage = list(self.children.values())[1] + cur_stage.controller.w_temp = 1. + + self.Removal.trigger = f"stage_{self.stage}.weight_p == 0." + self.pop_child(list(self.children.keys())[0]) + + + diff --git a/rocket_twin/systems/rocket/stage.py b/rocket_twin/systems/rocket/stage.py index 3f53f13..78de7c8 100644 --- a/rocket_twin/systems/rocket/stage.py +++ b/rocket_twin/systems/rocket/stage.py @@ -23,7 +23,7 @@ def setup(self): self.add_child(ControllerCoSApp("controller")) self.add_child(Tank("tank"), pulling=["w_in", "weight_max", "weight_p"]) - self.add_child(Engine("engine")) + self.add_child(Engine("engine"), pulling=["force"]) self.connect(self.controller.outwards, self.tank.inwards, {"w": "w_command"}) self.connect(self.tank.outwards, self.engine.inwards, {"w_out": "w_out"}) @@ -31,8 +31,12 @@ def setup(self): self.add_outward("weight", 1.0, desc="Weight", unit="kg") self.add_outward("cg", 1.0, desc="Center of gravity", unit="m") + self.tank.w_out_max = 1. + self.weight_p = self.weight_max + def compute(self): + print("COMPUTANDO") self.weight = self.tank.weight + self.engine.weight self.cg = (self.tank.cg * self.tank.weight + self.engine.cg * self.engine.weight) / ( self.weight diff --git a/rocket_twin/tests/test_stages.py b/rocket_twin/tests/test_stages.py index e8d01cd..4f1f659 100644 --- a/rocket_twin/tests/test_stages.py +++ b/rocket_twin/tests/test_stages.py @@ -1,7 +1,8 @@ import numpy as np -from cosapp.drivers import RungeKutta +from cosapp.drivers import RungeKutta, EulerExplicit +from cosapp.recorders import DataFrameRecorder -from rocket_twin.systems import Stage +from rocket_twin.systems import Stage, Rocket class TestStage: @@ -19,3 +20,47 @@ def test_single_stage(self): np.testing.assert_allclose(sys.weight, 2.0, atol=10 ** (-2)) np.testing.assert_allclose(sys.cg, 1.0, atol=10 ** (-2)) + + def test_two_stages(self): + + sys = Rocket('sys', n_stages=2) + init = {"stage_1.weight_p": "stage_1.weight_max", "stage_2.weight_p": "stage_2.weight_max"} + stop = "stage_2.weight_p == 0." + print(init) + + driver = sys.add_driver(RungeKutta('rk', order=4, dt=0.1)) + driver.time_interval = (0, 10) + driver.set_scenario(init=init, stop=stop) + driver.add_recorder(DataFrameRecorder(includes=["dyn.weight"]), period=1.0) + sys.run_drivers() + + data = driver.recorder.export_data() + data = data.drop(["Section", "Status", "Error code"], axis=1) + print(data) + + np.testing.assert_allclose(sys.dyn.weight, 2.0, atol=10 ** (-2)) + np.testing.assert_allclose(sys.dyn.center, 1.0, atol=10 ** (-2)) + + def test_n_stages(self, n=3): + + sys = Rocket('sys', n_stages=n) + init = {} + #for i in range(n): + #init[f"stage_{i + 1}.weight_p"] = f"stage_{i + 1}.weight_max" + stop = f"stage_{n}.weight_p == 0." + + driver = sys.add_driver(EulerExplicit('rk', dt=0.1)) + driver.time_interval = (0, 15) + driver.set_scenario(init=init, stop=stop) + #driver.add_recorder(DataFrameRecorder(includes=["stage_2.weight_p"]), period=1.0) + sys.run_drivers() + + #data = driver.recorder.export_data() + #data = data.drop(["Section", "Status", "Error code"], axis=1) + #print(data) + + np.testing.assert_allclose(sys.dyn.weight, 2.0, atol=10 ** (-2)) + np.testing.assert_allclose(sys.dyn.center, 1.0, atol=10 ** (-2)) + +test = TestStage() +test.test_n_stages()