From d8b0d2bbc78118bc3271a1df9418cb2b3c8e9889 Mon Sep 17 00:00:00 2001 From: Felipe Correa Date: Tue, 5 Mar 2024 19:46:51 -0300 Subject: [PATCH] =?UTF-8?q?Adiciona=20timeout=20para=20envio=20e=20status?= =?UTF-8?q?=20servico=20da=20comunica=C3=A7=C3=A3o=20sefaz=20(#328)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pynfe/processamento/comunicacao.py | 55 ++++++++++++++++-------------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/pynfe/processamento/comunicacao.py b/pynfe/processamento/comunicacao.py index 3c74d8ae..8ab1a4e4 100644 --- a/pynfe/processamento/comunicacao.py +++ b/pynfe/processamento/comunicacao.py @@ -1,27 +1,30 @@ # -*- coding: utf-8 -*- -import re import datetime +import re + import requests + +from pynfe.entidades.certificado import CertificadoA1 from pynfe.utils import etree, so_numeros from pynfe.utils.flags import ( + CODIGOS_ESTADOS, + MODELO_MDFE, + NAMESPACE_BETHA, + NAMESPACE_CTE, + NAMESPACE_CTE_METODO, + NAMESPACE_MDFE, + NAMESPACE_MDFE_METODO, + NAMESPACE_METODO, NAMESPACE_NFE, + NAMESPACE_SOAP, NAMESPACE_XSD, NAMESPACE_XSI, - NAMESPACE_MDFE, - NAMESPACE_MDFE_METODO, - MODELO_MDFE, + VERSAO_CTE, VERSAO_MDFE, VERSAO_PADRAO, - NAMESPACE_SOAP, - CODIGOS_ESTADOS, - NAMESPACE_BETHA, - NAMESPACE_METODO, - NAMESPACE_CTE, - VERSAO_CTE, - NAMESPACE_CTE_METODO, ) -from pynfe.utils.webservices import NFE, NFCE, NFSE, MDFE, CTE -from pynfe.entidades.certificado import CertificadoA1 +from pynfe.utils.webservices import CTE, MDFE, NFCE, NFE, NFSE + from .assinatura import AssinaturaA1 @@ -51,7 +54,7 @@ class ComunicacaoSefaz(Comunicacao): _assinatura = AssinaturaA1 def autorizacao( - self, modelo, nota_fiscal, id_lote=1, ind_sinc=1, contingencia=False + self, modelo, nota_fiscal, id_lote=1, ind_sinc=1, contingencia=False, timeout=None ): """ Método para realizar autorização da nota de acordo com o modelo @@ -64,24 +67,20 @@ def autorizacao( Caso contrário, envia todo o soap de resposta da Sefaz para decisão do usuário. """ # url do serviço - url = self._get_url( - modelo=modelo, consulta="AUTORIZACAO", contingencia=contingencia - ) + url = self._get_url(modelo=modelo, consulta="AUTORIZACAO", contingencia=contingencia) # Monta XML do corpo da requisição raiz = etree.Element("enviNFe", xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO) etree.SubElement(raiz, "idLote").text = str( id_lote ) # numero autoincremental gerado pelo sistema - etree.SubElement(raiz, "indSinc").text = str( - ind_sinc - ) # 0 para assincrono, 1 para sincrono + etree.SubElement(raiz, "indSinc").text = str(ind_sinc) # 0 para assincrono, 1 para sincrono raiz.append(nota_fiscal) # Monta XML para envio da requisição xml = self._construir_xml_soap("NFeAutorizacao4", raiz) # Faz request no Servidor da Sefaz - retorno = self._post(url, xml) + retorno = self._post(url, xml, timeout) # Em caso de sucesso, retorna xml com nfe e protocolo de autorização. # Caso contrário, envia todo o soap de resposta da Sefaz para decisão do usuário. @@ -302,7 +301,7 @@ def evento(self, modelo, evento, id_lote=1): xml = self._construir_xml_soap("NFeRecepcaoEvento4", raiz) return self._post(url, xml) - def status_servico(self, modelo): + def status_servico(self, modelo, timeout=None): """ Verifica status do servidor da receita. :param modelo: modelo é a string com tipo de serviço que deseja consultar, Ex: nfe ou nfce @@ -315,7 +314,7 @@ def status_servico(self, modelo): etree.SubElement(raiz, "cUF").text = CODIGOS_ESTADOS[self.uf.upper()] etree.SubElement(raiz, "xServ").text = "STATUS" xml = self._construir_xml_soap("NFeStatusServico4", raiz) - return self._post(url, xml) + return self._post(url, xml, timeout) def inutilizacao( self, @@ -585,7 +584,7 @@ def _post_header(self): response["SOAPAction"] = "" return response - def _post(self, url, xml): + def _post(self, url, xml, timeout=None): certificado_a1 = CertificadoA1(self.certificado) chave, cert = certificado_a1.separar_arquivo( self.certificado_senha, caminho=True @@ -607,7 +606,12 @@ def _post(self, url, xml): xml = xml_declaration + xml # Faz o request com o servidor result = requests.post( - url, xml, headers=self._post_header(), cert=chave_cert, verify=False + url, + xml, + headers=self._post_header(), + cert=chave_cert, + verify=False, + timeout=timeout, ) result.encoding = "utf-8" return result @@ -815,6 +819,7 @@ def _post_https(self, url, xml, metodo): # comunicacao wsdl try: from suds.client import Client + from pynfe.utils.https_nfse import HttpAuthenticated certificadoA1 = CertificadoA1(self.certificado)