diff --git a/include/opcua.hrl b/include/opcua.hrl
index 7198a57..e4385bc 100644
--- a/include/opcua.hrl
+++ b/include/opcua.hrl
@@ -40,7 +40,37 @@
 -define(OBJ_SERVER_TYPE, 2004).
 -define(OBJ_SERVER_STATUS_TYPE, 2138).
-
+% Attribute Id
+% ------------
+% Every node in an OPC UA information model contains attributes depending on
+% the node type. Possible attributes are as follows:
+-define(UA_ATTRIBUTEID_NODEID,                  1).
+-define(UA_ATTRIBUTEID_NODECLASS,               2).
+-define(UA_ATTRIBUTEID_BROWSENAME,              3).
+-define(UA_ATTRIBUTEID_DISPLAYNAME,             4).
+-define(UA_ATTRIBUTEID_DESCRIPTION,             5).
+-define(UA_ATTRIBUTEID_WRITEMASK,               6).
+-define(UA_ATTRIBUTEID_USERWRITEMASK,           7).
+-define(UA_ATTRIBUTEID_ISABSTRACT,              8).
+-define(UA_ATTRIBUTEID_SYMMETRIC,               9).
+-define(UA_ATTRIBUTEID_INVERSENAME,             10).
+-define(UA_ATTRIBUTEID_CONTAINSNOLOOPS,         11).
+-define(UA_ATTRIBUTEID_EVENTNOTIFIER,           12).
+-define(UA_ATTRIBUTEID_VALUE,                   13).
+-define(UA_ATTRIBUTEID_DATATYPE,                14).
+-define(UA_ATTRIBUTEID_VALUERANK,               15).
+-define(UA_ATTRIBUTEID_ARRAYDIMENSIONS,         16).
+-define(UA_ATTRIBUTEID_ACCESSLEVEL,             17).
+-define(UA_ATTRIBUTEID_USERACCESSLEVEL,         18).
+-define(UA_ATTRIBUTEID_MINIMUMSAMPLINGINTERVAL,19).
+-define(UA_ATTRIBUTEID_HISTORIZING,             20).
+-define(UA_ATTRIBUTEID_EXECUTABLE,              21).
+-define(UA_ATTRIBUTEID_USEREXECUTABLE,          22).
+-define(UA_ATTRIBUTEID_DATATYPEDEFINITION,      23).
+-define(UA_ATTRIBUTEID_ROLEPERMISSIONS,         24).
+-define(UA_ATTRIBUTEID_USERROLEPERMISSIONS,     25).
+-define(UA_ATTRIBUTEID_ACCESSRESTRICTIONS,      26).
+-define(UA_ATTRIBUTEID_ACCESSLEVELEX,           27).
 
 %%% TYPES %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 
 %-- OPCUA Types Records --------------------------------------------------------
diff --git a/src/opcua_codec.erl b/src/opcua_codec.erl
index 320237d..08c9f1f 100644
--- a/src/opcua_codec.erl
+++ b/src/opcua_codec.erl
@@ -24,6 +24,7 @@
 -export([unpack_type/3]).
 -export([unpack_enum/2]).
 -export([unpack_option_set/2]).
+-export([unpack_variant/1]).
-export([builtin_type_name/1]). -export([builtin_type_id/1]). @@ -119,6 +120,18 @@ unpack_option_set(#opcua_option_set{fields = Fields}, Value) -> end, [], Fields), lists:reverse(FieldNames). +-spec unpack_variant(opcua:variant()) -> term(). +unpack_variant(#opcua_variant{type = extension_object, + value = #opcua_extension_object{ + type_id = DataTypeID, + body = Data + }}) -> + % Not sure what to do here ... + % For now I just extract the type and value + {DataTypeID, Data}; +unpack_variant(#opcua_variant{type = _, value = _}) -> + error(bad_not_implemented). + builtin_type_name( 1) -> boolean; builtin_type_name( 2) -> sbyte; builtin_type_name( 3) -> byte; diff --git a/src/opcua_pubsub.erl b/src/opcua_pubsub.erl new file mode 100644 index 0000000..39822a7 --- /dev/null +++ b/src/opcua_pubsub.erl @@ -0,0 +1,140 @@ +-module(opcua_pubsub). + +-export([start_link/0]). + + +-behaviour(gen_server). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2]). + + +-export([add_published_dataset/1]). +-export([add_published_dataset_field/3]). + +-export([new_connection/3]). + +-export([add_reader_group/2]). +-export([add_writer_group/2]). + +-export([add_dataset_reader/3]). +-export([create_target_variables/4]). + +-export([add_dataset_writer/4]). + +-export([start_connection/1]). +-export([stop_connection/1]). +-export([register_connection/1]). +-export([get_published_dataset/1]). + +-include("opcua_pubsub.hrl"). + +-record(state, { + state, + connections = #{},% Maps Ids to Pids + published_datasets = #{} +}). + +%%% API FUNCTIONS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +start_connection(Connection) -> + gen_server:call(?MODULE, {?FUNCTION_NAME, Connection}). + +stop_connection(ConnectionID) -> + gen_server:call(?MODULE, {?FUNCTION_NAME, ConnectionID}). 
+
+% Published Data Set configuration: PDS are independent
+add_published_dataset(Config) ->
+    gen_server:call(?MODULE, {?FUNCTION_NAME, Config}).
+
+% Adds definitions per-field for a PublishedDataSet
+add_published_dataset_field(PDS_ID, FieldsMetaData, FieldsSource) ->
+    gen_server:call(?MODULE, {?FUNCTION_NAME, PDS_ID, FieldsMetaData, FieldsSource}).
+
+get_published_dataset(PDS_ID) ->
+    gen_server:call(?MODULE, {?FUNCTION_NAME, PDS_ID}).
+
+new_connection(Url, Config, Opts) ->
+    opcua_pubsub_connection:create(Url, Config, Opts).
+
+% Just a place to group DataSetReaders
+add_reader_group(Connection, Config) ->
+    opcua_pubsub_connection:add_reader_group(Config, Connection).
+
+add_writer_group(Connection, Config) ->
+    opcua_pubsub_connection:add_writer_group(Config, Connection).
+
+% define a DataSetReader, this includes its DataSetFieldMetaData
+add_dataset_reader(Connection, RG_id, DSR_cfg) ->
+    opcua_pubsub_connection:add_dataset_reader(RG_id, DSR_cfg, Connection).
+
+% Add target variables to tell a DataSetReader where to write the decoded Fields
+create_target_variables(Connection, RG_id, DSR_id, Cfg) ->
+    opcua_pubsub_connection:create_target_variables(RG_id, DSR_id, Cfg, Connection).
+
+add_dataset_writer(Connection, WG_id, PDS_id, DWR_cfg) ->
+    opcua_pubsub_connection:add_dataset_writer(WG_id, PDS_id, DWR_cfg, Connection).
+
+% INTERNAL API %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+register_connection(ID) ->
+    gen_server:cast(?MODULE, {?FUNCTION_NAME, ID, self()}).
+
+% GEN_SERVER callbacks %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+init([]) ->
+    {ok, #state{state = operational}}.
+
+handle_call({start_connection, ConnectionConfig}, _, S) ->
+    ID = uuid:get_v4(),
+    {ok, _} = supervisor:start_child(opcua_pubsub_connection_sup, [ID, ConnectionConfig]),
+    {reply, {ok, ID}, S};
+handle_call({stop_connection, ConnectionID}, _, #state{connections = Conns} = S) ->
+    Pid = maps:get(ConnectionID, Conns),
+    ok = supervisor:terminate_child(opcua_pubsub_connection_sup, Pid),
+    NewMap = maps:remove(ConnectionID, Conns),
+    {reply, ok, S#state{connections = NewMap}};
+handle_call({add_published_dataset, Config}, _, #state{published_datasets = PDSs} = S) ->
+    PDS = h_add_published_dataset(Config),
+    ID = uuid:get_v4(),
+    NewMap = maps:put(ID, PDS, PDSs),
+    {reply, {ok, ID}, S#state{published_datasets = NewMap}};
+handle_call({add_published_dataset_field, PDS_id, FieldsMetadata, FieldsSources},
+            _, #state{published_datasets = PDSs} = S) ->
+    PDS = maps:get(PDS_id, PDSs),
+    NewPDS = h_add_published_dataset_field(PDS, FieldsMetadata, FieldsSources),
+    NewMap = maps:put(PDS_id, NewPDS, PDSs),
+    {reply, ok, S#state{published_datasets = NewMap}};
+handle_call({get_published_dataset, PDS_ID},
+            _, #state{published_datasets = PublishedDatasets} = S) ->
+    {reply, maps:get(PDS_ID, PublishedDatasets), S}.
+
+handle_cast({register_connection, ID, Pid}, #state{connections = Conns} = State) ->
+    {noreply, State#state{connections = maps:put(ID, Pid, Conns)}}.
+
+handle_info(_, S) ->
+    {noreply, S}.
+
+
+%%% INTERNAL FUNCTIONS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+h_add_published_dataset(Config) ->
+    % TODO add the PDS as object to the address space
+    Config.
+
+h_add_published_dataset_field(
+        #published_dataset{
+            dataset_metadata = #dataset_metadata{fields = MDFields} = DM,
+            dataset_source = DSSource
+        } = PDS,
+        FieldsMetaData, FieldsSource) ->
+
+    % TODO do more than just copy the provided configuration
+    %      check for correctness in the config
+    %      and show this stuff in the address space
+    PDS#published_dataset{
+        dataset_metadata = DM#dataset_metadata{
+            fields = MDFields ++ FieldsMetaData
+        },
+        dataset_source = DSSource ++ FieldsSource
+    }.
\ No newline at end of file
diff --git a/src/opcua_pubsub.hrl b/src/opcua_pubsub.hrl
new file mode 100644
index 0000000..07a8316
--- /dev/null
+++ b/src/opcua_pubsub.hrl
@@ -0,0 +1,141 @@
+-define(UA_PUBLISHERIDTYPE_BYTE,0).
+-define(UA_PUBLISHERIDTYPE_UINT16,1).
+-define(UA_PUBLISHERIDTYPE_UINT32,2).
+-define(UA_PUBLISHERIDTYPE_UINT64,3).
+-define(UA_PUBLISHERIDTYPE_STRING,4).
+
+-type pubsub_state_machine() :: operational | error | enabled | paused.
+
+-record(connection_config, {
+    publisher_id,
+    publisher_id_type
+}).
+
+-record(dataset_mirror,{}).
+
+-record(target_variable,{
+    dataset_field_id = 0 :: non_neg_integer(),
+    receiver_index_range,
+    target_node_id, % node_id to write to
+    attribute_id,   % attribute to write
+    write_index_range,
+    override_value_handling,
+    override_value
+}).
+
+-record(dataset_field_metadata,{
+    name :: binary(),
+    description :: undefined | binary(),
+    field_flags, % This flag indicates if the field is promoted to the NetworkMessage header
+    builtin_type :: opcua:builtin_type(),
+    data_type :: opcua:node_id(),
+    valueRank :: integer(),
+    array_dimensions,
+    maxStringLength,
+    dataset_field_id = 0 :: non_neg_integer(),
+    properties
+}).
+
+-record(dataset_metadata,{
+    name,
+    description,
+    fields = [] :: list(#dataset_field_metadata{}),
+    dataset_class_id,
+    configuration_version :: undefined | {non_neg_integer(),non_neg_integer()}
+}).
+
+-record(dataset_reader_config,{
+    name :: binary(),
+    publisher_id,
+    publisher_id_type,
+    writer_group_id,
+    dataset_writer_id,
+    dataset_metadata :: #dataset_metadata{}
+}).
+
+-record(published_variable,{
+    published_variable,
+    attribute_id,
+    sampling_interval_hint = -1,
+    deadband_type = 0 :: 0 | 1 | 2,
+    deadband_value = 0.0 :: float(),
+    index_rande,
+    substitute_value,
+    metadata_properties = []
+}).
+
+-record(published_events, {
+    event_notifier :: opcua:node_id(),
+    selected_fields :: list(),
+    filter
+}).
+
+-type published_dataset_source() :: list(#published_variable{}) |
+                                    #published_events{}.
+
+-record(published_dataset,{
+    name,
+    dataset_folder :: undefined | list(binary()),% path to the destination folder
+    dataset_metadata :: #dataset_metadata{},
+    extension_fields,
+    dataset_source = [] :: published_dataset_source()
+}).
+
+-define(UADP_NET_MSG_CONTENT_MASK_PUBLISHER_ID,    (1 bsl 0)).
+-define(UADP_NET_MSG_CONTENT_MASK_GROUP_HEADER,    (1 bsl 1)).
+-define(UADP_NET_MSG_CONTENT_MASK_WRITER_GROUP_ID, (1 bsl 2)).
+-define(UADP_NET_MSG_CONTENT_MASK_GROUP_VERSION,   (1 bsl 3)).
+-define(UADP_NET_MSG_CONTENT_MASK_NET_MSG_NUM,     (1 bsl 4)).
+-define(UADP_NET_MSG_CONTENT_MASK_SEQ_NUM,         (1 bsl 5)).
+-define(UADP_NET_MSG_CONTENT_MASK_PAYLOAD_HEADER,  (1 bsl 6)).
+-define(UADP_NET_MSG_CONTENT_MASK_TIMESTAMP,       (1 bsl 7)).
+-define(UADP_NET_MSG_CONTENT_MASK_PICOSECONDS,     (1 bsl 8)).
+-define(UADP_NET_MSG_CONTENT_MASK_DATASET_CLASSID, (1 bsl 9)).
+-define(UADP_NET_MSG_CONTENT_MASK_PROMOTED_FIELDS, (1 bsl 10)).
+
+-define(DEFAULT_NET_MSG_CONTENT,
+    ?UADP_NET_MSG_CONTENT_MASK_PUBLISHER_ID
+    bor ?UADP_NET_MSG_CONTENT_MASK_GROUP_HEADER
+    bor ?UADP_NET_MSG_CONTENT_MASK_WRITER_GROUP_ID
+    bor ?UADP_NET_MSG_CONTENT_MASK_PAYLOAD_HEADER).
+
+-record(uadp_writer_group_message_data,{
+    groupVersion,
+    dataSetOrdering,
+    networkMessageContentMask = ?DEFAULT_NET_MSG_CONTENT,
+    samplingOffset,
+    publishingOffset
+}).
+
+-record(writer_group_config,{
+    enabled = true :: boolean(),
+    name,
+    writer_group_id,
+    publishing_interval,
+    keep_alive_time,
+    priority,
+    locale_ids,
+    transport_settings,
+    message_settings = #uadp_writer_group_message_data{}
+}).
+
+-define(UADP_DATA_SET_FIELD_MASK_TIMESTAMP,      1).
+-define(UADP_DATA_SET_FIELD_MASK_PICOSECONDS,    (1 bsl 1)).
+-define(UADP_DATA_SET_FIELD_MASK_STATUS,         (1 bsl 2)).
+-define(UADP_DATA_SET_FIELD_MASK_MAJORVERSION,   (1 bsl 3)).
+-define(UADP_DATA_SET_FIELD_MASK_MINORVERSION,   (1 bsl 4)).
+-define(UADP_DATA_SET_FIELD_MASK_SEQUENCENUMBER, (1 bsl 5)).
+
+-define(DEFAULT_DATA_SET_FIELD_CONTENT,
+    ?UADP_DATA_SET_FIELD_MASK_TIMESTAMP).
+
+-record(dataset_writer_config,{
+    name :: binary(),
+    dataset_writer_id :: non_neg_integer(),
+    dataset_field_content_mask = ?DEFAULT_DATA_SET_FIELD_CONTENT,
+    keyframe_count = 1 :: non_neg_integer(),
+    dataset_name :: undefined | binary(),
+    transport_settings,
+    message_settings
+}).
+
diff --git a/src/opcua_pubsub_connection.erl b/src/opcua_pubsub_connection.erl
new file mode 100644
index 0000000..80e94db
--- /dev/null
+++ b/src/opcua_pubsub_connection.erl
@@ -0,0 +1,181 @@
+-module(opcua_pubsub_connection).
+
+-export([create/3]).
+-export([add_reader_group/2]).
+-export([add_dataset_reader/3]).
+-export([create_target_variables/4]).
+
+-export([add_writer_group/2]).
+-export([add_dataset_writer/4]).
+
+-export([start_link/2]).
+
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
+
+-include("opcua_pubsub.hrl").
+
+-record(state, {
+    id,
+    uri,
+    transport_config,
+    publisher_id,
+    publisher_id_type,
+    middleware,
+    reader_groups = #{},
+    writer_groups = #{}
+}).
+
+
+% CONFIGURATION API %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% These help to build the initial Connection process state
+% which holds the settings of all pubsub sub-entities
+
+create(Url,
+       #connection_config{publisher_id = PublisherId,
+                          publisher_id_type = PublisherIdType},
+       TransportOpts) ->
+    Uri = uri_string:parse(Url),
+    Config2 = maps:merge(default_config(), TransportOpts),
+    Config3 = maps:merge(Config2, #{uri => Uri}),
+    {ok, #state{uri = Uri,
+                transport_config = Config3,
+                publisher_id = PublisherId,
+                publisher_id_type = PublisherIdType}}.
+
+add_reader_group(ReaderGroupCfg, #state{reader_groups = RG} = S) ->
+    RG_id = uuid:get_v4(),
+    {ok, ReaderGroup} = opcua_pubsub_reader_group:new(ReaderGroupCfg),
+    RG2 = maps:put(RG_id, ReaderGroup, RG),
+    {ok, RG_id, S#state{reader_groups = RG2}}.
+
+add_dataset_reader(RG_id, DSR_cfg, #state{reader_groups = RGs} = S) ->
+    RG = maps:get(RG_id, RGs),
+    {ok, DSR_id, NewRG} = opcua_pubsub_reader_group:add_dataset_reader(DSR_cfg, RG),
+    NewGroups = maps:put(RG_id, NewRG, RGs),
+    {ok, DSR_id, S#state{reader_groups = NewGroups}}.
+
+create_target_variables(RG_id, DSR_id, Config, #state{reader_groups = RGs} = S) ->
+    RG = maps:get(RG_id, RGs),
+    {ok, NewRG} = opcua_pubsub_reader_group:create_target_variables(DSR_id, Config, RG),
+    NewGroups = maps:put(RG_id, NewRG, RGs),
+    {ok, S#state{reader_groups = NewGroups}}.
+
+add_writer_group(WriterGroupCfg, #state{
+        publisher_id = PublisherId,
+        publisher_id_type = PublisherIdType,
+        writer_groups = WGs} = S) ->
+    WG_id = uuid:get_v4(),
+    {ok, WriterGroup} = opcua_pubsub_writer_group:new(PublisherId,
+                                                      PublisherIdType,
+                                                      WriterGroupCfg),
+    WGs2 = maps:put(WG_id, WriterGroup, WGs),
+    {ok, WG_id, S#state{writer_groups = WGs2}}.
+
+add_dataset_writer(WG_id, PDS_id, WriterCfg, #state{writer_groups = WGs} = S) ->
+    WG = maps:get(WG_id, WGs),
+    {ok, DSW_is, NewWriterGroup} =
+        opcua_pubsub_writer_group:add_dataset_writer(PDS_id, WriterCfg, WG),
+    WGs2 = maps:put(WG_id, NewWriterGroup, WGs),
+    {ok, DSW_is, S#state{writer_groups = WGs2}}.
+
+%%% GEN_SERVER API FUNCTIONS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+start_link(ID, ConfiguredState) ->
+    gen_server:start_link(?MODULE, [ID, ConfiguredState], []).
+
+%%% GEN_SERVER CALLBACKS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+init([ID, #state{transport_config = Config,
+                 writer_groups = WriterGroups} = ConfiguredState]) ->
+    case start_transport(Config) of
+        {ok, Module, State} ->
+            opcua_pubsub:register_connection(ID),
+            WG2 = init_writer_groups(WriterGroups),
+            {ok, ConfiguredState#state{
+                id = ID,
+                writer_groups = WG2,
+                middleware = {Module, State}
+            }};
+        {error, E} -> error(E)
+    end.
+
+handle_call(_, _, State) ->
+    {reply, ok, State}.
+
+handle_cast(_, State) ->
+    {noreply, State}.
+
+handle_info({publish, WG_ID}, #state{
+        middleware = {Module, MiddlewareState},
+        writer_groups = WriterGroups} = State) ->
+    WG = maps:get(WG_ID, WriterGroups),
+    {NetMsg, NewWG} = opcua_pubsub_writer_group:write_network_message(WG),
+    % io:format("Sending NetworkMsg: ~p~n",[NetMsg]),
+    MiddlewareState2 = Module:send(NetMsg, MiddlewareState),
+    {noreply, State#state{
+        middleware = {Module, MiddlewareState2},
+        writer_groups = maps:put(WG_ID, NewWG, WriterGroups)}};
+handle_info(Info, #state{middleware = {M, S}} = State) ->
+    case M:handle_info(Info, S) of
+        %ignored -> {noreply, State};
+        NetMsg ->
+            {ok, NewS} = handle_network_message(NetMsg, State),
+            {noreply, NewS}
+    end.
+
+%%% INTERNAL FUNCTIONS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+start_transport(#{uri := #{scheme := <<"opc.udp">>}} = Config) ->
+    {ok, Transport} = opcua_pubsub_udp:init(Config),
+    {ok, opcua_pubsub_udp, Transport};
+start_transport(_Config) ->
+    {error, unsupported_transport}.
+
+default_config() -> #{
+        publisher_id_type => uint16,
+        publisher_id => 1111,
+        name => "Unnamed"
+    }.
+
+handle_network_message(Binary, #state{reader_groups = RGs} = S) ->
+    {Headers, Payload} = opcua_pubsub_uadp:decode_network_message_headers(Binary),
+    InterestedReaders =
+        [begin
+            DSR_ids = opcua_pubsub_reader_group:filter_readers(Headers,RG),
+            {RG_id, RG, DSR_ids}
+         end
+         || { RG_id, RG} <- maps:to_list(RGs)],
+    ReadersCount = lists:sum([length(DSR_ids)
+                              || {_, _, DSR_ids} <- InterestedReaders]),
+    case ReadersCount > 0 of
+        false -> % io:format("Skipped NetMsg = ~p\n",[Binary]),
+            {ok, S};
+        true -> % io:format("Accepting NetMsg = ~p\n",[Headers]),
+            % we can proceed with the security step if needed:
+            % opcua_pubsub_security: ... not_implemented yet
+            % Then we decode all messages
+            DataSetMessages = opcua_pubsub_uadp:decode_payload(Headers, Payload),
+            #{payload_header := #{dataset_writer_ids := DSW_ids}} = Headers,
+            BundledMessages = lists:zip(DSW_ids, DataSetMessages),
+            % After processing, the DSRs could change state.
+            % All groups must be updated
+            RG_list = dispatchMessages(BundledMessages, InterestedReaders),
+            NewRGs = lists:foldl(fun
+                    ({RG_id, NewRG}, Map) -> maps:put(RG_id, NewRG, Map)
+                end, RGs, RG_list),
+            {ok, S#state{reader_groups = NewRGs}}
+    end.
+
+dispatchMessages(BundledMessages, InterestedReaders) ->
+    [begin
+        NewRG = opcua_pubsub_reader_group:dispatch_messages(BundledMessages,
+                                                            DSR_ids, RG),
+        {RG_id, NewRG}
+     end || {RG_id, RG, DSR_ids} <- InterestedReaders].
+
+init_writer_groups(WriterGroups) ->
+    maps:from_list([begin
+            NewWG = opcua_pubsub_writer_group:init(ID, G),
+            {ID, NewWG}
+        end || {ID, G} <- maps:to_list(WriterGroups)]).
diff --git a/src/opcua_pubsub_connection_sup.erl b/src/opcua_pubsub_connection_sup.erl
new file mode 100644
index 0000000..22f1ef1
--- /dev/null
+++ b/src/opcua_pubsub_connection_sup.erl
@@ -0,0 +1,26 @@
+-module(opcua_pubsub_connection_sup).
+
+-behaviour(supervisor).
+
+%%% EXPORTS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+%% API Functions
+-export([start_link/0]).
+
+%% Behaviour supervisor callback functions
+-export([init/1]).
+
+
+%%% API FUNCTIONS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+%%% BEHAVIOUR supervisor CALLBACK FUNCTIONS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+init([]) ->
+    ChildSpecs = [#{id => none,
+                    start => {opcua_pubsub_connection, start_link, []},
+                    shutdown => brutal_kill}],
+    {ok, {#{strategy => simple_one_for_one}, ChildSpecs}}.
diff --git a/src/opcua_pubsub_dataset_reader.erl b/src/opcua_pubsub_dataset_reader.erl
new file mode 100644
index 0000000..0d3a53b
--- /dev/null
+++ b/src/opcua_pubsub_dataset_reader.erl
@@ -0,0 +1,162 @@
+-module(opcua_pubsub_dataset_reader).
+
+-export([new/1]).
+-export([is_interested/2]).
+-export([process_messages/2]).
+-export([create_target_variables/2]).
+
+-include("opcua.hrl").
+-include("opcua_pubsub.hrl").
+-include_lib("kernel/include/logger.hrl").
+
+-record(state, {
+    state = operational :: operational | error | enabled | paused,
+    name :: binary(),
+    publisher_id,
+    publisher_id_type,
+    writer_group_id,
+    dataset_writer_id,
+    dataset_metadata :: #dataset_metadata{},
+
+    subscribed_dataset :: undefined | [#target_variable{}] | #dataset_mirror{}
+}).
+
+new(#dataset_reader_config{
+        name = Name,
+        publisher_id = PubId,
+        publisher_id_type = PubIdType,
+        writer_group_id = WGId,
+        dataset_writer_id = DataSetWriterId,
+        dataset_metadata = DataSetMetadata}) ->
+    {ok, #state{name = Name, publisher_id = PubId,
+                publisher_id_type = PubIdType,
+                writer_group_id = WGId,
+                dataset_writer_id = DataSetWriterId,
+                dataset_metadata = set_metadata_fields_ids(DataSetMetadata)}}.
+
+create_target_variables(Variables, State) ->
+    {ok, State#state{subscribed_dataset = set_tgt_var_ids(Variables)}}.
+
+% Checklist:
+%     writergroup match
+%     payload contains at least one message from the desired writer
+is_interested(#{
+        publisher_id := Pub_id,
+        group_header := #{
+            writer_group_id := WG_id
+        },
+        payload_header := #{
+            dataset_writer_ids := DSW_ids
+        }
+    } = _Headers,
+    #state{ publisher_id = Pub_id,
+            writer_group_id = WG_id,
+            dataset_writer_id = DataSetWriterId}) ->
+    lists:member(DataSetWriterId, DSW_ids);
+is_interested(_, _) ->
+    false.
+
+process_messages([], State) -> State;
+process_messages([{DataSetWriterId, {Header, Data}} | Messages],
+        #state{dataset_writer_id = DataSetWriterId} = State) ->
+    % io:format("~p handling ~p~n",[?MODULE,Header]),
+    % TODO: add msg version check, state machine management ecc..
+    {DataSet, NewState} = decode_dataset_message(Header, Data, State),
+    NewState2 = update_subscribed_dataset(DataSet, NewState),
+    process_messages(Messages, NewState2);
+process_messages([ _| Messages], State) ->
+    process_messages(Messages, State).
+
+decode_dataset_message( % case of invalid message
+        #{dataset_flags1 :=
+            #{dataset_msg_valid := 0}},
+        _, S) ->
+    {[], S};
+decode_dataset_message(
+        #{
+            dataset_flags1 := #{
+                dataset_msg_valid := 1,
+                field_encoding := Encoding,
+                dataset_msg_seq_num := _,
+                status := _,
+                config_ver_minor_ver := _,
+                config_ver_major_ver := _,
+                dataset_flags2 := _
+            },
+            dataset_flags2 := #{
+                msg_type := MessageType, % keyframe / deltaframe / event ecc...
+                timestamp := _,
+                picoseconds := _
+            },
+            dataset_seq_num := _,
+            timestamp := _,
+            picoseconds := _,
+            status := _,
+            config_ver_major_ver := _,
+            config_ver_minor_ver := _
+        },
+        Data,
+        #state{
+            dataset_metadata = #dataset_metadata{
+                fields = FieldsMetaData,
+                configuration_version = _Ver}
+        } = S) ->
+    case decode_fields(Encoding, MessageType, FieldsMetaData, Data) of
+        {error, E} ->
+            ?LOG_ERROR("Failure decoding DataSetMessageFields: ~p",[E]),
+            {[], S#state{state = error}};
+        DataSet -> {DataSet, S}
+    end.
+
+decode_fields(Encoding, data_key_frame, FieldsMetaData, Data) ->
+    {FieldCount, FieldsBin} = opcua_codec_binary_builtin:decode(uint16, Data),
+    decode_keyframe(Encoding, FieldsMetaData, FieldCount, FieldsBin, []);
+decode_fields(_Encoding, _MessageType, _Fields, _Data) ->
+    error(bad_not_implemented).
+
+decode_keyframe( _, _, _, <<>>, DataSet) -> lists:reverse(DataSet);
+decode_keyframe(Encoding, [FieldMD|NextMDMD], FieldCount, Binary, DataSet) ->
+    {Decoded, Rest} = opcua_pubsub_uadp:decode_dataset_message_field(Encoding,
+                                                                     FieldMD,
+                                                                     Binary),
+    Data = {FieldMD, Decoded},
+    case Decoded of
+        {error, E} -> {error, E};
+        _ -> decode_keyframe(Encoding, NextMDMD, FieldCount-1, Rest, [Data|DataSet])
+    end.
+
+update_subscribed_dataset([], #state{state = error} = S) -> S; % skip
+update_subscribed_dataset(_DataSet, #state{ subscribed_dataset = Sub })
+        when #dataset_mirror{} == Sub ->
+    error(dataset_mirror_not_implemented);
+update_subscribed_dataset(DataSet, #state{subscribed_dataset = TGT_vars} = S)
+        when is_list(TGT_vars)->
+    ok = update_target_variables(DataSet, TGT_vars),
+    S.
+
+update_target_variables([], _TGT_vars) -> ok;
+update_target_variables([{FieldMD, Variable} | RemainingDataSet], TGT_vars) ->
+    FieldId = FieldMD#dataset_field_metadata.dataset_field_id,
+    {[TGT], OtherTGTs} = lists:partition(
+        fun(#target_variable{dataset_field_id = DataSetFieldId}) ->
+            DataSetFieldId == FieldId
+        end,
+        TGT_vars),
+    TargetNodeId = TGT#target_variable.target_node_id,
+    AttrId = TGT#target_variable.attribute_id,
+    update_tgt_var_attribute(TargetNodeId, AttrId, Variable),
+    update_target_variables(RemainingDataSet, OtherTGTs).
+
+update_tgt_var_attribute(TargetNodeId, ?UA_ATTRIBUTEID_VALUE,
+                         #opcua_variant{value = Value}) ->
+    opcua_server:set_value(TargetNodeId, Value).
+
+set_metadata_fields_ids(#dataset_metadata{fields = Fields} = DSMD) ->
+    Ids = lists:seq(0, length(Fields) - 1),
+    DSMD#dataset_metadata{fields =
+        [F#dataset_field_metadata{dataset_field_id = I}
+         || {I,F} <- lists:zip(Ids, Fields)]}.
+
+set_tgt_var_ids(Variables) ->
+    Ids = lists:seq(0, length(Variables) - 1),
+    [V#target_variable{dataset_field_id = I} || {I,V} <- lists:zip(Ids, Variables)].
diff --git a/src/opcua_pubsub_dataset_writer.erl b/src/opcua_pubsub_dataset_writer.erl
new file mode 100644
index 0000000..1cae551
--- /dev/null
+++ b/src/opcua_pubsub_dataset_writer.erl
@@ -0,0 +1,80 @@
+-module(opcua_pubsub_dataset_writer).
+
+-export([new/2]).
+-export([write_dataset_message/1]).
+
+-include("opcua.hrl").
+-include("opcua_pubsub.hrl").
+-include_lib("kernel/include/logger.hrl").
+
+-record(state, {
+    state = operational :: pubsub_state_machine(),
+    name,
+    dataset_writer_id,
+    dataset_field_content_mask,
+    keyframe_count,
+    dataset_name,
+    transport_settings,
+    message_settings,
+    connected_published_dataset
+}).
+
+new(PDS_id, #dataset_writer_config{
+        name = N,
+        dataset_writer_id = DS_WID,
+        dataset_field_content_mask = CM,
+        keyframe_count = KF_C,
+        dataset_name = DN,
+        transport_settings = TS,
+        message_settings = MS
+    }) ->
+    {ok, #state{
+        state = operational,
+        name = N,
+        dataset_writer_id = DS_WID,
+        dataset_field_content_mask = CM,
+        keyframe_count = KF_C,
+        dataset_name = DN,
+        transport_settings = TS,
+        message_settings = MS,
+        connected_published_dataset = opcua_pubsub:get_published_dataset(PDS_id)
+    }}.
+
+write_dataset_message(#state{dataset_writer_id = DSW_ID,
+                             connected_published_dataset = PDS,
+                             dataset_field_content_mask = ContentMask} = S) ->
+    % We are going to produce a keyframe with variant encoding, always.
+    % We do not support delta-frames so we ignore keyframe_count
+    #published_dataset{
+        dataset_metadata = #dataset_metadata{fields = FieldsMetadata} = MD,
+        dataset_source = DatasetSource
+    } = PDS,
+    Values = read_sources(DatasetSource, []),
+    Fields = encode_data_set_fields(FieldsMetadata, Values),
+    Header = opcua_pubsub_uadp:encode_dataset_message_header(variant,
+                                                             data_key_frame,
+                                                             ContentMask, MD),
+    DataSetMessage = opcua_pubsub_uadp:encode_dataset_message(Header, Fields),
+    {DataSetMessage, DSW_ID, S}.
+
+read_sources([], Values) -> lists:reverse(Values);
+read_sources([#published_variable{
+        published_variable = NodeID,
+        attribute_id = ?UA_ATTRIBUTEID_VALUE
+    } | Rest], Values) ->
+    [DataValue] = opcua_server_registry:perform(NodeID,[#opcua_read_command{attr = value}]),
+    #opcua_data_value{
+        value = Value,
+        status = good
+    } = DataValue,
+    % io:format("Read value: ~p~n",[Value]),
+    read_sources(Rest, [Value | Values]).
+
+encode_data_set_fields(FieldsMetadata, Values) ->
+    encode_data_set_fields(FieldsMetadata, Values, []).
+
+encode_data_set_fields([], [], Results) -> lists:reverse(Results);
+encode_data_set_fields([ FieldMeta | FMD], [Value | Values], Results) ->
+    Binary = opcua_pubsub_uadp:encode_dataset_message_field(FieldMeta, Value),
+    encode_data_set_fields(FMD, Values, [Binary | Results]).
+
diff --git a/src/opcua_pubsub_example.erl b/src/opcua_pubsub_example.erl
new file mode 100644
index 0000000..718d61d
--- /dev/null
+++ b/src/opcua_pubsub_example.erl
@@ -0,0 +1,107 @@
+-module(opcua_pubsub_example).
+
+-export([subscription/0]).
+-export([publication/0]).
+
+-include("opcua.hrl").
+-include("opcua_pubsub.hrl").
+
+-define(URL, <<"opc.udp://224.0.0.22:4840">>).
+
+subscription() ->
+    Url = ?URL,
+    ConnectionConfig = #connection_config{},
+    {ok, Conn} = opcua_pubsub:new_connection(Url, ConnectionConfig, #{}),
+
+    ReaderGroupconfig = #{ name => <<"Simple Reader Group">>},
+    {ok, RG_id, Conn2} = opcua_pubsub:add_reader_group(Conn, ReaderGroupconfig),
+
+    DSR_config = #dataset_reader_config{
+        name = <<"Example Reader">>,
+        publisher_id = 2234,
+        publisher_id_type = uint16,
+        writer_group_id = 100,
+        dataset_writer_id = 62541,
+        dataset_metadata = #dataset_metadata{
+            name = <<"DataSet 1">>,
+            description = <<"An example from 62541">>,
+            fields = [
+                #dataset_field_metadata{
+                    name = <<"DateTime 1">>,
+                    builtin_type = date_time,
+                    data_type = opcua_node:id(date_time),
+                    valueRank = -1 % a scalar,
+                }]
+        }
+    },
+    {ok, DSR_id, Conn3} =
+        opcua_pubsub:add_dataset_reader(Conn2, RG_id, DSR_config),
+
+    % A dedicated object on the server (or any address space available)
+    % containing all variables that will be updated by the DSR
+    DataSetObject = opcua_server:add_object(<<"Subscribed Data">>, numeric),
+    VarNodeId = opcua_server:add_variable(DataSetObject, <<"Publisher Time">>,
+                                          undefined, date_time, 0),
+
+    TGT = #target_variable{
+        dataset_field_id = 0,
+        target_node_id = VarNodeId,
+        attribute_id = ?UA_ATTRIBUTEID_VALUE
+    },
+    {ok, Conn4} =
+        opcua_pubsub:create_target_variables(Conn3,RG_id,DSR_id,[TGT]),
+
+    {ok, ID} = opcua_pubsub:start_connection(Conn4),
+    ok.
+
+publication() ->
+
+    PDS_cfg = #published_dataset{
+        name = <<"PublishedDataSet Example">>,
+        dataset_metadata = #dataset_metadata{
+            name = <<"My Metadata">>
+        }
+    },
+    {ok, PDS_id} = opcua_pubsub:add_published_dataset(PDS_cfg),
+
+    % we specify the fields metadata and their sources
+    % In this case we list available variables as sources
+    FieldsMetaData = [#dataset_field_metadata{
+        name = <<"DateTime 1">>,
+        builtin_type = date_time,
+        data_type = opcua_node:id(date_time),
+        valueRank = -1 % a scalar,
+    }],
+    FieldsSource = [
+        #published_variable{
+            published_variable = ?NNID(2258),
+            attribute_id = ?UA_ATTRIBUTEID_VALUE
+        }],
+    ok = opcua_pubsub:add_published_dataset_field(PDS_id, FieldsMetaData, FieldsSource),
+
+    Url = ?URL,
+    ConnectionConfig = #connection_config{
+        publisher_id = 2234,
+        publisher_id_type = uint16
+    },
+    {ok, Conn} = opcua_pubsub:new_connection(Url, ConnectionConfig, #{}),
+
+    WriterGroupconfig = #writer_group_config{
+        name = <<"Simple Writer Group">>,
+        writer_group_id = 100,
+        publishing_interval = 100
+    },
+    {ok, WG_id, Conn2} = opcua_pubsub:add_writer_group(Conn, WriterGroupconfig),
+
+    DataSetWriterConfig = #dataset_writer_config{
+        name = <<"Simple DataSet Writer">>,
+        dataset_writer_id = 62541,
+        keyframe_count = 10
+    },
+    {ok, DSW_id, Conn3} = opcua_pubsub:add_dataset_writer(Conn2, WG_id,
+                                                          PDS_id, DataSetWriterConfig),
+
+
+    {ok, ID} = opcua_pubsub:start_connection(Conn3),
+
+    ok.
+
diff --git a/src/opcua_pubsub_reader_group.erl b/src/opcua_pubsub_reader_group.erl
new file mode 100644
index 0000000..bbd381d
--- /dev/null
+++ b/src/opcua_pubsub_reader_group.erl
@@ -0,0 +1,46 @@
+-module(opcua_pubsub_reader_group).
+
+-export([new/1]).
+-export([add_dataset_reader/2]).
+-export([filter_readers/2]).
+-export([dispatch_messages/3]).
+-export([create_target_variables/3]).
+
+-record(state, {
+    name,
+    dataset_readers = #{}
+}).
+
+new(#{name := RG_name}) ->
+    {ok, #state{name = RG_name}}.
+
+add_dataset_reader(DSR_cfg, #state{dataset_readers = DSRs} = S) ->
+    DSR_id = uuid:get_v4(),
+    {ok, DSR} = opcua_pubsub_dataset_reader:new(DSR_cfg),
+    NewDSRs = maps:put(DSR_id, DSR, DSRs),
+    {ok, DSR_id, S#state{dataset_readers = NewDSRs}}.
+
+create_target_variables(DSR_id, Config,#state{dataset_readers = DSRs} = S) ->
+    DSR = maps:get(DSR_id, DSRs),
+    {ok, NewDSR} = opcua_pubsub_dataset_reader:create_target_variables(Config, DSR),
+    NewDSRs = maps:put(DSR_id, NewDSR, DSRs),
+    {ok, S#state{dataset_readers = NewDSRs}}.
+
+filter_readers(Headers, #state{dataset_readers = DSRs}) ->
+    [DSR_id || {DSR_id, DSR} <- maps:to_list(DSRs),
+               opcua_pubsub_dataset_reader:is_interested(Headers, DSR)].
+
+dispatch_messages(BundledMessages, DSR_ids, #state{dataset_readers = DSRs} = S) ->
+    Updated = [
+        begin
+            DSR = maps:get(ID, DSRs),
+            NewDSR = opcua_pubsub_dataset_reader:process_messages(BundledMessages, DSR),
+            {ID, NewDSR}
+        end || ID <- DSR_ids],
+    NewDSRs = lists:foldl(fun
+            ({ID,Value}, Map) ->
+                maps:put(ID, Value, Map)
+        end, DSRs, Updated),
+    S#state{dataset_readers = NewDSRs}.
+
+
diff --git a/src/opcua_pubsub_security.erl b/src/opcua_pubsub_security.erl
new file mode 100644
index 0000000..eb846fc
--- /dev/null
+++ b/src/opcua_pubsub_security.erl
@@ -0,0 +1,13 @@
+-module(opcua_pubsub_security).
+
+-export([lock/1]).
+-export([unlock/1]).
+
+%%% API FUNCTIONS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+lock(Binary) ->
+    Binary.
+
+unlock(Binary) ->
+    Binary.
+
+%%% INTERNALS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
diff --git a/src/opcua_pubsub_sup.erl b/src/opcua_pubsub_sup.erl
new file mode 100644
index 0000000..fbe94cd
--- /dev/null
+++ b/src/opcua_pubsub_sup.erl
@@ -0,0 +1,35 @@
+-module(opcua_pubsub_sup).
+
+-behaviour(supervisor).
+ +%%% EXPORTS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% API Functions +-export([start_link/0]). + +%% Behaviour supervisor callback functions +-export([init/1]). + + +%%% API FUNCTIONS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + + +%%% BEHAVIOUR supervisor CALLBACK FUNCTIONS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +init([]) -> + Childs = [ + supervisor(opcua_pubsub_connection_sup, []), + worker(opcua_pubsub, [])], + {ok, {#{strategy => one_for_all}, Childs}}. + + +%%% INTERNAL FUNCTIONS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +worker(Module, Args) -> + #{id => Module, start => {Module, start_link, Args}}. + +supervisor(Module, Args) -> + #{id => Module, type => supervisor, start => {Module, start_link, Args}}. diff --git a/src/opcua_pubsub_uadp.erl b/src/opcua_pubsub_uadp.erl new file mode 100644 index 0000000..85b348f --- /dev/null +++ b/src/opcua_pubsub_uadp.erl @@ -0,0 +1,514 @@ +-module(opcua_pubsub_uadp). + + +-export([decode_network_message_headers/1]). +-export([decode_payload/2]). +-export([decode_dataset_message_field/3]). + +-export([encode_dataset_message_field/2]). +-export([encode_dataset_message_header/4]). +-export([encode_dataset_message/2]). +-export([encode_payload/1]). +-export([encode_network_message_headers/4]). + +-include("opcua.hrl"). +-include("opcua_pubsub.hrl"). + +%%%%%% Encoding binary flags %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% Used In UADPFlags +-define(UADP_VERSION, 1). +-define(PUBLISHER_ID_ENABLED, (1 bsl 4)). +-define(GROUP_HEADER_ENABLED, (1 bsl 5)). +-define(PAYLOAD_HEADER_ENABLED, (1 bsl 6)). +-define(EXT_FLAGS_1_ENABLED, (1 bsl 7)). +% Used In ExtendedFlags2 +-define(EXT_FLAGS2_CHUNK_MESSAGE, 1). +-define(EXT_FLAGS2_PROMOTED_FIELD_ENABLED, (1 bsl 1)). +-define(EXT_FLAGS2_DATASET_MSG_TYPE, 0). +-define(EXT_FLAGS2_DISCOVERY_REQUEST_MSG_TYPE, (1 bsl 2)). 
+-define(EXT_FLAGS2_DISCOVERY_RESPONSE_MSG_TYPE, (1 bsl 3)).
+% Used In ExtendedFlags1
+% NOTE: the bit layout must mirror decode_extended_flags1/2 below
+% (bit3 = DataSetClassId, bit4 = Security, bit5 = Timestamp,
+%  bit6 = PicoSeconds, bit7 = ExtendedFlags2), per OPC UA Part 14 UADP.
+-define(EXT_FLAGS1_DATA_SET_CLASS_ID_ENABLED, (1 bsl 3)).
+% -define(SECURITY_ENABLED, (1 bsl 4)). % not implemented
+% BUGFIX: Timestamp is bit 5; it was (1 bsl 6), duplicating PicoSeconds,
+% so enabling a timestamp corrupted the flag byte on the wire.
+-define(EXT_FLAGS1_TIMESTAMP_ENABLED, (1 bsl 5)).
+-define(EXT_FLAGS1_PICOSECONDS_ENABLED, (1 bsl 6)).
+-define(EXT_FLAGS1_EXT_FLAGS_2_ENABLED, (1 bsl 7)).
+% Used In GroupFlags
+-define(WRITER_GROUP_ENABLED, 1).
+-define(GROUP_VERSION_ENABLED, (1 bsl 1)).
+-define(NETWORK_MESSAGE_ENABLED, (1 bsl 2)).
+-define(SEQUENCE_NUMBER_ENABLED, (1 bsl 3)).
+% Used in DataSetMessageHeader DataSetFlags1
+-define(DATASET_FLAGS1_VALID, 1).
+-define(DATASET_FLAGS1_VARIANT, 0).
+-define(DATASET_FLAGS1_RAWDATA, (1 bsl 1)).
+-define(DATASET_FLAGS1_DATAVALUE, (1 bsl 2)).
+-define(DATASET_FLAGS1_SEQ_NUM_ENABLED, (1 bsl 3)).
+-define(DATASET_FLAGS1_STATUS_ENABLED, (1 bsl 4)).
+-define(DATASET_FLAGS1_MAJOR_V_ENABLED, (1 bsl 5)).
+-define(DATASET_FLAGS1_MINOR_V_ENABLED, (1 bsl 6)).
+-define(DATASET_FLAGS1_FLAGS2_ENABLED, (1 bsl 7)).
+% Used in DataSetMessageHeader DataSetFlags2
+-define(DATASET_FLAGS2_KEY_FRAME, 0).
+-define(DATASET_FLAGS2_DELTA_FRAME, 1).
+-define(DATASET_FLAGS2_EVENT, 2).
+-define(DATASET_FLAGS2_KEEP_ALIVE, 3).
+-define(DATASET_FLAGS2_TIMESTAMP_ENABLED, (1 bsl 4)).
+-define(DATASET_FLAGS2_PICOSECONDS_ENABLED, (1 bsl 5)).
+ +%%% API FUNCTIONS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +% Extracts the clear data from the message Headers and the payload binary +decode_network_message_headers(<>) -> + <> = VersionFlags, + {ExtendedFlags1Map, Rest2} = decode_extended_flags1(ExtendedFlags1, Rest), + + % Skipping many optional fields, enforcing a minimal setup for testing + % TODO: add them once needed and remove this hard match + #{dataset_class_id := 0, extended_flags2 := 0, + picoseconds := 0, decode_publisher_id_type := uint16, + security := 0, timestamp := 0} = ExtendedFlags1Map, + ExtendedFlags2 = maps:get(extended_flags2, ExtendedFlags1Map, 0), + {ExtendedFlags2Map, Rest3} = decode_extended_flags2(ExtendedFlags2, Rest2), + {PublisherIDValue, Rest4} = decode_publisherID(PublisherId, ExtendedFlags1Map, Rest3), + % {DataSetClassId, Rest5} = decode_dataset_class_id(PublisherID, ExtendedFlags1Record, Rest4), + {GroupHeaderMap, Rest5} = decode_group_header(GroupHeader, Rest4), + {PayloadHeaderMap, Rest6} = decode_payload_header(PayloadHeader, ExtendedFlags2Map, Rest5), + % Network Message Extended Header + % {TimeStamp, Rest7} = , + % {Picoseconds, Rest8} = , + % {PromotedFields, Rest7} = decode_promoted_fields(ExtendedFlags2Record, Rest6), + % Security + % {SecurityHeader, Rest8} = decode_security_header(ExtendedFlags1Record, Rest7), + Headers = #{ + publisher_id => PublisherIDValue, + extended_flags1 => ExtendedFlags1Map, + extended_flags2 => ExtendedFlags2Map, + group_header => GroupHeaderMap, + payload_header => PayloadHeaderMap + }, + {Headers, Rest6}; +decode_network_message_headers(_) -> + {error, unknown_message}. 
+
+%extracts Dataset Messages from the payload blob decoding the headers
+decode_payload(#{payload_header := undefined}, Payload) ->
+    {DSM_header, Binary} = decode_dataset_message_header(Payload),
+    [{DSM_header, Binary}];
+decode_payload(#{payload_header := #{count := 1}}, Payload) ->
+    {DSM_header, Binary} = decode_dataset_message_header(Payload),
+    [{DSM_header, Binary}];
+decode_payload(#{payload_header := #{count := Count}}, Payload) ->
+    <> = Payload,
+    Sizes = [Size || <> <= SizesBinary],
+    decode_multi_dataset_message(Rest, Sizes).
+
+% Decodes a single dataset message field; only the 'variant' field encoding
+% is supported. The decoded variant must carry the builtin type announced in
+% the field metadata, otherwise the metadata does not match the wire data.
+decode_dataset_message_field(variant, FieldMetadata, Binary) ->
+    #dataset_field_metadata{
+        builtin_type = BuiltinType,
+        data_type = _NodeId,
+        valueRank = _
+    } = FieldMetadata,
+    {Result, Rest} = opcua_codec_binary:decode(variant, Binary),
+    case Result of
+        #opcua_variant{type = BuiltinType} -> {Result, Rest};
+        _ -> {error, unmatched_metadata}
+    end;
+decode_dataset_message_field(_, _, _) ->
+    error(bad_encoding_not_implemented).
+
+% Re-packs the variant as the metadata's data type and encodes it.
+encode_dataset_message_field(#dataset_field_metadata{
+        data_type = DataType}, #opcua_variant{} = V) ->
+    % TODO: make sure to correctly extract a value before encoding.
+    {_TypeID, Val} = opcua_codec:unpack_variant(V),
+    V2 = opcua_codec:pack_variant(opcua_server_space, DataType, -1, Val),
+    % BUGFIX: removed a leftover debug round-trip that decoded the freshly
+    % encoded binary and discarded the result on every single field encode.
+    {Binary, _} = opcua_codec_binary:encode(variant, V2),
+    Binary.
+ +encode_dataset_message_header(FieldEncoding, MsgType, ContentMask, + #dataset_metadata{configuration_version = _MajorMinor}) -> + Flags1 = ?DATASET_FLAGS1_VALID + bor encode_field_encoding(FieldEncoding) + bor ?DATASET_FLAGS1_FLAGS2_ENABLED, + Flags2 = 0, + % DataSetFlags1 + % F1 = #{ + % dataset_msg_valid => 1, + % field_encoding => FieldEncoding, + % dataset_flags2 => 1}, + % {Status, F1_1} = case ContentMask band ?UADP_DATA_SET_FIELD_MASK_STATUS of + % 1 -> error(bad_not_implemented); + % 0 -> {<<>>, maps:put(status, 0, F1)} + % end, + % {MajorVersion, F1_2} = case ContentMask band ?UADP_DATA_SET_FIELD_MASK_MAJORVERSION of + % 1 -> error(bad_not_implemented); + % 0 -> {<<>>, maps:put(config_ver_major_ver, 0, F1_1)} + % end, + % {MinorVersion, F1_3} = case ContentMask band ?UADP_DATA_SET_FIELD_MASK_MINORVERSION of + % 1 -> error(bad_not_implemented); + % 0 -> {<<>>, maps:put(config_ver_minor_ver, 0, F1_2)} + % end, + % {SeqNumber, F1_4} = case ContentMask band ?UADP_DATA_SET_FIELD_MASK_SEQUENCENUMBER of + % 1 -> error(bad_not_implemented); + % 0 -> {<<>>, maps:put(dataset_msg_seq_num, 0, F1_3)} + % end, + {Flags2_1, Timestamp} = case ContentMask band ?UADP_DATA_SET_FIELD_MASK_TIMESTAMP of + 0 -> {Flags2, <<>>}; + _ -> T = opcua_codec_binary_builtin:encode(date_time, opcua_util:date_time()), + {Flags2 bor ?DATASET_FLAGS2_TIMESTAMP_ENABLED, T} + end, + % {PicoSeconds, F2_2} = case ContentMask band ?UADP_DATA_SET_FIELD_MASK_PICOSECONDS of + % 1 -> error(bad_not_implemented); + % 0 -> {<<>>, maps:put(picoseconds, 0, F2_1)} + iolist_to_binary([Flags1, Flags2_1, Timestamp]). + +encode_dataset_message(Header, Fields) -> + FieldCount = <<(length(Fields)):16/unsigned-little>>, + iolist_to_binary([Header, FieldCount, Fields]). + +encode_payload([DataSetMessage]) -> DataSetMessage; +encode_payload(DataSetMessages) -> + Sizes = [ <<(byte_size(DSM)):16/unsigned-little>> || DSM <- DataSetMessages], + iolist_to_binary([Sizes, DataSetMessages]). 
+
+encode_network_message_headers(PublisherID, PublisherIdType, DSW_IDS,
+        #writer_group_config{
+            message_settings = #uadp_writer_group_message_data{
+                networkMessageContentMask = Mask}
+        } = WriterGroupCfg) ->
+    UADPFlags = ?UADP_VERSION, % from the specification
+    ExtFlags1 = 0,
+    % ExtFlags2 = 0, % unused
+    % UADP main flags
+    {UADPFlags1, PubID} = case Mask band ?UADP_NET_MSG_CONTENT_MASK_PUBLISHER_ID of
+        0 -> {UADPFlags, <<>>};
+        _ -> ID = opcua_codec_binary_builtin:encode(PublisherIdType, PublisherID),
+             {UADPFlags bor ?PUBLISHER_ID_ENABLED, ID}
+    end,
+    {UADPFlags2, GH} = case Mask band ?UADP_NET_MSG_CONTENT_MASK_GROUP_HEADER of
+        0 -> {UADPFlags1, <<>>};
+        _ -> H = encode_group_header(WriterGroupCfg, Mask),
+             {UADPFlags1 bor ?GROUP_HEADER_ENABLED, H}
+    end,
+    % BUGFIX: the payload header must be gated by the PAYLOAD_HEADER bit of
+    % the content mask, not GROUP_HEADER (copy-paste from the case above).
+    {UADPFlags3, PH} = case Mask band ?UADP_NET_MSG_CONTENT_MASK_PAYLOAD_HEADER of
+        0 -> {UADPFlags2, <<>>};
+        _ -> H_ = encode_payload_header(DSW_IDS),
+             {UADPFlags2 bor ?PAYLOAD_HEADER_ENABLED, H_}
+    end,
+    % ExtendedFlags1 always enabled
+    UADPFlags4 = UADPFlags3 bor ?EXT_FLAGS_1_ENABLED,
+    ExtFlags1_1 = case UADPFlags4 band ?PUBLISHER_ID_ENABLED of
+        0 -> ExtFlags1;
+        _ -> ExtFlags1 bor encode_publisher_id_type(PublisherIdType)
+    end,
+    {ExtFlags1_2, ClassID} = case Mask band ?UADP_NET_MSG_CONTENT_MASK_DATASET_CLASSID of
+        0 -> {ExtFlags1_1, <<>>};
+        _ -> {ExtFlags1_1 bor ?EXT_FLAGS1_DATA_SET_CLASS_ID_ENABLED, error(not_implemented)}
+    end,
+    % Security disabled,
+    % TODO: add check here when is implemented
+    {ExtFlags1_3, SH} = {ExtFlags1_2, <<>>},
+    % ...
+ {ExtFlags1_4, Timestamp} = case Mask band ?UADP_NET_MSG_CONTENT_MASK_TIMESTAMP of + 0 -> {ExtFlags1_3, <<>>}; + _ -> T = opcua_codec_binary_builtin:encode(date_time, opcua_util:date_time()), + {ExtFlags1_3 bor ?EXT_FLAGS1_TIMESTAMP_ENABLED, T} + end, + {ExtFlags1_5, PicoSeconds} = case Mask band ?UADP_NET_MSG_CONTENT_MASK_PICOSECONDS of + 0 -> {ExtFlags1_4, <<>>}; + _ -> {ExtFlags1_4 bor ?EXT_FLAGS1_PICOSECONDS_ENABLED, error(not_implemented)} + end, + + % ExtendedFlags2 disabled for simplicity + % this disables promoted_fields, picosecods timestamp + % and defaults to dataset_message + ExtFlags1_6 = ExtFlags1_5, % bor ?EXT_FLAGS_2_ENABLED, + + % ExtFlags2_1 = ExtFlags2 bor 0, % TODO: add check and support for ?CHUNK_MESSAGE + % {ExtFlags2_2, PromotedFields} = case Mask band ?UADP_NET_MSG_CONTENT_MASK_PROMOTED_FIELDS of + % 0 -> {ExtFlags2_1, <<>>}; + % _ -> {ExtFlags2_1 bor ?EXT_FLAGS2_PROMOTED_FIELD_ENABLED, error(not_implemented)} + % end, + % % TODO: support more message types + % HardcodedMsgType = dataset_message, + % ExtFlags2_3 = ExtFlags2_2 bor encode_network_msg_type(HardcodedMsgType), + + iolist_to_binary([ + UADPFlags4, ExtFlags1_6, %% Flags ExtFlags2 is unused for now + PubID, ClassID, GH, PH, % Main elements + % extended header elements (unused) + %Timestamp, PicoSeconds, PromotedFields, + SH % optional security info (unused) + ]). 
+ + +%%% INTERNALS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +decode_extended_flags1(0, Bin) -> + {#{ + extended_flags2 => 0, + picoseconds => 0, + timestamp => 0, + security => 0, + dataset_class_id => 0, + decode_publisher_id_type => decode_publisher_id_type(0) + }, Bin}; +decode_extended_flags1(1, << + ExtendedFlags2:1, + PicoSeconds:1, + Timestamp:1, + Security:1, + DataSetClassId:1, + PublisherIdType:3/little-unsigned, Rest/binary>>) -> + {#{ + extended_flags2 => ExtendedFlags2, + picoseconds => PicoSeconds, + timestamp => Timestamp, + security => Security, + dataset_class_id => DataSetClassId, + decode_publisher_id_type => decode_publisher_id_type(PublisherIdType) + }, Rest}. + +decode_publisher_id_type(?UA_PUBLISHERIDTYPE_BYTE) -> byte; +decode_publisher_id_type(?UA_PUBLISHERIDTYPE_UINT16) -> uint16; +decode_publisher_id_type(?UA_PUBLISHERIDTYPE_UINT32) -> uint32; +decode_publisher_id_type(?UA_PUBLISHERIDTYPE_UINT64) -> uint64; +decode_publisher_id_type(?UA_PUBLISHERIDTYPE_STRING) -> string; +decode_publisher_id_type(_) -> reserved. + +decode_extended_flags2(0, Bin) -> + {#{ + chunk => 0, + promoted_fields => 0, + network_message_type => decode_network_msg_type(<<0:1,0:1,0:1>>) + }, Bin}; +decode_extended_flags2(1, << + _Reserved:3, + NetworkMsgType:3/bitstring, + PromotedFields:1, + Chunk:1, + Bin/binary>>) -> + {#{ + chunk => Chunk, + promoted_fields => PromotedFields, + network_message_type => decode_network_msg_type(NetworkMsgType) + }, Bin}. + + +decode_network_msg_type(<< 0:1, 0:1, 0:1>>) -> dataset_message; +decode_network_msg_type(<< 0:1, 0:1, 1:1>>) -> discovery_request; +decode_network_msg_type(<< 0:1, 1:1, 0:1>>) -> discovery_responce; +decode_network_msg_type(<< _:1, _:1, _:1>>) -> reserved. + +decode_publisherID(0, _, Binary) -> {undefined, Binary}; +decode_publisherID(1, #{decode_publisher_id_type := uint16}, Binary) -> + <> = Binary, + {PublisherID, Rest}. 
+% TODO: handle other PublisherID types + +decode_group_header(0, Bin) -> { undefined, Bin}; +decode_group_header(1, <>) -> + <<_ReservedBits:4, + SeqNum_flag:1, + NetworkMessageNumber_flag:1, + GroupVersion_flag:1, + WrtiterGroupId_flag:1>> = GroupFlags, + {WriterGroupId, Rest} = decode_writer_group_id(WrtiterGroupId_flag, Bin), + {GroupVersion, Rest2} = decode_group_version_id(GroupVersion_flag, Rest), + {NetworkMessageNumber, Rest3} = decode_network_message_number(NetworkMessageNumber_flag, Rest2), + {SeqNum, Rest4} = decode_network_sequence_number(SeqNum_flag, Rest3), + {#{ + writer_group_id => WriterGroupId, + group_version => GroupVersion, + network_message_number => NetworkMessageNumber, + sequence_number => SeqNum + }, Rest4}. + +decode_writer_group_id(0, Bin) -> {undefined, Bin}; +decode_writer_group_id(1, Bin) -> opcua_codec_binary_builtin:decode(uint16, Bin). + +decode_group_version_id(0, Bin) -> {undefined, Bin}; +decode_group_version_id(1, Bin) -> + %UInt32 that represents the time in seconds since the year 2000 + opcua_codec_binary_builtin:decode(uint32, Bin). + +decode_network_message_number(0, Bin) -> {undefined, Bin}; +decode_network_message_number(1, Bin) -> opcua_codec_binary_builtin:decode(uint16, Bin). + +decode_network_sequence_number(0, Bin) -> {undefined, Bin}; +decode_network_sequence_number(1, Bin) -> opcua_codec_binary_builtin:decode(uint16, Bin). 
+
+decode_payload_header(0, _, Bin) -> {undefined, Bin};
+decode_payload_header(1, #{chunk := 1}, Bin) ->
+    {_DataSetWriterID, _Rest} = opcua_codec_binary_builtin:decode(uint16, Bin),
+    throw({not_implemented, chunked_network_message});
+decode_payload_header(1, #{network_message_type := dataset_message}, Bin) ->
+    <> = Bin,
+    <> = Rest,
+    {#{
+        count => MsgCount,
+        dataset_writer_ids =>
+            [ DataWriterID || <> <= DataWriterIDs]
+    }, Rest2};
+decode_payload_header(1, #{network_message_type := discovery_request}, _Bin) ->
+    throw({not_implemented, discovery_request});
+decode_payload_header(1, #{network_message_type := discovery_responce}, _Bin) ->
+    throw({not_implemented, discovery_responce}).
+
+decode_multi_dataset_message(Bin, Sizes) ->
+    decode_multi_dataset_message(Bin, Sizes, []).
+
+% BUGFIX: the recursive call passed the accumulator where the remaining
+% sizes list belongs (and vice versa), so decoding any network message
+% containing more than one dataset message crashed on the second message.
+decode_multi_dataset_message(<<>>, [], Result) -> lists:reverse(Result);
+decode_multi_dataset_message(Bin, [S|TL], Result) ->
+    <> = Bin,
+    {DSM_header, Binary1} = decode_dataset_message_header(DSM),
+    decode_multi_dataset_message(Rest, TL, [{DSM_header, Binary1} | Result]).
+ + + +decode_dataset_message_header(DataSetMessageBinary) -> + {DataSetFlags1, Rest} = decode_dataset_flags1(DataSetMessageBinary), + {DataSetFlags2, Rest1} = decode_dataset_flags2(DataSetFlags1, Rest), + {DataSetSeqNum, Rest2} = decode_dataset_seq_num(DataSetFlags1, Rest1), + {Timestamp, Rest3} = decode_dataset_timestamp(DataSetFlags2, Rest2), + {Picoseconds, Rest4} = decode_dataset_picoseconds(DataSetFlags2, Rest3), + {Status, Rest5} = decode_dataset_status(DataSetFlags1, Rest4), + {ConfigVerMajorVer, Rest6} = decode_dataset_cfg_major_ver(DataSetFlags1, Rest5), + {ConfigVerMinorVer, Rest7} = decode_dataset_cfg_minor_ver(DataSetFlags1, Rest6), + {#{ + dataset_flags1 => DataSetFlags1, + dataset_flags2 => DataSetFlags2, + dataset_seq_num => DataSetSeqNum, + timestamp => Timestamp, + picoseconds => Picoseconds, + status => Status, + config_ver_major_ver => ConfigVerMajorVer, + config_ver_minor_ver => ConfigVerMinorVer + }, + Rest7}. + +decode_dataset_flags1(<< + DataSetFlags2:1, + ConfigVerMinorVer:1, + ConfigVerMajorVer:1, + Status:1, + DataSetMsgSeqNum:1, + FieldEncoding:2/bitstring, + DataSetMsgValid:1, + Rest/binary>>) -> + {#{ + dataset_msg_valid => DataSetMsgValid, + field_encoding => decode_field_encoding(FieldEncoding), + dataset_msg_seq_num => DataSetMsgSeqNum, + status => Status, + config_ver_minor_ver => ConfigVerMajorVer, + config_ver_major_ver => ConfigVerMinorVer, + dataset_flags2 => DataSetFlags2 + }, Rest}. + +decode_field_encoding(<<0:1, 0:1>>) -> variant; +decode_field_encoding(<<0:1, 1:1>>) -> raw; +decode_field_encoding(<<1:1, 0:1>>) -> data_value; +decode_field_encoding(<<1:1, 1:1>>) -> reserved. 
+ +decode_dataset_flags2(#{dataset_flags2 := 0}, Bin) -> + {#{ + msg_type => decode_dataset_message_type(<<0:4>>), + timestamp => 0, + picoseconds => 0}, Bin}; +decode_dataset_flags2(#{dataset_flags2 := 1}, + <<_Reserved:2, + PicoSeconds:1, + Timestamp:1, + DataMsgType:4/bitstring, + Rest/binary>>) -> + {#{ + msg_type => decode_dataset_message_type(DataMsgType), + timestamp => Timestamp, + picoseconds => PicoSeconds + }, Rest}. + +decode_dataset_message_type(<<0:4>>) -> data_key_frame; +decode_dataset_message_type(<<0:1, 0:1, 0:1, 1:1>>) -> data_delta_frame; +decode_dataset_message_type(<<0:1, 0:1, 1:1, 0:1>>) -> event; +decode_dataset_message_type(<<0:1, 0:1, 1:1, 1:1>>) -> keep_alive; +decode_dataset_message_type(<<_:4>>) -> reserved. + +decode_dataset_seq_num(#{dataset_msg_seq_num := 0}, Bin) -> {undefined, Bin}; +decode_dataset_seq_num(#{dataset_msg_seq_num := 1}, Bin) -> + opcua_codec_binary_builtin:decode(uint16, Bin). + +decode_dataset_timestamp(#{timestamp := 0}, Bin) -> {undefined, Bin}; +decode_dataset_timestamp(#{timestamp := 1}, Bin) -> + opcua_codec_binary_builtin:decode(date_time, Bin). + +decode_dataset_picoseconds(#{picoseconds := 0}, Bin) -> {undefined, Bin}; +decode_dataset_picoseconds(#{picoseconds := 1}, Bin) -> + opcua_codec_binary_builtin:decode(uint16, Bin). + +decode_dataset_status(#{status := 0}, Bin) -> {undefined, Bin}; +decode_dataset_status(#{status := 1}, Bin) -> + opcua_codec_binary_builtin:decode(uint16, Bin). + +decode_dataset_cfg_major_ver(#{config_ver_major_ver := 0}, Bin) -> + {undefined, Bin}; +decode_dataset_cfg_major_ver(#{config_ver_major_ver := 1}, Bin) -> + opcua_codec_binary_builtin:decode(uint32, Bin). + +decode_dataset_cfg_minor_ver(#{config_ver_minor_ver := 0}, Bin) -> + {undefined, Bin}; +decode_dataset_cfg_minor_ver(#{config_ver_minor_ver := 1}, Bin) -> + opcua_codec_binary_builtin:decode(uint32, Bin). 
+ +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +encode_field_encoding(variant) -> ?DATASET_FLAGS1_VARIANT; +encode_field_encoding(raw) -> ?DATASET_FLAGS1_RAWDATA; +encode_field_encoding(data_value) -> ?DATASET_FLAGS1_DATAVALUE. + +encode_publisher_id_type(byte) -> ?UA_PUBLISHERIDTYPE_BYTE; +encode_publisher_id_type(uint16) -> ?UA_PUBLISHERIDTYPE_UINT16; +encode_publisher_id_type(uint32) -> ?UA_PUBLISHERIDTYPE_UINT32; +encode_publisher_id_type(uint64) -> ?UA_PUBLISHERIDTYPE_UINT64; +encode_publisher_id_type(string) -> ?UA_PUBLISHERIDTYPE_STRING. + +% encode_network_msg_type(dataset_message) -> ?EXT_FLAGS2_DATASET_MSG_TYPE; +% encode_network_msg_type(discovery_request) -> ?EXT_FLAGS2_DISCOVERY_REQUEST_MSG_TYPE; +% encode_network_msg_type(discovery_responce) -> ?EXT_FLAGS2_DISCOVERY_RESPONSE_MSG_TYPE. + +encode_payload_header(DSW_IDS) -> + IDS = [<> || ID <- DSW_IDS ], + Count = <<(length(DSW_IDS)):8/unsigned-little>>, + iolist_to_binary([Count | IDS]). + +encode_group_header(#writer_group_config{ + writer_group_id = WriterGroupId}, Mask) -> + Flags = 0, + Elements = [], + {Flags1, Elements1} = case Mask band ?UADP_NET_MSG_CONTENT_MASK_SEQ_NUM of + 0 -> {Flags, Elements}; + _ -> {Flags bor ?SEQUENCE_NUMBER_ENABLED, error(not_implemented)} + end, + {Flags2, Elements2} = case Mask band ?UADP_NET_MSG_CONTENT_MASK_NET_MSG_NUM of + 0 -> {Flags1, Elements1}; + _ -> {Flags1 bor ?NETWORK_MESSAGE_ENABLED, error(not_implemented)} + end, + {Flags3, Elements3} = case Mask band ?UADP_NET_MSG_CONTENT_MASK_GROUP_VERSION of + 0 -> {Flags2, Elements2}; + _ -> {Flags2 bor ?GROUP_VERSION_ENABLED, error(not_implemented)} + end, + {Flags4, Elements4} = case Mask band ?UADP_NET_MSG_CONTENT_MASK_WRITER_GROUP_ID of + 0 -> {Flags3, Elements3}; + _ -> {Flags3 bor ?WRITER_GROUP_ENABLED, + [opcua_codec_binary_builtin:encode(uint16, WriterGroupId) | Elements3]} + end, + iolist_to_binary([Flags4, Elements4]). 
diff --git a/src/opcua_pubsub_udp.erl b/src/opcua_pubsub_udp.erl
new file mode 100644
index 0000000..52410a5
--- /dev/null
+++ b/src/opcua_pubsub_udp.erl
@@ -0,0 +1,92 @@
+-module(opcua_pubsub_udp).
+
+
+-export([init/1, send/2, handle_info/2]).
+
+-include_lib("kernel/include/logger.hrl").
+
+-record(state, {
+    socket,
+    out_socket,
+    dest_ip,   % multicast group this connection was configured with
+    dest_port  % destination port this connection was configured with
+}).
+
+
+init(#{
+    uri := #{
+        host := BinaryIP,
+        port := Port
+    }
+  }) ->
+    MulticastGroup = parse_ip(BinaryIP),
+    InterfaceIP = get_ip_of_valid_interface(),
+    ?LOG_DEBUG("PubSub UDP using interface ~p",[InterfaceIP]),
+    Opts = [
+        binary,
+        {active, true},
+        {reuseaddr, true},
+        {ip, MulticastGroup},
+        {multicast_ttl, 10},
+        {multicast_loop, false}
+    ],
+    case gen_udp:open(Port, Opts) of
+        {ok, Socket} ->
+            inet:setopts(Socket, [{add_membership,{MulticastGroup, InterfaceIP}}]),
+            {ok, S} = gen_udp:open(0),
+            {ok, #state{
+                socket = Socket,
+                out_socket = S,
+                dest_ip = MulticastGroup,
+                dest_port = Port
+            }};
+        {error, Reason} -> {error, Reason}
+    end.
+
+% BUGFIX: publish to the multicast group/port this connection was configured
+% with in init/1 instead of a hard-coded 224.0.0.22:4840 destination.
+send(Data, #state{out_socket = Socket, dest_ip = IP, dest_port = Port} = S) ->
+    ok = gen_udp:send(Socket, IP, Port, Data),
+    S.
+
+handle_info({udp, Socket, _IP, _Port, Packet}, #state{socket = Socket} = _S) ->
+    Packet.
+
+% helpers %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+parse_ip(BinaryIP) ->
+    [A,B,C,D] = [ binary_to_integer(N) || N <- string:split(BinaryIP, ".", all)],
+    {A,B,C,D}.
+
+get_ip_of_valid_interface() ->
+    case get_valid_interfaces() of
+        [ {_Name, Opts} | _] -> get_ipv4_from_opts(Opts);
+        _ -> undefined
+    end.
+
+get_valid_interfaces() ->
+    {ok, Interfaces} = inet:getifaddrs(),
+    Selected = [ I
+        || {_Name, [{flags, Flags} | Opts]} = I <- Interfaces,
+           flags_are_ok(Flags),
+           has_ipv4(Opts)
+    ],
+    HasLoopback = fun({_Name, [{flags, Flags} | _]}) ->
+            lists:member(loopback, Flags)
+        end,
+    {LoopBack, Others} = lists:partition(HasLoopback, Selected),
+    Others ++ LoopBack.
+
+has_ipv4(Opts) ->
+    get_ipv4_from_opts(Opts) =/= undefined.
+ +flags_are_ok(Flags) -> + lists:member(up, Flags) and + lists:member(running, Flags). + +get_ipv4_from_opts([]) -> + undefined; +get_ipv4_from_opts([{addr, {_1, _2, _3, _4}} | _]) -> + {_1, _2, _3, _4}; +get_ipv4_from_opts([_ | TL]) -> + get_ipv4_from_opts(TL). diff --git a/src/opcua_pubsub_writer_group.erl b/src/opcua_pubsub_writer_group.erl new file mode 100644 index 0000000..24a4e2b --- /dev/null +++ b/src/opcua_pubsub_writer_group.erl @@ -0,0 +1,58 @@ +-module(opcua_pubsub_writer_group). + +-export([new/3]). +-export([add_dataset_writer/3]). +-export([init/2]). +-export([write_network_message/1]). + +-include("opcua_pubsub.hrl"). + +-record(state, { + state = operational :: pubsub_state_machine(), + publisher_id, + publisher_id_type, + config :: #writer_group_config{}, + dataset_writers = #{}, + timer +}). + +new(PublisherId, PublisherIdType, #writer_group_config{} = Config) -> + {ok, #state{ + publisher_id = PublisherId, + publisher_id_type = PublisherIdType, + config = Config, + dataset_writers = #{} + }}. + +add_dataset_writer(PDS_id, DSW_cfg, #state{dataset_writers = DSWs} = S) -> + DSW_id = uuid:get_v4(), + {ok, DSW} = opcua_pubsub_dataset_writer:new(PDS_id, DSW_cfg), + NewDSWs = maps:put(DSW_id, DSW, DSWs), + {ok, DSW_id, S#state{dataset_writers = NewDSWs}}. + +init(ID, #state{config = #writer_group_config{ + publishing_interval = PublishingInterval}} = S) -> + {ok, Tref} = timer:send_interval(PublishingInterval, {publish, ID}), + S#state{state = operational, timer = Tref}. 
+ + +write_network_message(#state{publisher_id = PublisherId, + publisher_id_type = PublisherIdType, + config = Config, + dataset_writers = DatasetWriters} = S) -> + Results = [ + begin + {DSM, DSW_ID, NewState} = opcua_pubsub_dataset_writer:write_dataset_message(DSW), + {DSM, DSW_ID, {ID, NewState}} + end || {ID, DSW} <- maps:to_list(DatasetWriters)], + {DataSetMessages, DSW_IDS, KV_pairs_DSWs} = lists:unzip3(Results), + NewState = S#state{dataset_writers = maps:from_list(KV_pairs_DSWs)}, + % io:format("DSMs: ~p~n", [DataSetMessages]), + Payload = opcua_pubsub_uadp:encode_payload(DataSetMessages), + % Headers presence in the Network message should be regulated by the content mask + Headers = opcua_pubsub_uadp:encode_network_message_headers(PublisherId, + PublisherIdType, + DSW_IDS, + Config), + NetworkMessage = iolist_to_binary([Headers, Payload]), + {NetworkMessage, NewState}. diff --git a/src/opcua_sup.erl b/src/opcua_sup.erl index 2a798ea..7af9840 100644 --- a/src/opcua_sup.erl +++ b/src/opcua_sup.erl @@ -26,7 +26,8 @@ init([]) -> Childs = [ worker(opcua_keychain_default, [KeychainOpts]), worker(opcua_nodeset, [NodeSetDir]), - supervisor(opcua_client_sup, []) + supervisor(opcua_client_sup, []), + supervisor(opcua_pubsub_sup, []) ], Childs2 = case application:get_env(start_server) of {ok, false} -> Childs;