Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion include/opcua.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,37 @@
-define(OBJ_SERVER_TYPE, 2004).
-define(OBJ_SERVER_STATUS_TYPE, 2138).


% Attribute Id
% ------------
% Every node in an OPC UA information model contains attributes depending on
% the node type. Possible attributes are as follows:
% (numeric values mirror the standard OPC UA AttributeId enumeration —
% TODO confirm against OPC UA Part 6 / Part 4 Annex A)
-define(UA_ATTRIBUTEID_NODEID, 1).
-define(UA_ATTRIBUTEID_NODECLASS, 2).
-define(UA_ATTRIBUTEID_BROWSENAME, 3).
-define(UA_ATTRIBUTEID_DISPLAYNAME, 4).
-define(UA_ATTRIBUTEID_DESCRIPTION, 5).
-define(UA_ATTRIBUTEID_WRITEMASK, 6).
-define(UA_ATTRIBUTEID_USERWRITEMASK, 7).
-define(UA_ATTRIBUTEID_ISABSTRACT, 8).
-define(UA_ATTRIBUTEID_SYMMETRIC, 9).
-define(UA_ATTRIBUTEID_INVERSENAME, 10).
-define(UA_ATTRIBUTEID_CONTAINSNOLOOPS, 11).
-define(UA_ATTRIBUTEID_EVENTNOTIFIER, 12).
-define(UA_ATTRIBUTEID_VALUE, 13).
-define(UA_ATTRIBUTEID_DATATYPE, 14).
-define(UA_ATTRIBUTEID_VALUERANK, 15).
-define(UA_ATTRIBUTEID_ARRAYDIMENSIONS, 16).
-define(UA_ATTRIBUTEID_ACCESSLEVEL, 17).
-define(UA_ATTRIBUTEID_USERACCESSLEVEL, 18).
-define(UA_ATTRIBUTEID_MINIMUMSAMPLINGINTERVAL,19).
-define(UA_ATTRIBUTEID_HISTORIZING, 20).
-define(UA_ATTRIBUTEID_EXECUTABLE, 21).
-define(UA_ATTRIBUTEID_USEREXECUTABLE, 22).
-define(UA_ATTRIBUTEID_DATATYPEDEFINITION, 23).
-define(UA_ATTRIBUTEID_ROLEPERMISSIONS, 24).
-define(UA_ATTRIBUTEID_USERROLEPERMISSIONS, 25).
-define(UA_ATTRIBUTEID_ACCESSRESTRICTIONS, 26).
-define(UA_ATTRIBUTEID_ACCESSLEVELEX, 27).
%%% TYPES %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%-- OPCUA Types Records --------------------------------------------------------
Expand Down
13 changes: 13 additions & 0 deletions src/opcua_codec.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
-export([unpack_type/3]).
-export([unpack_enum/2]).
-export([unpack_option_set/2]).
-export([unpack_variant/1]).
-export([builtin_type_name/1]).
-export([builtin_type_id/1]).

Expand Down Expand Up @@ -119,6 +120,18 @@ unpack_option_set(#opcua_option_set{fields = Fields}, Value) ->
end, [], Fields),
lists:reverse(FieldNames).

%% @doc Extracts the payload from a decoded variant.
%% Only variants wrapping an extension object are handled for now: the
%% extension object's type id and raw body are returned as a pair.
%% Any other variant type raises `bad_not_implemented'.
-spec unpack_variant(opcua:variant()) -> term().
unpack_variant(#opcua_variant{type = extension_object,
                              value = #opcua_extension_object{type_id = TypeId,
                                                              body = Body}}) ->
    % Not sure what to do here; for now just expose the type and raw value.
    {TypeId, Body};
unpack_variant(#opcua_variant{}) ->
    error(bad_not_implemented).

builtin_type_name( 1) -> boolean;
builtin_type_name( 2) -> sbyte;
builtin_type_name( 3) -> byte;
Expand Down
140 changes: 140 additions & 0 deletions src/opcua_pubsub.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
-module(opcua_pubsub).

-export([start_link/0]).


-behaviour(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).


-export([add_published_dataset/1]).
-export([add_published_dataset_field/3]).

-export([new_connection/3]).

-export([add_reader_group/2]).
-export([add_writer_group/2]).

-export([add_dataset_reader/3]).
-export([create_target_variables/4]).

-export([add_dataset_writer/4]).

-export([start_connection/1]).
-export([stop_connection/1]).
-export([register_connection/1]).
-export([get_published_dataset/1]).

-include("opcua_pubsub.hrl").

% Internal state of the pub/sub manager process.
-record(state, {
    % Overall manager status; set to 'operational' in init/1.
    state,
    connections = #{},% Maps Ids to Pids
    % Maps PDS ids (uuids generated on add) to their PDS configuration.
    published_datasets = #{}
}).

%%% API FUNCTIONS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% @doc Starts the pub/sub manager as a locally registered gen_server.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).

%% @doc Asks the manager to spawn a connection process for the given
%% connection configuration. Returns {ok, ConnectionID}.
start_connection(Connection) ->
    gen_server:call(?MODULE, {start_connection, Connection}).

%% @doc Terminates the connection process registered under the given id.
stop_connection(ConnectionID) ->
    gen_server:call(?MODULE, {stop_connection, ConnectionID}).

%% @doc Registers a Published Data Set configuration.
%% PDSs are independent of any connection. Returns {ok, PDS_ID}.
add_published_dataset(Config) ->
    gen_server:call(?MODULE, {add_published_dataset, Config}).

%% @doc Adds per-field definitions (metadata plus data source) to an
%% existing PublishedDataSet identified by PDS_ID.
add_published_dataset_field(PDS_ID, FieldsMetaData, FieldsSource) ->
    Request = {add_published_dataset_field, PDS_ID, FieldsMetaData, FieldsSource},
    gen_server:call(?MODULE, Request).

%% @doc Fetches a previously added published dataset by id.
%% NOTE(review): an unknown id crashes the manager via maps:get — confirm
%% whether a tagged error would be preferable for callers.
get_published_dataset(PDS_ID) ->
    gen_server:call(?MODULE, {get_published_dataset, PDS_ID}).

%% @doc Builds a new pub/sub connection term for the given url, configuration
%% and options; delegates entirely to opcua_pubsub_connection.
new_connection(Url, Config, Opts) ->
    opcua_pubsub_connection:create(Url, Config, Opts).

%% @doc Adds a reader group to the connection term.
%% A reader group is just a container used to group DataSetReaders.
add_reader_group(Connection, Config) ->
    opcua_pubsub_connection:add_reader_group(Config, Connection).

%% @doc Adds a writer group to the connection term.
add_writer_group(Connection, Config) ->
    opcua_pubsub_connection:add_writer_group(Config, Connection).

%% @doc Defines a DataSetReader inside the given reader group; the reader
%% configuration includes its DataSetFieldMetaData.
add_dataset_reader(Connection, RG_id, DSR_cfg) ->
    opcua_pubsub_connection:add_dataset_reader(RG_id, DSR_cfg, Connection).

%% @doc Attaches target variables to a DataSetReader, telling it where to
%% write the decoded fields.
create_target_variables(Connection, RG_id, DSR_id, Cfg) ->
    opcua_pubsub_connection:create_target_variables(RG_id, DSR_id, Cfg, Connection).

%% @doc Defines a DataSetWriter in the given writer group, linking it to the
%% PublishedDataSet identified by PDS_id.
add_dataset_writer(Connection, WG_id, PDS_id, DWR_cfg) ->
    opcua_pubsub_connection:add_dataset_writer(WG_id, PDS_id, DWR_cfg, Connection).

% INTERNAL API %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% @doc Called by a connection process to register its own pid under the
%% given connection id. Asynchronous; handled in handle_cast/2.
register_connection(ID) ->
    gen_server:cast(?MODULE, {register_connection, ID, self()}).

% GEN_SERVER callbacks %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @doc Initializes the manager; it starts directly in the operational state
%% with no connections and no published datasets.
init([]) ->
    InitialState = #state{state = operational},
    {ok, InitialState}.

%% @doc Handles synchronous requests from the public API.
%% Connection pids are tracked separately via the register_connection cast.
handle_call({start_connection, ConnConfig}, _From, State) ->
    % Generate the id here so the caller gets it back immediately; the child
    % reports its pid later through register_connection/1.
    ConnID = uuid:get_v4(),
    {ok, _Pid} = supervisor:start_child(opcua_pubsub_connection_sup,
                                        [ConnID, ConnConfig]),
    {reply, {ok, ConnID}, State};
handle_call({stop_connection, ConnID}, _From,
            State = #state{connections = Conns}) ->
    ConnPid = maps:get(ConnID, Conns),
    ok = supervisor:terminate_child(opcua_pubsub_connection_sup, ConnPid),
    {reply, ok, State#state{connections = maps:remove(ConnID, Conns)}};
handle_call({add_published_dataset, Config}, _From,
            State = #state{published_datasets = PDSs}) ->
    NewPDS = h_add_published_dataset(Config),
    PDS_ID = uuid:get_v4(),
    {reply, {ok, PDS_ID},
     State#state{published_datasets = PDSs#{PDS_ID => NewPDS}}};
handle_call({add_published_dataset_field, PDS_ID, FieldsMetadata, FieldsSources},
            _From, State = #state{published_datasets = PDSs}) ->
    Updated = h_add_published_dataset_field(maps:get(PDS_ID, PDSs),
                                            FieldsMetadata, FieldsSources),
    {reply, ok, State#state{published_datasets = PDSs#{PDS_ID => Updated}}};
handle_call({get_published_dataset, PDS_ID}, _From,
            State = #state{published_datasets = PDSs}) ->
    % Unknown ids crash the server (badkey) — intentional let-it-crash.
    {reply, maps:get(PDS_ID, PDSs), State}.

%% @doc Records the pid of a freshly started connection process under its id.
handle_cast({register_connection, ConnID, ConnPid},
            State = #state{connections = Conns}) ->
    {noreply, State#state{connections = Conns#{ConnID => ConnPid}}}.

%% @doc Drains unexpected messages so they do not accumulate in the mailbox.
handle_info(_Msg, State) ->
    {noreply, State}.


%%% INTERNAL FUNCTIONS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% Builds the internal PDS representation from the user-supplied
%% configuration. Currently the configuration is stored as-is.
%% TODO: add the PDS as an object to the address space.
h_add_published_dataset(Config) -> Config.

%% Appends field metadata and field sources to an existing PublishedDataSet.
%% TODO: do more than just copy the provided configuration — validate it
%% for correctness and expose the fields in the address space.
h_add_published_dataset_field(#published_dataset{
            dataset_metadata = #dataset_metadata{fields = Fields} = MetaData,
            dataset_source = Sources} = PDS,
        FieldsMetaData, FieldsSource) ->
    NewMetaData = MetaData#dataset_metadata{fields = Fields ++ FieldsMetaData},
    PDS#published_dataset{dataset_metadata = NewMetaData,
                          dataset_source = Sources ++ FieldsSource}.
141 changes: 141 additions & 0 deletions src/opcua_pubsub.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
% Wire encodings for the publisher id — presumably the OPC UA PubSub
% PublisherIdType enumeration; TODO confirm against OPC UA Part 14.
-define(UA_PUBLISHERIDTYPE_BYTE,0).
-define(UA_PUBLISHERIDTYPE_UINT16,1).
-define(UA_PUBLISHERIDTYPE_UINT32,2).
-define(UA_PUBLISHERIDTYPE_UINT64,3).
-define(UA_PUBLISHERIDTYPE_STRING,4).

% Lifecycle state shared by pub/sub entities.
-type pubsub_state_machine() :: operational | error | enabled | paused.

% Per-connection configuration: how this endpoint identifies itself as a
% publisher on the wire.
-record(connection_config, {
    publisher_id,
    % Presumably one of the ?UA_PUBLISHERIDTYPE_* values — TODO confirm.
    publisher_id_type
}).

% Placeholder record — no fields defined yet.
-record(dataset_mirror,{}).

% Tells a DataSetReader where to write one decoded dataset field.
-record(target_variable,{
    dataset_field_id = 0 :: non_neg_integer(),
    receiver_index_range,
    target_node_id, % node_id to write to
    attribute_id, % attribute to write
    write_index_range,
    override_value_handling,
    override_value
}).

% Metadata describing a single field of a dataset.
% NOTE(review): valueRank and maxStringLength are camelCase while the rest of
% the file uses snake_case — renaming would touch every user of this record.
-record(dataset_field_metadata,{
    name :: binary(),
    description :: undefined | binary(),
    field_flags, % This flag indicates if the field is promoted to the NetworkMessage header
    builtin_type :: opcua:builtin_type(),
    data_type :: opcua:node_id(),
    valueRank :: integer(),
    array_dimensions,
    maxStringLength,
    dataset_field_id = 0 :: non_neg_integer(),
    properties
}).

% Metadata of a whole dataset: identity plus the ordered field metadata list.
-record(dataset_metadata,{
    name,
    description,
    fields = [] :: list(#dataset_field_metadata{}),
    dataset_class_id,
    % Presumably {MajorVersion, MinorVersion} — TODO confirm.
    configuration_version :: undefined | {non_neg_integer(),non_neg_integer()}
}).

% Configuration of a DataSetReader: which publisher / writer group / writer
% to listen for, plus the metadata used to decode the received fields.
-record(dataset_reader_config,{
    name :: binary(),
    publisher_id,
    publisher_id_type,
    writer_group_id,
    dataset_writer_id,
    dataset_metadata :: #dataset_metadata{}
}).

% One variable sampled from the address space to feed a published dataset.
-record(published_variable,{
    published_variable, % presumably the source variable's node_id — confirm
    attribute_id,
    sampling_interval_hint = -1,
    deadband_type = 0 :: 0 | 1 | 2,
    deadband_value = 0.0 :: float(),
    % NOTE(review): field name looks like a typo for index_range — renaming
    % would break any code already referring to this field.
    index_rande,
    substitute_value,
    metadata_properties = []
}).

% Event-based dataset source (as opposed to sampled variables).
-record(published_events, {
    event_notifier :: opcua:node_id(),
    selected_fields :: list(),
    filter
}).

% A published dataset is fed either by sampled variables or by events.
-type published_dataset_source() :: list(#published_variable{}) |
                                    #published_events{}.

% A PublishedDataSet: named collection of fields plus the source feeding them.
-record(published_dataset,{
    name,
    dataset_folder :: undefined | list(binary()),% path to the destination folder
    dataset_metadata :: #dataset_metadata{},
    extension_fields,
    dataset_source = [] :: published_dataset_source()
}).

% UADP NetworkMessage content mask: bit flags selecting which optional header
% pieces are present in an encoded NetworkMessage.
-define(UADP_NET_MSG_CONTENT_MASK_PUBLISHER_ID, (1 bsl 0)).
-define(UADP_NET_MSG_CONTENT_MASK_GROUP_HEADER, (1 bsl 1)).
-define(UADP_NET_MSG_CONTENT_MASK_WRITER_GROUP_ID, (1 bsl 2)).
-define(UADP_NET_MSG_CONTENT_MASK_GROUP_VERSION, (1 bsl 3)).
-define(UADP_NET_MSG_CONTENT_MASK_NET_MSG_NUM, (1 bsl 4)).
-define(UADP_NET_MSG_CONTENT_MASK_SEQ_NUM, (1 bsl 5)).
-define(UADP_NET_MSG_CONTENT_MASK_PAYLOAD_HEADER, (1 bsl 6)).
-define(UADP_NET_MSG_CONTENT_MASK_TIMESTAMP, (1 bsl 7)).
-define(UADP_NET_MSG_CONTENT_MASK_PICOSECONDS, (1 bsl 8)).
-define(UADP_NET_MSG_CONTENT_MASK_DATASET_CLASSID, (1 bsl 9)).
-define(UADP_NET_MSG_CONTENT_MASK_PROMOTED_FIELDS, (1 bsl 10)).

% Default header content: publisher id, group header, writer group id and
% payload header.
-define(DEFAULT_NET_MSG_CONTENT,
?UADP_NET_MSG_CONTENT_MASK_PUBLISHER_ID
bor ?UADP_NET_MSG_CONTENT_MASK_GROUP_HEADER
bor ?UADP_NET_MSG_CONTENT_MASK_WRITER_GROUP_ID
bor ?UADP_NET_MSG_CONTENT_MASK_PAYLOAD_HEADER).

% UADP-specific message settings for a writer group.
% NOTE(review): camelCase field names are inconsistent with the rest of the
% file; renaming would touch every user of this record.
-record(uadp_writer_group_message_data,{
    groupVersion,
    dataSetOrdering,
    networkMessageContentMask = ?DEFAULT_NET_MSG_CONTENT,
    samplingOffset,
    publishingOffset
}).

% Configuration of a writer group: a container of DataSetWriters sharing one
% publishing interval and transport/message settings.
-record(writer_group_config,{
    enabled = true :: boolean(),
    name,
    writer_group_id,
    publishing_interval,
    keep_alive_time,
    priority,
    locale_ids,
    transport_settings,
    % Defaults to UADP message settings.
    message_settings = #uadp_writer_group_message_data{}
}).

% UADP DataSetMessage field content mask: bit flags selecting which optional
% pieces accompany each dataset message.
% NOTE(review): the first mask is written as plain 1 rather than (1 bsl 0)
% like its siblings — same value, inconsistent style.
-define(UADP_DATA_SET_FIELD_MASK_TIMESTAMP, 1).
-define(UADP_DATA_SET_FIELD_MASK_PICOSECONDS, (1 bsl 1)).
-define(UADP_DATA_SET_FIELD_MASK_STATUS, (1 bsl 2)).
-define(UADP_DATA_SET_FIELD_MASK_MAJORVERSION, (1 bsl 3)).
-define(UADP_DATA_SET_FIELD_MASK_MINORVERSION, (1 bsl 4)).
-define(UADP_DATA_SET_FIELD_MASK_SEQUENCENUMBER, (1 bsl 5)).

% By default only the timestamp accompanies each dataset message.
-define(DEFAULT_DATA_SET_FIELD_CONTENT,
?UADP_DATA_SET_FIELD_MASK_TIMESTAMP).

% Configuration of a DataSetWriter: links a PublishedDataSet to a writer group.
-record(dataset_writer_config,{
    name :: binary(),
    dataset_writer_id :: non_neg_integer(),
    dataset_field_content_mask = ?DEFAULT_DATA_SET_FIELD_CONTENT,
    % Presumably how often a full key frame is sent — TODO confirm semantics.
    keyframe_count = 1 :: non_neg_integer(),
    dataset_name :: undefined | binary(),
    transport_settings,
    message_settings
}).

Loading