On November 11, 2024, the Apache Flink community released a new version of the AWS connectors, an open source contribution to the community. Version 5.0.0 introduces a new source connector to read data from Amazon Kinesis Data Streams.
This post covers the new features of the connector and how they can improve the efficiency and reliability of your Apache Flink application.
Apache Flink provides source and sink connectors to read from and write to Amazon Kinesis Data Streams. In this post, we focus on the new source connector, because version 5.0.0 does not introduce any changes to the sink.
Apache Flink is a framework and distributed stream processing engine for stateful computations at in-memory speed and at any scale. Amazon Managed Service for Apache Flink offers a fully managed, serverless way to run your Flink applications, written in Java, Python, or SQL, using any of the available APIs: SQL, Table, DataStream, and ProcessFunction.
Apache Flink connectors
Flink provides connectors to popular message brokers, databases, and object stores, enabling integration with external systems as sources and destinations of data. Kinesis Data Streams is a popular source and destination for real-time data processing and analytics, and for building event-driven architectures. Flink provides source and sink connectors for Kinesis Data Streams.
The following diagram illustrates a sample architecture.
Before proceeding, it is important to clarify three terms that are often confused with each other in data streaming, and in the Apache Flink documentation in particular:
- Amazon Kinesis Data Streams is the AWS streaming data service, and a consumer is any application that reads data from a stream.
- In the context of Apache Flink, the source is the connector used to consume data from Kinesis Data Streams.
- In this post, we use the term source to refer to the Flink connector.
The new Flink Kinesis source connector
Version 5.0.0 of the AWS connectors introduces a new connector for reading events from Kinesis Data Streams. The new connector, KinesisStreamsSource, replaces the legacy FlinkKinesisConsumer as the recommended way to consume from Kinesis Data Streams.
The new connector adds several new features and implements the new Flink Source interface, which is supported in Flink 2.x. Flink 2.x also removes the SourceFunction interface used by the legacy connector, so the legacy Kinesis consumer will not work with Flink 2.x.
Setting up the new connector is not very different from the legacy Kinesis connector. Let's start with the DataStream API.
DataStream API
To use the new connector in your application, you need to update the dependencies. For the DataStream API, the dependency has been renamed to flink-connector-aws-kinesis-streams.
At the time of writing, the latest connector version is 5.0.0, which supports the most recent stable Flink versions, 1.19 and 1.20. The new connector is also designed to work with Flink 2.x, but no connector release for Flink 2.x is available yet. With Flink 1.20, the new dependency is the following:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-aws-kinesis-streams</artifactId>
<version>5.0.0-1.20</version>
</dependency>
The connector is based on the new Flink Source interface. This interface replaces the legacy SourceFunction interface, which has been deprecated and will be completely removed in Flink 2.x.
In your application, you can use a fluent builder interface to create and configure the source. The minimal setup only requires the stream Amazon Resource Name (ARN) and the deserialization schema:
KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder()
.setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
.setDeserializationSchema(new SimpleStringSchema())
.build();
The new source class is called KinesisStreamsSource, not to be confused with the legacy source, FlinkKinesisConsumer.
You can then add the source to the execution environment using the fromSource() method. This method requires you to explicitly specify a watermark strategy, along with a name for the source.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ...
DataStream<String> kinesisRecordsWithEventTimeWatermarks = env.fromSource(
kdsSource,
WatermarkStrategy.<String>forMonotonousTimestamps()
.withIdleness(Duration.ofSeconds(1)),
"Kinesis source");
Let's look at some of the key changes to the connector's interface in more detail.
Stream ARN
The new source identifies the Kinesis data stream by its ARN, rather than by the stream name used by the legacy source. The ARN of a Kinesis data stream has the format arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}. Using the ARN simplifies consuming from a stream in a different AWS Region or account.
With Amazon Managed Service for Apache Flink, you only need to configure the application's IAM role permissions to allow access to the stream. The ARN lets you point to a stream in a different Region or account without setting up role assumption or passing external credentials.
Explicit watermark
A difference introduced by the new Source interface is that you must explicitly define a watermark strategy when you attach the source to the execution environment. If your application only uses processing-time semantics, you can explicitly specify WatermarkStrategy.noWatermarks().
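For example, a minimal sketch of attaching the source in a purely processing-time application, reusing the kdsSource and env from the previous examples, could look like the following:
DataStream<String> kinesisRecords = env.fromSource(
        kdsSource,
        WatermarkStrategy.<String>noWatermarks(),
        "Kinesis source");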
This is an improvement in terms of code readability. Reading the code of the source, you immediately know which type of watermark it uses, or whether it uses none at all. Previously, many connectors provided a default watermark that you could override. However, the default watermark of each connector was different and often obscure to the user.
With the new connector, you can achieve the same behavior as the legacy FlinkKinesisConsumer default watermark by using WatermarkStrategy.forMonotonousTimestamps(), as shown in the previous example. This strategy generates watermarks based on the approximateArrivalTimestamp returned by Kinesis Data Streams. This timestamp corresponds to the time the record was published to Kinesis Data Streams.
Idleness and watermark alignment
With the watermark strategy, you can also define idleness, which allows the watermark to progress even when some shards of the stream are idle and receive no records. Refer to the Apache Flink documentation for more details about idleness and watermarks.
A new feature introduced by the new Source interface, and supported by the new Kinesis source, is watermark alignment. Watermark alignment works in the opposite way to idleness: it slows down consuming from a shard that is progressing faster than the others. This is particularly useful when replaying data from a stream, to reduce the amount of data buffered in the application state. Refer to the Apache Flink documentation for more details about watermark alignment.
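For example, you can enable watermark alignment directly on the watermark strategy passed to fromSource(). This is a minimal sketch; the alignment group name ("kinesis-group") and the drift and update intervals are arbitrary example values:
DataStream<String> alignedRecords = env.fromSource(
        kdsSource,
        WatermarkStrategy.<String>forMonotonousTimestamps()
                // Slow down shards whose watermark drifts more than 20 seconds ahead of the group
                .withWatermarkAlignment("kinesis-group", Duration.ofSeconds(20), Duration.ofSeconds(1))
                .withIdleness(Duration.ofSeconds(1)),
        "Kinesis source");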
Table API and SQL
For the Table API and SQL, the dependency containing both the Kinesis source and sink for Flink 1.20 is the following (a build for Flink 1.19 is also available):
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kinesis</artifactId>
<version>5.0.0-1.20</version>
</dependency>
This dependency contains both the new source and the legacy one. Refer to the connector documentation if you plan to use both in the same application.
To reference the Kinesis source in SQL or the Table API, you still use the kinesis connector identifier, the same as the legacy source. However, several connector options have changed with the new source.
CREATE TABLE KinesisTable (
`user_id` BIGINT,
`item_id` BIGINT,
`category_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3)
) PARTITIONED BY (`user_id`, `item_id`)
WITH (
'connector' = 'kinesis',
'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my_stream_name',
'aws.region' = 'us-east-1',
'source.init.position' = 'LATEST',
'format' = 'csv'
);
Some key connector options have changed compared to the legacy source:
- stream.arn specifies the ARN of the stream, instead of the stream name used by the legacy source.
- source.init.position defines the starting position. It works similarly to the legacy source, but the option name has changed; it was previously scan.stream.initpos.
Refer to the connector documentation for the full list of connector options.
New features and improvements
In this section, we focus on the most important new features introduced by the new connector. These features are available with the DataStream API, the Table API, and SQL.
Ordering guarantees
One of the most important improvements introduced by the new connector concerns ordering guarantees.
With Kinesis Data Streams, the ordering of records is preserved per partitionId. This is achieved by putting all records with the same partitionId in the same shard. However, when a stream is scaled, splitting or merging shards, records with the same partitionId end up in a new (child) shard. During resharding, Kinesis keeps track of the parent-child shard lineage.
A known limitation of the legacy Kinesis source is that it does not follow the parent-child shard lineage. As a consequence, ordering was not guaranteed when resharding happened. The problem was particularly visible when replaying older messages from a stream that had been reshared, because records could be consumed out of order. This also introduced non-determinism in watermark generation and event-time processing.
The new connector guarantees that ordering is preserved, even during resharding events. This is achieved by following the parent-child shard lineage and consuming all records from a parent shard before proceeding to the child shard.
A better default shard assigner
A Kinesis data stream is composed of multiple shards, and the Flink source operator runs multiple parallel subtasks. The shard assigner is the component that decides how to assign the shards of the stream across the source subtasks. Assigning shards is not trivial, because resharding may happen when the stream is scaled up or down, splitting and merging shards.
The new connector introduces a new default shard assigner, UniformShardAssigner. This assigner evenly distributes the stream's partitionIds across the parallel subtasks, also when resharding happens. This is achieved by looking at the range of partition keys (HashKeyRange) of each shard.
The UniformShardAssigner was already available in previous connector versions, but it was not the default, for backward compatibility reasons, and you had to configure it explicitly. This is no longer the case with the new source. The previous default shard assigner, used by the legacy FlinkKinesisConsumer, evenly distributes the shards (rather than the partitionIds) across subtasks. During resharding, the distribution of actual data may become unbalanced, because a mix of open and closed shards exists in the stream. Refer to the connector documentation for more details.
Reduced JAR size
The size of the connector JAR has been reduced by 99%, from about 60 MB to about 200 KB. This considerably reduces the size of the JAR of your application that uses the connector. A smaller JAR speeds up several operations that require redeploying the application.
AWS SDK for Java 2.x
The new connector is built on the AWS SDK for Java 2.x, which adds several features and better support for non-blocking I/O. This makes the integration future-proof, in view of the planned end of support of the AWS SDK for Java 1.x.
AWS SDK built-in retry strategy
The new connector relies on the retry strategy built into the AWS SDK, unlike the legacy connector, which implemented a custom retry mechanism. Relying on the AWS SDK provides a more accurate classification of retryable and non-retryable exceptions.
Removed dependencies on the Kinesis Client Library and Kinesis Producer Library
The new connector no longer includes the Kinesis Client Library (KCL) or the Kinesis Producer Library (KPL), which enables the substantial JAR size reduction mentioned earlier.
One implication of this change is that the new connector no longer supports de-aggregation out of the box. If you are not using the KPL with aggregation enabled in your producers, this does not affect you at all. If your producers do use KPL aggregation, you can implement a custom DeserializationSchema to de-aggregate the records in the source.
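For illustration, the following is a minimal sketch of what such a schema could look like. The deaggregate() helper is hypothetical: the actual de-aggregation of KPL-aggregated records is not shown and would typically rely on the Kinesis record aggregation/de-aggregation libraries.
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.Collector;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class DeaggregatingStringSchema implements DeserializationSchema<String> {

    @Override
    public void deserialize(byte[] message, Collector<String> out) {
        // Split one KPL-aggregated record into its individual user records
        for (byte[] userRecord : deaggregate(message)) {
            out.collect(new String(userRecord, StandardCharsets.UTF_8));
        }
    }

    @Override
    public String deserialize(byte[] message) {
        // Not used: the Collector-based variant above emits the de-aggregated records
        throw new UnsupportedOperationException("Use deserialize(byte[], Collector) instead");
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }

    // Hypothetical helper: implement KPL de-aggregation here
    private List<byte[]> deaggregate(byte[] aggregatedRecord) {
        throw new UnsupportedOperationException("KPL de-aggregation not implemented in this sketch");
    }
}
You would then pass an instance of this schema to setDeserializationSchema() when building the source.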
Migrating from the legacy connector
Flink sources store their position in the input stream as part of the application state, in checkpoints and savepoints, which Amazon Managed Service for Apache Flink manages as snapshots. When the application is stopped and restarted, or when it is updated to deploy a change, the default behavior is that the state is saved in a snapshot just before stopping and restored when restarting. This behavior allows Flink to provide exactly-once guarantees on the source.
However, due to the substantial changes introduced by the new KinesisStreamsSource, its state is not compatible with the state of the legacy FlinkKinesisConsumer. For this reason, when you upgrade the source of an existing application, the new source cannot directly restore its position in the stream from the saved state.
For this reason, migrating to the new source requires some care. The exact migration path depends on your application. There are two common scenarios:
- Your application uses the DataStream API and explicitly defines a UID for each operator
- Your application uses the Table API or SQL, or it uses the DataStream API without defining a UID for each operator
Let's cover each of these scenarios.
The application uses the DataStream API and defines a UID for each operator
In this scenario, the least impactful approach is to selectively reset the state of the source operator while retaining the state of all other operators. The process is as follows:
- Update the dependencies and modify the code of your application, replacing the legacy FlinkKinesisConsumer with the new KinesisStreamsSource.
- Change the UID of the source operator only, leaving the UIDs of all other operators unchanged, as shown in the sketch after this list. This selectively resets the state of the source while retaining the state of all other operators.
- Configure the source starting position using AT_TIMESTAMP, with a timestamp a few seconds before the moment you will deploy the change. We suggest passing the timestamp as a runtime property for flexibility. The configured starting position is only used when the application cannot restore the source state from a snapshot, which is exactly what we achieve by changing the UID of the source operator.
- Deploy the updated application with the new JAR, restarting from the latest snapshot (the default behavior) and with allowNonRestoredState = true. Without allowNonRestoredState = true, the restart would fail, because Flink cannot restore the state of the legacy source into the new source. Refer to the Apache Flink documentation for more details about allowNonRestoredState.
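For illustration, the following sketch shows the UID change and the runtime property lookup. The property group name ("MigrationConfig"), the key ("source.start.timestamp"), and the UID value are hypothetical, the surrounding main() is assumed to declare throws Exception, and the exact option for applying the AT_TIMESTAMP starting position is left to the connector documentation:
// Read the migration timestamp passed as a runtime property of the application
Properties migrationProps = KinesisAnalyticsRuntime.getApplicationProperties().get("MigrationConfig");
String startTimestamp = migrationProps.getProperty("source.start.timestamp");

// Configure the new source to start AT_TIMESTAMP at startTimestamp
// (refer to the connector documentation for the corresponding configuration option)
KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder()
        .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
        .setDeserializationSchema(new SimpleStringSchema())
        .build();

// Give the new source a different UID than the legacy source had,
// so that its (incompatible) state is not restored from the snapshot
DataStream<String> records = env
        .fromSource(kdsSource, WatermarkStrategy.<String>forMonotonousTimestamps(), "Kinesis source")
        .uid("kinesis-source-v2");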
This approach does not guarantee exactly-once processing of the data from the source, because some records may be reprocessed. Carefully evaluate the consequences of reprocessing in your system, including the effect of any duplicates on downstream systems.
The application uses the Table API or SQL, or it uses the DataStream API without defining a UID for each operator
In this scenario, you cannot selectively reset the state of the source operator.
Why does this happen? When you use the Table API, SQL, or the DataStream API without explicitly defining a UID for each operator, Flink automatically generates identifiers for all operators based on the structure of the job graph of your application. These identifiers are used to identify the state of each operator when a snapshot is taken, and to restore it to the correct operator when the application restarts.
Changes to the application may cause changes to the underlying job graph, which in turn changes the auto-generated identifiers. When you use the DataStream API and provide explicit UIDs, Flink uses them instead of the auto-generated identifiers and can map the state back to the correct operator even when the application changes. This is a fundamental characteristic of Apache Flink, not a limitation of the new connector. Enabling allowNonRestoredState does not help in this case, because Flink cannot map the state saved in the snapshot to the actual operators of the modified application.
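As a reminder of this Flink best practice, explicitly setting a UID on every operator in a DataStream application looks like the following (a minimal sketch; the operator logic and UID values are arbitrary examples):
env.fromSource(kdsSource, WatermarkStrategy.<String>forMonotonousTimestamps(), "Kinesis source")
        .uid("kinesis-source")
        .map(value -> value.toUpperCase())
        .uid("uppercase-map")
        .print()
        .uid("stdout-sink");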
In this migration scenario, the solution is to reset the state of the whole application. In Amazon Managed Service for Apache Flink, you can do this by starting the application with SKIP_RESTORE_FROM_SNAPSHOT when you deploy the change that replaces the source connector.
After the application with the new connector is running, you can switch back to the default behavior when restarting the application (RESTORE_FROM_LATEST_SNAPSHOT). This way, no data is lost on subsequent restarts of the application.
Choosing the right connector package and version
The version of the dependency to use is typically <connector-version>-<flink-version>. At the time of writing, the latest Kinesis connector version is 5.0.0. If you are using the Flink runtime version 1.20.x, you should use the dependency version 5.0.0-1.20.
Refer to the connector documentation for the latest available connector versions.
Connector artifact
In versions up to 4.x, the source and sink connectors were shipped in separate packages. This additional complexity has been removed with version 5.x.
The following dependency, to include in your Java applications or in Python applications packaged using Maven, contains both the new source and the sink:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-aws-kinesis-streams</artifactId>
<version>5.0.0-1.20</version>
</dependency>
Use the latest available version; at the time of writing, this is 5.0.0. Refer to the connector documentation for the available artifact versions. The previous examples assume you are using Flink 1.20.0.
Connector artifacts for Python applications
For Python applications, we recommend managing JAR dependencies with Maven, as demonstrated in the examples. However, Flink applications that use a single JAR can use an artifact containing all transitive dependencies. For the new Kinesis source and sink, this artifact is called flink-sql-connector-aws-kinesis-streams. This package contains the new source only. Refer to the connector documentation to choose the right package, in particular if you need both the new and the legacy source.
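For example, a Maven dependency for this fat JAR with Flink 1.20 would look like the following, mirroring the earlier dependency blocks (check the documentation for the exact artifact and latest version):
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-aws-kinesis-streams</artifactId>
<version>5.0.0-1.20</version>
</dependency>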
Conclusion
The new Flink Kinesis source connector brings many improvements in stability and performance, and makes your application future-proof for Flink 2.x. Explicit watermark definition, idleness, and watermark alignment are important when your application uses event-time semantics. The ability to maintain record ordering, in particular during stream resharding, ensures consistent results, especially when replaying old data from a reshared stream.
If you are migrating an existing application from the legacy Kinesis source connector to the new one, plan the change carefully and consider the steps described in this post. Also, make sure you follow Flink's best practice of assigning a UID to every DataStream operator.
You can find a working example of a Java DataStream API application that uses the new connector.
To learn more about the new Flink Kinesis source connector, refer to the connector documentation.
About the Author
The author is a Senior Streaming Solutions Architect at Amazon Web Services (AWS), helping customers across the Europe, Middle East, and Africa (EMEA) regions. He has been building cloud-based, data-intensive systems for over 25 years, working across industries both in consulting and for product companies. He has used open source technologies extensively and has contributed to several projects, including Apache Flink.