|
46 | 46 | #define DEFERRED_DELETION_TICKS 25 // Five seconds
|
47 | 47 | #define VFLOW_ID_CUSTOM 0xffffffffffffffff
|
48 | 48 |
|
| 49 | +// If the number of discretionary records rises above |
| 50 | +// this threshold, stop production of them to avoid |
| 51 | +// excessive memory growth. |
| 52 | +#define DISCRETIONARY_RECORDS_STOP_THRESHOLD 5000 |
| 53 | + |
| 54 | +// If the number of discretionary records falls below |
| 55 | +// this threshold, allow their production to start again. |
| 56 | +#define DISCRETIONARY_RECORDS_START_THRESHOLD 4990 |
| 57 | + |
49 | 58 | //
|
50 | 59 | // If the record_id value is VFLOW_ID_CUSTOM, use the full_id for arbitrary strings, otherwise use ${s.source_id}:${record_id}
|
51 | 60 | //
|
@@ -148,6 +157,10 @@ static const int rate_span = 10; // Ten-second rolling av
|
148 | 157 | static sys_atomic_t site_configured;
|
149 | 158 |
|
150 | 159 | typedef struct {
|
| 160 | + // How many records of types that support discretionary currently exist? |
| 161 | + // Their creation can be interrupted if necessary to avoid excessive memory growth. |
| 162 | + sys_atomic_t discretionary_record_count; |
| 163 | + sys_atomic_t emit_discretionary_records; |
151 | 164 | qdr_core_t *router_core;
|
152 | 165 | sys_mutex_t lock;
|
153 | 166 | sys_mutex_t id_lock;
|
@@ -864,6 +877,15 @@ static void _vflow_create_router_record(void)
|
864 | 877 | */
|
865 | 878 | static void _vflow_free_record_TH(vflow_record_t *record, bool recursive)
|
866 | 879 | {
|
| 880 | + // If this is one of the record types that supports discretionary functionality, |
| 881 | + // count it. If the number currently existing is now below the |
| 882 | + // resumption threshold, indicate that we should resume producing them. |
| 883 | + if (discretionary_records[record->record_type]) { |
| 884 | + if (sys_atomic_dec(&state->discretionary_record_count) <= DISCRETIONARY_RECORDS_START_THRESHOLD) { |
| 885 | + sys_atomic_set(&state->emit_discretionary_records, 1); |
| 886 | + } |
| 887 | + } |
| 888 | + |
867 | 889 | //
|
868 | 890 | // If this record is a child of a parent, remove it from the parent's child list
|
869 | 891 | //
|
@@ -1830,6 +1852,19 @@ vflow_record_t *vflow_start_record_custom_id(vflow_record_type_t record_type, vf
|
1830 | 1852 | //=====================================================================================
|
1831 | 1853 | vflow_record_t *vflow_start_record(vflow_record_type_t record_type, vflow_record_t *parent)
|
1832 | 1854 | {
|
| 1855 | + // If this is one of the record types that supports discretionary functionality, |
| 1856 | + // and if we already have the maximum allowed number of those, just don't produce |
| 1857 | + // another one. |
| 1858 | + // Otherwise, produce and count it, and check if this one put us over the limit. |
| 1859 | + if (discretionary_records[record_type]) { |
| 1860 | + if (0 == sys_atomic_get(&state->emit_discretionary_records)) { |
| 1861 | + return 0; |
| 1862 | + } |
| 1863 | + if (sys_atomic_inc(&state->discretionary_record_count) >= DISCRETIONARY_RECORDS_STOP_THRESHOLD) { |
| 1864 | + sys_atomic_set(&state->emit_discretionary_records, 0); |
| 1865 | + } |
| 1866 | + } |
| 1867 | + |
1833 | 1868 | vflow_record_t *record = new_vflow_record_t();
|
1834 | 1869 | vflow_work_t *work = _vflow_work(_vflow_start_record_TH);
|
1835 | 1870 | ZERO(record);
|
@@ -1862,6 +1897,12 @@ vflow_record_t *vflow_start_co_record_iter(vflow_record_type_t record_type, qd_i
|
1862 | 1897 | // search/replacement algorithm in _vflow_process_co_record_TH will need to be re-written in a more general way.
|
1863 | 1898 | //
|
1864 | 1899 | assert(record_type == VFLOW_RECORD_BIFLOW_TPORT);
|
| 1900 | + // The above record type is 'discretionary', which |
| 1901 | + // means we may have been told to temporarily halt |
| 1902 | + // production of them. |
| 1903 | + if (sys_atomic_get(&state->emit_discretionary_records) == 0) { |
| 1904 | + return 0; |
| 1905 | + } |
1865 | 1906 | vflow_record_t *record = new_vflow_record_t();
|
1866 | 1907 | ZERO(record);
|
1867 | 1908 | record->record_type = record_type;
|
@@ -1902,14 +1943,15 @@ void vflow_end_record(vflow_record_t *record)
|
1902 | 1943 | void vflow_serialize_identity(const vflow_record_t *record, qd_composed_field_t *field)
|
1903 | 1944 | {
|
1904 | 1945 | char buffer[IDENTITY_MAX + 1];
|
1905 |
| - assert(!!record); |
1906 | 1946 | if (!!record) {
|
1907 | 1947 | if (record->identity.record_id == VFLOW_ID_CUSTOM) {
|
1908 | 1948 | qd_compose_insert_string(field, record->identity.s.full_id);
|
1909 | 1949 | } else {
|
1910 | 1950 | snprintf(buffer, IDENTITY_MAX, "%s:%"PRIu64, record->identity.s.source_id, record->identity.record_id);
|
1911 | 1951 | qd_compose_insert_string(field, buffer);
|
1912 | 1952 | }
|
| 1953 | + } else { |
| 1954 | + qd_compose_insert_null(field); |
1913 | 1955 | }
|
1914 | 1956 | }
|
1915 | 1957 |
|
@@ -2440,6 +2482,8 @@ static void _vflow_init(qdr_core_t *core, void **adaptor_context)
|
2440 | 2482 | state = NEW(vflow_state_t);
|
2441 | 2483 | ZERO(state);
|
2442 | 2484 |
|
| 2485 | + sys_atomic_init(&state->discretionary_record_count, 0); |
| 2486 | + sys_atomic_init(&state->emit_discretionary_records, 1); |
2443 | 2487 | state->router_core = core;
|
2444 | 2488 | state->hostname = getenv("HOSTNAME");
|
2445 | 2489 | size_t hostLength = !!state->hostname ? strlen(state->hostname) : 0;
|
|
0 commit comments