Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query params not replaced, can you provide any SQL example ? #136

Open
giulio1979 opened this issue Nov 16, 2024 · 7 comments
Open

Query params not replaced, can you provide any SQL example ? #136

giulio1979 opened this issue Nov 16, 2024 · 7 comments

Comments

@giulio1979
Copy link

Hi, I am facing a bit of an issue and I am not sure what I am missing:

CREATE OR REPLACE TABLE api_driver_safety_records (
driverId BIGINT,
crashCount INT,
harshAccelCount INT,
harshBrakingCount INT,
harshTurningCount INT,
harshEvents ARRAY<
ROW<
harshEventType STRING,
timestampMs BIGINT,
vehicleId BIGINT
>
>,
safetyScore DOUBLE,
safetyScoreRank STRING,
timeOverSpeedLimitMs BIGINT,
totalDistanceDrivenMeters DOUBLE,
totalHarshEventCount INT,
totalTimeDrivenMs BIGINT,
-- Metadata fields
processing_time AS PROCTIME(),
PRIMARY KEY (driverId) NOT ENFORCED
) WITH (
'connector' = 'rest-lookup',
'url' = 'https://flinkhttptest.requestcatcher.com/v1/fleet/drivers/{driverId}/safety/score',
'format' = 'json',
'asyncPolling' = 'false',
'lookup.max-retries' = '3',
'lookup.cache' = 'PARTIAL',
'lookup.partial-cache.max-rows' = '200',
'gid.connector.http.security.oidc.token.request' = 'Bearer test',
'gid.connector.http.source.lookup.request.thread-pool.size' = '1'
);

insert into kafka_driver_safety_records
SELECT
d.driverId,
s.crashCount,
s.harshAccelCount,
s.harshBrakingCount,
s.harshTurningCount,
s.harshEvents,
s.safetyScore,
s.safetyScoreRank,
s.timeOverSpeedLimitMs,
s.totalDistanceDrivenMeters,
s.totalHarshEventCount,
s.totalTimeDrivenMs,
CURRENT_TIMESTAMP as processing_time
FROM drivers_view AS d
JOIN api_driver_safety_records FOR SYSTEM_TIME AS OF d.proc_time AS s
ON d.driverId = s.driverId;

But as far as I see the driverId gets appended rather then replaced:

Caused by: java.net.URISyntaxException: Illegal character in path at index 60: https://flinkhttptest.requestcatcher.com/v1/fleet/drivers/{driverId}/safety/score?driverId=4356456
at java.net.URI$Parser.fail(URI.java:2913) ~[?:?]
at java.net.URI$Parser.checkChars(URI.java:3084) ~[?:?]
at java.net.URI$Parser.parseHierarchical(URI.java:3166) ~[?:?]
at java.net.URI$Parser.parse(URI.java:3114) ~[?:?]
at java.net.URI.(URI.java:600) ~[?:?]
at com.getindata.connectors.http.internal.utils.uri.URIBuilder.(URIBuilder.java:73) ~[blob_p-2b38d7220f14718bc605e0994b819fe148bb3b80-c1d558420f77f39328360a4ae3905558:?]
at com.getindata.connectors.http.internal.table.lookup.GetRequestFactory.constructGetUri(GetRequestFactory.java:65) ~[blob_p-2b38d7220f14718bc605e0994b819fe148bb3b80-c1d558420f77f39328360a4ae3905558:?]

I have tried changing query factories, it didnt help, so I am a bit stuck.

@davidradl
Copy link
Contributor

davidradl commented Nov 21, 2024

In your URL example I think you want to replace the insert in your path as {driverId} is a segment in your url path. Unfortunately at the moment you need to specify your "gid.connector.http.source.lookup.query-creator" with the id of a custom LookupQueryCreatorFactory and you need to write a custom query creator implementation.

I am looking into contributing our version of this factory and implementation - which does the path templating you require - as part of #99. I should have an update on this next week for you.

@giulio1979
Copy link
Author

giulio1979 commented Nov 21, 2024

I did a custom LookupQueryCreator, seems to work for now:

`
package com.getindata.connectors.http.internal.table.lookup.querycreators;

import com.getindata.connectors.http.LookupArg;
import com.getindata.connectors.http.LookupQueryCreator;
import com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions;
import com.getindata.connectors.http.internal.table.lookup.LookupQueryInfo;
import com.getindata.connectors.http.internal.table.lookup.LookupRow;
import com.getindata.connectors.http.internal.utils.uri.NameValuePair;
import com.getindata.connectors.http.internal.utils.uri.URLEncodedUtils;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class CustomGetQueryCreator implements LookupQueryCreator {

private final LookupRow lookupRow;
private final ReadableConfig readableConfig;

public CustomGetQueryCreator(ReadableConfig readableConfig,
                             LookupRow lookupRow) {
    this.lookupRow = lookupRow;
    this.readableConfig = readableConfig;
}

@Override
public LookupQueryInfo createLookupQuery(RowData lookupDataRow) {

    Collection<LookupArg> lookupArgs = lookupRow.convertToLookupArgs(lookupDataRow);
    Map<String, String> pathBasedUrlParams = new HashMap<>();

    for(LookupArg a : lookupArgs)
        if(readableConfig.get(HttpLookupConnectorOptions.URL).indexOf("{" + a.getArgName() + "}") > 0)
            pathBasedUrlParams.put(a.getArgName(), a.getArgValue());

    String lookupQuery =
        URLEncodedUtils.format(
            lookupArgs.stream()
                    .map(arg -> new NameValuePair(arg.getArgName(), arg.getArgValue()))
                    .collect(Collectors.toList()),
            StandardCharsets.UTF_8);

    return new LookupQueryInfo(lookupQuery,null,pathBasedUrlParams);
}

}
`

@davidradl
Copy link
Contributor

davidradl commented Nov 21, 2024

Looks good, our code is looking to do query params for GET otherwise do body parms. In all cases do path param replacement. And wrap it with a factory with an id we can reference in the config.

I am thinking a nicer way would be for the query creator to:
replace path {} for urls like aaa/bbb/{insert}/
replace query {} for urls like aaa/bbb?qqq={insert}
everything else is mapped to the body.
Then this query creator does everything - no need to ever change it for standard json rest calls.

@grzegorz8
Copy link
Member

Thanks @davidradl for answering the issue.

Looks good, our code is looking to do query params for GET otherwise do body parms. In all cases do path param replacement. And wrap it with a factory with an id we can reference in the config.

I am thinking a nicer way would be for the query creator to: replace path {} for urls like aaa/bbb/{insert}/ replace query {} for urls like aaa/bbb?qqq={insert} everything else is mapped to the body. Then this query creator does everything - no need to ever change it for standard json rest calls.

I agree, in the future we may refactor this to make it more flexible. I mean, not only building URL should be more flexible, but the entire request, including headers (authentication in particular) and body.

@davidradl
Copy link
Contributor

@grzegorz8 I am thunking of introducing this as a new factory in #99

@giulio1979
Copy link
Author

These guys did a great job imo with the request templating: https://github.com/castorm/kafka-connect-http/tree/master/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/request/template in my opinion, something interesting to look.

@davidradl
Copy link
Contributor

@giulio1979 we have something equivalent but everything is from the Table definition. I hope to ship this factory and enable this sort of templating but config driven rather than the fields being hard coded in the factory implementation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants