Process Employment Data Through Complete Pipeline
Source: R/process_employment_pipeline.R
An optimized helper function that chains together vecshift(), merge_original_columns(), and merge_consecutive_employment() operations for efficient memory usage and performance. This function provides a complete pipeline from raw employment contracts to processed temporal segments with merged original data and period consolidation using over_id.
Usage
process_employment_pipeline(
original_data,
apply_vecshift = TRUE,
merge_columns = NULL,
handle_overlaps = !is.null(merge_columns),
collapse_consecutive = TRUE,
consolidate_periods = TRUE,
consolidation_type = "both",
classify_status = TRUE,
status_rules = NULL,
validate_over_id = TRUE,
validate_functions = TRUE,
show_progress = TRUE
)
Arguments
- original_data
A data.table containing employment contract records with required columns:
id: Contract identifier
cf: Person identifier
inizio: Contract start date
fine: Contract end date
prior: Employment type indicator
- apply_vecshift
Logical. If TRUE (default), applies vecshift transformation. Set to FALSE if input data is already processed vecshift output.
- merge_columns
Character vector of column names to merge from original_data. Set to NULL (default) to skip column merging step.
- handle_overlaps
Logical. If TRUE (default when merge_columns is specified), processes overlapping employment values. Only applies when merge_columns is not NULL.
- collapse_consecutive
Logical. If TRUE (default), collapses consecutive employment periods using the over_id-based implementation.
- consolidate_periods
Logical. If TRUE (default), enables period consolidation when collapse_consecutive is TRUE. Passed to merge_consecutive_employment().
- consolidation_type
Character. Consolidation strategy when consolidate_periods is TRUE: "both" (default), "overlapping", "consecutive", or "none".
- classify_status
Logical. If TRUE (default), applies employment status classification as a separate step after vecshift transformation.
- status_rules
Optional custom status rules for status classification step. If NULL, uses defaults.
- validate_over_id
Logical. If TRUE (default), validates over_id consistency and duration invariant after vecshift transformation.
- validate_functions
Logical. If TRUE (default), checks for function availability before execution and provides informative error messages.
- show_progress
Logical. If TRUE (default), displays a progress bar showing the current processing step, percentage completion, and estimated time remaining. Uses the 'progress' package when it is installed; otherwise falls back to utils::txtProgressBar or simple messages.
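The input requirements described in the arguments above can be checked before calling the pipeline. The following is a minimal sketch, not the package's own validation: check_pipeline_input() is a hypothetical helper written for illustration, and it only assumes the required column names and the consolidation_type choices listed above. It uses base R only; names() and setdiff() behave the same way on a data.table.

```r
# Hypothetical pre-flight check (not part of the package).
check_pipeline_input <- function(original_data,
                                 consolidation_type = c("both", "overlapping",
                                                        "consecutive", "none")) {
  # Required columns as documented for original_data
  required <- c("id", "cf", "inizio", "fine", "prior")
  missing_cols <- setdiff(required, names(original_data))
  if (length(missing_cols) > 0) {
    stop("Missing required columns: ",
         paste(missing_cols, collapse = ", "), call. = FALSE)
  }
  # match.arg() returns the first choice when the argument is omitted
  # and raises an error for any value outside the documented choices
  consolidation_type <- match.arg(consolidation_type)
  invisible(consolidation_type)
}

contracts <- data.frame(
  id = 1L,
  cf = "PERSON001",
  inizio = as.Date("2023-01-01"),
  fine = as.Date("2023-03-31"),
  prior = 1
)
chosen_type <- check_pipeline_input(contracts, "overlapping")
```

Failing early on a missing column or a misspelled strategy keeps the error message close to its cause, rather than surfacing it from inside a later pipeline step.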
Value
A data.table containing the processed employment segments. The exact structure depends on which pipeline steps were applied:
Base vecshift output: cf, inizio, fine, arco, prior, id, durata, stato, over_id
With merged columns: Additional columns from original_data
With period consolidation: collapsed, n_periods, and aggregated values
The result also includes attributes:
pipeline_steps: Logical vector of applied steps
validation_results: over_id and duration validation results
consolidation_stats: Statistics on period consolidation (if applied)
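The attributes listed above can be read with attr(). The sketch below uses a plain data.frame as a stand-in for the returned data.table, and the step names stored in pipeline_steps are assumptions made for illustration; the real names may differ.

```r
# Mock result standing in for the pipeline output (illustrative only)
result <- data.frame(cf = c("PERSON001", "PERSON002"))
attr(result, "pipeline_steps") <- c(vecshift = TRUE,        # assumed step name
                                    merge_columns = FALSE)  # assumed step name

# Names of the steps that were actually applied
steps <- attr(result, "pipeline_steps")
applied <- names(steps)[steps]

# attr() returns NULL for an attribute that was never set, so optional
# attributes such as consolidation_stats should be tested before use
stats <- attr(result, "consolidation_stats")
has_stats <- !is.null(stats)
```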
Details
The function performs the following operations in sequence:
vecshift(): Core temporal transformation to create segments with over_id
classify_employment_status(): Applies employment status classification as a separate step (optional)
Validation: Validates over_id consistency and duration invariant
merge_original_columns(): Merges additional columns from original data (optional)
merge_overlapping_values(): Handles overlapping employment values (optional)
merge_consecutive_employment(): Consolidates periods using over_id (optional)
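The conditional sequencing above can be pictured as a loop over named steps, each guarded by a flag. This is a sketch of the general pattern only, not the function's actual implementation: run_steps() and the toy steps are hypothetical, and the real pipeline passes additional arguments to each step.

```r
# Hypothetical runner: applies each enabled step in order and records
# which steps ran in a "pipeline_steps" attribute (a named logical vector).
run_steps <- function(x, steps, enabled) {
  applied <- logical(0)
  for (nm in names(steps)) {
    run <- isTRUE(enabled[[nm]])
    if (run) x <- steps[[nm]](x)
    applied[nm] <- run
  }
  attr(x, "pipeline_steps") <- applied
  x
}

# Toy steps standing in for the real transformations
toy_steps <- list(
  add_one = function(x) x + 1,
  double  = function(x) x * 2
)
out <- run_steps(1, toy_steps, enabled = c(add_one = TRUE, double = FALSE))
```

Disabled steps are skipped without touching the data, which mirrors how flags such as collapse_consecutive = FALSE leave the earlier output unchanged.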
The function is optimized for memory efficiency by:
Using data.table's reference semantics to minimize copying
Avoiding intermediate object creation where possible
Allowing selective execution of pipeline steps
Gracefully handling missing functions with fallback behavior
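The first point refers to standard data.table behavior: assignment with := modifies a table in place, and a plain `<-` assignment does not copy it. The sketch below illustrates that behavior with made-up data; it says nothing about how the pipeline itself is written.

```r
library(data.table)

segments <- data.table(cf = c("A", "A", "B"), durata = c(90, 91, 303))

# A plain assignment creates a second name for the same table, not a copy
alias <- segments

# := adds the column by reference, so the change is visible through both names
alias[, long_spell := durata > 100]
seen_in_original <- "long_spell" %in% names(segments)

# copy() is required when an independent table is wanted
independent <- copy(segments)
independent[, long_spell := NULL]
still_in_original <- "long_spell" %in% names(segments)
```

Because of this, code that needs to keep the input untouched should call copy() explicitly before modifying it.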
Note
This function automatically detects which processing functions are available and gracefully handles missing dependencies. If merge_original_columns() or merge_consecutive_employment() are not available, the corresponding steps are skipped with a warning message.
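One way to implement this kind of availability check in base R is with exists(). The helper below is a hypothetical sketch, not the package's code: run_if_available() looks up a function by name, skips the step with a warning when it is missing, and otherwise calls it.

```r
# Hypothetical helper: run a step only if a function with that name exists
run_if_available <- function(fn_name, x, ...) {
  if (!exists(fn_name, mode = "function")) {
    warning(sprintf("%s() not found; skipping this step.", fn_name),
            call. = FALSE)
    return(x)  # data passes through unchanged
  }
  do.call(fn_name, list(x, ...))
}

# An existing function is applied as usual
shouted <- run_if_available("toupper", "segment")

# A missing function triggers the warning and returns the input as-is
untouched <- suppressWarnings(run_if_available("no_such_pipeline_step", 42))
```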
The over_id functionality requires vecshift() output with an over_id column. If over_id is not available, consolidation falls back to traditional consecutive merging.
Examples
if (FALSE) { # \dontrun{
library(data.table)
# Create sample employment data with metadata
employment_data <- data.table(
id = 1:4,
cf = c("PERSON001", "PERSON001", "PERSON001", "PERSON002"),
inizio = as.Date(c("2023-01-01", "2023-04-01", "2023-07-01", "2023-02-01")),
fine = as.Date(c("2023-03-31", "2023-06-30", "2023-12-31", "2023-11-30")),
prior = c(1, 1, 0, 1),
company = c("CompanyA", "CompanyB", "CompanyC", "CompanyD"),
salary = c(50000, 55000, 25000, 60000),
department = c("IT", "IT", "HR", "Finance")
)
# Complete pipeline with all steps and progress bar
result_full <- process_employment_pipeline(
original_data = employment_data,
merge_columns = c("company", "salary", "department"),
handle_overlaps = TRUE,
collapse_consecutive = TRUE,
consolidate_periods = TRUE,
consolidation_type = "both",
show_progress = TRUE
)
print(result_full)
print(attr(result_full, "validation_results"))
# Only vecshift transformation without progress bar
result_basic <- process_employment_pipeline(
original_data = employment_data,
merge_columns = NULL,
collapse_consecutive = FALSE,
show_progress = FALSE
)
# Pipeline with column merging but no consecutive collapsing
result_merged <- process_employment_pipeline(
original_data = employment_data,
merge_columns = c("company", "salary"),
collapse_consecutive = FALSE,
validate_over_id = TRUE
)
# Process already-transformed vecshift output
segments <- vecshift(employment_data)
result_post <- process_employment_pipeline(
original_data = employment_data,
apply_vecshift = FALSE,
merge_columns = c("company"),
collapse_consecutive = TRUE,
consolidation_type = "overlapping"
)
} # }