-
Notifications
You must be signed in to change notification settings - Fork 350
Description
Bug Description
I am implementing a variant of DEM using warp, where I follow physically accurate coefficients. Here I am simulating a particle hitting a wall and bouncing back. The file I am attaching is self sufficient and doesn't depend on other files.
The problem I am encountering is, the collision logic: function compute_force_sw
behaves strangely.
Without the print statement (line no 241), the force is not being computed when the particle is in contact with the wall. However, with that print statement enabled, the code works perfect and I am able to reproduce the force curve as expected.
I am not sure how to report this bug or where the fault could be. I am using Mac Arm system. You can run the file and by enabling and disabling the print statement, I hope you will be able to reproduce this bug.
One need h5py
python package to write the data.
Thank you.
from tqdm import tqdm
import warp as wp
import numpy as np
import math
import matplotlib.pyplot as plt
import h5py
import os
class ParticlesDEM:
def __init__(self, points, max_no_tangential_contacts_limit=6, dim=2):
# Properties
self.x = wp.array(points, dtype=wp.vec3) # Positions
self.u = wp.zeros(len(points), dtype=wp.vec3) # Velocities
self.au = wp.zeros(len(points), dtype=wp.vec3) # Accelerations
self.force = wp.zeros(len(points), dtype=wp.vec3) # Forces
self.torque = wp.zeros(len(points), dtype=wp.vec3) # Torques
self.omega = wp.zeros(len(points), dtype=wp.vec3) # Angular velocities
self.m = wp.ones(len(points), dtype=wp.float32) # Masses
self.rho = wp.ones(len(points), dtype=wp.float32) # Densities
self.rad = wp.ones(len(points), dtype=wp.float32) # Radii
self.E = wp.ones(len(points), dtype=wp.float32) # Young's Modulus
self.nu = wp.ones(len(points), dtype=wp.float32) # Poisson's Ratio
self.G = wp.ones(len(points), dtype=wp.float32) # Shear Modulus
self.I = wp.ones(len(points), dtype=wp.float32) # Moment of inertia
self.tangential_disp_ss_x = wp.zeros(len(points) * max_no_tangential_contacts_limit, dtype=wp.float32) # tangential disp in x direction with sphere contact
self.tangential_disp_ss_y = wp.zeros(len(points) * max_no_tangential_contacts_limit, dtype=wp.float32) # tangential disp in y direction with sphere contact
self.tangential_disp_ss_z = wp.zeros(len(points) * max_no_tangential_contacts_limit, dtype=wp.float32) # tangential disp in z direction with sphere contact
self.tangential_disp_ss_idx = wp.array(-1 * np.ones(len(points) * max_no_tangential_contacts_limit), dtype=wp.int32) # tangential disp id with sphere contact
self.total_no_tangential_contacts_ss = wp.zeros(len(points), dtype=wp.int32) # total no of tangential contacts
self.max_no_tangential_contacts_limit_ss = wp.array(np.array([max_no_tangential_contacts_limit]), dtype=wp.int32) # maximum no of contacts
# Properties related to contact with infinite wall
self.tangential_disp_sw_x = wp.zeros(len(points) * 6, dtype=wp.float32)
self.tangential_disp_sw_y = wp.zeros(len(points) * 6, dtype=wp.float32)
self.tangential_disp_sw_z = wp.zeros(len(points) * 6, dtype=wp.float32)
# Properties related to contact with infinite wall ends
# Grid for neighbours
nx = int(len(points)**0.5) + 1
ny = int(len(points)**0.5) + 1
nz = 1
if dim == 3:
nx = int(len(points)**1./3.) + 1
ny = int(len(points)**1./3.) + 1
nz = int(len(points)**1./3.) + 1
self.grid = wp.HashGrid(nx, ny, nz)
self.max_radius = 0.0
def output(self, folder, step, time):
positions = self.x.numpy()
velocities = self.u.numpy()
forces = self.force.numpy()
m = self.m.numpy()
rho = self.rho.numpy()
rad = self.rad.numpy()
write_time_step(
folder+"/particles", # prefix
step, # timestep index
time, # simulation time
positions,
("velocity", velocities),
("force", forces),
("mass", m),
("density", rho),
("radius", rad),
)
class WallsDEM:
def __init__(self, points, normals, dim=2):
# Properties
self.x = wp.array(points, dtype=wp.vec3) # Positions
self.u = wp.zeros(len(points), dtype=wp.vec3) # Velocities
self.omega = wp.zeros(len(points), dtype=wp.vec3) # Angular velocities
self.E = wp.ones(len(points), dtype=wp.float32) # Young's Modulus
self.nu = wp.ones(len(points), dtype=wp.float32) # Poisson's Ratio
self.G = wp.ones(len(points), dtype=wp.float32) # Shear Modulus
# normals of each wall
self.normal = wp.array(normals, dtype=wp.vec3) # Normals
self.total_no_walls = wp.array(np.array([len(points)]), dtype=wp.int32)
# Function that takes points (x) and returns a SimulationState instance
def create_particles_DEM(points, rho, rad, E, nu):
particles = ParticlesDEM(points)
# Setup the particles
# particles.m = * rho * rad**3.
particles.m = wp.array(4. / 3. * np.pi * np.ones(len(points)), dtype=wp.float32) * rho * rad * rad * rad
particles.rho = rho
particles.rad = rad
particles.E = E
particles.nu = nu
particles.G = E / ((nu + wp.ones(len(points), dtype=wp.float32)) * wp.array(2. * np.ones(len(points)), dtype=wp.float32))
particles.I = wp.array(2. / 5. * np.ones(len(points)), dtype=wp.float32) * particles.m * rad
# Build the grid
max_radius_wp = wp.array([-math.inf], dtype=float)
wp.launch(compute_max, dim=particles.rad.shape, inputs=(particles.rad,), outputs=(max_radius_wp,))
particles.max_radius = max_radius_wp.numpy()[0]
particles.grid.build(particles.x, 3. * particles.max_radius)
return particles
# Function that takes points (x) and returns a SimulationState instance
def create_walls_DEM(points, E, nu, normals):
walls = WallsDEM(points, normals)
walls.E = E
walls.nu = nu
walls.G = E / ((nu + wp.ones(len(points), dtype=wp.float32)) * wp.array(2. * np.ones(len(points)), dtype=wp.float32))
return walls
# Function that takes points (x) and returns a SimulationState instance
def setup_wall_colliding_properties_of_particles(particles, walls):
particles.tangential_disp_sw_x = wp.zeros(len(particles.x) * len(walls.x), dtype=wp.float32)
particles.tangential_disp_sw_y = wp.zeros(len(particles.x) * len(walls.x), dtype=wp.float32)
particles.tangential_disp_sw_z = wp.zeros(len(particles.x) * len(walls.x), dtype=wp.float32)
@wp.kernel
def stage1(
x: wp.array(dtype=wp.vec3),
u: wp.array(dtype=wp.vec3),
force: wp.array(dtype=wp.vec3),
torque: wp.array(dtype=wp.vec3),
omega: wp.array(dtype=wp.vec3),
m: wp.array(dtype=wp.float32),
I: wp.array(dtype=wp.float32),
dt: wp.float32):
tid = wp.tid()
m_inverse = (1.0) / m[tid]
u[tid][0] = u[tid][0] + (0.5) * dt * force[tid][0] * m_inverse
u[tid][1] = u[tid][1] + (0.5) * dt * force[tid][1] * m_inverse
u[tid][2] = u[tid][2] + (0.5) * dt * force[tid][2] * m_inverse
I_inverse = (1.0) / I[tid]
omega[tid][0] = omega[tid][0] + (0.5) * dt * torque[tid][0] * I_inverse
omega[tid][1] = omega[tid][1] + (0.5) * dt * torque[tid][1] * I_inverse
omega[tid][2] = omega[tid][2] + (0.5) * dt * torque[tid][2] * I_inverse
@wp.kernel
def stage2(
x: wp.array(dtype=wp.vec3),
u: wp.array(dtype=wp.vec3),
dt: wp.float32):
tid = wp.tid()
x[tid][0] = x[tid][0] + dt * u[tid][0]
x[tid][1] = x[tid][1] + dt * u[tid][1]
x[tid][2] = x[tid][2] + dt * u[tid][2]
@wp.kernel
def stage3(
x: wp.array(dtype=wp.vec3),
u: wp.array(dtype=wp.vec3),
force: wp.array(dtype=wp.vec3),
torque: wp.array(dtype=wp.vec3),
omega: wp.array(dtype=wp.vec3),
m: wp.array(dtype=wp.float32),
I: wp.array(dtype=wp.float32),
dt: wp.float32):
tid = wp.tid()
m_inverse = (1.0) / m[tid]
u[tid][0] = u[tid][0] + (0.5) * dt * force[tid][0] * m_inverse
u[tid][1] = u[tid][1] + (0.5) * dt * force[tid][1] * m_inverse
u[tid][2] = u[tid][2] + (0.5) * dt * force[tid][2] * m_inverse
I_inverse = (1.0) / I[tid]
omega[tid][0] = omega[tid][0] + (0.5) * dt * torque[tid][0] * I_inverse
omega[tid][1] = omega[tid][1] + (0.5) * dt * torque[tid][1] * I_inverse
omega[tid][2] = omega[tid][2] + (0.5) * dt * torque[tid][2] * I_inverse
@wp.kernel
def make_force_torque_zero_on_particle(
force: wp.array(dtype=wp.vec3),
torque: wp.array(dtype=wp.vec3),
dt: wp.float32):
tid = wp.tid()
force[tid][0] = 0.
force[tid][1] = 0.
force[tid][2] = 0.
torque[tid][0] = 0.
torque[tid][1] = 0.
torque[tid][2] = 0.
@wp.kernel
def compute_force_sw(
grid: wp.uint64,
x: wp.array(dtype=wp.vec3),
u: wp.array(dtype=wp.vec3),
au: wp.array(dtype=wp.vec3),
force: wp.array(dtype=wp.vec3),
torque: wp.array(dtype=wp.vec3),
omega: wp.array(dtype=wp.vec3),
m: wp.array(dtype=wp.float32),
rho: wp.array(dtype=wp.float32),
rad: wp.array(dtype=wp.float32),
E: wp.array(dtype=wp.float32),
nu: wp.array(dtype=wp.float32),
G: wp.array(dtype=wp.float32),
I: wp.array(dtype=wp.float32),
tangential_disp_sw_x: wp.array(dtype=wp.float32),
tangential_disp_sw_y: wp.array(dtype=wp.float32),
tangential_disp_sw_z: wp.array(dtype=wp.float32),
x_w: wp.array(dtype=wp.vec3),
u_w: wp.array(dtype=wp.vec3),
omega_w: wp.array(dtype=wp.vec3),
normal_w: wp.array(dtype=wp.vec3),
E_w: wp.array(dtype=wp.float32),
nu_w: wp.array(dtype=wp.float32),
G_w: wp.array(dtype=wp.float32),
total_no_walls: wp.array(dtype=wp.int32),
dt: wp.float32,
en: wp.float32,
fric_coeff: wp.float32
):
i = wp.tid()
pos_i = x[i]
for j in range(len(x_w)):
print(j)
pos_j = x_w[j]
pos_ij = pos_i - pos_j
nij = normal_w[j]
tmp = wp.dot(pos_ij, nij)
overlap = rad[i] - tmp
# Index where we save current walls contact info
found_at = i * total_no_walls[0] + j
# ---------- force computation starts ------------
# if particles are overlapping
if overlap > 1e-6:
a_i = rad[i] - overlap / (2.)
vel_i = u[i] + wp.cross(omega[i], nij) * a_i
vel_j = wp.vec3(0., 0., 0.)
# Now the relative velocity of particle i w.r.t j at the contact point is
vel_ij = vel_i - vel_j
# normal velocity magnitude
vij_dot_nij = wp.dot(vel_ij, nij)
vn = vij_dot_nij * nij
# tangential velocity
vt = vel_ij - vn
############################
# normal force computation #
############################
# Compute stiffness
# effective Young's modulus
tmp_1 = (1. - nu[i]**2.) / E[i]
tmp_2 = (1. - nu_w[j]**2.) / E_w[j]
E_eff = 1. / (tmp_1 + tmp_2)
tmp_3 = 1. / rad[i]
tmp_4 = 0.
R_eff = 1. / (tmp_3 + tmp_4)
# Eq 4 [1]
kn = (4. / 3.) * E_eff * R_eff**0.5
# compute damping coefficient
tmp_1 = wp.log(en)
tmp_2 = tmp_1**2. + wp.pi**2.
beta = tmp_1 / (tmp_2)**0.5
S_n = 2. * E_eff * (R_eff * overlap)**0.5
tmp_1 = 1. / m[i]
tmp_2 = 0.
m_eff = 1. / (tmp_1 + tmp_2)
eta_n = -2. * (5./6.)**0.5 * beta * (S_n * m_eff)**0.5
fn = kn * overlap**1.5
fn_x = fn * nij[0] - eta_n * vn[0]
fn_y = fn * nij[1] - eta_n * vn[1]
fn_z = fn * nij[2] - eta_n * vn[2]
#################################
# tangential force computation #
#################################
ft_x = 0.
ft_y = 0.
ft_z = 0.
vij_magn = wp.length(vel_ij)
if vij_magn < 1e-6:
tangential_disp_sw_x[found_at] = 0.
tangential_disp_sw_y[found_at] = 0.
tangential_disp_sw_z[found_at] = 0.
else:
tangential_disp_sw_x[found_at] = tangential_disp_sw_x[found_at] + vt[0] * dt
tangential_disp_sw_y[found_at] = tangential_disp_sw_y[found_at] + vt[1] * dt
tangential_disp_sw_z[found_at] = tangential_disp_sw_z[found_at] + vt[2] * dt
# Compute the tangential stiffness
tmp_1 = (2. - nu[i]) / G[i]
tmp_2 = (2. - nu_w[j]) / G_w[j]
G_eff = 1. / (tmp_1 + tmp_2)
# Eq 12 [1]
kt = 8. * G_eff * (R_eff * overlap)**0.5
S_t = kt
eta_t = -2. * (5./6.)**0.5 * beta * (S_t * m_eff)**0.5
ft_x_star = -kt * tangential_disp_sw_x[found_at] - eta_t * vt[0]
ft_y_star = -kt * tangential_disp_sw_y[found_at] - eta_t * vt[1]
ft_z_star = -kt * tangential_disp_sw_z[found_at] - eta_t * vt[2]
ft_magn = (ft_x_star**2. + ft_y_star**2. + ft_z_star**2.)**0.5
ti = wp.vec3(0., 0., 0.)
if ft_magn > 1e-6:
ti[0] = ft_x_star / ft_magn
ti[1] = ft_y_star / ft_magn
ti[2] = ft_z_star / ft_magn
fn_magn = (fn_x**2. + fn_y**2. + fn_z**2.)**0.5
ft_magn_star = min(fric_coeff * fn_magn, ft_magn)
# compute the tangential force, by equation 17 (Lethe)
ft_x = ft_magn_star * ti[0]
ft_y = ft_magn_star * ti[1]
ft_z = ft_magn_star * ti[2]
# Add damping to the limited force
ft_x = ft_x + eta_t * vt[0]
ft_y = ft_y + eta_t * vt[1]
ft_z = ft_z + eta_t * vt[2]
# reset the spring length
tangential_disp_sw_x[found_at] = -ft_x / kt
tangential_disp_sw_y[found_at] = -ft_y / kt
tangential_disp_sw_z[found_at] = -ft_z / kt
force[i][0] = force[i][0] + fn_x + ft_x
force[i][1] = force[i][1] + fn_y + ft_y
force[i][2] = force[i][2] + fn_z + ft_z
# torque = n cross F
torque[i][0] = torque[i][0] + (nij[1] * ft_z - nij[2] * ft_y) * a_i
torque[i][1] = torque[i][1] + (nij[2] * ft_x - nij[0] * ft_z) * a_i
torque[i][2] = torque[i][2] + (nij[0] * ft_y - nij[1] * ft_x) * a_i
else:
tangential_disp_sw_x[found_at] = -ft_x / kt
tangential_disp_sw_y[found_at] = -ft_y / kt
tangential_disp_sw_z[found_at] = -ft_z / kt
def compute_force_and_torque(particles, en_ss, fric_coeff_ss, dt,
walls=None, en_sw=0.1, fric_coeff_sw=0.1):
# re-build the grid
particles.grid.build(particles.x, 3. * particles.max_radius)
wp.launch(
kernel=make_force_torque_zero_on_particle,
dim=len(particles.x),
inputs=[particles.force,
particles.torque,
dt],
)
if walls is not None:
wp.launch(
kernel=compute_force_sw,
dim=len(particles.x),
inputs=[
particles.grid.id,
particles.x,
particles.u,
particles.au,
particles.force,
particles.torque,
particles.omega,
particles.m,
particles.rho,
particles.rad,
particles.E,
particles.nu,
particles.G,
particles.I,
particles.tangential_disp_sw_x,
particles.tangential_disp_sw_y,
particles.tangential_disp_sw_z,
walls.x,
walls.u,
walls.omega,
walls.normal,
walls.E,
walls.nu,
walls.G,
walls.total_no_walls,
dt,
en_sw,
fric_coeff_sw],
)
class SolverDEM:
def __init__(self, particles, tf, dt, output_frequency, output_dir,
en_ss, fric_coeff_ss, walls=None, en_sw=0.1, fric_coeff_sw=0.1):
self.particles = particles
self.walls = walls
self.tf = tf
self.dt = dt
self.num_steps = int(tf / dt)
self.output_frequency = output_frequency
self.output_dir = output_dir
# Create the folder if it doesn't exist
os.makedirs(output_dir, exist_ok=True)
self.en_ss = en_ss
self.fric_coeff_ss = fric_coeff_ss
self.en_sw = en_sw
self.fric_coeff_sw = fric_coeff_sw
def run(self):
for step in tqdm(range(self.num_steps + 1), desc="Running Simulation",
unit="step"):
wp.launch(
kernel=stage1,
dim=len(self.particles.x),
inputs=[self.particles.x,
self.particles.u,
self.particles.force,
self.particles.torque,
self.particles.omega,
self.particles.m,
self.particles.I,
self.dt],
)
wp.launch(
kernel=stage2,
dim=len(self.particles.x),
inputs=[self.particles.x,
self.particles.u,
self.dt],
)
compute_force_and_torque(
self.particles, self.en_ss, self.fric_coeff_ss,
self.dt, self.walls, self.en_sw, self.fric_coeff_sw
)
wp.launch(
kernel=stage3,
dim=len(self.particles.x),
inputs=[self.particles.x,
self.particles.u,
self.particles.force,
self.particles.torque,
self.particles.omega,
self.particles.m,
self.particles.I,
self.dt],
)
self.output(step)
def output(self, step):
is_output_step = (step % self.output_frequency == 0)
is_final_step = (step == self.num_steps)
if is_output_step or is_final_step:
# Compute output index
if is_output_step:
output_index = step // self.output_frequency
else:
# Offset by 1 if the final step isn't aligned with frequency
output_index = (self.num_steps // self.output_frequency) + 1
self.particles.output(
folder=self.output_dir,
step=output_index,
time=step * self.dt
)
@wp.kernel
def compute_max(
values: wp.array(dtype=float),
out: wp.array(dtype=float),
):
tid = wp.tid()
wp.atomic_max(out, 0, values[tid])
def get_files(directory):
# =====================================
# start: get the files and sort
# =====================================
files = [filename for filename in os.listdir(directory) if filename.startswith("particles") and filename.endswith("h5") ]
files.sort()
files_num = []
for f in files:
f_last = f[10:]
files_num.append(int(f_last[:-3]))
files_num.sort()
sorted_files = []
for num in files_num:
sorted_files.append("particles_" + str(num) + ".h5")
files = sorted_files
return files
# ---------------------------------------
# XDMF Writer Functions
# ---------------------------------------
def write_xdmf_header(xdmf_file, num_particles, dims, dtype, precision, h5_filename, coords_name):
with open(xdmf_file, "w") as f:
f.write('<?xml version="1.0" ?>\n')
f.write('<!DOCTYPE Xdmf SYSTEM "Xdmf.dtd" []>\n')
f.write('<Xdmf Version="2.0">\n')
f.write(' <Domain>\n')
f.write(' <Grid Name="points" GridType="Uniform">\n')
f.write(f' <Topology TopologyType="Polyvertex" Dimensions="{num_particles}"/>\n')
f.write(' <Geometry Type="XYZ">\n')
f.write(f' <DataItem Dimensions="{num_particles} {dims}" NumberType="{dtype}" Precision="{precision}" Format="HDF">\n')
f.write(f' {h5_filename}:/{coords_name}\n')
f.write(' </DataItem>\n')
f.write(' </Geometry>\n')
def write_xdmf_footer(xdmf_file):
with open(xdmf_file, "a") as f:
f.write(' </Grid>\n')
f.write(' </Domain>\n')
f.write('</Xdmf>\n')
def write_xdmf_attribute(xdmf_file, name, dims, dtype, precision, h5_file, path):
attr_type = "Scalar" if len(dims) == 1 else "Vector" if len(dims) == 2 else "Tensor"
with open(xdmf_file, "a") as f:
f.write(f' <Attribute Name="{name}" AttributeType="{attr_type}" Center="Node">\n')
f.write(f' <DataItem Dimensions="{" ".join(map(str, dims))}" NumberType="{dtype}" Precision="{precision}" Format="HDF">\n')
f.write(f' {h5_file}:/{path}\n')
f.write(' </DataItem>\n')
f.write(' </Attribute>\n')
# ---------------------------------------
# HDF5 Writer Functions
# ---------------------------------------
def write_field(file, dataset_name, data):
file.create_dataset(dataset_name, data=data)
def write_time_step(prefix, timestep_index, time, coords, *fields):
def to_numpy64(arr):
if isinstance(arr, wp.array):
arr = arr.numpy()
# Always convert to float64 to avoid HDF5 + Paraview issues
if arr.dtype == np.float32:
arr = arr.astype(np.float64)
return arr
coords = to_numpy64(coords)
converted_fields = []
for name, field in fields:
field = to_numpy64(field)
converted_fields.append((name, field))
fields = converted_fields
n_particles, dim = coords.shape
dtype = "Float"
precision = 8 if coords.dtype == np.float64 else 4
h5_filename = f"{prefix}_{timestep_index}.h5"
xdmf_filename = f"{prefix}_{timestep_index}.xmf"
# Write HDF5
with h5py.File(h5_filename, "w") as f:
f.attrs["Time"] = time
write_field(f, "Position", coords)
for name, field in fields:
write_field(f, name, field)
# Write XDMF metadata
write_xdmf_header(
xdmf_file=xdmf_filename,
num_particles=n_particles,
dims=dim,
dtype=dtype,
precision=precision,
h5_filename=os.path.basename(h5_filename),
coords_name="Position"
)
for name, field in fields:
field_shape = field.shape
dims = field_shape if len(field_shape) > 1 else (field_shape[0],)
write_xdmf_attribute(
xdmf_file=xdmf_filename,
name=name,
dims=dims,
dtype=dtype,
precision=precision,
h5_file=os.path.basename(h5_filename),
path=name
)
write_xdmf_footer(xdmf_filename)
class DEMWallSimulation:
def __init__(self, output_dir, tf=3*1e-3, dt=1e-6, output_frequency=30,
en_ss=1.0, fric_coeff_ss=0.35, en_sw=1.0, fric_coeff_sw=0.0):
"""
Initializes the DEM simulation with particles, walls, and solver setup.
Parameters:
- output_dir: Directory to save output files.
- tf: Final simulation time.
- dt: Time step.
- output_frequency: How often to output results.
- en_ss, fric_coeff_ss: Normal restitution and static friction coefficients for particle-particle collisions.
- en_sw, fric_coeff_sw: Normal restitution and static friction coefficients for particle-wall collisions.
"""
wp.init()
# General simulation-related variables
self.output_dir = output_dir
self.tf = tf
self.dt = dt
self.output_frequency = output_frequency
# Scheme-related constants
self.en_ss = 1.
self.fric_coeff_ss = 0.
self.en_sw = 1.
self.fric_coeff_sw = 0.
# Material properties
self.particle_radius = 0.1
self.particle_velocity = wp.array(np.array([[0., -0.2, 0.]]), dtype=wp.vec3)
self.particle_density = 2699.0
self.particle_elastic_modulus = 7. * 1e10
self.particle_poisson_ratio = 0.3
self.wall_elastic_modulus = 380e9
self.wall_poisson_ratio = 0.23
# self.wall_elastic_modulus = 70e9
# self.wall_poisson_ratio = 0.3
# Initialize particles, walls, and solver
self.particles_DEM = None
self.walls_DEM = None
self.solver = None
def create_particles(self):
"""Create and define DEM particles."""
pos = np.array([[0., self.particle_radius + self.particle_radius / 1000, 0.]]) # Single particle at initial position
vel = np.array([[0., -0.2, 0.]]) # Initial velocity
# Convert positions, velocities, and properties into Warp arrays
x = wp.array(pos, dtype=wp.vec3)
u = wp.array(vel, dtype=wp.vec3)
rho = wp.array(self.particle_density * np.ones(len(pos)), dtype=wp.float32)
rad = wp.array(self.particle_radius * np.ones(len(pos)), dtype=wp.float32)
E = wp.array(self.particle_elastic_modulus * np.ones(len(pos)), dtype=wp.float32)
nu = wp.array(self.particle_poisson_ratio * np.ones(len(pos)), dtype=wp.float32)
# Create particles with the defined properties
self.particles_DEM = create_particles_DEM(x, rho, rad, E, nu)
self.particles_DEM.u = u # Set the initial velocity
def create_walls(self):
"""Create the walls for particle-wall collisions."""
# Define wall position and normal direction
wall_pos = np.array([[0., 0., 0.]])
wall_normals = np.array([[0., 1., 0.]]) # Normal vector pointing upwards
# Convert to Warp arrays
x = wp.array(wall_pos, dtype=wp.vec3)
wp_normals = wp.array(wall_normals, dtype=wp.vec3)
E = wp.array(self.wall_elastic_modulus * np.ones(len(x)), dtype=wp.float32)
nu = wp.array(self.wall_poisson_ratio * np.ones(len(x)), dtype=wp.float32)
# Create the walls with defined properties
self.walls_DEM = create_walls_DEM(x, E, nu, wp_normals)
def setup_collision_properties(self):
"""Set up collision properties between particles and walls."""
setup_wall_colliding_properties_of_particles(self.particles_DEM, self.walls_DEM)
def create_solver(self):
"""Create the solver for the DEM simulation."""
if self.particles_DEM is None or self.walls_DEM is None:
raise RuntimeError("Both particles and walls must be created before creating the solver.")
self.solver = SolverDEM(
self.particles_DEM, self.tf, self.dt, self.output_frequency,
self.output_dir,
self.en_ss, self.fric_coeff_ss, self.walls_DEM, self.en_sw,
self.fric_coeff_sw)
def run(self):
"""Run the DEM simulation."""
if self.solver is None:
raise RuntimeError("Solver must be created before running the simulation.")
self.solver.run()
def postprocess(self, analytical_csv=None):
"""Post-process simulation results and compare to analytical data."""
files = get_files(self.output_dir)
# if not files:
# print("No output files found.")
# return
fn_simu = []
time_simu = []
for file in files:
if file.endswith(".h5"):
with h5py.File(os.path.join(self.output_dir, file), "r") as f:
# print(f["Position"][:])
fn_simu.append(f["force"][0][1]/1e3) # Convert to kN
# fn_simu.append(f["Position"][0][1]) # Convert to kN
time_simu.append(f.attrs["Time"] / 1e-6) # Convert to microseconds
# Plot simulation data
plt.plot(time_simu, fn_simu, "^-", label="Cabana DEM Solver")
# If analytical data is provided, plot that as well
if analytical_csv:
data = np.loadtxt(analytical_csv, delimiter=",")
time_analy, fn_analy = data[:, 0], data[:, 1]
plt.scatter(time_analy, fn_analy, label="Analytical")
# Labels and grid setup
plt.xlabel("Time (μs)")
plt.ylabel("Normal Force (kN)")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig(os.path.join(self.output_dir, "fn_vs_time.pdf"))
plt.show()
if __name__ == "__main__":
# Running the DEMWallSimulation
app = DEMWallSimulation(output_dir="02_output")
# Create particles, walls, set up collision properties, and create solver
app.create_particles()
app.create_walls()
app.setup_collision_properties()
app.create_solver()
# Run the simulation
app.run()
# Post-process and compare with analytical data
app.postprocess(
# analytical_csv=os.path.join(
# "chung_8_validation_data",
# "02_elastic_normal_impact_of_sphere_with_plane_time_vs_fn_analytical_data.csv"
# )
)
System Information
Mac OS Arm. Warp 1.8.1, Python 3.12.11