diff --git a/jupyter_client/kernelspec.py b/jupyter_client/kernelspec.py
index 41ed2bad..86324223 100644
--- a/jupyter_client/kernelspec.py
+++ b/jupyter_client/kernelspec.py
@@ -168,6 +168,8 @@ def _user_kernel_dir_default(self) -> str:
         "whitelist": ("allowed_kernelspecs", "7.0"),
     }
 
+    _allow_insecure_kernelspec_params = False
+
     # Method copied from
     # https://github.com/jupyterhub/jupyterhub/blob/d1a85e53dccfc7b1dd81b0c1985d158cc6b61820/jupyterhub/auth.py#L143-L161
     @observe(*list(_deprecated_aliases))
@@ -228,6 +230,207 @@ def find_kernel_specs(self) -> dict[str, str]:
         return d
         # TODO: Caching?
 
+    def allow_insecure_kernelspec_params(self, allow_insecure_kernelspec_params):
+        self._allow_insecure_kernelspec_params = allow_insecure_kernelspec_params
+
+    def _check_parameterized_kernel(self, kspec: KernelSpec) -> KernelSpec:
+        is_secure = self.check_kernel_is_secure(kspec=kspec)
+        if is_secure == True:
+            if kspec.metadata and isinstance(kspec.metadata, dict):
+                kspec.metadata.update({"is_secure": True})
+            else:
+                kspec.metadata = {}
+                kspec.metadata.update({"is_secure": True})
+            return kspec  # a kernel spec is allowed
+        else:
+            if kspec.metadata and isinstance(kspec.metadata, dict):
+                kspec.metadata.update({"is_secure": False})
+            else:
+                kspec.metadata = {}
+                kspec.metadata.update({"is_secure": False})
+            if self._allow_insecure_kernelspec_params == True:
+                return kspec  # a kernel spec is allowed
+            else:
+                kspec_data = self.check_kernel_custom_all_default_values(kspec=kspec)
+
+                if kspec_data["all_have_default"] == True:
+                    return kspec_data["kspec"]  # a kernel spec is modyfied and is allowed
+                else:
+                    return None
+
+    def check_kernel_is_secure(self, kspec):
+        is_secure = False
+        total_sum_kernel_variables = self.get_argv_env_kernel_variables(kspec=kspec)
+        if (
+            kspec.metadata
+            and isinstance(kspec.metadata, dict)
+            and "parameters" in kspec.metadata
+            and isinstance(kspec.metadata["parameters"], dict)
+            and "properties" in kspec.metadata["parameters"]
+            and isinstance(kspec.metadata["parameters"]["properties"], dict)
+        ):
+            counter_secure_kernel_variables = self.get_count_secure_kernel_variables(
+                obj=kspec.metadata["parameters"], counter_secure_kernel_variables=0
+            )
+            if total_sum_kernel_variables > 0:
+                if counter_secure_kernel_variables == total_sum_kernel_variables:
+                    is_secure = True
+                else:
+                    is_secure = False
+            else:
+                is_secure = False
+        else:
+            # check if there are kernel variables even metadata.parameters are empty
+            if total_sum_kernel_variables > 0:
+                is_secure = False
+            else:
+                is_secure = True
+        return is_secure
+
+    def get_argv_env_kernel_variables(self, kspec):
+        total_sum_kernel_variables = 0
+        env = None
+        argv = None
+        sum_argv_kernel_variables = 0
+        sum_env_kernel_variables = 0
+        if hasattr(kspec, "env"):
+            env = kspec.env
+            sum_env_kernel_variables = self.get_count_all_kernel_variables(parameters=env)
+        if hasattr(kspec, "argv"):
+            argv = kspec.argv
+            sum_argv_kernel_variables = self.get_count_all_kernel_variables(parameters=argv)
+        total_sum_kernel_variables = sum_env_kernel_variables + sum_argv_kernel_variables
+        return total_sum_kernel_variables
+
+    def get_count_secure_kernel_variables(self, obj, counter_secure_kernel_variables):
+        is_secure = True
+        if "properties" in obj:
+            propetries = obj["properties"].items()
+            if len(propetries) > 0:
+                for property_key, property_value in propetries:
+                    if (
+                        property_value.get("type") == "string"
+                        or property_value.get("type") == "null"
+                    ):
+                        if property_value.get("enum"):
+                            counter_secure_kernel_variables = counter_secure_kernel_variables + 1
+                        else:
+                            is_secure = False
+                    elif property_value.get("type") == "array":
+                        print("Type of JSON Schema data is array and it is not supported now")
+                        is_secure = False
+                    elif property_value.get("enum"):
+                        counter_secure_kernel_variables = counter_secure_kernel_variables + 1
+                    elif property_value.get("type") == "object":
+                        counter_secure_kernel_variables = self.get_count_secure_kernel_variables(
+                            obj=obj, counter_secure_kernel_variables=counter_secure_kernel_variables
+                        )
+
+        if is_secure == False:
+            counter_secure_kernel_variables = 0
+
+        return counter_secure_kernel_variables
+
+    def get_count_all_kernel_variables(self, parameters):
+        sum = 0
+        if isinstance(parameters, list):
+            for argv_item in parameters:
+                is_variable = self.has_variable(argv_item)
+                if is_variable:
+                    sum = sum + 1
+        elif isinstance(parameters, dict):
+            for env_key, env_item in parameters.items():
+                is_variable = self.has_variable(env_item)
+                if is_variable:
+                    sum = sum + 1
+        return sum
+
+    def has_variable(self, string: str):
+        pattern = re.compile(r"\{connection_file\}")
+        match = pattern.match(string)
+        if match is None:
+            pattern = re.compile(r"\{([A-Za-z0-9_]+)\}")
+            matches = pattern.findall(string)
+            if len(matches) > 0:
+                return True
+            else:
+                return False
+        else:
+            return False
+
+    def check_kernel_custom_all_default_values(self, kspec):
+        if (
+            kspec.metadata
+            and isinstance(kspec.metadata, dict)
+            and "parameters" in kspec.metadata
+            and isinstance(kspec.metadata["parameters"], dict)
+            and "properties" in kspec.metadata["parameters"]
+            and isinstance(kspec.metadata["parameters"]["properties"], dict)
+        ):
+            has_default = True
+            propetries = kspec.metadata["parameters"]["properties"].items()
+
+            new_kspec = {}
+            for property_key, property_value in propetries:
+                if "default" in property_value:
+                    new_kspec = self.replaceByDefault(
+                        kspec, property_key, property_value["default"]
+                    )
+                else:
+                    has_default = False
+
+            if has_default == False:
+                result = {"kspec": kspec, "all_have_default": False}
+            else:
+                # check if there is anything after replacing
+                total_sum_kernel_variables = self.get_argv_env_kernel_variables(kspec=new_kspec)
+
+                if total_sum_kernel_variables > 0:
+                    result = {"kspec": kspec, "all_have_default": False}
+                else:
+                    result = {"kspec": new_kspec, "all_have_default": True}
+        else:
+            result = {"kspec": kspec, "all_have_default": False}
+        return result
+
+    def replace_spec_parameter(self, variable, value, spec) -> str:
+        regexp = r"\{" + variable + "\\}"
+        pattern = re.compile(regexp)
+        return pattern.sub(value, spec)
+
+    def replaceByDefault(self, kspec, kernel_variable, default_value):
+        new_env = {}
+        new_argv = []
+        if hasattr(kspec, "env"):
+            tmp_env = kspec.env.copy()
+            if "env" in tmp_env:
+                env = tmp_env.env
+                # check and replace env variables
+
+                for env_key, env_item in env.items():
+                    new_env_item = self.replace_spec_parameter(
+                        kernel_variable, default_value, env_item
+                    )
+                    new_env[env_key] = new_env_item
+
+                if len(new_env) > 0:
+                    tmp_env.update(new_env)
+                    kspec.env = tmp_env
+
+        # check and replace argv parameters
+        if hasattr(kspec, "argv") and kspec.argv is not None:
+            argv = kspec.argv.copy()
+            for argv_item in argv:
+                new_argv_item = self.replace_spec_parameter(
+                    kernel_variable, default_value, argv_item
+                )
+                new_argv.append(new_argv_item)
+
+            if len(new_argv) > 0:
+                argv = new_argv
+                kspec.argv = new_argv
+        return kspec
+
     def _get_kernel_spec_by_name(self, kernel_name: str, resource_dir: str) -> KernelSpec:
         """Returns a :class:`KernelSpec` instance for a given kernel_name
         and resource_dir.
@@ -249,7 +452,12 @@ def _get_kernel_spec_by_name(self, kernel_name: str, resource_dir: str) -> Kerne
         if not KPF.instance(parent=self.parent).is_provisioner_available(kspec):
             raise NoSuchKernel(kernel_name)
 
-        return kspec
+        kspec = self._check_parameterized_kernel(kspec)
+
+        if kspec is not None:
+            return kspec
+        else:
+            return None
 
     def _find_spec_directory(self, kernel_name: str) -> str | None:
         """Find the resource directory of a named kernel spec"""
@@ -310,8 +518,8 @@ def get_all_specs(self) -> dict[str, t.Any]:
                     # which may have overridden find_kernel_specs
                     # and get_kernel_spec, but not the newer get_all_specs
                     spec = self.get_kernel_spec(kname)
-
-                res[kname] = {"resource_dir": resource_dir, "spec": spec.to_dict()}
+                if spec != None:
+                    res[kname] = {"resource_dir": resource_dir, "spec": spec.to_dict()}
             except NoSuchKernel:
                 pass  # The appropriate warning has already been logged
             except Exception:
diff --git a/jupyter_client/launcher.py b/jupyter_client/launcher.py
index f0d07ad1..f361641e 100644
--- a/jupyter_client/launcher.py
+++ b/jupyter_client/launcher.py
@@ -59,6 +59,10 @@ def launch_kernel(
     # If this process has been backgrounded, our stdin is invalid. Since there
     # is no compelling reason for the kernel to inherit our stdin anyway, we'll
     # place this one safe and always redirect.
+
+    if "custom_kernel_specs" in kw:
+        del kw["custom_kernel_specs"]
+
     redirect_in = True
     _stdin = PIPE if stdin is None else stdin
 
diff --git a/jupyter_client/manager.py b/jupyter_client/manager.py
index 088acd6c..b9beac83 100644
--- a/jupyter_client/manager.py
+++ b/jupyter_client/manager.py
@@ -228,6 +228,8 @@ def ipykernel(self) -> bool:
 
     shutting_down: bool = False
 
+    custom_kernel_default_value: dict = {}
+
     def __del__(self) -> None:
         self._close_control_socket()
         self.cleanup_connection_file()
@@ -292,20 +294,118 @@ def update_env(self, *, env: t.Dict[str, str]) -> None:
 
         .. version-added: 8.5
         """
+
         # Mypy think this is unreachable as it see _launch_args as Dict, not t.Dict
         if (
             isinstance(self._launch_args, dict)
             and "env" in self._launch_args
             and isinstance(self._launch_args["env"], dict)  # type: ignore [unreachable]
         ):
+            # if self._launch_args["env"] has custom kernel variable for env but env does not have then we have to fill env with it
+            if "custom_kernel_specs" in self._launch_args:
+                saved_env = self._launch_args.get("env", {})
+                custom_kernel_dict = self._launch_args["custom_kernel_specs"]
+                if isinstance(custom_kernel_dict, dict):
+                    for key, value in custom_kernel_dict.items():
+                        if key in saved_env and key not in env:
+                            env[key] = saved_env[key]
+
+            # check whether env has custom kernel spec variables
+            env = self.update_custom_env_parameters(env=env)
+
             self._launch_args["env"].update(env)  # type: ignore [unreachable]
 
+    def update_custom_env_parameters(self, env: t.Dict[str, str]) -> t.Dict[str, str]:
+        newEnv = {}
+        custom_kernel_dict = {}
+        if "custom_kernel_specs" in self._launch_args:
+            custom_kernel_dict = self._launch_args["custom_kernel_specs"]
+            # check is custom kernel variables are full if not then we should take default ones
+            if self.custom_kernel_default_value:
+                for key, value in self.custom_kernel_default_value.items():
+                    if isinstance(custom_kernel_dict, dict) and key not in custom_kernel_dict:
+                        custom_kernel_dict[key] = value
+        elif self.custom_kernel_default_value:
+            # if not but default values are present into a kernel.json file then we have to take them
+            custom_kernel_dict = self.custom_kernel_default_value
+
+        if isinstance(custom_kernel_dict, dict) and len(custom_kernel_dict) > 0:
+            for custom_kernel_spec, custom_kernel_spec_value in custom_kernel_dict.items():
+                for env_key, env_item in env.items():
+                    kernel_spec_item = self.replace_spec_parameter(
+                        custom_kernel_spec, custom_kernel_spec_value, env_item
+                    )
+                    newEnv[env_key] = kernel_spec_item
+        else:
+            # check whether there are custom kernel spec variables into kernel.json,
+            # if yes but a user has not configured them and default ones are not present ,
+            # we should clean them
+            newEnv = self.clear_custom_kernel_parameters(env)
+
+        if len(newEnv) > 0:
+            env = self.clear_custom_kernel_parameters(newEnv)
+        else:
+            env = self.clear_custom_kernel_parameters(env)
+
+        return env
+
+    def replace_spec_parameter(self, variable, value, spec) -> str:
+        regexp = r"\{" + variable + "\\}"
+        pattern = re.compile(regexp)
+        return pattern.sub(value, spec)
+
+    def check_existence_custom_kernel_spec(self, item: str):
+        pattern = re.compile(r"\{([A-Za-z0-9_]+)\}")
+        matches = pattern.findall(item)
+        isMatch = False
+        if len(matches) > 0:
+            isMatch = True
+        return isMatch
+
+    # Clear kernel specs files if user has not configured them themselves
+    # we should return only that has not kernel custom variables
+    # if there are no metadata specification for custom kernel
+
+    def clear_custom_kernel_parameters(self, kernel_parameters: t.Any) -> t.Any:
+        clean_parameters = None
+        if isinstance(kernel_parameters, list):
+            clean_parameters = []
+            for argv_item in kernel_parameters:
+                isMatch = self.check_existence_custom_kernel_spec(argv_item)
+                if not isMatch:
+                    clean_parameters.append(argv_item)
+        elif isinstance(kernel_parameters, dict):
+            clean_parameters = {}
+            for env_key, env_item in kernel_parameters.items():
+                isMatch = self.check_existence_custom_kernel_spec(env_item)
+                if not isMatch:
+                    clean_parameters[env_key] = env_item
+        if len(clean_parameters) == 0:
+            clean_parameters = kernel_parameters
+        return clean_parameters
+
+    def get_default_custom_kernel_specs_value(self):
+        assert self.kernel_spec is not None
+        custom_kernel_default_value = {}
+        if (
+            self.kernel_spec.metadata
+            and isinstance(self.kernel_spec.metadata, dict)
+            and "parameters" in self.kernel_spec.metadata
+            and isinstance(self.kernel_spec.metadata["parameters"], dict)
+            and "properties" in self.kernel_spec.metadata["parameters"]
+            and isinstance(self.kernel_spec.metadata["parameters"]["properties"], dict)
+        ):
+            propetries = self.kernel_spec.metadata["parameters"]["properties"].items()
+            for property_key, property_value in propetries:
+                if "default" in property_value:
+                    custom_kernel_default_value[property_key] = property_value["default"]
+        self.custom_kernel_default_value = custom_kernel_default_value
+
     def format_kernel_cmd(self, extra_arguments: t.Optional[t.List[str]] = None) -> t.List[str]:
         """Replace templated args (e.g. {connection_file})"""
         extra_arguments = extra_arguments or []
         assert self.kernel_spec is not None
         cmd = self.kernel_spec.argv + extra_arguments
-
         if cmd and cmd[0] in {
             "python",
             "python%i" % sys.version_info[0],
@@ -329,10 +429,25 @@ def format_kernel_cmd(self, extra_arguments: t.Optional[t.List[str]] = None) ->
             "prefix": sys.prefix,
         }
 
+        # Updating ns if there are custom kernel specs variables
+        custom_kernel_dict = {}
+        if "custom_kernel_specs" in self._launch_args:
+            custom_kernel_dict = self._launch_args["custom_kernel_specs"]
+            if self.custom_kernel_default_value:
+                for key, value in self.custom_kernel_default_value.items():
+                    if isinstance(custom_kernel_dict, dict) and key not in custom_kernel_dict:
+                        custom_kernel_dict[key] = value
+        elif self.custom_kernel_default_value:
+            # if not but default values are present into a kernel.json file then we have to take them
+            custom_kernel_dict = self.custom_kernel_default_value
+
+        if isinstance(custom_kernel_dict, dict) and len(custom_kernel_dict) > 0:
+            for custom_kernel_spec_key, custom_kernel_spec_value in custom_kernel_dict.items():
+                ns[custom_kernel_spec_key] = custom_kernel_spec_value
+
         if self.kernel_spec:  # type:ignore[truthy-bool]
             ns["resource_dir"] = self.kernel_spec.resource_dir
         assert isinstance(self._launch_args, dict)
-
         ns.update(self._launch_args)
 
         pat = re.compile(r"\{([A-Za-z0-9_]+)\}")
@@ -350,6 +465,7 @@ async def _async_launch_kernel(self, kernel_cmd: t.List[str], **kw: t.Any) -> No
         Note that provisioners can now be used to customize kernel environments
         and
         """
+        #
         assert self.provisioner is not None
         connection_info = await self.provisioner.launch_kernel(kernel_cmd, **kw)
         assert self.provisioner.has_process
@@ -391,6 +507,8 @@ async def _async_pre_start_kernel(
         # save kwargs for use in restart
         # assigning Traitlets Dicts to Dict make mypy unhappy but is ok
         self._launch_args = kw.copy()  # type:ignore [assignment]
+        #
+
         if self.provisioner is None:  # will not be None on restarts
             self.provisioner = KPF.instance(parent=self.parent).create_provisioner_instance(
                 self.kernel_id,
@@ -398,7 +516,14 @@ async def _async_pre_start_kernel(
                 parent=self,
             )
         kw = await self.provisioner.pre_launch(**kw)
+        # update env
+        if "env" in kw:
+            kw["env"] = self.update_custom_env_parameters(env=kw["env"])
+            self._launch_args["env"].update(kw["env"])
         kernel_cmd = kw.pop("cmd")
+        if "custom_kernel_specs" in kw:
+            del kw["custom_kernel_specs"]
+
         return kernel_cmd, kw
 
     pre_start_kernel = run_sync(_async_pre_start_kernel)
@@ -432,6 +557,7 @@ async def _async_start_kernel(self, **kw: t.Any) -> None:
              and launching the kernel (e.g. Popen kwargs).
         """
         self._attempted_start = True
+
         kernel_cmd, kw = await self._async_pre_start_kernel(**kw)
 
         # launch the kernel subprocess
@@ -549,7 +675,6 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False)
             # the kernel a chance to properly execute shutdown actions. Wait for at
             # most 1s, checking every 0.1s.
             await self._async_finish_shutdown(restart=restart)
-
         await self._async_cleanup_resources(restart=restart)
 
     shutdown_kernel = run_sync(_async_shutdown_kernel)
@@ -593,6 +718,7 @@ async def _async_restart_kernel(
 
         # Start new kernel.
         self._launch_args.update(kw)
+
         await self._async_start_kernel(**self._launch_args)
 
     restart_kernel = run_sync(_async_restart_kernel)
diff --git a/jupyter_client/multikernelmanager.py b/jupyter_client/multikernelmanager.py
index d14a3f84..268edc23 100644
--- a/jupyter_client/multikernelmanager.py
+++ b/jupyter_client/multikernelmanager.py
@@ -6,6 +6,7 @@
 import asyncio
 import json
 import os
+import re
 import socket
 import typing as t
 import uuid
@@ -222,12 +223,14 @@ def update_env(self, *, kernel_id: str, env: t.Dict[str, str]) -> None:
 
         .. version-added: 8.5
         """
+
         if kernel_id in self:
             self._kernels[kernel_id].update_env(env=env)
 
     async def _add_kernel_when_ready(
         self, kernel_id: str, km: KernelManager, kernel_awaitable: t.Awaitable
     ) -> None:
+        #
         try:
             await kernel_awaitable
             self._kernels[kernel_id] = km
@@ -251,6 +254,25 @@ def _using_pending_kernels(self) -> bool:
         """
         return getattr(self, "use_pending_kernels", False)
 
+    def validate(self, string) -> str:
+        sanitazed_string = re.sub(r"[;&|$#]", "", string)
+        match = re.match(r"'", sanitazed_string)
+        if match:
+            sanitazed_string = "'" + re.sub(r"'", "'''", sanitazed_string) + "'"
+        return sanitazed_string
+
+    def validate_kernel_parameters(self, kwargs: t.Any) -> None:
+        if "custom_kernel_specs" in kwargs:
+            custom_kernel_specs = kwargs.get("custom_kernel_specs")
+            if custom_kernel_specs is not None:
+                for custom_kernel_spec, custom_kernel_spec_value in kwargs[
+                    "custom_kernel_specs"
+                ].items():
+                    sanitazed_string = self.validate(custom_kernel_spec_value)
+                    if sanitazed_string != "":
+                        kwargs["custom_kernel_specs"][custom_kernel_spec] = sanitazed_string
+        return kwargs
+
     async def _async_start_kernel(self, *, kernel_name: str | None = None, **kwargs: t.Any) -> str:
         """Start a new kernel.
 
@@ -259,6 +281,19 @@ async def _async_start_kernel(self, *, kernel_name: str | None = None, **kwargs:
 
         The kernel ID for the newly started kernel is returned.
         """
+
+        if "custom_kernel_specs" in kwargs:
+            custom_kernel_specs = kwargs.get("custom_kernel_specs")
+            if custom_kernel_specs is None or (
+                isinstance(custom_kernel_specs, dict) and len(custom_kernel_specs) == 0
+            ):
+                del kwargs["custom_kernel_specs"]
+        if hasattr(self, "_launch_args") and self._launch_args:
+            if "custom_kernel_specs" in self._launch_args:
+                if "custom_kernel_specs" not in kwargs:
+                    del self._launch_args["custom_kernel_specs"]
+
+        kwargs = self.validate_kernel_parameters(kwargs)
         km, kernel_name, kernel_id = self.pre_start_kernel(kernel_name, kwargs)
         if not isinstance(km, KernelManager):
             self.log.warning(  # type:ignore[unreachable]
diff --git a/jupyter_client/provisioning/local_provisioner.py b/jupyter_client/provisioning/local_provisioner.py
index 42d8d32d..a5587795 100644
--- a/jupyter_client/provisioning/local_provisioner.py
+++ b/jupyter_client/provisioning/local_provisioner.py
@@ -155,6 +155,7 @@ async def cleanup(self, restart: bool = False) -> None:
                 lpc.return_port(port)
 
     async def pre_launch(self, **kwargs: Any) -> Dict[str, Any]:
+        #
         """Perform any steps in preparation for kernel process launch.
 
         This includes applying additional substitutions to the kernel launch command and env.
@@ -166,6 +167,9 @@ async def pre_launch(self, **kwargs: Any) -> Dict[str, Any]:
         # This should be considered temporary until a better division of labor can be defined.
         km = self.parent
         if km:
+            # Get default values from kernel.json file if there is a custom kernel
+            km.get_default_custom_kernel_specs_value()
+
             if km.transport == "tcp" and not is_local_ip(km.ip):
                 msg = (
                     "Can only launch a kernel on a local interface. "
@@ -189,6 +193,9 @@ async def pre_launch(self, **kwargs: Any) -> Dict[str, Any]:
                 km.control_port = lpc.find_available_port(km.ip)
                 self.ports_cached = True
             if "env" in kwargs:
+                # update env if there is custom kernel specs variables for env
+                km.update_env(env=kwargs["env"])
+
                 jupyter_session = kwargs["env"].get("JPY_SESSION_NAME", "")
                 km.write_connection_file(jupyter_session=jupyter_session)
             else:
@@ -202,6 +209,11 @@ async def pre_launch(self, **kwargs: Any) -> Dict[str, Any]:
             extra_arguments = kwargs.pop("extra_arguments", [])
             kernel_cmd = self.kernel_spec.argv + extra_arguments
 
+        kernel_cmd = km.clear_custom_kernel_parameters(kernel_cmd)
+        print("cmd--------", kernel_cmd)
+
+        if "custom_kernel_specs" in kwargs:
+            del kwargs["custom_kernel_specs"]
         return await super().pre_launch(cmd=kernel_cmd, **kwargs)
 
     async def launch_kernel(self, cmd: List[str], **kwargs: Any) -> KernelConnectionInfo:
@@ -222,7 +234,7 @@ async def launch_kernel(self, cmd: List[str], **kwargs: Any) -> KernelConnection
     @staticmethod
     def _scrub_kwargs(kwargs: Dict[str, Any]) -> Dict[str, Any]:
         """Remove any keyword arguments that Popen does not tolerate."""
-        keywords_to_scrub: List[str] = ["extra_arguments", "kernel_id"]
+        keywords_to_scrub: List[str] = ["extra_arguments", "kernel_id", "custom_kernel_specs"]
         scrubbed_kwargs = kwargs.copy()
         for kw in keywords_to_scrub:
             scrubbed_kwargs.pop(kw, None)
diff --git a/jupyter_client/provisioning/provisioner_base.py b/jupyter_client/provisioning/provisioner_base.py
index eff89432..51864896 100644
--- a/jupyter_client/provisioning/provisioner_base.py
+++ b/jupyter_client/provisioning/provisioner_base.py
@@ -155,8 +155,11 @@ async def pre_launch(self, **kwargs: Any) -> Dict[str, Any]:
         :meth:`launch_kernel()`.
         """
         env = kwargs.pop("env", os.environ).copy()
+        # here!!!
         env.update(self.__apply_env_substitutions(env))
+
         self._finalize_env(env)
+
         kwargs["env"] = env
 
         return kwargs