Streaming Pipeline

package micro.apps.pipeline

/* ktlint-disable no-wildcard-imports */
import com.github.avrokotlin.avro4k.Avro
import com.google.common.flogger.FluentLogger
import com.google.common.flogger.MetadataKey.single
import micro.apps.core.LogDefinition.Companion.config
import micro.apps.kbeam.*
import micro.apps.kbeam.coders.AvroToPubsubMessage
import micro.apps.model.Person
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.beam.runners.dataflow.util.TimeUtil
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions
import org.apache.beam.sdk.options.*
import org.apache.beam.sdk.transforms.Create
import org.apache.beam.sdk.transforms.MapElements
import org.apache.beam.sdk.transforms.windowing.AfterWatermark
import org.apache.beam.sdk.transforms.windowing.FixedWindows
import org.apache.beam.sdk.transforms.windowing.Window
import org.joda.time.Duration

/* ktlint-enable no-wildcard-imports */

interface MyStreamingOptions : ApplicationNameOptions, PipelineOptions, StreamingOptions, PubsubOptions, GcpOptions {
    @get:Description("""The Cloud Pub/Sub topic to read from.
        The name should be in the format of projects/<project-id>/topics/<topic-name>.""")
    @get:Default.String("projects/my-project-id/topics/streaming-input")
    @get:Validation.Required
    var inputTopic: ValueProvider<String>

    @get:Description("""The Cloud Pub/Sub topic to publish to.
        The name should be in the format of projects/<project-id>/topics/<topic-name>.""")
    @get:Default.String("projects/my-project-id/topics/streaming-output")
    @get:Validation.Required
    var outputTopic: ValueProvider<String>

    @get:Description(
        """The window duration in which data will be written. Defaults to 5m.
                Allowed formats are:
                Ns (for seconds, example: 5s),
                Nm (for minutes, example: 12m),
                Nh (for hours, example: 2h).""")
    @get:Default.String("300s")
    var windowDuration: String
}

/**
 * Showcases side-input and split.
 */
object StreamingPipeline {
    @JvmStatic
    private val logger: FluentLogger = FluentLogger.forEnclosingClass().config()

    @JvmStatic
    fun main(args: Array<String>) {

        logger.atConfig().log("My Args: %s", args.contentToString())

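        // build the Pipeline and strongly typed MyStreamingOptions from the command-line args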
        val (pipe, options) = PipeBuilder.from<MyStreamingOptions>(args)
        options.isStreaming = true
        // set `pubsubRootUrl` via CLI args for development
        // options.pubsubRootUrl = "http://localhost:8085"

        logger.atInfo()
            .with(single("Runner", String::class.java), options.runner.name)
            .with(single("JobName", String::class.java), options.jobName)
            .log("Started job with:")

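        // load the Avro schema used to decode incoming Pub/Sub records
        // and to look up the custom "encrypted" property on each field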
        val schema = Schema.Parser().parse(javaClass.getResourceAsStream("/data/person.avsc"))

        // create dummy `keys` to use as `side input` for decryption
        val keys = pipe.apply(Create.of(listOf("aaa", "bbb"))).toList()

        logger.atInfo()
            .with(single("schema", Schema::class.java), schema)
            .with(single("windowDuration", String::class.java), options.windowDuration)
            .with(single("pubsubRootUrl", String::class.java), options.pubsubRootUrl)
            .log()

        val input = pipe
            .apply("Read new Data from PubSub", PubsubIO.readAvroGenericRecords(schema).fromTopic(options.inputTopic))
            // Batch events into 5 minute windows
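            // emit a single pane when the watermark passes the end of the window,
            // discard fired panes, and allow up to 300s of late data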
            .apply("Batch Events, windowDuration: ${options.windowDuration}", Window.into<GenericRecord>(
                FixedWindows.of(TimeUtil.fromCloudDuration(options.windowDuration)))
                .triggering(AfterWatermark.pastEndOfWindow())
                .discardingFiredPanes()
                .withAllowedLateness(Duration.standardSeconds(300)))

            // iterate over the GenericRecord fields
            .parDo<GenericRecord, GenericRecord>(
                "inspect record fields",
                sideInputs = listOf(keys)) {
                println(element)
                println(timestamp)
                println(element.schema)
                println("key used to decrypt encrypted field: ${sideInputs[keys][0]}")
                for (field in schema.fields /*element.schema.fields*/) {
                    val fieldKey: String = field.name()
                    println("$fieldKey : ${element.get(fieldKey)}, is encrypted? ${field.getProp("encrypted")}")
                }
                // TODO: make a copy, modify and emit
                // output(element.copy(email = "decrypted email"))
                element
            }

            // GenericRecord to Entity
            .map {
                Avro.default.fromRecord(Person.serializer(), it)
            }

            // decrypt fields
            .parDo<Person, Person>(
                "decrypt and enrich record",
                sideInputs = listOf(keys)) {
                // TODO: make a copy, modify and emit
                output(element.copy(email = "decrypted email"))
            }

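        // split the stream into two PCollections based on the age predicate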
        val (old, young) = input.split {
            println(it)
            it.age >= 20
        }

        old.parDo<Person, Void> {
            println("Old: $element")
        }

        young.parDo<Person, Void> {
            println("Young: $element")
        }

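        // convert entities back to Avro and publish them to the output topic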
        input
            // Entity to GenericRecord
            .map {
                Avro.default.toRecord(Person.serializer(), it)
            }
            .apply(MapElements.via(AvroToPubsubMessage()))
            // write back to PubSub
            .apply("Write PubSub Events", PubsubIO.writeMessages().to(options.outputTopic))

        pipe.run()
    }
}
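
To try the pipeline locally against the Pub/Sub emulator, the options above can be passed as command-line arguments. The snippet below is a minimal sketch and not part of the pipeline source; the runner, project id, and topic names are assumptions (the defaults declared in MyStreamingOptions) and should be replaced for your environment.

// Minimal local-run sketch (assumption): forwards CLI args to StreamingPipeline.main,
// pointing the Pub/Sub client at a local emulator started on port 8085.
fun runLocally() {
    StreamingPipeline.main(
        arrayOf(
            "--runner=DirectRunner",
            "--pubsubRootUrl=http://localhost:8085",
            "--project=my-project-id",
            "--inputTopic=projects/my-project-id/topics/streaming-input",
            "--outputTopic=projects/my-project-id/topics/streaming-output",
            "--windowDuration=300s"
        )
    )
}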
